You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by gd...@apache.org on 2012/09/13 16:55:38 UTC
svn commit: r1384352 [1/4] - in /pig/trunk: ./
contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/
src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/
src/org/apache/pig/backend/hadoop/executionengine/physicalLayer...
Author: gdfm
Date: Thu Sep 13 14:55:36 2012
New Revision: 1384352
URL: http://svn.apache.org/viewvc?rev=1384352&view=rev
Log:
PIG-2353: RANK function like in SQL (xalan via azaroth)
Added:
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduceCounter.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCounter.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PORank.java
pig/trunk/src/org/apache/pig/newplan/logical/relational/LORank.java
pig/trunk/test/org/apache/pig/test/TestOrderBy3.java
pig/trunk/test/org/apache/pig/test/TestRank1.java
pig/trunk/test/org/apache/pig/test/TestRank2.java
pig/trunk/test/org/apache/pig/test/TestRank3.java
Modified:
pig/trunk/CHANGES.txt
pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorage.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PhyPlanSetter.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/DotPOPrinter.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PlanPrinter.java
pig/trunk/src/org/apache/pig/newplan/logical/optimizer/AllExpressionVisitor.java
pig/trunk/src/org/apache/pig/newplan/logical/optimizer/AllSameRalationalNodesVisitor.java
pig/trunk/src/org/apache/pig/newplan/logical/optimizer/LogicalPlanPrinter.java
pig/trunk/src/org/apache/pig/newplan/logical/optimizer/SchemaResetter.java
pig/trunk/src/org/apache/pig/newplan/logical/optimizer/UidResetter.java
pig/trunk/src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java
pig/trunk/src/org/apache/pig/newplan/logical/relational/LogicalRelationalNodesVisitor.java
pig/trunk/src/org/apache/pig/newplan/logical/rules/ColumnPruneHelper.java
pig/trunk/src/org/apache/pig/newplan/logical/rules/ColumnPruneVisitor.java
pig/trunk/src/org/apache/pig/newplan/logical/visitor/LineageFindRelVisitor.java
pig/trunk/src/org/apache/pig/newplan/logical/visitor/ProjectStarExpander.java
pig/trunk/src/org/apache/pig/newplan/logical/visitor/SchemaAliasVisitor.java
pig/trunk/src/org/apache/pig/newplan/logical/visitor/TypeCheckingRelVisitor.java
pig/trunk/src/org/apache/pig/parser/AliasMasker.g
pig/trunk/src/org/apache/pig/parser/AstPrinter.g
pig/trunk/src/org/apache/pig/parser/AstValidator.g
pig/trunk/src/org/apache/pig/parser/LogicalPlanBuilder.java
pig/trunk/src/org/apache/pig/parser/LogicalPlanGenerator.g
pig/trunk/src/org/apache/pig/parser/QueryLexer.g
pig/trunk/src/org/apache/pig/parser/QueryParser.g
pig/trunk/src/org/apache/pig/pen/IllustratorAttacher.java
pig/trunk/src/org/apache/pig/pen/LocalMapReduceSimulator.java
pig/trunk/src/org/apache/pig/tools/pigstats/ScriptState.java
pig/trunk/test/e2e/pig/deployers/ExistingClusterDeployer.pm
pig/trunk/test/e2e/pig/deployers/LocalDeployer.pm
pig/trunk/test/e2e/pig/tests/nightly.conf
pig/trunk/test/e2e/pig/tools/generate/generate_data.pl
pig/trunk/test/org/apache/pig/parser/TestLexer.pig
pig/trunk/test/org/apache/pig/parser/TestLogicalPlanGenerator.java
pig/trunk/test/org/apache/pig/parser/TestParser.pig
pig/trunk/test/org/apache/pig/parser/TestQueryLexer.java
pig/trunk/test/org/apache/pig/parser/TestQueryParser.java
pig/trunk/test/org/apache/pig/test/OptimizeLimitPlanPrinter.java
pig/trunk/test/org/apache/pig/test/TestLogicalPlanBuilder.java
Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1384352&r1=1384351&r2=1384352&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Thu Sep 13 14:55:36 2012
@@ -25,6 +25,8 @@ PIG-1891 Enable StoreFunc to make intell
IMPROVEMENTS
+PIG-2353: RANK function like in SQL (xalan via azaroth)
+
PIG-2915: Builtin TOP udf is sensitive to null input bags (hazen via dvryaboy)
PIG-2901: Errors and lacks in document "Pig Latin Basics" (miyakawataku via billgraham)
Modified: pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorage.java
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorage.java?rev=1384352&r1=1384351&r2=1384352&view=diff
==============================================================================
--- pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorage.java (original)
+++ pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorage.java Thu Sep 13 14:55:36 2012
@@ -635,4 +635,8 @@ public class AvroStorage extends FileInp
}
}
+    /**
+     * Intentionally left empty: AvroStorage performs no cleanup after a
+     * successful store.
+     * NOTE(review): the reason for overriding with a no-op is not visible in this
+     * change; confirm against the inherited cleanupOnSuccess contract.
+     */
+    @Override
+    public void cleanupOnSuccess(String location, Job job) throws IOException {
+        // no-op by design
+    }
}
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java?rev=1384352&r1=1384351&r2=1384352&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java Thu Sep 13 14:55:36 2012
@@ -25,13 +25,17 @@ import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
+import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
+import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -42,6 +46,10 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
+import org.apache.hadoop.mapred.Counters;
+import org.apache.hadoop.mapred.Counters.Counter;
+import org.apache.hadoop.mapred.Counters.Group;
+import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobPriority;
import org.apache.hadoop.mapred.jobcontrol.Job;
@@ -62,11 +70,13 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserFunc;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCounter;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFRJoin;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeCogroup;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeJoin;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PORank;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
@@ -137,6 +147,11 @@ public class JobControlCompiler{
private static final String REDUCER_ESTIMATOR_KEY = "pig.exec.reducer.estimator";
private static final String REDUCER_ESTIMATOR_ARG_KEY = "pig.exec.reducer.estimator.arg";
+ public static final String PIG_MAP_COUNTER = "pig.counters.counter_";
+ public static final String PIG_MAP_RANK_NAME = "pig.rank_";
+ public static final String PIG_MAP_SEPARATOR = "_";
+ public HashMap<String, ArrayList<Pair<String,Long>>> globalCounters = new HashMap<String, ArrayList<Pair<String,Long>>>();
+
/**
* We will serialize the POStore(s) present in map and reduce in lists in
* the Hadoop Conf. In the case of Multi stores, we could deduce these from
@@ -152,6 +167,7 @@ public class JobControlCompiler{
private Map<Job, Pair<List<POStore>, Path>> jobStoreMap;
private Map<Job, MapReduceOper> jobMroMap;
+ private int counterSize;
public JobControlCompiler(PigContext pigContext, Configuration conf) throws IOException {
this.pigContext = pigContext;
@@ -314,6 +330,8 @@ public class JobControlCompiler{
if (!completeFailedJobs.contains(job))
{
MapReduceOper mro = jobMroMap.get(job);
+ if (mro.isCounterOperation() /*&& completeFailedJobs.size() > 0*/)
+ saveCounters(job,mro.getOperationID());
plan.remove(mro);
}
}
@@ -323,6 +341,64 @@ public class JobControlCompiler{
}
/**
+ * Reads the global counters produced by a job on the group labeled with PIG_MAP_RANK_NAME.
+ * Then, it is calculated the cumulative sum, which consists on the sum of previous cumulative
+ * sum plus the previous global counter value.
+ * @param job with the global counters collected.
+ * @param operationID After being collected on global counters (POCounter),
+ * these values are passed via configuration file to PORank, by using the unique
+ * operation identifier
+ */
+ private void saveCounters(Job job, String operationID) {
+ JobClient jobClient;
+ Counters counters;
+ Group groupCounters;
+
+ Long previousValue = 0L;
+ Long previousSum = 0L;
+ ArrayList<Pair<String,Long>> counterPairs;
+
+ try {
+ jobClient = job.getJobClient();
+ counters = jobClient.getJob(job.getAssignedJobID()).getCounters();
+ groupCounters = counters.getGroup(getGroupName(counters.getGroupNames()));
+
+ Iterator<Counter> it = groupCounters.iterator();
+ HashMap<Integer,Long> counterList = new HashMap<Integer, Long>();
+
+ while(it.hasNext()) {
+ try{
+ Counter c = it.next();
+ counterList.put(Integer.valueOf(c.getDisplayName()), c.getValue());
+ } catch (Exception ex) {
+ ex.printStackTrace();
+ }
+ }
+ counterSize = counterList.size();
+ counterPairs = new ArrayList<Pair<String,Long>>();
+
+ for(int i = 0; i < counterSize; i++){
+ previousSum += previousValue;
+ previousValue = counterList.get(Integer.valueOf(i));
+ counterPairs.add(new Pair<String, Long>(JobControlCompiler.PIG_MAP_COUNTER + operationID + JobControlCompiler.PIG_MAP_SEPARATOR + i, previousSum));
+ }
+
+ globalCounters.put(operationID, counterPairs);
+
+ } catch (Exception e) {
+ String msg = "Error to read counters into Rank operation counterSize "+counterSize;
+ throw new RuntimeException(msg, e);
+ }
+ }
+
+    /**
+     * Returns the first counter-group name that contains PIG_MAP_RANK_NAME, or
+     * {@code null} when no such group exists.
+     */
+    private String getGroupName(Collection<String> collection) {
+        String match = null;
+        Iterator<String> names = collection.iterator();
+        while (match == null && names.hasNext()) {
+            String candidate = names.next();
+            if (candidate.contains(PIG_MAP_RANK_NAME)) {
+                match = candidate;
+            }
+        }
+        return match;
+    }
+ /**
* The method that creates the Job corresponding to a MapReduceOper.
* The assumption is that
* every MapReduceOper will have a load and a store. The JobConf removes
@@ -704,6 +780,28 @@ public class JobControlCompiler{
nwJob.setGroupingComparatorClass(PigGroupingPartitionWritableComparator.class);
}
+ if (mro.isCounterOperation()) {
+ if (mro.isRowNumber()) {
+ nwJob.setMapperClass(PigMapReduceCounter.PigMapCounter.class);
+ } else {
+ nwJob.setReducerClass(PigMapReduceCounter.PigReduceCounter.class);
+ }
+ }
+
+ if(mro.isRankOperation()) {
+ Iterator<String> operationIDs = mro.getRankOperationId().iterator();
+
+ while(operationIDs.hasNext()) {
+ String operationID = operationIDs.next();
+ Iterator<Pair<String, Long>> itPairs = globalCounters.get(operationID).iterator();
+ Pair<String,Long> pair = null;
+ while(itPairs.hasNext()) {
+ pair = itPairs.next();
+ conf.setLong(pair.first, pair.second);
+ }
+ }
+ }
+
if (!pigContext.inIllustrator)
{
// unset inputs for POStore, otherwise, map/reduce plan will be unnecessarily deserialized
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java?rev=1384352&r1=1384351&r2=1384352&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java Thu Sep 13 14:55:36 2012
@@ -58,6 +58,7 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCollectedGroup;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCounter;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCross;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PODistinct;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFRJoin;
@@ -75,6 +76,7 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage.PackageType;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackageLite;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPartitionRearrange;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PORank;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSkewedJoin;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSort;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit;
@@ -1982,12 +1984,77 @@ public class MRCompiler extends PhyPlanV
throw new MRCompilerException(msg, errCode, PigException.BUG, e);
}
}
-
+
+ /**
+ * For the counter job, it depends if it is row number or not.
+ * In case of being a row number, any previous jobs are saved
+ * and POCounter is added as a leaf on a map task.
+ * If it is not, then POCounter is added as a leaf on a reduce
+ * task (last sorting phase).
+ **/
+ @Override
+ public void visitCounter(POCounter op) throws VisitorException {
+ try{
+ if(op.isRowNumber()) {
+ List<PhysicalOperator> mpLeaves = curMROp.mapPlan.getLeaves();
+ PhysicalOperator leaf = mpLeaves.get(0);
+ if ( !curMROp.isMapDone() && !curMROp.isRankOperation() )
+ {
+ curMROp.setIsCounterOperation(true);
+ curMROp.setIsRowNumber(true);
+ curMROp.setOperationID(op.getOperationID());
+ curMROp.mapPlan.addAsLeaf(op);
+ } else {
+ FileSpec fSpec = getTempFileSpec();
+ MapReduceOper prevMROper = endSingleInputPlanWithStr(fSpec);
+ MapReduceOper mrCounter = startNew(fSpec, prevMROper);
+ mrCounter.mapPlan.addAsLeaf(op);
+ mrCounter.setIsCounterOperation(true);
+ mrCounter.setIsRowNumber(true);
+ mrCounter.setOperationID(op.getOperationID());
+ curMROp = mrCounter;
+ }
+ } else {
+ curMROp.setIsCounterOperation(true);
+ curMROp.setIsRowNumber(false);
+ curMROp.setOperationID(op.getOperationID());
+ curMROp.reducePlan.addAsLeaf(op);
+ }
+
+ phyToMROpMap.put(op, curMROp);
+ }catch(Exception e){
+ int errCode = 2034;
+ String msg = "Error compiling operator " + op.getClass().getSimpleName();
+ throw new MRCompilerException(msg, errCode, PigException.BUG, e);
+ }
+ }
+
+    /**
+     * Compiles a PORank: the current job is ended with a temporary store and a new
+     * job reading that output is started, with PORank added as a leaf of its map plan.
+     * The new job is tagged with the operator's operation id.
+     */
+    @Override
+    public void visitRank(PORank op) throws VisitorException {
+        try {
+            FileSpec tmpSpec = getTempFileSpec();
+            curMROp = startNew(tmpSpec, endSingleInputPlanWithStr(tmpSpec));
+            curMROp.setOperationID(op.getOperationID());
+            curMROp.mapPlan.addAsLeaf(op);
+
+            phyToMROpMap.put(op, curMROp);
+        } catch (Exception e) {
+            int errCode = 2034;
+            String msg = "Error compiling operator " + op.getClass().getSimpleName();
+            throw new MRCompilerException(msg, errCode, PigException.BUG, e);
+        }
+    }
private Pair<POProject,Byte> [] getSortCols(List<PhysicalPlan> plans) throws PlanException, ExecException {
if(plans!=null){
@SuppressWarnings("unchecked")
- Pair<POProject,Byte>[] ret = new Pair[plans.size()];
+ Pair<POProject,Byte>[] ret = new Pair[plans.size()];
int i=-1;
for (PhysicalPlan plan : plans) {
PhysicalOperator op = plan.getLeaves().get(0);
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java?rev=1384352&r1=1384351&r2=1384352&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java Thu Sep 13 14:55:36 2012
@@ -18,7 +18,10 @@
package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer;
import java.io.ByteArrayOutputStream;
+import java.util.ArrayList;
import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
import java.util.Set;
import org.apache.pig.impl.plan.OperatorKey;
@@ -26,6 +29,7 @@ import org.apache.pig.impl.plan.NodeIdGe
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROpPlanVisitor;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PORank;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POUnion;
import org.apache.pig.impl.plan.Operator;
import org.apache.pig.impl.plan.PlanException;
@@ -133,11 +137,19 @@ public class MapReduceOper extends Opera
// POLimit can also have an expression. See PIG-1926
PhysicalPlan limitPlan = null;
-
- // Indicates that this MROper is a splitter MROper.
+
+ // Indicates that this MROper is a splitter MROper.
// That is, this MROper ends due to a POSPlit operator.
private boolean splitter = false;
+ // Indicates that there is a counter operation in the MR job.
+ private boolean isCounterOperation = false;
+
+ // Indicates that there is a rank operation without sorting (row number) in the MR job.
+ private boolean isRowNumber = false;
+
+ private String operationID;
+
// Set to true if it is skewed join
private boolean skewedJoin = false;
@@ -478,12 +490,53 @@ public class MapReduceOper extends Opera
protected void useTypedComparator(boolean useTypedComparator) {
this.usingTypedComparator = useTypedComparator;
}
-
+
protected void noCombineSmallSplits() {
combineSmallSplits = false;
}
-
+
public boolean combineSmallSplits() {
return combineSmallSplits;
}
+
+    /** Marks whether there is a counter operation (POCounter) in this MR job. */
+    public void setIsCounterOperation(boolean counter) {
+        isCounterOperation = counter;
+    }
+
+    /** @return whether this MR job was marked as containing a counter operation. */
+    public boolean isCounterOperation() {
+        return this.isCounterOperation;
+    }
+
+    /**
+     * @return true when at least one root of the map plan is a PORank, as reported
+     *         by getRankOperationId().
+     */
+    public boolean isRankOperation() {
+        return !getRankOperationId().isEmpty();
+    }
+
+    /**
+     * Collects the operation ids of every PORank that is a root of the map plan,
+     * in root order.
+     * <p>
+     * NOTE(review): only roots are inspected, so a PORank placed behind another
+     * operator (e.g. appended with addAsLeaf after a load) is not reported until it
+     * becomes a root; confirm callers invoke this at a point where that holds.
+     *
+     * @return a new, possibly empty, list of rank operation ids.
+     */
+    public ArrayList<String> getRankOperationId() {
+        ArrayList<String> rankIds = new ArrayList<String>();
+        for (PhysicalOperator root : this.mapPlan.getRoots()) {
+            if (root instanceof PORank) {
+                rankIds.add(((PORank) root).getOperationID());
+            }
+        }
+        return rankIds;
+    }
+
+    /** Marks whether this job holds a rank without sorting (row number mode). */
+    public void setIsRowNumber(boolean isRowNumber) {
+        this.isRowNumber = isRowNumber;
+    }
+
+    /** @return whether this job was marked as a row-number rank job. */
+    public boolean isRowNumber() {
+        return this.isRowNumber;
+    }
+
+    /** Stores the id of the counter/rank operation compiled into this job. */
+    public void setOperationID(String operationID) {
+        this.operationID = operationID;
+    }
+
+    /** @return the stored operation id, or null if none was set. */
+    public String getOperationID() {
+        return this.operationID;
+    }
}
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PhyPlanSetter.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PhyPlanSetter.java?rev=1384352&r1=1384351&r2=1384352&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PhyPlanSetter.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PhyPlanSetter.java Thu Sep 13 14:55:36 2012
@@ -44,23 +44,23 @@ public class PhyPlanSetter extends PhyPl
public void visitLoad(POLoad ld) throws VisitorException{
ld.setParentPlan(parent);
}
-
+
@Override
public void visitNative(PONative nt) throws VisitorException{
nt.setParentPlan(parent);
}
-
+
@Override
public void visitStore(POStore st) throws VisitorException{
st.setParentPlan(parent);
}
-
+
@Override
public void visitFilter(POFilter fl) throws VisitorException{
super.visitFilter(fl);
fl.setParentPlan(parent);
}
-
+
@Override
public void visitLocalRearrange(POLocalRearrange lr) throws VisitorException{
super.visitLocalRearrange(lr);
@@ -77,28 +77,28 @@ public class PhyPlanSetter extends PhyPl
public void visitGlobalRearrange(POGlobalRearrange gr) throws VisitorException{
gr.setParentPlan(parent);
}
-
+
@Override
public void visitPackage(POPackage pkg) throws VisitorException{
pkg.setParentPlan(parent);
}
-
+
@Override
public void visitCombinerPackage(POCombinerPackage pkg) throws VisitorException{
pkg.setParentPlan(parent);
}
-
+
@Override
public void visitPOForEach(POForEach nfe) throws VisitorException {
super.visitPOForEach(nfe);
nfe.setParentPlan(parent);
}
-
+
@Override
public void visitUnion(POUnion un) throws VisitorException{
un.setParentPlan(parent);
}
-
+
@Override
public void visitSplit(POSplit spl) throws VisitorException{
PhysicalPlan oldPlan = parent;
@@ -112,7 +112,7 @@ public class PhyPlanSetter extends PhyPl
parent=oldPlan;
spl.setParentPlan(parent);
}
-
+
@Override
public void visitDemux(PODemux demux) throws VisitorException{
super.visitDemux(demux);
@@ -121,55 +121,60 @@ public class PhyPlanSetter extends PhyPl
@Override
public void visitDistinct(PODistinct distinct) throws VisitorException {
- distinct.setParentPlan(parent);
+ distinct.setParentPlan(parent);
}
-
+
@Override
public void visitSort(POSort sort) throws VisitorException {
super.visitSort(sort);
sort.setParentPlan(parent);
}
-
+
+    /**
+     * Sets the parent plan of the PORank, as every other visit method of this
+     * setter does for its operator (the original override was empty).
+     */
+    @Override
+    public void visitRank(PORank rank) throws VisitorException {
+        rank.setParentPlan(parent);
+    }
+
@Override
public void visitConstant(ConstantExpression cnst) throws VisitorException{
cnst.setParentPlan(parent);
}
-
+
@Override
public void visitProject(POProject proj) throws VisitorException{
proj.setParentPlan(parent);
}
-
+
@Override
public void visitGreaterThan(GreaterThanExpr grt) throws VisitorException{
grt.setParentPlan(parent);
}
-
+
@Override
public void visitLessThan(LessThanExpr lt) throws VisitorException{
lt.setParentPlan(parent);
}
-
+
@Override
public void visitGTOrEqual(GTOrEqualToExpr gte) throws VisitorException{
gte.setParentPlan(parent);
}
-
+
@Override
public void visitLTOrEqual(LTOrEqualToExpr lte) throws VisitorException{
lte.setParentPlan(parent);
}
-
+
@Override
public void visitEqualTo(EqualToExpr eq) throws VisitorException{
eq.setParentPlan(parent);
}
-
+
@Override
public void visitNotEqualTo(NotEqualToExpr eq) throws VisitorException{
eq.setParentPlan(parent);
}
-
+
@Override
public void visitRegexp(PORegexp re) throws VisitorException{
re.setParentPlan(parent);
@@ -179,32 +184,32 @@ public class PhyPlanSetter extends PhyPl
public void visitIsNull(POIsNull isNull) throws VisitorException {
isNull.setParentPlan(parent);
}
-
+
@Override
public void visitAdd(Add add) throws VisitorException{
add.setParentPlan(parent);
}
-
+
@Override
public void visitSubtract(Subtract sub) throws VisitorException {
sub.setParentPlan(parent);
}
-
+
@Override
public void visitMultiply(Multiply mul) throws VisitorException {
mul.setParentPlan(parent);
}
-
+
@Override
public void visitDivide(Divide dv) throws VisitorException {
dv.setParentPlan(parent);
}
-
+
@Override
public void visitMod(Mod mod) throws VisitorException {
mod.setParentPlan(parent);
}
-
+
@Override
public void visitAnd(POAnd and) throws VisitorException {
and.setParentPlan(parent);
@@ -229,12 +234,12 @@ public class PhyPlanSetter extends PhyPl
public void visitNegative(PONegative negative) {
negative.setParentPlan(parent);
}
-
+
@Override
public void visitUserFunc(POUserFunc userFunc) throws VisitorException {
userFunc.setParentPlan(parent);
}
-
+
@Override
public void visitComparisonFunc(POUserComparisonFunc compFunc) throws VisitorException {
compFunc.setParentPlan(parent);
@@ -244,7 +249,7 @@ public class PhyPlanSetter extends PhyPl
public void visitMapLookUp(POMapLookUp mapLookUp) {
mapLookUp.setParentPlan(parent);
}
-
+
@Override
public void visitJoinPackage(POJoinPackage joinPackage) throws VisitorException{
joinPackage.setParentPlan(parent);
@@ -254,17 +259,17 @@ public class PhyPlanSetter extends PhyPl
public void visitCast(POCast cast) {
cast.setParentPlan(parent);
}
-
+
@Override
public void visitLimit(POLimit lim) throws VisitorException{
lim.setParentPlan(parent);
}
-
+
@Override
public void visitFRJoin(POFRJoin join) throws VisitorException {
join.setParentPlan(parent);
}
-
+
@Override
public void visitMergeJoin(POMergeJoin join) throws VisitorException {
join.setParentPlan(parent);
@@ -280,13 +285,13 @@ public class PhyPlanSetter extends PhyPl
stream.setParentPlan(parent);
}
-/*
+ /*
@Override
public void visitPartitionRearrange(POPartitionRearrange lrfi) throws VisitorException {
super.visitPartitionRearrange(lrfi);
lrfi.setParentPlan(parent);
}
-*/
+ */
@Override
public void visitPartialAgg(POPartialAgg poPartialAgg) {
Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduceCounter.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduceCounter.java?rev=1384352&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduceCounter.java (added)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduceCounter.java Thu Sep 13 14:55:36 2012
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapOnly.Map;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCounter;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.tools.pigstats.PigStatusReporter;
+
+public class PigMapReduceCounter {
+
+ /**
+ * This class is the used only for simple RANK operation, namely row number mode.
+ **/
+ public static class PigMapCounter extends PigMapBase {
+
+ private static final Log log = LogFactory.getLog(PigMapCounter.class);
+ public static String taskID;
+ public static Context context;
+ private PhysicalOperator pOperator;
+
+ /**
+ * Here is set up the task id, in order to be attached to each tuple
+ **/
+ public void setup(Context context) throws IOException, InterruptedException {
+ super.setup(context);
+
+ taskID = String.valueOf(context.getTaskAttemptID().getTaskID().getId());
+
+ pOperator = mp.getLeaves().get(0);
+
+ while(true) {
+ if(pOperator instanceof POCounter){
+ ((POCounter) pOperator).setTaskId(taskID);
+ ((POCounter) pOperator).resetLocalCounter();
+ break;
+ } else {
+ pOperator = mp.getPredecessors(pOperator).get(0);
+ }
+ }
+ }
+
+ /**
+ * While tuples are collected, they are counted one by one by a global counter per task.
+ **/
+ @Override
+ public void collect(Context context, Tuple tuple)
+ throws InterruptedException, IOException {
+ context.write(null, tuple);
+ try {
+ PigStatusReporter reporter = PigStatusReporter.getInstance();
+ if (reporter != null) {
+ reporter.getCounter(
+ JobControlCompiler.PIG_MAP_RANK_NAME
+ + context.getJobID().toString(), taskID)
+ .increment(1);
+ }
+ } catch (Exception ex) {
+ log.error("Error on incrementer of PigMapCounter");
+ }
+ }
+ }
+
+ /**
+ * This class is the used for RANK BY operations, independently if it is dense or not.
+ **/
+ public static class PigReduceCounter extends PigMapReduce.Reduce {
+
+ private static final Log log = LogFactory.getLog(PigReduceCounter.class);
+ public static String taskID;
+ public static Context context;
+ public static List<PhysicalOperator> leaves;
+ public static PhysicalOperator leaf;
+
+ /**
+ * Here is set up the task id, in order to be attached to each tuple
+ **/
+ @Override
+ protected void setup(Context context) throws IOException, InterruptedException {
+ super.setup(context);
+
+ taskID = String.valueOf(context.getTaskAttemptID().getTaskID().getId());
+
+ leaf = rp.getLeaves().get(0);
+
+ while(true) {
+ if(leaf instanceof POCounter){
+ ((POCounter) leaf).setTaskId(taskID);
+ ((POCounter) leaf).resetLocalCounter();
+ break;
+ } else {
+ leaf = rp.getPredecessors(leaf).get(0);
+ }
+ }
+
+ this.context = context;
+ }
+
+ /**
+ * On this case, global counters are accessed during reduce phase (immediately after a
+ * sorting phase) and the increment for global counters are dependent if it is dense rank
+ * or not.
+ * If it is a dense rank, increment is done by 1. if it is not increment depends on the size
+ * of the size of bag in the tuple.
+ * @param increment is the value to add to the corresponding global counter.
+ **/
+ public static void incrementCounter(Long increment) {
+ try {
+ PigStatusReporter reporter = PigStatusReporter.getInstance();
+ if (reporter != null) {
+
+ if(leaf instanceof POCounter){
+ reporter.getCounter(
+ JobControlCompiler.PIG_MAP_RANK_NAME
+ + context.getJobID().toString(), taskID).increment(increment);
+
+ }
+
+ }
+ } catch (Exception ex) {
+ log.error("Error on incrementer of PigReduceCounter");
+ }
+
+ }
+ }
+}
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/DotPOPrinter.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/DotPOPrinter.java?rev=1384352&r1=1384351&r2=1384352&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/DotPOPrinter.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/DotPOPrinter.java Thu Sep 13 14:55:36 2012
@@ -115,7 +115,13 @@ public class DotPOPrinter extends DotPla
plans.addAll(((POForEach)op).getInputPlans());
}
else if(op instanceof POSort){
- plans.addAll(((POSort)op).getSortPlans());
+ plans.addAll(((POSort)op).getSortPlans());
+ }
+ else if(op instanceof PORank){
+ plans.addAll(((PORank)op).getRankPlans());
+ }
+ else if(op instanceof POCounter){
+ plans.addAll(((POCounter)op).getCounterPlans());
}
else if(op instanceof POLocalRearrange){
plans.addAll(((POLocalRearrange)op).getPlans());
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java?rev=1384352&r1=1384351&r2=1384352&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java Thu Sep 13 14:55:36 2012
@@ -127,9 +127,17 @@ public class PhyPlanVisitor extends Plan
popWalker();
}
}
-
+
+    /**
+     * Visits a {@link POCounter} operator. The default implementation performs
+     * no work; visitors that need to act on counter operators may override it.
+     *
+     * @param poCounter the counter operator being visited
+     * @throws VisitorException declared for overriding implementations
+     */
+    public void visitCounter(POCounter poCounter) throws VisitorException {
+        // Intentionally a no-op.
+    }
+
+    /**
+     * Visits a {@link PORank} operator. The default implementation performs
+     * no work; visitors that need to act on rank operators may override it.
+     *
+     * @param rank the rank operator being visited
+     * @throws VisitorException declared for overriding implementations
+     */
+    public void visitRank(PORank rank) throws VisitorException {
+        // Intentionally a no-op.
+    }
+
public void visitDistinct(PODistinct distinct) throws VisitorException {
- //do nothing
+ //do nothing
}
public void visitSort(POSort sort) throws VisitorException {
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PlanPrinter.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PlanPrinter.java?rev=1384352&r1=1384351&r2=1384352&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PlanPrinter.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PlanPrinter.java Thu Sep 13 14:55:36 2012
@@ -163,8 +163,14 @@ public class PlanPrinter<O extends Opera
else if(node instanceof POCollectedGroup){
sb.append(planString(((POCollectedGroup)node).getPlans()));
}
+ else if(node instanceof PORank){
+ sb.append(planString(((PORank)node).getRankPlans()));
+ }
+ else if(node instanceof POCounter){
+ sb.append(planString(((POCounter)node).getCounterPlans()));
+ }
else if(node instanceof POSort){
- sb.append(planString(((POSort)node).getSortPlans()));
+ sb.append(planString(((POSort)node).getSortPlans()));
}
else if(node instanceof POForEach){
sb.append(planString(((POForEach)node).getInputPlans()));
Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCounter.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCounter.java?rev=1384352&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCounter.java (added)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCounter.java Thu Sep 13 14:55:36 2012
@@ -0,0 +1,310 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators;
+
+import java.util.List;
+
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduceCounter;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.pen.util.ExampleTuple;
+import org.apache.pig.pen.util.LineageTracer;
+
+/**
+ * This operator is part of the RANK operator implementation.
+ * It prepends the current task id and a local counter value to each tuple.
+ * There are two modes of operation, regular and dense, and the way the
+ * local counter advances depends on the mode:
+ * <ul>
+ * <li>Regular rank: duplicate rows are taken into account while assigning
+ * numbers to groups of distinct values, so the counter advances by the size
+ * of the group's bag.</li>
+ * <li>Dense rank (and row number): the counter advances by one per tuple,
+ * so duplicate rows are not counted.</li>
+ * </ul>
+ * Ranking is computed either over the entire tuple (row number) or over
+ * a set of columns (rank by).
+ *
+ * This physical operator relies on MR-specific support available in
+ * PigMapReduceCounter.
+ **/
+
+public class POCounter extends PhysicalOperator {
+
+    private static final long serialVersionUID = 1L;
+    private static final Long ONE = 1L;
+
+    private List<PhysicalPlan> counterPlans;
+    private List<Boolean> mAscCols;
+
+    /**
+     * In case of RANK BY, whether the rank is dense.
+     * A dense rank assigns consecutive ranks to different values.
+     **/
+    private boolean isDenseRank = false;
+
+    /**
+     * In case of a simple RANK (row number mode), a consecutive
+     * number is assigned to each tuple.
+     **/
+    private boolean isRowNumber = false;
+
+    protected static final TupleFactory mTupleFactory = TupleFactory.getInstance();
+
+    /**
+     * Local counter for tuples processed by the same task.
+     **/
+    private Long localCount = 1L;
+
+    /**
+     * Task ID used to label each tuple processed by the corresponding task.
+     **/
+    private String taskID = "-1";
+
+    /**
+     * Unique identifier that links POCounter and PORank
+     * through the global counter labeled with it.
+     **/
+    private String operationID;
+
+    public POCounter(OperatorKey k) {
+        this(k, -1, null);
+    }
+
+    public POCounter(OperatorKey k, int rp) {
+        this(k, rp, null);
+    }
+
+    public POCounter(OperatorKey k, List<PhysicalOperator> inputs) {
+        this(k, -1, inputs);
+    }
+
+    public POCounter(OperatorKey k, int rp, List<PhysicalOperator> inputs) {
+        super(k, rp, inputs);
+    }
+
+    @SuppressWarnings({ "rawtypes", "unchecked" })
+    public POCounter(OperatorKey operatorKey, int requestedParallelism,
+            List inp, List<PhysicalPlan> counterPlans,
+            List<Boolean> ascendingCol) {
+        super(operatorKey, requestedParallelism, inp);
+        this.setCounterPlans(counterPlans);
+        this.setAscendingColumns(ascendingCol);
+    }
+
+    @Override
+    public Tuple illustratorMarkup(Object in, Object out, int eqClassIndex) {
+        if (illustrator != null) {
+            return new ExampleTuple((Tuple) out);
+        }
+        return (Tuple) out;
+    }
+
+    @Override
+    public void visit(PhyPlanVisitor v) throws VisitorException {
+        v.visitCounter(this);
+    }
+
+    @Override
+    public Result getNext(Tuple t) throws ExecException {
+        Result inp = null;
+
+        while (true) {
+            inp = processInput();
+            if (inp.returnStatus == POStatus.STATUS_EOP
+                    || inp.returnStatus == POStatus.STATUS_ERR)
+                break;
+            if (inp.returnStatus == POStatus.STATUS_NULL) {
+                continue;
+            }
+
+            return addCounterValue(inp);
+        }
+        return inp;
+    }
+
+    /**
+     * Prepends the current task id and the local counter value to the input tuple.
+     * In row number or dense rank mode the local counter is incremented by one;
+     * otherwise (regular rank) it is incremented by the size of the bag held in
+     * the last field of the tuple (0 if that field is not a bag).
+     *
+     * @param input output of the previous operator; its result must be a Tuple
+     * @return the same Result, whose result is now a tuple of the form
+     *         (taskId, localCounter, original fields...)
+     * @throws ExecException if reading from or writing to a tuple fails
+     **/
+    protected Result addCounterValue(Result input) throws ExecException {
+        Tuple in = (Tuple) input.result;
+        int inSize = in.size();
+        Tuple out = mTupleFactory.newTuple(inSize + 2);
+
+        // Position 0: identifier of the current task.
+        out.set(0, getTaskId());
+
+        // Position 1: local counter value.
+        if (isRowNumber() || isDenseRank()) {
+            // Each tuple is analyzed independently of any grouping.
+            // Only a dense rank (attached to a reduce phase) increments the global
+            // counter here; otherwise the increment is done at the mapper.
+            if (isDenseRank()) {
+                PigMapReduceCounter.PigReduceCounter.incrementCounter(POCounter.ONE);
+            }
+
+            out.set(1, getLocalCounter());
+            incrementLocalCounter();
+        } else {
+            // Regular rank: the number of tuples in the same group (the bag in the
+            // last field) determines both the global and the local increment.
+            Long sizeBag = 0L;
+            int positionBag = inSize - 1;
+            // Guard against empty tuples; cast to the DataBag interface so that any
+            // bag implementation is accepted, not only DefaultAbstractBag subclasses.
+            if (positionBag >= 0 && in.getType(positionBag) == DataType.BAG) {
+                sizeBag = ((org.apache.pig.data.DataBag) in.get(positionBag)).size();
+            }
+
+            PigMapReduceCounter.PigReduceCounter.incrementCounter(sizeBag);
+
+            out.set(1, getLocalCounter());
+
+            // The value for the next tuple processed by the current task.
+            addToLocalCounter(sizeBag);
+        }
+
+        // Remaining positions: the original tuple content.
+        for (int pos = 0; pos < inSize; pos++) {
+            out.set(pos + 2, in.get(pos));
+        }
+
+        input.result = illustratorMarkup(in, out, 0);
+
+        return input;
+    }
+
+    @Override
+    public boolean supportsMultipleInputs() {
+        return false;
+    }
+
+    @Override
+    public boolean supportsMultipleOutputs() {
+        return false;
+    }
+
+    @Override
+    public String name() {
+        return getAliasString() + "POCounter" + "["
+                + DataType.findTypeName(resultType) + "]" + " - "
+                + mKey.toString();
+    }
+
+    public void setCounterPlans(List<PhysicalPlan> counterPlans) {
+        this.counterPlans = counterPlans;
+    }
+
+    public List<PhysicalPlan> getCounterPlans() {
+        return counterPlans;
+    }
+
+    public void setAscendingColumns(List<Boolean> mAscCols) {
+        this.mAscCols = mAscCols;
+    }
+
+    public List<Boolean> getAscendingColumns() {
+        return mAscCols;
+    }
+
+    /**
+     * Initialization step of POCounter: sets the local counter back to 1.
+     **/
+    public void resetLocalCounter() {
+        this.localCount = 1L;
+    }
+
+    /**
+     * Sequential counter used in ROW NUMBER and RANK BY DENSE modes.
+     * @return the counter value before the increment
+     **/
+    public Long incrementLocalCounter() {
+        return localCount++;
+    }
+
+    public void setLocalCounter(Long localCount) {
+        this.localCount = localCount;
+    }
+
+    public Long getLocalCounter() {
+        return this.localCount;
+    }
+
+    public void addToLocalCounter(Long sizeBag) {
+        this.localCount += sizeBag;
+    }
+
+    /**
+     * Task ID: identifier of the task (map or reduce).
+     **/
+    public void setTaskId(String taskID) {
+        this.taskID = taskID;
+    }
+
+    public String getTaskId() {
+        return this.taskID;
+    }
+
+    /**
+     * Dense rank flag.
+     **/
+    public void setIsDenseRank(boolean isDenseRank) {
+        this.isDenseRank = isDenseRank;
+    }
+
+    public boolean isDenseRank() {
+        return isDenseRank;
+    }
+
+    /**
+     * Row number flag.
+     **/
+    public void setIsRowNumber(boolean isRowNumber) {
+        this.isRowNumber = isRowNumber;
+    }
+
+    public boolean isRowNumber() {
+        return isRowNumber;
+    }
+
+    /**
+     * Operation ID: identifier shared with the corresponding PORank.
+     **/
+    public void setOperationID(String operationID) {
+        this.operationID = operationID;
+    }
+
+    public String getOperationID() {
+        return operationID;
+    }
+}
\ No newline at end of file
Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PORank.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PORank.java?rev=1384352&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PORank.java (added)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PORank.java Thu Sep 13 14:55:36 2012
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.pen.util.ExampleTuple;
+import org.apache.pig.pen.util.LineageTracer;
+
+/**
+ * This operator is part of the RANK operator implementation.
+ * It reads the output tuple of POCounter together with the cumulative sum
+ * previously calculated for each task. The task identifier in the tuple selects
+ * the corresponding cumulative sum, which is added to the local counter stored in
+ * the tuple; the result is prepended to the tuple content as the rank.
+ **/
+
+public class PORank extends PhysicalOperator {
+
+    private static final Log log = LogFactory.getLog(PORank.class);
+
+    private static final long serialVersionUID = 1L;
+
+    private List<PhysicalPlan> rankPlans;
+    private List<Boolean> mAscCols;
+    private List<Byte> exprOutputTypes;
+
+    protected static final TupleFactory mTupleFactory = TupleFactory.getInstance();
+
+    /**
+     * Unique identifier that links POCounter and PORank
+     * through the global counter labeled with it.
+     **/
+    private String operationID;
+
+    /**
+     * Counter used to distribute tuples over the illustrator's
+     * equivalence classes (cycles through indexes 0..2).
+     **/
+    private int localCountIllustrator = 0;
+
+    public PORank(OperatorKey k) {
+        this(k, -1, null);
+    }
+
+    public PORank(OperatorKey k, int rp) {
+        this(k, rp, null);
+    }
+
+    public PORank(OperatorKey k, List<PhysicalOperator> inp) {
+        this(k, -1, inp);
+    }
+
+    public PORank(OperatorKey k, int rp, List<PhysicalOperator> inp) {
+        super(k, rp, inp);
+    }
+
+    @SuppressWarnings({ "rawtypes", "unchecked" })
+    public PORank(OperatorKey operatorKey, int requestedParallelism, List inp,
+            List<PhysicalPlan> rankPlans, List<Boolean> ascendingCol) {
+        super(operatorKey, requestedParallelism, inp);
+        this.setRankPlans(rankPlans);
+        this.setAscendingCols(ascendingCol);
+
+        // Tolerate a missing plan list instead of failing with an NPE.
+        int planCount = (rankPlans == null) ? 0 : rankPlans.size();
+        exprOutputTypes = new ArrayList<Byte>(planCount);
+
+        if (rankPlans != null) {
+            for (PhysicalPlan plan : rankPlans) {
+                exprOutputTypes.add(plan.getLeaves().get(0).getResultType());
+            }
+        }
+    }
+
+    @Override
+    public Tuple illustratorMarkup(Object in, Object out, int eqClassIndex) {
+        if (illustrator != null) {
+            ExampleTuple tOut = new ExampleTuple((Tuple) out);
+            illustrator.addData((Tuple) out);
+            illustrator.getEquivalenceClasses().get(eqClassIndex).add((Tuple) in);
+
+            LineageTracer lineageTracer = illustrator.getLineage();
+            lineageTracer.insert(tOut);
+            return tOut;
+        }
+        return (Tuple) out;
+    }
+
+    @Override
+    public void visit(PhyPlanVisitor v) throws VisitorException {
+        v.visitRank(this);
+    }
+
+    @Override
+    public Result getNext(Tuple t) throws ExecException {
+        Result inp = null;
+
+        while (true) {
+            inp = processInput();
+
+            if (inp.returnStatus == POStatus.STATUS_EOP
+                    || inp.returnStatus == POStatus.STATUS_ERR)
+                break;
+            if (inp.returnStatus == POStatus.STATUS_NULL) {
+                continue;
+            }
+            return addRank(inp);
+        }
+
+        return inp;
+    }
+
+    /**
+     * Reads the output tuple of POCounter and the cumulative sum previously calculated.
+     * The task identifier (field 0) selects the cumulative sum stored in the job
+     * configuration; it is added to the local counter (field 1), and the sum replaces
+     * both fields at the front of the tuple.
+     *
+     * @param input result produced by POCounter; its result must be a Tuple of the
+     *        form (taskId, localCounter, fields...)
+     * @return the same Result, whose result is now (rank, fields...)
+     * @throws ExecException if reading from or writing to a tuple fails
+     * @throws RuntimeException if the cumulative counter for the task is not present
+     *         in the job configuration
+     **/
+    public Result addRank(Result input) throws ExecException {
+        Tuple in = (Tuple) input.result;
+        int inSize = in.size();
+        Tuple out = mTupleFactory.newTuple(inSize - 1);
+
+        Long taskId = Long.valueOf(in.get(0).toString());
+        Long localCounter = (Long) in.get(1);
+
+        String nameCounter = JobControlCompiler.PIG_MAP_COUNTER + getOperationID()
+                + JobControlCompiler.PIG_MAP_SEPARATOR + String.valueOf(taskId);
+
+        long rank = PigMapReduce.sJobConfInternal.get().getLong(nameCounter, -1L);
+
+        if (rank == -1L) {
+            log.error("Error on reading counter " + nameCounter);
+            throw new RuntimeException("Unable to read counter " + nameCounter);
+        }
+
+        out.set(0, rank + localCounter);
+
+        // Copy the tuple content, skipping the task id and local counter.
+        for (int pos = 2; pos < inSize; pos++) {
+            out.set(pos - 1, in.get(pos));
+        }
+
+        if (localCountIllustrator > 2)
+            localCountIllustrator = 0;
+
+        input.result = illustratorMarkup(in, out, localCountIllustrator);
+
+        localCountIllustrator++;
+
+        return input;
+    }
+
+    @Override
+    public boolean supportsMultipleInputs() {
+        return false;
+    }
+
+    @Override
+    public boolean supportsMultipleOutputs() {
+        return false;
+    }
+
+    @Override
+    public String name() {
+        return getAliasString() + "PORank" + "["
+                + DataType.findTypeName(resultType) + "]" + " - "
+                + mKey.toString();
+    }
+
+    public void setRankPlans(List<PhysicalPlan> rankPlans) {
+        this.rankPlans = rankPlans;
+    }
+
+    public List<PhysicalPlan> getRankPlans() {
+        return rankPlans;
+    }
+
+    public void setAscendingCols(List<Boolean> mAscCols) {
+        this.mAscCols = mAscCols;
+    }
+
+    public List<Boolean> getAscendingCols() {
+        return mAscCols;
+    }
+
+    /**
+     * Operation ID: identifier shared with the corresponding POCounter.
+     * @param operationID the shared identifier
+     **/
+    public void setOperationID(String operationID) {
+        this.operationID = operationID;
+    }
+
+    public String getOperationID() {
+        return operationID;
+    }
+}
Modified: pig/trunk/src/org/apache/pig/newplan/logical/optimizer/AllExpressionVisitor.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/optimizer/AllExpressionVisitor.java?rev=1384352&r1=1384351&r2=1384352&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/optimizer/AllExpressionVisitor.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/optimizer/AllExpressionVisitor.java Thu Sep 13 14:55:36 2012
@@ -34,6 +34,7 @@ import org.apache.pig.newplan.logical.re
import org.apache.pig.newplan.logical.relational.LOInnerLoad;
import org.apache.pig.newplan.logical.relational.LOJoin;
import org.apache.pig.newplan.logical.relational.LOLimit;
+import org.apache.pig.newplan.logical.relational.LORank;
import org.apache.pig.newplan.logical.relational.LOSort;
import org.apache.pig.newplan.logical.relational.LOSplitOutput;
import org.apache.pig.newplan.logical.relational.LogicalRelationalNodesVisitor;
@@ -147,7 +148,13 @@ public abstract class AllExpressionVisit
LogicalExpressionVisitor v = getVisitor(splitOutput.getFilterPlan());
v.visit();
}
-
+
+    /**
+     * Makes the rank operator the current operator, then visits every
+     * expression plan of its rank columns.
+     *
+     * @param rank the rank operator being visited
+     * @throws FrontendException if visiting an expression plan fails
+     */
+    @Override
+    public void visit(LORank rank) throws FrontendException {
+        currentOp = rank;
+        visitAll(rank.getRankColPlans());
+    }
+
@Override
public void visit(LOSort sort) throws FrontendException {
currentOp = sort;
Modified: pig/trunk/src/org/apache/pig/newplan/logical/optimizer/AllSameRalationalNodesVisitor.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/optimizer/AllSameRalationalNodesVisitor.java?rev=1384352&r1=1384351&r2=1384352&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/optimizer/AllSameRalationalNodesVisitor.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/optimizer/AllSameRalationalNodesVisitor.java Thu Sep 13 14:55:36 2012
@@ -28,6 +28,7 @@ import org.apache.pig.newplan.logical.re
import org.apache.pig.newplan.logical.relational.LOForEach;
import org.apache.pig.newplan.logical.relational.LOJoin;
import org.apache.pig.newplan.logical.relational.LOLoad;
+import org.apache.pig.newplan.logical.relational.LORank;
import org.apache.pig.newplan.logical.relational.LOSort;
import org.apache.pig.newplan.logical.relational.LOSplit;
import org.apache.pig.newplan.logical.relational.LOSplitOutput;
@@ -52,13 +53,13 @@ public abstract class AllSameRalationalN
public AllSameRalationalNodesVisitor(OperatorPlan plan, PlanWalker walker) throws FrontendException {
super(plan, walker);
}
-
+
/**
* Method to call on every node in the logical plan.
* @param op Node that is currently being visited.
*/
abstract protected void execute(LogicalRelationalOperator op) throws FrontendException;
-
+
@Override
public void visit(LOFilter filter) throws FrontendException {
execute(filter);
@@ -78,47 +79,52 @@ public abstract class AllSameRalationalN
public void visit(LOLoad load) throws FrontendException {
execute(load);
}
-
+
@Override
public void visit(LOStore store) throws FrontendException {
execute(store);
}
-
+
@Override
public void visit(LOForEach foreach) throws FrontendException {
execute(foreach);
}
-
+
@Override
public void visit(LOSplit split) throws FrontendException {
execute(split);
}
-
+
@Override
public void visit(LOSplitOutput splitOutput) throws FrontendException {
execute(splitOutput);
}
-
+
@Override
public void visit(LOUnion union) throws FrontendException {
execute(union);
}
-
+
@Override
public void visit(LOSort sort) throws FrontendException {
execute(sort);
}
-
+
+    /**
+     * Handles a rank operator by delegating to {@link #execute}, exactly like
+     * the other relational operators handled by this visitor.
+     *
+     * @param rank the rank operator being visited
+     * @throws FrontendException propagated from {@link #execute}
+     */
+    @Override
+    public void visit(LORank rank) throws FrontendException {
+        execute(rank);
+    }
+
@Override
public void visit(LODistinct distinct) throws FrontendException {
execute(distinct);
}
-
+
@Override
public void visit(LOCross cross) throws FrontendException {
execute(cross);
}
-
+
@Override
public void visit(LOStream stream) throws FrontendException {
execute(stream);
Modified: pig/trunk/src/org/apache/pig/newplan/logical/optimizer/LogicalPlanPrinter.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/optimizer/LogicalPlanPrinter.java?rev=1384352&r1=1384351&r2=1384352&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/optimizer/LogicalPlanPrinter.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/optimizer/LogicalPlanPrinter.java Thu Sep 13 14:55:36 2012
@@ -36,6 +36,7 @@ import org.apache.pig.newplan.logical.re
import org.apache.pig.newplan.logical.relational.LOGenerate;
import org.apache.pig.newplan.logical.relational.LOJoin;
import org.apache.pig.newplan.logical.relational.LOLimit;
+import org.apache.pig.newplan.logical.relational.LORank;
import org.apache.pig.newplan.logical.relational.LOSort;
import org.apache.pig.newplan.logical.relational.LOSplitOutput;
import org.apache.pig.newplan.logical.relational.LogicalPlan;
@@ -190,6 +191,11 @@ public class LogicalPlanPrinter extends
}
}
}
+ else if(node instanceof LORank){
+ // Visit fields for rank
+ for (OperatorPlan plan : ((LORank)node).getRankColPlans())
+ sb.append(planString(plan));
+ }
else if(node instanceof LOSort){
for (OperatorPlan plan : ((LOSort)node).getSortColPlans())
sb.append(planString(plan));
Modified: pig/trunk/src/org/apache/pig/newplan/logical/optimizer/SchemaResetter.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/optimizer/SchemaResetter.java?rev=1384352&r1=1384351&r2=1384352&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/optimizer/SchemaResetter.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/optimizer/SchemaResetter.java Thu Sep 13 14:55:36 2012
@@ -44,6 +44,7 @@ import org.apache.pig.newplan.logical.re
import org.apache.pig.newplan.logical.relational.LOJoin;
import org.apache.pig.newplan.logical.relational.LOLimit;
import org.apache.pig.newplan.logical.relational.LOLoad;
+import org.apache.pig.newplan.logical.relational.LORank;
import org.apache.pig.newplan.logical.relational.LOSort;
import org.apache.pig.newplan.logical.relational.LOSplit;
import org.apache.pig.newplan.logical.relational.LOSplitOutput;
@@ -169,13 +170,20 @@ public class SchemaResetter extends Logi
visitAll(loSort.getSortColPlans());
validate(loSort.getSchema());
}
-
+
+    /**
+     * Recomputes the schema of a rank operator. The order matters: the operator's
+     * schema is reset first, then its rank column expression plans are visited,
+     * and finally the regenerated schema is validated.
+     *
+     * @param loRank the rank operator being visited
+     * @throws FrontendException if visiting or validation fails
+     */
+    @Override
+    public void visit(LORank loRank) throws FrontendException {
+        loRank.resetSchema();
+        visitAll(loRank.getRankColPlans());
+        validate(loRank.getSchema());
+    }
+
@Override
public void visit(LODistinct loDistinct) throws FrontendException {
loDistinct.resetSchema();
validate(loDistinct.getSchema());
}
-
+
@Override
public void visit(LOLimit loLimit) throws FrontendException {
loLimit.resetSchema();
Modified: pig/trunk/src/org/apache/pig/newplan/logical/optimizer/UidResetter.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/optimizer/UidResetter.java?rev=1384352&r1=1384351&r2=1384352&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/optimizer/UidResetter.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/optimizer/UidResetter.java Thu Sep 13 14:55:36 2012
@@ -40,6 +40,7 @@ import org.apache.pig.newplan.logical.re
import org.apache.pig.newplan.logical.relational.LOJoin;
import org.apache.pig.newplan.logical.relational.LOLimit;
import org.apache.pig.newplan.logical.relational.LOLoad;
+import org.apache.pig.newplan.logical.relational.LORank;
import org.apache.pig.newplan.logical.relational.LOSort;
import org.apache.pig.newplan.logical.relational.LOSplit;
import org.apache.pig.newplan.logical.relational.LOSplitOutput;
@@ -143,12 +144,22 @@ public class UidResetter extends Logical
uidResetter.visit();
}
}
-
+
+    /**
+     * Resets the uid of a rank operator, then runs a fresh
+     * {@code ExpressionUidResetter} over each of its rank column expression plans.
+     *
+     * @param loRank the rank operator being visited
+     * @throws FrontendException if resetting an expression plan fails
+     */
+    @Override
+    public void visit(LORank loRank) throws FrontendException {
+        loRank.resetUid();
+        for (LogicalExpressionPlan plan : loRank.getRankColPlans()) {
+            new ExpressionUidResetter(plan).visit();
+        }
+    }
+
@Override
public void visit(LODistinct loDistinct) throws FrontendException {
loDistinct.resetUid();
}
-
+
@Override
public void visit(LOLimit loLimit) throws FrontendException {
loLimit.resetUid();
Added: pig/trunk/src/org/apache/pig/newplan/logical/relational/LORank.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/relational/LORank.java?rev=1384352&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/relational/LORank.java (added)
+++ pig/trunk/src/org/apache/pig/newplan/logical/relational/LORank.java Thu Sep 13 14:55:36 2012
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.newplan.logical.relational;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.pig.data.DataType;
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.newplan.Operator;
+import org.apache.pig.newplan.OperatorPlan;
+import org.apache.pig.newplan.PlanVisitor;
+import org.apache.pig.newplan.logical.expression.LogicalExpression;
+import org.apache.pig.newplan.logical.expression.LogicalExpressionPlan;
+
+/**
+ * RANK operator implementation.
+ * Operator Syntax:
+ * <pre>
+ * {@code alias = RANK rel ( BY (col_ref) (ASC|DESC)? ( DENSE )? )?;}
+ * alias - output alias
+ * RANK - operator
+ * rel - input relation
+ * BY - operator
+ * col_ref - STAR or Column References or a range in the schema of rel
+ * DENSE - dense rank means consecutive rank values without gaps among different tuple values.
+ * </pre>
+ * The output schema is a LONG rank column named {@code rank_<input alias>}
+ * followed by a copy of every field of the input schema.
+ */
+public class LORank extends LogicalRelationalOperator {
+
+    private final static String RANK_COL_NAME = "rank";
+    private final static String SEPARATOR = "_";
+
+    /** Sentinel meaning "no uid has been assigned to the rank column yet". */
+    private final static long UNASSIGNED_UID = -1;
+
+    /**
+     * The logical expression plans of the RANK BY columns, one per column.
+     * Null when the operator was built without BY plans.
+     */
+    private List<LogicalExpressionPlan> rankColPlans;
+
+    /**
+     * Ascending flag of each RANK BY column, parallel to rankColPlans.
+     */
+    private List<Boolean> ascCols;
+
+    /**
+     * In case of RANK BY, the rank could be dense or not.
+     * Being a dense rank means to assign consecutive ranking
+     * to different tuples.
+     */
+    private boolean isDenseRank = false;
+
+    /**
+     * In case of simple RANK, namely row number mode
+     * which is a consecutive number assigned to each tuple.
+     */
+    private boolean isRowNumber = false;
+
+    /**
+     * Uid of the prepended rank column. It is generated the first time the
+     * schema is built and reused whenever the schema is rebuilt after the
+     * cached one was cleared, so the rank column keeps a stable identity.
+     * Only {@link #resetUid()} makes a fresh uid be generated.
+     */
+    private long rankColumnUid = UNASSIGNED_UID;
+
+    /**
+     * Creates a RANK operator with no RANK BY columns set.
+     * @param plan the logical plan this operator belongs to
+     */
+    public LORank(OperatorPlan plan) {
+        super("LORank", plan);
+    }
+
+    /**
+     * Creates a RANK BY operator.
+     * @param plan the logical plan this operator belongs to
+     * @param rankColPlans one expression plan per RANK BY column
+     * @param ascCols ascending flag per RANK BY column
+     */
+    public LORank(OperatorPlan plan, List<LogicalExpressionPlan> rankColPlans, List<Boolean> ascCols) {
+        this(plan);
+        this.rankColPlans = rankColPlans;
+        this.ascCols = ascCols;
+    }
+
+    /** @return the RANK BY expression plans, possibly null */
+    public List<LogicalExpressionPlan> getRankColPlans() {
+        return rankColPlans;
+    }
+
+    /** @param rankColPlans the RANK BY expression plans */
+    public void setRankColPlan(List<LogicalExpressionPlan> rankColPlans) {
+        this.rankColPlans = rankColPlans;
+    }
+
+    /** @return the ascending flag of each RANK BY column, possibly null */
+    public List<Boolean> getAscendingCol() {
+        return ascCols;
+    }
+
+    /** @param ascCols the ascending flag of each RANK BY column */
+    public void setAscendingCol(List<Boolean> ascCols) {
+        this.ascCols = ascCols;
+    }
+
+    /**
+     * Get the schema for the output of LORank.
+     * Composed by a long rank column prepended to a field-by-field copy of
+     * the input schema. The rank column reuses the same uid across rebuilds
+     * until {@link #resetUid()} is called.
+     * @return the schema, or null if there is no input or its schema is unknown
+     * @throws FrontendException if the input schema cannot be computed
+     */
+    @Override
+    public LogicalSchema getSchema() throws FrontendException {
+        // if schema is calculated before, just return
+        if (schema != null) {
+            return schema;
+        }
+
+        // RANK has a single input, whose schema is the base of ours
+        List<Operator> preds = plan.getPredecessors(this);
+        if (preds == null || preds.isEmpty()) {
+            return null;
+        }
+        LogicalRelationalOperator input = (LogicalRelationalOperator) preds.get(0);
+        if (input == null) {
+            return null;
+        }
+
+        LogicalSchema inputSchema = input.getSchema();
+        // the schema of the input is unknown, so the rank schema is unknown, just return
+        if (inputSchema == null) {
+            schema = null;
+            return schema;
+        }
+
+        // assign the rank column uid only once so schema rebuilds keep it stable
+        if (rankColumnUid == UNASSIGNED_UID) {
+            rankColumnUid = LogicalExpression.getNextUid();
+        }
+
+        schema = new LogicalSchema();
+        schema.addField(new LogicalSchema.LogicalFieldSchema(
+                RANK_COL_NAME + SEPARATOR + input.getAlias(), null, DataType.LONG, rankColumnUid));
+
+        // copy each input field, keeping its alias, nested schema, type and uid
+        for (int i = 0; i < inputSchema.size(); i++) {
+            LogicalSchema.LogicalFieldSchema fs = inputSchema.getField(i);
+            schema.addField(new LogicalSchema.LogicalFieldSchema(fs.alias, fs.schema, fs.type, fs.uid));
+        }
+
+        return schema;
+    }
+
+    /**
+     * Forgets the uid assigned to the rank column, so that a fresh one is
+     * generated the next time the schema is built.
+     */
+    @Override
+    public void resetUid() {
+        rankColumnUid = UNASSIGNED_UID;
+    }
+
+    @Override
+    public void accept(PlanVisitor v) throws FrontendException {
+        if (!(v instanceof LogicalRelationalNodesVisitor)) {
+            throw new FrontendException("Expected LogicalPlanVisitor", 2223);
+        }
+        ((LogicalRelationalNodesVisitor)v).visit(this);
+    }
+
+    /**
+     * Two RANK operators are equal when they agree on the dense and
+     * row-number flags, on the ascending flags, on the RANK BY plans
+     * (compared structurally, position by position) and on the common
+     * relational-operator state checked by checkEquality.
+     */
+    @Override
+    public boolean isEqual(Operator other) throws FrontendException {
+        if (!(other instanceof LORank)) {
+            return false;
+        }
+        LORank otherRank = (LORank) other;
+        if (isDenseRank != otherRank.isDenseRank || isRowNumber != otherRank.isRowNumber) {
+            return false;
+        }
+        if (ascCols == null ? otherRank.ascCols != null : !ascCols.equals(otherRank.ascCols)) {
+            return false;
+        }
+        if (!rankColPlansEqual(rankColPlans, otherRank.rankColPlans)) {
+            return false;
+        }
+        return checkEquality(otherRank);
+    }
+
+    /**
+     * Compares two lists of RANK BY plans structurally, element by element.
+     * A null list is equal only to another null list.
+     */
+    private static boolean rankColPlansEqual(List<LogicalExpressionPlan> mine,
+            List<LogicalExpressionPlan> theirs) throws FrontendException {
+        if (mine == null || theirs == null) {
+            return mine == theirs;
+        }
+        if (mine.size() != theirs.size()) {
+            return false;
+        }
+        for (int i = 0; i < mine.size(); i++) {
+            if (!mine.get(i).isEqual(theirs.get(i))) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    /**
+     * Get if it is a dense RANK BY
+     * @return boolean
+     */
+    public boolean isDenseRank() {
+        return isDenseRank;
+    }
+
+    /**
+     * Set if it is a dense RANK BY
+     * @param isDenseRank if is dense rank or not
+     */
+    public void setIsDenseRank(boolean isDenseRank) {
+        this.isDenseRank = isDenseRank;
+    }
+
+    /**
+     * Get if it is a simple RANK operation.
+     * Which means a row number attached to each tuple.
+     * @return boolean
+     */
+    public boolean isRowNumber() {
+        return isRowNumber;
+    }
+
+    /**
+     * Set if it is a simple RANK operation.
+     * @param rowNumber if is a row number operation
+     */
+    public void setIsRowNumber(boolean rowNumber) {
+        this.isRowNumber = rowNumber;
+    }
+}