You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ro...@apache.org on 2017/01/18 13:37:56 UTC
svn commit: r1779325 - in /pig/trunk: ./
src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/
src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/
src/org/apache/pig/data/ src/org/apache/pig/impl/io/
Author: rohini
Date: Wed Jan 18 13:37:56 2017
New Revision: 1779325
URL: http://svn.apache.org/viewvc?rev=1779325&view=rev
Log:
PIG-5083: CombinerPackager and LitePackager should not materialize bags (rohini)
Modified:
pig/trunk/CHANGES.txt
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/CombinerPackager.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/LitePackager.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POShuffleTezLoad.java
pig/trunk/src/org/apache/pig/data/ReadOnceBag.java
pig/trunk/src/org/apache/pig/impl/io/NullableTuple.java
Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1779325&r1=1779324&r2=1779325&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Wed Jan 18 13:37:56 2017
@@ -73,7 +73,9 @@ OPTIMIZATIONS
BUG FIXES
-PIG-5087 e2e Native3 failing after PIG-4923 (knoguchi)
+PIG-5083: CombinerPackager and LitePackager should not materialize bags (rohini)
+
+PIG-5087: e2e Native3 failing after PIG-4923 (knoguchi)
PIG-5073: Skip e2e Limit_5 test for Tez (knoguchi)
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/CombinerPackager.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/CombinerPackager.java?rev=1779325&r1=1779324&r2=1779325&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/CombinerPackager.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/CombinerPackager.java Wed Jan 18 13:37:56 2017
@@ -49,7 +49,7 @@ public class CombinerPackager extends Pa
private Map<Integer, Integer> keyLookup;
private int numBags;
-
+
private transient boolean initialized;
private transient boolean useDefaultBag;
@@ -77,6 +77,15 @@ public class CombinerPackager extends Pa
}
}
+ @Override
+ public void attachInput(Object key, DataBag[] bags, boolean[] readOnce)
+ throws ExecException {
+ this.key = key;
+ this.bags = bags;
+ this.readOnce = readOnce;
+ // Bag can be read directly and need not be materialized again
+ }
+
/**
* @param keyInfo the keyInfo to set
*/
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/LitePackager.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/LitePackager.java?rev=1779325&r1=1779324&r2=1779325&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/LitePackager.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/LitePackager.java Wed Jan 18 13:37:56 2017
@@ -17,7 +17,7 @@
*/
/**
- *
+ *
*/
package org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators;
@@ -28,6 +28,7 @@ import java.util.Map;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
+import org.apache.pig.data.DataBag;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.io.NullableTuple;
import org.apache.pig.impl.io.PigNullableWritable;
@@ -48,6 +49,15 @@ public class LitePackager extends Packag
private PigNullableWritable keyWritable;
@Override
+ public void attachInput(Object key, DataBag[] bags, boolean[] readOnce)
+ throws ExecException {
+ this.key = key;
+ this.bags = bags;
+ this.readOnce = readOnce;
+ // Bag can be read directly and need not be materialized again
+ }
+
+ @Override
public boolean[] getInner() {
return null;
}
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POShuffleTezLoad.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POShuffleTezLoad.java?rev=1779325&r1=1779324&r2=1779325&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POShuffleTezLoad.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POShuffleTezLoad.java Wed Jan 18 13:37:56 2017
@@ -34,12 +34,16 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.AccumulativeTupleBuffer;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.CombinerPackager;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.LitePackager;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.Packager;
import org.apache.pig.backend.hadoop.executionengine.tez.runtime.TezInput;
import org.apache.pig.backend.hadoop.executionengine.util.AccumulatorOptimizerUtil;
import org.apache.pig.data.AccumulativeBag;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.InternalCachedBag;
+import org.apache.pig.data.ReadOnceBag;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.io.NullableTuple;
import org.apache.pig.impl.io.PigNullableWritable;
@@ -64,6 +68,7 @@ public class POShuffleTezLoad extends PO
private transient WritableComparator groupingComparator = null;
private transient Configuration conf;
private transient int accumulativeBatchSize;
+ private transient boolean readOnceOneBag;
public POShuffleTezLoad(POPackage pack) {
super(pack);
@@ -123,6 +128,11 @@ public class POShuffleTezLoad extends PO
for (int i = 0; i < numTezInputs; i++) {
finished[i] = !readers.get(i).next();
}
+
+ this.readOnceOneBag = (numInputs == 1) && (pkgr instanceof CombinerPackager || pkgr instanceof LitePackager);
+ if (readOnceOneBag) {
+ readOnce[0] = true;
+ }
} catch (Exception e) {
throw new ExecException(e);
}
@@ -193,43 +203,47 @@ public class POShuffleTezLoad extends PO
} else {
- for (int i = 0; i < numInputs; i++) {
- bags[i] = new InternalCachedBag(numInputs);
- }
-
- if (numTezInputs == 1) {
- do {
- Iterable<Object> vals = readers.get(0).getCurrentValues();
- for (Object val : vals) {
- NullableTuple nTup = (NullableTuple) val;
- int index = nTup.getIndex();
- Tuple tup = pkgr.getValueTuple(keyWritable, nTup, index);
- bags[index].add(tup);
- }
- finished[0] = !readers.get(0).next();
- if (finished[0]) {
- break;
- }
- cur = readers.get(0).getCurrentKey();
- } while (groupingComparator.compare(min, cur) == 0); // We need to loop in case of Grouping Comparators
+ if (readOnceOneBag) {
+ bags[0] = new TezReadOnceBag(pkgr, min);
} else {
- for (int i = 0; i < numTezInputs; i++) {
- if (!finished[i]) {
- cur = readers.get(i).getCurrentKey();
- // We need to loop in case of Grouping Comparators
- while (groupingComparator.compare(min, cur) == 0) {
- Iterable<Object> vals = readers.get(i).getCurrentValues();
- for (Object val : vals) {
- NullableTuple nTup = (NullableTuple) val;
- int index = nTup.getIndex();
- Tuple tup = pkgr.getValueTuple(keyWritable, nTup, index);
- bags[index].add(tup);
- }
- finished[i] = !readers.get(i).next();
- if (finished[i]) {
- break;
- }
+ for (int i = 0; i < numInputs; i++) {
+ bags[i] = new InternalCachedBag(numInputs);
+ }
+
+ if (numTezInputs == 1) {
+ do {
+ Iterable<Object> vals = readers.get(0).getCurrentValues();
+ for (Object val : vals) {
+ NullableTuple nTup = (NullableTuple) val;
+ int index = nTup.getIndex();
+ Tuple tup = pkgr.getValueTuple(keyWritable, nTup, index);
+ bags[index].add(tup);
+ }
+ finished[0] = !readers.get(0).next();
+ if (finished[0]) {
+ break;
+ }
+ cur = readers.get(0).getCurrentKey();
+ } while (groupingComparator.compare(min, cur) == 0); // We need to loop in case of Grouping Comparators
+ } else {
+ for (int i = 0; i < numTezInputs; i++) {
+ if (!finished[i]) {
cur = readers.get(i).getCurrentKey();
+ // We need to loop in case of Grouping Comparators
+ while (groupingComparator.compare(min, cur) == 0) {
+ Iterable<Object> vals = readers.get(i).getCurrentValues();
+ for (Object val : vals) {
+ NullableTuple nTup = (NullableTuple) val;
+ int index = nTup.getIndex();
+ Tuple tup = pkgr.getValueTuple(keyWritable, nTup, index);
+ bags[index].add(tup);
+ }
+ finished[i] = !readers.get(i).next();
+ if (finished[i]) {
+ break;
+ }
+ cur = readers.get(i).getCurrentKey();
+ }
}
}
}
@@ -389,4 +403,74 @@ public class POShuffleTezLoad extends PO
}
+ private class TezReadOnceBag extends ReadOnceBag {
+
+ private static final long serialVersionUID = 1L;
+ private Iterator<Object> iter;
+
+ public TezReadOnceBag(Packager pkgr,
+ PigNullableWritable currentKey) throws IOException {
+ this.pkgr = pkgr;
+ this.keyWritable = currentKey;
+ this.iter = readers.get(0).getCurrentValues().iterator();
+ }
+
+ @Override
+ public Iterator<Tuple> iterator() {
+ return new TezReadOnceBagIterator();
+ }
+
+ private class TezReadOnceBagIterator implements Iterator<Tuple> {
+
+ @Override
+ public boolean hasNext() {
+ if (iter.hasNext()) {
+ return true;
+ } else {
+ try {
+ finished[0] = !readers.get(0).next();
+ if (finished[0]) {
+ return false;
+ }
+ // Currently combiner is not being applied when secondary key(grouping comparator) is used
+ // But might change in future. So check if the next key is same and return its values
+ Object cur = readers.get(0).getCurrentKey();
+ if (groupingComparator.compare(keyWritable, cur) == 0) {
+ iter = readers.get(0).getCurrentValues().iterator();
+ // Key should at least have one value. But doing a check just for safety
+ if (iter.hasNext()) {
+ return true;
+ } else {
+ throw new RuntimeException("Unexpected. Key " + keyWritable + " does not have any values");
+ }
+ }
+ return false;
+ } catch (IOException e) {
+ throw new RuntimeException("ReadOnceBag failed to get value tuple : ", e);
+ }
+ }
+ }
+
+ @Override
+ public Tuple next() {
+ NullableTuple ntup = (NullableTuple) iter.next();
+ int index = ntup.getIndex();
+ Tuple ret = null;
+ try {
+ ret = pkgr.getValueTuple(keyWritable, ntup, index);
+ } catch (ExecException e) {
+ throw new RuntimeException("ReadOnceBag failed to get value tuple : ", e);
+ }
+ return ret;
+ }
+
+ @Override
+ public void remove() {
+ throw new RuntimeException("ReadOnceBag.iterator().remove() is not allowed");
+ }
+ }
+
+ }
+
+
}
Modified: pig/trunk/src/org/apache/pig/data/ReadOnceBag.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/data/ReadOnceBag.java?rev=1779325&r1=1779324&r2=1779325&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/data/ReadOnceBag.java (original)
+++ pig/trunk/src/org/apache/pig/data/ReadOnceBag.java Wed Jan 18 13:37:56 2017
@@ -50,6 +50,9 @@ public class ReadOnceBag implements Data
*/
private static final long serialVersionUID = 2L;
+ public ReadOnceBag() {
+ }
+
/**
* This constructor creates a bag out of an existing iterator
* of tuples by taking ownership of the iterator and NOT
Modified: pig/trunk/src/org/apache/pig/impl/io/NullableTuple.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/io/NullableTuple.java?rev=1779325&r1=1779324&r2=1779325&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/io/NullableTuple.java (original)
+++ pig/trunk/src/org/apache/pig/impl/io/NullableTuple.java Wed Jan 18 13:37:56 2017
@@ -57,6 +57,8 @@ public class NullableTuple extends PigNu
public void readFields(DataInput in) throws IOException {
boolean nullness = in.readBoolean();
setNull(nullness);
+ // Free up the previous value for GC
+ mValue = null;
if (!nullness) {
mValue = bis.readTuple(in);
}