You are viewing a plain text version of this content. The canonical link for it (a hyperlink in the original page) is not preserved in this plain-text copy.
Posted to commits@pig.apache.org by ro...@apache.org on 2014/09/15 18:23:09 UTC
svn commit: r1625091 - in /pig/trunk: ./ src/org/apache/pig/
src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/
src/org/apache/pig/backend/hadoop/executionengine/tez/
src/org/apache/pig/backend/hadoop/executionengine/util/
Author: rohini
Date: Mon Sep 15 16:23:09 2014
New Revision: 1625091
URL: http://svn.apache.org/r1625091
Log:
PIG-4104: Accumulator UDF throws OOM in Tez (rohini)
Modified:
pig/trunk/CHANGES.txt
pig/trunk/src/org/apache/pig/PigConfiguration.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/POShuffleTezLoad.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/util/AccumulatorOptimizerUtil.java
Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1625091&r1=1625090&r2=1625091&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Mon Sep 15 16:23:09 2014
@@ -70,6 +70,8 @@ OPTIMIZATIONS
BUG FIXES
+PIG-4104: Accumulator UDF throws OOM in Tez (rohini)
+
PIG-4169: NPE in ConstantCalculator (cheolsoo)
PIG-4161: check for latest Hive snapshot dependencies (daijy)
Modified: pig/trunk/src/org/apache/pig/PigConfiguration.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/PigConfiguration.java?rev=1625091&r1=1625090&r2=1625091&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/PigConfiguration.java (original)
+++ pig/trunk/src/org/apache/pig/PigConfiguration.java Mon Sep 15 16:23:09 2014
@@ -35,6 +35,9 @@ public class PigConfiguration {
*/
public static final String PROP_CACHEDBAG_MEMUSAGE = "pig.cachedbag.memusage";
+
+ public static final String ACCUMULATIVE_BATCHSIZE = "pig.accumulative.batchsize";
+
/**
* Controls whether partial aggregation is turned on
*/
@@ -276,7 +279,7 @@ public class PigConfiguration {
* This key is used to turns off use of task reports in job statistics.
*/
public static final String PIG_NO_TASK_REPORT = "pig.stats.notaskreport";
-
+
public static final String PIG_CROSS_PARALLELISM_HINT = "pig.cross.parallelism.hint";
public static final String REDUCER_ESTIMATOR_KEY = "pig.exec.reducer.estimator";
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java?rev=1625091&r1=1625090&r2=1625091&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java Mon Sep 15 16:23:09 2014
@@ -274,8 +274,9 @@ public class POForEach extends PhysicalO
throw new ExecException(e);
}
}else{
- inpTuple = ((POPackage.POPackageTupleBuffer) buffer).illustratorMarkup(null, inpTuple, 0);
- // buffer.clear();
+ if (buffer instanceof POPackage.POPackageTupleBuffer) {
+ inpTuple = ((POPackage.POPackageTupleBuffer) buffer).illustratorMarkup(null, inpTuple, 0);
+ }
setAccumEnd();
}
@@ -293,7 +294,7 @@ public class POForEach extends PhysicalO
break;
}
}
-
+ buffer.clear();
} else {
res = processPlan();
}
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java?rev=1625091&r1=1625090&r2=1625091&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java Mon Sep 15 16:23:09 2014
@@ -27,6 +27,7 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
+import org.apache.pig.backend.hadoop.executionengine.util.AccumulatorOptimizerUtil;
import org.apache.pig.data.AccumulativeBag;
import org.apache.pig.data.BagFactory;
import org.apache.pig.data.DataBag;
@@ -85,6 +86,8 @@ public class POPackage extends PhysicalO
protected PigNullableWritable keyWritable;
+ private transient int accumulativeBatchSize;
+
public POPackage(OperatorKey k) {
this(k, -1, null);
}
@@ -198,6 +201,8 @@ public class POPackage extends PhysicalO
useDefaultBag = true;
}
}
+ accumulativeBatchSize = AccumulatorOptimizerUtil.getAccumulativeBatchSize();
+
// If multiquery, the last bag is InternalCachedBag and should not
// set ReadOnly flag, otherwise we will materialize again to another
// InternalCachedBag
@@ -220,9 +225,7 @@ public class POPackage extends PhysicalO
// create bag wrapper to pull tuples in many batches
// all bags have reference to the sample tuples buffer
// which contains tuples from one batch
- POPackageTupleBuffer buffer = new POPackageTupleBuffer();
- buffer.setKey(key);
- buffer.setIterator(tupIter);
+ POPackageTupleBuffer buffer = new POPackageTupleBuffer(accumulativeBatchSize, key, tupIter);
for (int i = 0; i < numInputs; i++) {
dbs[i] = new AccumulativeBag(buffer, i);
}
@@ -317,29 +320,16 @@ public class POPackage extends PhysicalO
private Object currKey;
@SuppressWarnings("unchecked")
- public POPackageTupleBuffer() {
- batchSize = 20000;
- if (PigMapReduce.sJobConfInternal.get() != null) {
- String size = PigMapReduce.sJobConfInternal.get().get("pig.accumulative.batchsize");
- if (size != null) {
- batchSize = Integer.parseInt(size);
- }
- }
-
+ public POPackageTupleBuffer(int batchSize, Object key, Iterator<NullableTuple> iter) {
+ this.batchSize = batchSize;
+ this.currKey = key;
+ this.iter = iter;
this.bags = new List[numInputs];
for(int i=0; i<numInputs; i++) {
- this.bags[i] = new ArrayList<Tuple>();
+ this.bags[i] = new ArrayList<Tuple>(batchSize);
}
}
- public void setKey(Object key) {
- this.currKey = key;
- }
-
- public void setIterator(Iterator<NullableTuple> iter) {
- this.iter = iter;
- }
-
@Override
public boolean hasNextBatch() {
return iter.hasNext();
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/POShuffleTezLoad.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/POShuffleTezLoad.java?rev=1625091&r1=1625090&r2=1625091&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/POShuffleTezLoad.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/POShuffleTezLoad.java Mon Sep 15 16:23:09 2014
@@ -20,6 +20,7 @@ package org.apache.pig.backend.hadoop.ex
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -30,7 +31,9 @@ import org.apache.pig.backend.executione
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.AccumulativeTupleBuffer;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
+import org.apache.pig.backend.hadoop.executionengine.util.AccumulatorOptimizerUtil;
import org.apache.pig.data.AccumulativeBag;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.InternalCachedBag;
@@ -56,6 +59,7 @@ public class POShuffleTezLoad extends PO
private boolean isSkewedJoin = false;
private transient Configuration conf;
+ private transient int accumulativeBatchSize;
public POShuffleTezLoad(POPackage pack) {
super(pack);
@@ -107,11 +111,17 @@ public class POShuffleTezLoad extends PO
} catch (Exception e) {
throw new ExecException(e);
}
+ accumulativeBatchSize = AccumulatorOptimizerUtil.getAccumulativeBatchSize();
}
@Override
public Result getNextTuple() throws ExecException {
Result res = pkgr.getNext();
+ TezAccumulativeTupleBuffer buffer = null;
+
+ if (isAccumulative()) {
+ buffer = new TezAccumulativeTupleBuffer(accumulativeBatchSize);
+ }
while (res.returnStatus == POStatus.STATUS_EOP) {
boolean hasData = false;
@@ -148,46 +158,28 @@ public class POShuffleTezLoad extends PO
key = pkgr.getKey(min);
keyWritable = min;
- DataBag[] bags = new DataBag[numInputs];
- POPackageTupleBuffer buffer = new POPackageTupleBuffer();
- List<NullableTuple> nTups = new ArrayList<NullableTuple>();
-
try {
- for (int i = 0; i < numInputs; i++) {
+ DataBag[] bags = new DataBag[numInputs];
+ if (isAccumulative()) {
- DataBag bag = null;
+ buffer.setCurrentKey(min);
+ buffer.setCurrentKeyIndex(minIndex);
+ for (int i = 0; i < numInputs; i++) {
+ bags[i] = new AccumulativeBag(buffer, i);
+ }
- if (!finished[i]) {
- cur = readers.get(i).getCurrentKey();
- // We need to loop in case of Grouping Comparators
- while (comparator.compare(min, cur) == 0 && (!min.isNull() ||
- min.isNull() && i==minIndex)) {
- Iterable<Object> vals = readers.get(i).getCurrentValues();
- if (isAccumulative()) {
- // TODO: POPackageTupleBuffer expects the
- // iterator for all the values from 1st to ith
- // inputs. Ideally, we should directly pass
- // iterators returned by getCurrentValues()
- // instead of copying objects. But if we pass
- // iterators directly, reuse of iterators causes
- // a tez runtime error. For now, we copy objects
- // into a new list and pass the iterator of this
- // new list.
- for (Object val : vals) {
- // Make a copy of key and val and avoid reference.
- // getCurrentKey() or value iterator resets value
- // on the same object by calling readFields() again.
- nTups.add(new NullableTuple((NullableTuple) val));
- }
- // Attach input to POPackageTupleBuffer
- buffer.setIterator(nTups.iterator());
- if(bags[i] == null) {
- buffer.setKey(cur);
- bag = new AccumulativeBag(buffer, i);
- } else {
- bag = bags[i];
- }
- } else {
+ } else {
+
+ for (int i = 0; i < numInputs; i++) {
+
+ DataBag bag = null;
+
+ if (!finished[i]) {
+ cur = readers.get(i).getCurrentKey();
+ // We need to loop in case of Grouping Comparators
+ while (comparator.compare(min, cur) == 0
+ && (!min.isNull() || (min.isNull() && i == minIndex))) {
+ Iterable<Object> vals = readers.get(i).getCurrentValues();
bag = bags[i] == null ? new InternalCachedBag(numInputs) : bags[i];
for (Object val : vals) {
NullableTuple nTup = (NullableTuple) val;
@@ -195,31 +187,29 @@ public class POShuffleTezLoad extends PO
Tuple tup = pkgr.getValueTuple(keyWritable, nTup, index);
bag.add(tup);
}
+ bags[i] = bag;
+ finished[i] = !readers.get(i).next();
+ if (finished[i]) {
+ break;
+ }
+ cur = readers.get(i).getCurrentKey();
}
- bags[i] = bag;
- finished[i] = !readers.get(i).next();
- if (finished[i]) {
- break;
- }
- cur = readers.get(i).getCurrentKey();
}
- }
- if (bag == null) {
- if (isAccumulative()) {
- bags[i] = new AccumulativeBag(buffer, i);
- } else {
+ if (bag == null) {
bags[i] = new InternalCachedBag(numInputs);
}
}
}
+
+ pkgr.attachInput(key, bags, readOnce);
+ res = pkgr.getNext();
+
} catch (IOException e) {
throw new ExecException(e);
}
-
- pkgr.attachInput(key, bags, readOnce);
- res = pkgr.getNext();
}
+
return res;
}
@@ -244,4 +234,131 @@ public class POShuffleTezLoad extends PO
return true;
}
+ private class TezAccumulativeTupleBuffer implements AccumulativeTupleBuffer {
+
+ private int batchSize;
+ private List<Tuple>[] bags;
+ private PigNullableWritable min;
+ private int minIndex;
+ private boolean clearedCurrent = true;
+
+ @SuppressWarnings("unchecked")
+ public TezAccumulativeTupleBuffer(int batchSize) {
+ this.batchSize = batchSize;
+ this.bags = new List[numInputs];
+ for (int i = 0; i < numInputs; i++) {
+ this.bags[i] = new ArrayList<Tuple>(batchSize);
+ }
+ }
+
+ public void setCurrentKey(PigNullableWritable curKey) {
+ if (!clearedCurrent) {
+ // If buffer.clear() is not called from POForEach ensure it is called here.
+ clear();
+ }
+ this.min = curKey;
+ clearedCurrent = false;
+ }
+
+ public void setCurrentKeyIndex(int curKeyIndex) {
+ this.minIndex = curKeyIndex;
+ }
+
+ @Override
+ public boolean hasNextBatch() {
+ Object cur = null;
+ try {
+ for (int i = 0; i < numInputs; i++) {
+ if (!finished[i]) {
+ cur = readers.get(i).getCurrentKey();
+ if (comparator.compare(min, cur) == 0
+ && (!min.isNull() || (min.isNull() && i == minIndex))) {
+ return true;
+ }
+ }
+ }
+ } catch (IOException e) {
+ throw new RuntimeException(
+ "Error while checking for next Accumulator batch", e);
+ }
+ return false;
+ }
+
+ @Override
+ public void nextBatch() throws IOException {
+ Object cur = null;
+ for (int i = 0; i < bags.length; i++) {
+ bags[i].clear();
+ }
+ try {
+ for (int i = 0; i < numInputs; i++) {
+ if (!finished[i]) {
+ cur = readers.get(i).getCurrentKey();
+ int batchCount = 0;
+ while (comparator.compare(min, cur) == 0 && (!min.isNull() ||
+ min.isNull() && i==minIndex)) {
+ Iterator<Object> iter = readers.get(i).getCurrentValues().iterator();
+ while (iter.hasNext() && batchCount < batchSize) {
+ bags[i].add(pkgr.getValueTuple(keyWritable, (NullableTuple) iter.next(), i));
+ batchCount++;
+ }
+ if (batchCount == batchSize) {
+ if (!iter.hasNext()) {
+ // Move to next key and update finished
+ finished[i] = !readers.get(i).next();
+ }
+ break;
+ }
+ finished[i] = !readers.get(i).next();
+ if (finished[i]) {
+ break;
+ }
+ cur = readers.get(i).getCurrentKey();
+ }
+ }
+ }
+ } catch (IOException e) {
+ throw new RuntimeException(
+ "Error while reading next Accumulator batch", e);
+ }
+ }
+
+ @Override
+ public void clear() {
+ for (int i = 0; i < bags.length; i++) {
+ bags[i].clear();
+ }
+ // Skip through current keys and its values not processed because of
+ // early termination of accumulator
+ Object cur = null;
+ try {
+ for (int i = 0; i < numInputs; i++) {
+ if (!finished[i]) {
+ cur = readers.get(i).getCurrentKey();
+ while (comparator.compare(min, cur) == 0 && (!min.isNull() ||
+ min.isNull() && i==minIndex)) {
+ finished[i] = !readers.get(i).next();
+ if (finished[i]) {
+ break;
+ }
+ cur = readers.get(i).getCurrentKey();
+ }
+ }
+ }
+ } catch (IOException e) {
+ throw new RuntimeException(
+ "Error while cleaning up for next Accumulator batch", e);
+ }
+ clearedCurrent = true;
+ }
+
+ @Override
+ public Iterator<Tuple> getTuples(int index) {
+ return bags[index].iterator();
+ }
+
+ //TODO: illustratorMarkup
+
+ }
+
}
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/util/AccumulatorOptimizerUtil.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/util/AccumulatorOptimizerUtil.java?rev=1625091&r1=1625090&r2=1625091&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/util/AccumulatorOptimizerUtil.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/util/AccumulatorOptimizerUtil.java Mon Sep 15 16:23:09 2014
@@ -5,6 +5,8 @@ import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.pig.Accumulator;
+import org.apache.pig.PigConfiguration;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.BinaryExpressionOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.ConstantExpression;
@@ -27,6 +29,17 @@ import org.apache.pig.impl.PigContext;
public class AccumulatorOptimizerUtil {
private static final Log LOG = LogFactory.getLog(AccumulatorOptimizerUtil.class);
+ public static int getAccumulativeBatchSize() {
+ int batchSize = 20000;
+ if (PigMapReduce.sJobConfInternal.get() != null) {
+ String size = PigMapReduce.sJobConfInternal.get().get(PigConfiguration.ACCUMULATIVE_BATCHSIZE);
+ if (size != null) {
+ batchSize = Integer.parseInt(size);
+ }
+ }
+ return batchSize;
+ }
+
public static void addAccumulator(PhysicalPlan plan) {
// See if this is a map-reduce job
List<PhysicalOperator> pos = plan.getRoots();