Posted to commits@pig.apache.org by ga...@apache.org on 2009/10/22 21:32:22 UTC
svn commit: r828825 - in /hadoop/pig/trunk: ./
src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/
src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/
src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/ sr...
Author: gates
Date: Thu Oct 22 19:32:22 2009
New Revision: 828825
URL: http://svn.apache.org/viewvc?rev=828825&view=rev
Log:
PIG-984: Add map-side grouping for data that is already collected when it is read into the map.
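For readers coming at this from the Pig Latin side: the new syntax is the 'using "collected"' hint on a group by clause. A minimal usage sketch, mirroring the TestCollectedGroup test added below (the input file and schema are illustrative, and the hint is only safe when the loader already delivers all records for a given key together in one map's input):

    import java.util.Iterator;

    import org.apache.pig.ExecType;
    import org.apache.pig.PigServer;
    import org.apache.pig.data.Tuple;

    public class CollectedGroupExample {
        public static void main(String[] args) throws Exception {
            PigServer pigServer = new PigServer(ExecType.MAPREDUCE);
            pigServer.registerQuery("A = LOAD 'input.txt' as (id, name, grade);");
            // The 'using "collected"' hint triggers the new map-side
            // POCollectedGroup operator added by this commit.
            pigServer.registerQuery("B = group A by id using \"collected\";");
            pigServer.registerQuery("C = foreach B generate group, COUNT(A);");
            Iterator<Tuple> iter = pigServer.openIterator("C");
            while (iter.hasNext()) {
                System.out.println(iter.next());
            }
        }
    }

With the hint, the plan gets a single map-side POCollectedGroup instead of the usual local rearrange / global rearrange / package trio, so the grouping needs no reduce phase.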
Added:
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCollectedGroup.java
hadoop/pig/trunk/test/org/apache/pig/test/TestCollectedGroup.java
Modified:
hadoop/pig/trunk/CHANGES.txt
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PhyPlanSetter.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/EndOfAllInputSetter.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PlanPrinter.java
hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOCogroup.java
hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt
Modified: hadoop/pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=828825&r1=828824&r2=828825&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Thu Oct 22 19:32:22 2009
@@ -26,6 +26,9 @@
IMPROVEMENTS
+PIG-984: Add map-side grouping for data that is already collected when
+it is read into the map (rding via gates).
+
PIG-1025: Add ability to set job priority from Pig Latin script (kevinweil via
gates)
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java?rev=828825&r1=828824&r2=828825&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java Thu Oct 22 19:32:22 2009
@@ -79,6 +79,7 @@
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStream;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POUnion;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCollectedGroup;
import org.apache.pig.impl.plan.CompilationMessageCollector;
import org.apache.pig.impl.plan.DepthFirstWalker;
import org.apache.pig.impl.plan.NodeIdGenerator;
@@ -900,7 +901,22 @@
throw new MRCompilerException(msg, errCode, PigException.BUG, e);
}
}
-
+
+ public void visitCollectedGroup(POCollectedGroup op) throws VisitorException {
+ try{
+ nonBlocking(op);
+ List<PhysicalPlan> plans = op.getPlans();
+ if(plans!=null)
+ for(PhysicalPlan ep : plans)
+ addUDFs(ep);
+ phyToMROpMap.put(op, curMROp);
+ }catch(Exception e){
+ int errCode = 2034;
+ String msg = "Error compiling operator " + op.getClass().getSimpleName();
+ throw new MRCompilerException(msg, errCode, PigException.BUG, e);
+ }
+ }
+
@Override
public void visitPOForEach(POForEach op) throws VisitorException{
try{
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PhyPlanSetter.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PhyPlanSetter.java?rev=828825&r1=828824&r2=828825&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PhyPlanSetter.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PhyPlanSetter.java Thu Oct 22 19:32:22 2009
@@ -63,6 +63,12 @@
}
@Override
+ public void visitCollectedGroup(POCollectedGroup mg) throws VisitorException{
+ super.visitCollectedGroup(mg);
+ mg.setParentPlan(parent);
+ }
+
+ @Override
public void visitGlobalRearrange(POGlobalRearrange gr) throws VisitorException{
gr.setParentPlan(parent);
}
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/EndOfAllInputSetter.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/EndOfAllInputSetter.java?rev=828825&r1=828824&r2=828825&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/EndOfAllInputSetter.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/EndOfAllInputSetter.java Thu Oct 22 19:32:22 2009
@@ -23,6 +23,7 @@
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeJoin;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStream;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCollectedGroup;
import org.apache.pig.impl.plan.DepthFirstWalker;
import org.apache.pig.impl.plan.VisitorException;
@@ -80,6 +81,13 @@
// merge join present
endOfAllInputFlag = true;
}
+
+ @Override
+ public void visitCollectedGroup(POCollectedGroup mg) throws VisitorException {
+ // map side group present
+ endOfAllInputFlag = true;
+ }
+
/**
* @return if end of all input is present
*/
@@ -87,4 +95,5 @@
return endOfAllInputFlag;
}
}
-}
\ No newline at end of file
+}
+
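POCollectedGroup buffers all records for the current key and only emits a group when the key changes, so the last group is still sitting in the buffer when the map input runs out. Raising endOfAllInputFlag here gives the operator a final pass when the mapper closes, the same mechanism POStream and POMergeJoin use; the matching flush is the parentPlan.endOfAllInput branch at the top of POCollectedGroup.getNext() in the new file below.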
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java?rev=828825&r1=828824&r2=828825&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java Thu Oct 22 19:32:22 2009
@@ -645,13 +645,23 @@
@Override
public void visit(LOCogroup cg) throws VisitorException {
- boolean currentPhysicalPlan = false;
+
+ if (cg.getGroupType() == LOCogroup.GROUPTYPE.COLLECTED) {
+
+ translateCollectedCogroup(cg);
+
+ } else {
+
+ translateRegularCogroup(cg);
+ }
+ }
+
+ private void translateRegularCogroup(LOCogroup cg) throws VisitorException {
String scope = cg.getOperatorKey().scope;
List<LogicalOperator> inputs = cg.getInputs();
-
+
POGlobalRearrange poGlobal = new POGlobalRearrange(new OperatorKey(
- scope, nodeGen.getNextNodeId(scope)), cg
- .getRequestedParallelism());
+ scope, nodeGen.getNextNodeId(scope)), cg.getRequestedParallelism());
POPackage poPackage = new POPackage(new OperatorKey(scope, nodeGen
.getNextNodeId(scope)), cg.getRequestedParallelism());
@@ -669,8 +679,7 @@
int count = 0;
Byte type = null;
for (LogicalOperator op : inputs) {
- List<LogicalPlan> plans = (List<LogicalPlan>) cg.getGroupByPlans()
- .get(op);
+ List<LogicalPlan> plans = (List<LogicalPlan>)cg.getGroupByPlans().get(op);
POLocalRearrange physOp = new POLocalRearrange(new OperatorKey(
scope, nodeGen.getNextNodeId(scope)), cg
.getRequestedParallelism());
@@ -682,9 +691,8 @@
.spawnChildWalker(lp);
pushWalker(childWalker);
mCurrentWalker.walk(this);
- exprPlans.add((PhysicalPlan) currentPlan);
+ exprPlans.add(currentPlan);
popWalker();
-
}
currentPlan = currentPlans.pop();
try {
@@ -697,8 +705,8 @@
try {
physOp.setIndex(count++);
} catch (ExecException e1) {
- int errCode = 2058;
- String msg = "Unable to set index on newly create POLocalRearrange.";
+ int errCode = 2058;
+ String msg = "Unable to set index on newly created POLocalRearrange.";
throw new VisitorException(msg, errCode, PigException.BUG, e1);
}
if (plans.size() > 1) {
@@ -720,8 +728,8 @@
String msg = "Invalid physical operators in the physical plan" ;
throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.BUG, e);
}
-
}
+
poPackage.setKeyType(type);
poPackage.setResultType(DataType.TUPLE);
poPackage.setNumInps(count);
@@ -729,6 +737,59 @@
logToPhyMap.put(cg, poPackage);
}
+ private void translateCollectedCogroup(LOCogroup cg) throws VisitorException {
+ String scope = cg.getOperatorKey().scope;
+ List<LogicalOperator> inputs = cg.getInputs();
+
+ // can have only one input
+ LogicalOperator op = inputs.get(0);
+ List<LogicalPlan> plans = (List<LogicalPlan>) cg.getGroupByPlans().get(op);
+ POCollectedGroup physOp = new POCollectedGroup(new OperatorKey(
+ scope, nodeGen.getNextNodeId(scope)));
+
+ List<PhysicalPlan> exprPlans = new ArrayList<PhysicalPlan>();
+ currentPlans.push(currentPlan);
+ for (LogicalPlan lp : plans) {
+ currentPlan = new PhysicalPlan();
+ PlanWalker<LogicalOperator, LogicalPlan> childWalker =
+ mCurrentWalker.spawnChildWalker(lp);
+ pushWalker(childWalker);
+ mCurrentWalker.walk(this);
+ exprPlans.add(currentPlan);
+ popWalker();
+ }
+ currentPlan = currentPlans.pop();
+
+ try {
+ physOp.setPlans(exprPlans);
+ } catch (PlanException pe) {
+ int errCode = 2071;
+ String msg = "Problem with setting up map group's plans.";
+ throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.BUG, pe);
+ }
+ Byte type = null;
+ if (plans.size() > 1) {
+ type = DataType.TUPLE;
+ physOp.setKeyType(type);
+ } else {
+ type = exprPlans.get(0).getLeaves().get(0).getResultType();
+ physOp.setKeyType(type);
+ }
+ physOp.setResultType(DataType.TUPLE);
+
+ currentPlan.add(physOp);
+
+ try {
+ currentPlan.connect(logToPhyMap.get(op), physOp);
+ } catch (PlanException e) {
+ int errCode = 2015;
+ String msg = "Invalid physical operators in the physical plan" ;
+ throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.BUG, e);
+ }
+
+ logToPhyMap.put(cg, physOp);
+ }
+
@Override
protected void visit(LOJoin loj) throws VisitorException {
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java?rev=828825&r1=828824&r2=828825&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java Thu Oct 22 19:32:22 2009
@@ -59,6 +59,15 @@
visit();
popWalker();
}
+
+ public void visitCollectedGroup(POCollectedGroup mg) throws VisitorException{
+ List<PhysicalPlan> inpPlans = mg.getPlans();
+ for (PhysicalPlan plan : inpPlans) {
+ pushWalker(mCurrentWalker.spawnChildWalker(plan));
+ visit();
+ popWalker();
+ }
+ }
public void visitLocalRearrange(POLocalRearrange lr) throws VisitorException{
List<PhysicalPlan> inpPlans = lr.getPlans();
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PlanPrinter.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PlanPrinter.java?rev=828825&r1=828824&r2=828825&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PlanPrinter.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PlanPrinter.java Thu Oct 22 19:32:22 2009
@@ -156,6 +156,9 @@
else if(node instanceof POLocalRearrange){
sb.append(planString(((POLocalRearrange)node).getPlans()));
}
+ else if(node instanceof POCollectedGroup){
+ sb.append(planString(((POCollectedGroup)node).getPlans()));
+ }
else if(node instanceof POSort){
sb.append(planString(((POSort)node).getSortPlans()));
}
Added: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCollectedGroup.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCollectedGroup.java?rev=828825&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCollectedGroup.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCollectedGroup.java Thu Oct 22 19:32:22 2009
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.ExpressionOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.data.BagFactory;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.io.PigNullableWritable;
+import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.plan.PlanException;
+import org.apache.pig.impl.plan.VisitorException;
+
+/**
+ * The collected group operator is a special operator used when users give
+ * the hint 'using "collected"' in a group by clause. It implements a map-side
+ * group that collects all records for a given key into a buffer. When it sees
+ * a key change it will emit the key and bag for records it had buffered.
+ * It assumes that all records for a given key are collected together,
+ * and thus there is no need to buffer across keys.
+ *
+ */
+public class POCollectedGroup extends PhysicalOperator {
+
+ private static final List<PhysicalPlan> EMPTY_PLAN_LIST = new ArrayList<PhysicalPlan>();
+
+ protected static final long serialVersionUID = 1L;
+
+ protected static final TupleFactory mTupleFactory = TupleFactory.getInstance();
+
+// private Log log = LogFactory.getLog(getClass());
+
+ protected List<PhysicalPlan> plans;
+
+ protected List<ExpressionOperator> leafOps;
+
+ protected byte keyType;
+
+ private Tuple output;
+
+ private DataBag outputBag = null;
+
+ private Object prevKey = null;
+
+ public POCollectedGroup(OperatorKey k) {
+ this(k, -1, null);
+ }
+
+ public POCollectedGroup(OperatorKey k, int rp) {
+ this(k, rp, null);
+ }
+
+ public POCollectedGroup(OperatorKey k, List<PhysicalOperator> inp) {
+ this(k, -1, inp);
+ }
+
+ public POCollectedGroup(OperatorKey k, int rp, List<PhysicalOperator> inp) {
+ super(k, rp, inp);
+ leafOps = new ArrayList<ExpressionOperator>();
+ output = mTupleFactory.newTuple(2);
+ }
+
+ @Override
+ public void visit(PhyPlanVisitor v) throws VisitorException {
+ v.visitCollectedGroup(this);
+ }
+
+ @Override
+ public String name() {
+ return "Map side group " + "[" + DataType.findTypeName(resultType) +
+ "]" + "{" + DataType.findTypeName(keyType) + "}" + " - " + mKey.toString();
+ }
+
+ @Override
+ public boolean supportsMultipleInputs() {
+ return false;
+ }
+
+ @Override
+ public boolean supportsMultipleOutputs() {
+ return false;
+ }
+
+ /**
+ * Overridden since the attachment of the new input should cause the old
+ * processing to end.
+ */
+ @Override
+ public void attachInput(Tuple t) {
+ super.attachInput(t);
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public Result getNext(Tuple t) throws ExecException {
+
+ // Since the output is buffered, we need to flush the last
+ // set of records when the close method is called by mapper.
+ if (this.parentPlan.endOfAllInput) {
+ if (outputBag != null) {
+ Tuple tup = mTupleFactory.newTuple(2);
+ tup.set(0, prevKey);
+ tup.set(1, outputBag);
+ outputBag = null;
+ return new Result(POStatus.STATUS_OK, tup);
+ }
+
+ return new Result(POStatus.STATUS_EOP, null);
+ }
+
+ Result inp = null;
+ Result res = null;
+
+ while (true) {
+ inp = processInput();
+ if (inp.returnStatus == POStatus.STATUS_EOP ||
+ inp.returnStatus == POStatus.STATUS_ERR) {
+ break;
+ }
+
+ if (inp.returnStatus == POStatus.STATUS_NULL) {
+ continue;
+ }
+
+ for (PhysicalPlan ep : plans) {
+ ep.attachInput((Tuple)inp.result);
+ }
+
+ List<Result> resLst = new ArrayList<Result>();
+ for (ExpressionOperator op : leafOps) {
+
+ switch (op.getResultType()){
+ case DataType.BAG:
+ res = op.getNext(dummyBag);
+ break;
+ case DataType.BOOLEAN:
+ res = op.getNext(dummyBool);
+ break;
+ case DataType.BYTEARRAY:
+ res = op.getNext(dummyDBA);
+ break;
+ case DataType.CHARARRAY:
+ res = op.getNext(dummyString);
+ break;
+ case DataType.DOUBLE:
+ res = op.getNext(dummyDouble);
+ break;
+ case DataType.FLOAT:
+ res = op.getNext(dummyFloat);
+ break;
+ case DataType.INTEGER:
+ res = op.getNext(dummyInt);
+ break;
+ case DataType.LONG:
+ res = op.getNext(dummyLong);
+ break;
+ case DataType.MAP:
+ res = op.getNext(dummyMap);
+ break;
+ case DataType.TUPLE:
+ res = op.getNext(dummyTuple);
+ break;
+ }
+ if (res.returnStatus != POStatus.STATUS_OK) {
+ return new Result();
+ }
+ resLst.add(res);
+ }
+
+ Tuple tup = constructOutput(resLst,(Tuple)inp.result);
+ Object curKey = tup.get(0);
+
+ // the first time, just create a new buffer and continue.
+ if (prevKey == null && outputBag == null) {
+ prevKey = curKey;
+ outputBag = BagFactory.getInstance().newDefaultBag();
+ outputBag.add((Tuple)tup.get(1));
+ continue;
+ }
+
+ // no key change: both the previous and current keys are null
+ if (prevKey == null && curKey == null) {
+ outputBag.add((Tuple)tup.get(1));
+ continue;
+ }
+
+ // no key change: current key equals the previous key
+ if (prevKey != null && curKey != null && ((Comparable)curKey).compareTo(prevKey) == 0) {
+ outputBag.add((Tuple)tup.get(1));
+ continue;
+ }
+
+ // key change
+ Tuple tup2 = mTupleFactory.newTuple(2);
+ tup2.set(0, prevKey);
+ tup2.set(1, outputBag);
+ res.result = tup2;
+
+ prevKey = curKey;
+ outputBag = BagFactory.getInstance().newDefaultBag();
+ outputBag.add((Tuple)tup.get(1));
+
+ return res;
+ }
+
+ return inp;
+ }
+
+ protected Tuple constructOutput(List<Result> resLst, Tuple value) throws ExecException{
+
+ // Construct key
+ Object key;
+
+ if (resLst.size() > 1) {
+ Tuple t = mTupleFactory.newTuple(resLst.size());
+ int i = -1;
+ for (Result res : resLst) {
+ t.set(++i, res.result);
+ }
+ key = t;
+ }
+ else {
+ key = resLst.get(0).result;
+ }
+
+ // Put key and value in a tuple and return
+ output.set(0, key);
+ output.set(1, value);
+
+ return output;
+ }
+
+ public byte getKeyType() {
+ return keyType;
+ }
+
+ public void setKeyType(byte keyType) {
+ this.keyType = keyType;
+ }
+
+ public List<PhysicalPlan> getPlans() {
+ return (plans == null) ? EMPTY_PLAN_LIST : plans;
+ }
+
+ public void setPlans(List<PhysicalPlan> plans) throws PlanException {
+ this.plans = plans;
+ leafOps.clear();
+ for (PhysicalPlan plan : plans) {
+ ExpressionOperator leaf = (ExpressionOperator)plan.getLeaves().get(0);
+ leafOps.add(leaf);
+ }
+ }
+}
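The heart of POCollectedGroup is the key-change detection inside getNext() above. A stand-alone sketch of that loop, using illustrative stand-ins (KeyValue for Pig's Tuple, a List for DataBag) and omitting the null-key cases the real operator handles:

    import java.util.ArrayList;
    import java.util.Iterator;
    import java.util.List;

    class CollectedGroupSketch {

        static class KeyValue {
            final Comparable<Object> key;
            final Object value;
            KeyValue(Comparable<Object> key, Object value) {
                this.key = key;
                this.value = value;
            }
        }

        // Emits one (key, bag) pair per contiguous run of equal keys. This is
        // only correct if all records for a key arrive together, which is
        // exactly the assumption the 'collected' hint asserts about the input.
        static void group(Iterator<KeyValue> input) {
            Comparable<Object> prevKey = null;
            List<Object> bag = null;
            while (input.hasNext()) {
                KeyValue kv = input.next();
                if (bag == null) {
                    // first record: open a buffer for its key
                    prevKey = kv.key;
                    bag = new ArrayList<Object>();
                } else if (kv.key.compareTo(prevKey) != 0) {
                    // key change: emit the finished group, start a new buffer
                    System.out.println(prevKey + " -> " + bag);
                    prevKey = kv.key;
                    bag = new ArrayList<Object>();
                }
                bag.add(kv.value);
            }
            if (bag != null) {
                // end of input: the last group still has to be flushed
                System.out.println(prevKey + " -> " + bag);
            }
        }
    }

That trailing flush is what the parentPlan.endOfAllInput branch at the top of getNext() provides: a mapper hands records to the operator one at a time, so the operator cannot detect end-of-input itself and relies on the EndOfAllInputSetter change above.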
Modified: hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOCogroup.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOCogroup.java?rev=828825&r1=828824&r2=828825&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOCogroup.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOCogroup.java Thu Oct 22 19:32:22 2009
@@ -18,6 +18,7 @@
package org.apache.pig.impl.logicalLayer;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
@@ -43,6 +44,14 @@
public class LOCogroup extends RelationalOperator {
private static final long serialVersionUID = 2L;
+
+ /**
+ * Enum for the type of group
+ */
+ public static enum GROUPTYPE {
+ REGULAR, // Regular (co)group
+ COLLECTED // Collected group
+ };
/**
* Cogroup contains a list of logical operators corresponding to the
@@ -53,6 +62,7 @@
private boolean[] mIsInner;
private static Log log = LogFactory.getLog(LOCogroup.class);
private MultiMap<LogicalOperator, LogicalPlan> mGroupByPlans;
+ private GROUPTYPE mGroupType;
/**
*
@@ -70,9 +80,34 @@
OperatorKey k,
MultiMap<LogicalOperator, LogicalPlan> groupByPlans,
boolean[] isInner) {
+ this(plan, k, groupByPlans, GROUPTYPE.REGULAR, isInner);
+ }
+
+ /**
+ *
+ * @param plan
+ * LogicalPlan this operator is a part of.
+ * @param k
+ * OperatorKey for this operator
+ * @param groupByPlans
+ * the group by columns
+ * @param type
+ * the type of this group
+ * @param isInner
+ * indicates whether the cogroup is inner for each relation
+ */
+ public LOCogroup(
+ LogicalPlan plan,
+ OperatorKey k,
+ MultiMap<LogicalOperator, LogicalPlan> groupByPlans,
+ GROUPTYPE type,
+ boolean[] isInner) {
super(plan, k);
mGroupByPlans = groupByPlans;
- mIsInner = isInner;
+ if (isInner != null) {
+ mIsInner = Arrays.copyOf(isInner, isInner.length);
+ }
+ mGroupType = type;
}
public List<LogicalOperator> getInputs() {
@@ -95,6 +130,10 @@
mIsInner = inner;
}
+ public GROUPTYPE getGroupType() {
+ return mGroupType;
+ }
+
@Override
public String name() {
return "CoGroup " + mKey.scope + "-" + mKey.id;
Modified: hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt?rev=828825&r1=828824&r2=828825&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt Thu Oct 22 19:32:22 2009
@@ -249,7 +249,7 @@
return fname;
}
- LogicalOperator parseCogroup(ArrayList<CogroupInput> gis, LogicalPlan lp) throws ParseException, PlanException{
+ LogicalOperator parseCogroup(ArrayList<CogroupInput> gis, LogicalPlan lp, LOCogroup.GROUPTYPE type) throws ParseException, PlanException{
log.trace("Entering parseCogroup");
log.debug("LogicalPlan: " + lp);
@@ -286,7 +286,7 @@
isInner[i] = gi.isInner;
}
- LogicalOperator cogroup = new LOCogroup(lp, new OperatorKey(scope, getNextId()), groupByPlans, isInner);
+ LogicalOperator cogroup = new LOCogroup(lp, new OperatorKey(scope, getNextId()), groupByPlans, type, isInner);
lp.add(cogroup);
log.debug("Added operator " + cogroup.getClass().getName() + " object " + cogroup + " to the logical plan " + lp);
@@ -388,7 +388,7 @@
for (int i = 0; i < n; i++) {
(gis.get(i)).isInner = true;
}
- LogicalOperator cogroup = parseCogroup(gis, lp);
+ LogicalOperator cogroup = parseCogroup(gis, lp, LOCogroup.GROUPTYPE.REGULAR);
lp.add(cogroup);
log.debug("Added operator " + cogroup.getClass().getName() + " to the logical plan");
@@ -676,7 +676,21 @@
}
log.trace("Exiting attachPlan");
}
-
+
+ boolean isColumnProjectionsOrStar(CogroupInput cgi) {
+ if (cgi == null || cgi.plans == null || cgi.plans.size() == 0) {
+ return false;
+ }
+ for (LogicalPlan keyPlan: cgi.plans) {
+ for (LogicalOperator op : keyPlan) {
+ if(!(op instanceof LOProject)) {
+ return false;
+ }
+ }
+ }
+ return true;
+ }
+
}
@@ -1623,20 +1637,40 @@
LogicalOperator CogroupClause(LogicalPlan lp) :
{
- CogroupInput gi;
- ArrayList<CogroupInput> gis = new ArrayList<CogroupInput>();
- LogicalOperator cogroup;
- log.trace("Entering CoGroupClause");
+ CogroupInput gi;
+ ArrayList<CogroupInput> gis = new ArrayList<CogroupInput>();
+ LogicalOperator cogroup = null;
+ log.trace("Entering CoGroupClause");
}
{
- (gi = GroupItem(lp) { gis.add(gi); }
- ("," gi = GroupItem(lp) { gis.add(gi); })*)
- {
- cogroup = parseCogroup(gis, lp);
- log.trace("Exiting CoGroupClause");
- return cogroup;
- }
+ (gi = GroupItem(lp) { gis.add(gi); }
+ ("," gi = GroupItem(lp) { gis.add(gi); })*
+ (
+ [<USING> ("\"collected\"" {
+ if (gis.size() != 1) {
+ throw new ParseException("Collected group is only supported for single input");
+ }
+ if (!isColumnProjectionsOrStar(gis.get(0))) {
+ throw new ParseException("Collected group is only supported for columns or star projection");
+ }
+ cogroup = parseCogroup(gis, lp, LOCogroup.GROUPTYPE.COLLECTED);
+ }
+ )
+ ]
+ )
+ )
+
+ {
+ if (cogroup != null) {
+ log.trace("Exiting CoGroupClause");
+ return cogroup;
+ }
+
+ cogroup = parseCogroup(gis, lp, LOCogroup.GROUPTYPE.REGULAR);
+ log.trace("Exiting CoGroupClause");
+ return cogroup;
+ }
}
Added: hadoop/pig/trunk/test/org/apache/pig/test/TestCollectedGroup.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestCollectedGroup.java?rev=828825&view=auto
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestCollectedGroup.java (added)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestCollectedGroup.java Thu Oct 22 19:32:22 2009
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.test;
+
+import java.io.*;
+import java.util.Iterator;
+import java.util.List;
+
+import junit.framework.Assert;
+import junit.framework.TestCase;
+
+import org.apache.pig.EvalFunc;
+import org.apache.pig.ExecType;
+import org.apache.pig.FuncSpec;
+import org.apache.pig.PigServer;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.BagFactory;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.test.utils.TestHelper;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCollectedGroup;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.impl.plan.OperatorKey;
+import org.junit.After;
+import org.junit.Before;
+
+public class TestCollectedGroup extends TestCase {
+ private static final String INPUT_FILE = "MapSideGroupInput.txt";
+
+ private PigServer pigServer;
+ private MiniCluster cluster = MiniCluster.buildCluster();
+
+ public TestCollectedGroup() throws ExecException, IOException{
+ pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+ }
+
+ @Before
+ public void setUp() throws Exception {
+ createFiles();
+ }
+
+ private void createFiles() throws IOException {
+ PrintWriter w = new PrintWriter(new FileWriter(INPUT_FILE));
+ w.println("100\tapple1\t95");
+ w.println("100\tapple2\t83");
+ w.println("100\tapple2\t74");
+ w.println("200\torange1\t100");
+ w.println("200\torange2\t89");
+ w.println("300\tstrawberry\t64");
+ w.println("300\tstrawberry\t64");
+ w.println("300\tstrawberry\t76");
+ w.println("400\tpear\t78");
+ w.close();
+
+ Util.copyFromLocalToCluster(cluster, INPUT_FILE, INPUT_FILE);
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ new File(INPUT_FILE).delete();
+ Util.deleteFile(cluster, INPUT_FILE);
+ }
+
+ public void testPOMapsideGroupNoNullPlans() throws IOException {
+ POCollectedGroup pmg = new POCollectedGroup(new OperatorKey());
+ List<PhysicalPlan> plans = pmg.getPlans();
+
+ Assert.assertTrue(plans != null);
+ Assert.assertTrue(plans.size() == 0);
+ }
+
+ public void testMapsideGroupParserNoSupportForMultipleInputs() throws IOException {
+ pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+
+ pigServer.registerQuery("A = LOAD '" + INPUT_FILE + "' as (id, name, grade);");
+ pigServer.registerQuery("B = LOAD '" + INPUT_FILE + "' as (id, name, grade);");
+
+ try {
+ pigServer.registerQuery("C = group A by id, B by id using \"collected\";");
+ fail("Pig doesn't support multi-input collected group.");
+ } catch (Exception e) {
+ Assert.assertEquals(e.getMessage(),
+ "Error during parsing. Collected group is only supported for single input");
+ }
+ }
+
+ public void testMapsideGroupParserNoSupportForGroupAll() throws IOException {
+ pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+
+ pigServer.registerQuery("A = LOAD '" + INPUT_FILE + "' as (id, name, grade);");
+
+ try {
+ pigServer.registerQuery("B = group A all using \"collected\";");
+ fail("Pig doesn't support collected group all.");
+ } catch (Exception e) {
+ Assert.assertEquals(e.getMessage(),
+ "Error during parsing. Collected group is only supported for columns or star projection");
+ }
+ }
+
+ public void testMapsideGroupParserNoSupportForByExpression() throws IOException {
+ pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+
+ pigServer.registerQuery("A = LOAD '" + INPUT_FILE + "' as (id, name, grade);");
+
+ try {
+ pigServer.registerQuery("B = group A by id*grade using \"collected\";");
+ fail("Pig doesn't support collected group by expression.");
+ } catch (Exception e) {
+ Assert.assertEquals(e.getMessage(),
+ "Error during parsing. Collected group is only supported for columns or star projection");
+ }
+ }
+
+ public void testMapsideGroupByOneColumn() throws IOException{
+ pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+
+ pigServer.registerQuery("A = LOAD '" + INPUT_FILE + "' as (id, name, grade);");
+
+ try {
+ DataBag dbfrj = BagFactory.getInstance().newDefaultBag();
+ DataBag dbshj = BagFactory.getInstance().newDefaultBag();
+ {
+ pigServer.registerQuery("B = group A by id using \"collected\";");
+ pigServer.registerQuery("C = foreach B generate group, COUNT(A);");
+ Iterator<Tuple> iter = pigServer.openIterator("C");
+
+ while (iter.hasNext()) {
+ dbfrj.add(iter.next());
+ }
+ }
+ {
+ pigServer.registerQuery("D = group A by id;");
+ pigServer.registerQuery("E = foreach D generate group, COUNT(A);");
+ Iterator<Tuple> iter = pigServer.openIterator("E");
+
+ while (iter.hasNext()) {
+ dbshj.add(iter.next());
+ }
+ }
+ Assert.assertTrue(dbfrj.size()>0 && dbshj.size()>0);
+ Assert.assertEquals(true, TestHelper.compareBags(dbfrj, dbshj));
+
+ } catch (Exception e) {
+ fail(e.getMessage());
+ }
+ }
+
+ public void testMapsideGroupByMultipleColumns() throws IOException{
+ pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+
+ pigServer.registerQuery("A = LOAD '" + INPUT_FILE + "' as (id, name, grade);");
+
+ try {
+ DataBag dbfrj = BagFactory.getInstance().newDefaultBag();
+ DataBag dbshj = BagFactory.getInstance().newDefaultBag();
+ {
+ pigServer.registerQuery("B = group A by (id, name) using \"collected\";");
+ pigServer.registerQuery("C = foreach B generate group, COUNT(A);");
+ Iterator<Tuple> iter = pigServer.openIterator("C");
+
+ while (iter.hasNext()) {
+ dbfrj.add(iter.next());
+ }
+ }
+ {
+ pigServer.registerQuery("D = group A by (id, name);");
+ pigServer.registerQuery("E = foreach D generate group, COUNT(A);");
+ Iterator<Tuple> iter = pigServer.openIterator("E");
+
+ while (iter.hasNext()) {
+ dbshj.add(iter.next());
+ }
+ }
+ Assert.assertTrue(dbfrj.size()>0 && dbshj.size()>0);
+ Assert.assertEquals(true, TestHelper.compareBags(dbfrj, dbshj));
+
+ } catch (Exception e) {
+ fail(e.getMessage());
+ }
+ }
+
+ public void testMapsideGroupByStar() throws IOException{
+ pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+
+ pigServer.registerQuery("A = LOAD '" + INPUT_FILE + "' as (id, name, grade);");
+
+ try {
+ DataBag dbfrj = BagFactory.getInstance().newDefaultBag();
+ DataBag dbshj = BagFactory.getInstance().newDefaultBag();
+ {
+ pigServer.registerQuery("B = group A by * using \"collected\";");
+ pigServer.registerQuery("C = foreach B generate group, COUNT(A);");
+ Iterator<Tuple> iter = pigServer.openIterator("C");
+
+ while (iter.hasNext()) {
+ dbfrj.add(iter.next());
+ }
+ }
+ {
+ pigServer.registerQuery("D = group A by *;");
+ pigServer.registerQuery("E = foreach D generate group, COUNT(A);");
+ Iterator<Tuple> iter = pigServer.openIterator("E");
+
+ while (iter.hasNext()) {
+ dbshj.add(iter.next());
+ }
+ }
+ Assert.assertTrue(dbfrj.size()>0 && dbshj.size()>0);
+ Assert.assertEquals(true, TestHelper.compareBags(dbfrj, dbshj));
+
+ } catch (Exception e) {
+ fail(e.getMessage());
+ }
+ }
+
+}