You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ro...@apache.org on 2016/01/13 19:44:54 UTC
svn commit: r1724477 - in
/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine:
physicalLayer/ physicalLayer/relationalOperators/ tez/plan/operator/
Author: rohini
Date: Wed Jan 13 18:44:54 2016
New Revision: 1724477
URL: http://svn.apache.org/viewvc?rev=1724477&view=rev
Log:
Fix test failures for PIG-4737
Modified:
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartialAgg.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSort.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POSimpleTezLoad.java
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java?rev=1724477&r1=1724476&r2=1724477&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java Wed Jan 13 18:44:54 2016
@@ -407,7 +407,7 @@ public abstract class PhysicalOperator e
public Result getNextDataBag() throws ExecException {
Result val = new Result();
- DataBag tmpBag = BagFactory.getInstance().newDefaultBag();
+ DataBag tmpBag = mBagFactory.newDefaultBag();
for (Result ret = getNextTuple(); ret.returnStatus != POStatus.STATUS_EOP; ret = getNextTuple()) {
if (ret.returnStatus == POStatus.STATUS_ERR) {
return ret;
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartialAgg.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartialAgg.java?rev=1724477&r1=1724476&r2=1724477&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartialAgg.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartialAgg.java Wed Jan 13 18:44:54 2016
@@ -238,19 +238,17 @@ public class POPartialAgg extends Physic
doSpill = false;
doContingentSpill = false;
}
- if (result.returnStatus != POStatus.STATUS_EOP
- || inputsExhausted) {
+ if (result.returnStatus != POStatus.STATUS_EOP) {
+ return result;
+ } else if (inputsExhausted) {
+ freeMemory();
return result;
}
}
if (mapAggDisabled()) {
// disableMapAgg() sets doSpill, so we can't get here while there is still contents in the buffered maps.
// if we get to this point, everything is flushed, so we can simply return the raw tuples from now on.
- if (rawInputMap != null) {
- // Free up the maps for garbage collection
- rawInputMap = null;
- processedInputMap = null;
- }
+ freeMemory();
return processInput();
} else {
Result inp = processInput();
@@ -292,6 +290,15 @@ public class POPartialAgg extends Physic
}
}
+ private void freeMemory() throws ExecException {
+ if (rawInputMap != null && !rawInputMap.isEmpty()) {
+ throw new ExecException("Illegal state. Trying to free up partial aggregation maps when they are not empty");
+ }
+ // Free up the maps for garbage collection
+ rawInputMap = null;
+ processedInputMap = null;
+ }
+
private void estimateMemThresholds() {
if (!mapAggDisabled()) {
LOG.info("Getting mem limits; considering " + ALL_POPARTS.size()
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSort.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSort.java?rev=1724477&r1=1724476&r2=1724477&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSort.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSort.java Wed Jan 13 18:44:54 2016
@@ -254,10 +254,10 @@ public class POSort extends PhysicalOper
@Override
public Result getNextTuple() throws ExecException {
- Result res = new Result();
+ Result inp;
if (!inputsAccumulated) {
- res = processInput();
+ inp = processInput();
if (!initialized) {
initialized = true;
if (PigMapReduce.sJobConfInternal.get() != null) {
@@ -272,23 +272,25 @@ public class POSort extends PhysicalOper
sortedBag = useDefaultBag ? mBagFactory.newSortedBag(mComparator)
: new InternalSortedBag(3, mComparator);
- while (res.returnStatus != POStatus.STATUS_EOP) {
- if (res.returnStatus == POStatus.STATUS_ERR) {
+ while (inp.returnStatus != POStatus.STATUS_EOP) {
+ if (inp.returnStatus == POStatus.STATUS_ERR) {
log.error("Error in reading from the inputs");
- return res;
- } else if (res.returnStatus == POStatus.STATUS_NULL) {
+ return inp;
+ } else if (inp.returnStatus == POStatus.STATUS_NULL) {
// Ignore and read the next tuple.
- res = processInput();
+ inp = processInput();
continue;
}
- sortedBag.add((Tuple) res.result);
- res = processInput();
+ sortedBag.add((Tuple) inp.result);
+ inp = processInput();
}
inputsAccumulated = true;
}
- if (it == null) {
+
+ Result res = new Result();
+ if (it == null) {
it = sortedBag.iterator();
}
if (it.hasNext()) {
@@ -299,7 +301,7 @@ public class POSort extends PhysicalOper
res.returnStatus = POStatus.STATUS_EOP;
reset();
}
- return res;
+ return res;
}
@Override
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POSimpleTezLoad.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POSimpleTezLoad.java?rev=1724477&r1=1724476&r2=1724477&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POSimpleTezLoad.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POSimpleTezLoad.java Wed Jan 13 18:44:54 2016
@@ -137,10 +137,7 @@ public class POSimpleTezLoad extends POL
if (finished) {
return RESULT_EOP;
}
- Result res = new Result();
if (!reader.next()) {
- res.result = null;
- res.returnStatus = POStatus.STATUS_EOP;
// For certain operators (such as STREAM), we could still have some work
// to do even after seeing the last input. These operators set a flag that
// says all input has been sent and to run the pipeline one more time.
@@ -148,15 +145,17 @@ public class POSimpleTezLoad extends POL
this.parentPlan.endOfAllInput = true;
}
finished = true;
+ return RESULT_EOP;
} else {
+ Result res = new Result();
Tuple next = (Tuple) reader.getCurrentValue();
res.result = next;
res.returnStatus = POStatus.STATUS_OK;
if (inputRecordCounter != null) {
inputRecordCounter.increment(1);
}
+ return res;
}
- return res;
} catch (IOException e) {
throw new ExecException(e);
}