You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ol...@apache.org on 2009/05/30 01:51:53 UTC

svn commit: r780143 - in /hadoop/pig/trunk: ./ src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/ src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/ src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/rel...

Author: olga
Date: Fri May 29 23:51:53 2009
New Revision: 780143

URL: http://svn.apache.org/viewvc?rev=780143&view=rev
Log:
PIG-802: PERFORMANCE: not creating bags for ORDER BY (serakesh via olgan)

Added:
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackageLite.java
    hadoop/pig/trunk/src/org/apache/pig/data/ReadOnceBag.java
Modified:
    hadoop/pig/trunk/CHANGES.txt
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/POPackageAnnotator.java

Modified: hadoop/pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=780143&r1=780142&r2=780143&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Fri May 29 23:51:53 2009
@@ -48,6 +48,8 @@
 
 BUG FIXES
 
+PIG-802: PERFORMANCE: not creating bags for ORDER BY (serakesh via olgan)
+
 PIG-816: PigStorage() does not accept Unicode characters in its contructor (pradeepkth)
 
 PIG-818: Explain doesn't handle PODemux properly (hagleitn via olgan)

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java?rev=780143&r1=780142&r2=780143&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java Fri May 29 23:51:53 2009
@@ -62,6 +62,7 @@
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackageLite;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSort;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
@@ -1124,12 +1125,10 @@
         mro.setMapDone(true);
         
         if (limit!=-1) {
-        	POPackage pkg_c = new POPackage(new OperatorKey(scope,nig.getNextNodeId(scope)));
-        	pkg_c.setKeyType((fields.length>1) ? DataType.TUPLE : keyType);
+       	    POPackageLite pkg_c = new POPackageLite(new OperatorKey(scope,nig.getNextNodeId(scope)));
+       	    pkg_c.setKeyType((fields.length>1) ? DataType.TUPLE : keyType);
             pkg_c.setNumInps(1);
             //pkg.setResultType(DataType.TUPLE);
-            boolean[] inner = {false};
-            pkg_c.setInner(inner);
             mro.combinePlan.add(pkg_c);
         	
             List<PhysicalPlan> eps_c1 = new ArrayList<PhysicalPlan>();
@@ -1168,12 +1167,10 @@
 	        mro.combinePlan.addAsLeaf(lr_c2);
         }
         
-        POPackage pkg = new POPackage(new OperatorKey(scope,nig.getNextNodeId(scope)));
+        POPackageLite pkg = new POPackageLite(new OperatorKey(scope,nig.getNextNodeId(scope)));
         pkg.setKeyType((fields == null || fields.length>1) ? DataType.TUPLE :
             keyType);
         pkg.setNumInps(1);
-        boolean[] inner = {false}; 
-        pkg.setInner(inner);
         mro.reducePlan.add(pkg);
         
         PhysicalPlan ep = new PhysicalPlan();

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/POPackageAnnotator.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/POPackageAnnotator.java?rev=780143&r1=780142&r2=780143&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/POPackageAnnotator.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/POPackageAnnotator.java Fri May 29 23:51:53 2009
@@ -30,6 +30,7 @@
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POJoinPackage;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackageLite;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCombinerPackage;
 import org.apache.pig.impl.plan.DepthFirstWalker;
 import org.apache.pig.impl.plan.VisitorException;
@@ -197,6 +198,14 @@
         public void visitLocalRearrange(POLocalRearrange lrearrange) throws VisitorException {
             loRearrangeFound++;
             Map<Integer,Pair<Boolean, Map<Integer, Integer>>> keyInfo;
+
+            if (pkg instanceof POPackageLite) {
+                if(lrearrange.getIndex() != 0) {
+                    // POPackageLite accepts exactly one input, so its local rearrange must carry index 0
+                    throw new RuntimeException("POLocalRearrange for POPackageLite cannot have index other than 0, but has index - "+lrearrange.getIndex());
+                }
+            }
+
             // annotate the package with information from the LORearrange
             // update the keyInfo information if already present in the POPackage
             keyInfo = pkg.getKeyInfo();

Added: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackageLite.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackageLite.java?rev=780143&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackageLite.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackageLite.java Fri May 29 23:51:53 2009
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * 
+ */
+package org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators;
+
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.pig.PigException;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.ReadOnceBag;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.io.NullableTuple;
+import org.apache.pig.impl.plan.NodeIdGenerator;
+import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.plan.VisitorException;
+
+/**
+ * This package operator is a specialization
+ * of POPackage operator used for the specific
+ * case of the order by query. See JIRA 802 
+ * for more details. 
+ */
+public class POPackageLite extends POPackage {
+
+    /**
+     * 
+     */
+    private static final long serialVersionUID = 1L;
+
+    public POPackageLite(OperatorKey k) {
+        super(k, -1, null);
+    }
+
+    public POPackageLite(OperatorKey k, int rp) {
+        super(k, rp, null);
+    }
+
+    public POPackageLite(OperatorKey k, List<PhysicalOperator> inp) {
+        super(k, -1, inp);
+    }
+
+    public POPackageLite(OperatorKey k, int rp, List<PhysicalOperator> inp) {
+        super(k, rp, inp);
+    }
+
+    /* (non-Javadoc)
+     * @see org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage#setNumInps(int)
+     */
+	@Override
+    public void setNumInps(int numInps) {
+		if(numInps != 1)
+		{
+			throw new RuntimeException("POPackageLite can only take 1 input");
+		}
+        this.numInputs = numInps;
+    }
+	
+    public boolean[] getInner() {
+        throw new RuntimeException("POPackageLite does not support getInner operation");
+    }
+
+    public void setInner(boolean[] inner) {
+        throw new RuntimeException("POPackageLite does not support setInner operation");
+    }
+    
+    /**
+     * Make a deep copy of this operator.  
+     * @throws CloneNotSupportedException
+     */
+    @Override
+    public POPackageLite clone() throws CloneNotSupportedException {
+        POPackageLite clone = (POPackageLite)super.clone();
+        clone.inner = null;
+        return clone;
+    }
+    
+    /**
+     * @return the distinct
+     */
+    @Override
+    public boolean isDistinct() {
+        throw new RuntimeException("POPackageLite does not support isDistinct operation");
+    }
+
+    /**
+     * @param distinct the distinct to set
+     */
+    @Override
+    public void setDistinct(boolean distinct) {
+        throw new RuntimeException("POPackageLite does not support setDistinct operation");
+    }
+
+    /**
+     * @return the isKeyTuple
+     */
+    public boolean getKeyTuple() {
+        return isKeyTuple;
+    }
+
+    /**
+     * @return the keyAsTuple
+     */
+    public Tuple getKeyAsTuple() {
+        return keyAsTuple;
+    }
+
+    /**
+     * @return the tupIter
+     */
+    public Iterator<NullableTuple> getTupIter() {
+        return tupIter;
+    }
+
+    /**
+     * @return the key
+     */
+    public Object getKey() {
+        return key;
+    }
+
+    /**
+     * Similar to POPackage.getNext except that only one input (index 0)
+     * is expected and the values are exposed through a ReadOnceBag that
+     * wraps the current iterator instead of a DefaultDataBag.
+     * @return a STATUS_OK Result holding the tuple (key, ReadOnceBag)
+     */
+    @Override
+    public Result getNext(Tuple t) throws ExecException {
+        Tuple res;
+        //Wrap the current iterator and key in a single ReadOnceBag
+        ReadOnceBag db = null;
+        db = new ReadOnceBag(this, tupIter, key);
+        if(reporter!=null) reporter.progress();
+        
+        //Build the output tuple: the key at position 0, the bag at
+        //position 1. detachInput() is called only after the bag has been
+        //handed the key and the iterator.
+        res = mTupleFactory.newTuple(numInputs+1);
+        res.set(0,key);
+        res.set(1,db);
+        detachInput();
+        Result r = new Result();
+        r.result = res;
+        r.returnStatus = POStatus.STATUS_OK;
+        return r;
+    }
+    
+    /**
+     * Builds the value tuple through the superclass method, using the key
+     * supplied by the caller (ReadOnceBag) rather than this operator's own.
+     * This operator's key is set to null in the detachInput call while a
+     * ReadOnceBag may still hold the original key, so the supplied key is
+     * installed temporarily and the old one restored even on an exception.
+     */
+    public Tuple getValueTuple(NullableTuple ntup, int index, Object key) throws ExecException {
+        Object origKey = this.key;
+        this.key = key;
+        try {
+            return super.getValueTuple(ntup, index);
+        } finally {
+            this.key = origKey;
+        }
+    }
+
+}
+

Added: hadoop/pig/trunk/src/org/apache/pig/data/ReadOnceBag.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/data/ReadOnceBag.java?rev=780143&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/data/ReadOnceBag.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/data/ReadOnceBag.java Fri May 29 23:51:53 2009
@@ -0,0 +1,263 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.data;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map;
+
+import org.apache.pig.PigException;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackageLite;
+import org.apache.pig.impl.io.NullableTuple;
+import org.apache.pig.impl.util.Pair;
+
+/**
+ * This bag is specifically created for use by POPackageLite. It has three
+ * properties: the POPackageLite that created it, the NullableTuple iterator
+ * and the key (Object), all three of which are required in the constructor
+ * call. This bag does not store the tuples in memory, but has access to an
+ * iterator typically provided by Hadoop; every iterator() call reads from
+ * that same underlying iterator. Use this when you already have an iterator
+ * over tuples and do not want to copy them over again to a new bag.
+ */
+public class ReadOnceBag implements DataBag {
+
+    // The Package operator that created this
+    POPackageLite pkg;
+    
+    //The iterator of Tuples. Marked transient because we will never serialize this.
+    transient Iterator<NullableTuple> tupIter;
+    
+    // The key being worked on
+    Object key;
+
+    protected static TupleFactory mTupleFactory = TupleFactory.getInstance();
+
+    /**
+     * 
+     */
+    private static final long serialVersionUID = 2L;
+
+    /**
+     * This constructor creates a bag out of an existing iterator
+     * of tuples by taking ownership of the iterator and NOT
+     * copying the elements of the iterator.
+     * @param pkg POPackageLite
+     * @param tupIter Iterator<NullableTuple>
+     * @param key Object
+     */
+    public ReadOnceBag(POPackageLite pkg, Iterator<NullableTuple> tupIter, Object key) {
+        this.pkg = pkg;
+        this.tupIter = tupIter;
+        this.key = key;
+    }
+
+    /* (non-Javadoc)
+     * @see org.apache.pig.impl.util.Spillable#getMemorySize()
+     */
+    @Override
+    public long getMemorySize() {
+        throw new RuntimeException("ReadOnceBag does not support getMemorySize operation");
+    }
+
+    /* (non-Javadoc)
+     * @see org.apache.pig.impl.util.Spillable#spill()
+  
+     */
+    @Override
+    public long spill() {
+        throw new RuntimeException("ReadOnceBag does not support spill operation");
+    }
+
+    /* (non-Javadoc)
+     * @see org.apache.pig.data.DataBag#add(org.apache.pig.data.Tuple)
+     */
+    @Override
+    public void add(Tuple t) {
+        throw new RuntimeException("ReadOnceBag does not support add operation");		
+    }
+
+    /* (non-Javadoc)
+     * @see org.apache.pig.data.DataBag#addAll(org.apache.pig.data.DataBag)
+     */
+    @Override
+    public void addAll(DataBag b) {
+        throw new RuntimeException("ReadOnceBag does not support addAll operation");
+    }
+
+    /* (non-Javadoc)
+     * @see org.apache.pig.data.DataBag#clear()
+     */
+    @Override
+    public void clear() {
+        throw new RuntimeException("ReadOnceBag does not support clear operation");
+    }
+
+    /* (non-Javadoc)
+     * @see org.apache.pig.data.DataBag#isDistinct()
+     */
+    @Override
+    public boolean isDistinct() {
+        throw new RuntimeException("ReadOnceBag does not support isDistinct operation");
+    }
+
+    /* (non-Javadoc)
+     * @see org.apache.pig.data.DataBag#isSorted()
+     */
+    @Override
+    public boolean isSorted() {
+        throw new RuntimeException("ReadOnceBag does not support isSorted operation");
+    }
+
+    /* (non-Javadoc)
+     * @see org.apache.pig.data.DataBag#iterator()
+     */
+    @Override
+    public Iterator<Tuple> iterator() {
+        return new ReadOnceBagIterator();
+    }
+
+    /* (non-Javadoc)
+	 * @see org.apache.pig.data.DataBag#markStale(boolean)
+     */
+    @Override
+    public void markStale(boolean stale) {
+        throw new RuntimeException("ReadOnceBag does not support markStale operation");
+    }
+
+    /* (non-Javadoc)
+     * @see org.apache.pig.data.DataBag#size()
+     */
+    @Override
+    public long size() {
+        throw new RuntimeException("ReadOnceBag does not support size operation");
+    }
+
+    /* (non-Javadoc)
+     * @see org.apache.hadoop.io.Writable#readFields(java.io.DataInput)
+     */
+    @Override
+    public void readFields(DataInput in) throws IOException {
+        throw new RuntimeException("ReadOnceBag does not support readFields operation");
+    }
+
+    /* (non-Javadoc)
+     * @see org.apache.hadoop.io.Writable#write(java.io.DataOutput)
+     */
+    @Override
+    public void write(DataOutput out) throws IOException {
+        int errCode = 2142;
+        String msg = "ReadOnceBag should never be serialized.";
+        throw new ExecException(msg, errCode, PigException.BUG);
+    }
+
+    /* (non-Javadoc)
+     * @see java.lang.Comparable#compareTo(java.lang.Object)
+     * This has to be defined since DataBag implements 
+     * Comparable although, in this case we cannot really compare.
+     */
+    @Override
+    public int compareTo(Object o) {
+        throw new RuntimeException("ReadOnceBags cannot be compared");
+    }
+
+    /**
+     * Two ReadOnceBags are equal when they wrap the very same iterator
+     * instance and were created for equal keys.
+     * <p>
+     * The comparison uses the key captured by each bag at construction
+     * time rather than the key currently held by the creating
+     * POPackageLite: per POPackageLite.getValueTuple, the operator's key
+     * is reset to null in detachInput, so dereferencing it here could
+     * throw a NullPointerException. A null key (which may also be a
+     * legitimate value) is handled explicitly.
+     * NOTE(review): assumes that for tuple keys the operator's keyAsTuple
+     * is the same object as the key passed to the constructor - confirm
+     * against POPackage.attachInput.
+     */
+    @Override
+    public boolean equals(Object other) {
+        if (this == other) {
+            return true;
+        }
+        if (!(other instanceof ReadOnceBag)) {
+            return false;
+        }
+        ReadOnceBag that = (ReadOnceBag) other;
+        // Identity check on purpose: the bag never copies the tuples, so
+        // only a shared iterator instance means shared contents.
+        if (tupIter != that.tupIter) {
+            return false;
+        }
+        return (key == null) ? (that.key == null) : key.equals(that.key);
+    }
+
+    /**
+     * Hash derived only from the captured key, so it stays consistent
+     * with {@link #equals(Object)} (equal bags have equal keys) and never
+     * dereferences a null key. The iterator is not part of the hash; that
+     * only allows extra collisions and does not break the contract.
+     */
+    @Override
+    public int hashCode() {
+        int hash = 7;
+        hash = hash*31 + ((key == null) ? 0 : key.hashCode());
+        return hash;
+    }
+
+    class ReadOnceBagIterator implements Iterator<Tuple>
+    {
+        /* (non-Javadoc)
+         * @see java.util.Iterator#hasNext()
+         */
+        @Override
+        public boolean hasNext() {
+            return tupIter.hasNext();
+        }
+
+        /**
+         * Pulls the next NullableTuple from the underlying iterator and
+         * converts it to a value tuple with the key captured by this bag.
+         * @see java.util.Iterator#next()
+         */
+        @Override
+        public Tuple next() {
+            NullableTuple ntup = tupIter.next();
+            try {
+                return pkg.getValueTuple(ntup, ntup.getIndex(), key);
+            } catch (ExecException e) {
+                // Iterator.next() cannot throw checked exceptions; wrap, but
+                // keep the ExecException as the cause so its trace survives.
+                throw new RuntimeException("ReadOnceBag failed to get value tuple : "+e.toString(), e);
+            }
+        }
+		
+        /* (non-Javadoc)
+         * @see java.util.Iterator#remove()
+         */
+        @Override
+        public void remove() {
+            throw new RuntimeException("ReadOnceBag.iterator().remove() is not allowed");    
+        }
+	}
+}
+