You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by da...@apache.org on 2009/11/12 19:33:18 UTC
svn commit: r835487 [2/3] - in /hadoop/pig/trunk: ./ src/org/apache/pig/
src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/
src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/
src/org/apache/pig/backend/hadoop/executionengin...
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/PORegexp.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/PORegexp.java?rev=835487&r1=835486&r2=835487&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/PORegexp.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/PORegexp.java Thu Nov 12 18:33:15 2009
@@ -56,6 +56,11 @@
@Override
public Result getNext(Boolean bool) throws ExecException {
+ Result r = accumChild(null, dummyString);
+ if (r != null) {
+ return r;
+ }
+
byte status;
Result left, right;
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/PORelationToExprProject.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/PORelationToExprProject.java?rev=835487&r1=835486&r2=835487&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/PORelationToExprProject.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/PORelationToExprProject.java Thu Nov 12 18:33:15 2009
@@ -105,6 +105,13 @@
@Override
public Result getNext(DataBag db) throws ExecException {
Result input = processInputBag();
+
+ // if this is called during accumulation, it is ok to have an empty bag
+ // we need to send STATUS_OK so that the UDF can be called.
+ if (isAccumulative()) {
+ reset();
+ }
+
if(input.returnStatus!=POStatus.STATUS_OK) {
if(input.returnStatus == POStatus.STATUS_EOP && sendEmptyBagOnEOP) {
// we received an EOP from the predecessor
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserComparisonFunc.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserComparisonFunc.java?rev=835487&r1=835486&r2=835487&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserComparisonFunc.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserComparisonFunc.java Thu Nov 12 18:33:15 2009
@@ -43,108 +43,108 @@
@SuppressWarnings("unchecked")
public class POUserComparisonFunc extends ExpressionOperator {
- /**
+ /**
*
*/
private static final long serialVersionUID = 1L;
FuncSpec funcSpec;
Tuple t1, t2;
transient ComparisonFunc func;
- transient private Log log = LogFactory.getLog(getClass());
-
- public POUserComparisonFunc(OperatorKey k, int rp, List inp, FuncSpec funcSpec, ComparisonFunc func) {
+ transient private Log log = LogFactory.getLog(getClass());
+
+ public POUserComparisonFunc(OperatorKey k, int rp, List inp, FuncSpec funcSpec, ComparisonFunc func) {
super(k, rp);
super.setInputs(inp);
this.funcSpec = funcSpec;
- this.func = func;
+ this.func = func;
if(func==null)
instantiateFunc();
- }
-
- public POUserComparisonFunc(OperatorKey k, int rp, List inp, FuncSpec funcSpec) {
- this(k, rp, inp, funcSpec, null);
- }
-
- private void instantiateFunc() {
- this.func = (ComparisonFunc) PigContext.instantiateFuncFromSpec(this.funcSpec);
+ }
+
+ public POUserComparisonFunc(OperatorKey k, int rp, List inp, FuncSpec funcSpec) {
+ this(k, rp, inp, funcSpec, null);
+ }
+
+ private void instantiateFunc() {
+ this.func = (ComparisonFunc) PigContext.instantiateFuncFromSpec(this.funcSpec);
this.func.setReporter(reporter);
- }
-
- public ComparisonFunc getComparator() {
- return func;
- }
-
- @Override
- public Result getNext(Integer i) throws ExecException {
- Result result = new Result();
-
- result.result = func.compare(t1, t2);
- result.returnStatus = (t1 != null && t2 != null) ? POStatus.STATUS_OK
- : POStatus.STATUS_ERR;
- // the two attached tuples are used up now. So we set the
- // inputAttached flag to false
- inputAttached = false;
- return result;
-
- }
-
- private Result getNext() {
- Result res = null;
- log.error("getNext being called with non-integer");
- return res;
- }
-
- @Override
- public Result getNext(Boolean b) throws ExecException {
- return getNext();
- }
-
- @Override
- public Result getNext(DataBag db) throws ExecException {
- return getNext();
- }
-
- @Override
- public Result getNext(DataByteArray ba) throws ExecException {
- return getNext();
- }
-
- @Override
- public Result getNext(Double d) throws ExecException {
- return getNext();
- }
-
- @Override
- public Result getNext(Float f) throws ExecException {
- return getNext();
- }
-
- @Override
- public Result getNext(Long l) throws ExecException {
- return getNext();
- }
-
- @Override
- public Result getNext(Map m) throws ExecException {
- return getNext();
- }
-
- @Override
- public Result getNext(String s) throws ExecException {
- return getNext();
- }
-
- @Override
- public Result getNext(Tuple in) throws ExecException {
- return getNext();
- }
-
- public void attachInput(Tuple t1, Tuple t2) {
- this.t1 = t1;
- this.t2 = t2;
- inputAttached = true;
+ }
+
+ public ComparisonFunc getComparator() {
+ return func;
+ }
+
+ @Override
+ public Result getNext(Integer i) throws ExecException {
+ Result result = new Result();
+
+ result.result = func.compare(t1, t2);
+ result.returnStatus = (t1 != null && t2 != null) ? POStatus.STATUS_OK
+ : POStatus.STATUS_ERR;
+ // the two attached tuples are used up now. So we set the
+ // inputAttached flag to false
+ inputAttached = false;
+ return result;
+
+ }
+
+ private Result getNext() {
+ Result res = null;
+ log.error("getNext being called with non-integer");
+ return res;
+ }
+
+ @Override
+ public Result getNext(Boolean b) throws ExecException {
+ return getNext();
+ }
+
+ @Override
+ public Result getNext(DataBag db) throws ExecException {
+ return getNext();
+ }
+
+ @Override
+ public Result getNext(DataByteArray ba) throws ExecException {
+ return getNext();
+ }
- }
+ @Override
+ public Result getNext(Double d) throws ExecException {
+ return getNext();
+ }
+
+ @Override
+ public Result getNext(Float f) throws ExecException {
+ return getNext();
+ }
+
+ @Override
+ public Result getNext(Long l) throws ExecException {
+ return getNext();
+ }
+
+ @Override
+ public Result getNext(Map m) throws ExecException {
+ return getNext();
+ }
+
+ @Override
+ public Result getNext(String s) throws ExecException {
+ return getNext();
+ }
+
+ @Override
+ public Result getNext(Tuple in) throws ExecException {
+ return getNext();
+ }
+
+ public void attachInput(Tuple t1, Tuple t2) {
+ this.t1 = t1;
+ this.t2 = t2;
+ inputAttached = true;
+
+ }
private void readObject(ObjectInputStream is) throws IOException, ClassNotFoundException{
is.defaultReadObject();
@@ -184,4 +184,12 @@
return clone;
}
+ /**
+ * Get the child expressions of this expression.
+ *
+ * This comparison operator reports no child expressions: the method
+ * always returns {@code null} (not an empty list), so callers must
+ * null-check the result before iterating over it.
+ *
+ * @return always {@code null}
+ */
+ @Override
+ public List<ExpressionOperator> getChildExpressions() {
+ return null;
+ }
+
}
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java?rev=835487&r1=835486&r2=835487&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java Thu Nov 12 18:33:15 2009
@@ -26,6 +26,7 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.pig.Accumulator;
import org.apache.pig.Algebraic;
import org.apache.pig.EvalFunc;
import org.apache.pig.FuncSpec;
@@ -51,35 +52,35 @@
public class POUserFunc extends ExpressionOperator {
- /**
+ /**
*
*/
private static final long serialVersionUID = 1L;
transient EvalFunc func;
-
- transient private final Log log = LogFactory.getLog(getClass());
- FuncSpec funcSpec;
+
+ transient private final Log log = LogFactory.getLog(getClass());
+ FuncSpec funcSpec;
FuncSpec origFSpec;
- public static final byte INITIAL = 0;
- public static final byte INTERMEDIATE = 1;
- public static final byte FINAL = 2;
- private boolean initialized = false;
-
- public POUserFunc(OperatorKey k, int rp, List<PhysicalOperator> inp) {
- super(k, rp);
- inputs = inp;
+ public static final byte INITIAL = 0;
+ public static final byte INTERMEDIATE = 1;
+ public static final byte FINAL = 2;
+ private boolean initialized = false;
- }
+ public POUserFunc(OperatorKey k, int rp, List<PhysicalOperator> inp) {
+ super(k, rp);
+ inputs = inp;
- public POUserFunc(
+ }
+
+ public POUserFunc(
OperatorKey k,
int rp,
List<PhysicalOperator> inp,
FuncSpec funcSpec) {
- this(k, rp, inp, funcSpec, null);
- }
-
- public POUserFunc(
+ this(k, rp, inp, funcSpec, null);
+ }
+
+ public POUserFunc(
OperatorKey k,
int rp,
List<PhysicalOperator> inp,
@@ -87,60 +88,60 @@
EvalFunc func) {
super(k, rp);
super.setInputs(inp);
- this.funcSpec = funcSpec;
+ this.funcSpec = funcSpec;
this.origFSpec = funcSpec;
- this.func = func;
+ this.func = func;
instantiateFunc(funcSpec);
- }
+ }
- private void instantiateFunc(FuncSpec fSpec) {
- this.func = (EvalFunc) PigContext.instantiateFuncFromSpec(fSpec);
- //the next couple of initializations do not work as intended for the following reasons
- //the reporter and pigLogger are member variables of PhysicalOperator
- //when instanitateFunc is invoked at deserialization time, both
- //reporter and pigLogger are null. They are set during map and reduce calls,
- //making the initializations here basically useless. Look at the processInput
- //method where these variables are re-initialized. At that point, the PhysicalOperator
- //is set up correctly with the reporter and pigLogger references
+ private void instantiateFunc(FuncSpec fSpec) {
+ this.func = (EvalFunc) PigContext.instantiateFuncFromSpec(fSpec);
+ //the next couple of initializations do not work as intended for the following reasons
+ //the reporter and pigLogger are member variables of PhysicalOperator
+ //when instanitateFunc is invoked at deserialization time, both
+ //reporter and pigLogger are null. They are set during map and reduce calls,
+ //making the initializations here basically useless. Look at the processInput
+ //method where these variables are re-initialized. At that point, the PhysicalOperator
+ //is set up correctly with the reporter and pigLogger references
this.func.setReporter(reporter);
this.func.setPigLogger(pigLogger);
- }
-
- public Result processInput() throws ExecException {
+ }
+
+ public Result processInput() throws ExecException {
// Make sure the reporter is set, because it isn't getting carried
// across in the serialization (don't know why). I suspect it's as
// cheap to call the setReporter call everytime as to check whether I
// have (hopefully java will inline it).
if(!initialized) {
- func.setReporter(reporter);
- func.setPigLogger(pigLogger);
- initialized = true;
+ func.setReporter(reporter);
+ func.setPigLogger(pigLogger);
+ initialized = true;
}
- Result res = new Result();
- Tuple inpValue = null;
- if (input == null && (inputs == null || inputs.size()==0)) {
+ Result res = new Result();
+ Tuple inpValue = null;
+ if (input == null && (inputs == null || inputs.size()==0)) {
// log.warn("No inputs found. Signaling End of Processing.");
- res.returnStatus = POStatus.STATUS_EOP;
- return res;
- }
-
- //Should be removed once the model is clear
- if(reporter!=null) reporter.progress();
-
-
- if(isInputAttached()) {
- res.result = input;
- res.returnStatus = POStatus.STATUS_OK;
- detachInput();
- return res;
- } else {
- res.result = TupleFactory.getInstance().newTuple();
-
- Result temp = null;
- for(PhysicalOperator op : inputs) {
- switch(op.getResultType()){
+ res.returnStatus = POStatus.STATUS_EOP;
+ return res;
+ }
+
+ //Should be removed once the model is clear
+ if(reporter!=null) reporter.progress();
+
+
+ if(isInputAttached()) {
+ res.result = input;
+ res.returnStatus = POStatus.STATUS_OK;
+ detachInput();
+ return res;
+ } else {
+ res.result = TupleFactory.getInstance().newTuple();
+
+ Result temp = null;
+ for(PhysicalOperator op : inputs) {
+ switch(op.getResultType()){
case DataType.BAG:
temp = op.getNext(dummyBag);
break;
@@ -187,210 +188,222 @@
}
}
((Tuple)res.result).append(temp.result);
- }
- res.returnStatus = temp.returnStatus;
- return res;
- }
- }
+ }
+ res.returnStatus = temp.returnStatus;
+ return res;
+ }
+ }
- private Result getNext() throws ExecException {
- Result result = processInput();
+ private Result getNext() throws ExecException { // pull the next input via processInput() and evaluate the UDF on it
+ Result result = processInput();
String errMsg = ""; // NOTE(review): errMsg is never read in this method
- try {
- if(result.returnStatus == POStatus.STATUS_OK) {
- result.result = func.exec((Tuple) result.result);
+ try {
+ if(result.returnStatus == POStatus.STATUS_OK) {
+ if (isAccumulative()) { // accumulative mode: drive the UDF through the Accumulator interface instead of exec()
+ if (isAccumStarted()) { // mid-accumulation: feed this batch to the UDF; no value yet, so report STATUS_BATCH_OK with a null result
+ ((Accumulator)func).accumulate((Tuple)result.result); // NOTE(review): unchecked cast - assumes func implements Accumulator whenever isAccumulative() is true
+ result.returnStatus = POStatus.STATUS_BATCH_OK;
+ result.result = null;
+ }else{ // accumulation finished: fetch the final value, then call cleanup() on the Accumulator
+ result.result = ((Accumulator)func).getValue();
+ ((Accumulator)func).cleanup();
+ }
+ } else { // regular mode: evaluate the UDF once on the whole input tuple
+ result.result = func.exec((Tuple) result.result);
+ }
if(resultType == DataType.BYTEARRAY) { // declared result type is bytearray: wrap non-bytearray UDF output in a DataByteArray
if(res.result != null && DataType.findType(result.result) != DataType.BYTEARRAY) { // NOTE(review): 'res' is not declared in this method (the local is 'result'), so this tests a different Result, presumably an inherited field - confirm; result.result looks intended, since on the STATUS_BATCH_OK path result.result is null and the next line would then call toString() on null
result.result = new DataByteArray(result.result.toString().getBytes()); // NOTE(review): getBytes() without a charset uses the platform default encoding
}
}
- return result;
- }
- return result;
- } catch (ExecException ee) {
- throw ee;
- } catch (IOException ioe) {
- int errCode = 2078;
- String msg = "Caught error from UDF: " + funcSpec.getClassName();
+ return result;
+ }
+
+ return result;
+ } catch (ExecException ee) {
+ throw ee;
+ } catch (IOException ioe) { // wrap UDF I/O failures; a PigException's own non-zero error code and message take precedence over 2078
+ int errCode = 2078;
+ String msg = "Caught error from UDF: " + funcSpec.getClassName();
String footer = " [" + ioe.getMessage() + "]";
-
- if(ioe instanceof PigException) {
- int udfErrorCode = ((PigException)ioe).getErrorCode();
- if(udfErrorCode != 0) {
- errCode = udfErrorCode;
- msg = ((PigException)ioe).getMessage();
- } else {
- msg += " [" + ((PigException)ioe).getMessage() + " ]";
- }
- } else {
- msg += footer;
- }
-
- throw new ExecException(msg, errCode, PigException.BUG, ioe);
- } catch (IndexOutOfBoundsException ie) {
+
+ if(ioe instanceof PigException) {
+ int udfErrorCode = ((PigException)ioe).getErrorCode();
+ if(udfErrorCode != 0) {
+ errCode = udfErrorCode;
+ msg = ((PigException)ioe).getMessage();
+ } else {
+ msg += " [" + ((PigException)ioe).getMessage() + " ]";
+ }
+ } else {
+ msg += footer;
+ }
+
+ throw new ExecException(msg, errCode, PigException.BUG, ioe);
+ } catch (IndexOutOfBoundsException ie) { // typically a UDF indexing past the end of its input tuple
int errCode = 2078;
String msg = "Caught error from UDF: " + funcSpec.getClassName() +
", Out of bounds access [" + ie.getMessage() + "]";
throw new ExecException(msg, errCode, PigException.BUG, ie);
- }
- }
+ }
+ }
- @Override
- public Result getNext(Tuple tIn) throws ExecException {
- return getNext();
- }
+ @Override
+ public Result getNext(Tuple tIn) throws ExecException {
+ return getNext();
+ }
- @Override
- public Result getNext(DataBag db) throws ExecException {
- return getNext();
- }
+ @Override
+ public Result getNext(DataBag db) throws ExecException {
+ return getNext();
+ }
- @Override
- public Result getNext(Integer i) throws ExecException {
- return getNext();
- }
+ @Override
+ public Result getNext(Integer i) throws ExecException {
+ return getNext();
+ }
- @Override
- public Result getNext(Boolean b) throws ExecException {
+ @Override
+ public Result getNext(Boolean b) throws ExecException {
- return getNext();
- }
+ return getNext();
+ }
- @Override
- public Result getNext(DataByteArray ba) throws ExecException {
+ @Override
+ public Result getNext(DataByteArray ba) throws ExecException {
- return getNext();
- }
+ return getNext();
+ }
- @Override
- public Result getNext(Double d) throws ExecException {
+ @Override
+ public Result getNext(Double d) throws ExecException {
- return getNext();
- }
+ return getNext();
+ }
- @Override
- public Result getNext(Float f) throws ExecException {
+ @Override
+ public Result getNext(Float f) throws ExecException {
- return getNext();
- }
+ return getNext();
+ }
- @Override
- public Result getNext(Long l) throws ExecException {
+ @Override
+ public Result getNext(Long l) throws ExecException {
- return getNext();
- }
+ return getNext();
+ }
- @Override
- public Result getNext(Map m) throws ExecException {
+ @Override
+ public Result getNext(Map m) throws ExecException {
- return getNext();
- }
+ return getNext();
+ }
- @Override
- public Result getNext(String s) throws ExecException {
+ @Override
+ public Result getNext(String s) throws ExecException {
- return getNext();
- }
+ return getNext();
+ }
- public void setAlgebraicFunction(byte Function) throws ExecException {
- // This will only be used by the optimizer for putting correct functions
- // in the mapper,
- // combiner and reduce. This helps in maintaining the physical plan as
- // is without the
- // optimiser having to replace any operators.
- // You wouldn't be able to make two calls to this function on the same
- // algebraic EvalFunc as
- // func is being changed.
- switch (Function) {
- case INITIAL:
+ public void setAlgebraicFunction(byte Function) throws ExecException {
+ // This will only be used by the optimizer for putting correct functions
+ // in the mapper,
+ // combiner and reduce. This helps in maintaining the physical plan as
+ // is without the
+ // optimiser having to replace any operators.
+ // You wouldn't be able to make two calls to this function on the same
+ // algebraic EvalFunc as
+ // func is being changed.
+ switch (Function) {
+ case INITIAL:
funcSpec = new FuncSpec(getInitial());
- break;
- case INTERMEDIATE:
+ break;
+ case INTERMEDIATE:
funcSpec = new FuncSpec(getIntermed());
- break;
- case FINAL:
+ break;
+ case FINAL:
funcSpec = new FuncSpec(getFinal());
- break;
+ break;
- }
+ }
instantiateFunc(funcSpec);
setResultType(DataType.findType(((EvalFunc<?>) func).getReturnType()));
- }
+ }
- public String getInitial() throws ExecException {
- instantiateFunc(origFSpec);
- if (func instanceof Algebraic) {
- return ((Algebraic) func).getInitial();
- } else {
- int errCode = 2072;
- String msg = "Attempt to run a non-algebraic function"
+ public String getInitial() throws ExecException {
+ instantiateFunc(origFSpec);
+ if (func instanceof Algebraic) {
+ return ((Algebraic) func).getInitial();
+ } else {
+ int errCode = 2072;
+ String msg = "Attempt to run a non-algebraic function"
+ " as an algebraic function";
throw new ExecException(msg, errCode, PigException.BUG);
- }
- }
+ }
+ }
- public String getIntermed() throws ExecException {
+ public String getIntermed() throws ExecException {
instantiateFunc(origFSpec);
- if (func instanceof Algebraic) {
- return ((Algebraic) func).getIntermed();
- } else {
+ if (func instanceof Algebraic) {
+ return ((Algebraic) func).getIntermed();
+ } else {
int errCode = 2072;
String msg = "Attempt to run a non-algebraic function"
+ " as an algebraic function";
throw new ExecException(msg, errCode, PigException.BUG);
- }
- }
+ }
+ }
- public String getFinal() throws ExecException {
+ public String getFinal() throws ExecException {
instantiateFunc(origFSpec);
- if (func instanceof Algebraic) {
- return ((Algebraic) func).getFinal();
- } else {
+ if (func instanceof Algebraic) {
+ return ((Algebraic) func).getFinal();
+ } else {
int errCode = 2072;
String msg = "Attempt to run a non-algebraic function"
+ " as an algebraic function";
throw new ExecException(msg, errCode, PigException.BUG);
- }
- }
+ }
+ }
- public Type getReturnType() {
- return func.getReturnType();
- }
+ public Type getReturnType() {
+ return func.getReturnType();
+ }
- public void finish() {
- func.finish();
- }
+ public void finish() {
+ func.finish();
+ }
- public Schema outputSchema(Schema input) {
- return func.outputSchema(input);
- }
+ public Schema outputSchema(Schema input) {
+ return func.outputSchema(input);
+ }
- public Boolean isAsynchronous() {
- return func.isAsynchronous();
- }
+ public Boolean isAsynchronous() {
+ return func.isAsynchronous();
+ }
- @Override
- public String name() {
- return "POUserFunc" + "(" + func.getClass().getName() + ")" + "[" + DataType.findTypeName(resultType) + "]" + " - " + mKey.toString();
- }
+ @Override
+ public String name() {
+ return "POUserFunc" + "(" + func.getClass().getName() + ")" + "[" + DataType.findTypeName(resultType) + "]" + " - " + mKey.toString();
+ }
- @Override
- public boolean supportsMultipleInputs() {
+ @Override
+ public boolean supportsMultipleInputs() {
- return true;
- }
+ return true;
+ }
- @Override
- public boolean supportsMultipleOutputs() {
+ @Override
+ public boolean supportsMultipleOutputs() {
- return false;
- }
+ return false;
+ }
- @Override
- public void visit(PhyPlanVisitor v) throws VisitorException {
+ @Override
+ public void visit(PhyPlanVisitor v) throws VisitorException {
- v.visitUserFunc(this);
- }
+ v.visitUserFunc(this);
+ }
public FuncSpec getFuncSpec() {
return funcSpec;
@@ -414,4 +427,12 @@
is.defaultReadObject();
instantiateFunc(funcSpec);
}
+
+ /**
+ * Get the child expressions of this expression.
+ *
+ * A user-function operator reports no child expressions here: the
+ * method always returns {@code null} (not an empty list), so callers
+ * must null-check the result before iterating over it.
+ *
+ * @return always {@code null}
+ */
+ @Override
+ public List<ExpressionOperator> getChildExpressions() {
+ return null;
+ }
}
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/Subtract.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/Subtract.java?rev=835487&r1=835486&r2=835487&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/Subtract.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/Subtract.java Thu Nov 12 18:33:15 2009
@@ -53,6 +53,11 @@
@Override
public Result getNext(Double d) throws ExecException {
+ Result r = accumChild(null, d);
+ if (r != null) {
+ return r;
+ }
+
byte status;
Result res;
Double left = null, right = null;
@@ -76,6 +81,11 @@
@Override
public Result getNext(Float f) throws ExecException {
+ Result r = accumChild(null, f);
+ if (r != null) {
+ return r;
+ }
+
byte status;
Result res;
Float left = null, right = null;
@@ -99,6 +109,11 @@
@Override
public Result getNext(Integer i) throws ExecException {
+ Result r = accumChild(null, i);
+ if (r != null) {
+ return r;
+ }
+
byte status;
Result res;
Integer left = null, right = null;
@@ -122,6 +137,11 @@
@Override
public Result getNext(Long l) throws ExecException {
+ Result r = accumChild(null, l);
+ if (r != null) {
+ return r;
+ }
+
byte status;
Result res;
Long left = null, right = null;
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/UnaryExpressionOperator.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/UnaryExpressionOperator.java?rev=835487&r1=835486&r2=835487&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/UnaryExpressionOperator.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/UnaryExpressionOperator.java Thu Nov 12 18:33:15 2009
@@ -17,6 +17,9 @@
*/
package org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators;
+import java.util.ArrayList;
+import java.util.List;
+
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
@@ -26,6 +29,7 @@
public abstract class UnaryExpressionOperator extends ExpressionOperator {
ExpressionOperator expr;
+ private transient List<ExpressionOperator> child;
public UnaryExpressionOperator(OperatorKey k, int rp) {
super(k, rp);
@@ -73,4 +77,17 @@
resultType = op.getResultType();
}
+ /**
+ * Get the child expressions of this expression.
+ *
+ * A unary operator has a single operand, so the returned list holds
+ * just {@code expr}. The list is built on the first call and cached in
+ * the transient {@code child} field: it is not serialized, so it is
+ * rebuilt lazily after deserialization. Later calls return the same
+ * (mutable) list instance.
+ *
+ * NOTE(review): if {@code expr} is reassigned after the first call, the
+ * cached list still holds the old operand; confirm expr is fixed by then.
+ *
+ * @return a single-element list containing this operator's operand
+ */
+ @Override
+ public List<ExpressionOperator> getChildExpressions() {
+ if (child == null) {
+ child = new ArrayList<ExpressionOperator>();
+ child.add(expr);
+ }
+
+ return child;
+ }
+
}
Added: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/AccumulativeTupleBuffer.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/AccumulativeTupleBuffer.java?rev=835487&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/AccumulativeTupleBuffer.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/AccumulativeTupleBuffer.java Thu Nov 12 18:33:15 2009
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.pig.data.Tuple;
+
+/**
+ * This interface is used during the reduce phase to process tuples
+ * in batch mode. It is used by POPackage when all of the UDFs can be
+ * called in accumulative mode. Tuples are not pulled all at once;
+ * instead, each time only a specified number of tuples are pulled out
+ * of the iterator and put into a buffer. This buffer is then wrapped into
+ * a bag that is passed to the operators in the reduce plan.
+ *
+ * The purpose of doing this is to reduce memory usage and avoid spilling.
+ *
+ * Typical driver loop (see POForEach.getNext): while hasNextBatch()
+ * returns true, call nextBatch() and process that batch; once it returns
+ * false, call clear() and finish the accumulation.
+ */
+public interface AccumulativeTupleBuffer {
+
+ /**
+ * Pull the next batch of tuples from the iterator and put them into this buffer.
+ *
+ * @throws IOException if an implementation fails while pulling the batch
+ */
+ public void nextBatch() throws IOException;
+
+ /**
+ * Whether there are more tuples to pull out of the iterator.
+ *
+ * @return true if a further call to nextBatch() can fetch more tuples
+ */
+ public boolean hasNextBatch() ;
+
+ /**
+ * Clear the internal buffer. This should be called after all data has
+ * been retrieved.
+ */
+ public void clear();
+
+ /**
+ * Get an iterator over the tuples currently held in the buffer.
+ *
+ * @param index selects which buffered tuple stream to read; presumably
+ * the position of the input bag, since several bags share one
+ * buffer - TODO confirm against the POPackage implementation
+ * @return iterator over the buffered tuples for {@code index}
+ */
+ public Iterator<Tuple> getTuples(int index);
+}
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java?rev=835487&r1=835486&r2=835487&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java Thu Nov 12 18:33:15 2009
@@ -29,6 +29,7 @@
import org.apache.commons.logging.LogFactory;
import org.apache.pig.PigException;
import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.AccumulativeBag;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.DataByteArray;
import org.apache.pig.data.DataType;
@@ -37,6 +38,7 @@
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.ExpressionOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.PORelationToExprProject;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
@@ -89,6 +91,8 @@
protected PhysicalOperator[] planLeafOps = null;
+ protected transient AccumulativeTupleBuffer buffer;
+
public POForEach(OperatorKey k) {
this(k,-1,null,null);
}
@@ -146,15 +150,54 @@
@Override
public boolean supportsMultipleOutputs() {
return false;
+ }
+
+ public void setAccumulative() { // mark this operator accumulative, then propagate the flag into the nested input plans
+ super.setAccumulative(); // NOTE(review): these three methods override superclass methods (they call super) but lack @Override
+ for(PhysicalPlan p : inputPlans) { // NOTE(review): this traversal is repeated verbatim in setAccumStart/setAccumEnd; a shared helper would remove the duplication
+ Iterator<PhysicalOperator> iter = p.iterator();
+ while(iter.hasNext()) {
+ PhysicalOperator po = iter.next();
+ if (po instanceof ExpressionOperator || po instanceof PODistinct) { // only expression operators and PODistinct are toggled; other operators are skipped
+ po.setAccumulative();
+ }
+ }
+ }
+ }
+
+ public void setAccumStart() { // start-of-batch signal for this operator and the same nested operators; getNext calls it after each buffer.nextBatch(), before processPlan()
+ super.setAccumStart();
+ for(PhysicalPlan p : inputPlans) {
+ Iterator<PhysicalOperator> iter = p.iterator();
+ while(iter.hasNext()) {
+ PhysicalOperator po = iter.next();
+ if (po instanceof ExpressionOperator || po instanceof PODistinct) {
+ po.setAccumStart();
+ }
+ }
+ }
}
+ public void setAccumEnd() { // end-of-accumulation signal for this operator and the same nested operators; getNext calls it after buffer.clear(), once no batches remain
+ super.setAccumEnd();
+ for(PhysicalPlan p : inputPlans) {
+ Iterator<PhysicalOperator> iter = p.iterator();
+ while(iter.hasNext()) {
+ PhysicalOperator po = iter.next();
+ if (po instanceof ExpressionOperator || po instanceof PODistinct) {
+ po.setAccumEnd();
+ }
+ }
+ }
+ }
+
/**
* Calls getNext on the generate operator inside the nested
* physical plan and returns it maintaining an additional state
* to denote the begin and end of the nested plan processing.
*/
@Override
- public Result getNext(Tuple t) throws ExecException {
+ public Result getNext(Tuple t) throws ExecException {
Result res = null;
Result inp = null;
//The nested plan is under processing
@@ -162,14 +205,15 @@
//returns
if(processingPlan){
while(true) {
- res = processPlan();
+ res = processPlan();
+
if(res.returnStatus==POStatus.STATUS_OK) {
if(lineageTracer != null && res.result != null) {
- ExampleTuple tOut = new ExampleTuple((Tuple) res.result);
- tOut.synthetic = tIn.synthetic;
- lineageTracer.insert(tOut);
- lineageTracer.union(tOut, tIn);
- res.result = tOut;
+ ExampleTuple tOut = new ExampleTuple((Tuple) res.result);
+ tOut.synthetic = tIn.synthetic;
+ lineageTracer.insert(tOut);
+ lineageTracer.union(tOut, tIn);
+ res.result = tOut;
}
return res;
}
@@ -198,32 +242,71 @@
if (inp.returnStatus == POStatus.STATUS_NULL) {
continue;
}
-
+
attachInputToPlans((Tuple) inp.result);
+ Tuple tuple = (Tuple)inp.result;
+
for (PhysicalOperator po : opsToBeReset) {
po.reset();
}
- res = processPlan();
+
+ if (isAccumulative()) {
+ for(int i=0; i<tuple.size(); i++) {
+ if (tuple.getType(i) == DataType.BAG) {
+ // we only need to check one bag, because all the bags
+ // share the same buffer
+ buffer = ((AccumulativeBag)tuple.get(i)).getTuplebuffer();
+ break;
+ }
+ }
+
+ while(true) {
+ if (buffer.hasNextBatch()) {
+ try {
+ buffer.nextBatch();
+ }catch(IOException e) {
+ throw new ExecException(e);
+ }
+
+ setAccumStart();
+ }else{
+ buffer.clear();
+ setAccumEnd();
+ }
+
+ res = processPlan();
+
+ if (res.returnStatus == POStatus.STATUS_BATCH_OK) {
+ // attach same input again to process next batch
+ attachInputToPlans((Tuple) inp.result);
+ } else {
+ break;
+ }
+ }
+
+ } else {
+ res = processPlan();
+ }
processingPlan = true;
if(lineageTracer != null && res.result != null) {
- //we check for res.result since that can also be null in the case of flatten
- tIn = (ExampleTuple) inp.result;
- ExampleTuple tOut = new ExampleTuple((Tuple) res.result);
- tOut.synthetic = tIn.synthetic;
- lineageTracer.insert(tOut);
- lineageTracer.union(tOut, tIn);
- res.result = tOut;
+ //we check for res.result since that can also be null in the case of flatten
+ tIn = (ExampleTuple) inp.result;
+ ExampleTuple tOut = new ExampleTuple((Tuple) res.result);
+ tOut.synthetic = tIn.synthetic;
+ lineageTracer.insert(tOut);
+ lineageTracer.union(tOut, tIn);
+ res.result = tOut;
}
return res;
}
}
- protected Result processPlan() throws ExecException{
+ protected Result processPlan() throws ExecException{
Result res = new Result();
-
+
//We check if all the databags have exhausted the tuples. If so we enforce the reading of new data by setting data and its to null
if(its != null) {
boolean restartIts = true;
@@ -238,6 +321,7 @@
}
}
+
if(its == null) {
//getNext being called for the first time OR starting with a set of new data from inputs
its = new Iterator[noItems];
@@ -287,9 +371,13 @@
}
}
-
+
+ if (inputData.returnStatus == POStatus.STATUS_BATCH_OK) {
+ continue;
+ }
+
if(inputData.returnStatus == POStatus.STATUS_EOP) {
- //we are done with all the elements. Time to return.
+ //we are done with all the elements. Time to return.
its = null;
bags = null;
return inputData;
@@ -310,6 +398,11 @@
}
}
+ // if accumulating, we haven't got data yet for some fields, just return
+ if (isAccumulative() && isAccumStarted()) {
+ res.returnStatus = POStatus.STATUS_BATCH_OK;
+ return res;
+ }
while(true) {
if(data == null) {
@@ -396,8 +489,8 @@
protected void attachInputToPlans(Tuple t) {
- //super.attachInput(t);
- for(PhysicalPlan p : inputPlans) {
+ // attach t to every nested input plan; super.attachInput(t) is not called, so this operator's own input is left unset
+ for(PhysicalPlan p : inputPlans) {
p.attachInput(t);
}
}
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java?rev=835487&r1=835486&r2=835487&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java Thu Nov 12 18:33:15 2009
@@ -17,6 +17,8 @@
*/
package org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators;
+import java.io.IOException;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
@@ -25,6 +27,7 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.AccumulativeBag;
import org.apache.pig.data.BagFactory;
import org.apache.pig.data.InternalCachedBag;
import org.apache.pig.data.DataBag;
@@ -199,7 +202,7 @@
public void setInner(boolean[] inner) {
this.inner = inner;
}
-
+
/**
* From the inputs, constructs the output tuple
* for this co-group in the required format which
@@ -218,38 +221,51 @@
DataBag[] dbs = null;
dbs = new DataBag[numInputs];
- String bagType = null;
- if (PigMapReduce.sJobConf != null) {
- bagType = PigMapReduce.sJobConf.get("pig.cachedbag.type");
- }
-
- for (int i = 0; i < numInputs; i++) {
- if (bagType != null && bagType.equalsIgnoreCase("default")) {
- dbs[i] = mBagFactory.newDefaultBag();
- } else {
- dbs[i] = new InternalCachedBag(numInputs);
- }
- }
-
- //For each indexed tup in the inp, sort them
- //into their corresponding bags based
- //on the index
- while (tupIter.hasNext()) {
- NullableTuple ntup = tupIter.next();
- int index = ntup.getIndex();
- Tuple copy = getValueTuple(ntup, index);
+ if (isAccumulative()) {
+ // create bag wrapper to pull tuples in many batches
+ // all bags have reference to the sample tuples buffer
+ // which contains tuples from one batch
+ POPackageTupleBuffer buffer = new POPackageTupleBuffer();
+ for (int i = 0; i < numInputs; i++) {
+ dbs[i] = new AccumulativeBag(buffer, i);
+ }
- if (numInputs == 1) {
+ } else {
+ // create bag to pull all tuples out of iterator
+ String bagType = null;
+ if (PigMapReduce.sJobConf != null) {
+ bagType = PigMapReduce.sJobConf.get("pig.cachedbag.type");
+ }
+
+
+ for (int i = 0; i < numInputs; i++) {
+ if (bagType != null && bagType.equalsIgnoreCase("default")) {
+ dbs[i] = mBagFactory.newDefaultBag();
+ } else {
+ dbs[i] = new InternalCachedBag(numInputs);
+ }
+ }
+
+ //For each indexed tup in the inp, sort them
+ //into their corresponding bags based
+ //on the index
+ while (tupIter.hasNext()) {
+ NullableTuple ntup = tupIter.next();
+ int index = ntup.getIndex();
+ Tuple copy = getValueTuple(ntup, index);
- // this is for multi-query merge where
- // the numInputs is always 1, but the index
- // (the position of the inner plan in the
- // enclosed operator) may not be 1.
- dbs[0].add(copy);
- } else {
- dbs[index].add(copy);
+ if (numInputs == 1) {
+
+ // this is for multi-query merge where
+ // the numInputs is always 1, but the index
+ // (the position of the inner plan in the
+ // enclosed operator) may not be 1.
+ dbs[0].add(copy);
+ } else {
+ dbs[index].add(copy);
+ }
+ if(reporter!=null) reporter.progress();
}
- if(reporter!=null) reporter.progress();
}
//Construct the output tuple by appending
@@ -259,14 +275,16 @@
res.set(0,key);
int i=-1;
for (DataBag bag : dbs) {
- if(inner[++i]){
+ i++;
+ if(inner[i] && !isAccumulative()){
if(bag.size()==0){
detachInput();
Result r = new Result();
r.returnStatus = POStatus.STATUS_NULL;
return r;
}
- }
+ }
+
res.set(i+1,bag);
}
}
@@ -420,4 +438,70 @@
this.useSecondaryKey = useSecondaryKey;
}
+ private class POPackageTupleBuffer implements AccumulativeTupleBuffer {
+ private List<Tuple>[] bags;
+ private Iterator<NullableTuple> iter;
+ private int batchSize;
+ private Object currKey;
+
+ @SuppressWarnings("unchecked")
+ public POPackageTupleBuffer() {
+ batchSize = 20000;
+ if (PigMapReduce.sJobConf != null) {
+ String size = PigMapReduce.sJobConf.get("pig.accumulative.batchsize");
+ if (size != null) {
+ batchSize = Integer.parseInt(size);
+ }
+ }
+
+ this.bags = new List[numInputs];
+ for(int i=0; i<numInputs; i++) {
+ this.bags[i] = new ArrayList<Tuple>();
+ }
+ this.iter = tupIter;
+ this.currKey = key;
+ }
+ /** Returns true while the input tuple iterator still has tuples. */
+ @Override
+ public boolean hasNextBatch() {
+ return iter.hasNext();
+ }
+ /** Loads at most batchSize tuples into the per-input lists, replacing the previous batch. */
+ @Override
+ public void nextBatch() throws IOException {
+ for(int i=0; i<bags.length; i++) {
+ bags[i].clear();
+ }
+
+ // NOTE(review): restores the enclosing operator's key to the value captured at construction
+ key = currKey;
+ // stop as soon as the input is exhausted instead of spinning through the rest of batchSize
+ for(int i=0; i<batchSize && iter.hasNext(); i++) {
+ NullableTuple ntup = iter.next();
+ int index = ntup.getIndex();
+ Tuple copy = getValueTuple(ntup, index);
+ if (numInputs == 1) {
+
+ // this is for multi-query merge where
+ // the numInputs is always 1, but the index
+ // (the position of the inner plan in the
+ // enclosed operator) may not be 1.
+ bags[0].add(copy);
+ } else {
+ bags[index].add(copy);
+ }
+ }
+ }
+
+ public void clear() {
+ for(int i=0; i<bags.length; i++) {
+ bags[i].clear();
+ }
+ iter = null; // hasNextBatch() and nextBatch() must not be called after this
+ }
+ /** Returns an iterator over the tuples of input {@code index} in the current batch. */
+ public Iterator<Tuple> getTuples(int index) {
+ return bags[index].iterator();
+ }
+ }
}
Modified: hadoop/pig/trunk/src/org/apache/pig/builtin/AVG.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/builtin/AVG.java?rev=835487&r1=835486&r2=835487&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/builtin/AVG.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/builtin/AVG.java Thu Nov 12 18:33:15 2009
@@ -22,6 +22,7 @@
import java.util.Iterator;
import java.util.List;
+import org.apache.pig.Accumulator;
import org.apache.pig.Algebraic;
import org.apache.pig.EvalFunc;
import org.apache.pig.FuncSpec;
@@ -41,7 +42,7 @@
* Generates the average of the values of the first field of a tuple. This class is Algebraic in
* implemenation, so if possible the execution will be split into a local and global application
*/
-public class AVG extends EvalFunc<Double> implements Algebraic {
+public class AVG extends EvalFunc<Double> implements Algebraic, Accumulator<Double> {
private static TupleFactory mTupleFactory = TupleFactory.getInstance();
@@ -178,6 +179,7 @@
for (Iterator<Tuple> it = values.iterator(); it.hasNext();) {
Tuple t = it.next();
Double d = (Double)t.get(0);
+
// we count nulls in avg as contributing 0
// a departure from SQL for performance of
// COUNT() which implemented by just inspecting
@@ -261,5 +263,53 @@
funcList.add(new FuncSpec(IntAvg.class.getName(), Schema.generateNestedSchema(DataType.BAG, DataType.INTEGER)));
funcList.add(new FuncSpec(LongAvg.class.getName(), Schema.generateNestedSchema(DataType.BAG, DataType.LONG)));
return funcList;
+ }
+
+ /* Accumulator interface implementation */
+ // running totals: null until accumulate() sees a batch with a non-null sum(b); cleanup() resets them to null
+ private Double intermediateSum = null;
+ private Double intermediateCount = null;
+ /** Adds sum(b) and count(b) of this batch to the running totals; a batch whose sum(b) is null is ignored. */
+ @Override
+ public void accumulate(Tuple b) throws IOException {
+ try {
+ Double sum = sum(b);
+ if(sum == null) {
+ return;
+ }
+ // first non-null batch since cleanup(): start both totals at zero
+ if (intermediateSum == null || intermediateCount == null) {
+ intermediateSum = 0.0;
+ intermediateCount = 0.0;
+ }
+
+ double count = (Long)count(b);
+
+ if (count > 0) {
+ intermediateCount += count;
+ intermediateSum += sum;
+ }
+ } catch (ExecException ee) {
+ throw ee;
+ } catch (Exception e) {
+ int errCode = 2106;
+ String msg = "Error while computing average in " + this.getClass().getSimpleName();
+ throw new ExecException(msg, errCode, PigException.BUG, e);
+ }
+ }
+ /** Discards the running totals so the next accumulation starts from scratch. */
+ @Override
+ public void cleanup() {
+ intermediateSum = null;
+ intermediateCount = null;
+ }
+ /** Returns the running average, or null if no positive count was accumulated since cleanup() (the count may still be null). */
+ @Override
+ public Double getValue() {
+ Double avg = null;
+ if (intermediateCount != null && intermediateCount > 0) {
+ avg = new Double(intermediateSum / intermediateCount);
+ }
+ return avg;
}
}
Modified: hadoop/pig/trunk/src/org/apache/pig/builtin/COUNT.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/builtin/COUNT.java?rev=835487&r1=835486&r2=835487&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/builtin/COUNT.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/builtin/COUNT.java Thu Nov 12 18:33:15 2009
@@ -21,6 +21,7 @@
import java.util.Iterator;
import java.util.Map;
+import org.apache.pig.Accumulator;
import org.apache.pig.Algebraic;
import org.apache.pig.EvalFunc;
import org.apache.pig.PigException;
@@ -36,7 +37,7 @@
* Generates the count of the values of the first field of a tuple. This class is Algebraic in
* implemenation, so if possible the execution will be split into a local and global functions
*/
-public class COUNT extends EvalFunc<Long> implements Algebraic{
+public class COUNT extends EvalFunc<Long> implements Algebraic, Accumulator<Long>{
private static TupleFactory mTupleFactory = TupleFactory.getInstance();
@Override
@@ -136,5 +137,38 @@
public Schema outputSchema(Schema input) {
return new Schema(new Schema.FieldSchema(null, DataType.LONG));
}
+
+ /* Accumulator interface implementation */
+ private long intermediateCount = 0L;
+ /** Counts the tuples of the bag in field 0 of b whose first field is non-null. */
+ @Override
+ public void accumulate(Tuple b) throws IOException {
+ try {
+ DataBag bag = (DataBag)b.get(0);
+ Iterator<Tuple> it = bag.iterator();
+ while (it.hasNext()){
+ Tuple t = it.next();
+ if (t != null && t.size() > 0 && t.get(0) != null) {
+ intermediateCount += 1;
+ }
+ }
+ } catch (ExecException ee) {
+ throw ee;
+ } catch (Exception e) {
+ int errCode = 2106;
+ String msg = "Error while computing count in " + this.getClass().getSimpleName();
+ throw new ExecException(msg, errCode, PigException.BUG, e);
+ }
+ }
+ /** Resets the running count to zero. */
+ @Override
+ public void cleanup() {
+ intermediateCount = 0L;
+ }
+ /** Returns the number of non-null values counted since the last cleanup(). */
+ @Override
+ public Long getValue() {
+ return intermediateCount;
+ }
}
Modified: hadoop/pig/trunk/src/org/apache/pig/builtin/COUNT_STAR.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/builtin/COUNT_STAR.java?rev=835487&r1=835486&r2=835487&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/builtin/COUNT_STAR.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/builtin/COUNT_STAR.java Thu Nov 12 18:33:15 2009
@@ -21,6 +21,7 @@
import java.util.Iterator;
import java.util.Map;
+import org.apache.pig.Accumulator;
import org.apache.pig.Algebraic;
import org.apache.pig.EvalFunc;
import org.apache.pig.PigException;
@@ -38,7 +39,7 @@
* implements SQL COUNT(*) semantics. This class is Algebraic in
* implemenation, so if possible the execution will be split into a local and global functions
*/
-public class COUNT_STAR extends EvalFunc<Long> implements Algebraic{
+public class COUNT_STAR extends EvalFunc<Long> implements Algebraic, Accumulator<Long>{
private static TupleFactory mTupleFactory = TupleFactory.getInstance();
@Override
@@ -127,4 +128,31 @@
return new Schema(new Schema.FieldSchema(null, DataType.LONG));
}
+ /* Accumulator interface implementation */
+
+ private long intermediateCount = 0L;
+ /** Adds the value sum(b) returns for this batch to the running count. */
+ @Override
+ public void accumulate(Tuple b) throws IOException {
+ try {
+ intermediateCount += sum(b);
+ } catch (ExecException ee) {
+ throw ee;
+ } catch (Exception e) {
+ int errCode = 2106;
+ String msg = "Error while computing count in " + this.getClass().getSimpleName();
+ throw new ExecException(msg, errCode, PigException.BUG, e);
+ }
+ }
+ /** Resets the running count to zero. */
+ @Override
+ public void cleanup() {
+ intermediateCount = 0L;
+ }
+ /** Returns the count accumulated since the last cleanup(). */
+ @Override
+ public Long getValue() {
+ return intermediateCount;
+ }
+
}
Modified: hadoop/pig/trunk/src/org/apache/pig/builtin/DoubleAvg.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/builtin/DoubleAvg.java?rev=835487&r1=835486&r2=835487&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/builtin/DoubleAvg.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/builtin/DoubleAvg.java Thu Nov 12 18:33:15 2009
@@ -21,6 +21,7 @@
import java.util.HashMap;
import java.util.Iterator;
+import org.apache.pig.Accumulator;
import org.apache.pig.Algebraic;
import org.apache.pig.EvalFunc;
import org.apache.pig.FuncSpec;
@@ -38,7 +39,7 @@
* Generates the average of the values of the first field of a tuple. This class is Algebraic in
* implemenation, so if possible the execution will be split into a local and global application
*/
-public class DoubleAvg extends EvalFunc<Double> implements Algebraic {
+public class DoubleAvg extends EvalFunc<Double> implements Algebraic, Accumulator<Double> {
private static TupleFactory mTupleFactory = TupleFactory.getInstance();
@@ -233,4 +234,52 @@
public Schema outputSchema(Schema input) {
return new Schema(new Schema.FieldSchema(null, DataType.DOUBLE));
}
+
+ /* Accumulator interface */
+ // running totals: null until accumulate() sees a batch with a non-null sum(b); cleanup() resets them to null
+ private Double intermediateSum = null;
+ private Double intermediateCount = null;
+ /** Adds sum(b) and count(b) of this batch to the running totals; a batch whose sum(b) is null is ignored. */
+ @Override
+ public void accumulate(Tuple b) throws IOException {
+ try {
+ Double sum = sum(b);
+ if(sum == null) {
+ return;
+ }
+ // first non-null batch since cleanup(): start both totals at zero
+ if (intermediateSum == null || intermediateCount == null) {
+ intermediateSum = 0.0;
+ intermediateCount = 0.0;
+ }
+
+ double count = (Long)count(b);
+
+ if (count > 0) {
+ intermediateCount += count;
+ intermediateSum += sum;
+ }
+ } catch (ExecException ee) {
+ throw ee;
+ } catch (Exception e) {
+ int errCode = 2106;
+ String msg = "Error while computing average in " + this.getClass().getSimpleName();
+ throw new ExecException(msg, errCode, PigException.BUG, e);
+ }
+ }
+ /** Discards the running totals so the next accumulation starts from scratch. */
+ @Override
+ public void cleanup() {
+ intermediateSum = null;
+ intermediateCount = null;
+ }
+ /** Returns the running average, or null if no positive count was accumulated since cleanup() (the count may still be null). */
+ @Override
+ public Double getValue() {
+ Double avg = null;
+ if (intermediateCount != null && intermediateCount > 0) {
+ avg = new Double(intermediateSum / intermediateCount);
+ }
+ return avg;
+ }
}
Modified: hadoop/pig/trunk/src/org/apache/pig/builtin/DoubleMax.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/builtin/DoubleMax.java?rev=835487&r1=835486&r2=835487&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/builtin/DoubleMax.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/builtin/DoubleMax.java Thu Nov 12 18:33:15 2009
@@ -20,6 +20,7 @@
import java.io.IOException;
import java.util.Iterator;
+import org.apache.pig.Accumulator;
import org.apache.pig.Algebraic;
import org.apache.pig.EvalFunc;
import org.apache.pig.PigException;
@@ -34,7 +35,7 @@
/**
* Generates the max of the values of the first field of a tuple.
*/
-public class DoubleMax extends EvalFunc<Double> implements Algebraic {
+public class DoubleMax extends EvalFunc<Double> implements Algebraic, Accumulator<Double> {
@Override
public Double exec(Tuple input) throws IOException {
@@ -154,5 +155,40 @@
public Schema outputSchema(Schema input) {
return new Schema(new Schema.FieldSchema(null, DataType.DOUBLE));
}
+
+ /* Accumulator interface */
+ // running maximum: null until a batch with a non-null max(b) is seen
+ private Double intermediateMax = null;
+ /** Folds max(b) of this batch into the running maximum; a batch whose max(b) is null is ignored. */
+ @Override
+ public void accumulate(Tuple b) throws IOException {
+ try {
+ Double curMax = max(b);
+ if (curMax == null) {
+ return;
+ }
+ /* first non-null batch: start from negative infinity so Math.max keeps curMax */
+ if (intermediateMax == null) {
+ intermediateMax = Double.NEGATIVE_INFINITY;
+ }
+ intermediateMax = java.lang.Math.max(intermediateMax, curMax);
+ } catch (ExecException ee) {
+ throw ee;
+ } catch (Exception e) {
+ int errCode = 2106;
+ String msg = "Error while computing max in " + this.getClass().getSimpleName();
+ throw new ExecException(msg, errCode, PigException.BUG, e);
+ }
+ }
+ /** Forgets the running maximum. */
+ @Override
+ public void cleanup() {
+ intermediateMax = null;
+ }
+ /** Returns the maximum accumulated since the last cleanup(), or null if none was seen. */
+ @Override
+ public Double getValue() {
+ return intermediateMax;
+ }
}
Modified: hadoop/pig/trunk/src/org/apache/pig/builtin/DoubleMin.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/builtin/DoubleMin.java?rev=835487&r1=835486&r2=835487&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/builtin/DoubleMin.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/builtin/DoubleMin.java Thu Nov 12 18:33:15 2009
@@ -20,6 +20,7 @@
import java.io.IOException;
import java.util.Iterator;
+import org.apache.pig.Accumulator;
import org.apache.pig.Algebraic;
import org.apache.pig.EvalFunc;
import org.apache.pig.PigException;
@@ -33,7 +34,7 @@
/**
* Generates the min of the Double values in the first field of a tuple.
*/
-public class DoubleMin extends EvalFunc<Double> implements Algebraic {
+public class DoubleMin extends EvalFunc<Double> implements Algebraic, Accumulator<Double> {
@Override
public Double exec(Tuple input) throws IOException {
@@ -152,4 +153,38 @@
public Schema outputSchema(Schema input) {
return new Schema(new Schema.FieldSchema(null, DataType.DOUBLE));
}
+
+ /* Accumulator interface implementation */
+ private Double intermediateMin = null;
+ /** Folds min(b) of this batch into the running minimum (null until a non-null min(b) is seen); a batch whose min(b) is null is ignored. */
+ @Override
+ public void accumulate(Tuple b) throws IOException {
+ try {
+ Double curMin = min(b);
+ if (curMin == null) {
+ return;
+ }
+ /* first non-null batch: start intermediateMin at positive infinity so Math.min keeps curMin */
+ if (intermediateMin == null) {
+ intermediateMin = Double.POSITIVE_INFINITY;
+ }
+ intermediateMin = java.lang.Math.min(intermediateMin, curMin);
+ } catch (ExecException ee) {
+ throw ee;
+ } catch (Exception e) {
+ int errCode = 2106;
+ String msg = "Error while computing min in " + this.getClass().getSimpleName();
+ throw new ExecException(msg, errCode, PigException.BUG, e);
+ }
+ }
+ /** Forgets the running minimum. */
+ @Override
+ public void cleanup() {
+ intermediateMin = null;
+ }
+ /** Returns the minimum accumulated since the last cleanup(), or null if none was seen. */
+ @Override
+ public Double getValue() {
+ return intermediateMin;
+ }
}
Modified: hadoop/pig/trunk/src/org/apache/pig/builtin/DoubleSum.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/builtin/DoubleSum.java?rev=835487&r1=835486&r2=835487&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/builtin/DoubleSum.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/builtin/DoubleSum.java Thu Nov 12 18:33:15 2009
@@ -21,6 +21,7 @@
import java.util.HashMap;
import java.util.Iterator;
+import org.apache.pig.Accumulator;
import org.apache.pig.Algebraic;
import org.apache.pig.EvalFunc;
import org.apache.pig.FuncSpec;
@@ -38,7 +39,7 @@
/**
* Generates the sum of the values of the first field of a tuple.
*/
-public class DoubleSum extends EvalFunc<Double> implements Algebraic {
+public class DoubleSum extends EvalFunc<Double> implements Algebraic, Accumulator<Double> {
@Override
public Double exec(Tuple input) throws IOException {
@@ -160,4 +161,34 @@
return new Schema(new Schema.FieldSchema(null, DataType.DOUBLE));
}
+ /* Accumulator interface implementation*/
+ private Double intermediateSum = null;
+ /** Adds sum(b) of this batch to the running sum (null until a non-null sum(b) is seen); a batch whose sum(b) is null is ignored. */
+ @Override
+ public void accumulate(Tuple b) throws IOException {
+ try {
+ Double curSum = sum(b);
+ if (curSum == null) {
+ return;
+ }
+ intermediateSum = (intermediateSum == null ? 0.0 : intermediateSum) + curSum;
+ } catch (ExecException ee) {
+ throw ee;
+ } catch (Exception e) {
+ int errCode = 2106;
+ String msg = "Error while computing sum in " + this.getClass().getSimpleName();
+ throw new ExecException(msg, errCode, PigException.BUG, e);
+ }
+ }
+ /** Forgets the running sum. */
+ @Override
+ public void cleanup() {
+ intermediateSum = null;
+ }
+ /** Returns the sum accumulated since the last cleanup(), or null if no non-null sum was seen. */
+ @Override
+ public Double getValue() {
+ return intermediateSum;
+ }
+
}
Modified: hadoop/pig/trunk/src/org/apache/pig/builtin/FloatAvg.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/builtin/FloatAvg.java?rev=835487&r1=835486&r2=835487&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/builtin/FloatAvg.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/builtin/FloatAvg.java Thu Nov 12 18:33:15 2009
@@ -20,6 +20,7 @@
import java.io.IOException;
import java.util.Iterator;
+import org.apache.pig.Accumulator;
import org.apache.pig.Algebraic;
import org.apache.pig.EvalFunc;
import org.apache.pig.PigException;
@@ -35,7 +36,7 @@
* Generates the average of the values of the first field of a tuple. This class is Algebraic in
* implementation, so if possible the execution will be split into a local and global application
*/
-public class FloatAvg extends EvalFunc<Double> implements Algebraic {
+public class FloatAvg extends EvalFunc<Double> implements Algebraic, Accumulator<Double> {
private static TupleFactory mTupleFactory = TupleFactory.getInstance();
@@ -230,5 +231,53 @@
public Schema outputSchema(Schema input) {
return new Schema(new Schema.FieldSchema(null, DataType.DOUBLE));
}
+
+ /* Accumulator interface */
+ // running totals: null until accumulate() sees a batch with a non-null sum(b); cleanup() resets them to null
+ private Double intermediateSum = null;
+ private Double intermediateCount = null;
+ /** Adds sum(b) and count(b) of this batch to the running totals; a batch whose sum(b) is null is ignored. */
+ @Override
+ public void accumulate(Tuple b) throws IOException {
+ try {
+ Double sum = sum(b);
+ if(sum == null) {
+ return;
+ }
+ // first non-null batch since cleanup(): start both totals at zero
+ if (intermediateSum == null || intermediateCount == null) {
+ intermediateSum = 0.0;
+ intermediateCount = 0.0;
+ }
+
+ double count = (Long)count(b);
+
+ if (count > 0) {
+ intermediateCount += count;
+ intermediateSum += sum;
+ }
+ } catch (ExecException ee) {
+ throw ee;
+ } catch (Exception e) {
+ int errCode = 2106;
+ String msg = "Error while computing average in " + this.getClass().getSimpleName();
+ throw new ExecException(msg, errCode, PigException.BUG, e);
+ }
+ }
+ /** Discards the running totals so the next accumulation starts from scratch. */
+ @Override
+ public void cleanup() {
+ intermediateSum = null;
+ intermediateCount = null;
+ }
+ /** Returns the running average, or null if no positive count was accumulated since cleanup() (the count may still be null). */
+ @Override
+ public Double getValue() {
+ Double avg = null;
+ if (intermediateCount != null && intermediateCount > 0) {
+ avg = new Double(intermediateSum / intermediateCount);
+ }
+ return avg;
+ }
}
Modified: hadoop/pig/trunk/src/org/apache/pig/builtin/FloatMax.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/builtin/FloatMax.java?rev=835487&r1=835486&r2=835487&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/builtin/FloatMax.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/builtin/FloatMax.java Thu Nov 12 18:33:15 2009
@@ -20,6 +20,7 @@
import java.io.IOException;
import java.util.Iterator;
+import org.apache.pig.Accumulator;
import org.apache.pig.Algebraic;
import org.apache.pig.EvalFunc;
import org.apache.pig.PigException;
@@ -33,7 +34,7 @@
/**
* Generates the max of the values of the first field of a tuple.
*/
-public class FloatMax extends EvalFunc<Float> implements Algebraic {
+public class FloatMax extends EvalFunc<Float> implements Algebraic, Accumulator<Float> {
@Override
public Float exec(Tuple input) throws IOException {
@@ -152,4 +153,39 @@
public Schema outputSchema(Schema input) {
return new Schema(new Schema.FieldSchema(null, DataType.FLOAT));
}
+
+ /* Accumulator interface */
+ // running maximum: null until a batch with a non-null max(b) is seen
+ private Float intermediateMax = null;
+ /** Folds max(b) of this batch into the running maximum; a batch whose max(b) is null is ignored. */
+ @Override
+ public void accumulate(Tuple b) throws IOException {
+ try {
+ Float curMax = max(b);
+ if (curMax == null) {
+ return;
+ }
+ /* first non-null batch: start from negative infinity so Math.max keeps curMax */
+ if (intermediateMax == null) {
+ intermediateMax = Float.NEGATIVE_INFINITY;
+ }
+ intermediateMax = java.lang.Math.max(intermediateMax, curMax);
+ } catch (ExecException ee) {
+ throw ee;
+ } catch (Exception e) {
+ int errCode = 2106;
+ String msg = "Error while computing max in " + this.getClass().getSimpleName();
+ throw new ExecException(msg, errCode, PigException.BUG, e);
+ }
+ }
+ /** Forgets the running maximum. */
+ @Override
+ public void cleanup() {
+ intermediateMax = null;
+ }
+ /** Returns the maximum accumulated since the last cleanup(), or null if none was seen. */
+ @Override
+ public Float getValue() {
+ return intermediateMax;
+ }
}
Modified: hadoop/pig/trunk/src/org/apache/pig/builtin/FloatMin.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/builtin/FloatMin.java?rev=835487&r1=835486&r2=835487&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/builtin/FloatMin.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/builtin/FloatMin.java Thu Nov 12 18:33:15 2009
@@ -20,6 +20,7 @@
import java.io.IOException;
import java.util.Iterator;
+import org.apache.pig.Accumulator;
import org.apache.pig.Algebraic;
import org.apache.pig.EvalFunc;
import org.apache.pig.PigException;
@@ -33,7 +34,7 @@
/**
* Generates the min of the Float values in the first field of a tuple.
*/
-public class FloatMin extends EvalFunc<Float> implements Algebraic {
+public class FloatMin extends EvalFunc<Float> implements Algebraic, Accumulator<Float> {
@Override
public Float exec(Tuple input) throws IOException {
@@ -152,4 +153,38 @@
public Schema outputSchema(Schema input) {
return new Schema(new Schema.FieldSchema(null, DataType.FLOAT));
}
+
+ /* Accumulator interface implementation */
+ private Float intermediateMin = null;
+ /** Folds min(b) of this batch into the running minimum (null until a non-null min(b) is seen); a batch whose min(b) is null is ignored. */
+ @Override
+ public void accumulate(Tuple b) throws IOException {
+ try {
+ Float curMin = min(b);
+ if (curMin == null) {
+ return;
+ }
+ /* first non-null batch: start intermediateMin at positive infinity so Math.min keeps curMin */
+ if (intermediateMin == null) {
+ intermediateMin = Float.POSITIVE_INFINITY;
+ }
+ intermediateMin = java.lang.Math.min(intermediateMin, curMin);
+ } catch (ExecException ee) {
+ throw ee;
+ } catch (Exception e) {
+ int errCode = 2106;
+ String msg = "Error while computing min in " + this.getClass().getSimpleName();
+ throw new ExecException(msg, errCode, PigException.BUG, e);
+ }
+ }
+ /** Forgets the running minimum. */
+ @Override
+ public void cleanup() {
+ intermediateMin = null;
+ }
+ /** Returns the minimum accumulated since the last cleanup(), or null if none was seen. */
+ @Override
+ public Float getValue() {
+ return intermediateMin;
+ }
}
Modified: hadoop/pig/trunk/src/org/apache/pig/builtin/FloatSum.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/builtin/FloatSum.java?rev=835487&r1=835486&r2=835487&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/builtin/FloatSum.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/builtin/FloatSum.java Thu Nov 12 18:33:15 2009
@@ -20,6 +20,7 @@
import java.io.IOException;
import java.util.Iterator;
+import org.apache.pig.Accumulator;
import org.apache.pig.Algebraic;
import org.apache.pig.EvalFunc;
import org.apache.pig.PigException;
@@ -35,7 +36,7 @@
/**
* Generates the sum of the Float values in the first field of a tuple.
*/
-public class FloatSum extends EvalFunc<Double> implements Algebraic {
+public class FloatSum extends EvalFunc<Double> implements Algebraic, Accumulator<Double>{
@Override
public Double exec(Tuple input) throws IOException {
@@ -196,5 +197,35 @@
public Schema outputSchema(Schema input) {
return new Schema(new Schema.FieldSchema(null, DataType.DOUBLE));
}
+
+ /* Accumulator interface implementation*/
+ private Double intermediateSum = null;
+ /** Adds sum(b) of this batch to the running sum (null until a non-null sum(b) is seen); a batch whose sum(b) is null is ignored. */
+ @Override
+ public void accumulate(Tuple b) throws IOException {
+ try {
+ Double curSum = sum(b);
+ if (curSum == null) {
+ return;
+ }
+ intermediateSum = (intermediateSum == null ? 0.0 : intermediateSum) + curSum;
+ } catch (ExecException ee) {
+ throw ee;
+ } catch (Exception e) {
+ int errCode = 2106;
+ String msg = "Error while computing sum in " + this.getClass().getSimpleName();
+ throw new ExecException(msg, errCode, PigException.BUG, e);
+ }
+ }
+ /** Forgets the running sum. */
+ @Override
+ public void cleanup() {
+ intermediateSum = null;
+ }
+ /** Returns the sum accumulated since the last cleanup(), or null if no non-null sum was seen. */
+ @Override
+ public Double getValue() {
+ return intermediateSum;
+ }
}
Modified: hadoop/pig/trunk/src/org/apache/pig/builtin/IntAvg.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/builtin/IntAvg.java?rev=835487&r1=835486&r2=835487&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/builtin/IntAvg.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/builtin/IntAvg.java Thu Nov 12 18:33:15 2009
@@ -20,6 +20,7 @@
import java.io.IOException;
import java.util.Iterator;
+import org.apache.pig.Accumulator;
import org.apache.pig.Algebraic;
import org.apache.pig.EvalFunc;
import org.apache.pig.PigException;
@@ -35,7 +36,7 @@
* Generates the average of the values of the first field of a tuple. This class is Algebraic in
* implementation, so if possible the execution will be split into a local and global application
*/
-public class IntAvg extends EvalFunc<Double> implements Algebraic {
+public class IntAvg extends EvalFunc<Double> implements Algebraic, Accumulator<Double> {
private static TupleFactory mTupleFactory = TupleFactory.getInstance();
@@ -236,4 +237,69 @@
return new Schema(new Schema.FieldSchema(null, DataType.DOUBLE));
}
+ /* Accumulator interface */
+
+ /**
+ * Running sum and count of the values accumulated so far. Both are null until
+ * a batch with a non-null partial sum is seen, and both are reset by cleanup().
+ * The count is a Double so that getValue() performs floating-point division.
+ */
+ private Long intermediateSum = null;
+ private Double intermediateCount = null;
+
+ /**
+ * Folds one batch into the running sum and count, using sum(b) and count(b).
+ * A batch whose partial sum is null is ignored; otherwise it is added only
+ * when its count is positive. Non-ExecException failures are wrapped in an
+ * ExecException with error code 2106.
+ */
+ @Override
+ public void accumulate(Tuple b) throws IOException {
+ try {
+ Long sum = sum(b);
+ if(sum == null) {
+ return;
+ }
+ // first non-null batch: start both running totals from zero
+ if (intermediateSum == null || intermediateCount == null) {
+ intermediateSum = 0L;
+ intermediateCount = 0.0;
+ }
+
+ double count = (Long)count(b);
+
+ if (count > 0) {
+ intermediateCount += count;
+ intermediateSum += sum;
+ }
+ } catch (ExecException ee) {
+ throw ee;
+ } catch (Exception e) {
+ int errCode = 2106;
+ String msg = "Error while computing average in " + this.getClass().getSimpleName();
+ throw new ExecException(msg, errCode, PigException.BUG, e);
+ }
+ }
+
+ /** Discards the running sum and count, returning the accumulator to its initial (null) state. */
+ @Override
+ public void cleanup() {
+ intermediateSum = null;
+ intermediateCount = null;
+ }
+
+ /**
+ * Returns the running sum divided by the running count, or null when the count
+ * is zero or nothing has been accumulated since construction or the last cleanup().
+ */
+ @Override
+ public Double getValue() {
+ Double avg = null;
+ // intermediateCount is null until the first non-null batch (and after cleanup());
+ // unboxing it unguarded would throw a NullPointerException.
+ if (intermediateCount != null && intermediateCount > 0) {
+ avg = intermediateSum / intermediateCount;
+ }
+ return avg;
+ }
}
Modified: hadoop/pig/trunk/src/org/apache/pig/builtin/IntMax.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/builtin/IntMax.java?rev=835487&r1=835486&r2=835487&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/builtin/IntMax.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/builtin/IntMax.java Thu Nov 12 18:33:15 2009
@@ -20,6 +20,7 @@
import java.io.IOException;
import java.util.Iterator;
+import org.apache.pig.Accumulator;
import org.apache.pig.Algebraic;
import org.apache.pig.EvalFunc;
import org.apache.pig.PigException;
@@ -33,7 +34,7 @@
/**
* Generates the max of the values of the first field of a tuple.
*/
-public class IntMax extends EvalFunc<Integer> implements Algebraic {
+public class IntMax extends EvalFunc<Integer> implements Algebraic, Accumulator<Integer> {
@Override
public Integer exec(Tuple input) throws IOException {
@@ -153,4 +154,44 @@
return new Schema(new Schema.FieldSchema(null, DataType.INTEGER));
}
+ /* Accumulator interface */
+
+ /** Largest value accumulated so far; null until a batch with a non-null max is seen. */
+ private Integer intermediateMax = null;
+
+ /**
+ * Folds the maximum of one batch, as returned by max(b), into the running maximum.
+ * Batches whose max is null leave the running maximum unchanged.
+ */
+ @Override
+ public void accumulate(Tuple b) throws IOException {
+ try {
+ Integer batchMax = max(b);
+ if (batchMax != null) {
+ // the first non-null batch max seeds the running maximum directly
+ intermediateMax = (intermediateMax == null)
+ ? batchMax
+ : java.lang.Math.max(intermediateMax, batchMax);
+ }
+ } catch (ExecException ee) {
+ throw ee;
+ } catch (Exception e) {
+ int errCode = 2106;
+ String msg = "Error while computing max in " + this.getClass().getSimpleName();
+ throw new ExecException(msg, errCode, PigException.BUG, e);
+ }
+ }
+
+ /** Discards the running maximum, returning the accumulator to its initial (null) state. */
+ @Override
+ public void cleanup() {
+ intermediateMax = null;
+ }
+
+ /** Returns the running maximum, or null if no non-null batch max has been accumulated. */
+ @Override
+ public Integer getValue() {
+ return intermediateMax;
+ }
+
}
Modified: hadoop/pig/trunk/src/org/apache/pig/builtin/IntMin.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/builtin/IntMin.java?rev=835487&r1=835486&r2=835487&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/builtin/IntMin.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/builtin/IntMin.java Thu Nov 12 18:33:15 2009
@@ -20,6 +20,7 @@
import java.io.IOException;
import java.util.Iterator;
+import org.apache.pig.Accumulator;
import org.apache.pig.Algebraic;
import org.apache.pig.EvalFunc;
import org.apache.pig.PigException;
@@ -35,7 +36,7 @@
/**
* Generates the min of the Integer values in the first field of a tuple.
*/
-public class IntMin extends EvalFunc<Integer> implements Algebraic {
+public class IntMin extends EvalFunc<Integer> implements Algebraic, Accumulator<Integer> {
@Override
public Integer exec(Tuple input) throws IOException {
@@ -154,4 +155,47 @@
public Schema outputSchema(Schema input) {
return new Schema(new Schema.FieldSchema(null, DataType.INTEGER));
}
+
+ /* Accumulator interface implementation */
+
+ /** Smallest value accumulated so far; null until a batch with a non-null min is seen. */
+ private Integer intermediateMin = null;
+
+ /**
+ * Folds the minimum of one batch, as returned by min(b), into the running minimum.
+ * Batches whose min is null leave the running minimum unchanged; non-ExecException
+ * failures are wrapped in an ExecException with error code 2106.
+ */
+ @Override
+ public void accumulate(Tuple b) throws IOException {
+ try {
+ Integer curMin = min(b);
+ if (curMin == null) {
+ return;
+ }
+ /* first non-null batch: seed intermediateMin with the identity for min (Integer.MAX_VALUE) */
+ if (intermediateMin == null) {
+ intermediateMin = Integer.MAX_VALUE;
+ }
+ intermediateMin = java.lang.Math.min(intermediateMin, curMin);
+ } catch (ExecException ee) {
+ throw ee;
+ } catch (Exception e) {
+ int errCode = 2106;
+ String msg = "Error while computing min in " + this.getClass().getSimpleName();
+ throw new ExecException(msg, errCode, PigException.BUG, e);
+ }
+ }
+
+ /** Discards the running minimum, returning the accumulator to its initial (null) state. */
+ @Override
+ public void cleanup() {
+ intermediateMin = null;
+ }
+
+ /** Returns the running minimum, or null if no non-null batch min has been accumulated. */
+ @Override
+ public Integer getValue() {
+ return intermediateMin;
+ }
}
Modified: hadoop/pig/trunk/src/org/apache/pig/builtin/IntSum.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/builtin/IntSum.java?rev=835487&r1=835486&r2=835487&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/builtin/IntSum.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/builtin/IntSum.java Thu Nov 12 18:33:15 2009
@@ -20,6 +20,7 @@
import java.io.IOException;
import java.util.Iterator;
+import org.apache.pig.Accumulator;
import org.apache.pig.Algebraic;
import org.apache.pig.EvalFunc;
import org.apache.pig.PigException;
@@ -34,7 +35,7 @@
/**
* Generates the sum of the Integer in the first field of a tuple.
*/
-public class IntSum extends EvalFunc<Long> implements Algebraic {
+public class IntSum extends EvalFunc<Long> implements Algebraic, Accumulator<Long> {
@Override
public Long exec(Tuple input) throws IOException {
@@ -198,4 +199,45 @@
return new Schema(new Schema.FieldSchema(null, DataType.LONG));
}
+ /* Accumulator interface implementation */
+
+ /** Running total across accumulate() calls; null until a non-null partial sum is seen. */
+ private Long intermediateSum = null;
+
+ /**
+ * Adds this batch's partial sum, as returned by sum(b), to the running total.
+ * A null partial sum leaves the running total unchanged; non-ExecException
+ * failures are wrapped in an ExecException with error code 2106.
+ */
+ @Override
+ public void accumulate(Tuple b) throws IOException {
+ try {
+ Long curSum = sum(b);
+ if (curSum == null) {
+ return;
+ }
+ intermediateSum = (intermediateSum == null ? 0L : intermediateSum) + curSum;
+ } catch (ExecException ee) {
+ throw ee;
+ } catch (Exception e) {
+ int errCode = 2106;
+ String msg = "Error while computing sum in " + this.getClass().getSimpleName();
+ throw new ExecException(msg, errCode, PigException.BUG, e);
+ }
+ }
+
+ /** Discards the running total, returning the accumulator to its initial (null) state. */
+ @Override
+ public void cleanup() {
+ intermediateSum = null;
+ }
+
+ /**
+ * Returns the running total, or null if no non-null partial sum has been
+ * accumulated since construction or the last cleanup().
+ */
+ @Override
+ public Long getValue() {
+ return intermediateSum;
+ }
}
Modified: hadoop/pig/trunk/src/org/apache/pig/builtin/LongAvg.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/builtin/LongAvg.java?rev=835487&r1=835486&r2=835487&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/builtin/LongAvg.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/builtin/LongAvg.java Thu Nov 12 18:33:15 2009
@@ -20,6 +20,7 @@
import java.io.IOException;
import java.util.Iterator;
+import org.apache.pig.Accumulator;
import org.apache.pig.Algebraic;
import org.apache.pig.EvalFunc;
import org.apache.pig.PigException;
@@ -35,7 +36,7 @@
* Generates the average of the values of the first field of a tuple. This class is Algebraic in
* implementation, so if possible the execution will be split into a local and global application
*/
-public class LongAvg extends EvalFunc<Double> implements Algebraic {
+public class LongAvg extends EvalFunc<Double> implements Algebraic, Accumulator<Double> {
private static TupleFactory mTupleFactory = TupleFactory.getInstance();
@@ -230,5 +231,71 @@
public Schema outputSchema(Schema input) {
return new Schema(new Schema.FieldSchema(null, DataType.DOUBLE));
}
+
+ /* Accumulator interface */
+
+ /**
+ * Running sum and count of the values accumulated so far. Both are null until
+ * a batch with a non-null partial sum is seen, and both are reset by cleanup().
+ * The count is a Double so that getValue() performs floating-point division.
+ */
+ private Long intermediateSum = null;
+ private Double intermediateCount = null;
+
+ /**
+ * Folds one batch into the running sum and count, using sum(b) and count(b).
+ * A batch whose partial sum is null is ignored; otherwise it is added only
+ * when its count is positive. Non-ExecException failures are wrapped in an
+ * ExecException with error code 2106.
+ */
+ @Override
+ public void accumulate(Tuple b) throws IOException {
+ try {
+ Long sum = sum(b);
+ if(sum == null) {
+ return;
+ }
+ // first non-null batch: start both running totals from zero
+ if (intermediateSum == null || intermediateCount == null) {
+ intermediateSum = 0L;
+ intermediateCount = 0.0;
+ }
+
+ double count = (Long)count(b);
+
+ if (count > 0) {
+ intermediateCount += count;
+ intermediateSum += sum;
+ }
+ } catch (ExecException ee) {
+ throw ee;
+ } catch (Exception e) {
+ int errCode = 2106;
+ String msg = "Error while computing average in " + this.getClass().getSimpleName();
+ throw new ExecException(msg, errCode, PigException.BUG, e);
+ }
+ }
+
+ /** Discards the running sum and count, returning the accumulator to its initial (null) state. */
+ @Override
+ public void cleanup() {
+ intermediateSum = null;
+ intermediateCount = null;
+ }
+
+ /**
+ * Returns the running sum divided by the running count, or null when the count
+ * is zero or nothing has been accumulated since construction or the last cleanup().
+ */
+ @Override
+ public Double getValue() {
+ Double avg = null;
+ // intermediateCount is null until the first non-null batch (and after cleanup());
+ // unboxing it unguarded would throw a NullPointerException.
+ if (intermediateCount != null && intermediateCount > 0) {
+ avg = intermediateSum / intermediateCount;
+ }
+ return avg;
+ }
}
Modified: hadoop/pig/trunk/src/org/apache/pig/builtin/LongMax.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/builtin/LongMax.java?rev=835487&r1=835486&r2=835487&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/builtin/LongMax.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/builtin/LongMax.java Thu Nov 12 18:33:15 2009
@@ -20,6 +20,7 @@
import java.io.IOException;
import java.util.Iterator;
+import org.apache.pig.Accumulator;
import org.apache.pig.Algebraic;
import org.apache.pig.EvalFunc;
import org.apache.pig.PigException;
@@ -33,7 +34,7 @@
/**
* Generates the max of the values of the first field of a tuple.
*/
-public class LongMax extends EvalFunc<Long> implements Algebraic {
+public class LongMax extends EvalFunc<Long> implements Algebraic, Accumulator<Long> {
@Override
public Long exec(Tuple input) throws IOException {
@@ -152,4 +153,47 @@
public Schema outputSchema(Schema input) {
return new Schema(new Schema.FieldSchema(null, DataType.LONG));
}
+
+ /* Accumulator interface */
+
+ /** Largest value accumulated so far; null until a batch with a non-null max is seen. */
+ private Long intermediateMax = null;
+
+ /**
+ * Folds the maximum of one batch, as returned by max(b), into the running maximum.
+ * Batches whose max is null leave the running maximum unchanged; non-ExecException
+ * failures are wrapped in an ExecException with error code 2106.
+ */
+ @Override
+ public void accumulate(Tuple b) throws IOException {
+ try {
+ Long curMax = max(b);
+ if (curMax == null) {
+ return;
+ }
+ /* first non-null batch: seed intermediateMax with the identity for max (Long.MIN_VALUE) */
+ if (intermediateMax == null) {
+ intermediateMax = Long.MIN_VALUE;
+ }
+ intermediateMax = java.lang.Math.max(intermediateMax, curMax);
+ } catch (ExecException ee) {
+ throw ee;
+ } catch (Exception e) {
+ int errCode = 2106;
+ String msg = "Error while computing max in " + this.getClass().getSimpleName();
+ throw new ExecException(msg, errCode, PigException.BUG, e);
+ }
+ }
+
+ /** Discards the running maximum, returning the accumulator to its initial (null) state. */
+ @Override
+ public void cleanup() {
+ intermediateMax = null;
+ }
+
+ /** Returns the running maximum, or null if no non-null batch max has been accumulated. */
+ @Override
+ public Long getValue() {
+ return intermediateMax;
+ }
}