You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ro...@apache.org on 2014/09/15 18:23:09 UTC

svn commit: r1625091 - in /pig/trunk: ./ src/org/apache/pig/ src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/ src/org/apache/pig/backend/hadoop/executionengine/tez/ src/org/apache/pig/backend/hadoop/executionengine/util/

Author: rohini
Date: Mon Sep 15 16:23:09 2014
New Revision: 1625091

URL: http://svn.apache.org/r1625091
Log:
PIG-4104: Accumulator UDF throws OOM in Tez (rohini)

Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/src/org/apache/pig/PigConfiguration.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/POShuffleTezLoad.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/util/AccumulatorOptimizerUtil.java

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1625091&r1=1625090&r2=1625091&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Mon Sep 15 16:23:09 2014
@@ -70,6 +70,8 @@ OPTIMIZATIONS
  
 BUG FIXES
 
+PIG-4104: Accumulator UDF throws OOM in Tez (rohini)
+
 PIG-4169: NPE in ConstantCalculator (cheolsoo)
 
 PIG-4161: check for latest Hive snapshot dependencies (daijy)

Modified: pig/trunk/src/org/apache/pig/PigConfiguration.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/PigConfiguration.java?rev=1625091&r1=1625090&r2=1625091&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/PigConfiguration.java (original)
+++ pig/trunk/src/org/apache/pig/PigConfiguration.java Mon Sep 15 16:23:09 2014
@@ -35,6 +35,9 @@ public class PigConfiguration {
      */
     public static final String PROP_CACHEDBAG_MEMUSAGE = "pig.cachedbag.memusage";
 
+
+    public static final String ACCUMULATIVE_BATCHSIZE = "pig.accumulative.batchsize";
+
     /**
      * Controls whether partial aggregation is turned on
      */
@@ -276,7 +279,7 @@ public class PigConfiguration {
      * This key is used to turns off use of task reports in job statistics.
      */
     public static final String PIG_NO_TASK_REPORT = "pig.stats.notaskreport";
-    
+
     public static final String PIG_CROSS_PARALLELISM_HINT = "pig.cross.parallelism.hint";
 
     public static final String REDUCER_ESTIMATOR_KEY = "pig.exec.reducer.estimator";

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java?rev=1625091&r1=1625090&r2=1625091&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java Mon Sep 15 16:23:09 2014
@@ -274,8 +274,9 @@ public class POForEach extends PhysicalO
                                 throw new ExecException(e);
                             }
                         }else{
-                            inpTuple = ((POPackage.POPackageTupleBuffer) buffer).illustratorMarkup(null, inpTuple, 0);
-                            //                       buffer.clear();
+                            if (buffer instanceof POPackage.POPackageTupleBuffer) {
+                                inpTuple = ((POPackage.POPackageTupleBuffer) buffer).illustratorMarkup(null, inpTuple, 0);
+                            }
                             setAccumEnd();
                         }
 
@@ -293,7 +294,7 @@ public class POForEach extends PhysicalO
                             break;
                         }
                     }
-
+                    buffer.clear();
                 } else {
                     res = processPlan();
                 }

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java?rev=1625091&r1=1625090&r2=1625091&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java Mon Sep 15 16:23:09 2014
@@ -27,6 +27,7 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
+import org.apache.pig.backend.hadoop.executionengine.util.AccumulatorOptimizerUtil;
 import org.apache.pig.data.AccumulativeBag;
 import org.apache.pig.data.BagFactory;
 import org.apache.pig.data.DataBag;
@@ -85,6 +86,8 @@ public class POPackage extends PhysicalO
 
     protected PigNullableWritable keyWritable;
 
+    private transient int accumulativeBatchSize;
+
     public POPackage(OperatorKey k) {
         this(k, -1, null);
     }
@@ -198,6 +201,8 @@ public class POPackage extends PhysicalO
                     useDefaultBag = true;
                 }
             }
+            accumulativeBatchSize = AccumulatorOptimizerUtil.getAccumulativeBatchSize();
+
             // If multiquery, the last bag is InternalCachedBag and should not
             // set ReadOnly flag, otherwise we will materialize again to another
             // InternalCachedBag
@@ -220,9 +225,7 @@ public class POPackage extends PhysicalO
                 // create bag wrapper to pull tuples in many batches
                 // all bags have reference to the sample tuples buffer
                 // which contains tuples from one batch
-                POPackageTupleBuffer buffer = new POPackageTupleBuffer();
-                buffer.setKey(key);
-                buffer.setIterator(tupIter);
+                POPackageTupleBuffer buffer = new POPackageTupleBuffer(accumulativeBatchSize, key, tupIter);
                 for (int i = 0; i < numInputs; i++) {
                     dbs[i] = new AccumulativeBag(buffer, i);
                 }
@@ -317,29 +320,16 @@ public class POPackage extends PhysicalO
         private Object currKey;
 
         @SuppressWarnings("unchecked")
-        public POPackageTupleBuffer() {
-            batchSize = 20000;
-            if (PigMapReduce.sJobConfInternal.get() != null) {
-                String size = PigMapReduce.sJobConfInternal.get().get("pig.accumulative.batchsize");
-                if (size != null) {
-                    batchSize = Integer.parseInt(size);
-                }
-            }
-
+        public POPackageTupleBuffer(int batchSize, Object key, Iterator<NullableTuple> iter) {
+            this.batchSize = batchSize;
+            this.currKey = key;
+            this.iter = iter;
             this.bags = new List[numInputs];
             for(int i=0; i<numInputs; i++) {
-                this.bags[i] = new ArrayList<Tuple>();
+                this.bags[i] = new ArrayList<Tuple>(batchSize);
             }
         }
 
-        public void setKey(Object key) {
-            this.currKey = key;
-        }
-
-        public void setIterator(Iterator<NullableTuple> iter) {
-            this.iter = iter;
-        }
-
         @Override
         public boolean hasNextBatch() {
             return iter.hasNext();

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/POShuffleTezLoad.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/POShuffleTezLoad.java?rev=1625091&r1=1625090&r2=1625091&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/POShuffleTezLoad.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/POShuffleTezLoad.java Mon Sep 15 16:23:09 2014
@@ -20,6 +20,7 @@ package org.apache.pig.backend.hadoop.ex
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -30,7 +31,9 @@ import org.apache.pig.backend.executione
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.AccumulativeTupleBuffer;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
+import org.apache.pig.backend.hadoop.executionengine.util.AccumulatorOptimizerUtil;
 import org.apache.pig.data.AccumulativeBag;
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.InternalCachedBag;
@@ -56,6 +59,7 @@ public class POShuffleTezLoad extends PO
     private boolean isSkewedJoin = false;
 
     private transient Configuration conf;
+    private transient int accumulativeBatchSize;
 
     public POShuffleTezLoad(POPackage pack) {
         super(pack);
@@ -107,11 +111,17 @@ public class POShuffleTezLoad extends PO
         } catch (Exception e) {
             throw new ExecException(e);
         }
+        accumulativeBatchSize = AccumulatorOptimizerUtil.getAccumulativeBatchSize();
     }
 
     @Override
     public Result getNextTuple() throws ExecException {
         Result res = pkgr.getNext();
+        TezAccumulativeTupleBuffer buffer = null;
+
+        if (isAccumulative()) {
+            buffer = new TezAccumulativeTupleBuffer(accumulativeBatchSize);
+        }
 
         while (res.returnStatus == POStatus.STATUS_EOP) {
             boolean hasData = false;
@@ -148,46 +158,28 @@ public class POShuffleTezLoad extends PO
             key = pkgr.getKey(min);
             keyWritable = min;
 
-            DataBag[] bags = new DataBag[numInputs];
-            POPackageTupleBuffer buffer = new POPackageTupleBuffer();
-            List<NullableTuple> nTups = new ArrayList<NullableTuple>();
-
             try {
-                for (int i = 0; i < numInputs; i++) {
+                DataBag[] bags = new DataBag[numInputs];
+                if (isAccumulative()) {
 
-                    DataBag bag = null;
+                    buffer.setCurrentKey(min);
+                    buffer.setCurrentKeyIndex(minIndex);
+                    for (int i = 0; i < numInputs; i++) {
+                        bags[i] = new AccumulativeBag(buffer, i);
+                    }
 
-                    if (!finished[i]) {
-                        cur = readers.get(i).getCurrentKey();
-                        // We need to loop in case of Grouping Comparators
-                        while (comparator.compare(min, cur) == 0 && (!min.isNull() ||
-                                min.isNull() && i==minIndex)) {
-                            Iterable<Object> vals = readers.get(i).getCurrentValues();
-                            if (isAccumulative()) {
-                                // TODO: POPackageTupleBuffer expects the
-                                // iterator for all the values from 1st to ith
-                                // inputs. Ideally, we should directly pass
-                                // iterators returned by getCurrentValues()
-                                // instead of copying objects. But if we pass
-                                // iterators directly, reuse of iterators causes
-                                // a tez runtime error. For now, we copy objects
-                                // into a new list and pass the iterator of this
-                                // new list.
-                                for (Object val : vals) {
-                                    // Make a copy of key and val and avoid reference.
-                                    // getCurrentKey() or value iterator resets value
-                                    // on the same object by calling readFields() again.
-                                    nTups.add(new NullableTuple((NullableTuple) val));
-                                }
-                                // Attach input to POPackageTupleBuffer
-                                buffer.setIterator(nTups.iterator());
-                                if(bags[i] == null) {
-                                    buffer.setKey(cur);
-                                    bag = new AccumulativeBag(buffer, i);
-                                } else {
-                                    bag = bags[i];
-                                }
-                            } else {
+                } else {
+
+                    for (int i = 0; i < numInputs; i++) {
+
+                        DataBag bag = null;
+
+                        if (!finished[i]) {
+                            cur = readers.get(i).getCurrentKey();
+                            // We need to loop in case of Grouping Comparators
+                            while (comparator.compare(min, cur) == 0
+                                    && (!min.isNull() || (min.isNull() && i == minIndex))) {
+                                Iterable<Object> vals = readers.get(i).getCurrentValues();
                                 bag = bags[i] == null ? new InternalCachedBag(numInputs) : bags[i];
                                 for (Object val : vals) {
                                     NullableTuple nTup = (NullableTuple) val;
@@ -195,31 +187,29 @@ public class POShuffleTezLoad extends PO
                                     Tuple tup = pkgr.getValueTuple(keyWritable, nTup, index);
                                     bag.add(tup);
                                 }
+                                bags[i] = bag;
+                                finished[i] = !readers.get(i).next();
+                                if (finished[i]) {
+                                    break;
+                                }
+                                cur = readers.get(i).getCurrentKey();
                             }
-                            bags[i] = bag;
-                            finished[i] = !readers.get(i).next();
-                            if (finished[i]) {
-                                break;
-                            }
-                            cur = readers.get(i).getCurrentKey();
                         }
-                    }
 
-                    if (bag == null) {
-                        if (isAccumulative()) {
-                            bags[i] = new AccumulativeBag(buffer, i);
-                        } else {
+                        if (bag == null) {
                             bags[i] = new InternalCachedBag(numInputs);
                         }
                     }
                 }
+
+                pkgr.attachInput(key, bags, readOnce);
+                res = pkgr.getNext();
+
             } catch (IOException e) {
                 throw new ExecException(e);
             }
-
-            pkgr.attachInput(key, bags, readOnce);
-            res = pkgr.getNext();
         }
+
         return res;
     }
 
@@ -244,4 +234,131 @@ public class POShuffleTezLoad extends PO
         return true;
     }
 
+    private class TezAccumulativeTupleBuffer implements AccumulativeTupleBuffer {
+
+        private int batchSize;
+        private List<Tuple>[] bags;
+        private PigNullableWritable min;
+        private int minIndex;
+        private boolean clearedCurrent = true;
+
+        @SuppressWarnings("unchecked")
+        public TezAccumulativeTupleBuffer(int batchSize) {
+            this.batchSize = batchSize;
+            this.bags = new List[numInputs];
+            for (int i = 0; i < numInputs; i++) {
+                this.bags[i] = new ArrayList<Tuple>(batchSize);
+            }
+        }
+
+        public void setCurrentKey(PigNullableWritable curKey) {
+            if (!clearedCurrent) {
+                // If buffer.clear() is not called from POForEach ensure it is called here.
+                clear();
+            }
+            this.min = curKey;
+            clearedCurrent = false;
+        }
+
+        public void setCurrentKeyIndex(int curKeyIndex) {
+            this.minIndex = curKeyIndex;
+        }
+
+        @Override
+        public boolean hasNextBatch() {
+            Object cur = null;
+            try {
+                for (int i = 0; i < numInputs; i++) {
+                    if (!finished[i]) {
+                        cur = readers.get(i).getCurrentKey();
+                        if (comparator.compare(min, cur) == 0
+                                && (!min.isNull() || (min.isNull() && i == minIndex))) {
+                            return true;
+                        }
+                    }
+                }
+            } catch (IOException e) {
+                throw new RuntimeException(
+                        "Error while checking for next Accumulator batch", e);
+            }
+            return false;
+        }
+
+        @Override
+        public void nextBatch() throws IOException {
+            Object cur = null;
+            for (int i = 0; i < bags.length; i++) {
+                bags[i].clear();
+            }
+            try {
+                for (int i = 0; i < numInputs; i++) {
+                    if (!finished[i]) {
+                        cur = readers.get(i).getCurrentKey();
+                        int batchCount = 0;
+                        while (comparator.compare(min, cur) == 0 && (!min.isNull() ||
+                                min.isNull() && i==minIndex)) {
+                            Iterator<Object> iter = readers.get(i).getCurrentValues().iterator();
+                            while (iter.hasNext() && batchCount < batchSize) {
+                                bags[i].add(pkgr.getValueTuple(keyWritable, (NullableTuple) iter.next(), i));
+                                batchCount++;
+                            }
+                            if (batchCount == batchSize) {
+                                if (!iter.hasNext()) {
+                                    // Move to next key and update finished
+                                    finished[i] = !readers.get(i).next();
+                                }
+                                break;
+                            }
+                            finished[i] = !readers.get(i).next();
+                            if (finished[i]) {
+                                break;
+                            }
+                            cur = readers.get(i).getCurrentKey();
+                        }
+                    }
+                }
+            } catch (IOException e) {
+                throw new RuntimeException(
+                        "Error while reading next Accumulator batch", e);
+            }
+        }
+
+        @Override
+        public void clear() {
+            for (int i = 0; i < bags.length; i++) {
+                bags[i].clear();
+            }
+            // Skip through current keys and its values not processed because of
+            // early termination of accumulator
+            Object cur = null;
+            try {
+                for (int i = 0; i < numInputs; i++) {
+                    if (!finished[i]) {
+                        cur = readers.get(i).getCurrentKey();
+                        while (comparator.compare(min, cur) == 0 && (!min.isNull() ||
+                                min.isNull() && i==minIndex)) {
+                            finished[i] = !readers.get(i).next();
+                            if (finished[i]) {
+                                break;
+                            }
+                            cur = readers.get(i).getCurrentKey();
+                        }
+                    }
+                }
+            } catch (IOException e) {
+                throw new RuntimeException(
+                        "Error while cleaning up for next Accumulator batch", e);
+            }
+            clearedCurrent = true;
+        }
+
+        @Override
+        public Iterator<Tuple> getTuples(int index) {
+            return bags[index].iterator();
+        }
+
+        //TODO: illustratorMarkup
+
+    }
+
 }

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/util/AccumulatorOptimizerUtil.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/util/AccumulatorOptimizerUtil.java?rev=1625091&r1=1625090&r2=1625091&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/util/AccumulatorOptimizerUtil.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/util/AccumulatorOptimizerUtil.java Mon Sep 15 16:23:09 2014
@@ -5,6 +5,8 @@ import java.util.List;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.pig.Accumulator;
+import org.apache.pig.PigConfiguration;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.BinaryExpressionOperator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.ConstantExpression;
@@ -27,6 +29,17 @@ import org.apache.pig.impl.PigContext;
 public class AccumulatorOptimizerUtil {
     private static final Log LOG = LogFactory.getLog(AccumulatorOptimizerUtil.class);
 
+    public static int getAccumulativeBatchSize() {
+        int batchSize = 20000;
+        if (PigMapReduce.sJobConfInternal.get() != null) {
+            String size = PigMapReduce.sJobConfInternal.get().get(PigConfiguration.ACCUMULATIVE_BATCHSIZE);
+            if (size != null) {
+                batchSize = Integer.parseInt(size);
+            }
+        }
+        return batchSize;
+    }
+
     public static void addAccumulator(PhysicalPlan plan) {
         // See if this is a map-reduce job
         List<PhysicalOperator> pos = plan.getRoots();