You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by da...@apache.org on 2009/11/12 19:33:18 UTC
svn commit: r835487 [1/3] - in /hadoop/pig/trunk: ./ src/org/apache/pig/
src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/
src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/
src/org/apache/pig/backend/hadoop/executionengin...
Author: daijy
Date: Thu Nov 12 18:33:15 2009
New Revision: 835487
URL: http://svn.apache.org/viewvc?rev=835487&view=rev
Log:
PIG-979: Accumulator Interface for UDFs
Added:
hadoop/pig/trunk/src/org/apache/pig/Accumulator.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/AccumulatorOptimizer.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/AccumulativeTupleBuffer.java
hadoop/pig/trunk/src/org/apache/pig/data/AccumulativeBag.java
hadoop/pig/trunk/test/org/apache/pig/test/TestAccumulator.java
hadoop/pig/trunk/test/org/apache/pig/test/utils/AccumulativeSumBag.java
hadoop/pig/trunk/test/org/apache/pig/test/utils/AccumulatorBagCount.java
hadoop/pig/trunk/test/org/apache/pig/test/utils/BagCount.java
Modified:
hadoop/pig/trunk/CHANGES.txt
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/POStatus.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/Add.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/BinaryExpressionOperator.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/ConstantExpression.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/Divide.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/EqualToExpr.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/ExpressionOperator.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/GTOrEqualToExpr.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/GreaterThanExpr.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/LTOrEqualToExpr.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/LessThanExpr.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/Mod.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/Multiply.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/NotEqualToExpr.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POAnd.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POBinCond.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POCast.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POMapLookUp.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/PONegative.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POOr.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POProject.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/PORegexp.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/PORelationToExprProject.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserComparisonFunc.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/Subtract.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/UnaryExpressionOperator.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java
hadoop/pig/trunk/src/org/apache/pig/builtin/AVG.java
hadoop/pig/trunk/src/org/apache/pig/builtin/COUNT.java
hadoop/pig/trunk/src/org/apache/pig/builtin/COUNT_STAR.java
hadoop/pig/trunk/src/org/apache/pig/builtin/DoubleAvg.java
hadoop/pig/trunk/src/org/apache/pig/builtin/DoubleMax.java
hadoop/pig/trunk/src/org/apache/pig/builtin/DoubleMin.java
hadoop/pig/trunk/src/org/apache/pig/builtin/DoubleSum.java
hadoop/pig/trunk/src/org/apache/pig/builtin/FloatAvg.java
hadoop/pig/trunk/src/org/apache/pig/builtin/FloatMax.java
hadoop/pig/trunk/src/org/apache/pig/builtin/FloatMin.java
hadoop/pig/trunk/src/org/apache/pig/builtin/FloatSum.java
hadoop/pig/trunk/src/org/apache/pig/builtin/IntAvg.java
hadoop/pig/trunk/src/org/apache/pig/builtin/IntMax.java
hadoop/pig/trunk/src/org/apache/pig/builtin/IntMin.java
hadoop/pig/trunk/src/org/apache/pig/builtin/IntSum.java
hadoop/pig/trunk/src/org/apache/pig/builtin/LongAvg.java
hadoop/pig/trunk/src/org/apache/pig/builtin/LongMax.java
hadoop/pig/trunk/src/org/apache/pig/builtin/LongMin.java
hadoop/pig/trunk/src/org/apache/pig/builtin/LongSum.java
hadoop/pig/trunk/src/org/apache/pig/builtin/MAX.java
hadoop/pig/trunk/src/org/apache/pig/builtin/MIN.java
hadoop/pig/trunk/src/org/apache/pig/builtin/SUM.java
hadoop/pig/trunk/src/org/apache/pig/builtin/StringMax.java
hadoop/pig/trunk/src/org/apache/pig/builtin/StringMin.java
hadoop/pig/trunk/test/org/apache/pig/test/utils/POCastDummy.java
Modified: hadoop/pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=835487&r1=835486&r2=835487&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Thu Nov 12 18:33:15 2009
@@ -113,6 +113,8 @@
PIG-1038: Optimize nested distinct/sort to use secondary key (daijy)
+PIG-979: Accumulator Interface for UDFs (yinghe via daijy)
+
OPTIMIZATIONS
BUG FIXES
Added: hadoop/pig/trunk/src/org/apache/pig/Accumulator.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/Accumulator.java?rev=835487&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/Accumulator.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/Accumulator.java Thu Nov 12 18:33:15 2009
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig;
+
+import java.io.IOException;
+
+import org.apache.pig.data.Tuple;
+
+/**
+ * Interface for UDFs that are fed the tuples of a key through
+ * {@link #accumulate(Tuple)} and asked for the result through
+ * {@link #getValue()}.
+ *
+ * @param <T> type of the value produced for each key
+ */
+public interface Accumulator <T> {
+    /**
+     * Pass tuples to the UDF. You can retrieve a DataBag by calling b.get(index).
+     * Each DataBag may contain zero or more tuples for the current key.
+     *
+     * @param b the tuple handed to the UDF
+     * @throws IOException declared so implementations can report a failure
+     */
+    void accumulate(Tuple b) throws IOException;
+
+    /**
+     * Called when all tuples of the current key have been passed to accumulate.
+     *
+     * @return the value of the UDF for this key
+     */
+    T getValue();
+
+    /**
+     * Called after getValue() to prepare for processing the next key.
+     */
+    void cleanup();
+}
Added: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/AccumulatorOptimizer.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/AccumulatorOptimizer.java?rev=835487&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/AccumulatorOptimizer.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/AccumulatorOptimizer.java Thu Nov 12 18:33:15 2009
@@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer;
+
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.pig.Accumulator;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROpPlanVisitor;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.BinaryExpressionOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.ConstantExpression;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.ExpressionOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POBinCond;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POCast;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POMapLookUp;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.PORelationToExprProject;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserFunc;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.UnaryExpressionOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PODistinct;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSortedDistinct;
+import org.apache.pig.data.DataType;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.plan.DepthFirstWalker;
+import org.apache.pig.impl.plan.VisitorException;
+
+/**
+ * A visitor to optimize plans that determines if a reduce plan
+ * can run in accumulative mode.
+ */
+public class AccumulatorOptimizer extends MROpPlanVisitor {
+
+ private Log log = LogFactory.getLog(getClass());
+
+ /**
+ * Creates the optimizer for the given map-reduce plan; the plan is handed
+ * to the superclass together with a depth-first walker over the same plan.
+ *
+ * @param plan the map-reduce operator plan to visit
+ */
+ public AccumulatorOptimizer(MROperPlan plan) {
+ super(plan, new DepthFirstWalker<MapReduceOper, MROperPlan>(plan));
+ }
+
+    /**
+     * Marks the reducer of a map-reduce operator as accumulative when its reduce
+     * plan starts with a POPackage followed by exactly one POForEach, every input
+     * plan of that POForEach passes {@link #check(PhysicalOperator)}, and at least
+     * one of them contains a UDF. Returns without changing anything as soon as one
+     * of these requirements is not met.
+     *
+     * @param mr the map-reduce operator being visited
+     * @throws VisitorException declared by the visitor contract
+     */
+    public void visitMROp(MapReduceOper mr) throws VisitorException {
+        // See if this is a map-reduce job
+        List<PhysicalOperator> roots = mr.reducePlan.getRoots();
+        if (roots == null || roots.isEmpty()) {
+            return;
+        }
+
+        // The first root has to be exactly a POPackage (subclasses are rejected)
+        PhysicalOperator po_package = roots.get(0);
+        if (po_package.getClass() != POPackage.class) {
+            return;
+        }
+        POPackage pkg = (POPackage) po_package;
+
+        // A POPackage that serves distinct is not eligible
+        if (pkg.isDistinct()) {
+            return;
+        }
+
+        // No input of the POPackage may be inner
+        for (boolean inner : pkg.getInner()) {
+            if (inner) {
+                return;
+            }
+        }
+
+        // There must be exactly one successor and it must be a POForEach
+        List<PhysicalOperator> successors = mr.reducePlan.getSuccessors(po_package);
+        if (successors == null || successors.size() != 1) {
+            return;
+        }
+        PhysicalOperator po_foreach = successors.get(0);
+        if (!(po_foreach instanceof POForEach)) {
+            return;
+        }
+
+        boolean foundUDF = false;
+        for (PhysicalPlan inputPlan : ((POForEach) po_foreach).getInputPlans()) {
+            PhysicalOperator leaf = inputPlan.getLeaves().get(0);
+
+            // only expression operators are allowed
+            if (!(leaf instanceof ExpressionOperator)) {
+                return;
+            }
+
+            // non-short-circuit on purpose: containUDF() is evaluated for every plan
+            foundUDF |= ((ExpressionOperator) leaf).containUDF();
+
+            if (!check(leaf)) {
+                return;
+            }
+        }
+
+        if (!foundUDF) {
+            return;
+        }
+
+        // if all tests are passed, reducer can run in accumulative mode
+        log.info("Reducer is to run in accumulative mode.");
+        po_package.setAccumulative();
+        po_foreach.setAccumulative();
+    }
+
+    /**
+     * Check if an operator is qualified to be under POForEach
+     * to turn on accumulator. The operator must be in the following list or
+     * a <code>POUserFunc</code>.
+     *
+     * If the operator has sub-operators, they must also belong to this list.
+     * <ul>
+     * <li>ConstantExpression</li>
+     * <li>POProject, unless its result type is BAG, or is TUPLE and it is overloaded</li>
+     * <li>POMapLookUp</li>
+     * <li>POCast</li>
+     * <li>UnaryExpressionOperator</li>
+     * <li>BinaryExpressionOperator</li>
+     * <li>POBinCond</li>
+     * </ul>
+     *
+     * If the operator is <code>POUserFunc</code>, its class must be resolvable
+     * and implement the <code>Accumulator</code> interface, and each of its
+     * inputs must pass <code>checkUDFInput()</code>.
+     *
+     * @param po the operator to be checked on
+     * @return <code>true</code> if it is ok, <code>false</code>
+     * if not.
+     */
+    private boolean check(PhysicalOperator po) {
+        if (po instanceof ConstantExpression) {
+            return true;
+        }
+
+        if (po instanceof POCast) {
+            return check(po.getInputs().get(0));
+        }
+
+        if (po instanceof POMapLookUp) {
+            return true;
+        }
+
+        if (po instanceof POProject) {
+            POProject project = (POProject) po;
+
+            // POProject can not project data bag
+            if (project.getResultType() == DataType.BAG) {
+                return false;
+            }
+
+            // POProject can not overload a data bag
+            return !(project.getResultType() == DataType.TUPLE && project.isOverloaded());
+        }
+
+        if (po instanceof UnaryExpressionOperator) {
+            return check(((UnaryExpressionOperator) po).getExpr());
+        }
+
+        if (po instanceof BinaryExpressionOperator) {
+            BinaryExpressionOperator binary = (BinaryExpressionOperator) po;
+            return check(binary.getLhs()) && check(binary.getRhs());
+        }
+
+        if (po instanceof POBinCond) {
+            POBinCond binCond = (POBinCond) po;
+            return check(binCond.getLhs()) && check(binCond.getRhs())
+                    && check(binCond.getCond());
+        }
+
+        if (po instanceof POUserFunc) {
+            String className = ((POUserFunc) po).getFuncSpec().getClassName();
+            Class<?> udfClass;
+            try {
+                udfClass = PigContext.resolveClassName(className);
+            } catch (Exception e) {
+                // An unresolvable UDF class only disables this optimization;
+                // record the reason instead of dropping the exception silently.
+                log.debug("Could not resolve UDF class " + className
+                        + "; reducer will not run in accumulative mode.", e);
+                return false;
+            }
+            if (!Accumulator.class.isAssignableFrom(udfClass)) {
+                return false;
+            }
+
+            // check input of UDF
+            List<PhysicalOperator> inputs = po.getInputs();
+            for (PhysicalOperator input : inputs) {
+                if (!checkUDFInput(input)) {
+                    return false;
+                }
+            }
+
+            return true;
+        }
+
+        return false;
+    }
+
+    /**
+     * Check operators under POUserFunc to verify if this
+     * is a valid UDF to run as accumulator. The inputs to
+     * <code>POUserFunc</code> must be in the following list.
+     * If the operator has sub-operators, they must also belong
+     * to this list.
+     *
+     * <ul>
+     * <li>PORelationToExprProject</li>
+     * <li>ConstantExpression</li>
+     * <li>POProject</li>
+     * <li>POCast</li>
+     * <li>UnaryExpressionOperator</li>
+     * <li>BinaryExpressionOperator</li>
+     * <li>POBinCond</li>
+     * <li>POSortedDistinct</li>
+     * <li>POForEach</li>
+     * </ul>
+     *
+     * @param po the operator feeding the UDF
+     * @return <code>true</code> if the operator is accepted, <code>false</code> otherwise
+     */
+    private boolean checkUDFInput(PhysicalOperator po) {
+        // must be tested before the plain POProject case below
+        if (po instanceof PORelationToExprProject) {
+            return checkUDFInput(po.getInputs().get(0));
+        }
+
+        if (po instanceof POProject || po instanceof ConstantExpression) {
+            return true;
+        }
+
+        if (po instanceof UnaryExpressionOperator) {
+            UnaryExpressionOperator unary = (UnaryExpressionOperator) po;
+            return checkUDFInput(unary.getExpr());
+        }
+
+        // NOTE(review): the binary and bincond cases accept the operator when ANY
+        // operand passes (||), while the contract above reads as if ALL operands
+        // must pass -- confirm which one is intended.
+        if (po instanceof BinaryExpressionOperator) {
+            BinaryExpressionOperator binary = (BinaryExpressionOperator) po;
+            return checkUDFInput(binary.getLhs()) || checkUDFInput(binary.getRhs());
+        }
+
+        if (po instanceof POCast) {
+            return checkUDFInput(po.getInputs().get(0));
+        }
+
+        if (po instanceof POBinCond) {
+            POBinCond binCond = (POBinCond) po;
+            return checkUDFInput(binCond.getLhs())
+                    || checkUDFInput(binCond.getRhs())
+                    || checkUDFInput(binCond.getCond());
+        }
+
+        if (po instanceof POSortedDistinct) {
+            return true;
+        }
+
+        if (po instanceof POForEach) {
+            // a nested foreach needs exactly one input plan
+            List<PhysicalPlan> plans = ((POForEach) po).getInputPlans();
+            if (plans.size() != 1) {
+                return false;
+            }
+
+            // both the leaf of that plan and the foreach's own input must pass
+            PhysicalOperator leaf = plans.get(0).getLeaves().get(0);
+            return checkUDFInput(leaf) && checkUDFInput(po.getInputs().get(0));
+        }
+
+        return false;
+    }
+}
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java?rev=835487&r1=835486&r2=835487&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java Thu Nov 12 18:33:15 2009
@@ -181,18 +181,18 @@
//if the job controller fails before launching the jobs then there are
//no jobs to check for failure
if(jobControlException != null) {
- if(jobControlException instanceof PigException) {
- if(jobControlExceptionStackTrace != null) {
- LogUtils.writeLog("Error message from job controller", jobControlExceptionStackTrace,
- pc.getProperties().getProperty("pig.logfile"),
+ if(jobControlException instanceof PigException) {
+ if(jobControlExceptionStackTrace != null) {
+ LogUtils.writeLog("Error message from job controller", jobControlExceptionStackTrace,
+ pc.getProperties().getProperty("pig.logfile"),
log);
- }
+ }
throw jobControlException;
- } else {
+ } else {
int errCode = 2117;
String msg = "Unexpected error when launching map reduce job.";
throw new ExecException(msg, errCode, PigException.BUG, jobControlException);
- }
+ }
}
if (!jc.getFailedJobs().isEmpty() )
@@ -424,6 +424,9 @@
EndOfAllInputSetter checker = new EndOfAllInputSetter(plan);
checker.visit();
+ AccumulatorOptimizer accum = new AccumulatorOptimizer(plan);
+ accum.visit();
+
return plan;
}
@@ -434,29 +437,29 @@
* explicitly or if the default handler is null
*/
class JobControlThreadExceptionHandler implements Thread.UncaughtExceptionHandler {
-
- public void uncaughtException(Thread thread, Throwable throwable) {
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- PrintStream ps = new PrintStream(baos);
- throwable.printStackTrace(ps);
- jobControlExceptionStackTrace = baos.toString();
- try {
- jobControlException = getExceptionFromString(jobControlExceptionStackTrace);
- } catch (Exception e) {
- String errMsg = "Could not resolve error that occured when launching map reduce job: "
+
+ public void uncaughtException(Thread thread, Throwable throwable) {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ PrintStream ps = new PrintStream(baos);
+ throwable.printStackTrace(ps);
+ jobControlExceptionStackTrace = baos.toString();
+ try {
+ jobControlException = getExceptionFromString(jobControlExceptionStackTrace);
+ } catch (Exception e) {
+ String errMsg = "Could not resolve error that occured when launching map reduce job: "
+ getFirstLineFromMessage(jobControlExceptionStackTrace);
- jobControlException = new RuntimeException(errMsg);
- }
- }
+ jobControlException = new RuntimeException(errMsg);
+ }
+ }
}
void computeWarningAggregate(Job job, JobClient jobClient, Map<Enum, Long> aggMap) {
- JobID mapRedJobID = job.getAssignedJobID();
- RunningJob runningJob = null;
- try {
- runningJob = jobClient.getJob(mapRedJobID);
- if(runningJob != null) {
- Counters counters = runningJob.getCounters();
+ JobID mapRedJobID = job.getAssignedJobID();
+ RunningJob runningJob = null;
+ try {
+ runningJob = jobClient.getJob(mapRedJobID);
+ if(runningJob != null) {
+ Counters counters = runningJob.getCounters();
if (counters==null)
{
long nullCounterCount = aggMap.get(PigWarning.NULL_COUNTER_COUNT)==null?0 : aggMap.get(PigWarning.NULL_COUNTER_COUNT);
@@ -479,11 +482,11 @@
aggMap.put(e, currentCount);
}
}
- }
- } catch (IOException ioe) {
- String msg = "Unable to retrieve job to compute warning aggregation.";
- log.warn(msg);
- }
+ }
+ } catch (IOException ioe) {
+ String msg = "Unable to retrieve job to compute warning aggregation.";
+ log.warn(msg);
+ }
}
}
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/POStatus.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/POStatus.java?rev=835487&r1=835486&r2=835487&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/POStatus.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/POStatus.java Thu Nov 12 18:33:15 2009
@@ -30,5 +30,9 @@
// between ExecutableManager and POStream
public static final byte STATUS_EOS = 4; // end of Streaming output (i.e. output from streaming binary)
+ // a batch was processed successfully;
+ // this is used for accumulative UDFs
+ public static final byte STATUS_BATCH_OK = 5;
+
public static Object result;
}
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/Add.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/Add.java?rev=835487&r1=835486&r2=835487&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/Add.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/Add.java Thu Nov 12 18:33:15 2009
@@ -53,6 +53,11 @@
@Override
public Result getNext(Double d) throws ExecException {
+ Result r = accumChild(null, d);
+ if (r != null) {
+ return r;
+ }
+
byte status;
Result res;
Double left = null, right = null;
@@ -76,6 +81,11 @@
@Override
public Result getNext(Float f) throws ExecException {
+ Result r = accumChild(null, f);
+ if (r != null) {
+ return r;
+ }
+
byte status;
Result res;
Float left = null, right = null;
@@ -98,7 +108,12 @@
}
@Override
- public Result getNext(Integer i) throws ExecException {
+ public Result getNext(Integer i) throws ExecException {
+ Result r = accumChild(null, i);
+ if (r != null) {
+ return r;
+ }
+
byte status;
Result res;
Integer left = null, right = null;
@@ -122,6 +137,11 @@
@Override
public Result getNext(Long l) throws ExecException {
+ Result r = accumChild(null, l);
+ if (r != null) {
+ return r;
+ }
+
byte status;
Result res;
Long left = null, right = null;
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/BinaryExpressionOperator.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/BinaryExpressionOperator.java?rev=835487&r1=835486&r2=835487&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/BinaryExpressionOperator.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/BinaryExpressionOperator.java Thu Nov 12 18:33:15 2009
@@ -17,6 +17,9 @@
*/
package org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators;
+import java.util.ArrayList;
+import java.util.List;
+
import org.apache.pig.impl.plan.OperatorKey;
/**
@@ -31,6 +34,7 @@
protected ExpressionOperator lhs;
protected ExpressionOperator rhs;
+ private transient List<ExpressionOperator> child;
public BinaryExpressionOperator(OperatorKey k) {
this(k,-1);
@@ -44,6 +48,18 @@
return lhs;
}
+    /**
+     * Get the child expressions of this expression.
+     * The list holds the left-hand side followed by the right-hand side; it is
+     * built on the first call and the same list is returned afterwards.
+     *
+     * @return the list containing <code>lhs</code> and <code>rhs</code>
+     */
+    public List<ExpressionOperator> getChildExpressions() {
+        if (child != null) {
+            return child;
+        }
+
+        child = new ArrayList<ExpressionOperator>();
+        child.add(lhs);
+        child.add(rhs);
+        return child;
+    }
+
@Override
public boolean supportsMultipleInputs() {
return true;
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/ConstantExpression.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/ConstantExpression.java?rev=835487&r1=835486&r2=835487&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/ConstantExpression.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/ConstantExpression.java Thu Nov 12 18:33:15 2009
@@ -17,6 +17,7 @@
*/
package org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators;
+import java.util.List;
import java.util.Map;
import org.apache.pig.backend.executionengine.ExecException;
@@ -195,4 +196,12 @@
clone.cloneHelper(this);
return clone;
}
+
+ /**
+ * Get the child expressions of this expression.
+ *
+ * @return always <code>null</code>; this implementation reports no
+ * child expressions, so callers have to handle a null list
+ */
+ @Override
+ public List<ExpressionOperator> getChildExpressions() {
+ return null;
+ }
}
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/Divide.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/Divide.java?rev=835487&r1=835486&r2=835487&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/Divide.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/Divide.java Thu Nov 12 18:33:15 2009
@@ -54,6 +54,11 @@
@Override
public Result getNext(Double d) throws ExecException {
+ Result r = accumChild(null, d);
+ if (r != null) {
+ return r;
+ }
+
byte status;
Result res;
Double left = null, right = null;
@@ -72,9 +77,9 @@
right = (Double) res.result;
if (right == 0) {
- if(pigLogger != null) {
- pigLogger.warn(this, "Divide by zero. Converting it to NULL.", PigWarning.DIVIDE_BY_ZERO);
- }
+ if(pigLogger != null) {
+ pigLogger.warn(this, "Divide by zero. Converting it to NULL.", PigWarning.DIVIDE_BY_ZERO);
+ }
res.result = null;
}
else
@@ -84,6 +89,11 @@
@Override
public Result getNext(Float f) throws ExecException {
+ Result r = accumChild(null, f);
+ if (r != null) {
+ return r;
+ }
+
byte status;
Result res;
Float left = null, right = null;
@@ -102,9 +112,9 @@
right = (Float) res.result;
if (right == 0) {
- if(pigLogger != null) {
- pigLogger.warn(this, "Divide by zero. Converting it to NULL.", PigWarning.DIVIDE_BY_ZERO);
- }
+ if(pigLogger != null) {
+ pigLogger.warn(this, "Divide by zero. Converting it to NULL.", PigWarning.DIVIDE_BY_ZERO);
+ }
res.result = null;
}
else
@@ -114,6 +124,11 @@
@Override
public Result getNext(Integer i) throws ExecException {
+ Result r = accumChild(null, i);
+ if (r != null) {
+ return r;
+ }
+
byte status;
Result res;
Integer left = null, right = null;
@@ -132,10 +147,10 @@
right = (Integer) res.result;
if (right == 0) {
- if(pigLogger != null) {
- pigLogger.warn(this, "Divide by zero. Converting it to NULL.", PigWarning.DIVIDE_BY_ZERO);
- }
- res.result = null;
+ if(pigLogger != null) {
+ pigLogger.warn(this, "Divide by zero. Converting it to NULL.", PigWarning.DIVIDE_BY_ZERO);
+ }
+ res.result = null;
}
else
res.result = Integer.valueOf(left / right);
@@ -144,6 +159,11 @@
@Override
public Result getNext(Long l) throws ExecException {
+ Result r = accumChild(null, l);
+ if (r != null) {
+ return r;
+ }
+
byte status;
Result res;
Long left = null, right = null;
@@ -162,9 +182,9 @@
right = (Long) res.result;
if (right == 0) {
- if(pigLogger != null) {
- pigLogger.warn(this, "Divide by zero. Converting it to NULL.", PigWarning.DIVIDE_BY_ZERO);
- }
+ if(pigLogger != null) {
+ pigLogger.warn(this, "Divide by zero. Converting it to NULL.", PigWarning.DIVIDE_BY_ZERO);
+ }
res.result = null;
}
else
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/EqualToExpr.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/EqualToExpr.java?rev=835487&r1=835486&r2=835487&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/EqualToExpr.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/EqualToExpr.java Thu Nov 12 18:33:15 2009
@@ -65,36 +65,60 @@
switch (operandType) {
case DataType.BYTEARRAY: {
+ Result r = accumChild(null, dummyDBA);
+ if (r != null) {
+ return r;
+ }
left = lhs.getNext(dummyDBA);
right = rhs.getNext(dummyDBA);
return doComparison(left, right);
}
case DataType.DOUBLE: {
+ Result r = accumChild(null, dummyDouble);
+ if (r != null) {
+ return r;
+ }
left = lhs.getNext(dummyDouble);
right = rhs.getNext(dummyDouble);
return doComparison(left, right);
}
case DataType.FLOAT: {
+ Result r = accumChild(null, dummyFloat);
+ if (r != null) {
+ return r;
+ }
left = lhs.getNext(dummyFloat);
right = rhs.getNext(dummyFloat);
return doComparison(left, right);
}
case DataType.INTEGER: {
+ Result r = accumChild(null, dummyInt);
+ if (r != null) {
+ return r;
+ }
left = lhs.getNext(dummyInt);
right = rhs.getNext(dummyInt);
return doComparison(left, right);
}
case DataType.LONG: {
+ Result r = accumChild(null, dummyLong);
+ if (r != null) {
+ return r;
+ }
left = lhs.getNext(dummyLong);
right = rhs.getNext(dummyLong);
return doComparison(left, right);
}
case DataType.CHARARRAY: {
+ Result r = accumChild(null, dummyString);
+ if (r != null) {
+ return r;
+ }
left = lhs.getNext(dummyString);
right = rhs.getNext(dummyString);
return doComparison(left, right);
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/ExpressionOperator.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/ExpressionOperator.java?rev=835487&r1=835486&r2=835487&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/ExpressionOperator.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/ExpressionOperator.java Thu Nov 12 18:33:15 2009
@@ -18,15 +18,20 @@
package org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators;
+import java.util.List;
+import java.util.Map;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataByteArray;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
import org.apache.pig.impl.plan.VisitorException;
@@ -60,6 +65,7 @@
}
public abstract void visit(PhyPlanVisitor v) throws VisitorException;
+
/**
* Make a deep copy of this operator. This is declared here to make it
@@ -73,5 +79,319 @@
throw new CloneNotSupportedException(s);
}
+ /**
+ * Get the sub-expressions of this expression.
+ * This is called when the reducer runs in accumulative mode: every child
+ * expression that contains a UDF must be evaluated so that the UDF's
+ * accumulate() gets driven.
+ *
+ * @return the child expressions; callers in this class treat null as
+ * "no children"
+ */
+ protected abstract List<ExpressionOperator> getChildExpressions();
+
+ /**
+ * Check whether this expression, or any expression below it, is a UDF.
+ * This is called when the reducer runs in accumulative mode, because in that
+ * mode every UDF in the expression tree must be called.
+ *
+ * @return true if this operator is a POUserFunc or any of its child
+ * expressions (recursively) contains one; false otherwise
+ */
+ public boolean containUDF() {
+ if (this instanceof POUserFunc) {
+ return true;
+ }
+
+ List<ExpressionOperator> children = getChildExpressions();
+ if (children != null) {
+ for (ExpressionOperator e : children) {
+ // Keep scanning until a UDF is found: returning the first child's
+ // answer would miss a UDF that sits under a later child.
+ if (e.containUDF()) {
+ return true;
+ }
+ }
+ }
+
+ return false;
+ }
+ /**
+ * Drives the UDFs below this expression when isAccumStarted() reports true.
+ * Each child that contains a UDF is asked for its next Double result;
+ * children without a UDF are not evaluated.
+ *
+ * @param child the expressions to drive, or null to use getChildExpressions()
+ * @param d value handed to each child's getNext(Double)
+ * @return null when accumulation has not started; otherwise the first child
+ * result whose status is not STATUS_BATCH_OK, or a new Result carrying
+ * STATUS_BATCH_OK when no driven child reported anything else
+ * @throws ExecException propagated from the children's getNext calls
+ */
+ protected Result accumChild(List<ExpressionOperator> child, Double d) throws ExecException {
+ if (!isAccumStarted()) {
+ return null;
+ }
+
+ List<ExpressionOperator> operands = (child != null) ? child : getChildExpressions();
+ if (operands != null) {
+ for (ExpressionOperator operand : operands) {
+ if (!operand.containUDF()) {
+ continue;
+ }
+ Result childResult = operand.getNext(d);
+ if (childResult.returnStatus != POStatus.STATUS_BATCH_OK) {
+ return childResult;
+ }
+ }
+ }
+
+ Result batchOk = new Result();
+ batchOk.returnStatus = POStatus.STATUS_BATCH_OK;
+ return batchOk;
+ }
+
+ /**
+ * Drives the UDFs below this expression when isAccumStarted() reports true.
+ * Each child that contains a UDF is asked for its next Integer result;
+ * children without a UDF are not evaluated.
+ *
+ * @param child the expressions to drive, or null to use getChildExpressions()
+ * @param v value handed to each child's getNext(Integer)
+ * @return null when accumulation has not started; otherwise the first child
+ * result whose status is not STATUS_BATCH_OK, or a new Result carrying
+ * STATUS_BATCH_OK when no driven child reported anything else
+ * @throws ExecException propagated from the children's getNext calls
+ */
+ protected Result accumChild(List<ExpressionOperator> child, Integer v) throws ExecException {
+ if (!isAccumStarted()) {
+ return null;
+ }
+
+ List<ExpressionOperator> operands = (child != null) ? child : getChildExpressions();
+ if (operands != null) {
+ for (ExpressionOperator operand : operands) {
+ if (!operand.containUDF()) {
+ continue;
+ }
+ Result childResult = operand.getNext(v);
+ if (childResult.returnStatus != POStatus.STATUS_BATCH_OK) {
+ return childResult;
+ }
+ }
+ }
+
+ Result batchOk = new Result();
+ batchOk.returnStatus = POStatus.STATUS_BATCH_OK;
+ return batchOk;
+ }
+
+ /**
+ * Drives the UDFs below this expression when isAccumStarted() reports true.
+ * Each child that contains a UDF is asked for its next Long result;
+ * children without a UDF are not evaluated.
+ *
+ * @param child the expressions to drive, or null to use getChildExpressions()
+ * @param l value handed to each child's getNext(Long)
+ * @return null when accumulation has not started; otherwise the first child
+ * result whose status is not STATUS_BATCH_OK, or a new Result carrying
+ * STATUS_BATCH_OK when no driven child reported anything else
+ * @throws ExecException propagated from the children's getNext calls
+ */
+ protected Result accumChild(List<ExpressionOperator> child, Long l) throws ExecException {
+ if (!isAccumStarted()) {
+ return null;
+ }
+
+ List<ExpressionOperator> operands = (child != null) ? child : getChildExpressions();
+ if (operands != null) {
+ for (ExpressionOperator operand : operands) {
+ if (!operand.containUDF()) {
+ continue;
+ }
+ Result childResult = operand.getNext(l);
+ if (childResult.returnStatus != POStatus.STATUS_BATCH_OK) {
+ return childResult;
+ }
+ }
+ }
+
+ Result batchOk = new Result();
+ batchOk.returnStatus = POStatus.STATUS_BATCH_OK;
+ return batchOk;
+ }
+
+ /**
+ * Drives the UDFs below this expression when isAccumStarted() reports true.
+ * Each child that contains a UDF is asked for its next Float result;
+ * children without a UDF are not evaluated.
+ *
+ * @param child the expressions to drive, or null to use getChildExpressions()
+ * @param f value handed to each child's getNext(Float)
+ * @return null when accumulation has not started; otherwise the first child
+ * result whose status is not STATUS_BATCH_OK, or a new Result carrying
+ * STATUS_BATCH_OK when no driven child reported anything else
+ * @throws ExecException propagated from the children's getNext calls
+ */
+ protected Result accumChild(List<ExpressionOperator> child, Float f) throws ExecException {
+ if (!isAccumStarted()) {
+ return null;
+ }
+
+ List<ExpressionOperator> operands = (child != null) ? child : getChildExpressions();
+ if (operands != null) {
+ for (ExpressionOperator operand : operands) {
+ if (!operand.containUDF()) {
+ continue;
+ }
+ Result childResult = operand.getNext(f);
+ if (childResult.returnStatus != POStatus.STATUS_BATCH_OK) {
+ return childResult;
+ }
+ }
+ }
+
+ Result batchOk = new Result();
+ batchOk.returnStatus = POStatus.STATUS_BATCH_OK;
+ return batchOk;
+ }
+
+ /**
+ * Drives the UDFs below this expression when isAccumStarted() reports true.
+ * Each child that contains a UDF is asked for its next Boolean result;
+ * children without a UDF are not evaluated.
+ *
+ * @param child the expressions to drive, or null to use getChildExpressions()
+ * @param b value handed to each child's getNext(Boolean)
+ * @return null when accumulation has not started; otherwise the first child
+ * result whose status is not STATUS_BATCH_OK, or a new Result carrying
+ * STATUS_BATCH_OK when no driven child reported anything else
+ * @throws ExecException propagated from the children's getNext calls
+ */
+ protected Result accumChild(List<ExpressionOperator> child, Boolean b) throws ExecException {
+ if (!isAccumStarted()) {
+ return null;
+ }
+
+ List<ExpressionOperator> operands = (child != null) ? child : getChildExpressions();
+ if (operands != null) {
+ for (ExpressionOperator operand : operands) {
+ if (!operand.containUDF()) {
+ continue;
+ }
+ Result childResult = operand.getNext(b);
+ if (childResult.returnStatus != POStatus.STATUS_BATCH_OK) {
+ return childResult;
+ }
+ }
+ }
+
+ Result batchOk = new Result();
+ batchOk.returnStatus = POStatus.STATUS_BATCH_OK;
+ return batchOk;
+ }
+
+ /**
+ * Drives the UDFs below this expression when isAccumStarted() reports true.
+ * Each child that contains a UDF is asked for its next String result;
+ * children without a UDF are not evaluated.
+ *
+ * @param child the expressions to drive, or null to use getChildExpressions()
+ * @param s value handed to each child's getNext(String)
+ * @return null when accumulation has not started; otherwise the first child
+ * result whose status is not STATUS_BATCH_OK, or a new Result carrying
+ * STATUS_BATCH_OK when no driven child reported anything else
+ * @throws ExecException propagated from the children's getNext calls
+ */
+ protected Result accumChild(List<ExpressionOperator> child, String s) throws ExecException {
+ if (!isAccumStarted()) {
+ return null;
+ }
+
+ List<ExpressionOperator> operands = (child != null) ? child : getChildExpressions();
+ if (operands != null) {
+ for (ExpressionOperator operand : operands) {
+ if (!operand.containUDF()) {
+ continue;
+ }
+ Result childResult = operand.getNext(s);
+ if (childResult.returnStatus != POStatus.STATUS_BATCH_OK) {
+ return childResult;
+ }
+ }
+ }
+
+ Result batchOk = new Result();
+ batchOk.returnStatus = POStatus.STATUS_BATCH_OK;
+ return batchOk;
+ }
+
+ /**
+ * Drives the UDFs below this expression when isAccumStarted() reports true.
+ * Each child that contains a UDF is asked for its next DataByteArray result;
+ * children without a UDF are not evaluated.
+ *
+ * @param child the expressions to drive, or null to use getChildExpressions()
+ * @param dba value handed to each child's getNext(DataByteArray)
+ * @return null when accumulation has not started; otherwise the first child
+ * result whose status is not STATUS_BATCH_OK, or a new Result carrying
+ * STATUS_BATCH_OK when no driven child reported anything else
+ * @throws ExecException propagated from the children's getNext calls
+ */
+ protected Result accumChild(List<ExpressionOperator> child, DataByteArray dba) throws ExecException {
+ if (!isAccumStarted()) {
+ return null;
+ }
+
+ List<ExpressionOperator> operands = (child != null) ? child : getChildExpressions();
+ if (operands != null) {
+ for (ExpressionOperator operand : operands) {
+ if (!operand.containUDF()) {
+ continue;
+ }
+ Result childResult = operand.getNext(dba);
+ if (childResult.returnStatus != POStatus.STATUS_BATCH_OK) {
+ return childResult;
+ }
+ }
+ }
+
+ Result batchOk = new Result();
+ batchOk.returnStatus = POStatus.STATUS_BATCH_OK;
+ return batchOk;
+ }
+
+ /**
+ * Drives the UDFs below this expression when isAccumStarted() reports true.
+ * Each child that contains a UDF is asked for its next Map result;
+ * children without a UDF are not evaluated.
+ *
+ * @param child the expressions to drive, or null to use getChildExpressions()
+ * @param map value handed to each child's getNext(Map)
+ * @return null when accumulation has not started; otherwise the first child
+ * result whose status is not STATUS_BATCH_OK, or a new Result carrying
+ * STATUS_BATCH_OK when no driven child reported anything else
+ * @throws ExecException propagated from the children's getNext calls
+ */
+ protected Result accumChild(List<ExpressionOperator> child, Map map) throws ExecException {
+ if (!isAccumStarted()) {
+ return null;
+ }
+
+ List<ExpressionOperator> operands = (child != null) ? child : getChildExpressions();
+ if (operands != null) {
+ for (ExpressionOperator operand : operands) {
+ if (!operand.containUDF()) {
+ continue;
+ }
+ Result childResult = operand.getNext(map);
+ if (childResult.returnStatus != POStatus.STATUS_BATCH_OK) {
+ return childResult;
+ }
+ }
+ }
+
+ Result batchOk = new Result();
+ batchOk.returnStatus = POStatus.STATUS_BATCH_OK;
+ return batchOk;
+ }
+
+ /**
+ * Drives the UDFs below this expression when isAccumStarted() reports true.
+ * Each child that contains a UDF is asked for its next Tuple result;
+ * children without a UDF are not evaluated.
+ *
+ * @param child the expressions to drive, or null to use getChildExpressions()
+ * @param t value handed to each child's getNext(Tuple)
+ * @return null when accumulation has not started; otherwise the first child
+ * result whose status is not STATUS_BATCH_OK, or a new Result carrying
+ * STATUS_BATCH_OK when no driven child reported anything else
+ * @throws ExecException propagated from the children's getNext calls
+ */
+ protected Result accumChild(List<ExpressionOperator> child, Tuple t) throws ExecException {
+ if (!isAccumStarted()) {
+ return null;
+ }
+
+ List<ExpressionOperator> operands = (child != null) ? child : getChildExpressions();
+ if (operands != null) {
+ for (ExpressionOperator operand : operands) {
+ if (!operand.containUDF()) {
+ continue;
+ }
+ Result childResult = operand.getNext(t);
+ if (childResult.returnStatus != POStatus.STATUS_BATCH_OK) {
+ return childResult;
+ }
+ }
+ }
+
+ Result batchOk = new Result();
+ batchOk.returnStatus = POStatus.STATUS_BATCH_OK;
+ return batchOk;
+ }
+
+ /**
+ * Drives the UDFs below this expression when isAccumStarted() reports true.
+ * Each child that contains a UDF is asked for its next DataBag result;
+ * children without a UDF are not evaluated.
+ *
+ * @param child the expressions to drive, or null to use getChildExpressions()
+ * @param db value handed to each child's getNext(DataBag)
+ * @return null when accumulation has not started; otherwise the first child
+ * result whose status is not STATUS_BATCH_OK, or a new Result carrying
+ * STATUS_BATCH_OK when no driven child reported anything else
+ * @throws ExecException propagated from the children's getNext calls
+ */
+ protected Result accumChild(List<ExpressionOperator> child, DataBag db) throws ExecException {
+ if (!isAccumStarted()) {
+ return null;
+ }
+
+ List<ExpressionOperator> operands = (child != null) ? child : getChildExpressions();
+ if (operands != null) {
+ for (ExpressionOperator operand : operands) {
+ if (!operand.containUDF()) {
+ continue;
+ }
+ Result childResult = operand.getNext(db);
+ if (childResult.returnStatus != POStatus.STATUS_BATCH_OK) {
+ return childResult;
+ }
+ }
+ }
+
+ Result batchOk = new Result();
+ batchOk.returnStatus = POStatus.STATUS_BATCH_OK;
+ return batchOk;
+ }
}
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/GTOrEqualToExpr.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/GTOrEqualToExpr.java?rev=835487&r1=835486&r2=835487&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/GTOrEqualToExpr.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/GTOrEqualToExpr.java Thu Nov 12 18:33:15 2009
@@ -65,36 +65,60 @@
switch (operandType) {
case DataType.BYTEARRAY: {
+ Result r = accumChild(null, dummyDBA);
+ if (r != null) {
+ return r;
+ }
left = lhs.getNext(dummyDBA);
right = rhs.getNext(dummyDBA);
return doComparison(left, right);
}
case DataType.DOUBLE: {
+ Result r = accumChild(null, dummyDouble);
+ if (r != null) {
+ return r;
+ }
left = lhs.getNext(dummyDouble);
right = rhs.getNext(dummyDouble);
return doComparison(left, right);
}
case DataType.FLOAT: {
+ Result r = accumChild(null, dummyFloat);
+ if (r != null) {
+ return r;
+ }
left = lhs.getNext(dummyFloat);
right = rhs.getNext(dummyFloat);
return doComparison(left, right);
}
case DataType.INTEGER: {
+ Result r = accumChild(null, dummyInt);
+ if (r != null) {
+ return r;
+ }
left = lhs.getNext(dummyInt);
right = rhs.getNext(dummyInt);
return doComparison(left, right);
}
case DataType.LONG: {
+ Result r = accumChild(null, dummyLong);
+ if (r != null) {
+ return r;
+ }
left = lhs.getNext(dummyLong);
right = rhs.getNext(dummyLong);
return doComparison(left, right);
}
case DataType.CHARARRAY: {
+ Result r = accumChild(null, dummyString);
+ if (r != null) {
+ return r;
+ }
left = lhs.getNext(dummyString);
right = rhs.getNext(dummyString);
return doComparison(left, right);
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/GreaterThanExpr.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/GreaterThanExpr.java?rev=835487&r1=835486&r2=835487&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/GreaterThanExpr.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/GreaterThanExpr.java Thu Nov 12 18:33:15 2009
@@ -65,36 +65,60 @@
switch (operandType) {
case DataType.BYTEARRAY: {
+ Result r = accumChild(null, dummyDBA);
+ if (r != null) {
+ return r;
+ }
left = lhs.getNext(dummyDBA);
right = rhs.getNext(dummyDBA);
return doComparison(left, right);
}
case DataType.DOUBLE: {
+ Result r = accumChild(null, dummyDouble);
+ if (r != null) {
+ return r;
+ }
left = lhs.getNext(dummyDouble);
right = rhs.getNext(dummyDouble);
return doComparison(left, right);
}
case DataType.FLOAT: {
+ Result r = accumChild(null, dummyFloat);
+ if (r != null) {
+ return r;
+ }
left = lhs.getNext(dummyFloat);
right = rhs.getNext(dummyFloat);
return doComparison(left, right);
}
case DataType.INTEGER: {
+ Result r = accumChild(null, dummyInt);
+ if (r != null) {
+ return r;
+ }
left = lhs.getNext(dummyInt);
right = rhs.getNext(dummyInt);
return doComparison(left, right);
}
case DataType.LONG: {
+ Result r = accumChild(null, dummyLong);
+ if (r != null) {
+ return r;
+ }
left = lhs.getNext(dummyLong);
right = rhs.getNext(dummyLong);
return doComparison(left, right);
}
case DataType.CHARARRAY: {
+ Result r = accumChild(null, dummyString);
+ if (r != null) {
+ return r;
+ }
left = lhs.getNext(dummyString);
right = rhs.getNext(dummyString);
return doComparison(left, right);
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/LTOrEqualToExpr.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/LTOrEqualToExpr.java?rev=835487&r1=835486&r2=835487&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/LTOrEqualToExpr.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/LTOrEqualToExpr.java Thu Nov 12 18:33:15 2009
@@ -65,36 +65,60 @@
switch (operandType) {
case DataType.BYTEARRAY: {
+ Result r = accumChild(null, dummyDBA);
+ if (r != null) {
+ return r;
+ }
left = lhs.getNext(dummyDBA);
right = rhs.getNext(dummyDBA);
return doComparison(left, right);
}
case DataType.DOUBLE: {
+ Result r = accumChild(null, dummyDouble);
+ if (r != null) {
+ return r;
+ }
left = lhs.getNext(dummyDouble);
right = rhs.getNext(dummyDouble);
return doComparison(left, right);
}
case DataType.FLOAT: {
+ Result r = accumChild(null, dummyFloat);
+ if (r != null) {
+ return r;
+ }
left = lhs.getNext(dummyFloat);
right = rhs.getNext(dummyFloat);
return doComparison(left, right);
}
case DataType.INTEGER: {
+ Result r = accumChild(null, dummyInt);
+ if (r != null) {
+ return r;
+ }
left = lhs.getNext(dummyInt);
right = rhs.getNext(dummyInt);
return doComparison(left, right);
}
case DataType.LONG: {
+ Result r = accumChild(null, dummyLong);
+ if (r != null) {
+ return r;
+ }
left = lhs.getNext(dummyLong);
right = rhs.getNext(dummyLong);
return doComparison(left, right);
}
case DataType.CHARARRAY: {
+ Result r = accumChild(null, dummyString);
+ if (r != null) {
+ return r;
+ }
left = lhs.getNext(dummyString);
right = rhs.getNext(dummyString);
return doComparison(left, right);
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/LessThanExpr.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/LessThanExpr.java?rev=835487&r1=835486&r2=835487&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/LessThanExpr.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/LessThanExpr.java Thu Nov 12 18:33:15 2009
@@ -65,36 +65,60 @@
switch (operandType) {
case DataType.BYTEARRAY: {
+ Result r = accumChild(null, dummyDBA);
+ if (r != null) {
+ return r;
+ }
left = lhs.getNext(dummyDBA);
right = rhs.getNext(dummyDBA);
return doComparison(left, right);
}
case DataType.DOUBLE: {
+ Result r = accumChild(null, dummyDouble);
+ if (r != null) {
+ return r;
+ }
left = lhs.getNext(dummyDouble);
right = rhs.getNext(dummyDouble);
return doComparison(left, right);
}
case DataType.FLOAT: {
+ Result r = accumChild(null, dummyFloat);
+ if (r != null) {
+ return r;
+ }
left = lhs.getNext(dummyFloat);
right = rhs.getNext(dummyFloat);
return doComparison(left, right);
}
case DataType.INTEGER: {
+ Result r = accumChild(null, dummyInt);
+ if (r != null) {
+ return r;
+ }
left = lhs.getNext(dummyInt);
right = rhs.getNext(dummyInt);
return doComparison(left, right);
}
case DataType.LONG: {
+ Result r = accumChild(null, dummyLong);
+ if (r != null) {
+ return r;
+ }
left = lhs.getNext(dummyLong);
right = rhs.getNext(dummyLong);
return doComparison(left, right);
}
case DataType.CHARARRAY: {
+ Result r = accumChild(null, dummyString);
+ if (r != null) {
+ return r;
+ }
left = lhs.getNext(dummyString);
right = rhs.getNext(dummyString);
return doComparison(left, right);
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/Mod.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/Mod.java?rev=835487&r1=835486&r2=835487&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/Mod.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/Mod.java Thu Nov 12 18:33:15 2009
@@ -53,6 +53,11 @@
@Override
public Result getNext(Integer i) throws ExecException{
+ Result r = accumChild(null, i);
+ if (r != null) {
+ return r;
+ }
+
byte status;
Result res;
Integer left = null, right = null;
@@ -76,6 +81,11 @@
@Override
public Result getNext(Long i) throws ExecException{
+ Result r = accumChild(null, i);
+ if (r != null) {
+ return r;
+ }
+
byte status;
Result res;
Long left = null, right = null;
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/Multiply.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/Multiply.java?rev=835487&r1=835486&r2=835487&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/Multiply.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/Multiply.java Thu Nov 12 18:33:15 2009
@@ -53,6 +53,11 @@
@Override
public Result getNext(Double d) throws ExecException {
+ Result r = accumChild(null, d);
+ if (r != null) {
+ return r;
+ }
+
byte status;
Result res;
Double left = null, right = null;
@@ -76,6 +81,11 @@
@Override
public Result getNext(Float f) throws ExecException {
+ Result r = accumChild(null, f);
+ if (r != null) {
+ return r;
+ }
+
byte status;
Result res;
Float left = null, right = null;
@@ -99,6 +109,11 @@
@Override
public Result getNext(Integer i) throws ExecException {
+ Result r = accumChild(null, i);
+ if (r != null) {
+ return r;
+ }
+
byte status;
Result res;
Integer left = null, right = null;
@@ -122,6 +137,11 @@
@Override
public Result getNext(Long l) throws ExecException {
+ Result r = accumChild(null, l);
+ if (r != null) {
+ return r;
+ }
+
byte status;
Result res;
Long left = null, right = null;
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/NotEqualToExpr.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/NotEqualToExpr.java?rev=835487&r1=835486&r2=835487&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/NotEqualToExpr.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/NotEqualToExpr.java Thu Nov 12 18:33:15 2009
@@ -65,36 +65,60 @@
switch (operandType) {
case DataType.BYTEARRAY: {
+ Result r = accumChild(null, dummyDBA);
+ if (r != null) {
+ return r;
+ }
left = lhs.getNext(dummyDBA);
right = rhs.getNext(dummyDBA);
return doComparison(left, right);
}
case DataType.DOUBLE: {
+ Result r = accumChild(null, dummyDouble);
+ if (r != null) {
+ return r;
+ }
left = lhs.getNext(dummyDouble);
right = rhs.getNext(dummyDouble);
return doComparison(left, right);
}
case DataType.FLOAT: {
+ Result r = accumChild(null, dummyFloat);
+ if (r != null) {
+ return r;
+ }
left = lhs.getNext(dummyFloat);
right = rhs.getNext(dummyFloat);
return doComparison(left, right);
}
case DataType.INTEGER: {
+ Result r = accumChild(null, dummyInt);
+ if (r != null) {
+ return r;
+ }
left = lhs.getNext(dummyInt);
right = rhs.getNext(dummyInt);
return doComparison(left, right);
}
case DataType.LONG: {
+ Result r = accumChild(null, dummyLong);
+ if (r != null) {
+ return r;
+ }
left = lhs.getNext(dummyLong);
right = rhs.getNext(dummyLong);
return doComparison(left, right);
}
case DataType.CHARARRAY: {
+ Result r = accumChild(null, dummyString);
+ if (r != null) {
+ return r;
+ }
left = lhs.getNext(dummyString);
right = rhs.getNext(dummyString);
return doComparison(left, right);
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POAnd.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POAnd.java?rev=835487&r1=835486&r2=835487&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POAnd.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POAnd.java Thu Nov 12 18:33:15 2009
@@ -58,6 +58,11 @@
@Override
public Result getNext(Boolean b) throws ExecException {
+ Result r = accumChild(null, dummyBool);
+ if (r != null) {
+ return r;
+ }
+
Result left;
left = lhs.getNext(dummyBool);
// pass on ERROR and EOP
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POBinCond.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POBinCond.java?rev=835487&r1=835486&r2=835487&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POBinCond.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POBinCond.java Thu Nov 12 18:33:15 2009
@@ -17,6 +17,8 @@
*/
package org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators;
+import java.util.ArrayList;
+import java.util.List;
import java.util.Map;
import org.apache.pig.backend.executionengine.ExecException;
@@ -37,6 +39,7 @@
ExpressionOperator cond;
ExpressionOperator lhs;
ExpressionOperator rhs;
+ private transient List<ExpressionOperator> child;
public POBinCond(OperatorKey k) {
super(k);
@@ -55,6 +58,11 @@
@Override
public Result getNext(Boolean b) throws ExecException {
+ Result r = accumChild(null, b);
+ if (r != null) {
+ return r;
+ }
+
Result res = cond.getNext(b);
if (res.result==null || res.returnStatus != POStatus.STATUS_OK) return res;
return ((Boolean)res.result) == true ? lhs.getNext(b) : rhs.getNext(b);
@@ -63,6 +71,21 @@
@Override
public Result getNext(DataBag db) throws ExecException {
+ List<ExpressionOperator> l = new ArrayList<ExpressionOperator>();
+ l.add(cond);
+ Result r = accumChild(l, dummyBool);
+
+ if (r != null) {
+ if (r.returnStatus != POStatus.STATUS_BATCH_OK) {
+ return r;
+ }
+ l.clear();
+ l.add(lhs);
+ l.add(rhs);
+ r = accumChild(l, db);
+ return r;
+ }
+
Result res = cond.getNext(dummyBool);
if (res.result==null || res.returnStatus != POStatus.STATUS_OK) return res;
return ((Boolean)res.result) == true ? lhs.getNext(db) : rhs.getNext(db);
@@ -70,6 +93,20 @@
@Override
public Result getNext(DataByteArray ba) throws ExecException {
+ List<ExpressionOperator> list = new ArrayList<ExpressionOperator>();
+ list.add(cond);
+ Result r = accumChild(list, dummyBool);
+
+ if (r != null) {
+ if (r.returnStatus != POStatus.STATUS_BATCH_OK) {
+ return r;
+ }
+ list.clear();
+ list.add(lhs);
+ list.add(rhs);
+ r = accumChild(list, ba);
+ return r;
+ }
Result res = cond.getNext(dummyBool);
if (res.result==null || res.returnStatus != POStatus.STATUS_OK) return res;
return ((Boolean)res.result) == true ? lhs.getNext(ba) : rhs.getNext(ba);
@@ -77,6 +114,20 @@
@Override
public Result getNext(Double d) throws ExecException {
+ List<ExpressionOperator> list = new ArrayList<ExpressionOperator>();
+ list.add(cond);
+ Result r = accumChild(list, dummyBool);
+
+ if (r != null) {
+ if (r.returnStatus != POStatus.STATUS_BATCH_OK) {
+ return r;
+ }
+ list.clear();
+ list.add(lhs);
+ list.add(rhs);
+ r = accumChild(list, d);
+ return r;
+ }
Result res = cond.getNext(dummyBool);
if (res.result==null || res.returnStatus != POStatus.STATUS_OK) return res;
return ((Boolean)res.result) == true ? lhs.getNext(d) : rhs.getNext(d);
@@ -84,6 +135,20 @@
@Override
public Result getNext(Float f) throws ExecException {
+ List<ExpressionOperator> list = new ArrayList<ExpressionOperator>();
+ list.add(cond);
+ Result r = accumChild(list, dummyBool);
+
+ if (r != null) {
+ if (r.returnStatus != POStatus.STATUS_BATCH_OK) {
+ return r;
+ }
+ list.clear();
+ list.add(lhs);
+ list.add(rhs);
+ r = accumChild(list, f);
+ return r;
+ }
Result res = cond.getNext(dummyBool);
if (res.result==null || res.returnStatus != POStatus.STATUS_OK) return res;
return ((Boolean)res.result) == true ? lhs.getNext(f) : rhs.getNext(f);
@@ -91,6 +156,19 @@
@Override
public Result getNext(Integer i) throws ExecException {
+ List<ExpressionOperator> list = new ArrayList<ExpressionOperator>();
+ list.add(cond);
+ Result r = accumChild(list, dummyBool);
+ if (r != null) {
+ if (r.returnStatus != POStatus.STATUS_BATCH_OK) {
+ return r;
+ }
+ list.clear();
+ list.add(lhs);
+ list.add(rhs);
+ r = accumChild(list, i);
+ return r;
+ }
Result res = cond.getNext(dummyBool);
if (res.result==null || res.returnStatus != POStatus.STATUS_OK) return res;
return ((Boolean)res.result) == true ? lhs.getNext(i) : rhs.getNext(i);
@@ -98,6 +176,20 @@
@Override
public Result getNext(Long l) throws ExecException {
+ List<ExpressionOperator> list = new ArrayList<ExpressionOperator>();
+ list.add(cond);
+ Result r = accumChild(list, dummyBool);
+
+ if (r != null) {
+ if (r.returnStatus != POStatus.STATUS_BATCH_OK) {
+ return r;
+ }
+ list.clear();
+ list.add(lhs);
+ list.add(rhs);
+ r = accumChild(list, l);
+ return r;
+ }
Result res = cond.getNext(dummyBool);
if (res.result==null || res.returnStatus != POStatus.STATUS_OK) return res;
return ((Boolean)res.result) == true ? lhs.getNext(l) : rhs.getNext(l);
@@ -105,6 +197,20 @@
@Override
public Result getNext(Map m) throws ExecException {
+ List<ExpressionOperator> list = new ArrayList<ExpressionOperator>();
+ list.add(cond);
+ Result r = accumChild(list, dummyBool);
+
+ if (r != null) {
+ if (r.returnStatus != POStatus.STATUS_BATCH_OK) {
+ return r;
+ }
+ list.clear();
+ list.add(lhs);
+ list.add(rhs);
+ r = accumChild(list, m);
+ return r;
+ }
Result res = cond.getNext(dummyBool);
if (res.result==null || res.returnStatus != POStatus.STATUS_OK) return res;
return ((Boolean)res.result) == true ? lhs.getNext(m) : rhs.getNext(m);
@@ -112,6 +218,20 @@
@Override
public Result getNext(String s) throws ExecException {
+ List<ExpressionOperator> list = new ArrayList<ExpressionOperator>();
+ list.add(cond);
+ Result r = accumChild(list, dummyBool);
+
+ if (r != null) {
+ if (r.returnStatus != POStatus.STATUS_BATCH_OK) {
+ return r;
+ }
+ list.clear();
+ list.add(lhs);
+ list.add(rhs);
+ r = accumChild(list, s);
+ return r;
+ }
Result res = cond.getNext(dummyBool);
if (res.result==null || res.returnStatus != POStatus.STATUS_OK) return res;
return ((Boolean)res.result) == true ? lhs.getNext(s) : rhs.getNext(s);
@@ -119,6 +239,20 @@
@Override
public Result getNext(Tuple t) throws ExecException {
+ List<ExpressionOperator> list = new ArrayList<ExpressionOperator>();
+ list.add(cond);
+ Result r = accumChild(list, dummyBool);
+
+ if (r != null) {
+ if (r.returnStatus != POStatus.STATUS_BATCH_OK) {
+ return r;
+ }
+ list.clear();
+ list.add(lhs);
+ list.add(rhs);
+ r = accumChild(list, t);
+ return r;
+ }
Result res = cond.getNext(dummyBool);
if (res.result==null || res.returnStatus != POStatus.STATUS_OK) return res;
return ((Boolean)res.result) == true ? lhs.getNext(t) : rhs.getNext(t);
@@ -153,6 +287,27 @@
this.lhs = lhs;
}
+ /**
+ * Get the condition expression, i.e. the one whose Boolean result decides
+ * whether the left or the right expression is evaluated.
+ *
+ * @return the expression held in the cond field
+ */
+ public ExpressionOperator getCond() {
+ return this.cond;
+ }
+
+ /**
+ * Get the right expression, i.e. the one evaluated when the condition
+ * is not true.
+ *
+ * @return the expression held in the rhs field
+ */
+ public ExpressionOperator getRhs() {
+ return this.rhs;
+ }
+
+ /**
+ * Get the left expression, i.e. the one evaluated when the condition
+ * is true.
+ *
+ * @return the expression held in the lhs field
+ */
+ public ExpressionOperator getLhs() {
+ return this.lhs;
+ }
+
@Override
public boolean supportsMultipleInputs() {
return true;
@@ -169,4 +324,18 @@
return clone;
}
+ /**
+ * Returns the sub-expressions of this bincond: the condition, then the
+ * left expression, then the right expression. The list is built on the
+ * first call and the same instance is handed out afterwards.
+ */
+ @Override
+ public List<ExpressionOperator> getChildExpressions() {
+ if (child != null) {
+ return child;
+ }
+
+ child = new ArrayList<ExpressionOperator>();
+ child.add(cond);
+ child.add(lhs);
+ child.add(rhs);
+ return child;
+ }
+
}
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POCast.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POCast.java?rev=835487&r1=835486&r2=835487&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POCast.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POCast.java Thu Nov 12 18:33:15 2009
@@ -19,6 +19,8 @@
import java.io.IOException;
import java.io.ObjectInputStream;
+import java.util.ArrayList;
+import java.util.List;
import java.util.Map;
import org.apache.commons.logging.Log;
@@ -52,6 +54,7 @@
transient private Log log = LogFactory.getLog(getClass());
private boolean castNotNeeded = false;
private Byte realType = null;
+ private transient List<ExpressionOperator> child;
private static final long serialVersionUID = 1L;
@@ -328,7 +331,7 @@
String str = null;
Result res = in.getNext(str);
if (res.returnStatus == POStatus.STATUS_OK && res.result != null) {
- res.result = CastUtils.stringToLong((String)res.result);
+ res.result = CastUtils.stringToLong((String)res.result);
}
return res;
}
@@ -449,7 +452,7 @@
String str = null;
Result res = in.getNext(str);
if (res.returnStatus == POStatus.STATUS_OK && res.result != null) {
- res.result = CastUtils.stringToDouble((String)res.result);
+ res.result = CastUtils.stringToDouble((String)res.result);
}
return res;
}
@@ -572,7 +575,7 @@
String str = null;
Result res = in.getNext(str);
if (res.returnStatus == POStatus.STATUS_OK && res.result != null) {
- res.result = CastUtils.stringToFloat((String)res.result);
+ res.result = CastUtils.stringToFloat((String)res.result);
}
return res;
}
@@ -695,7 +698,6 @@
}
case DataType.CHARARRAY: {
-
Result res = in.getNext(str);
return res;
@@ -988,4 +990,19 @@
return clone;
}
+ /**
+ * Returns the sub-expression of this cast: the first input, provided it is
+ * an ExpressionOperator; otherwise the list stays empty. The list is built
+ * on the first call and the same instance is handed out afterwards.
+ */
+ @Override
+ public List<ExpressionOperator> getChildExpressions() {
+ if (child != null) {
+ return child;
+ }
+
+ // Assign the cache before touching inputs, exactly as the lookup order requires.
+ child = new ArrayList<ExpressionOperator>();
+ Object firstInput = inputs.get(0);
+ if (firstInput instanceof ExpressionOperator) {
+ child.add((ExpressionOperator) firstInput);
+ }
+ return child;
+ }
+
}
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POMapLookUp.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POMapLookUp.java?rev=835487&r1=835486&r2=835487&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POMapLookUp.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POMapLookUp.java Thu Nov 12 18:33:15 2009
@@ -17,6 +17,7 @@
*/
package org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators;
+import java.util.List;
import java.util.Map;
import org.apache.pig.backend.executionengine.ExecException;
@@ -32,49 +33,49 @@
import org.apache.pig.impl.plan.VisitorException;
public class POMapLookUp extends ExpressionOperator {
-
+
private static final long serialVersionUID = 1L;
private String key;
- public POMapLookUp(OperatorKey k) {
- super(k);
- }
-
- public POMapLookUp(OperatorKey k, int rp) {
- super(k, rp);
- }
-
- public POMapLookUp(OperatorKey k, int rp, String key) {
- super(k, rp);
- this.key = key;
- }
-
- public void setLookUpKey(String key) {
- this.key = key;
- }
-
- public String getLookUpKey() {
- return key;
- }
-
- @Override
- public void visit(PhyPlanVisitor v) throws VisitorException {
- v.visitMapLookUp(this);
-
- }
-
- @Override
- public String name() {
- return "POMapLookUp" + "[" + DataType.findTypeName(resultType) + "]" +" - " + mKey.toString();
- }
-
- @Override
- public boolean supportsMultipleInputs() {
- return false;
- }
-
- @Override
- public Result processInput() throws ExecException {
+ public POMapLookUp(OperatorKey k) {
+ super(k);
+ }
+
+ public POMapLookUp(OperatorKey k, int rp) {
+ super(k, rp);
+ }
+
+ public POMapLookUp(OperatorKey k, int rp, String key) {
+ super(k, rp);
+ this.key = key;
+ }
+
+ public void setLookUpKey(String key) {
+ this.key = key;
+ }
+
+ public String getLookUpKey() {
+ return key;
+ }
+
+ @Override
+ public void visit(PhyPlanVisitor v) throws VisitorException {
+ v.visitMapLookUp(this);
+
+ }
+
+ @Override
+ public String name() {
+ return "POMapLookUp" + "[" + DataType.findTypeName(resultType) + "]" +" - " + mKey.toString();
+ }
+
+ @Override
+ public boolean supportsMultipleInputs() {
+ return false;
+ }
+
+ @Override
+ public Result processInput() throws ExecException {
Result res = new Result();
Map<String, Object> inpValue = null;
if (input == null && (inputs == null || inputs.size()==0)) {
@@ -91,65 +92,65 @@
return res;
}
}
-
- @SuppressWarnings("unchecked")
- private Result getNext() throws ExecException {
- Result res = processInput();
- if(res.result != null && res.returnStatus == POStatus.STATUS_OK) {
- res.result = ((Map<String, Object>)res.result).get(key);
- }
- return res;
- }
-
- @Override
- public Result getNext(Boolean b) throws ExecException {
- return getNext();
- }
-
- @Override
- public Result getNext(DataBag db) throws ExecException {
- return getNext();
- }
-
- @Override
- public Result getNext(DataByteArray ba) throws ExecException {
- return getNext();
- }
-
- @Override
- public Result getNext(Double d) throws ExecException {
- return getNext();
- }
-
- @Override
- public Result getNext(Float f) throws ExecException {
- return getNext();
- }
-
- @Override
- public Result getNext(Integer i) throws ExecException {
- return getNext();
- }
-
- @Override
- public Result getNext(Long l) throws ExecException {
- return getNext();
- }
-
- @Override
- public Result getNext(Map m) throws ExecException {
- return getNext();
- }
-
- @Override
- public Result getNext(String s) throws ExecException {
- return getNext();
- }
-
- @Override
- public Result getNext(Tuple t) throws ExecException {
- return getNext();
- }
+
+ @SuppressWarnings("unchecked")
+ private Result getNext() throws ExecException {
+ Result res = processInput();
+ if(res.result != null && res.returnStatus == POStatus.STATUS_OK) {
+ res.result = ((Map<String, Object>)res.result).get(key);
+ }
+ return res;
+ }
+
+ @Override
+ public Result getNext(Boolean b) throws ExecException {
+ return getNext();
+ }
+
+ @Override
+ public Result getNext(DataBag db) throws ExecException {
+ return getNext();
+ }
+
+ @Override
+ public Result getNext(DataByteArray ba) throws ExecException {
+ return getNext();
+ }
+
+ @Override
+ public Result getNext(Double d) throws ExecException {
+ return getNext();
+ }
+
+ @Override
+ public Result getNext(Float f) throws ExecException {
+ return getNext();
+ }
+
+ @Override
+ public Result getNext(Integer i) throws ExecException {
+ return getNext();
+ }
+
+ @Override
+ public Result getNext(Long l) throws ExecException {
+ return getNext();
+ }
+
+ @Override
+ public Result getNext(Map m) throws ExecException {
+ return getNext();
+ }
+
+ @Override
+ public Result getNext(String s) throws ExecException {
+ return getNext();
+ }
+
+ @Override
+ public Result getNext(Tuple t) throws ExecException {
+ return getNext();
+ }
@Override
public POMapLookUp clone() throws CloneNotSupportedException {
@@ -158,7 +159,12 @@
clone.cloneHelper(this);
return clone;
}
-
-
+
+ @Override
+ public List<ExpressionOperator> getChildExpressions() {
+ return null;
+ }
+
+
}
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/PONegative.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/PONegative.java?rev=835487&r1=835486&r2=835487&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/PONegative.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/PONegative.java Thu Nov 12 18:33:15 2009
@@ -60,7 +60,7 @@
public Result getNext(Double d) throws ExecException {
Result res = expr.getNext(d);
if(res.returnStatus == POStatus.STATUS_OK && res.result!=null) {
- res.result = -1*((Double)res.result);
+ res.result = -1*((Double)res.result);
}
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POOr.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POOr.java?rev=835487&r1=835486&r2=835487&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POOr.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POOr.java Thu Nov 12 18:33:15 2009
@@ -58,6 +58,11 @@
@Override
public Result getNext(Boolean b) throws ExecException {
+ Result r = accumChild(null, dummyBool);
+ if (r != null) {
+ return r;
+ }
+
Result left;
left = lhs.getNext(dummyBool);
// pass on ERROR and EOP
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POProject.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POProject.java?rev=835487&r1=835486&r2=835487&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POProject.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POProject.java Thu Nov 12 18:33:15 2009
@@ -19,6 +19,7 @@
import java.util.ArrayList;
import java.util.Iterator;
+import java.util.List;
import java.util.Map;
import org.apache.pig.PigException;
@@ -52,12 +53,12 @@
*/
private static final long serialVersionUID = 1L;
- private static TupleFactory tupleFactory = TupleFactory.getInstance();
-
- protected static final BagFactory bagFactory = BagFactory.getInstance();
+ private static TupleFactory tupleFactory = TupleFactory.getInstance();
+
+ protected static final BagFactory bagFactory = BagFactory.getInstance();
- private boolean resultSingleTupleBag = false;
-
+ private boolean resultSingleTupleBag = false;
+
//The column to project
protected ArrayList<Integer> columns;
@@ -162,7 +163,7 @@
ret = null;
}
} else {
- ArrayList<Object> objList = new ArrayList<Object>(columns.size());
+ ArrayList<Object> objList = new ArrayList<Object>(columns.size());
for(int i: columns) {
try {
@@ -175,7 +176,7 @@
objList.add(null);
}
}
- ret = tupleFactory.newTuple(objList);
+ ret = tupleFactory.newTuple(objList);
}
res.result = ret;
return res;
@@ -294,12 +295,12 @@
if(columns.size() == 1) {
ret = inpValue.get(columns.get(0));
} else {
- ArrayList<Object> objList = new ArrayList<Object>(columns.size());
+ ArrayList<Object> objList = new ArrayList<Object>(columns.size());
for(int i: columns) {
objList.add(inpValue.get(i));
}
- ret = tupleFactory.newTuple(objList);
+ ret = tupleFactory.newTuple(objList);
res.result = (Tuple)ret;
return res;
}
@@ -413,4 +414,9 @@
this.resultSingleTupleBag = resultSingleTupleBag;
}
+ @Override
+ public List<ExpressionOperator> getChildExpressions() {
+ return null;
+ }
+
}