You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by gd...@apache.org on 2012/09/13 16:55:38 UTC

svn commit: r1384352 [1/4] - in /pig/trunk: ./ contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/ src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/ src/org/apache/pig/backend/hadoop/executionengine/physicalLayer...

Author: gdfm
Date: Thu Sep 13 14:55:36 2012
New Revision: 1384352

URL: http://svn.apache.org/viewvc?rev=1384352&view=rev
Log:
PIG-2353: RANK function like in SQL (xalan via azaroth)

Added:
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduceCounter.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCounter.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PORank.java
    pig/trunk/src/org/apache/pig/newplan/logical/relational/LORank.java
    pig/trunk/test/org/apache/pig/test/TestOrderBy3.java
    pig/trunk/test/org/apache/pig/test/TestRank1.java
    pig/trunk/test/org/apache/pig/test/TestRank2.java
    pig/trunk/test/org/apache/pig/test/TestRank3.java
Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorage.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PhyPlanSetter.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/DotPOPrinter.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PlanPrinter.java
    pig/trunk/src/org/apache/pig/newplan/logical/optimizer/AllExpressionVisitor.java
    pig/trunk/src/org/apache/pig/newplan/logical/optimizer/AllSameRalationalNodesVisitor.java
    pig/trunk/src/org/apache/pig/newplan/logical/optimizer/LogicalPlanPrinter.java
    pig/trunk/src/org/apache/pig/newplan/logical/optimizer/SchemaResetter.java
    pig/trunk/src/org/apache/pig/newplan/logical/optimizer/UidResetter.java
    pig/trunk/src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java
    pig/trunk/src/org/apache/pig/newplan/logical/relational/LogicalRelationalNodesVisitor.java
    pig/trunk/src/org/apache/pig/newplan/logical/rules/ColumnPruneHelper.java
    pig/trunk/src/org/apache/pig/newplan/logical/rules/ColumnPruneVisitor.java
    pig/trunk/src/org/apache/pig/newplan/logical/visitor/LineageFindRelVisitor.java
    pig/trunk/src/org/apache/pig/newplan/logical/visitor/ProjectStarExpander.java
    pig/trunk/src/org/apache/pig/newplan/logical/visitor/SchemaAliasVisitor.java
    pig/trunk/src/org/apache/pig/newplan/logical/visitor/TypeCheckingRelVisitor.java
    pig/trunk/src/org/apache/pig/parser/AliasMasker.g
    pig/trunk/src/org/apache/pig/parser/AstPrinter.g
    pig/trunk/src/org/apache/pig/parser/AstValidator.g
    pig/trunk/src/org/apache/pig/parser/LogicalPlanBuilder.java
    pig/trunk/src/org/apache/pig/parser/LogicalPlanGenerator.g
    pig/trunk/src/org/apache/pig/parser/QueryLexer.g
    pig/trunk/src/org/apache/pig/parser/QueryParser.g
    pig/trunk/src/org/apache/pig/pen/IllustratorAttacher.java
    pig/trunk/src/org/apache/pig/pen/LocalMapReduceSimulator.java
    pig/trunk/src/org/apache/pig/tools/pigstats/ScriptState.java
    pig/trunk/test/e2e/pig/deployers/ExistingClusterDeployer.pm
    pig/trunk/test/e2e/pig/deployers/LocalDeployer.pm
    pig/trunk/test/e2e/pig/tests/nightly.conf
    pig/trunk/test/e2e/pig/tools/generate/generate_data.pl
    pig/trunk/test/org/apache/pig/parser/TestLexer.pig
    pig/trunk/test/org/apache/pig/parser/TestLogicalPlanGenerator.java
    pig/trunk/test/org/apache/pig/parser/TestParser.pig
    pig/trunk/test/org/apache/pig/parser/TestQueryLexer.java
    pig/trunk/test/org/apache/pig/parser/TestQueryParser.java
    pig/trunk/test/org/apache/pig/test/OptimizeLimitPlanPrinter.java
    pig/trunk/test/org/apache/pig/test/TestLogicalPlanBuilder.java

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1384352&r1=1384351&r2=1384352&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Thu Sep 13 14:55:36 2012
@@ -25,6 +25,8 @@ PIG-1891 Enable StoreFunc to make intell
 
 IMPROVEMENTS
 
+PIG-2353: RANK function like in SQL (xalan via azaroth)
+
 PIG-2915: Builtin TOP udf is sensitive to null input bags (hazen via dvryaboy)
 
 PIG-2901: Errors and lacks in document "Pig Latin Basics" (miyakawataku via billgraham)

Modified: pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorage.java
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorage.java?rev=1384352&r1=1384351&r2=1384352&view=diff
==============================================================================
--- pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorage.java (original)
+++ pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorage.java Thu Sep 13 14:55:36 2012
@@ -635,4 +635,8 @@ public class AvroStorage extends FileInp
         }
     }
 
+    @Override
+    public void cleanupOnSuccess(String location, Job job) throws IOException{
+
+    }
 }

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java?rev=1384352&r1=1384351&r2=1384352&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java Thu Sep 13 14:55:36 2012
@@ -25,13 +25,17 @@ import java.net.URI;
 import java.net.URISyntaxException;
 import java.net.URL;
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 
+import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -42,6 +46,10 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.io.WritableComparator;
+import org.apache.hadoop.mapred.Counters;
+import org.apache.hadoop.mapred.Counters.Counter;
+import org.apache.hadoop.mapred.Counters.Group;
+import org.apache.hadoop.mapred.JobClient;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.JobPriority;
 import org.apache.hadoop.mapred.jobcontrol.Job;
@@ -62,11 +70,13 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserFunc;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCounter;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFRJoin;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeCogroup;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeJoin;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PORank;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
 import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
@@ -137,6 +147,11 @@ public class JobControlCompiler{
     private static final String REDUCER_ESTIMATOR_KEY = "pig.exec.reducer.estimator";
     private static final String REDUCER_ESTIMATOR_ARG_KEY =  "pig.exec.reducer.estimator.arg";
 
+    public static final String PIG_MAP_COUNTER = "pig.counters.counter_";
+    public static final String PIG_MAP_RANK_NAME = "pig.rank_";
+    public static final String PIG_MAP_SEPARATOR = "_";
+    public HashMap<String, ArrayList<Pair<String,Long>>> globalCounters = new HashMap<String, ArrayList<Pair<String,Long>>>();
+
     /**
      * We will serialize the POStore(s) present in map and reduce in lists in
      * the Hadoop Conf. In the case of Multi stores, we could deduce these from
@@ -152,6 +167,7 @@ public class JobControlCompiler{
     private Map<Job, Pair<List<POStore>, Path>> jobStoreMap;
 
     private Map<Job, MapReduceOper> jobMroMap;
+    private int counterSize;
 
     public JobControlCompiler(PigContext pigContext, Configuration conf) throws IOException {
         this.pigContext = pigContext;
@@ -314,6 +330,8 @@ public class JobControlCompiler{
             if (!completeFailedJobs.contains(job))
             {
                 MapReduceOper mro = jobMroMap.get(job);
+                if (mro.isCounterOperation() /*&& completeFailedJobs.size() > 0*/)
+                    saveCounters(job,mro.getOperationID());
                 plan.remove(mro);
             }
         }
@@ -323,6 +341,64 @@ public class JobControlCompiler{
     }
 
     /**
+     * Reads the global counters a job produced in the group whose name contains PIG_MAP_RANK_NAME.
+     * It then computes a cumulative sum: each entry is the previous cumulative
+     * sum plus the previous global counter value.
+     * @param job the job whose global counters have been collected.
+     * @param operationID the unique operation identifier. After being collected as global
+     * counters (POCounter), these values are passed via the job configuration to PORank,
+     * under keys built from this identifier.
+     */
+    private void saveCounters(Job job, String operationID) {
+        JobClient jobClient;
+        Counters counters;
+        Group groupCounters;
+
+        Long previousValue = 0L;
+        Long previousSum = 0L;
+        ArrayList<Pair<String,Long>> counterPairs;
+
+        try {
+            jobClient = job.getJobClient();
+            counters = jobClient.getJob(job.getAssignedJobID()).getCounters();
+            groupCounters = counters.getGroup(getGroupName(counters.getGroupNames()));
+
+            Iterator<Counter> it = groupCounters.iterator();
+            HashMap<Integer,Long> counterList = new HashMap<Integer, Long>();
+
+            while(it.hasNext()) {
+                try{
+                    Counter c = it.next();
+                    counterList.put(Integer.valueOf(c.getDisplayName()), c.getValue());
+                } catch (Exception ex) {
+                    ex.printStackTrace();
+                }
+            }
+            counterSize = counterList.size();
+            counterPairs = new ArrayList<Pair<String,Long>>();
+
+            for(int i = 0; i < counterSize; i++){
+                previousSum += previousValue;
+                previousValue = counterList.get(Integer.valueOf(i));
+                counterPairs.add(new Pair<String, Long>(JobControlCompiler.PIG_MAP_COUNTER + operationID + JobControlCompiler.PIG_MAP_SEPARATOR + i, previousSum));
+            }
+
+            globalCounters.put(operationID, counterPairs);
+
+        } catch (Exception e) {
+            String msg = "Error to read counters into Rank operation counterSize "+counterSize;
+            throw new RuntimeException(msg, e);
+        }
+    }
+
+    private String getGroupName(Collection<String> collection) {
+        for (String name : collection) {
+            if (name.contains(PIG_MAP_RANK_NAME))
+                return name;
+        }
+        return null;
+    }
+    /**
      * The method that creates the Job corresponding to a MapReduceOper.
      * The assumption is that
      * every MapReduceOper will have a load and a store. The JobConf removes
@@ -704,6 +780,28 @@ public class JobControlCompiler{
                 nwJob.setGroupingComparatorClass(PigGroupingPartitionWritableComparator.class);
             }
 
+            if (mro.isCounterOperation()) {
+                if (mro.isRowNumber()) {
+                    nwJob.setMapperClass(PigMapReduceCounter.PigMapCounter.class);
+                } else {
+                    nwJob.setReducerClass(PigMapReduceCounter.PigReduceCounter.class);
+                }
+            }
+
+            if(mro.isRankOperation()) {
+                Iterator<String> operationIDs = mro.getRankOperationId().iterator();
+
+                while(operationIDs.hasNext()) {
+                    String operationID = operationIDs.next();
+                    Iterator<Pair<String, Long>> itPairs = globalCounters.get(operationID).iterator();
+                    Pair<String,Long> pair = null;
+                    while(itPairs.hasNext()) {
+                        pair = itPairs.next();
+                        conf.setLong(pair.first, pair.second);
+                    }
+                }
+            }
+
             if (!pigContext.inIllustrator)
             {
                 // unset inputs for POStore, otherwise, map/reduce plan will be unnecessarily deserialized

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java?rev=1384352&r1=1384351&r2=1384352&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java Thu Sep 13 14:55:36 2012
@@ -58,6 +58,7 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCollectedGroup;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCounter;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCross;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PODistinct;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFRJoin;
@@ -75,6 +76,7 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage.PackageType;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackageLite;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPartitionRearrange;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PORank;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSkewedJoin;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSort;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit;
@@ -1982,12 +1984,77 @@ public class MRCompiler extends PhyPlanV
             throw new MRCompilerException(msg, errCode, PigException.BUG, e);
         }
     }
-    
+
+    /**
+     * How the counter job is compiled depends on whether it is a row number or not.
+     * For a row number, POCounter is added as a leaf of a map plan: the current one if its map
+     * phase is not done and it holds no rank, otherwise a new job after closing the previous one.
+     * If it is not a row number, POCounter is added as a leaf of the current reduce
+     * plan (last sorting phase).
+     **/
+    @Override
+    public void visitCounter(POCounter op) throws VisitorException {
+        try{
+            if(op.isRowNumber()) {
+                List<PhysicalOperator> mpLeaves = curMROp.mapPlan.getLeaves();
+                PhysicalOperator leaf = mpLeaves.get(0);
+                if ( !curMROp.isMapDone() && !curMROp.isRankOperation() )
+                {
+                    curMROp.setIsCounterOperation(true);
+                    curMROp.setIsRowNumber(true);
+                    curMROp.setOperationID(op.getOperationID());
+                    curMROp.mapPlan.addAsLeaf(op);
+                } else {
+                    FileSpec fSpec = getTempFileSpec();
+                    MapReduceOper prevMROper = endSingleInputPlanWithStr(fSpec);
+                    MapReduceOper mrCounter = startNew(fSpec, prevMROper);
+                    mrCounter.mapPlan.addAsLeaf(op);
+                    mrCounter.setIsCounterOperation(true);
+                    mrCounter.setIsRowNumber(true);
+                    mrCounter.setOperationID(op.getOperationID());
+                    curMROp = mrCounter;
+                }
+            } else {
+                curMROp.setIsCounterOperation(true);
+                curMROp.setIsRowNumber(false);
+                curMROp.setOperationID(op.getOperationID());
+                curMROp.reducePlan.addAsLeaf(op);
+            }
+
+            phyToMROpMap.put(op, curMROp);
+        }catch(Exception e){
+            int errCode = 2034;
+            String msg = "Error compiling operator " + op.getClass().getSimpleName();
+            throw new MRCompilerException(msg, errCode, PigException.BUG, e);
+        }
+    }
+
+    /**
+     * For PORank, the previous job (containing POCounter as a leaf) is
+     * closed and PORank is added to the map phase of a new job.
+     **/
+    @Override
+    public void visitRank(PORank op) throws VisitorException {
+        try{
+            FileSpec fSpec = getTempFileSpec();
+            MapReduceOper prevMROper = endSingleInputPlanWithStr(fSpec);
+
+            curMROp = startNew(fSpec, prevMROper);
+            curMROp.setOperationID(op.getOperationID());
+            curMROp.mapPlan.addAsLeaf(op);
+
+            phyToMROpMap.put(op, curMROp);
+        }catch(Exception e){
+            int errCode = 2034;
+            String msg = "Error compiling operator " + op.getClass().getSimpleName();
+            throw new MRCompilerException(msg, errCode, PigException.BUG, e);
+        }
+    }
 
     private Pair<POProject,Byte> [] getSortCols(List<PhysicalPlan> plans) throws PlanException, ExecException {
         if(plans!=null){
             @SuppressWarnings("unchecked")
-            Pair<POProject,Byte>[] ret = new Pair[plans.size()]; 
+            Pair<POProject,Byte>[] ret = new Pair[plans.size()];
             int i=-1;
             for (PhysicalPlan plan : plans) {
                 PhysicalOperator op = plan.getLeaves().get(0);

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java?rev=1384352&r1=1384351&r2=1384352&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java Thu Sep 13 14:55:36 2012
@@ -18,7 +18,10 @@
 package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer;
 
 import java.io.ByteArrayOutputStream;
+import java.util.ArrayList;
 import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
 import java.util.Set;
 
 import org.apache.pig.impl.plan.OperatorKey;
@@ -26,6 +29,7 @@ import org.apache.pig.impl.plan.NodeIdGe
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROpPlanVisitor;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PORank;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POUnion;
 import org.apache.pig.impl.plan.Operator;
 import org.apache.pig.impl.plan.PlanException;
@@ -133,11 +137,19 @@ public class MapReduceOper extends Opera
 
     // POLimit can also have an expression. See PIG-1926
     PhysicalPlan limitPlan = null;
-    
-    // Indicates that this MROper is a splitter MROper. 
+
+    // Indicates that this MROper is a splitter MROper.
     // That is, this MROper ends due to a POSPlit operator.
     private boolean splitter = false;
 
+    // Indicates that there is a counter operation in the MR job.
+    private boolean isCounterOperation = false;
+
+    // Indicates that there is a rank operation without sorting (row number) in the MR job.
+    private boolean isRowNumber = false;
+
+    private String operationID;
+
 	// Set to true if it is skewed join
 	private boolean skewedJoin = false;
 
@@ -478,12 +490,53 @@ public class MapReduceOper extends Opera
     protected void useTypedComparator(boolean useTypedComparator) {
         this.usingTypedComparator = useTypedComparator;
     }
-    
+
     protected void noCombineSmallSplits() {
         combineSmallSplits = false;
     }
-    
+
     public boolean combineSmallSplits() {
         return combineSmallSplits;
     }
+
+    public void setIsCounterOperation(boolean counter) {
+        this.isCounterOperation = counter;
+    }
+
+    public boolean isCounterOperation() {
+        return isCounterOperation;
+    }
+
+    public boolean isRankOperation() {
+        return getRankOperationId().size() != 0;
+    }
+    
+    public ArrayList<String> getRankOperationId() {
+        ArrayList<String> operationIDs = new ArrayList<String>();
+        Iterator<PhysicalOperator> mapRoots = this.mapPlan.getRoots().iterator();
+
+        while(mapRoots.hasNext()) {
+            PhysicalOperator operation = mapRoots.next();
+            if(operation instanceof PORank)
+                operationIDs.add(((PORank) operation).getOperationID());
+        }
+
+        return operationIDs;
+    }
+
+    public void setIsRowNumber(boolean isRowNumber) {
+        this.isRowNumber = isRowNumber;
+    }
+
+    public boolean isRowNumber() {
+        return isRowNumber;
+    }
+
+    public void setOperationID(String operationID) {
+        this.operationID = operationID;
+    }
+
+    public String getOperationID() {
+        return operationID;
+    }
 }

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PhyPlanSetter.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PhyPlanSetter.java?rev=1384352&r1=1384351&r2=1384352&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PhyPlanSetter.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PhyPlanSetter.java Thu Sep 13 14:55:36 2012
@@ -44,23 +44,23 @@ public class PhyPlanSetter extends PhyPl
     public void visitLoad(POLoad ld) throws VisitorException{
         ld.setParentPlan(parent);
     }
-    
+
     @Override
     public void visitNative(PONative nt) throws VisitorException{
         nt.setParentPlan(parent);
     }
- 
+
     @Override
     public void visitStore(POStore st) throws VisitorException{
         st.setParentPlan(parent);
     }
-    
+
     @Override
     public void visitFilter(POFilter fl) throws VisitorException{
         super.visitFilter(fl);
         fl.setParentPlan(parent);
     }
-    
+
     @Override
     public void visitLocalRearrange(POLocalRearrange lr) throws VisitorException{
         super.visitLocalRearrange(lr);
@@ -77,28 +77,28 @@ public class PhyPlanSetter extends PhyPl
     public void visitGlobalRearrange(POGlobalRearrange gr) throws VisitorException{
         gr.setParentPlan(parent);
     }
-    
+
     @Override
     public void visitPackage(POPackage pkg) throws VisitorException{
         pkg.setParentPlan(parent);
     }
-    
+
     @Override
     public void visitCombinerPackage(POCombinerPackage pkg) throws VisitorException{
         pkg.setParentPlan(parent);
     }
-    
+
     @Override
     public void visitPOForEach(POForEach nfe) throws VisitorException {
         super.visitPOForEach(nfe);
         nfe.setParentPlan(parent);
     }
-    
+
     @Override
     public void visitUnion(POUnion un) throws VisitorException{
         un.setParentPlan(parent);
     }
-    
+
     @Override
     public void visitSplit(POSplit spl) throws VisitorException{
         PhysicalPlan oldPlan = parent;
@@ -112,7 +112,7 @@ public class PhyPlanSetter extends PhyPl
         parent=oldPlan;
         spl.setParentPlan(parent);
     }
-    
+
     @Override
     public void visitDemux(PODemux demux) throws VisitorException{
         super.visitDemux(demux);
@@ -121,55 +121,60 @@ public class PhyPlanSetter extends PhyPl
 
     @Override
     public void visitDistinct(PODistinct distinct) throws VisitorException {
-        distinct.setParentPlan(parent);		
+        distinct.setParentPlan(parent);
     }
-    
+
     @Override
     public void visitSort(POSort sort) throws VisitorException {
         super.visitSort(sort);
         sort.setParentPlan(parent);
     }
-    
+
+    @Override
+    public void visitRank(PORank rank) throws VisitorException {
+
+    }
+
     @Override
     public void visitConstant(ConstantExpression cnst) throws VisitorException{
         cnst.setParentPlan(parent);
     }
-    
+
     @Override
     public void visitProject(POProject proj) throws VisitorException{
         proj.setParentPlan(parent);
     }
-    
+
     @Override
     public void visitGreaterThan(GreaterThanExpr grt) throws VisitorException{
         grt.setParentPlan(parent);
     }
-    
+
     @Override
     public void visitLessThan(LessThanExpr lt) throws VisitorException{
         lt.setParentPlan(parent);
     }
-    
+
     @Override
     public void visitGTOrEqual(GTOrEqualToExpr gte) throws VisitorException{
         gte.setParentPlan(parent);
     }
-    
+
     @Override
     public void visitLTOrEqual(LTOrEqualToExpr lte) throws VisitorException{
         lte.setParentPlan(parent);
     }
-    
+
     @Override
     public void visitEqualTo(EqualToExpr eq) throws VisitorException{
         eq.setParentPlan(parent);
     }
-    
+
     @Override
     public void visitNotEqualTo(NotEqualToExpr eq) throws VisitorException{
         eq.setParentPlan(parent);
     }
-    
+
     @Override
     public void visitRegexp(PORegexp re) throws VisitorException{
         re.setParentPlan(parent);
@@ -179,32 +184,32 @@ public class PhyPlanSetter extends PhyPl
     public void visitIsNull(POIsNull isNull) throws VisitorException {
         isNull.setParentPlan(parent);
     }
-    
+
     @Override
     public void visitAdd(Add add) throws VisitorException{
         add.setParentPlan(parent);
     }
-    
+
     @Override
     public void visitSubtract(Subtract sub) throws VisitorException {
         sub.setParentPlan(parent);
     }
-    
+
     @Override
     public void visitMultiply(Multiply mul) throws VisitorException {
         mul.setParentPlan(parent);
     }
-    
+
     @Override
     public void visitDivide(Divide dv) throws VisitorException {
         dv.setParentPlan(parent);
     }
-    
+
     @Override
     public void visitMod(Mod mod) throws VisitorException {
         mod.setParentPlan(parent);
     }
-    
+
     @Override
     public void visitAnd(POAnd and) throws VisitorException {
         and.setParentPlan(parent);
@@ -229,12 +234,12 @@ public class PhyPlanSetter extends PhyPl
     public void visitNegative(PONegative negative) {
         negative.setParentPlan(parent);
     }
-    
+
     @Override
     public void visitUserFunc(POUserFunc userFunc) throws VisitorException {
         userFunc.setParentPlan(parent);
     }
-    
+
     @Override
     public void visitComparisonFunc(POUserComparisonFunc compFunc) throws VisitorException {
         compFunc.setParentPlan(parent);
@@ -244,7 +249,7 @@ public class PhyPlanSetter extends PhyPl
     public void visitMapLookUp(POMapLookUp mapLookUp) {
         mapLookUp.setParentPlan(parent);
     }
-    
+
     @Override
     public void visitJoinPackage(POJoinPackage joinPackage) throws VisitorException{
         joinPackage.setParentPlan(parent);
@@ -254,17 +259,17 @@ public class PhyPlanSetter extends PhyPl
     public void visitCast(POCast cast) {
         cast.setParentPlan(parent);
     }
-    
+
     @Override
     public void visitLimit(POLimit lim) throws VisitorException{
         lim.setParentPlan(parent);
     }
-    
+
     @Override
     public void visitFRJoin(POFRJoin join) throws VisitorException {
         join.setParentPlan(parent);
     }
-    
+
     @Override
     public void visitMergeJoin(POMergeJoin join) throws VisitorException {
         join.setParentPlan(parent);
@@ -280,13 +285,13 @@ public class PhyPlanSetter extends PhyPl
         stream.setParentPlan(parent);
     }
 
-/*
+    /*
     @Override
     public void visitPartitionRearrange(POPartitionRearrange lrfi) throws VisitorException {
         super.visitPartitionRearrange(lrfi);
         lrfi.setParentPlan(parent);
     }
-*/
+     */
 
     @Override
     public void visitPartialAgg(POPartialAgg poPartialAgg) {

Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduceCounter.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduceCounter.java?rev=1384352&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduceCounter.java (added)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduceCounter.java Thu Sep 13 14:55:36 2012
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapOnly.Map;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCounter;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.tools.pigstats.PigStatusReporter;
+
+public class PigMapReduceCounter {
+
+    /**
+     * This class is used only for the simple RANK operation, namely row number mode.
+     **/
+    public static class PigMapCounter extends PigMapBase {
+
+        private static final Log log = LogFactory.getLog(PigMapCounter.class);
+        public static String taskID;
+        public static Context context;
+        private PhysicalOperator pOperator;
+
+        /**
+         * Sets up the task id so that it can be attached to each tuple.
+         **/
+        public void setup(Context context) throws IOException, InterruptedException {
+            super.setup(context);
+
+            taskID = String.valueOf(context.getTaskAttemptID().getTaskID().getId());
+
+            pOperator = mp.getLeaves().get(0);
+
+            while(true) {
+                if(pOperator instanceof POCounter){
+                    ((POCounter) pOperator).setTaskId(taskID);
+                    ((POCounter) pOperator).resetLocalCounter();
+                    break;
+                } else {
+                    pOperator = mp.getPredecessors(pOperator).get(0);
+                }
+            }
+        }
+
+        /**
+         * Writes the tuple and then adds one to this task's global counter (best effort).
+         **/
+        @Override
+        public void collect(Context context, Tuple tuple)
+        throws InterruptedException, IOException {
+            context.write(null, tuple);
+            try {
+                PigStatusReporter reporter = PigStatusReporter.getInstance();
+                if (reporter != null) {
+                    reporter.getCounter(
+                            JobControlCompiler.PIG_MAP_RANK_NAME
+                            + context.getJobID().toString(), taskID).increment(1);
+                }
+            } catch (Exception ex) {
+                // Best effort: the tuple is already written; keep going but record the cause.
+                log.error("Error on incrementer of PigMapCounter", ex);
+            }
+        }
+    }
+
+    /**
+     * This class is used for RANK BY operations, regardless of whether the rank is dense or not.
+     **/
+    public static class PigReduceCounter extends PigMapReduce.Reduce {
+
+        private static final Log log = LogFactory.getLog(PigReduceCounter.class);
+        public static String taskID;
+        public static Context context;
+        public static List<PhysicalOperator> leaves;
+        public static PhysicalOperator leaf;
+
+        /**
+         * Sets up the task id so that it can be attached to each tuple.
+         **/
+        @Override
+        protected void setup(Context context) throws IOException, InterruptedException {
+            super.setup(context);
+
+            taskID = String.valueOf(context.getTaskAttemptID().getTaskID().getId());
+
+            leaf = rp.getLeaves().get(0);
+
+            while(true) {
+                if(leaf instanceof POCounter){
+                    ((POCounter) leaf).setTaskId(taskID);
+                    ((POCounter) leaf).resetLocalCounter();
+                    break;
+                } else {
+                    leaf = rp.getPredecessors(leaf).get(0);
+                }
+            }
+
+            this.context = context;
+        }
+
+        /**
+         * Adds the given value to the global counter of the current reduce task.
+         * Global counters are accessed during the reduce phase (immediately after a
+         * sorting phase), and the increment depends on whether it is a dense rank or not.
+         * For a dense rank the increment is 1; otherwise the increment is the size
+         * of the bag in the tuple. Failures are logged with their cause and ignored.
+         * @param increment is the value to add to the corresponding global counter.
+         **/
+        public static void incrementCounter(Long increment) {
+            try {
+                PigStatusReporter reporter = PigStatusReporter.getInstance();
+                if (reporter != null) {
+
+                    // The counter is only touched when the leaf found in setup() is a POCounter.
+                    if(leaf instanceof POCounter){
+                        reporter.getCounter(
+                                JobControlCompiler.PIG_MAP_RANK_NAME
+                                + context.getJobID().toString(), taskID).increment(increment);
+                    }
+
+                }
+            } catch (Exception ex) {
+                // Best effort: keep the task running, but record the cause of the failure.
+                log.error("Error on incrementer of PigReduceCounter", ex);
+            }
+        }
+    }
+}

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/DotPOPrinter.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/DotPOPrinter.java?rev=1384352&r1=1384351&r2=1384352&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/DotPOPrinter.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/DotPOPrinter.java Thu Sep 13 14:55:36 2012
@@ -115,7 +115,13 @@ public class DotPOPrinter extends DotPla
             plans.addAll(((POForEach)op).getInputPlans());
         }
         else if(op instanceof POSort){
-            plans.addAll(((POSort)op).getSortPlans()); 
+            plans.addAll(((POSort)op).getSortPlans());
+        }
+        else if(op instanceof PORank){
+            plans.addAll(((PORank)op).getRankPlans());
+        }
+        else if(op instanceof POCounter){
+            plans.addAll(((POCounter)op).getCounterPlans());
         }
         else if(op instanceof POLocalRearrange){
             plans.addAll(((POLocalRearrange)op).getPlans());

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java?rev=1384352&r1=1384351&r2=1384352&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java Thu Sep 13 14:55:36 2012
@@ -127,9 +127,17 @@ public class PhyPlanVisitor extends Plan
             popWalker();
         }
     }
-    
+
+    public void visitCounter(POCounter poCounter) throws VisitorException {
+        //do nothing
+    }
+
+    public void visitRank(PORank rank) throws VisitorException {
+        //do nothing
+    }
+
 	public void visitDistinct(PODistinct distinct) throws VisitorException {
-        //do nothing		
+        //do nothing
 	}
 
 	public void visitSort(POSort sort) throws VisitorException {

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PlanPrinter.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PlanPrinter.java?rev=1384352&r1=1384351&r2=1384352&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PlanPrinter.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PlanPrinter.java Thu Sep 13 14:55:36 2012
@@ -163,8 +163,14 @@ public class PlanPrinter<O extends Opera
           else if(node instanceof POCollectedGroup){
             sb.append(planString(((POCollectedGroup)node).getPlans()));
           }
+          else if(node instanceof PORank){
+              sb.append(planString(((PORank)node).getRankPlans()));
+          }
+          else if(node instanceof POCounter){
+              sb.append(planString(((POCounter)node).getCounterPlans()));
+          }
           else if(node instanceof POSort){
-            sb.append(planString(((POSort)node).getSortPlans())); 
+            sb.append(planString(((POSort)node).getSortPlans()));
           }
           else if(node instanceof POForEach){
             sb.append(planString(((POForEach)node).getInputPlans()));

Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCounter.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCounter.java?rev=1384352&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCounter.java (added)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCounter.java Thu Sep 13 14:55:36 2012
@@ -0,0 +1,310 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators;
+
+import java.util.List;
+
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduceCounter;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.pen.util.ExampleTuple;
+import org.apache.pig.pen.util.LineageTracer;
+
+/**
+ * This operator is part of the RANK operator implementation.
+ * It adds a local counter and a unique task id to each tuple.
+ * There are 2 modes of operations: regular and dense.
+ * The local counter depends on the mode of operation.
+ * Regular rank takes duplicate rows into account while assigning
+ * numbers to groups of distinct values.
+ * Dense rank counts the number of distinct values, without
+ * considering duplicate rows. The counter is computed either over
+ * the entire tuple (row number) or over a set of columns (rank by).
+ *
+ * This Physical Operator relies on some specific MR class,
+ * available at PigMapReduceCounter.
+ **/
+
+public class POCounter extends PhysicalOperator {
+
+    private static final long serialVersionUID = 1L;
+    private static final Long ONE = 1L;
+
+    private List<PhysicalPlan> counterPlans;
+    private List<Boolean> mAscCols;
+
+    /**
+     * In case of RANK BY, it could be dense or not.
+     * Being a dense rank means to assign consecutive ranks
+     * to different values.
+     **/
+    private boolean isDenseRank = false;
+
+    /**
+     * In case of simple RANK, namely row number mode
+     * which is a consecutive number assigned to each tuple.
+     **/
+    private boolean isRowNumber = false;
+
+    protected static final TupleFactory mTupleFactory = TupleFactory.getInstance();
+
+    /**
+     * Local counter for tuples on the same task.
+     **/
+    private Long localCount = 1L;
+
+    /**
+     * Task ID to label each tuple analyzed by the corresponding task
+     **/
+    private String taskID = "-1";
+
+    /**
+     * Unique identifier that links POCounter and PORank,
+     * through the global counter labeled with it.
+     **/
+    private String operationID;
+
+    public POCounter(OperatorKey k) {
+        this(k, -1, null);
+    }
+
+    public POCounter(OperatorKey k, int rp) {
+        this(k, rp, null);
+    }
+
+    public POCounter(OperatorKey k, List<PhysicalOperator> inputs) {
+        this(k, -1, inputs);
+    }
+
+    public POCounter(OperatorKey k, int rp, List<PhysicalOperator> inputs) {
+        super(k, rp, inputs);
+    }
+
+    @SuppressWarnings({ "rawtypes", "unchecked" })
+    public POCounter(OperatorKey operatorKey, int requestedParallelism,
+            List inp, List<PhysicalPlan> counterPlans,
+            List<Boolean> ascendingCol) {
+        super(operatorKey, requestedParallelism, inp);
+        this.setCounterPlans(counterPlans);
+        this.setAscendingColumns(ascendingCol);
+    }
+
+    @Override
+    public Tuple illustratorMarkup(Object in, Object out, int eqClassIndex) {
+        if(illustrator != null){
+            return new ExampleTuple((Tuple)out);
+        }
+        return (Tuple) out;
+    }
+
+    @Override
+    public void visit(PhyPlanVisitor v) throws VisitorException {
+        v.visitCounter(this);
+    }
+
+    @Override
+    public Result getNext(Tuple t) throws ExecException {
+        Result inp = null;
+
+        while (true) {
+            inp = processInput();
+            if (inp.returnStatus == POStatus.STATUS_EOP
+                    || inp.returnStatus == POStatus.STATUS_ERR)
+                break;
+            if (inp.returnStatus == POStatus.STATUS_NULL) {
+                continue;
+            }
+
+            return addCounterValue(inp);
+        }
+        return inp;
+    }
+
+    /**
+     * Adds the current task id and the local counter value to the tuple.
+     * @param input the result produced by the previous operator
+     * @return the input result, whose tuple has two values prepended:
+     * the task identifier and the local counter value.
+     * The local counter is then incremented by one (row number or dense rank)
+     * or by the size of the bag in the tuple just processed (regular rank).
+     **/
+    protected Result addCounterValue(Result input) throws ExecException {
+        Tuple in = (Tuple) input.result;
+        Tuple out = mTupleFactory.newTuple(in.getAll().size() + 2);
+        Long sizeBag = 0L;
+        int positionBag, i = 2;
+
+        // Tuples are added by two stamps before the tuple content:
+        // 1.- At position 0: Current taskId
+        out.set(0, getTaskId());
+
+        // 2.- At position 1: counter value
+        //On this case, each tuple is analyzed independently of the tuples grouped
+        if(isRowNumber() || isDenseRank()) {
+
+            //Only for Dense Rank (attached to a reduce phase) is the global counter incremented this way.
+            //Otherwise, the increment is done at the mapper automatically
+            if(isDenseRank())
+                PigMapReduceCounter.PigReduceCounter.incrementCounter(POCounter.ONE);
+
+            out.set(1, getLocalCounter());
+
+            //and the local incrementer is sequentially increased.
+            incrementLocalCounter();
+
+        } else if(!isDenseRank()) {
+            //Standard rank: in this case what matters is the
+            //number of tuples in the same group.
+            positionBag = in.getAll().size()-1;
+            if (in.getType(positionBag) == DataType.BAG) {
+                sizeBag = ((org.apache.pig.data.DefaultAbstractBag)in.get(positionBag)).size();
+            }
+
+            //This value (the size of the tuples on the bag) is used to increment
+            //the current global counter and
+            PigMapReduceCounter.PigReduceCounter.incrementCounter(sizeBag);
+
+            out.set(1, getLocalCounter());
+
+            //the value for the next tuple on the current task
+            addToLocalCounter(sizeBag);
+
+        }
+
+        for (Object o : in) {
+            out.set(i++, o);
+        }
+
+        input.result = illustratorMarkup(in, out, 0);
+
+        return input;
+    }
+
+    @Override
+    public boolean supportsMultipleInputs() {
+        return false;
+    }
+
+    @Override
+    public boolean supportsMultipleOutputs() {
+        return false;
+    }
+
+    @Override
+    public String name() {
+        return getAliasString() + "POCounter" + "["
+        + DataType.findTypeName(resultType) + "]" + " - "
+        + mKey.toString();
+    }
+
+    public void setCounterPlans(List<PhysicalPlan> counterPlans) {
+        this.counterPlans = counterPlans;
+    }
+
+    public List<PhysicalPlan> getCounterPlans() {
+        return counterPlans;
+    }
+
+    public void setAscendingColumns(List<Boolean> mAscCols) {
+        this.mAscCols = mAscCols;
+    }
+
+    public List<Boolean> getAscendingColumns() {
+        return mAscCols;
+    }
+
+    /**
+     *  Initialization step into the POCounter is to set
+     *  up local counter to 1.
+     **/
+    public void resetLocalCounter() {
+        this.localCount = 1L;
+    }
+
+    /**
+     *  Sequential counter used at ROW NUMBER and RANK BY DENSE mode
+     **/
+    public Long incrementLocalCounter() {
+        return localCount++;
+    }
+
+    public void setLocalCounter(Long localCount) {
+        this.localCount = localCount;
+    }
+
+    public Long getLocalCounter() {
+        return this.localCount;
+    }
+
+    public void addToLocalCounter(Long sizeBag) {
+        this.localCount += sizeBag;
+    }
+
+    /**
+     *  Task ID: identifier of the task (map or reducer)
+     **/
+    public void setTaskId(String taskID) {
+        this.taskID = taskID;
+    }
+
+    public String getTaskId() {
+        return this.taskID;
+    }
+
+    /**
+     *  Dense Rank flag
+     **/
+    public void setIsDenseRank(boolean isDenseRank) {
+        this.isDenseRank = isDenseRank;
+    }
+
+    public boolean isDenseRank() {
+        return isDenseRank;
+    }
+
+    /**
+     *  Row number flag
+     **/
+    public void setIsRowNumber(boolean isRowNumber) {
+        this.isRowNumber = isRowNumber;
+    }
+
+    public boolean isRowNumber() {
+        return isRowNumber;
+    }
+
+    /**
+     *  Operation ID: identifier shared with the corresponding PORank
+     **/
+    public void setOperationID(String operationID) {
+        this.operationID = operationID;
+    }
+
+    public String getOperationID() {
+        return operationID;
+    }
+}
\ No newline at end of file

Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PORank.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PORank.java?rev=1384352&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PORank.java (added)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PORank.java Thu Sep 13 14:55:36 2012
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.pen.util.ExampleTuple;
+import org.apache.pig.pen.util.LineageTracer;
+
+/**
+ * This operator is part of the RANK operator implementation.
+ * Reads the output tuple from POCounter and the cumulative sum previously calculated.
+ * The task identifier is read in order to get the corresponding cumulative sum,
+ * together with the local counter in the tuple. These values are summed and prepended to the tuple.
+ **/
+
+public class PORank extends PhysicalOperator {
+
+    private static final Log log = LogFactory.getLog(PORank.class);
+
+    private static final long serialVersionUID = 1L;
+
+    private List<PhysicalPlan> rankPlans;
+    private List<Boolean> mAscCols;
+    private List<Byte> ExprOutputTypes;
+
+    protected static final TupleFactory mTupleFactory = TupleFactory.getInstance();
+
+    /**
+     * Unique identifier that links POCounter and PORank,
+     * through the global counter labeled with it.
+     **/
+    private String operationID;
+
+    /**
+     * Counter used to set tuples into the equivalence
+     * classes.
+     **/
+    private int localCountIllustrator = 0;
+
+    public PORank(OperatorKey k) {
+        this(k, -1, null);
+    }
+
+    public PORank(OperatorKey k, int rp) {
+        this(k, rp, null);
+    }
+
+    public PORank(OperatorKey k, List<PhysicalOperator> inp) {
+        this(k, -1, inp);
+    }
+
+    public PORank(OperatorKey k, int rp, List<PhysicalOperator> inp) {
+        super(k, rp, inp);
+    }
+
+    @SuppressWarnings({ "rawtypes", "unchecked" })
+    public PORank(OperatorKey operatorKey, int requestedParallelism, List inp,
+            List<PhysicalPlan> rankPlans, List<Boolean> ascendingCol) {
+        super(operatorKey, requestedParallelism, inp);
+        this.setRankPlans(rankPlans);
+        this.setAscendingCols(ascendingCol);
+
+        ExprOutputTypes = new ArrayList<Byte>(rankPlans.size());
+
+        for (PhysicalPlan plan : rankPlans) {
+            ExprOutputTypes.add(plan.getLeaves().get(0).getResultType());
+        }
+    }
+
+    @Override
+    public Tuple illustratorMarkup(Object in, Object out, int eqClassIndex) {
+        if(illustrator != null){
+            ExampleTuple tOut = new ExampleTuple((Tuple)out);
+            illustrator.addData((Tuple)out);
+            illustrator.getEquivalenceClasses().get(eqClassIndex).add((Tuple)in);
+
+            LineageTracer lineageTracer = illustrator.getLineage();
+            lineageTracer.insert(tOut);
+            return tOut;
+        }
+        return (Tuple) out;
+    }
+
+    @Override
+    public void visit(PhyPlanVisitor v) throws VisitorException {
+        v.visitRank(this);
+    }
+
+    @Override
+    public Result getNext(Tuple t) throws ExecException {
+        Result inp = null;
+
+        while (true) {
+            inp = processInput();
+
+            if (inp.returnStatus == POStatus.STATUS_EOP
+                    || inp.returnStatus == POStatus.STATUS_ERR)
+                break;
+            if (inp.returnStatus == POStatus.STATUS_NULL) {
+                continue;
+            }
+            return addRank(inp);
+        }
+
+        return inp;
+    }
+
+    /**
+     * Reads the output tuple from POCounter and the cumulative sum previously calculated.
+     * The task identifier is read in order to get the corresponding cumulative sum,
+     * together with the local counter in the tuple. These values are summed and prepended to the tuple.
+     * @param input processed by POCounter
+     * @return tuple with the prepended rank value.
+     **/
+    public Result addRank(Result input) throws ExecException {
+        int i = 1;
+        Tuple in = (Tuple) input.result;
+        Tuple out = mTupleFactory.newTuple(in.getAll().size() - 1);
+
+        Long taskId = Long.valueOf(in.get(0).toString());
+        Long localCounter = (Long) in.get(1);
+
+        String nameCounter = JobControlCompiler.PIG_MAP_COUNTER + getOperationID() + JobControlCompiler.PIG_MAP_SEPARATOR + String.valueOf(taskId);
+
+        Long rank = PigMapReduce.sJobConfInternal.get().getLong( nameCounter , -1L );
+        
+        if(rank == -1) {
+            log.error("Error on reading counter "+ nameCounter);
+            throw new RuntimeException("Unable to read counter "+ nameCounter);
+        }
+
+        out.set(0, rank + localCounter);
+
+        //Add the content of the tuple
+        List<Object> sub = in.getAll().subList(2, in.getAll().size());
+
+        for (Object o : sub)
+            out.set(i++, o);
+
+        if(localCountIllustrator > 2)
+            localCountIllustrator = 0;
+
+        input.result = illustratorMarkup(in, out, localCountIllustrator);
+
+        localCountIllustrator++;
+
+        return input;
+    }
+
+    @Override
+    public boolean supportsMultipleInputs() {
+        return false;
+    }
+
+    @Override
+    public boolean supportsMultipleOutputs() {
+        return false;
+    }
+
+    @Override
+    public String name() {
+        return getAliasString() + "PORank" + "["
+        + DataType.findTypeName(resultType) + "]" + " - "
+        + mKey.toString();
+    }
+
+    public void setRankPlans(List<PhysicalPlan> rankPlans) {
+        this.rankPlans = rankPlans;
+    }
+
+    public List<PhysicalPlan> getRankPlans() {
+        return rankPlans;
+    }
+
+    public void setAscendingCols(List<Boolean> mAscCols) {
+        this.mAscCols = mAscCols;
+    }
+
+    public List<Boolean> getAscendingCols() {
+        return mAscCols;
+    }
+
+    /**
+     * Operation ID: identifier shared with the corresponding POCounter
+     * @param operationID
+     **/
+    public void setOperationID(String operationID) {
+        this.operationID = operationID;
+    }
+
+    public String getOperationID() {
+        return operationID;
+    }
+}

Modified: pig/trunk/src/org/apache/pig/newplan/logical/optimizer/AllExpressionVisitor.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/optimizer/AllExpressionVisitor.java?rev=1384352&r1=1384351&r2=1384352&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/optimizer/AllExpressionVisitor.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/optimizer/AllExpressionVisitor.java Thu Sep 13 14:55:36 2012
@@ -34,6 +34,7 @@ import org.apache.pig.newplan.logical.re
 import org.apache.pig.newplan.logical.relational.LOInnerLoad;
 import org.apache.pig.newplan.logical.relational.LOJoin;
 import org.apache.pig.newplan.logical.relational.LOLimit;
+import org.apache.pig.newplan.logical.relational.LORank;
 import org.apache.pig.newplan.logical.relational.LOSort;
 import org.apache.pig.newplan.logical.relational.LOSplitOutput;
 import org.apache.pig.newplan.logical.relational.LogicalRelationalNodesVisitor;
@@ -147,7 +148,13 @@ public abstract class AllExpressionVisit
         LogicalExpressionVisitor v = getVisitor(splitOutput.getFilterPlan());
         v.visit();
     }
-    
+
+    @Override
+    public void visit(LORank rank) throws FrontendException{
+        currentOp = rank;
+        visitAll(rank.getRankColPlans());
+    }
+
     @Override
     public void visit(LOSort sort) throws FrontendException {
         currentOp = sort;

Modified: pig/trunk/src/org/apache/pig/newplan/logical/optimizer/AllSameRalationalNodesVisitor.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/optimizer/AllSameRalationalNodesVisitor.java?rev=1384352&r1=1384351&r2=1384352&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/optimizer/AllSameRalationalNodesVisitor.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/optimizer/AllSameRalationalNodesVisitor.java Thu Sep 13 14:55:36 2012
@@ -28,6 +28,7 @@ import org.apache.pig.newplan.logical.re
 import org.apache.pig.newplan.logical.relational.LOForEach;
 import org.apache.pig.newplan.logical.relational.LOJoin;
 import org.apache.pig.newplan.logical.relational.LOLoad;
+import org.apache.pig.newplan.logical.relational.LORank;
 import org.apache.pig.newplan.logical.relational.LOSort;
 import org.apache.pig.newplan.logical.relational.LOSplit;
 import org.apache.pig.newplan.logical.relational.LOSplitOutput;
@@ -52,13 +53,13 @@ public abstract class AllSameRalationalN
     public AllSameRalationalNodesVisitor(OperatorPlan plan, PlanWalker walker) throws FrontendException {
         super(plan, walker);
     }
-    
+
     /**
      * Method to call on every node in the logical plan.
      * @param op Node that is currently being visited.
      */
     abstract protected void execute(LogicalRelationalOperator op) throws FrontendException;
-    
+
     @Override
     public void visit(LOFilter filter) throws FrontendException {
         execute(filter);
@@ -78,47 +79,52 @@ public abstract class AllSameRalationalN
     public void visit(LOLoad load) throws FrontendException {
         execute(load);
     }
-    
+
     @Override
     public void visit(LOStore store) throws FrontendException {
         execute(store);
     }
-    
+
     @Override
     public void visit(LOForEach foreach) throws FrontendException {
         execute(foreach);
     }
-    
+
     @Override
     public void visit(LOSplit split) throws FrontendException {
         execute(split);
     }
-    
+
     @Override
     public void visit(LOSplitOutput splitOutput) throws FrontendException {
         execute(splitOutput);
     }
-    
+
     @Override
     public void visit(LOUnion union) throws FrontendException {
         execute(union);
     }
-    
+
     @Override
     public void visit(LOSort sort) throws FrontendException {
         execute(sort);
     }
-    
+
+    @Override
+    public void visit(LORank rank) throws FrontendException {
+        execute(rank);
+    }
+
     @Override
     public void visit(LODistinct distinct) throws FrontendException {
         execute(distinct);
     }
-    
+
     @Override
     public void visit(LOCross cross) throws FrontendException {
         execute(cross);
     }
-    
+
     @Override
     public void visit(LOStream stream) throws FrontendException {
         execute(stream);

Modified: pig/trunk/src/org/apache/pig/newplan/logical/optimizer/LogicalPlanPrinter.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/optimizer/LogicalPlanPrinter.java?rev=1384352&r1=1384351&r2=1384352&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/optimizer/LogicalPlanPrinter.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/optimizer/LogicalPlanPrinter.java Thu Sep 13 14:55:36 2012
@@ -36,6 +36,7 @@ import org.apache.pig.newplan.logical.re
 import org.apache.pig.newplan.logical.relational.LOGenerate;
 import org.apache.pig.newplan.logical.relational.LOJoin;
 import org.apache.pig.newplan.logical.relational.LOLimit;
+import org.apache.pig.newplan.logical.relational.LORank;
 import org.apache.pig.newplan.logical.relational.LOSort;
 import org.apache.pig.newplan.logical.relational.LOSplitOutput;
 import org.apache.pig.newplan.logical.relational.LogicalPlan;
@@ -190,6 +191,11 @@ public class LogicalPlanPrinter extends 
                 }
             }
         }
+        else if(node instanceof LORank){
+            // Visit fields for rank
+            for (OperatorPlan plan : ((LORank)node).getRankColPlans())
+                sb.append(planString(plan));
+        }
         else if(node instanceof LOSort){
             for (OperatorPlan plan : ((LOSort)node).getSortColPlans())
                 sb.append(planString(plan));

Modified: pig/trunk/src/org/apache/pig/newplan/logical/optimizer/SchemaResetter.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/optimizer/SchemaResetter.java?rev=1384352&r1=1384351&r2=1384352&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/optimizer/SchemaResetter.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/optimizer/SchemaResetter.java Thu Sep 13 14:55:36 2012
@@ -44,6 +44,7 @@ import org.apache.pig.newplan.logical.re
 import org.apache.pig.newplan.logical.relational.LOJoin;
 import org.apache.pig.newplan.logical.relational.LOLimit;
 import org.apache.pig.newplan.logical.relational.LOLoad;
+import org.apache.pig.newplan.logical.relational.LORank;
 import org.apache.pig.newplan.logical.relational.LOSort;
 import org.apache.pig.newplan.logical.relational.LOSplit;
 import org.apache.pig.newplan.logical.relational.LOSplitOutput;
@@ -169,13 +170,20 @@ public class SchemaResetter extends Logi
         visitAll(loSort.getSortColPlans());
         validate(loSort.getSchema());
     }
-    
+
+    @Override
+    public void visit(LORank loRank) throws FrontendException{
+        loRank.resetSchema();
+        visitAll(loRank.getRankColPlans());
+        validate(loRank.getSchema());
+    }
+
     @Override
     public void visit(LODistinct loDistinct) throws FrontendException {
         loDistinct.resetSchema();
         validate(loDistinct.getSchema());
     }
-    
+
     @Override
     public void visit(LOLimit loLimit) throws FrontendException {
         loLimit.resetSchema();

Modified: pig/trunk/src/org/apache/pig/newplan/logical/optimizer/UidResetter.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/optimizer/UidResetter.java?rev=1384352&r1=1384351&r2=1384352&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/optimizer/UidResetter.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/optimizer/UidResetter.java Thu Sep 13 14:55:36 2012
@@ -40,6 +40,7 @@ import org.apache.pig.newplan.logical.re
 import org.apache.pig.newplan.logical.relational.LOJoin;
 import org.apache.pig.newplan.logical.relational.LOLimit;
 import org.apache.pig.newplan.logical.relational.LOLoad;
+import org.apache.pig.newplan.logical.relational.LORank;
 import org.apache.pig.newplan.logical.relational.LOSort;
 import org.apache.pig.newplan.logical.relational.LOSplit;
 import org.apache.pig.newplan.logical.relational.LOSplitOutput;
@@ -143,12 +144,22 @@ public class UidResetter extends Logical
             uidResetter.visit();
         }
     }
-    
+
+    @Override
+    public void visit(LORank loRank) throws FrontendException {
+        loRank.resetUid();
+        List<LogicalExpressionPlan> rankPlans = loRank.getRankColPlans();
+        for (LogicalExpressionPlan rankPlan : rankPlans) {
+            ExpressionUidResetter uidResetter = new ExpressionUidResetter(rankPlan);
+            uidResetter.visit();
+        }
+    }
+
     @Override
     public void visit(LODistinct loDistinct) throws FrontendException {
         loDistinct.resetUid();
     }
-    
+
     @Override
     public void visit(LOLimit loLimit) throws FrontendException {
         loLimit.resetUid();

Added: pig/trunk/src/org/apache/pig/newplan/logical/relational/LORank.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/relational/LORank.java?rev=1384352&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/relational/LORank.java (added)
+++ pig/trunk/src/org/apache/pig/newplan/logical/relational/LORank.java Thu Sep 13 14:55:36 2012
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.newplan.logical.relational;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.pig.data.DataType;
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.newplan.Operator;
+import org.apache.pig.newplan.OperatorPlan;
+import org.apache.pig.newplan.PlanVisitor;
+import org.apache.pig.newplan.logical.expression.LogicalExpression;
+import org.apache.pig.newplan.logical.expression.LogicalExpressionPlan;
+
+/**
+ * RANK operator implementation.
+ * Operator Syntax:
+ * <pre>
+ * {@code alias = RANK rel ( BY (col_ref) (ASC|DESC)? ( DENSE )? )?;}
+ * alias - output alias
+ * RANK - operator
+ * rel - input relation
+ * BY - operator
+ * col_ref - STAR or Column References or a range in the schema of rel
+ * DENSE - dense rank means sequential values without gaps among different tuple values.
+ * </pre>
+ */
+
+public class LORank extends LogicalRelationalOperator{
+
+    private final static String RANK_COL_NAME = "rank";
+    private final static String SEPARATOR = "_";
+
+    /**
+     * The list of logical expression plans for the columns of a RANK BY
+     */
+    private List<LogicalExpressionPlan> rankColPlans;
+
+    /**
+     * The list of ascending flags for the columns of a RANK BY
+     */
+    private List<Boolean> ascCols;
+
+    /**
+     * In case of RANK BY, it could be dense or not.
+     * Being a dense rank means assigning consecutive ranks
+     * to different tuple values.
+     */
+    private boolean isDenseRank = false;
+
+    /**
+     * In case of simple RANK, namely row number mode
+     * which is a consecutive number assigned to each tuple.
+     */
+    private boolean isRowNumber = false;
+
+    public LORank( OperatorPlan plan) {
+        super("LORank", plan);
+    }
+
+    public LORank( OperatorPlan plan, List<LogicalExpressionPlan> rankColPlans, List<Boolean> ascCols) {
+        this( plan );
+        this.rankColPlans = rankColPlans;
+        this.ascCols = ascCols;
+    }
+
+    public List<LogicalExpressionPlan> getRankColPlans() {
+        return rankColPlans;
+    }
+
+    public void setRankColPlan(List<LogicalExpressionPlan> rankColPlans) {
+        this.rankColPlans = rankColPlans;
+    }
+
+    public List<Boolean> getAscendingCol() {
+        return ascCols;
+    }
+
+    public void setAscendingCol(List<Boolean> ascCols) {
+        this.ascCols = ascCols;
+    }
+
+    /**
+     * Get the schema for the output of LORank.
+     * Composed by a long rank column prepended to
+     * a copy of the fields of the input schema.
+     * @return the schema, or null if the input or its schema is unknown
+     * @throws FrontendException propagated while reading the input schema
+     */
+    @Override
+    public LogicalSchema getSchema() throws FrontendException {
+
+        // if schema is calculated before, just return
+        if (schema != null) {
+            return schema;
+        }
+
+        // guard the lookup itself: get(0) on a missing or empty predecessor
+        // list would throw before any null check on the input could run
+        List<Operator> predecessors = plan.getPredecessors(this);
+        if (predecessors == null || predecessors.isEmpty()) {
+            return null;
+        }
+
+        LogicalRelationalOperator input = (LogicalRelationalOperator)predecessors.get(0);
+        if (input == null) {
+            return null;
+        }
+
+        // the schema of the input is unknown, so the rank schema is unknown too
+        LogicalSchema inputSchema = input.getSchema();
+        if (inputSchema == null) {
+            return null;
+        }
+
+        LogicalSchema rankSchema = new LogicalSchema();
+
+        // the rank value is a long column named after the input alias
+        // and placed before every input column
+        LogicalSchema.LogicalFieldSchema rankField = new LogicalSchema.LogicalFieldSchema(
+                RANK_COL_NAME+SEPARATOR+input.getAlias(), null, DataType.LONG);
+        rankSchema.addField(rankField);
+        rankField.uid = LogicalExpression.getNextUid();
+
+        // each input field is copied keeping its alias, schema, type and uid
+        for (int i=0; i<inputSchema.size(); i++) {
+            LogicalSchema.LogicalFieldSchema fs = inputSchema.getField(i);
+            rankSchema.addField(new LogicalSchema.LogicalFieldSchema(fs.alias, fs.schema, fs.type, fs.uid));
+        }
+
+        // publish only the fully built schema
+        schema = rankSchema;
+        return schema;
+
+    }
+
+    @Override
+    public void accept(PlanVisitor v) throws FrontendException {
+        if (!(v instanceof LogicalRelationalNodesVisitor)) {
+            throw new FrontendException("Expected LogicalPlanVisitor", 2223);
+        }
+        ((LogicalRelationalNodesVisitor)v).visit(this);
+    }
+
+    @Override
+    public boolean isEqual(Operator other) throws FrontendException {
+        if (other != null && other instanceof LORank) {
+            LORank oR = (LORank)other;
+            if (!rankColPlans.equals(oR.rankColPlans))
+                return false;
+        } else {
+            return false;
+        }
+
+        return checkEquality((LogicalRelationalOperator)other);
+    }
+
+    /**
+     * Tell whether this is a dense RANK BY
+     * @return the dense rank flag
+     */
+    public boolean isDenseRank() {
+        return isDenseRank;
+    }
+
+    /**
+     * Set whether this is a dense RANK BY
+     * @param isDenseRank true for a dense rank, false otherwise
+     */
+    public void setIsDenseRank(boolean isDenseRank) {
+        this.isDenseRank = isDenseRank;
+    }
+
+    /**
+     * Tell whether this is a simple RANK operation,
+     * which means a row number attached to each tuple.
+     * @return the row number flag
+     */
+    public boolean isRowNumber() {
+        return isRowNumber;
+    }
+
+    /**
+     * Set whether this is a simple RANK operation.
+     * @param rowNumber true for a row number operation, false otherwise
+     */
+    public void setIsRowNumber(boolean rowNumber) {
+        this.isRowNumber = rowNumber;
+    }
+}