Posted to commits@pig.apache.org by ro...@apache.org on 2014/03/27 19:59:42 UTC

svn commit: r1582443 - in /pig/branches/tez: ivy/ src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/ src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/ src/org/apache/pig/backend/hadoop/executionengine/t...

Author: rohini
Date: Thu Mar 27 18:59:41 2014
New Revision: 1582443

URL: http://svn.apache.org/r1582443
Log:
PIG-3814: Implement RANK in Tez (rohini)

Added:
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezTaskConfigurable.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/operators/
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/operators/POCounterStatsTez.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/operators/POCounterTez.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/operators/PORankTez.java
    pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/TEZC20.gld
    pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/TEZC21.gld
Modified:
    pig/branches/tez/ivy/libraries.properties
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduceCounter.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCounter.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PORank.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POValueOutputTez.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/PigProcessor.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezEdgeDescriptor.java
    pig/branches/tez/test/e2e/pig/drivers/TestDriverPig.pm
    pig/branches/tez/test/e2e/pig/tests/nightly.conf
    pig/branches/tez/test/org/apache/pig/test/TestCombiner.java
    pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/TEZC19.gld
    pig/branches/tez/test/org/apache/pig/tez/TestTezCompiler.java

Modified: pig/branches/tez/ivy/libraries.properties
URL: http://svn.apache.org/viewvc/pig/branches/tez/ivy/libraries.properties?rev=1582443&r1=1582442&r2=1582443&view=diff
==============================================================================
--- pig/branches/tez/ivy/libraries.properties (original)
+++ pig/branches/tez/ivy/libraries.properties Thu Mar 27 18:59:41 2014
@@ -40,9 +40,9 @@ guava.version=11.0
 jersey-core.version=1.8
 hadoop-core.version=1.0.4
 hadoop-test.version=1.0.4
-hadoop-common.version=2.2.0
-hadoop-hdfs.version=2.2.0
-hadoop-mapreduce.version=2.2.0
+hadoop-common.version=2.3.0
+hadoop-hdfs.version=2.3.0
+hadoop-mapreduce.version=2.3.0
 hbase94.version=0.94.1
 hbase95.version=0.96.0-hadoop1
 hsqldb.version=1.8.0.10

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduceCounter.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduceCounter.java?rev=1582443&r1=1582442&r2=1582443&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduceCounter.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduceCounter.java Thu Mar 27 18:59:41 2014
@@ -18,15 +18,10 @@
 package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer;
 
 import java.io.IOException;
-import java.util.HashMap;
-import java.util.Iterator;
 import java.util.List;
 
-import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapOnly.Map;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCounter;
 import org.apache.pig.data.Tuple;
@@ -47,16 +42,18 @@ public class PigMapReduceCounter {
         /**
          * Here is set up the task id, in order to be attached to each tuple
          **/
+        @Override
         public void setup(Context context) throws IOException, InterruptedException {
             super.setup(context);
 
-            taskID = String.valueOf(context.getTaskAttemptID().getTaskID().getId());
+            int taskIDInt = context.getTaskAttemptID().getTaskID().getId();
+            taskID = String.valueOf(taskIDInt);
 
             pOperator = mp.getLeaves().get(0);
 
             while(true) {
                 if(pOperator instanceof POCounter){
-                    ((POCounter) pOperator).setTaskId(taskID);
+                    ((POCounter) pOperator).setTaskId(taskIDInt);
                     ((POCounter) pOperator).resetLocalCounter();
                     break;
                 } else {
@@ -104,13 +101,14 @@ public class PigMapReduceCounter {
         protected void setup(Context context) throws IOException, InterruptedException {
             super.setup(context);
 
-            taskID = String.valueOf(context.getTaskAttemptID().getTaskID().getId());
+            int taskIDInt = context.getTaskAttemptID().getTaskID().getId();
+            taskID = String.valueOf(taskIDInt);
 
             leaf = rp.getLeaves().get(0);
 
             while(true) {
                 if(leaf instanceof POCounter){
-                    ((POCounter) leaf).setTaskId(taskID);
+                    ((POCounter) leaf).setTaskId(taskIDInt);
                     ((POCounter) leaf).resetLocalCounter();
                     break;
                 } else {

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCounter.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCounter.java?rev=1582443&r1=1582442&r2=1582443&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCounter.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCounter.java Thu Mar 27 18:59:41 2014
@@ -32,7 +32,6 @@ import org.apache.pig.data.TupleFactory;
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.plan.VisitorException;
 import org.apache.pig.pen.util.ExampleTuple;
-import org.apache.pig.pen.util.LineageTracer;
 
 /**
  * This operator is part of the RANK operator implementation.
@@ -80,7 +79,7 @@ public class POCounter extends PhysicalO
     /**
      * Task ID to label each tuple analyzed by the corresponding task
      **/
-    private String taskID = "-1";
+    private Integer taskID = -1;
 
     /**
      * Unique identifier that links POCounter and PORank,
@@ -104,6 +103,15 @@ public class POCounter extends PhysicalO
         super(k, rp, inputs);
     }
 
+    public POCounter(POCounter copy) {
+        super(copy);
+        this.counterPlans = copy.counterPlans;
+        this.mAscCols = copy.mAscCols;
+        this.isDenseRank = copy.isDenseRank;
+        this.isRowNumber = copy.isRowNumber;
+        this.operationID = copy.operationID;
+    }
+
     @SuppressWarnings({ "rawtypes", "unchecked" })
     public POCounter(OperatorKey operatorKey, int requestedParallelism,
             List inp, List<PhysicalPlan> counterPlans,
@@ -156,22 +164,21 @@ public class POCounter extends PhysicalO
         Tuple in = (Tuple) input.result;
         Tuple out = mTupleFactory.newTuple(in.getAll().size() + 2);
         Long sizeBag = 0L;
-        int positionBag, i = 2;
+        int positionBag, i = 1;
 
         // Tuples are added by two stamps before the tuple content:
-        // 1.- At position 0: Current taskId
-        out.set(0, getTaskId());
+        // 1.- At position 0: counter value
+        // 2.- At position last: Current taskId
 
-        // 2.- At position 1: counter value
         //On this case, each tuple is analyzed independently of the tuples grouped
         if(isRowNumber() || isDenseRank()) {
 
             //Only when is Dense Rank (attached to a reduce phase) it is incremented on this way
             //Otherwise, the increment is done at mapper automatically
             if(isDenseRank())
-                PigMapReduceCounter.PigReduceCounter.incrementCounter(POCounter.ONE);
+                incrementReduceCounter(POCounter.ONE);
 
-            out.set(1, getLocalCounter());
+            out.set(0, getLocalCounter());
 
             //and the local incrementer is sequentially increased.
             incrementLocalCounter();
@@ -186,9 +193,9 @@ public class POCounter extends PhysicalO
 
             //This value (the size of the tuples on the bag) is used to increment
             //the current global counter and
-            PigMapReduceCounter.PigReduceCounter.incrementCounter(sizeBag);
+            incrementReduceCounter(sizeBag);
 
-            out.set(1, getLocalCounter());
+            out.set(0, getLocalCounter());
 
             //the value for the next tuple on the current task
             addToLocalCounter(sizeBag);
@@ -199,11 +206,18 @@ public class POCounter extends PhysicalO
             out.set(i++, o);
         }
 
+        // At position last: Current taskId
+        out.set(i++, getTaskId());
+
         input.result = illustratorMarkup(in, out, 0);
 
         return input;
     }
 
+    protected void incrementReduceCounter(Long increment) {
+        PigMapReduceCounter.PigReduceCounter.incrementCounter(increment);
+    }
+
     @Override
     public boolean supportsMultipleInputs() {
         return false;
@@ -248,7 +262,7 @@ public class POCounter extends PhysicalO
     /**
      *  Sequential counter used at ROW NUMBER and RANK BY DENSE mode
      **/
-    public Long incrementLocalCounter() {
+    protected Long incrementLocalCounter() {
         return localCount++;
     }
 
@@ -260,18 +274,18 @@ public class POCounter extends PhysicalO
         return this.localCount;
     }
 
-    public void addToLocalCounter(Long sizeBag) {
+    protected void addToLocalCounter(Long sizeBag) {
         this.localCount += sizeBag;
     }
 
     /**
      *  Task ID: identifier of the task (map or reducer)
      **/
-    public void setTaskId(String taskID) {
+    public void setTaskId(int taskID) {
         this.taskID = taskID;
     }
 
-    public String getTaskId() {
+    public int getTaskId() {
         return this.taskID;
     }
 

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PORank.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PORank.java?rev=1582443&r1=1582442&r2=1582443&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PORank.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PORank.java Thu Mar 27 18:59:41 2014
@@ -85,6 +85,13 @@ public class PORank extends PhysicalOper
         super(k, rp, inp);
     }
 
+    public PORank(PORank copy) {
+        super(copy);
+        this.rankPlans = copy.rankPlans;
+        this.mAscCols = copy.mAscCols;
+        this.ExprOutputTypes = copy.ExprOutputTypes;
+    }
+
     @SuppressWarnings({ "rawtypes", "unchecked" })
     public PORank(OperatorKey operatorKey, int requestedParallelism, List inp,
             List<PhysicalPlan> rankPlans, List<Boolean> ascendingCol) {
@@ -142,20 +149,32 @@ public class PORank extends PhysicalOper
      * Here is read the task identifier in order to get the corresponding cumulative sum,
      * and the local counter at the tuple. These values are summed and prepended to the tuple.
      * @param input processed by POCounter
-     * @return input as Result. The input.result tuple owns the prepend rank value 
+     * @return input as Result. The input.result tuple owns the prepend rank value
      **/
     public Result addRank(Result input) throws ExecException {
-        int i = 1;
         Tuple in = (Tuple) input.result;
-        Tuple out = mTupleFactory.newTuple(in.getAll().size() - 1);
 
-        Long taskId = Long.valueOf(in.get(0).toString());
-        Long localCounter = (Long) in.get(1);
+        Long localCounter = (Long) in.get(0);
+        Integer taskId = (Integer) in.getAll().remove(in.getAll().size() - 1);
 
-        String nameCounter = JobControlCompiler.PIG_MAP_COUNTER + getOperationID() + JobControlCompiler.PIG_MAP_SEPARATOR + String.valueOf(taskId);
+        Long rank = getRankCounterOffset(taskId);
+
+        in.set(0, rank + localCounter);
+
+        if(localCountIllustrator > 2)
+            localCountIllustrator = 0;
+
+        input.result = illustratorMarkup(in, in, localCountIllustrator);
+
+        localCountIllustrator++;
+
+        return input;
+    }
 
+    protected Long getRankCounterOffset(Integer taskId) {
+        String nameCounter = JobControlCompiler.PIG_MAP_COUNTER + getOperationID() + JobControlCompiler.PIG_MAP_SEPARATOR + String.valueOf(taskId);
         Long rank = PigMapReduce.sJobConfInternal.get().getLong( nameCounter , -1L );
-        
+
         if(illustrator != null) {
             rank = 0L;
         }
@@ -164,23 +183,7 @@ public class PORank extends PhysicalOper
             log.error("Error on reading counter "+ nameCounter);
             throw new RuntimeException("Unable to read counter "+ nameCounter);
         }
-
-        out.set(0, rank + localCounter);
-
-        //Add the content of the tuple
-        List<Object> sub = in.getAll().subList(2, in.getAll().size());
-
-        for (Object o : sub)
-            out.set(i++, o);
-
-        if(localCountIllustrator > 2)
-            localCountIllustrator = 0;
-
-        input.result = illustratorMarkup(in, out, localCountIllustrator);
-
-        localCountIllustrator++;
-
-        return input;
+        return rank;
     }
 
     @Override

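For illustration only, the stamped-tuple layout described in the POCounter and PORank hunks above can be sketched in plain Java: POCounter now emits [localCounter, original fields..., taskId], and PORank replaces position 0 with the final rank and strips the task id from the end. The sketch uses java.util lists instead of Pig's Tuple API, and the class and method names are hypothetical, not part of this patch.

    import java.util.ArrayList;
    import java.util.List;

    public class RankStampSketch {

        // POCounter-style stamping: [localCounter, f1..fn, taskId]
        static List<Object> stamp(List<Object> fields, long localCounter, int taskId) {
            List<Object> out = new ArrayList<Object>(fields.size() + 2);
            out.add(localCounter);   // position 0: per-task counter value
            out.addAll(fields);      // original tuple content
            out.add(taskId);         // last position: task id
            return out;
        }

        // PORank-style consumption: rank = task offset + local counter, task id dropped
        static List<Object> rank(List<Object> stamped, long taskOffset) {
            long localCounter = (Long) stamped.get(0);
            stamped.remove(stamped.size() - 1);         // strip the task id from the end
            stamped.set(0, taskOffset + localCounter);  // rank goes back into position 0
            return stamped;
        }

        public static void main(String[] args) {
            List<Object> fields = new ArrayList<Object>();
            fields.add("alice");
            fields.add(20);
            List<Object> stamped = stamp(fields, 4L, 1);  // counter value 4 produced by task 1
            System.out.println(rank(stamped, 5L));        // prints [9, alice, 20] when task 1's offset is 5
        }
    }
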
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POValueOutputTez.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POValueOutputTez.java?rev=1582443&r1=1582442&r2=1582443&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POValueOutputTez.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POValueOutputTez.java Thu Mar 27 18:59:41 2014
@@ -52,7 +52,7 @@ public class POValueOutputTez extends Ph
     // value only input output
     protected transient List<KeyValueWriter> writers;
 
-    private static EmptyWritable EMPTY_KEY = new EmptyWritable();
+    public static EmptyWritable EMPTY_KEY = new EmptyWritable();
 
     public POValueOutputTez(OperatorKey k) {
         super(k);

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/PigProcessor.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/PigProcessor.java?rev=1582443&r1=1582442&r2=1582443&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/PigProcessor.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/PigProcessor.java Thu Mar 27 18:59:41 2014
@@ -100,6 +100,11 @@ public class PigProcessor implements Log
         // Set the job conf as a thread-local member of PigMapReduce
         // for backwards compatibility with the existing code base.
         PigMapReduce.sJobConfInternal.set(conf);
+
+        LinkedList<TezTaskConfigurable> tezTCs = PlanHelper.getPhysicalOperators(execPlan, TezTaskConfigurable.class);
+        for (TezTaskConfigurable tezTC : tezTCs){
+            tezTC.initialize(processorContext);
+        }
     }
 
     @Override

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java?rev=1582443&r1=1582442&r2=1582443&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java Thu Mar 27 18:59:41 2014
@@ -31,6 +31,9 @@ import java.util.Stack;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;
 import org.apache.pig.FuncSpec;
 import org.apache.pig.IndexableLoadFunc;
 import org.apache.pig.LoadFunc;
@@ -77,7 +80,11 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.Packager.PackageType;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
 import org.apache.pig.backend.hadoop.executionengine.tez.POLocalRearrangeTezFactory.LocalRearrangeType;
+import org.apache.pig.backend.hadoop.executionengine.tez.operators.POCounterStatsTez;
+import org.apache.pig.backend.hadoop.executionengine.tez.operators.POCounterTez;
+import org.apache.pig.backend.hadoop.executionengine.tez.operators.PORankTez;
 import org.apache.pig.backend.hadoop.executionengine.tez.util.TezCompilerUtil;
+import org.apache.pig.data.BinSedesTuple;
 import org.apache.pig.data.DataType;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.builtin.DefaultIndexableLoader;
@@ -565,9 +572,16 @@ public class TezCompiler extends PhyPlan
 
     @Override
     public void visitCounter(POCounter op) throws VisitorException {
-        int errCode = 2034;
-        String msg = "Cannot compile " + op.getClass().getSimpleName();
-        throw new TezCompilerException(msg, errCode, PigException.BUG);
+        // Refer visitRank(PORank) for more details
+        try{
+            POCounterTez counterTez = new POCounterTez(op);
+            nonBlocking(counterTez);
+            phyToTezOpMap.put(op, curTezOp);
+        } catch (Exception e) {
+            int errCode = 2034;
+            String msg = "Error compiling operator " + op.getClass().getSimpleName();
+            throw new TezCompilerException(msg, errCode, PigException.BUG, e);
+        }
     }
 
     @Override
@@ -1097,9 +1111,71 @@ public class TezCompiler extends PhyPlan
 
     @Override
     public void visitRank(PORank op) throws VisitorException {
-        int errCode = 2034;
-        String msg = "Cannot compile " + op.getClass().getSimpleName();
-        throw new TezCompilerException(msg, errCode, PigException.BUG);
+        try{
+            // The Rank implementation has 3 vertices.
+            // Vertex 1 has POCounterTez, which produces output tuples and sends them to Vertex 3 via a 1-1 edge.
+            // Vertex 1 also sends each task's tuple count to Vertex 2, which is a single reducer.
+            // Vertex 3 has PORankTez, which consumes the tuples from Vertex 1 and the per-task counts from
+            // Vertex 2 (as a broadcast input), and produces tuples with updated ranks.
+            // This differs from the MR implementation, where POCounter updates job counters that
+            // JobControlCompiler then copies into the PORank job's jobconf.
+
+            // Previous operator is always POCounterTez (Vertex 1)
+            TezOperator counterOper = curTezOp;
+            POCounterTez counterTez = (POCounterTez) counterOper.plan.getLeaves().get(0);
+
+            //Construct Vertex 2
+            TezOperator statsOper = getTezOp();
+            tezPlan.add(statsOper);
+            POCounterStatsTez counterStatsTez = new POCounterStatsTez(OperatorKey.genOpKey(scope));
+            statsOper.plan.addAsLeaf(counterStatsTez);
+            statsOper.setRequestedParallelism(1);
+
+            //Construct Vertex 3
+            TezOperator rankOper = getTezOp();
+            tezPlan.add(rankOper);
+            PORankTez rankTez = new PORankTez(op);
+            rankOper.plan.addAsLeaf(rankTez);
+            curTezOp = rankOper;
+
+            // Connect counterOper vertex to rankOper vertex by 1-1 edge
+            rankOper.setRequestedParallelismByReference(counterOper);
+            TezEdgeDescriptor edge = TezCompilerUtil.connect(tezPlan, counterOper, rankOper);
+            edge.dataMovementType = DataMovementType.ONE_TO_ONE;
+            edge.outputClassName = OnFileUnorderedKVOutput.class.getName();
+            edge.inputClassName = ShuffledUnorderedKVInput.class.getName();
+            edge.setIntermediateOutputKeyClass(POValueOutputTez.EmptyWritable.class.getName());
+            edge.setIntermediateOutputValueClass(BinSedesTuple.class.getName());
+            counterTez.setTuplesOutputKey(rankOper.getOperatorKey().toString());
+            rankTez.setTuplesInputKey(counterOper.getOperatorKey().toString());
+
+            // Connect counterOper vertex to statsOper vertex by Shuffle edge
+            edge = TezCompilerUtil.connect(tezPlan, counterOper, statsOper);
+            // Task id
+            edge.setIntermediateOutputKeyClass(IntWritable.class.getName());
+            edge.partitionerClass = HashPartitioner.class;
+            // Number of records in that task
+            edge.setIntermediateOutputValueClass(LongWritable.class.getName());
+            counterTez.setStatsOutputKey(statsOper.getOperatorKey().toString());
+            counterStatsTez.setInputKey(counterOper.getOperatorKey().toString());
+
+            // Connect statsOper vertex to rankOper vertex by Broadcast edge
+            edge = TezCompilerUtil.connect(tezPlan, statsOper, rankOper);
+            edge.dataMovementType = DataMovementType.BROADCAST;
+            edge.outputClassName = OnFileUnorderedKVOutput.class.getName();
+            edge.inputClassName = ShuffledUnorderedKVInput.class.getName();
+            edge.setIntermediateOutputKeyClass(POValueOutputTez.EmptyWritable.class.getName());
+            // Map of task id, offset count based on total number of records
+            edge.setIntermediateOutputValueClass(BinSedesTuple.class.getName());
+            counterStatsTez.setOutputKey(rankOper.getOperatorKey().toString());
+            rankTez.setStatsInputKey(statsOper.getOperatorKey().toString());
+
+            phyToTezOpMap.put(op, rankOper);
+        } catch (Exception e) {
+            int errCode = 2034;
+            String msg = "Error compiling operator " + op.getClass().getSimpleName();
+            throw new TezCompilerException(msg, errCode, PigException.BUG, e);
+        }
     }
 
     @Override

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java?rev=1582443&r1=1582442&r2=1582443&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java Thu Mar 27 18:59:41 2014
@@ -249,6 +249,7 @@ public class TezDagBuilder extends TezOp
             }
         }
 
+        //TODO: Remove this and set the classes on edge in TezCompiler
         List<POValueOutputTez> valueOutputs = PlanHelper.getPhysicalOperators(from.plan,
                 POValueOutputTez.class);
         if (!valueOutputs.isEmpty()) {
@@ -267,6 +268,23 @@ public class TezDagBuilder extends TezOp
             }
         }
 
+        conf.setIfUnset(TezJobConfig.TEZ_RUNTIME_PARTITIONER_CLASS,
+                MRPartitioner.class.getName());
+
+        if (edge.getIntermediateOutputKeyClass() != null) {
+            conf.set(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_KEY_CLASS,
+                    edge.getIntermediateOutputKeyClass());
+            conf.set(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_KEY_CLASS,
+                    edge.getIntermediateOutputKeyClass());
+        }
+
+        if (edge.getIntermediateOutputValueClass() != null) {
+            conf.set(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_VALUE_CLASS,
+                    edge.getIntermediateOutputValueClass());
+            conf.set(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_VALUE_CLASS,
+                    edge.getIntermediateOutputValueClass());
+        }
+
         conf.setBoolean("mapred.mapper.new-api", true);
         conf.set("pig.pigContext", ObjectSerializer.serialize(pc));
 

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezEdgeDescriptor.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezEdgeDescriptor.java?rev=1582443&r1=1582442&r2=1582443&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezEdgeDescriptor.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezEdgeDescriptor.java Thu Mar 27 18:59:41 2014
@@ -47,6 +47,9 @@ public class TezEdgeDescriptor {
     // Sort order for secondary keys;
     private boolean[] secondarySortOrder;
 
+    private String intermediateOutputKeyClass;
+    private String intermediateOutputValueClass;
+
     public TezEdgeDescriptor() {
         combinePlan = new PhysicalPlan();
 
@@ -79,4 +82,20 @@ public class TezEdgeDescriptor {
         }
     }
 
+    public String getIntermediateOutputKeyClass() {
+        return intermediateOutputKeyClass;
+    }
+
+    public void setIntermediateOutputKeyClass(String intermediateOutputKeyClass) {
+        this.intermediateOutputKeyClass = intermediateOutputKeyClass;
+    }
+
+    public String getIntermediateOutputValueClass() {
+        return intermediateOutputValueClass;
+    }
+
+    public void setIntermediateOutputValueClass(String intermediateOutputValueClass) {
+        this.intermediateOutputValueClass = intermediateOutputValueClass;
+    }
+
 }

Added: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezTaskConfigurable.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezTaskConfigurable.java?rev=1582443&view=auto
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezTaskConfigurable.java (added)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezTaskConfigurable.java Thu Mar 27 18:59:41 2014
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.backend.hadoop.executionengine.tez;
+
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.tez.runtime.api.TezProcessorContext;
+
+/**
+ * This interface is implemented by PhysicalOperators that need to access the
+ * TezProcessorContext of a Tez task.
+ */
+
+public interface TezTaskConfigurable {
+
+    public void initialize(TezProcessorContext processorContext) throws ExecException;
+
+}

Added: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/operators/POCounterStatsTez.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/operators/POCounterStatsTez.java?rev=1582443&view=auto
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/operators/POCounterStatsTez.java (added)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/operators/POCounterStatsTez.java Thu Mar 27 18:59:41 2014
@@ -0,0 +1,170 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.backend.hadoop.executionengine.tez.operators;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
+import org.apache.pig.backend.hadoop.executionengine.tez.POValueOutputTez;
+import org.apache.pig.backend.hadoop.executionengine.tez.TezLoad;
+import org.apache.pig.backend.hadoop.executionengine.tez.TezOutput;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.plan.VisitorException;
+import org.apache.tez.runtime.api.LogicalInput;
+import org.apache.tez.runtime.api.LogicalOutput;
+import org.apache.tez.runtime.library.api.KeyValueWriter;
+import org.apache.tez.runtime.library.api.KeyValuesReader;
+
+/**
+ * POCounterStatsTez is used to group counters from previous vertex POCounterTez tasks
+ */
+public class POCounterStatsTez extends PhysicalOperator implements TezLoad, TezOutput {
+
+    private static final long serialVersionUID = 1L;
+    private static final Log LOG = LogFactory.getLog(POCounterStatsTez.class);
+    private String inputKey;
+    private String outputKey;
+    // TODO: Even though we expect only one record from POCounter, because of Shuffle we have
+    // KeyValuesReader. After TEZ-661, switch to unsorted shuffle
+    private transient KeyValuesReader reader;
+    private transient KeyValueWriter writer;
+
+    public POCounterStatsTez(OperatorKey k) {
+        super(k);
+    }
+
+    @Override
+    public void addInputsToSkip(Set<String> inputsToSkip) {
+    }
+
+    @Override
+    public void attachInputs(Map<String, LogicalInput> inputs,
+            Configuration conf)
+            throws ExecException {
+        LogicalInput input = inputs.get(inputKey);
+        if (input == null) {
+            throw new ExecException("Input from vertex " + inputKey + " is missing");
+        }
+        try {
+            reader = (KeyValuesReader) input.getReader();
+            LOG.info("Attached input from vertex " + inputKey + " : input=" + input + ", reader=" + reader);
+        } catch (Exception e) {
+            throw new ExecException(e);
+        }
+    }
+
+    @Override
+    public void attachOutputs(Map<String, LogicalOutput> outputs,
+            Configuration conf) throws ExecException {
+        LogicalOutput output = outputs.get(outputKey);
+        if (output == null) {
+            throw new ExecException("Output to vertex " + outputKey + " is missing");
+        }
+        try {
+            writer = (KeyValueWriter) output.getWriter();
+            LOG.info("Attached output to vertex " + outputKey + " : output=" + output + ", writer=" + writer);
+        } catch (Exception e) {
+            throw new ExecException(e);
+        }
+    }
+
+    @Override
+    public Result getNextTuple() throws ExecException {
+        try {
+            Map<Integer, Long> counterRecords = new HashMap<Integer, Long>();
+            Integer key = null;
+            Long value = null;
+            // Read count of records per task
+            while (reader.next()) {
+                key = ((IntWritable)reader.getCurrentKey()).get();
+                for (Object val : reader.getCurrentValues()) {
+                    value = ((LongWritable)val).get();
+                    counterRecords.put(key, value);
+                }
+            }
+
+            // BinInterSedes only takes String for map key
+            Map<String, Long> counterOffsets = new HashMap<String, Long>();
+            // Create a map to contain task ids and beginning offset of record count
+            // based on total record count of all tasks
+            // For eg: If Task 0 has 5 records, Task 1 has 10 records and Task 2 has 3 records
+            // map will contain {0=0, 1=5, 2=15}
+            Long prevTasksCount = counterRecords.get(0);
+            counterOffsets.put("0", 0L);
+            for (int i = 1; i < counterRecords.size(); i++) {
+                counterOffsets.put("" + i, prevTasksCount);
+                prevTasksCount += counterRecords.get(i);
+            }
+
+            Tuple tuple = TupleFactory.getInstance().newTuple(1);
+            tuple.set(0, counterOffsets);
+            writer.write(POValueOutputTez.EMPTY_KEY, tuple);
+            return RESULT_EOP;
+        } catch (IOException e) {
+            throw new ExecException(e);
+        }
+    }
+
+    public void setInputKey(String inputKey) {
+        this.inputKey = inputKey;
+    }
+
+    public void setOutputKey(String outputKey) {
+        this.outputKey = outputKey;
+    }
+
+    @Override
+    public Tuple illustratorMarkup(Object in, Object out, int eqClassIndex) {
+        // TODO Auto-generated method stub
+        return null;
+    }
+
+    @Override
+    public void visit(PhyPlanVisitor v) throws VisitorException {
+        v.visit(this);
+    }
+
+    @Override
+    public boolean supportsMultipleInputs() {
+        return false;
+    }
+
+    @Override
+    public boolean supportsMultipleOutputs() {
+        return false;
+    }
+
+    @Override
+    public String name() {
+        return "PORankStatsTez - " + mKey.toString() + "\t<-\t " + inputKey + "\t->\t " + outputKey;
+    }
+}

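For illustration only, the offset computation in POCounterStatsTez.getNextTuple() above can be sketched in plain Java: per-task record counts such as {0=5, 1=10, 2=3} become starting offsets {0=0, 1=5, 2=15}, which PORankTez later adds to each tuple's local counter. The sketch assumes contiguous task ids starting at 0, as the loop above does, and the class and method names are hypothetical, not part of this patch.

    import java.util.HashMap;
    import java.util.Map;

    public class RankOffsetSketch {

        // Turn per-task record counts into per-task starting offsets.
        static Map<String, Long> offsets(Map<Integer, Long> countsPerTask) {
            Map<String, Long> result = new HashMap<String, Long>();
            long runningTotal = 0L;
            for (int taskId = 0; taskId < countsPerTask.size(); taskId++) {
                // String keys, since BinInterSedes only takes String for map keys
                result.put(String.valueOf(taskId), runningTotal);
                runningTotal += countsPerTask.get(taskId);
            }
            return result;
        }

        public static void main(String[] args) {
            Map<Integer, Long> counts = new HashMap<Integer, Long>();
            counts.put(0, 5L);
            counts.put(1, 10L);
            counts.put(2, 3L);
            System.out.println(offsets(counts));  // {0=0, 1=5, 2=15} (HashMap iteration order may vary)
        }
    }
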
Added: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/operators/POCounterTez.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/operators/POCounterTez.java?rev=1582443&view=auto
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/operators/POCounterTez.java (added)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/operators/POCounterTez.java Thu Mar 27 18:59:41 2014
@@ -0,0 +1,150 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.tez.operators;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCounter;
+import org.apache.pig.backend.hadoop.executionengine.tez.POValueOutputTez;
+import org.apache.pig.backend.hadoop.executionengine.tez.TezOutput;
+import org.apache.pig.backend.hadoop.executionengine.tez.TezTaskConfigurable;
+import org.apache.pig.impl.plan.VisitorException;
+import org.apache.tez.runtime.api.LogicalOutput;
+import org.apache.tez.runtime.api.TezProcessorContext;
+import org.apache.tez.runtime.library.api.KeyValueWriter;
+
+public class POCounterTez extends POCounter implements TezOutput, TezTaskConfigurable {
+
+    private static final long serialVersionUID = 1L;
+    private static final Log LOG = LogFactory.getLog(POCounterTez.class);
+
+    private String tuplesOutputKey;
+    private String statsOutputKey;
+
+    private transient KeyValueWriter tuplesWriter;
+    private transient KeyValueWriter statsWriter;
+    private transient long totalTaskRecords = 0;
+
+    public POCounterTez(POCounter copy) {
+        super(copy);
+    }
+
+    public void setTuplesOutputKey(String tuplesOutputKey) {
+        this.tuplesOutputKey = tuplesOutputKey;
+    }
+
+    public void setStatsOutputKey(String statsOutputKey) {
+        this.statsOutputKey = statsOutputKey;
+    }
+
+    @Override
+    public void initialize(TezProcessorContext processorContext)
+            throws ExecException {
+        this.setTaskId(processorContext.getTaskIndex());
+    }
+
+    @Override
+    public void attachOutputs(Map<String, LogicalOutput> outputs,
+            Configuration conf) throws ExecException {
+        LogicalOutput output = outputs.get(tuplesOutputKey);
+        if (output == null) {
+            throw new ExecException("Output to vertex " + tuplesOutputKey + " is missing");
+        }
+        try {
+            tuplesWriter = (KeyValueWriter) output.getWriter();
+            LOG.info("Attached output to vertex " + tuplesOutputKey + " : output=" + output + ", writer=" + tuplesWriter);
+        } catch (Exception e) {
+            throw new ExecException(e);
+        }
+
+        output = outputs.get(statsOutputKey);
+        if (output == null) {
+            throw new ExecException("Output to vertex " + statsOutputKey + " is missing");
+        }
+        try {
+            statsWriter = (KeyValueWriter) output.getWriter();
+            LOG.info("Attached output to vertex " + statsOutputKey + " : output=" + output + ", writer=" + statsWriter);
+        } catch (Exception e) {
+            throw new ExecException(e);
+        }
+    }
+
+    @Override
+    public Result getNextTuple() throws ExecException {
+        Result inp = null;
+        try {
+            while (true) {
+                inp = processInput();
+                if (inp.returnStatus == POStatus.STATUS_EOP
+                        || inp.returnStatus == POStatus.STATUS_ERR)
+                    break;
+                if (inp.returnStatus == POStatus.STATUS_NULL) {
+                    continue;
+                }
+
+                tuplesWriter.write(POValueOutputTez.EMPTY_KEY,
+                        addCounterValue(inp).result);
+            }
+
+            statsWriter.write(new IntWritable(this.getTaskId()), new LongWritable(totalTaskRecords));
+
+        } catch (IOException e) {
+            throw new ExecException(e);
+        }
+        return RESULT_EOP;
+    }
+
+    @Override
+    protected Long incrementLocalCounter() {
+        totalTaskRecords++;
+        return super.incrementLocalCounter();
+    }
+
+    @Override
+    protected void addToLocalCounter(Long sizeBag) {
+        super.addToLocalCounter(sizeBag);
+        totalTaskRecords += sizeBag;
+    }
+
+    @Override
+    protected void incrementReduceCounter(Long increment) {
+        totalTaskRecords += increment;
+    }
+
+    @Override
+    public void visit(PhyPlanVisitor v) throws VisitorException {
+        super.visit(v);
+        v.visit(this);
+    }
+
+    @Override
+    public String name() {
+        return "POCounterTez - " + mKey.toString() + "\t->\t " + tuplesOutputKey + "," + statsOutputKey;
+    }
+
+}

Added: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/operators/PORankTez.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/operators/PORankTez.java?rev=1582443&view=auto
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/operators/PORankTez.java (added)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/operators/PORankTez.java Thu Mar 27 18:59:41 2014
@@ -0,0 +1,152 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.tez.operators;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PORank;
+import org.apache.pig.backend.hadoop.executionengine.tez.ObjectCache;
+import org.apache.pig.backend.hadoop.executionengine.tez.TezLoad;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.plan.VisitorException;
+import org.apache.tez.runtime.api.LogicalInput;
+import org.apache.tez.runtime.library.api.KeyValueReader;
+
+public class PORankTez extends PORank implements TezLoad {
+
+    private static final long serialVersionUID = 1L;
+    private static final Log LOG = LogFactory.getLog(PORankTez.class);
+
+    private String tuplesInputKey;
+    private String statsInputKey;
+    private transient boolean isInputCached;
+    private transient KeyValueReader reader;
+    private transient Map<Integer, Long> counterOffsets;
+
+    public PORankTez(PORank copy) {
+        super(copy);
+    }
+
+    public void setTuplesInputKey(String tuplesInputKey) {
+        this.tuplesInputKey = tuplesInputKey;
+    }
+
+    public void setStatsInputKey(String statsInputKey) {
+        this.statsInputKey = statsInputKey;
+    }
+
+    @Override
+    public void addInputsToSkip(Set<String> inputsToSkip) {
+        String cacheKey = "rankstats-" + getOperatorKey().toString();
+        Object cacheValue = ObjectCache.getInstance().retrieve(cacheKey);
+        if (cacheValue != null) {
+            isInputCached = true;
+            inputsToSkip.add(statsInputKey);
+        }
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public void attachInputs(Map<String, LogicalInput> inputs,
+            Configuration conf) throws ExecException {
+        LogicalInput input = inputs.get(tuplesInputKey);
+        if (input == null) {
+            throw new ExecException("Input from vertex " + tuplesInputKey + " is missing");
+        }
+        try {
+            reader = (KeyValueReader) input.getReader();
+            LOG.info("Attached input from vertex " + tuplesInputKey + " : input=" + input + ", reader=" + reader);
+        } catch (Exception e) {
+            throw new ExecException(e);
+        }
+
+        String cacheKey = "rankstats-" + getOperatorKey().toString();
+        if (isInputCached) {
+            counterOffsets = (Map<Integer, Long>) ObjectCache.getInstance().retrieve(cacheKey);
+            LOG.info("Found counter stats for PORankTez in Tez cache. cachekey=" + cacheKey);
+            return;
+        }
+        input = inputs.get(statsInputKey);
+        if (input == null) {
+            throw new ExecException("Input from vertex " + statsInputKey + " is missing");
+        }
+        try {
+            KeyValueReader reader = (KeyValueReader) input.getReader();
+            LOG.info("Attached input from vertex " + statsInputKey + " : input=" + input + ", reader=" + reader);
+            reader.next();
+            // POCounterStatsTez produces a HashMap which contains
+            // mapping of task id and the offset of record count in each task based on total record count
+            Map<String, Long> counterOffsetsTemp = (Map<String, Long>) ((Tuple)reader.getCurrentValue()).get(0);
+            counterOffsets = new HashMap<Integer, Long>(counterOffsetsTemp.size(), 1);
+            for (Entry<String, Long> entry : counterOffsetsTemp.entrySet()) {
+                counterOffsets.put(Integer.valueOf(entry.getKey()), entry.getValue());
+            }
+            ObjectCache.getInstance().cache(cacheKey, counterOffsets);
+            LOG.info("Cached PORankTez counter stats in Tez ObjectRegistry with vertex scope. cachekey=" + cacheKey);
+        } catch (Exception e) {
+            throw new ExecException(e);
+        }
+    }
+
+    @Override
+    public Result getNextTuple() throws ExecException {
+        Result inp = null;
+
+        try {
+            while (reader.next()) {
+                inp = new Result(POStatus.STATUS_OK, reader.getCurrentValue());
+                return addRank(inp);
+            }
+        } catch (IOException e) {
+            throw new ExecException(e);
+        }
+
+        return RESULT_EOP;
+    }
+
+    @Override
+    protected Long getRankCounterOffset(Integer taskId) {
+        if (illustrator != null) {
+            return 0L;
+        }
+        return counterOffsets.get(taskId);
+    }
+
+    @Override
+    public void visit(PhyPlanVisitor v) throws VisitorException {
+        super.visit(v);
+        v.visit(this);
+    }
+
+    @Override
+    public String name() {
+        return "PORankTez - " + mKey.toString() + "\t<-\t " + tuplesInputKey + "," + statsInputKey;
+    }
+
+}

Modified: pig/branches/tez/test/e2e/pig/drivers/TestDriverPig.pm
URL: http://svn.apache.org/viewvc/pig/branches/tez/test/e2e/pig/drivers/TestDriverPig.pm?rev=1582443&r1=1582442&r2=1582443&view=diff
==============================================================================
--- pig/branches/tez/test/e2e/pig/drivers/TestDriverPig.pm (original)
+++ pig/branches/tez/test/e2e/pig/drivers/TestDriverPig.pm Thu Mar 27 18:59:41 2014
@@ -1001,15 +1001,17 @@ sub wrongExecutionMode($$)
 
     # Check that we should run this test.  If the current execution type
     # doesn't match the execonly flag, then skip this one.
-    my $wrong = ((defined $testCmd->{'execonly'} &&
-            $testCmd->{'execonly'} ne $testCmd->{'exectype'}));
+    my $wrong = 0;
 
-    if ($wrong) {
-        print $log "Skipping test $testCmd->{'group'}" . "_" .
-            $testCmd->{'num'} . " since it is executed only in " .
-            $testCmd->{'execonly'} . " mode and we are executing in " .
-            $testCmd->{'exectype'} . " mode.\n";
-        return $wrong;
+    if (defined $testCmd->{'execonly'}) {
+        my @exectypes = split(',', $testCmd->{'execonly'});
+        if (!$testCmd->{'exectype'} ~~ @exectypes) {
+            print $log "Skipping test $testCmd->{'group'}" . "_" .
+                $testCmd->{'num'} . " since it is executed only in " .
+                $testCmd->{'execonly'} . " mode and we are executing in " .
+                $testCmd->{'exectype'} . " mode.\n";
+            return 1;
+        }
     }
 
     if (defined $testCmd->{'ignore23'} && $testCmd->{'hadoopversion'}=='23') {

Modified: pig/branches/tez/test/e2e/pig/tests/nightly.conf
URL: http://svn.apache.org/viewvc/pig/branches/tez/test/e2e/pig/tests/nightly.conf?rev=1582443&r1=1582442&r2=1582443&view=diff
==============================================================================
--- pig/branches/tez/test/e2e/pig/tests/nightly.conf (original)
+++ pig/branches/tez/test/e2e/pig/tests/nightly.conf Thu Mar 27 18:59:41 2014
@@ -2658,7 +2658,7 @@ store c into ':OUTPATH:';\,
 				# Merge-join with one file across multiple blocks
         	    {
                 'num' => 8,
-			    'execonly' => 'mapred', # since this join will run out of memory in local mode
+			    'execonly' => 'mapred,tez', # since this join will run out of memory in local mode
 		        'floatpostprocess' => 1,
 		        'delimiter' => '	',
                 'pig' => q\a = load ':INPATH:/singlefile/votertab10k';
@@ -3249,7 +3249,7 @@ store b into ':OUTPATH:';\,
             'tests' => [
                     {
                     'num' => 1,
-                    'execonly' => 'mapred', # studenttab20m not available in local mode
+                    'execonly' => 'mapred,tez', # studenttab20m not available in local mode
                     'pig' => q\
 a = load ':INPATH:/singlefile/studenttab20m' using PigStorage() as (name, age, gpa);
 b = foreach a generate age;
@@ -3925,7 +3925,7 @@ store b into ':OUTPATH:';\,
                     {
                     # test group
                     'num' => 1,
-                    'execonly' => 'mapred', # since this join will run out of memory in local mode
+                    'execonly' => 'mapred,tez', # since this join will run out of memory in local mode
                     'pig' => q\register :FUNCPATH:/testudf.jar;
 a = load ':INPATH:/singlefile/studenttab10k' using PigStorage() as (name, age:int, gpa);
 b = group a by age PARTITION BY org.apache.pig.test.utils.SimpleCustomPartitioner2 parallel 2;
@@ -4531,7 +4531,7 @@ store C into ':OUTPATH:';\,
                 {
                     'num' => 1,
                     'java_params' => ['-Dopt.fetch=false'],
-                    'execonly' => 'mapred', # since distributed cache is not supported in local mode
+                    'execonly' => 'mapred,tez', # since distributed cache is not supported in local mode
                     'pig' => q?
                         register :FUNCPATH:/testudf.jar;
                         define udfdc org.apache.pig.test.udf.evalfunc.Udfcachetest(':INPATH:/singlefile/votertab10k#foodle');
@@ -4769,7 +4769,7 @@ store C into ':OUTPATH:';\,
                     }, {
                         # PIG-2576
                         'num' => 4,
-                        'execonly' => 'mapred',
+                        'execonly' => 'mapred,tez',
                         'pig' => q?register :FUNCPATH:/testudf.jar;
                                 define printconf org.apache.pig.test.udf.evalfunc.UdfContextFrontend('dummy');
                                 a = load ':INPATH:/singlefile/studenttab10k' as (name, age, gpa);
@@ -4824,7 +4824,7 @@ store C into ':OUTPATH:';\,
                 ],
             },{
                 'name' => 'Bloom',
-			    'execonly' => 'mapred', # distributed cache does not work in local mode
+			    'execonly' => 'mapred,tez', # distributed cache does not work in local mode
                 'tests' => [
                     {
                         'num' => 1,
@@ -5225,7 +5225,7 @@ store a into ':OUTPATH:';\,
                 'tests' => [
                     {
 						'num' => 1,
-						'execonly' => 'mapred',
+						'execonly' => 'mapred,tez',
 						'pig' => q\
 									SET default_parallel 9;
 									A = LOAD ':INPATH:/singlefile/prerank' using PigStorage(',') as (rownumber:long,rankcabd:long,rankbdaa:long,rankbdca:long,rankaacd:long,rankaaba:long,a:int,b:int,c:int,tail:bytearray);
@@ -5240,7 +5240,7 @@ store a into ':OUTPATH:';\,
 								\,
 					}, {
 						'num' => 2,
-						'execonly' => 'mapred',
+						'execonly' => 'mapred,tez',
 						'pig' => q\
 									SET default_parallel 9;
 									A = LOAD ':INPATH:/singlefile/prerank' using PigStorage(',') as (rownumber:long,rankcabd:long,rankbdaa:long,rankbdca:long,rankaacd:long,rankaaba:long,a:int,b:int,c:int,tail:bytearray);
@@ -5255,7 +5255,7 @@ store a into ':OUTPATH:';\,
 								\,
 					}, {
 						'num' => 3,
-						'execonly' => 'mapred',
+						'execonly' => 'mapred,tez',
 						'pig' => q\
 									SET default_parallel 7;
 									A = LOAD ':INPATH:/singlefile/prerank' using PigStorage(',') as (rownumber:long,rankcabd:long,rankbdaa:long,rankbdca:long,rankaacd:long,rankaaba:long,a:int,b:int,c:int,tail:bytearray);
@@ -5270,7 +5270,7 @@ store a into ':OUTPATH:';\,
 								\,
 					}, {
 						'num' => 4,
-						'execonly' => 'mapred',
+						'execonly' => 'mapred,tez',
 						'pig' => q\
 									SET default_parallel 7;
 									A = LOAD ':INPATH:/singlefile/prerank' using PigStorage(',') as (rownumber:long,rankcabd:long,rankbdaa:long,rankbdca:long,rankaacd:long,rankaaba:long,a:int,b:int,c:int,tail:bytearray);
@@ -5285,7 +5285,7 @@ store a into ':OUTPATH:';\,
 								\,
 					}, {
 						'num' =>5,
-						'execonly' => 'mapred',
+						'execonly' => 'mapred,tez',
 						'pig' => q\
 									SET default_parallel 9;
 									A = LOAD ':INPATH:/singlefile/prerank' using PigStorage(',') as (rownumber:long,rankcabd:long,rankbdaa:long,rankbdca:long,rankaacd:long,rankaaba:long,a:int,b:int,c:int,tail:bytearray);
@@ -5300,7 +5300,7 @@ store a into ':OUTPATH:';\,
 								\,
 					}, {
 						'num' =>6,
-						'execonly' => 'mapred',
+						'execonly' => 'mapred,tez',
 						'pig' => q\
 									SET default_parallel 7;
 									A = LOAD ':INPATH:/singlefile/prerank' using PigStorage(',') as (rownumber:long,rankcabd:long,rankbdaa:long,rankbdca:long,rankaacd:long,rankaaba:long,a:int,b:int,c:int,tail:bytearray);
@@ -5315,7 +5315,7 @@ store a into ':OUTPATH:';\,
 								\,
 					}, {
 						'num' => 7,
-						'execonly' => 'mapred',
+						'execonly' => 'mapred,tez',
 						'pig' => q\
 									SET default_parallel 7;
 									A = LOAD ':INPATH:/singlefile/prerank' using PigStorage(',') as (rownumber:long,rankcabd:long,rankbdaa:long,rankbdca:long,rankaacd:long,rankaaba:long,a:int,b:int,c:int,tail:bytearray);
@@ -5333,7 +5333,7 @@ store a into ':OUTPATH:';\,
 								\,
 					}, {
 						'num' => 8,
-						'execonly' => 'mapred',
+						'execonly' => 'mapred,tez',
 						'pig' => q\
 									SET default_parallel 9;
 									SET pig.splitCombination false;
@@ -5358,7 +5358,7 @@ store a into ':OUTPATH:';\,
 								\,
 					}, {
 						'num' => 9,
-						'execonly' => 'mapred',
+						'execonly' => 'mapred,tez',
 						'pig' => q\
 									SET default_parallel 25;
 									A = LOAD ':INPATH:/singlefile/biggish' using PigStorage(',') as (rownumber:long,idx:long,tail:bytearray);
@@ -5374,7 +5374,7 @@ store a into ':OUTPATH:';\,
 								\,
 					}, {
 						'num' => 10,
-						'execonly' => 'mapred',
+						'execonly' => 'mapred,tez',
 						'pig' => q\
 									SET default_parallel 11;
 									SET pig.splitCombination false;
@@ -5396,7 +5396,7 @@ store a into ':OUTPATH:';\,
 								\,
             		}, {
 						'num' => 11,
-						'execonly' => 'mapred',
+						'execonly' => 'mapred,tez',
 						'pig' => q\
 									A = LOAD ':INPATH:/singlefile/prerank' using PigStorage(',') as (rownumber:long,rankcabd:long,rankbdaa:long,rankbdca:long,rankaacd:long,rankaaba:long,a:int,b:int,c:int,tail:bytearray);
 									split A into M if rownumber > 15, N if rownumber < 25;

Modified: pig/branches/tez/test/org/apache/pig/test/TestCombiner.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/test/org/apache/pig/test/TestCombiner.java?rev=1582443&r1=1582442&r2=1582443&view=diff
==============================================================================
--- pig/branches/tez/test/org/apache/pig/test/TestCombiner.java (original)
+++ pig/branches/tez/test/org/apache/pig/test/TestCombiner.java Thu Mar 27 18:59:41 2014
@@ -38,9 +38,7 @@ import org.apache.pig.data.DefaultDataBa
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.io.FileLocalizer;
-import org.apache.pig.tools.pigstats.PigStats;
 import org.apache.pig.tools.pigstats.ScriptState;
-import org.apache.pig.tools.pigstats.mapreduce.MRScriptState;
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Before;
@@ -216,7 +214,11 @@ public class TestCombiner {
         assertFalse(it.hasNext());
         Util.deleteFile(cluster, "MultiCombinerUseInput.txt");
         // Reset io.sort.mb to the original value before exit
-        properties.setProperty("io.sort.mb", oldValue);
+        if (oldValue == null) {
+            properties.remove("io.sort.mb");
+        } else {
+            properties.setProperty("io.sort.mb", oldValue);
+        }
         pigServer.shutdown();
     }
 

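The TestCombiner.java hunk above avoids a NullPointerException when the test restores "io.sort.mb": Properties.setProperty rejects a null value, so if the property was never set before the test it has to be removed rather than reset. A minimal standalone sketch of that restore-or-remove pattern (the property name and override value are illustrative only, not tied to Pig's configuration):

    import java.util.Properties;

    public class PropertyRestoreSketch {
        public static void main(String[] args) {
            Properties properties = new Properties();

            // Remember the original value; getProperty returns null if the key was never set.
            String oldValue = properties.getProperty("io.sort.mb");
            properties.setProperty("io.sort.mb", "200");

            // ... code under test runs with the override in place ...

            // Restore the previous state. setProperty(key, null) would throw a
            // NullPointerException, so remove the key when there was no original value.
            if (oldValue == null) {
                properties.remove("io.sort.mb");
            } else {
                properties.setProperty("io.sort.mb", oldValue);
            }
        }
    }
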
Modified: pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/TEZC19.gld
URL: http://svn.apache.org/viewvc/pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/TEZC19.gld?rev=1582443&r1=1582442&r2=1582443&view=diff
==============================================================================
--- pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/TEZC19.gld (original)
+++ pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/TEZC19.gld Thu Mar 27 18:59:41 2014
@@ -11,9 +11,9 @@ Tez vertex scope-25
 
 Tez vertex scope-18
 # Plan on vertex
-Local Rearrange[tuple]{bytearray}(false) - scope-22	->	 scope-25
+Local Rearrange[tuple]{tuple}(false) - scope-22	->	 scope-25
 |   |
-|   Constant(DummyVal) - scope-21
+|   Project[tuple][*] - scope-21
 |
 |---a: New For Each(false,false)[bag] - scope-7
     |   |
@@ -28,9 +28,9 @@ Local Rearrange[tuple]{bytearray}(false)
     |---a: Load(file:///tmp/input:org.apache.pig.builtin.PigStorage) - scope-0
 Tez vertex scope-19
 # Plan on vertex
-Local Rearrange[tuple]{bytearray}(false) - scope-24	->	 scope-25
+Local Rearrange[tuple]{tuple}(false) - scope-24	->	 scope-25
 |   |
-|   Constant(DummyVal) - scope-23
+|   Project[tuple][*] - scope-23
 |
 |---c: New For Each(false,false)[bag] - scope-15
     |   |

Added: pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/TEZC20.gld
URL: http://svn.apache.org/viewvc/pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/TEZC20.gld?rev=1582443&view=auto
==============================================================================
--- pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/TEZC20.gld (added)
+++ pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/TEZC20.gld Thu Mar 27 18:59:41 2014
@@ -0,0 +1,33 @@
+#--------------------------------------------------
+# There are 1 DAGs in the session
+#--------------------------------------------------
+#--------------------------------------------------
+# TEZ DAG plan: scope-15
+#--------------------------------------------------
+Tez vertex scope-11	->	Tez vertex scope-14,Tez vertex scope-12,
+Tez vertex scope-12	->	Tez vertex scope-14,
+Tez vertex scope-14
+
+Tez vertex scope-11
+# Plan on vertex
+POCounterTez - scope-8	->	 scope-14,scope-12
+|
+|---a: New For Each(false,false)[bag] - scope-7
+    |   |
+    |   Cast[int] - scope-2
+    |   |
+    |   |---Project[bytearray][0] - scope-1
+    |   |
+    |   Cast[int] - scope-5
+    |   |
+    |   |---Project[bytearray][1] - scope-4
+    |
+    |---a: Load(file:///tmp/input1:org.apache.pig.builtin.PigStorage) - scope-0
+Tez vertex scope-12
+# Plan on vertex
+PORankStatsTez - scope-13	<-	 scope-11	->	 scope-14
+Tez vertex scope-14
+# Plan on vertex
+b: Store(file:///tmp/output/d:org.apache.pig.builtin.PigStorage) - scope-10
+|
+|---PORankTez - scope-9	<-	 scope-11,scope-12

Added: pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/TEZC21.gld
URL: http://svn.apache.org/viewvc/pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/TEZC21.gld?rev=1582443&view=auto
==============================================================================
--- pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/TEZC21.gld (added)
+++ pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/TEZC21.gld Thu Mar 27 18:59:41 2014
@@ -0,0 +1,103 @@
+#--------------------------------------------------
+# There are 1 DAGs in the session
+#--------------------------------------------------
+#--------------------------------------------------
+# TEZ DAG plan: scope-51
+#--------------------------------------------------
+Tez vertex scope-23	->	Tez vertex scope-24,
+Tez vertex scope-24	->	Tez vertex scope-41,Tez vertex scope-31,
+Tez vertex scope-31	->	Tez vertex scope-41,
+Tez vertex scope-41	->	Tez vertex scope-43,
+Tez vertex scope-43	->	Tez vertex scope-49,Tez vertex scope-47,
+Tez vertex scope-47	->	Tez vertex scope-49,
+Tez vertex scope-49
+
+Tez vertex scope-23
+# Plan on vertex
+b: Local Rearrange[tuple]{int}(false) - scope-10	->	 scope-24
+|   |
+|   Project[int][0] - scope-11
+|
+|---a: New For Each(false,false)[bag] - scope-7
+    |   |
+    |   Cast[int] - scope-2
+    |   |
+    |   |---Project[bytearray][0] - scope-1
+    |   |
+    |   Cast[int] - scope-5
+    |   |
+    |   |---Project[bytearray][1] - scope-4
+    |
+    |---a: Load(file:///tmp/input1:org.apache.pig.builtin.PigStorage) - scope-0
+Tez vertex scope-24
+# Plan on vertex
+Local Rearrange[tuple]{tuple}(false) - scope-27	->	 scope-31
+|   |
+|   Constant(DummyVal) - scope-26
+|
+|---ReservoirSample - scope-30
+    |
+    |---New For Each(false)[tuple] - scope-29
+        |   |
+        |   Project[int][0] - scope-28
+        |
+        |---b: Local Rearrange[tuple]{int}(false) - scope-25	->	 scope-41
+            |   |
+            |   Project[int][0] - scope-15
+            |
+            |---New For Each(true,false)[tuple] - scope-14
+                |   |
+                |   Project[int][0] - scope-12
+                |   |
+                |   Project[bag][1] - scope-13
+                |
+                |---b: Package(Packager)[tuple]{int} - scope-9
+Tez vertex scope-31
+# Plan on vertex
+POValueOutputTez - scope-40	->	 [scope-41]
+|
+|---New For Each(false)[tuple] - scope-39
+    |   |
+    |   POUserFunc(org.apache.pig.impl.builtin.FindQuantiles)[tuple] - scope-38
+    |   |
+    |   |---Project[tuple][*] - scope-37
+    |
+    |---New For Each(false,false)[tuple] - scope-36
+        |   |
+        |   Constant(1) - scope-35
+        |   |
+        |   Project[bag][1] - scope-33
+        |
+        |---Package(Packager)[tuple]{bytearray} - scope-32
+Tez vertex scope-41
+# Plan on vertex
+POIdentityInOutTez - scope-42	->	 scope-43
+|   |
+|   Project[int][0] - scope-15
+Tez vertex scope-43
+# Plan on vertex
+POCounterTez - scope-17	->	 scope-49,scope-47
+|   |
+|   Project[int][0] - scope-15
+|
+|---New For Each(true)[tuple] - scope-46
+    |   |
+    |   Project[bag][1] - scope-45
+    |
+    |---Package(LitePackager)[tuple]{int} - scope-44
+Tez vertex scope-47
+# Plan on vertex
+PORankStatsTez - scope-48	<-	 scope-43	->	 scope-49
+Tez vertex scope-49
+# Plan on vertex
+b: Store(file:///tmp/output/d:org.apache.pig.builtin.PigStorage) - scope-22
+|
+|---New For Each(false,true)[tuple] - scope-21
+    |   |
+    |   Project[long][0] - scope-19
+    |   |
+    |   Project[bag][2] - scope-20
+    |
+    |---PORankTez - scope-18	<-	 scope-43,scope-47
+        |   |
+        |   Project[int][0] - scope-15

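The two golden files added above (TEZC20.gld and TEZC21.gld) hold the expected Tez plan dumps for the new rank tests introduced in TestTezCompiler.java below; each test compiles a small script and compares the printed plan against its golden file. A minimal, hypothetical sketch of that comparison idiom, assuming the plan has already been rendered to a String (the helper name and normalization are illustrative; the real test goes through Util.buildPp and TezLauncher as shown in the diff):

    import java.nio.charset.StandardCharsets;
    import java.nio.file.Files;
    import java.nio.file.Paths;

    import static org.junit.Assert.assertEquals;

    public class GoldenFileCheckSketch {
        // Compares an already-rendered plan dump against the stored golden file.
        static void assertMatchesGolden(String actualPlan, String goldenFile) throws Exception {
            byte[] bytes = Files.readAllBytes(Paths.get(goldenFile));
            String expected = new String(bytes, StandardCharsets.UTF_8);
            // Normalize line endings so the comparison is not platform dependent.
            assertEquals(expected.replace("\r\n", "\n"), actualPlan.replace("\r\n", "\n"));
        }
    }
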
Modified: pig/branches/tez/test/org/apache/pig/tez/TestTezCompiler.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/test/org/apache/pig/tez/TestTezCompiler.java?rev=1582443&r1=1582442&r2=1582443&view=diff
==============================================================================
--- pig/branches/tez/test/org/apache/pig/tez/TestTezCompiler.java (original)
+++ pig/branches/tez/test/org/apache/pig/tez/TestTezCompiler.java Thu Mar 27 18:59:41 2014
@@ -324,6 +324,27 @@ public class TestTezCompiler {
         run(query, "test/org/apache/pig/test/data/GoldenFiles/TEZC19.gld");
     }
 
+    @Test
+    public void testRank() throws Exception {
+        String query =
+                "a = load 'file:///tmp/input1' as (x:int, y:int);" +
+                "b = rank a;" +
+                "store b into 'file:///tmp/output/d';";
+
+        run(query, "test/org/apache/pig/test/data/GoldenFiles/TEZC20.gld");
+    }
+
+    @Test
+    public void testRankBy() throws Exception {
+        // TODO: Physical plan (affects both MR and Tez) has an extra job before the order by. Does not look right.
+        String query =
+                "a = load 'file:///tmp/input1' as (x:int, y:int);" +
+                "b = rank a by x;" +
+                "store b into 'file:///tmp/output/d';";
+
+        run(query, "test/org/apache/pig/test/data/GoldenFiles/TEZC21.gld");
+    }
+
     private void run(String query, String expectedFile) throws Exception {
         PhysicalPlan pp = Util.buildPp(pigServer, query);
         TezLauncher launcher = new TezLauncher();