You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ol...@apache.org on 2009/05/30 01:51:53 UTC
svn commit: r780143 - in /hadoop/pig/trunk: ./
src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/
src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/
src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/rel...
Author: olga
Date: Fri May 29 23:51:53 2009
New Revision: 780143
URL: http://svn.apache.org/viewvc?rev=780143&view=rev
Log:
PIG-802: PERFORMANCE: not creating bags for ORDER BY (serakesh via olgan)
Added:
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackageLite.java
hadoop/pig/trunk/src/org/apache/pig/data/ReadOnceBag.java
Modified:
hadoop/pig/trunk/CHANGES.txt
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/POPackageAnnotator.java
Modified: hadoop/pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=780143&r1=780142&r2=780143&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Fri May 29 23:51:53 2009
@@ -48,6 +48,8 @@
BUG FIXES
+PIG-802: PERFORMANCE: not creating bags for ORDER BY (serakesh via olgan)
+
PIG-816: PigStorage() does not accept Unicode characters in its contructor (pradeepkth)
PIG-818: Explain doesn't handle PODemux properly (hagleitn via olgan)
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java?rev=780143&r1=780142&r2=780143&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java Fri May 29 23:51:53 2009
@@ -62,6 +62,7 @@
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackageLite;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSort;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
@@ -1124,12 +1125,10 @@
mro.setMapDone(true);
if (limit!=-1) {
- POPackage pkg_c = new POPackage(new OperatorKey(scope,nig.getNextNodeId(scope)));
- pkg_c.setKeyType((fields.length>1) ? DataType.TUPLE : keyType);
+ POPackageLite pkg_c = new POPackageLite(new OperatorKey(scope,nig.getNextNodeId(scope)));
+ pkg_c.setKeyType((fields.length>1) ? DataType.TUPLE : keyType);
pkg_c.setNumInps(1);
//pkg.setResultType(DataType.TUPLE);
- boolean[] inner = {false};
- pkg_c.setInner(inner);
mro.combinePlan.add(pkg_c);
List<PhysicalPlan> eps_c1 = new ArrayList<PhysicalPlan>();
@@ -1168,12 +1167,10 @@
mro.combinePlan.addAsLeaf(lr_c2);
}
- POPackage pkg = new POPackage(new OperatorKey(scope,nig.getNextNodeId(scope)));
+ POPackageLite pkg = new POPackageLite(new OperatorKey(scope,nig.getNextNodeId(scope)));
pkg.setKeyType((fields == null || fields.length>1) ? DataType.TUPLE :
keyType);
pkg.setNumInps(1);
- boolean[] inner = {false};
- pkg.setInner(inner);
mro.reducePlan.add(pkg);
PhysicalPlan ep = new PhysicalPlan();
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/POPackageAnnotator.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/POPackageAnnotator.java?rev=780143&r1=780142&r2=780143&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/POPackageAnnotator.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/POPackageAnnotator.java Fri May 29 23:51:53 2009
@@ -30,6 +30,7 @@
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POJoinPackage;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackageLite;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCombinerPackage;
import org.apache.pig.impl.plan.DepthFirstWalker;
import org.apache.pig.impl.plan.VisitorException;
@@ -197,6 +198,14 @@
public void visitLocalRearrange(POLocalRearrange lrearrange) throws VisitorException {
loRearrangeFound++;
Map<Integer,Pair<Boolean, Map<Integer, Integer>>> keyInfo;
+
+ if (pkg instanceof POPackageLite) {
+ if(lrearrange.getIndex() != 0) {
+ // Throw some exception here
+ throw new RuntimeException("POLocalRearrange for POPackageLite cannot have index other than 0, but has index - "+lrearrange.getIndex());
+ }
+ }
+
// annotate the package with information from the LORearrange
// update the keyInfo information if already present in the POPackage
keyInfo = pkg.getKeyInfo();
Added: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackageLite.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackageLite.java?rev=780143&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackageLite.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackageLite.java Fri May 29 23:51:53 2009
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ *
+ */
+package org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators;
+
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.pig.PigException;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.ReadOnceBag;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.io.NullableTuple;
+import org.apache.pig.impl.plan.NodeIdGenerator;
+import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.plan.VisitorException;
+
+/**
+ * A specialization of {@link POPackage} used for the specific case of the
+ * ORDER BY query (see JIRA PIG-802 for details). It accepts exactly one
+ * input and, rather than building a regular bag for each key, hands the
+ * value iterator to a {@link ReadOnceBag}.
+ */
+public class POPackageLite extends POPackage {
+
+    private static final long serialVersionUID = 1L;
+
+    public POPackageLite(OperatorKey k) {
+        super(k, -1, null);
+    }
+
+    public POPackageLite(OperatorKey k, int rp) {
+        super(k, rp, null);
+    }
+
+    public POPackageLite(OperatorKey k, List<PhysicalOperator> inp) {
+        super(k, -1, inp);
+    }
+
+    public POPackageLite(OperatorKey k, int rp, List<PhysicalOperator> inp) {
+        super(k, rp, inp);
+    }
+
+    /**
+     * Sets the number of inputs, which must be exactly 1 for this operator.
+     *
+     * @param numInps the number of inputs; only 1 is accepted
+     * @throws RuntimeException if {@code numInps} is not 1
+     * @see org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage#setNumInps(int)
+     */
+    @Override
+    public void setNumInps(int numInps) {
+        if(numInps != 1)
+        {
+            throw new RuntimeException("POPackageLite can only take 1 input");
+        }
+        this.numInputs = numInps;
+    }
+
+    /**
+     * Not supported by this operator.
+     *
+     * @throws RuntimeException always
+     */
+    public boolean[] getInner() {
+        throw new RuntimeException("POPackageLite does not support getInner operation");
+    }
+
+    /**
+     * Not supported by this operator.
+     *
+     * @throws RuntimeException always
+     */
+    public void setInner(boolean[] inner) {
+        throw new RuntimeException("POPackageLite does not support setInner operation");
+    }
+
+    /**
+     * Makes a copy of this operator via {@code super.clone()} and clears the
+     * inherited {@code inner} field on the copy, since this operator does not
+     * support the inner flags.
+     *
+     * @throws CloneNotSupportedException if the superclass cannot be cloned
+     */
+    @Override
+    public POPackageLite clone() throws CloneNotSupportedException {
+        POPackageLite clone = (POPackageLite)super.clone();
+        clone.inner = null;
+        return clone;
+    }
+
+    /**
+     * Not supported by this operator.
+     *
+     * @throws RuntimeException always
+     */
+    @Override
+    public boolean isDistinct() {
+        throw new RuntimeException("POPackageLite does not support isDistinct operation");
+    }
+
+    /**
+     * Not supported by this operator.
+     *
+     * @param distinct ignored
+     * @throws RuntimeException always
+     */
+    @Override
+    public void setDistinct(boolean distinct) {
+        throw new RuntimeException("POPackageLite does not support setDistinct operation");
+    }
+
+    /**
+     * @return the isKeyTuple flag
+     */
+    public boolean getKeyTuple() {
+        return isKeyTuple;
+    }
+
+    /**
+     * @return the keyAsTuple
+     */
+    public Tuple getKeyAsTuple() {
+        return keyAsTuple;
+    }
+
+    /**
+     * @return the tupIter
+     */
+    public Iterator<NullableTuple> getTupIter() {
+        return tupIter;
+    }
+
+    /**
+     * @return the key currently attached to this operator
+     */
+    public Object getKey() {
+        return key;
+    }
+
+    /**
+     * Similar to POPackage.getNext except that only one input is expected
+     * with index 0 and a ReadOnceBag is used instead of DefaultDataBag.
+     * The returned tuple holds the key in field 0 and the bag in field 1.
+     *
+     * @param t not used by this implementation
+     * @return a Result with status {@code POStatus.STATUS_OK} carrying the
+     *         (key, bag) tuple
+     * @throws ExecException if a field of the output tuple cannot be set
+     */
+    @Override
+    public Result getNext(Tuple t) throws ExecException {
+        // Wrap the current value iterator and key; the bag keeps its own
+        // reference to the key because detachInput() is called below.
+        ReadOnceBag db = new ReadOnceBag(this, tupIter, key);
+        if(reporter!=null) reporter.progress();
+
+        // Construct the output tuple from the key and the bag and return it.
+        Tuple res = mTupleFactory.newTuple(numInputs+1);
+        res.set(0,key);
+        res.set(1,db);
+        detachInput();
+        Result r = new Result();
+        r.result = res;
+        r.returnStatus = POStatus.STATUS_OK;
+        return r;
+    }
+
+    /**
+     * Makes use of the superclass method, but this requires
+     * an additional parameter key passed by ReadOnceBag.
+     * key of this instance will be set to null in detachInput
+     * call, but an instance of ReadOnceBag may have the original
+     * key that it uses. Therefore this extra argument is taken
+     * to temporarily set it before the call to the superclass method
+     * and then restore it.
+     *
+     * @param ntup the value read from the iterator
+     * @param index the input index of {@code ntup}
+     * @param key the key to use while building the value tuple
+     * @return the tuple produced by the superclass for {@code ntup}
+     * @throws ExecException if the superclass call fails
+     */
+    public Tuple getValueTuple(NullableTuple ntup, int index, Object key) throws ExecException {
+        Object origKey = this.key;
+        this.key = key;
+        try {
+            return super.getValueTuple(ntup, index);
+        } finally {
+            // Restore the operator's own key even if the superclass call
+            // throws, so a failure cannot leave the bag's key installed.
+            this.key = origKey;
+        }
+    }
+
+}
+
Added: hadoop/pig/trunk/src/org/apache/pig/data/ReadOnceBag.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/data/ReadOnceBag.java?rev=780143&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/data/ReadOnceBag.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/data/ReadOnceBag.java Fri May 29 23:51:53 2009
@@ -0,0 +1,263 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.data;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map;
+
+import org.apache.pig.PigException;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackageLite;
+import org.apache.pig.impl.io.NullableTuple;
+import org.apache.pig.impl.util.Pair;
+
+/**
+ * This bag is specifically created for use by POPackageLite. It is built from
+ * three things, all required in the constructor call: the POPackageLite that
+ * created it, the NullableTuple iterator, and the key (Object). This bag does
+ * not store the tuples in memory, but has access to an iterator typically
+ * provided by Hadoop. Use this when you already have an iterator over tuples
+ * and do not want to copy over again to a new bag.
+ */
+public class ReadOnceBag implements DataBag {
+
+    // The Package operator that created this
+    POPackageLite pkg;
+
+    //The iterator of Tuples. Marked transient because we will never serialize this.
+    transient Iterator<NullableTuple> tupIter;
+
+    // The key being worked on
+    Object key;
+
+    protected static TupleFactory mTupleFactory = TupleFactory.getInstance();
+
+    private static final long serialVersionUID = 2L;
+
+    /**
+     * This constructor creates a bag out of an existing iterator
+     * of tuples by taking ownership of the iterator and NOT
+     * copying the elements of the iterator.
+     * @param pkg the POPackageLite that created this bag
+     * @param tupIter the {@code Iterator<NullableTuple>} to read values from
+     * @param key the key the values belong to
+     */
+    public ReadOnceBag(POPackageLite pkg, Iterator<NullableTuple> tupIter, Object key) {
+        this.pkg = pkg;
+        this.tupIter = tupIter;
+        this.key = key;
+    }
+
+    /**
+     * Not supported by this bag.
+     * @throws RuntimeException always
+     * @see org.apache.pig.impl.util.Spillable#getMemorySize()
+     */
+    @Override
+    public long getMemorySize() {
+        throw new RuntimeException("ReadOnceBag does not support getMemorySize operation");
+    }
+
+    /**
+     * Not supported by this bag.
+     * @throws RuntimeException always
+     * @see org.apache.pig.impl.util.Spillable#spill()
+     */
+    @Override
+    public long spill() {
+        throw new RuntimeException("ReadOnceBag does not support spill operation");
+    }
+
+    /**
+     * Not supported by this bag.
+     * @throws RuntimeException always
+     * @see org.apache.pig.data.DataBag#add(org.apache.pig.data.Tuple)
+     */
+    @Override
+    public void add(Tuple t) {
+        throw new RuntimeException("ReadOnceBag does not support add operation");
+    }
+
+    /**
+     * Not supported by this bag.
+     * @throws RuntimeException always
+     * @see org.apache.pig.data.DataBag#addAll(org.apache.pig.data.DataBag)
+     */
+    @Override
+    public void addAll(DataBag b) {
+        throw new RuntimeException("ReadOnceBag does not support addAll operation");
+    }
+
+    /**
+     * Not supported by this bag.
+     * @throws RuntimeException always
+     * @see org.apache.pig.data.DataBag#clear()
+     */
+    @Override
+    public void clear() {
+        throw new RuntimeException("ReadOnceBag does not support clear operation");
+    }
+
+    /**
+     * Not supported by this bag.
+     * @throws RuntimeException always
+     * @see org.apache.pig.data.DataBag#isDistinct()
+     */
+    @Override
+    public boolean isDistinct() {
+        throw new RuntimeException("ReadOnceBag does not support isDistinct operation");
+    }
+
+    /**
+     * Not supported by this bag.
+     * @throws RuntimeException always
+     * @see org.apache.pig.data.DataBag#isSorted()
+     */
+    @Override
+    public boolean isSorted() {
+        throw new RuntimeException("ReadOnceBag does not support isSorted operation");
+    }
+
+    /**
+     * Returns an iterator that reads from the underlying NullableTuple
+     * iterator. Every iterator returned here draws from that same
+     * underlying iterator.
+     * @see org.apache.pig.data.DataBag#iterator()
+     */
+    @Override
+    public Iterator<Tuple> iterator() {
+        return new ReadOnceBagIterator();
+    }
+
+    /**
+     * Not supported by this bag.
+     * @throws RuntimeException always
+     * @see org.apache.pig.data.DataBag#markStale(boolean)
+     */
+    @Override
+    public void markStale(boolean stale) {
+        throw new RuntimeException("ReadOnceBag does not support markStale operation");
+    }
+
+    /**
+     * Not supported by this bag.
+     * @throws RuntimeException always
+     * @see org.apache.pig.data.DataBag#size()
+     */
+    @Override
+    public long size() {
+        throw new RuntimeException("ReadOnceBag does not support size operation");
+    }
+
+    /**
+     * Not supported by this bag.
+     * @throws RuntimeException always
+     * @see org.apache.hadoop.io.Writable#readFields(java.io.DataInput)
+     */
+    @Override
+    public void readFields(DataInput in) throws IOException {
+        throw new RuntimeException("ReadOnceBag does not support readFields operation");
+    }
+
+    /**
+     * Not supported: a ReadOnceBag should never be serialized.
+     * @throws ExecException always, with error code 2142
+     * @see org.apache.hadoop.io.Writable#write(java.io.DataOutput)
+     */
+    @Override
+    public void write(DataOutput out) throws IOException {
+        int errCode = 2142;
+        String msg = "ReadOnceBag should never be serialized.";
+        throw new ExecException(msg, errCode, PigException.BUG);
+    }
+
+    /**
+     * This has to be defined since DataBag implements Comparable although,
+     * in this case we cannot really compare.
+     * @throws RuntimeException always
+     * @see java.lang.Comparable#compareTo(java.lang.Object)
+     */
+    @Override
+    public int compareTo(Object o) {
+        throw new RuntimeException("ReadOnceBags cannot be compared");
+    }
+
+    /**
+     * Two ReadOnceBags are equal when they wrap the very same underlying
+     * iterator instance and their keys are equal. The key captured by this
+     * bag at construction time is used, not the key currently held by the
+     * package operator, which may since have been detached or replaced.
+     */
+    @Override
+    public boolean equals(Object other) {
+        if(this == other)
+        {
+            return true;
+        }
+        if(!(other instanceof ReadOnceBag))
+        {
+            return false;
+        }
+        ReadOnceBag that = (ReadOnceBag)other;
+        if(tupIter != that.tupIter)
+        {
+            return false;
+        }
+        return (key == null) ? (that.key == null) : key.equals(that.key);
+    }
+
+    /**
+     * Hash derived from this bag's own key (0 contribution for a null key),
+     * consistent with {@link #equals(Object)}.
+     */
+    @Override
+    public int hashCode() {
+        int hash = 7;
+        hash = hash*31 + ((key == null) ? 0 : key.hashCode());
+        return hash;
+    }
+
+    /**
+     * Iterator that pulls NullableTuples from the enclosing bag's iterator and
+     * converts each one to a value Tuple through the package operator.
+     */
+    class ReadOnceBagIterator implements Iterator<Tuple>
+    {
+        /* (non-Javadoc)
+         * @see java.util.Iterator#hasNext()
+         */
+        @Override
+        public boolean hasNext() {
+            return tupIter.hasNext();
+        }
+
+        /**
+         * Reads the next NullableTuple and asks the package operator to build
+         * the value tuple for it using this bag's key.
+         * @throws RuntimeException wrapping the ExecException if the package
+         *         operator fails to build the value tuple
+         * @see java.util.Iterator#next()
+         */
+        @Override
+        public Tuple next() {
+            NullableTuple ntup = tupIter.next();
+            int index = ntup.getIndex();
+            try {
+                return pkg.getValueTuple(ntup, index, key);
+            } catch (ExecException e)
+            {
+                // Chain the original exception so its stack trace is not lost.
+                throw new RuntimeException("ReadOnceBag failed to get value tuple : "+e.toString(), e);
+            }
+        }
+
+        /**
+         * Not supported.
+         * @throws RuntimeException always
+         * @see java.util.Iterator#remove()
+         */
+        @Override
+        public void remove() {
+            throw new RuntimeException("ReadOnceBag.iterator().remove() is not allowed");
+        }
+    }
+}
+