You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ch...@apache.org on 2013/11/16 01:17:46 UTC
svn commit: r1542432 - in /pig/trunk: ./
src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/
src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/
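src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/
src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/
test/org/apache/pig/test/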
Author: cheolsoo
Date: Sat Nov 16 00:17:45 2013
New Revision: 1542432
URL: http://svn.apache.org/r1542432
Log:
PIG-3568: Define the semantics of POStatus.STATUS_NULL (mwagner via cheolsoo)
Modified:
pig/trunk/CHANGES.txt
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/POStatus.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/EqualToExpr.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/GTOrEqualToExpr.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/GreaterThanExpr.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/LTOrEqualToExpr.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/LessThanExpr.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/NotEqualToExpr.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POAnd.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POOr.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POProject.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/PORelationToExprProject.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhysicalPlan.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODistinct.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFilter.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLocalRearrange.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartialAgg.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPreCombinerLocalRearrange.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSort.java
pig/trunk/test/org/apache/pig/test/TestEqualTo.java
pig/trunk/test/org/apache/pig/test/TestGTOrEqual.java
pig/trunk/test/org/apache/pig/test/TestGreaterThan.java
pig/trunk/test/org/apache/pig/test/TestLTOrEqual.java
pig/trunk/test/org/apache/pig/test/TestLessThan.java
pig/trunk/test/org/apache/pig/test/TestNotEqualTo.java
Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1542432&r1=1542431&r2=1542432&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Sat Nov 16 00:17:45 2013
@@ -52,6 +52,8 @@ OPTIMIZATIONS
BUG FIXES
+PIG-3568: Define the semantics of POStatus.STATUS_NULL (mwagner via cheolsoo)
+
PIG-3561: Clean up PigStats and JobStats after PIG-3419 (cheolsoo)
PIG-3553: HadoopJobHistoryLoader fails to load job history on hadoop v 1.2 (lgiri via cheolsoo)
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/POStatus.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/POStatus.java?rev=1542432&r1=1542431&r2=1542432&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/POStatus.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/POStatus.java Sat Nov 16 00:17:45 2013
@@ -17,23 +17,57 @@
*/
package org.apache.pig.backend.hadoop.executionengine.physicalLayer;
+/**
+ * POStatus is a set of flags used to communicate the status of Pig's operator
+ * pipeline to consumers.
+ */
public class POStatus {
+
+ /**
+ * STATUS_OK indicates that the pull on the operator pipeline resulted in a
+ * valid output.
+ */
public static final byte STATUS_OK = 0;
+ /**
+ * STATUS_NULL indicates that no output was produced, but there may be more
+ * results. This can happen if a value is filtered out or an empty bag is
+ * flattened. A caller will typically ignore the output and try again after
+ * seeing STATUS_NULL.
+ *
+ * This does NOT indicate that the output is the value 'null' (which is
+ * possible in expressions). A null value is a 'null' result with STATUS_OK.
+ */
public static final byte STATUS_NULL = 1;
+ /**
+ * STATUS_ERR indicates that there was a problem while trying to produce a
+ * result. This should be remembered and fed back to the user.
+ */
public static final byte STATUS_ERR = 2;
- public static final byte STATUS_EOP = 3; // end of processing
-
- // This is currently only used in communications
- // between ExecutableManager and POStream
- public static final byte STATUS_EOS = 4; // end of Streaming output (i.e. output from streaming binary)
-
- // successfully processing of a batch, used by accumulative UDFs
- // this is used for accumulative UDFs
+ /**
+ * STATUS_EOP indicates that no output was produced, and no further outputs
+ * will be produced (e.g. all attached inputs have been consumed or a limit
+ * has reached its threshold). A caller will typically terminate or attach
+ * new inputs on seeing this status.
+ */
+ public static final byte STATUS_EOP = 3;
+
+ /**
+ * This is currently only used in communications between ExecutableManager
+ * and POStream. It indicates the end of Streaming output (i.e. output from
+ * streaming binary).
+ */
+ public static final byte STATUS_EOS = 4;
+
+ /**
+ * Successful processing of a batch. This is used for accumulative UDFs.
+ */
public static final byte STATUS_BATCH_OK = 5;
- // this signals that an accumulative UDF has already finished
+ /**
+ * This signals that an accumulative UDF has already finished.
+ */
public static final byte STATUS_EARLY_TERMINATION = 6;
}
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java?rev=1542432&r1=1542431&r2=1542432&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java Sat Nov 16 00:17:45 2013
@@ -265,28 +265,29 @@ public abstract class PhysicalOperator e
*/
public Result processInput() throws ExecException {
try {
- Result res = new Result();
- if (input == null && (inputs == null || inputs.size()==0)) {
-// log.warn("No inputs found. Signaling End of Processing.");
- res.returnStatus = POStatus.STATUS_EOP;
- return res;
- }
+ Result res = new Result();
+ if (input == null && (inputs == null || inputs.size() == 0)) {
+ // log.warn("No inputs found. Signaling End of Processing.");
+ res.returnStatus = POStatus.STATUS_EOP;
+ return res;
+ }
- //Should be removed once the model is clear
- if(getReporter()!=null) {
- getReporter().progress();
- }
+ // Should be removed once the model is clear
+ if (getReporter() != null) {
+ getReporter().progress();
+ }
- if (!isInputAttached()) {
+ if (!isInputAttached()) {
return inputs.get(0).getNextTuple();
- } else {
- res.result = input;
- res.returnStatus = (res.result == null ? POStatus.STATUS_NULL: POStatus.STATUS_OK);
- detachInput();
- return res;
- }
+ } else {
+ res.result = input;
+ res.returnStatus = POStatus.STATUS_OK;
+ detachInput();
+ return res;
+ }
} catch (ExecException e) {
- throw new ExecException("Exception while executing " + this.toString() + ": " + e.toString(), e);
+ throw new ExecException("Exception while executing "
+ + this.toString() + ": " + e.toString(), e);
}
}
@@ -382,11 +383,14 @@ public abstract class PhysicalOperator e
public Result getNextDataBag() throws ExecException {
Result ret = null;
DataBag tmpBag = BagFactory.getInstance().newDefaultBag();
- for(ret = getNextTuple(); ret.returnStatus != POStatus.STATUS_EOP; ret = getNextTuple()){
- if(ret.returnStatus == POStatus.STATUS_ERR) {
+ for (ret = getNextTuple(); ret.returnStatus != POStatus.STATUS_EOP; ret = getNextTuple()) {
+ if (ret.returnStatus == POStatus.STATUS_ERR) {
return ret;
+ } else if (ret.returnStatus == POStatus.STATUS_NULL) {
+ continue;
+ } else {
+ tmpBag.add((Tuple) ret.result);
}
- tmpBag.add((Tuple)ret.result);
}
ret.result = tmpBag;
ret.returnStatus = (tmpBag.size() == 0)? POStatus.STATUS_EOP : POStatus.STATUS_OK;
@@ -449,15 +453,15 @@ public abstract class PhysicalOperator e
}
public Log getLogger() {
- return log;
+ return log;
}
public static void setPigLogger(PigLogger logger) {
- pigLogger = logger;
+ pigLogger = logger;
}
public static PigLogger getPigLogger() {
- return pigLogger;
+ return pigLogger;
}
public static class OriginalLocation implements Serializable {
@@ -470,7 +474,7 @@ public abstract class PhysicalOperator e
this.alias = alias;
this.line = line;
this.offset = offset;
-}
+ }
public String getAlias() {
return alias;
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/EqualToExpr.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/EqualToExpr.java?rev=1542432&r1=1542431&r2=1542432&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/EqualToExpr.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/EqualToExpr.java Sat Nov 16 00:17:45 2013
@@ -110,7 +110,7 @@ public class EqualToExpr extends BinaryC
// null
if(left.result == null || right.result == null) {
left.result = null;
- left.returnStatus = POStatus.STATUS_NULL;
+ left.returnStatus = POStatus.STATUS_OK;
return left;
}
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/GTOrEqualToExpr.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/GTOrEqualToExpr.java?rev=1542432&r1=1542431&r2=1542432&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/GTOrEqualToExpr.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/GTOrEqualToExpr.java Sat Nov 16 00:17:45 2013
@@ -101,7 +101,7 @@ public class GTOrEqualToExpr extends Bin
// null
if(left.result == null || right.result == null) {
left.result = null;
- left.returnStatus = POStatus.STATUS_NULL;
+ left.returnStatus = POStatus.STATUS_OK;
return left;
}
assert(left.result instanceof Comparable);
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/GreaterThanExpr.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/GreaterThanExpr.java?rev=1542432&r1=1542431&r2=1542432&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/GreaterThanExpr.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/GreaterThanExpr.java Sat Nov 16 00:17:45 2013
@@ -97,7 +97,7 @@ public class GreaterThanExpr extends Bin
// null
if(left.result == null || right.result == null) {
left.result = null;
- left.returnStatus = POStatus.STATUS_NULL;
+ left.returnStatus = POStatus.STATUS_OK;
return left;
}
assert(left.result instanceof Comparable);
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/LTOrEqualToExpr.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/LTOrEqualToExpr.java?rev=1542432&r1=1542431&r2=1542432&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/LTOrEqualToExpr.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/LTOrEqualToExpr.java Sat Nov 16 00:17:45 2013
@@ -100,7 +100,7 @@ public class LTOrEqualToExpr extends Bin
// null
if(left.result == null || right.result == null) {
left.result = null;
- left.returnStatus = POStatus.STATUS_NULL;
+ left.returnStatus = POStatus.STATUS_OK;
return left;
}
assert(left.result instanceof Comparable);
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/LessThanExpr.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/LessThanExpr.java?rev=1542432&r1=1542431&r2=1542432&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/LessThanExpr.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/LessThanExpr.java Sat Nov 16 00:17:45 2013
@@ -101,7 +101,7 @@ public class LessThanExpr extends Binary
// null
if(left.result == null || right.result == null) {
left.result = null;
- left.returnStatus = POStatus.STATUS_NULL;
+ left.returnStatus = POStatus.STATUS_OK;
return left;
}
assert(left.result instanceof Comparable);
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/NotEqualToExpr.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/NotEqualToExpr.java?rev=1542432&r1=1542431&r2=1542432&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/NotEqualToExpr.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/NotEqualToExpr.java Sat Nov 16 00:17:45 2013
@@ -105,7 +105,7 @@ public class NotEqualToExpr extends Bina
// null
if(left.result == null || right.result == null) {
left.result = null;
- left.returnStatus = POStatus.STATUS_NULL;
+ left.returnStatus = POStatus.STATUS_OK;
return left;
}
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POAnd.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POAnd.java?rev=1542432&r1=1542431&r2=1542432&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POAnd.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POAnd.java Sat Nov 16 00:17:45 2013
@@ -22,8 +22,8 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
import org.apache.pig.data.DataType;
-import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.plan.NodeIdGenerator;
+import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.plan.VisitorException;
/**
@@ -65,8 +65,8 @@ public class POAnd extends BinaryCompari
Result left;
left = lhs.getNextBoolean();
- // pass on ERROR and EOP
- if(left.returnStatus != POStatus.STATUS_OK && left.returnStatus != POStatus.STATUS_NULL) {
+ // pass on ERROR, EOP, and NULL
+ if (left.returnStatus != POStatus.STATUS_OK) {
return left;
}
@@ -93,7 +93,7 @@ public class POAnd extends BinaryCompari
}
// pass on ERROR and EOP
- if(right.returnStatus != POStatus.STATUS_OK && right.returnStatus != POStatus.STATUS_NULL) {
+ if (right.returnStatus != POStatus.STATUS_OK) {
return right;
}
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POOr.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POOr.java?rev=1542432&r1=1542431&r2=1542432&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POOr.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POOr.java Sat Nov 16 00:17:45 2013
@@ -22,8 +22,8 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
import org.apache.pig.data.DataType;
-import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.plan.NodeIdGenerator;
+import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.plan.VisitorException;
/**
@@ -65,8 +65,8 @@ public class POOr extends BinaryComparis
Result left;
left = lhs.getNextBoolean();
- // pass on ERROR and EOP
- if(left.returnStatus != POStatus.STATUS_OK && left.returnStatus != POStatus.STATUS_NULL) {
+ // pass on ERROR, EOP, and NULL
+ if (left.returnStatus != POStatus.STATUS_OK) {
return left;
}
@@ -91,8 +91,8 @@ public class POOr extends BinaryComparis
if (returnLeft)
return left;
- // pass on ERROR and EOP
- if(right.returnStatus != POStatus.STATUS_OK && right.returnStatus != POStatus.STATUS_NULL) {
+ // pass on ERROR, EOP, and NULL
+ if (right.returnStatus != POStatus.STATUS_OK) {
return right;
}
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POProject.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POProject.java?rev=1542432&r1=1542431&r2=1542432&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POProject.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POProject.java Sat Nov 16 00:17:45 2013
@@ -21,7 +21,6 @@ import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
-
import org.apache.pig.PigException;
import org.apache.pig.PigWarning;
import org.apache.pig.backend.executionengine.ExecException;
@@ -310,7 +309,7 @@ public class POProject extends Expressio
} else if (input.result==null) {
Result retVal = new Result();
retVal.result = null;
- retVal.returnStatus = POStatus.STATUS_NULL;
+ retVal.returnStatus = POStatus.STATUS_OK;
return retVal;
} else {
throw new ExecException("Cannot dereference a bag from " + input.result.getClass().getName(), 1129);
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/PORelationToExprProject.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/PORelationToExprProject.java?rev=1542432&r1=1542431&r2=1542432&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/PORelationToExprProject.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/PORelationToExprProject.java Sat Nov 16 00:17:45 2013
@@ -25,7 +25,6 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
import org.apache.pig.data.DataType;
import org.apache.pig.data.NonSpillableDataBag;
-import org.apache.pig.impl.plan.NodeIdGenerator;
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.plan.VisitorException;
@@ -112,7 +111,9 @@ public class PORelationToExprProject ext
}
if(input.returnStatus!=POStatus.STATUS_OK) {
- if(input.returnStatus == POStatus.STATUS_EOP && sendEmptyBagOnEOP) {
+ if(input.returnStatus == POStatus.STATUS_NULL){
+ return input;
+ } else if (input.returnStatus == POStatus.STATUS_EOP && sendEmptyBagOnEOP) {
// we received an EOP from the predecessor
// since the successor in the pipeline is
// expecting a bag, send an empty bag
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhysicalPlan.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhysicalPlan.java?rev=1542432&r1=1542431&r2=1542432&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhysicalPlan.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhysicalPlan.java Sat Nov 16 00:17:45 2013
@@ -27,12 +27,12 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import org.apache.pig.data.Tuple;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.BinaryExpressionOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.ExpressionOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POBinCond;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.UnaryComparisonOperator;
+import org.apache.pig.data.Tuple;
import org.apache.pig.impl.plan.OperatorPlan;
import org.apache.pig.impl.plan.PlanException;
import org.apache.pig.impl.plan.VisitorException;
@@ -52,9 +52,9 @@ public class PhysicalPlan extends Operat
private static final long serialVersionUID = 1L;
// marker to indicate whether all input for this plan
- // has been sent - this is currently only used in POStream
+ // has been sent - this is currently used in POStream
// to know if all map() calls and reduce() calls are finished
- // and that there is no more input expected.
+ // and that there is no more input expected. It is also used in POPartialAgg.
public boolean endOfAllInput = false;
private MultiMap<PhysicalOperator, PhysicalOperator> opmap = null;
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODistinct.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODistinct.java?rev=1542432&r1=1542431&r2=1542432&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODistinct.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODistinct.java Sat Nov 16 00:17:45 2013
@@ -19,32 +19,24 @@
package org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators;
import java.util.Iterator;
-import java.util.LinkedList;
import java.util.List;
-import java.util.HashSet;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
import org.apache.pig.data.BagFactory;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.DataType;
-import org.apache.pig.data.DistinctDataBag;
import org.apache.pig.data.InternalDistinctBag;
-import org.apache.pig.data.InternalSortedBag;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.plan.NodeIdGenerator;
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.plan.VisitorException;
-import org.apache.pig.impl.util.IdentityHashSet;
-import org.apache.pig.pen.util.ExampleTuple;
-import org.apache.pig.pen.util.LineageTracer;
-import org.apache.pig.impl.util.IdentityHashSet;
/**
* Find the distinct set of tuples in a bag.
@@ -95,37 +87,34 @@ public class PODistinct extends Physical
@Override
public Result getNextTuple() throws ExecException {
if (!inputsAccumulated) {
- Result in = processInput();
-
// by default, we create InternalSortedBag, unless user configures
- // explicitly to use old bag
- String bagType = null;
+ // explicitly to use old bag
+ String bagType = null;
if (PigMapReduce.sJobConfInternal.get() != null) {
- bagType = PigMapReduce.sJobConfInternal.get().get("pig.cachedbag.distinct.type");
- }
- if (bagType != null && bagType.equalsIgnoreCase("default")) {
- distinctBag = BagFactory.getInstance().newDistinctBag();
- } else {
- distinctBag = new InternalDistinctBag(3);
- }
-
+ bagType = PigMapReduce.sJobConfInternal.get().get("pig.cachedbag.distinct.type");
+ }
+ if (bagType != null && bagType.equalsIgnoreCase("default")) {
+ distinctBag = BagFactory.getInstance().newDistinctBag();
+ } else {
+ distinctBag = new InternalDistinctBag(3);
+ }
+
+ Result in = processInput();
while (in.returnStatus != POStatus.STATUS_EOP) {
if (in.returnStatus == POStatus.STATUS_ERR) {
log.error("Error in reading from inputs");
return in;
- //continue;
} else if (in.returnStatus == POStatus.STATUS_NULL) {
- // Ignore the null, read the next tuple. It's not clear
- // to me that we should ever get this, or if we should,
- // how it differs from EOP. But ignoring it here seems
- // to work.
+ // Ignore and read the next tuple.
in = processInput();
continue;
+ } else {
+ distinctBag.add((Tuple) in.result);
+ illustratorMarkup(in.result, in.result, 0);
+ in = processInput();
}
- distinctBag.add((Tuple) in.result);
- illustratorMarkup(in.result, in.result, 0);
- in = processInput();
}
+
inputsAccumulated = true;
}
if (it == null) {
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFilter.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFilter.java?rev=1542432&r1=1542431&r2=1542432&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFilter.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFilter.java Sat Nov 16 00:17:45 2013
@@ -18,21 +18,17 @@
package org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators;
import java.util.List;
-import java.util.LinkedList;
import org.apache.pig.backend.executionengine.ExecException;
-import org.apache.pig.data.DataType;
-import org.apache.pig.data.Tuple;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.plan.VisitorException;
-import org.apache.pig.impl.util.IdentityHashSet;
-import org.apache.pig.pen.util.ExampleTuple;
-import org.apache.pig.pen.util.LineageTracer;
/**
* This is an implementation of the Filter operator. It has an Expression Plan
@@ -147,8 +143,7 @@ public class POFilter extends PhysicalOp
*/
res = comOp.getNextBoolean();
plan.detachInput();
- if (res.returnStatus != POStatus.STATUS_OK
- && res.returnStatus != POStatus.STATUS_NULL)
+ if (res.returnStatus != POStatus.STATUS_OK)
return res;
if (res.result != null) {
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLocalRearrange.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLocalRearrange.java?rev=1542432&r1=1542431&r2=1542432&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLocalRearrange.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLocalRearrange.java Sat Nov 16 00:17:45 2013
@@ -308,9 +308,8 @@ public class POLocalRearrange extends Ph
break;
}
- // allow null as group by key
- if (res.returnStatus != POStatus.STATUS_OK && res.returnStatus != POStatus.STATUS_NULL) {
- return new Result();
+ if (res.returnStatus != POStatus.STATUS_OK) {
+ return res;
}
resLst.add(res);
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartialAgg.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartialAgg.java?rev=1542432&r1=1542431&r2=1542432&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartialAgg.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartialAgg.java Sat Nov 16 00:17:45 2013
@@ -60,7 +60,6 @@ public class POPartialAgg extends Physic
private static final Log LOG = LogFactory.getLog(POPartialAgg.class);
private static final long serialVersionUID = 1L;
- private static final Result ERR_RESULT = new Result();
private static final Result EOP_RESULT = new Result(POStatus.STATUS_EOP,
null);
@@ -155,10 +154,11 @@ public class POPartialAgg extends Physic
if (doSpill) {
startSpill();
Result result = spillResult();
- if (result == EOP_RESULT) {
+ if (result.returnStatus == POStatus.STATUS_EOP) {
doSpill = false;
}
- if (result != EOP_RESULT || inputsExhausted) {
+ if (result.returnStatus != POStatus.STATUS_EOP
+ || inputsExhausted) {
return result;
}
}
@@ -189,8 +189,8 @@ public class POPartialAgg extends Physic
// evaluate the key
Result keyRes = getResult(keyLeaf);
- if (keyRes == ERR_RESULT) {
- return ERR_RESULT;
+ if (keyRes.returnStatus != POStatus.STATUS_OK) {
+ return keyRes;
}
Object key = keyRes.result;
keyPlan.detachInput();
@@ -448,7 +448,7 @@ public class POPartialAgg extends Physic
private Result getResult(ExpressionOperator op) throws ExecException {
- Result res = ERR_RESULT;
+ Result res;
switch (op.getResultType()) {
case DataType.BAG:
case DataType.BOOLEAN:
@@ -471,12 +471,7 @@ public class POPartialAgg extends Physic
throw new ExecException(msg, 2270, PigException.BUG);
}
- // allow null as group by key
- if (res.returnStatus == POStatus.STATUS_OK
- || res.returnStatus == POStatus.STATUS_NULL) {
- return res;
- }
- return ERR_RESULT;
+ return res;
}
/**
@@ -493,8 +488,8 @@ public class POPartialAgg extends Physic
for (int i = 0; i < valuePlans.size(); i++) {
valuePlans.get(i).attachInput(value);
Result valRes = getResult(valueLeaves.get(i));
- if (valRes == ERR_RESULT) {
- return ERR_RESULT;
+ if (valRes.returnStatus == POStatus.STATUS_ERR) {
+ return valRes;
}
output.set(i + 1, valRes.result);
}
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPreCombinerLocalRearrange.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPreCombinerLocalRearrange.java?rev=1542432&r1=1542431&r2=1542432&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPreCombinerLocalRearrange.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPreCombinerLocalRearrange.java Sat Nov 16 00:17:45 2013
@@ -159,10 +159,8 @@ public class POPreCombinerLocalRearrange
break;
}
- // allow null as group by key
- if (res.returnStatus != POStatus.STATUS_OK
- && res.returnStatus != POStatus.STATUS_NULL) {
- return new Result();
+ if (res.returnStatus != POStatus.STATUS_OK) {
+ return res;
}
resLst.add(res);
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSort.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSort.java?rev=1542432&r1=1542431&r2=1542432&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSort.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSort.java Sat Nov 16 00:17:45 2013
@@ -268,20 +268,18 @@ public class POSort extends PhysicalOper
sortedBag = new InternalSortedBag(3, mComparator);
}
- while (res.returnStatus != POStatus.STATUS_EOP) {
+ while (res.returnStatus != POStatus.STATUS_EOP) {
if (res.returnStatus == POStatus.STATUS_ERR) {
log.error("Error in reading from the inputs");
return res;
- //continue;
- } else if (res.returnStatus == POStatus.STATUS_NULL) {
- // ignore the null, read the next tuple.
+ } else if (res.returnStatus == POStatus.STATUS_NULL) {
+ // Ignore and read the next tuple.
res = processInput();
- continue;
- }
+ continue;
+ }
sortedBag.add((Tuple) res.result);
res = processInput();
-
- }
+ }
inputsAccumulated = true;
Modified: pig/trunk/test/org/apache/pig/test/TestEqualTo.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestEqualTo.java?rev=1542432&r1=1542431&r2=1542432&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestEqualTo.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestEqualTo.java Sat Nov 16 00:17:45 2013
@@ -406,7 +406,7 @@ public class TestEqualTo {
g.setRhs(rt);
Result r = g.getNextBoolean();
- assertEquals(POStatus.STATUS_NULL, r.returnStatus);
+ assertEquals(POStatus.STATUS_OK, r.returnStatus);
assertNull(r.result);
// test with null in rhs
@@ -417,7 +417,7 @@ public class TestEqualTo {
g.setRhs(rt);
r = g.getNextBoolean();
- assertEquals(POStatus.STATUS_NULL, r.returnStatus);
+ assertEquals(POStatus.STATUS_OK, r.returnStatus);
assertNull(r.result);
// test with null in lhs and rhs
@@ -428,7 +428,7 @@ public class TestEqualTo {
g.setRhs(rt);
r = g.getNextBoolean();
- assertEquals(POStatus.STATUS_NULL, r.returnStatus);
+ assertEquals(POStatus.STATUS_OK, r.returnStatus);
assertNull(r.result);
}
}
Modified: pig/trunk/test/org/apache/pig/test/TestGTOrEqual.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestGTOrEqual.java?rev=1542432&r1=1542431&r2=1542432&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestGTOrEqual.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestGTOrEqual.java Sat Nov 16 00:17:45 2013
@@ -400,7 +400,7 @@ public class TestGTOrEqual {
g.setRhs(rt);
Result r = g.getNextBoolean();
- assertEquals(POStatus.STATUS_NULL, r.returnStatus);
+ assertEquals(POStatus.STATUS_OK, r.returnStatus);
assertNull(r.result);
// test with null in rhs
@@ -411,7 +411,7 @@ public class TestGTOrEqual {
g.setRhs(rt);
r = g.getNextBoolean();
- assertEquals(POStatus.STATUS_NULL, r.returnStatus);
+ assertEquals(POStatus.STATUS_OK, r.returnStatus);
assertNull(r.result);
// test with null in lhs and rhs
@@ -422,7 +422,7 @@ public class TestGTOrEqual {
g.setRhs(rt);
r = g.getNextBoolean();
- assertEquals(POStatus.STATUS_NULL, r.returnStatus);
+ assertEquals(POStatus.STATUS_OK, r.returnStatus);
assertNull(r.result);
}
}
Modified: pig/trunk/test/org/apache/pig/test/TestGreaterThan.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestGreaterThan.java?rev=1542432&r1=1542431&r2=1542432&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestGreaterThan.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestGreaterThan.java Sat Nov 16 00:17:45 2013
@@ -397,7 +397,7 @@ public class TestGreaterThan {
g.setRhs(rt);
Result r = g.getNextBoolean();
- assertEquals(POStatus.STATUS_NULL, r.returnStatus);
+ assertEquals(POStatus.STATUS_OK, r.returnStatus);
assertNull(r.result);
// test with null in rhs
@@ -408,7 +408,7 @@ public class TestGreaterThan {
g.setRhs(rt);
r = g.getNextBoolean();
- assertEquals(POStatus.STATUS_NULL, r.returnStatus);
+ assertEquals(POStatus.STATUS_OK, r.returnStatus);
assertNull(r.result);
// test with null in lhs and rhs
@@ -419,7 +419,7 @@ public class TestGreaterThan {
g.setRhs(rt);
r = g.getNextBoolean();
- assertEquals(POStatus.STATUS_NULL, r.returnStatus);
+ assertEquals(POStatus.STATUS_OK, r.returnStatus);
assertNull(r.result);
}
Modified: pig/trunk/test/org/apache/pig/test/TestLTOrEqual.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestLTOrEqual.java?rev=1542432&r1=1542431&r2=1542432&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestLTOrEqual.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestLTOrEqual.java Sat Nov 16 00:17:45 2013
@@ -394,7 +394,7 @@ public class TestLTOrEqual {
g.setRhs(rt);
Result r = g.getNextBoolean();
- assertEquals(POStatus.STATUS_NULL, r.returnStatus);
+ assertEquals(POStatus.STATUS_OK, r.returnStatus);
assertNull((Boolean)r.result);
// test with null in rhs
@@ -405,7 +405,7 @@ public class TestLTOrEqual {
g.setRhs(rt);
r = g.getNextBoolean();
- assertEquals(POStatus.STATUS_NULL, r.returnStatus);
+ assertEquals(POStatus.STATUS_OK, r.returnStatus);
assertNull((Boolean)r.result);
// test with null in lhs and rhs
@@ -416,7 +416,7 @@ public class TestLTOrEqual {
g.setRhs(rt);
r = g.getNextBoolean();
- assertEquals(POStatus.STATUS_NULL, r.returnStatus);
+ assertEquals(POStatus.STATUS_OK, r.returnStatus);
assertNull((Boolean)r.result);
}
Modified: pig/trunk/test/org/apache/pig/test/TestLessThan.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestLessThan.java?rev=1542432&r1=1542431&r2=1542432&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestLessThan.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestLessThan.java Sat Nov 16 00:17:45 2013
@@ -400,7 +400,7 @@ public class TestLessThan {
g.setRhs(rt);
Result r = g.getNextBoolean();
- assertEquals(POStatus.STATUS_NULL, r.returnStatus);
+ assertEquals(POStatus.STATUS_OK, r.returnStatus);
assertNull(r.result);
// test with null in rhs
@@ -411,7 +411,7 @@ public class TestLessThan {
g.setRhs(rt);
r = g.getNextBoolean();
- assertEquals(POStatus.STATUS_NULL, r.returnStatus);
+ assertEquals(POStatus.STATUS_OK, r.returnStatus);
assertNull(r.result);
// test with null in lhs and rhs
@@ -422,7 +422,7 @@ public class TestLessThan {
g.setRhs(rt);
r = g.getNextBoolean();
- assertEquals(POStatus.STATUS_NULL, r.returnStatus);
+ assertEquals(POStatus.STATUS_OK, r.returnStatus);
assertNull(r.result);
}
}
\ No newline at end of file
Modified: pig/trunk/test/org/apache/pig/test/TestNotEqualTo.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestNotEqualTo.java?rev=1542432&r1=1542431&r2=1542432&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestNotEqualTo.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestNotEqualTo.java Sat Nov 16 00:17:45 2013
@@ -414,7 +414,7 @@ public class TestNotEqualTo {
g.setRhs(rt);
Result r = g.getNextBoolean();
- assertEquals(POStatus.STATUS_NULL, r.returnStatus);
+ assertEquals(POStatus.STATUS_OK, r.returnStatus);
assertNull(r.result);
// test with null in rhs
@@ -425,7 +425,7 @@ public class TestNotEqualTo {
g.setRhs(rt);
r = g.getNextBoolean();
- assertEquals(POStatus.STATUS_NULL, r.returnStatus);
+ assertEquals(POStatus.STATUS_OK, r.returnStatus);
assertNull(r.result);
// test with null in lhs and rhs
@@ -436,7 +436,7 @@ public class TestNotEqualTo {
g.setRhs(rt);
r = g.getNextBoolean();
- assertEquals(POStatus.STATUS_NULL, r.returnStatus);
+ assertEquals(POStatus.STATUS_OK, r.returnStatus);
assertNull(r.result);
}
}