You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ol...@apache.org on 2008/10/09 21:23:51 UTC
svn commit: r703233 - in /incubator/pig/branches/types: ./
src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/
src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/
src/org/apache/pig/backend/hadoop/executionengine/physic...
Author: olga
Date: Thu Oct 9 12:23:51 2008
New Revision: 703233
URL: http://svn.apache.org/viewvc?rev=703233&view=rev
Log:
PIG-465: perf improvement - removing keys from the value
Modified:
incubator/pig/branches/types/CHANGES.txt
incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/CombinerOptimizer.java
incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/LocalLauncher.java
incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java
incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java
incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLocalRearrange.java
incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java
incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPostCombinerPackage.java
incubator/pig/branches/types/src/org/apache/pig/backend/local/executionengine/physicalLayer/LocalLogToPhyTranslationVisitor.java
incubator/pig/branches/types/src/org/apache/pig/impl/util/Pair.java
incubator/pig/branches/types/test/org/apache/pig/test/TestLocalRearrange.java
incubator/pig/branches/types/test/org/apache/pig/test/TestPOCogroup.java
incubator/pig/branches/types/test/org/apache/pig/test/TestPackage.java
Modified: incubator/pig/branches/types/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/CHANGES.txt?rev=703233&r1=703232&r2=703233&view=diff
==============================================================================
--- incubator/pig/branches/types/CHANGES.txt (original)
+++ incubator/pig/branches/types/CHANGES.txt Thu Oct 9 12:23:51 2008
@@ -275,3 +275,6 @@
PIG-457: report 100% on successful jobs only (shravanmn via olgan)
PIG-471: ignoring status errors from hadoop (pradeepk via olgan)
+
+ PIG-465: performance improvement - removing keys from the value (pradeepk
+ via olgan)
Modified: incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/CombinerOptimizer.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/CombinerOptimizer.java?rev=703233&r1=703232&r2=703233&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/CombinerOptimizer.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/CombinerOptimizer.java Thu Oct 9 12:23:51 2008
@@ -24,6 +24,7 @@
import org.apache.commons.logging.LogFactory;
import org.apache.pig.data.DataType;
+import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROpPlanVisitor;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
@@ -393,7 +394,7 @@
func.setAlgebraicFunction(type);
}
- private void fixUpRearrange(POLocalRearrange rearrange) {
+ private void fixUpRearrange(POLocalRearrange rearrange) throws ExecException {
// Set the projection to be the key
PhysicalPlan newPlan = new PhysicalPlan();
String scope = rearrange.getOperatorKey().scope;
@@ -404,8 +405,7 @@
newPlan.add(proj);
List<PhysicalPlan> plans = new ArrayList<PhysicalPlan>(1);
plans.add(newPlan);
- rearrange.setPlans(plans);
- rearrange.setIndex(mKeyField);
+ rearrange.setPlansFromCombiner(plans);
}
private class AlgebraicPlanChecker extends PhyPlanVisitor {
Modified: incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java?rev=703233&r1=703232&r2=703233&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java Thu Oct 9 12:23:51 2008
@@ -381,7 +381,7 @@
Class comparator = PigContext.resolveClassName(compFuncSpec);
if(ComparisonFunc.class.isAssignableFrom(comparator)) {
jobConf.setMapperClass(PigMapReduce.MapWithComparator.class);
- pack.setKeyType(DataType.TUPLE);
+ jobConf.setReducerClass(PigMapReduce.ReduceWithComparator.class);
jobConf.set("pig.reduce.package", ObjectSerializer.serialize(pack));
jobConf.set("pig.usercomparator", "true");
jobConf.setOutputKeyClass(NullableTuple.class);
Modified: incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/LocalLauncher.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/LocalLauncher.java?rev=703233&r1=703232&r2=703233&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/LocalLauncher.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/LocalLauncher.java Thu Oct 9 12:23:51 2008
@@ -21,6 +21,7 @@
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MRPrinter;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MRStreamHandler;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.POPackageAnnotator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import org.apache.pig.impl.plan.PlanException;
import org.apache.pig.impl.plan.VisitorException;
@@ -118,7 +119,11 @@
CombinerOptimizer co = new CombinerOptimizer(plan);
co.visit();
}
-
+
+ // optimize key - value handling in package
+ POPackageAnnotator pkgAnnotator = new POPackageAnnotator(plan);
+ pkgAnnotator.visit();
+
// check whether stream operator is present
MRStreamHandler checker = new MRStreamHandler(plan);
checker.visit();
Modified: incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java?rev=703233&r1=703232&r2=703233&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java Thu Oct 9 12:23:51 2008
@@ -36,6 +36,7 @@
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MRPrinter;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MRStreamHandler;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.POPackageAnnotator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import org.apache.pig.impl.plan.PlanException;
@@ -136,6 +137,10 @@
co.visit();
}
+ // optimize key - value handling in package
+ POPackageAnnotator pkgAnnotator = new POPackageAnnotator(plan);
+ pkgAnnotator.visit();
+
// check whether stream operator is present
MRStreamHandler checker = new MRStreamHandler(plan);
checker.visit();
Modified: incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java?rev=703233&r1=703232&r2=703233&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java Thu Oct 9 12:23:51 2008
@@ -48,6 +48,7 @@
import org.apache.pig.impl.io.NullableTuple;
import org.apache.pig.impl.util.ObjectSerializer;
import org.apache.pig.impl.util.SpillableMemoryManager;
+import org.apache.pig.impl.util.WrappedIOException;
/**
* This class is the static Mapper & Reducer classes that
@@ -95,14 +96,26 @@
}
}
+ /**
+ * This "specialized" map class is ONLY to be used in pig queries with
+ * order by a udf. A UDF used for comparison in the order by expects
+ * to be handed tuples. Hence this map class ensures that the "key" used
+ * in the order by is wrapped into a tuple (if it isn't already a tuple)
+ */
public static class MapWithComparator extends PigMapBase implements
Mapper<Text, TargetedTuple, PigNullableWritable, Writable> {
@Override
public void collect(OutputCollector<PigNullableWritable, Writable> oc,
Tuple tuple) throws ExecException, IOException {
- Object k = tuple.get(1);
- Tuple keyTuple = tf.newTuple(k);
+ Object keyTuple = null;
+ if(keyType != DataType.TUPLE) {
+ Object k = tuple.get(1);
+ keyTuple = tf.newTuple(k);
+ } else {
+ keyTuple = tuple.get(1);
+ }
+
Byte index = (Byte)tuple.get(0);
PigNullableWritable key =
@@ -121,23 +134,23 @@
public static class Reduce extends MapReduceBase
implements
Reducer<PigNullableWritable, NullableTuple, PigNullableWritable, Writable> {
- private final Log log = LogFactory.getLog(getClass());
+ protected final Log log = LogFactory.getLog(getClass());
//The reduce plan
- private PhysicalPlan rp;
+ protected PhysicalPlan rp;
//The POPackage operator which is the
//root of every Map Reduce plan is
//obtained through the job conf. The portion
//remaining after its removal is the reduce
//plan
- private POPackage pack;
+ protected POPackage pack;
ProgressableReporter pigReporter;
- private OutputCollector<PigNullableWritable, Writable> outputCollector;
+ protected OutputCollector<PigNullableWritable, Writable> outputCollector;
- private boolean errorInReduce = false;
+ protected boolean errorInReduce = false;
/**
* Configures the Reduce plan, the POPackage operator
@@ -231,7 +244,7 @@
* @throws ExecException
* @throws IOException
*/
- private void runPipeline(PhysicalOperator leaf) throws ExecException, IOException {
+ protected void runPipeline(PhysicalOperator leaf) throws ExecException, IOException {
while(true)
{
Tuple dummyTuple = null;
@@ -307,4 +320,98 @@
}
}
+ /**
+ * This "specialized" reduce class is ONLY to be used in pig queries with
+ * order by a udf. A UDF used for comparison in the order by expects
+ * to be handed tuples. Hence a specialized map class (PigMapReduce.MapWithComparator)
+ * ensures that the "key" used in the order by is wrapped into a tuple (if it
+ * isn't already a tuple). This reduce class unwraps this tuple in the case where
+ * the map had wrapped the key into a tuple and hands the "unwrapped" key to the
+ * POPackage for processing.
+ */
+ public static class ReduceWithComparator extends PigMapReduce.Reduce {
+
+ private byte keyType;
+
+ /**
+ * Calls Reduce.configure() and then caches the POPackage key type so that
+ * reduce() knows whether MapWithComparator wrapped the key into a tuple.
+ */
+ @Override
+ public void configure(JobConf jConf) {
+ super.configure(jConf);
+ keyType = pack.getKeyType();
+ }
+
+ /**
+ * Unwraps the key if MapWithComparator wrapped it into a tuple, then
+ * packages the key and List<Tuple> into key, Bag<Tuple>. The package
+ * result is either collected as is, if the reduce plan is empty, or
+ * after passing through the reduce plan.
+ */
+ @Override
+ public void reduce(PigNullableWritable key,
+ Iterator<NullableTuple> tupIter,
+ OutputCollector<PigNullableWritable, Writable> oc,
+ Reporter reporter) throws IOException {
+
+ // cache the collector for use in runPipeline()
+ // which could additionally be called from close()
+ this.outputCollector = oc;
+ pigReporter.setRep(reporter);
+
+ // If the keyType is not a tuple, the MapWithComparator.collect()
+ // would have wrapped the key into a tuple so that the
+ // comparison UDF used in the order by can process it.
+ // We need to unwrap the key out of the tuple and hand it
+ // to the POPackage for processing
+ if(keyType != DataType.TUPLE) {
+ Tuple t = (Tuple)(key.getValueAsPigType());
+ try {
+ key = HDataType.getWritableComparableTypes(t.get(0), keyType);
+ } catch (ExecException e) {
+ throw WrappedIOException.wrap(e);
+ }
+ }
+
+ pack.attachInput(key, tupIter);
+
+ try {
+ Tuple t=null;
+ Result res = pack.getNext(t);
+ if(res.returnStatus==POStatus.STATUS_OK){
+ Tuple packRes = (Tuple)res.result;
+
+ if(rp.isEmpty()){
+ oc.collect(null, packRes);
+ return;
+ }
+
+ rp.attachInput(packRes);
+
+ List<PhysicalOperator> leaves = rp.getLeaves();
+
+ PhysicalOperator leaf = leaves.get(0);
+ runPipeline(leaf);
+
+ }
+
+ if(res.returnStatus==POStatus.STATUS_NULL) {
+ return;
+ }
+
+ if(res.returnStatus==POStatus.STATUS_ERR){
+ IOException ioe = new IOException("Packaging error while processing group");
+ throw ioe;
+ }
+
+ } catch (ExecException e) {
+ IOException ioe = new IOException(e.getMessage());
+ ioe.initCause(e); // chain the ExecException itself, not its (possibly null) cause
+ throw ioe;
+ }
+ }
+
+ }
+
}
Modified: incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java?rev=703233&r1=703232&r2=703233&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java Thu Oct 9 12:23:51 2008
@@ -76,6 +76,10 @@
//do nothing
}
+ public void visitCombinerPackage(POPostCombinerPackage pkg) throws VisitorException{
+ //no-op by default; subclasses override to act on a POPostCombinerPackage
+ }
+
public void visitPOForEach(POForEach nfe) throws VisitorException {
List<PhysicalPlan> inpPlans = nfe.getInputPlans();
for (PhysicalPlan plan : inpPlans) {
@@ -231,5 +235,19 @@
}
+ /** Visits every inner plan of lrfi, each with its own child walker.
+ * @param lrfi the local rearrange (illustrate variant) whose inner plans are visited
+ * @throws VisitorException if visiting an inner plan fails
+ */
+ public void visitLocalRearrangeForIllustrate(
+ POLocalRearrangeForIllustrate lrfi) throws VisitorException {
+ List<PhysicalPlan> inpPlans = lrfi.getPlans();
+ for (PhysicalPlan plan : inpPlans) {
+ pushWalker(mCurrentWalker.spawnChildWalker(plan));
+ visit();
+ popWalker(); // restore the parent walker before the next inner plan
+ }
+ }
+
}
Modified: incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLocalRearrange.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLocalRearrange.java?rev=703233&r1=703232&r2=703233&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLocalRearrange.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLocalRearrange.java Thu Oct 9 12:23:51 2008
@@ -18,7 +18,9 @@
package org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -30,6 +32,7 @@
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.ExpressionOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import org.apache.pig.impl.plan.OperatorKey;
@@ -47,31 +50,59 @@
/**
*
*/
- private static final long serialVersionUID = 1L;
+ protected static final long serialVersionUID = 1L;
- private static TupleFactory mTupleFactory = TupleFactory.getInstance();
+ protected static TupleFactory mTupleFactory = TupleFactory.getInstance();
private Log log = LogFactory.getLog(getClass());
- List<PhysicalPlan> plans;
+ protected List<PhysicalPlan> plans;
- List<ExpressionOperator> leafOps;
+ protected List<ExpressionOperator> leafOps;
// The position of this LR in the package operator
- byte index;
+ protected byte index;
- byte keyType;
+ protected byte keyType;
- private boolean mIsDistinct = false;
+ protected boolean mIsDistinct = false;
- private boolean isCross = false;
+ protected boolean isCross = false;
+
+ // map to store mapping of projected columns to
+ // the position in the "Key" where these will be projected to.
+ // We use this information to strip off these columns
+ // from the "Value" and in POPackage stitch the right "Value"
+ // tuple back by getting these columns from the "key". The goal
+ // is to reduce the amount of the data sent to Hadoop in the map.
+ // Example: a = load 'bla'; b = load 'bla'; c = cogroup a by ($2, $3), b by ($1, $2)
+ // For the first input (a), the map would contain the following key:value pairs
+ // 2:0 (2 corresponds to $2 in cogroup a by ($2, $3) and 0 is the 1st position in the key)
+ // 3:1 (3 corresponds to $3 in cogroup a by ($2, $3) and 1 is the 2nd position in the key)
+ private Map<Integer, Integer> mProjectedColsMap;
// A place holder Tuple used in distinct case where we really don't
// have any value to pass through. But hadoop gets cranky if we pass a
// null, so we'll just create one instance of this empty tuple and
// pass it for every row. We only get around to actually creating it if
// mIsDistinct is set to true.
- private Tuple mFakeTuple = null;
+ protected Tuple mFakeTuple = null;
+
+ // indicator whether the project in the inner plans
+ // is a project(*) - we set this ONLY when the project(*)
+ // is the ONLY thing in the cogroup by ..
+ private boolean mProjectStar = false;
+
+ // marker to note that the "key" is a tuple
+ // this is required by POPackage to pick things
+ // off the "key" correctly to stitch together the
+ // "value"
+ private boolean isKeyTuple = false;
+
+ private int mProjectedColsMapSize = 0;
+
+ private ArrayList<Integer> minValuePositions;
+ private int minValuePositionsSize = 0;
public POLocalRearrange(OperatorKey k) {
this(k, -1, null);
@@ -89,6 +120,7 @@
super(k, rp, inp);
index = -1;
leafOps = new ArrayList<ExpressionOperator>();
+ mProjectedColsMap = new HashMap<Integer, Integer>();
}
@Override
@@ -206,12 +238,13 @@
resLst.add(res);
}
res.result = constructLROutput(resLst,(Tuple)inp.result);
+
return res;
}
return inp;
}
- private Tuple constructLROutput(List<Result> resLst, Tuple value) throws ExecException{
+ protected Tuple constructLROutput(List<Result> resLst, Tuple value) throws ExecException{
//Construct key
Object key;
if(resLst.size()>1){
@@ -234,17 +267,70 @@
output.set(1, key);
output.set(2, mFakeTuple);
return output;
+ } else if(isCross){
+
+ for(int i=0;i<plans.size();i++)
+ value.getAll().remove(0);
+ //Put the index, key, and value
+ //in a tuple and return
+ output.set(0, new Byte(index));
+ output.set(1, key);
+ output.set(2, value);
+ return output;
} else {
- if(isCross){
- for(int i=0;i<plans.size();i++)
- value.getAll().remove(0);
- }
//Put the index, key, and value
//in a tuple and return
output.set(0, new Byte(index));
output.set(1, key);
- output.set(2, value);
+
+ // strip off the columns in the "value" which
+ // are present in the "key"
+ if(mProjectedColsMapSize != 0 || mProjectStar == true) {
+
+ Tuple minimalValue = null;
+ if(!mProjectStar) {
+ if(minValuePositions == null) {
+ // the very first time, we will have to build
+ // the "value" tuple piecemeal but we can
+ // do better next time round
+ minValuePositions = new ArrayList<Integer>();
+ minimalValue = mTupleFactory.newTuple();
+ // look for individual columns that we are
+ // projecting
+ for (int i = 0; i < value.size(); i++) {
+ if(mProjectedColsMap.get(i) == null) {
+ // this column was not found in the "key"
+ // so send it in the "value"
+ minimalValue.append(value.get(i));
+ minValuePositions.add(i);
+ }
+ }
+ minValuePositionsSize = minValuePositions.size();
+ } else {
+ minimalValue = mTupleFactory.newTuple(minValuePositionsSize);
+ for(int i = 0; i < minValuePositionsSize; i++) {
+ minimalValue.set(i, value.get(minValuePositions.get(i)));
+ }
+ }
+ } else {
+ // for the project star case
+ // we would send out an empty tuple as
+ // the "value" since all elements are in the
+ // "key"
+ minimalValue = mTupleFactory.newTuple();
+
+ }
+ output.set(2, minimalValue);
+
+ } else {
+
+ // there were no columns in the "key"
+ // which we can strip off from the "value"
+ // so just send the value we got
+ output.set(2, value);
+
+ }
return output;
}
}
@@ -264,9 +350,53 @@
public void setPlans(List<PhysicalPlan> plans) {
this.plans = plans;
leafOps.clear();
+ // recompute all key/value bookkeeping in case the plans are being replaced
+ mProjectedColsMap.clear();
+ mProjectStar = false;
+ isKeyTuple = false;
+ int keyIndex = 0; // zero based index for fields in the key
for (PhysicalPlan plan : plans) {
- leafOps.add((ExpressionOperator)plan.getLeaves().get(0));
+ ExpressionOperator leaf = (ExpressionOperator)plan.getLeaves().get(0);
+ leafOps.add(leaf);
+
+ // don't optimize CROSS
+ if(!isCross) {
+ // Leaf ops that are POProject must project a column or '*'. Record,
+ // for each projected column, its position in the "Key" so that the
+ // column can be stripped from the "Value" here and stitched back from
+ // the "key" in POPackage - the goal is to reduce the amount of the
+ // data sent to Hadoop in the map.
+ if(leaf instanceof POProject) {
+ POProject project = (POProject) leaf;
+ if(project.isStar()) {
+ if(plans.size() == 1) {
+ // note that we have a project *
+ mProjectStar = true;
+ // key will be a tuple in this case
+ isKeyTuple = true;
+ } else {
+ // TODO: currently "group by (*, somethingelse)" is NOT
+ // allowed. So we should never get here. But once it is
+ // allowed, we will need to handle it. For now just log
+ log.debug("Project * in group by not being optimized in key-value transfer");
+ }
+ } else {
+ mProjectedColsMap.put(project.getColumn(), keyIndex);
+ }
+ // a lone tuple-typed column is NOT a compound key: POPackage must use such
+ // a key whole, so only project(*) and multi-field keys set isKeyTuple
+ }
+ keyIndex++;
+ }
+ }
+ if(keyIndex > 1) {
+ // make a note that the "key" is a tuple
+ // this is required by POPackage to pick things
+ // off the "key" correctly to stitch together the
+ // "value"
+ isKeyTuple = true;
}
+ mProjectedColsMapSize = mProjectedColsMap.size();
}
/**
@@ -301,5 +431,74 @@
this.isCross = isCross;
}
+ /**
+ * @return the live map (not a copy) from an input-tuple column index to its position in the key
+ */
+ public Map<Integer, Integer> getProjectedColsMap() {
+ return mProjectedColsMap;
+ }
+
+ /**
+ * @return true if the key plans are a lone project(*), so the whole input is the key
+ */
+ public boolean isProjectStar() {
+ return mProjectStar;
+ }
+
+ /**
+ * @return true if POPackage should treat the key as a tuple when stitching the value
+ */
+ public boolean isKeyTuple() {
+ return isKeyTuple;
+ }
+
+ /**
+ * Replaces the key plans when this local rearrange is used in the combiner
+ * (see CombinerOptimizer.fixUpRearrange). Unlike setPlans(), a project(*) is rejected.
+ * @param plans the new key plans, one per key field
+ * @throws ExecException if a key plan is a project(*)
+ */
+ public void setPlansFromCombiner(List<PhysicalPlan> plans) throws ExecException {
+ this.plans = plans;
+ leafOps.clear();
+ mProjectedColsMap.clear();
+ // project(*) is rejected below, so a flag left over from setPlans() would be stale
+ mProjectStar = false;
+ int keyIndex = 0; // zero based index for fields in the key
+ for (PhysicalPlan plan : plans) {
+ ExpressionOperator leaf = (ExpressionOperator)plan.getLeaves().get(0);
+ leafOps.add(leaf);
+
+ // don't optimize CROSS
+ if(!isCross) {
+ // Record, for each projected column, its position in the "Key" so the
+ // column can be stripped from the "Value" here and stitched back from
+ // the "key" in POPostCombinerPackage - the goal is to reduce the amount
+ // of data sent to Hadoop in the map.
+ if(leaf instanceof POProject) {
+ POProject project = (POProject) leaf;
+ if(project.isStar()) {
+ log.error("Unexpected data during optimization");
+ throw new ExecException("Unexpected data during optimization (Local rearrange" +
+ " in combiner has a project *)" );
+ } else {
+ mProjectedColsMap.put(project.getColumn(), keyIndex);
+ }
+ if(project.getResultType() == DataType.TUPLE)
+ isKeyTuple = true;
+ }
+ keyIndex++;
+ }
+ }
+ if(keyIndex > 1) {
+ // make a note that the "key" is a tuple
+ // this is required by POPackage to pick things
+ // off the "key" correctly to stitch together the
+ // "value"
+ isKeyTuple = true;
+ }
+ mProjectedColsMapSize = mProjectedColsMap.size();
+
+ }
}
Modified: incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java?rev=703233&r1=703232&r2=703233&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java Thu Oct 9 12:23:51 2008
@@ -17,8 +17,10 @@
*/
package org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators;
+import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
+import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -37,6 +39,7 @@
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.impl.util.Pair;
/**
* The package operator that packages
* the globally rearranged tuples into
@@ -64,6 +67,11 @@
//The key being worked on
Object key;
+ // marker to indicate if key is a tuple
+ protected boolean isKeyTuple = false;
+ // key as a Tuple object (if the key is a tuple)
+ protected Tuple keyAsTuple;
+
//key's type
byte keyType;
@@ -76,6 +84,14 @@
//on a particular input
boolean[] inner;
+ // A mapping of input index to key information got from LORearrange
+ // for that index. The Key information is a pair of boolean, Map.
+ // The boolean indicates whether there is a lone project(*) in the
+ // cogroup by. If not, the Map has a mapping of column numbers in the
+ // "value" to column numbers in the "key" which contain the fields in
+ // the "value"
+ protected Map<Integer, Pair<Boolean, Map<Integer, Integer>>> keyInfo;
+
private final Log log = LogFactory.getLog(getClass());
protected static BagFactory mBagFactory = BagFactory.getInstance();
@@ -96,6 +112,7 @@
public POPackage(OperatorKey k, int rp, List<PhysicalOperator> inp) {
super(k, rp, inp);
numInputs = -1;
+ keyInfo = new HashMap<Integer, Pair<Boolean, Map<Integer, Integer>>>();
}
@Override
@@ -127,6 +144,11 @@
public void attachInput(PigNullableWritable k, Iterator<NullableTuple> inp) {
tupIter = inp;
key = k.getValueAsPigType();
+ if(isKeyTuple) {
+ // key is a tuple, cache the key as a
+ // tuple for use in the getNext()
+ keyAsTuple = (Tuple)key;
+ }
}
/**
@@ -183,8 +205,64 @@
copy.set(i, val.get(i));
}
*/
- Tuple copy = mTupleFactory.newTuple(val.getAll());
- if (numInputs > 0) dbs[ntup.getIndex()].add(copy);
+
+ Tuple copy = null;
+ // The "value (val)" that we just got may not
+ // be the complete "value". It may have some portions
+ // in the "key" (look in POLocalRearrange for more comments)
+ // If this is the case we need to stitch
+ // the "value" together.
+ int index = ntup.getIndex();
+ Pair<Boolean, Map<Integer, Integer>> lrKeyInfo =
+ keyInfo.get(index);
+ boolean isProjectStar = lrKeyInfo.first;
+ Map<Integer, Integer> keyLookup = lrKeyInfo.second;
+ int keyLookupSize = keyLookup.size();
+
+ if( keyLookupSize > 0) {
+
+ // we have some fields of the "value" in the
+ // "key".
+ copy = mTupleFactory.newTuple();
+ int finalValueSize = keyLookupSize + val.size();
+ int valIndex = 0; // an index for accessing elements from
+ // the value (val) that we have currently
+ for(int i = 0; i < finalValueSize; i++) {
+ Integer keyIndex = keyLookup.get(i);
+ if(keyIndex == null) {
+ // the field for this index is not in the
+ // key - so just take it from the "value"
+ // we were handed
+ copy.append(val.get(valIndex));
+ valIndex++;
+ } else {
+ // the field for this index is in the key
+ if(isKeyTuple) {
+ // the key is a tuple, extract the
+ // field out of the tuple
+ copy.append(keyAsTuple.get(keyIndex));
+ } else {
+ copy.append(key);
+ }
+ }
+ }
+
+ } else if (isProjectStar) {
+
+ log.info("In project star, keyAsTuple:" + keyAsTuple);
+ // the whole "value" is present in the "key"
+ copy = mTupleFactory.newTuple(keyAsTuple.getAll());
+
+ } else {
+
+ // there is no field of the "value" in the
+ // "key" - so just make a copy of what we got
+ // as the "value"
+ copy = mTupleFactory.newTuple(val.getAll());
+
+ }
+
+ if (numInputs > 0) dbs[index].add(copy);
if(reporter!=null) reporter.progress();
}
@@ -241,5 +319,26 @@
return clone;
}
+ /**
+ * @param keyInfo map from input index to (lone project(*)?, value column -> key position)
+ */
+ public void setKeyInfo(Map<Integer, Pair<Boolean, Map<Integer, Integer>>> keyInfo) {
+ this.keyInfo = keyInfo;
+ }
+
+ /**
+ * @param keyTuple true if the key is a tuple (cached in attachInput, indexed when stitching)
+ */
+ public void setKeyTuple(boolean keyTuple) {
+ this.isKeyTuple = keyTuple;
+ }
+
+ /**
+ * @return map from input index to (lone project(*)?, value column -> key position)
+ */
+ public Map<Integer, Pair<Boolean, Map<Integer, Integer>>> getKeyInfo() {
+ return keyInfo;
+ }
+
}
Modified: incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPostCombinerPackage.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPostCombinerPackage.java?rev=703233&r1=703232&r2=703233&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPostCombinerPackage.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPostCombinerPackage.java Thu Oct 9 12:23:51 2008
@@ -19,6 +19,7 @@
import java.util.Iterator;
import java.util.List;
+import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -36,6 +37,7 @@
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.plan.NodeIdGenerator;
import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.impl.util.Pair;
/**
* The package operator that packages the globally rearranged tuples into
* output format after the combiner stage. It differs from POPackage in that
@@ -98,15 +100,39 @@
while (tupIter.hasNext()) {
NullableTuple ntup = tupIter.next();
Tuple tup = (Tuple)ntup.getValueAsPigType();
- for (int i = 0; i < tup.size(); i++) {
- if (mBags[i]) ((DataBag)fields[i]).add((Tuple)tup.get(i));
- else fields[i] = tup.get(i);
+ // TODO: IMPORTANT ASSUMPTION: Currently we only combine in the
+ // group case and not in cogroups. So there should only
+ // be one LocalRearrange from which we get the keyInfo for
+ // which field in the value is in the key. This LocalRearrange
+ // has an index of -1. When we do support combiner in Cogroups
+ // THIS WILL NEED TO BE REVISITED.
+ Pair<Boolean, Map<Integer, Integer>> lrKeyInfo =
+ keyInfo.get(0); // assumption: only group are "combinable", hence index 0
+ Map<Integer, Integer> keyLookup = lrKeyInfo.second;
+ int tupIndex = 0; // an index for accessing elements from
+ // the value (tup) that we have currently
+ for(int i = 0; i < mBags.length; i++) {
+ Integer keyIndex = keyLookup.get(i);
+ if(keyIndex == null) {
+ // the field for this index is not in the
+ // key - so just take it from the "value"
+ // we were handed - Currently THIS HAS TO BE A BAG
+ // In future if this changes, THIS WILL NEED TO BE
+ // REVISITED.
+ ((DataBag)fields[i]).add((Tuple)tup.get(tupIndex));
+ tupIndex++;
+ } else {
+ // the field for this index is in the key
+ fields[i] = key;
+ }
}
}
- //Construct the output tuple by appending
- //the key and all the above constructed bags
- //and return it.
+ // The successor of the POPostCombinerPackage as of
+ // now SHOULD be a POForeach which has been adjusted
+ // to look for the key in the right place - so we will
+ // NOT be prepending the key to the result here, but merely
+ // putting the fields (the bags, plus the key in any slot it occupies) into a result tuple and returning it.
Tuple res;
res = mTupleFactory.newTuple(mBags.length);
for (int i = 0; i < mBags.length; i++) res.set(i, fields[i]);
Modified: incubator/pig/branches/types/src/org/apache/pig/backend/local/executionengine/physicalLayer/LocalLogToPhyTranslationVisitor.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/local/executionengine/physicalLayer/LocalLogToPhyTranslationVisitor.java?rev=703233&r1=703232&r2=703233&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/backend/local/executionengine/physicalLayer/LocalLogToPhyTranslationVisitor.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/backend/local/executionengine/physicalLayer/LocalLogToPhyTranslationVisitor.java Thu Oct 9 12:23:51 2008
@@ -10,6 +10,7 @@
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrangeForIllustrate;
import org.apache.pig.backend.local.executionengine.physicalLayer.relationalOperators.POCogroup;
import org.apache.pig.backend.local.executionengine.physicalLayer.relationalOperators.POSplit;
import org.apache.pig.backend.local.executionengine.physicalLayer.relationalOperators.POSplitOutput;
@@ -52,7 +53,7 @@
for(LogicalOperator lo : inputs) {
List<LogicalPlan> plans = (List<LogicalPlan>) cg.getGroupByPlans().get(lo);
- POLocalRearrange physOp = new POLocalRearrange(new OperatorKey(
+ POLocalRearrangeForIllustrate physOp = new POLocalRearrangeForIllustrate(new OperatorKey(
scope, nodeGen.getNextNodeId(scope)), cg
.getRequestedParallelism());
List<PhysicalPlan> exprPlans = new ArrayList<PhysicalPlan>();
Modified: incubator/pig/branches/types/src/org/apache/pig/impl/util/Pair.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/util/Pair.java?rev=703233&r1=703232&r2=703233&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/util/Pair.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/util/Pair.java Thu Oct 9 12:23:51 2008
@@ -34,4 +34,12 @@
first = f;
second = s;
}
+
+ /**
+ * Renders the pair as "[first,second]"; a null component prints as "null" instead of throwing.
+ */
+ @Override
+ public String toString() {
+ return "[" + first + "," + second + "]";
+ }
}
Modified: incubator/pig/branches/types/test/org/apache/pig/test/TestLocalRearrange.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestLocalRearrange.java?rev=703233&r1=703232&r2=703233&view=diff
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestLocalRearrange.java (original)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestLocalRearrange.java Thu Oct 9 12:23:51 2008
@@ -84,15 +84,29 @@
int size=0;
for(Result res=lr.getNext(t);res.returnStatus!=POStatus.STATUS_EOP;res=lr.getNext(t)){
Tuple t = (Tuple)res.result;
+ String key = (String)t.get(1);
Tuple val = (Tuple)t.get(2);
+ // The input data has 2 columns of which the first
+ // is the key
+ // With the optimized LocalRearrange, the part
+ // of the "value" present in the "key" is
+ // excluded from the "value". So to reconstruct
+ // the true "value", create a tuple with "key" in
+ // first position and the "value" (val) we currently
+ // have in the second position
+ assertEquals(1, val.size());
+
+ Tuple actualVal = new DefaultTuple();
+ actualVal.append(key);
+ actualVal.append(val.get(0));
//Check if the index is same as input index
assertEquals((byte)0, (byte)(Byte)t.get(0));
//Check if the input bag contains the value tuple
- assertTrue(TestHelper.bagContains(db, val));
+ assertTrue(TestHelper.bagContains(db, actualVal));
//Check if the input key and the output key are same
- String inpKey = (String)val.get(0);
+ String inpKey = (String)actualVal.get(0);
assertEquals(0, inpKey.compareTo((String)t.get(1)));
++size;
}
@@ -127,10 +141,23 @@
int size=0;
for(Result res=lr.getNext(t);res.returnStatus!=POStatus.STATUS_EOP;res=lr.getNext(t)){
Tuple t = (Tuple)res.result;
+ Tuple key = (Tuple)t.get(1);
Tuple val = (Tuple)t.get(2);
+
+ // The input data has 2 columns of which both
+ // are the key.
+ // With the optimized LocalRearrange, the part
+ // of the "value" present in the "key" is
+ // excluded from the "value". So in this case,
+ // the "value" coming out of the LocalRearrange
+ // would be an empty tuple
+ assertEquals(0, val.size());
+
//Check if the index is same as input index
assertEquals((byte)0, (byte)(Byte)t.get(0));
+ // reconstruct value from tuple
+ val = key;
//Check if the input baf contains the value tuple
assertTrue(TestHelper.bagContains(db, val));
Modified: incubator/pig/branches/types/test/org/apache/pig/test/TestPOCogroup.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestPOCogroup.java?rev=703233&r1=703232&r2=703233&view=diff
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestPOCogroup.java (original)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestPOCogroup.java Thu Oct 9 12:23:51 2008
@@ -30,7 +30,7 @@
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrangeForIllustrate;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PORead;
import org.apache.pig.backend.local.executionengine.physicalLayer.relationalOperators.POCogroup;
import org.apache.pig.data.BagFactory;
@@ -78,7 +78,7 @@
p1.add(prj1);
List<PhysicalPlan> in1 = new LinkedList<PhysicalPlan>();
in1.add(p1);
- POLocalRearrange lr1 = new POLocalRearrange(new OperatorKey("", r
+ POLocalRearrangeForIllustrate lr1 = new POLocalRearrangeForIllustrate(new OperatorKey("", r
.nextLong()), -1, inputs1);
lr1.setPlans(in1);
lr1.setIndex(0);
@@ -92,7 +92,7 @@
p2.add(prj2);
List<PhysicalPlan> in2 = new LinkedList<PhysicalPlan>();
in2.add(p2);
- POLocalRearrange lr2 = new POLocalRearrange(new OperatorKey("", r
+ POLocalRearrangeForIllustrate lr2 = new POLocalRearrangeForIllustrate(new OperatorKey("", r
.nextLong()), -1, inputs2);
lr2.setPlans(in2);
lr2.setIndex(1);
@@ -172,7 +172,7 @@
p1.add(prj1);
List<PhysicalPlan> in1 = new LinkedList<PhysicalPlan>();
in1.add(p1);
- POLocalRearrange lr1 = new POLocalRearrange(new OperatorKey("", r
+ POLocalRearrangeForIllustrate lr1 = new POLocalRearrangeForIllustrate(new OperatorKey("", r
.nextLong()), -1, inputs1);
lr1.setPlans(in1);
lr1.setIndex(0);
Modified: incubator/pig/branches/types/test/org/apache/pig/test/TestPackage.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestPackage.java?rev=703233&r1=703232&r2=703233&view=diff
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestPackage.java (original)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestPackage.java Thu Oct 9 12:23:51 2008
@@ -21,6 +21,7 @@
import java.io.IOException;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -38,6 +39,7 @@
import org.apache.pig.impl.io.NullableTuple;
import org.apache.pig.impl.io.PigNullableWritable;
import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.util.Pair;
import org.apache.pig.test.utils.GenRandomData;
import org.apache.pig.test.utils.TestHelper;
import org.junit.After;
@@ -79,6 +81,18 @@
pop.setInner(inner);
PigNullableWritable k = HDataType.getWritableComparableTypes(key, (byte)0);
pop.attachInput(k, db.iterator());
+
+ // we are not doing any optimization to remove
+ // parts of the "value" which are present in the "key" in this
+ // unit test - so set up the "keyInfo" accordingly in
+ // the POPackage
+ Map<Integer, Pair<Boolean, Map<Integer, Integer>>> keyInfo =
+ new HashMap<Integer, Pair<Boolean, Map<Integer,Integer>>>();
+ Pair<Boolean, Map<Integer, Integer>> p =
+ new Pair<Boolean, Map<Integer, Integer>>(false, new HashMap<Integer, Integer>());
+ keyInfo.put(0, p);
+ keyInfo.put(1, p);
+ pop.setKeyInfo(keyInfo);
Tuple t = null;
Result res = null;
res = (Result) pop.getNext(t);