You are viewing a plain text version of this content. The canonical version is the original mailing-list archive page (its hyperlink was not preserved in this plain-text rendering).
Posted to commits@pig.apache.org by ro...@apache.org on 2017/01/18 13:37:56 UTC

svn commit: r1779325 - in /pig/trunk: ./ src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/ src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/ src/org/apache/pig/data/ src/org/apache/pig/impl/io/

Author: rohini
Date: Wed Jan 18 13:37:56 2017
New Revision: 1779325

URL: http://svn.apache.org/viewvc?rev=1779325&view=rev
Log:
PIG-5083: CombinerPackager and LitePackager should not materialize bags (rohini)

Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/CombinerPackager.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/LitePackager.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POShuffleTezLoad.java
    pig/trunk/src/org/apache/pig/data/ReadOnceBag.java
    pig/trunk/src/org/apache/pig/impl/io/NullableTuple.java

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1779325&r1=1779324&r2=1779325&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Wed Jan 18 13:37:56 2017
@@ -73,7 +73,9 @@ OPTIMIZATIONS
  
 BUG FIXES
 
-PIG-5087 e2e Native3 failing after PIG-4923 (knoguchi)
+PIG-5083: CombinerPackager and LitePackager should not materialize bags (rohini)
+
+PIG-5087: e2e Native3 failing after PIG-4923 (knoguchi)
 
 PIG-5073: Skip e2e Limit_5 test for Tez (knoguchi)
 

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/CombinerPackager.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/CombinerPackager.java?rev=1779325&r1=1779324&r2=1779325&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/CombinerPackager.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/CombinerPackager.java Wed Jan 18 13:37:56 2017
@@ -49,7 +49,7 @@ public class CombinerPackager extends Pa
     private Map<Integer, Integer> keyLookup;
 
     private int numBags;
-    
+
     private transient boolean initialized;
     private transient boolean useDefaultBag;
 
@@ -77,6 +77,15 @@ public class CombinerPackager extends Pa
         }
     }
 
+    @Override
+    public void attachInput(Object key, DataBag[] bags, boolean[] readOnce)
+            throws ExecException {
+        this.key = key;
+        this.bags = bags;
+        this.readOnce = readOnce;
+        // Bag can be read directly and need not be materialized again
+    }
+
     /**
      * @param keyInfo the keyInfo to set
      */

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/LitePackager.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/LitePackager.java?rev=1779325&r1=1779324&r2=1779325&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/LitePackager.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/LitePackager.java Wed Jan 18 13:37:56 2017
@@ -17,7 +17,7 @@
  */
 
 /**
- * 
+ *
  */
 package org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators;
 
@@ -28,6 +28,7 @@ import java.util.Map;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
+import org.apache.pig.data.DataBag;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.io.NullableTuple;
 import org.apache.pig.impl.io.PigNullableWritable;
@@ -48,6 +49,15 @@ public class LitePackager extends Packag
     private PigNullableWritable keyWritable;
 
     @Override
+    public void attachInput(Object key, DataBag[] bags, boolean[] readOnce)
+            throws ExecException {
+        this.key = key;
+        this.bags = bags;
+        this.readOnce = readOnce;
+        // Bag can be read directly and need not be materialized again
+    }
+
+    @Override
     public boolean[] getInner() {
         return null;
     }

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POShuffleTezLoad.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POShuffleTezLoad.java?rev=1779325&r1=1779324&r2=1779325&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POShuffleTezLoad.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POShuffleTezLoad.java Wed Jan 18 13:37:56 2017
@@ -34,12 +34,16 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.AccumulativeTupleBuffer;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.CombinerPackager;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.LitePackager;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.Packager;
 import org.apache.pig.backend.hadoop.executionengine.tez.runtime.TezInput;
 import org.apache.pig.backend.hadoop.executionengine.util.AccumulatorOptimizerUtil;
 import org.apache.pig.data.AccumulativeBag;
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.InternalCachedBag;
+import org.apache.pig.data.ReadOnceBag;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.io.NullableTuple;
 import org.apache.pig.impl.io.PigNullableWritable;
@@ -64,6 +68,7 @@ public class POShuffleTezLoad extends PO
     private transient WritableComparator groupingComparator = null;
     private transient Configuration conf;
     private transient int accumulativeBatchSize;
+    private transient boolean readOnceOneBag;
 
     public POShuffleTezLoad(POPackage pack) {
         super(pack);
@@ -123,6 +128,11 @@ public class POShuffleTezLoad extends PO
             for (int i = 0; i < numTezInputs; i++) {
                 finished[i] = !readers.get(i).next();
             }
+
+            this.readOnceOneBag = (numInputs == 1) && (pkgr instanceof CombinerPackager || pkgr instanceof LitePackager);
+            if (readOnceOneBag) {
+                readOnce[0] = true;
+            }
         } catch (Exception e) {
             throw new ExecException(e);
         }
@@ -193,43 +203,47 @@ public class POShuffleTezLoad extends PO
 
                 } else {
 
-                    for (int i = 0; i < numInputs; i++) {
-                        bags[i] = new InternalCachedBag(numInputs);
-                    }
-
-                    if (numTezInputs == 1) {
-                        do {
-                            Iterable<Object> vals = readers.get(0).getCurrentValues();
-                            for (Object val : vals) {
-                                NullableTuple nTup = (NullableTuple) val;
-                                int index = nTup.getIndex();
-                                Tuple tup = pkgr.getValueTuple(keyWritable, nTup, index);
-                                bags[index].add(tup);
-                            }
-                            finished[0] = !readers.get(0).next();
-                            if (finished[0]) {
-                                break;
-                            }
-                            cur = readers.get(0).getCurrentKey();
-                        } while (groupingComparator.compare(min, cur) == 0); // We need to loop in case of Grouping Comparators
+                    if (readOnceOneBag) {
+                        bags[0] = new TezReadOnceBag(pkgr, min);
                     } else {
-                        for (int i = 0; i < numTezInputs; i++) {
-                            if (!finished[i]) {
-                                cur = readers.get(i).getCurrentKey();
-                                // We need to loop in case of Grouping Comparators
-                                while (groupingComparator.compare(min, cur) == 0) {
-                                    Iterable<Object> vals = readers.get(i).getCurrentValues();
-                                    for (Object val : vals) {
-                                        NullableTuple nTup = (NullableTuple) val;
-                                        int index = nTup.getIndex();
-                                        Tuple tup = pkgr.getValueTuple(keyWritable, nTup, index);
-                                        bags[index].add(tup);
-                                    }
-                                    finished[i] = !readers.get(i).next();
-                                    if (finished[i]) {
-                                        break;
-                                    }
+                        for (int i = 0; i < numInputs; i++) {
+                            bags[i] = new InternalCachedBag(numInputs);
+                        }
+
+                        if (numTezInputs == 1) {
+                            do {
+                                Iterable<Object> vals = readers.get(0).getCurrentValues();
+                                for (Object val : vals) {
+                                    NullableTuple nTup = (NullableTuple) val;
+                                    int index = nTup.getIndex();
+                                    Tuple tup = pkgr.getValueTuple(keyWritable, nTup, index);
+                                    bags[index].add(tup);
+                                }
+                                finished[0] = !readers.get(0).next();
+                                if (finished[0]) {
+                                    break;
+                                }
+                                cur = readers.get(0).getCurrentKey();
+                            } while (groupingComparator.compare(min, cur) == 0); // We need to loop in case of Grouping Comparators
+                        } else {
+                            for (int i = 0; i < numTezInputs; i++) {
+                                if (!finished[i]) {
                                     cur = readers.get(i).getCurrentKey();
+                                    // We need to loop in case of Grouping Comparators
+                                    while (groupingComparator.compare(min, cur) == 0) {
+                                        Iterable<Object> vals = readers.get(i).getCurrentValues();
+                                        for (Object val : vals) {
+                                            NullableTuple nTup = (NullableTuple) val;
+                                            int index = nTup.getIndex();
+                                            Tuple tup = pkgr.getValueTuple(keyWritable, nTup, index);
+                                            bags[index].add(tup);
+                                        }
+                                        finished[i] = !readers.get(i).next();
+                                        if (finished[i]) {
+                                            break;
+                                        }
+                                        cur = readers.get(i).getCurrentKey();
+                                    }
                                 }
                             }
                         }
@@ -389,4 +403,74 @@ public class POShuffleTezLoad extends PO
 
     }
 
+    private class TezReadOnceBag extends ReadOnceBag {
+
+        private static final long serialVersionUID = 1L;
+        private Iterator<Object> iter;
+
+        public TezReadOnceBag(Packager pkgr,
+                PigNullableWritable currentKey) throws IOException {
+            this.pkgr = pkgr;
+            this.keyWritable = currentKey;
+            this.iter = readers.get(0).getCurrentValues().iterator();
+        }
+
+        @Override
+        public Iterator<Tuple> iterator() {
+            return new TezReadOnceBagIterator();
+        }
+
+        private class TezReadOnceBagIterator implements Iterator<Tuple> {
+
+            @Override
+            public boolean hasNext() {
+                if (iter.hasNext()) {
+                    return true;
+                } else {
+                    try {
+                        finished[0] = !readers.get(0).next();
+                        if (finished[0]) {
+                            return false;
+                        }
+                        // Currently combiner is not being applied when secondary key(grouping comparator) is used
+                        // But might change in future. So check if the next key is same and return its values
+                        Object cur = readers.get(0).getCurrentKey();
+                        if (groupingComparator.compare(keyWritable, cur) == 0) {
+                            iter = readers.get(0).getCurrentValues().iterator();
+                            // Key should at least have one value. But doing a check just for safety
+                            if (iter.hasNext()) {
+                                return true;
+                            } else {
+                                throw new RuntimeException("Unexpected. Key " + keyWritable + " does not have any values");
+                            }
+                        }
+                        return false;
+                    } catch (IOException e) {
+                        throw new RuntimeException("ReadOnceBag failed to get value tuple : ", e);
+                    }
+                }
+            }
+
+            @Override
+            public Tuple next() {
+                NullableTuple ntup = (NullableTuple) iter.next();
+                int index = ntup.getIndex();
+                Tuple ret = null;
+                try {
+                    ret = pkgr.getValueTuple(keyWritable, ntup, index);
+                } catch (ExecException e) {
+                    throw new RuntimeException("ReadOnceBag failed to get value tuple : ", e);
+                }
+                return ret;
+            }
+
+            @Override
+            public void remove() {
+                throw new RuntimeException("ReadOnceBag.iterator().remove() is not allowed");
+            }
+        }
+
+    }
+
+
 }

Modified: pig/trunk/src/org/apache/pig/data/ReadOnceBag.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/data/ReadOnceBag.java?rev=1779325&r1=1779324&r2=1779325&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/data/ReadOnceBag.java (original)
+++ pig/trunk/src/org/apache/pig/data/ReadOnceBag.java Wed Jan 18 13:37:56 2017
@@ -50,6 +50,9 @@ public class ReadOnceBag implements Data
      */
     private static final long serialVersionUID = 2L;
 
+    public ReadOnceBag() {
+    }
+
     /**
      * This constructor creates a bag out of an existing iterator
      * of tuples by taking ownership of the iterator and NOT

Modified: pig/trunk/src/org/apache/pig/impl/io/NullableTuple.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/io/NullableTuple.java?rev=1779325&r1=1779324&r2=1779325&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/io/NullableTuple.java (original)
+++ pig/trunk/src/org/apache/pig/impl/io/NullableTuple.java Wed Jan 18 13:37:56 2017
@@ -57,6 +57,8 @@ public class NullableTuple extends PigNu
     public void readFields(DataInput in) throws IOException {
         boolean nullness = in.readBoolean();
         setNull(nullness);
+        // Free up the previous value for GC
+        mValue = null;
         if (!nullness) {
             mValue = bis.readTuple(in);
         }