You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ro...@apache.org on 2016/01/13 19:44:54 UTC

svn commit: r1724477 - in /pig/trunk/src/org/apache/pig/backend/hadoop/executionengine: physicalLayer/ physicalLayer/relationalOperators/ tez/plan/operator/

Author: rohini
Date: Wed Jan 13 18:44:54 2016
New Revision: 1724477

URL: http://svn.apache.org/viewvc?rev=1724477&view=rev
Log:
Fix test failures for PIG-4737

Modified:
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartialAgg.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSort.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POSimpleTezLoad.java

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java?rev=1724477&r1=1724476&r2=1724477&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java Wed Jan 13 18:44:54 2016
@@ -407,7 +407,7 @@ public abstract class PhysicalOperator e
 
     public Result getNextDataBag() throws ExecException {
         Result val = new Result();
-        DataBag tmpBag = BagFactory.getInstance().newDefaultBag();
+        DataBag tmpBag = mBagFactory.newDefaultBag();
         for (Result ret = getNextTuple(); ret.returnStatus != POStatus.STATUS_EOP; ret = getNextTuple()) {
             if (ret.returnStatus == POStatus.STATUS_ERR) {
                 return ret;

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartialAgg.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartialAgg.java?rev=1724477&r1=1724476&r2=1724477&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartialAgg.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartialAgg.java Wed Jan 13 18:44:54 2016
@@ -238,19 +238,17 @@ public class POPartialAgg extends Physic
                     doSpill = false;
                     doContingentSpill = false;
                 }
-                if (result.returnStatus != POStatus.STATUS_EOP
-                        || inputsExhausted) {
+                if (result.returnStatus != POStatus.STATUS_EOP) {
+                    return result;
+                } else if (inputsExhausted) {
+                    freeMemory();
                     return result;
                 }
             }
             if (mapAggDisabled()) {
                 // disableMapAgg() sets doSpill, so we can't get here while there is still contents in the buffered maps.
                 // if we get to this point, everything is flushed, so we can simply return the raw tuples from now on.
-                if (rawInputMap != null) {
-                    // Free up the maps for garbage collection
-                    rawInputMap = null;
-                    processedInputMap = null;
-                }
+                freeMemory();
                 return processInput();
             } else {
                 Result inp = processInput();
@@ -292,6 +290,15 @@ public class POPartialAgg extends Physic
         }
     }
 
+    private void freeMemory() throws ExecException {
+        if (rawInputMap != null && !rawInputMap.isEmpty()) {
+            throw new ExecException("Illegal state. Trying to free up partial aggregation maps when they are not empty");
+        }
+        // Free up the maps for garbage collection
+        rawInputMap = null;
+        processedInputMap = null;
+    }
+
     private void estimateMemThresholds() {
         if (!mapAggDisabled()) {
             LOG.info("Getting mem limits; considering " + ALL_POPARTS.size()

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSort.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSort.java?rev=1724477&r1=1724476&r2=1724477&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSort.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSort.java Wed Jan 13 18:44:54 2016
@@ -254,10 +254,10 @@ public class POSort extends PhysicalOper
 
 	@Override
 	public Result getNextTuple() throws ExecException {
-		Result res = new Result();
+		Result inp;
 
 		if (!inputsAccumulated) {
-			res = processInput();
+			inp = processInput();
             if (!initialized) {
                 initialized = true;
                 if (PigMapReduce.sJobConfInternal.get() != null) {
@@ -272,23 +272,25 @@ public class POSort extends PhysicalOper
             sortedBag = useDefaultBag ? mBagFactory.newSortedBag(mComparator)
                     : new InternalSortedBag(3, mComparator);
 
-            while (res.returnStatus != POStatus.STATUS_EOP) {
-				if (res.returnStatus == POStatus.STATUS_ERR) {
+            while (inp.returnStatus != POStatus.STATUS_EOP) {
+				if (inp.returnStatus == POStatus.STATUS_ERR) {
 					log.error("Error in reading from the inputs");
-					return res;
-                } else if (res.returnStatus == POStatus.STATUS_NULL) {
+					return inp;
+                } else if (inp.returnStatus == POStatus.STATUS_NULL) {
                     // Ignore and read the next tuple.
-                    res = processInput();
+                    inp = processInput();
                     continue;
                 }
-				sortedBag.add((Tuple) res.result);
-				res = processInput();
+				sortedBag.add((Tuple) inp.result);
+				inp = processInput();
             }
 
 			inputsAccumulated = true;
 
 		}
-		if (it == null) {
+
+        Result res = new Result();
+        if (it == null) {
             it = sortedBag.iterator();
         }
         if (it.hasNext()) {
@@ -299,7 +301,7 @@ public class POSort extends PhysicalOper
             res.returnStatus = POStatus.STATUS_EOP;
             reset();
         }
-		return res;
+        return res;
 	}
 
 	@Override

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POSimpleTezLoad.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POSimpleTezLoad.java?rev=1724477&r1=1724476&r2=1724477&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POSimpleTezLoad.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POSimpleTezLoad.java Wed Jan 13 18:44:54 2016
@@ -137,10 +137,7 @@ public class POSimpleTezLoad extends POL
             if (finished) {
                 return RESULT_EOP;
             }
-            Result res = new Result();
             if (!reader.next()) {
-                res.result = null;
-                res.returnStatus = POStatus.STATUS_EOP;
                 // For certain operators (such as STREAM), we could still have some work
                 // to do even after seeing the last input. These operators set a flag that
                 // says all input has been sent and to run the pipeline one more time.
@@ -148,15 +145,17 @@ public class POSimpleTezLoad extends POL
                     this.parentPlan.endOfAllInput = true;
                 }
                 finished = true;
+                return RESULT_EOP;
             } else {
+                Result res = new Result();
                 Tuple next = (Tuple) reader.getCurrentValue();
                 res.result = next;
                 res.returnStatus = POStatus.STATUS_OK;
                 if (inputRecordCounter != null) {
                     inputRecordCounter.increment(1);
                 }
+                return res;
             }
-            return res;
         } catch (IOException e) {
             throw new ExecException(e);
         }