You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ro...@apache.org on 2014/03/27 19:59:42 UTC
svn commit: r1582443 - in /pig/branches/tez: ivy/
src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/
src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/
src/org/apache/pig/backend/hadoop/executionengine/t...
Author: rohini
Date: Thu Mar 27 18:59:41 2014
New Revision: 1582443
URL: http://svn.apache.org/r1582443
Log:
PIG-3814: Implement RANK in Tez (rohini)
Added:
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezTaskConfigurable.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/operators/
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/operators/POCounterStatsTez.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/operators/POCounterTez.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/operators/PORankTez.java
pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/TEZC20.gld
pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/TEZC21.gld
Modified:
pig/branches/tez/ivy/libraries.properties
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduceCounter.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCounter.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PORank.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POValueOutputTez.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/PigProcessor.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezEdgeDescriptor.java
pig/branches/tez/test/e2e/pig/drivers/TestDriverPig.pm
pig/branches/tez/test/e2e/pig/tests/nightly.conf
pig/branches/tez/test/org/apache/pig/test/TestCombiner.java
pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/TEZC19.gld
pig/branches/tez/test/org/apache/pig/tez/TestTezCompiler.java
Modified: pig/branches/tez/ivy/libraries.properties
URL: http://svn.apache.org/viewvc/pig/branches/tez/ivy/libraries.properties?rev=1582443&r1=1582442&r2=1582443&view=diff
==============================================================================
--- pig/branches/tez/ivy/libraries.properties (original)
+++ pig/branches/tez/ivy/libraries.properties Thu Mar 27 18:59:41 2014
@@ -40,9 +40,9 @@ guava.version=11.0
jersey-core.version=1.8
hadoop-core.version=1.0.4
hadoop-test.version=1.0.4
-hadoop-common.version=2.2.0
-hadoop-hdfs.version=2.2.0
-hadoop-mapreduce.version=2.2.0
+hadoop-common.version=2.3.0
+hadoop-hdfs.version=2.3.0
+hadoop-mapreduce.version=2.3.0
hbase94.version=0.94.1
hbase95.version=0.96.0-hadoop1
hsqldb.version=1.8.0.10
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduceCounter.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduceCounter.java?rev=1582443&r1=1582442&r2=1582443&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduceCounter.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduceCounter.java Thu Mar 27 18:59:41 2014
@@ -18,15 +18,10 @@
package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer;
import java.io.IOException;
-import java.util.HashMap;
-import java.util.Iterator;
import java.util.List;
-import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapOnly.Map;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCounter;
import org.apache.pig.data.Tuple;
@@ -47,16 +42,18 @@ public class PigMapReduceCounter {
/**
* Here is set up the task id, in order to be attached to each tuple
**/
+ @Override
public void setup(Context context) throws IOException, InterruptedException {
super.setup(context);
- taskID = String.valueOf(context.getTaskAttemptID().getTaskID().getId());
+ int taskIDInt = context.getTaskAttemptID().getTaskID().getId();
+ taskID = String.valueOf(taskIDInt);
pOperator = mp.getLeaves().get(0);
while(true) {
if(pOperator instanceof POCounter){
- ((POCounter) pOperator).setTaskId(taskID);
+ ((POCounter) pOperator).setTaskId(taskIDInt);
((POCounter) pOperator).resetLocalCounter();
break;
} else {
@@ -104,13 +101,14 @@ public class PigMapReduceCounter {
protected void setup(Context context) throws IOException, InterruptedException {
super.setup(context);
- taskID = String.valueOf(context.getTaskAttemptID().getTaskID().getId());
+ int taskIDInt = context.getTaskAttemptID().getTaskID().getId();
+ taskID = String.valueOf(taskIDInt);
leaf = rp.getLeaves().get(0);
while(true) {
if(leaf instanceof POCounter){
- ((POCounter) leaf).setTaskId(taskID);
+ ((POCounter) leaf).setTaskId(taskIDInt);
((POCounter) leaf).resetLocalCounter();
break;
} else {
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCounter.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCounter.java?rev=1582443&r1=1582442&r2=1582443&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCounter.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCounter.java Thu Mar 27 18:59:41 2014
@@ -32,7 +32,6 @@ import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.plan.VisitorException;
import org.apache.pig.pen.util.ExampleTuple;
-import org.apache.pig.pen.util.LineageTracer;
/**
* This operator is part of the RANK operator implementation.
@@ -80,7 +79,7 @@ public class POCounter extends PhysicalO
/**
* Task ID to label each tuple analyzed by the corresponding task
**/
- private String taskID = "-1";
+ private Integer taskID = -1;
/**
* Unique identifier that links POCounter and PORank,
@@ -104,6 +103,15 @@ public class POCounter extends PhysicalO
super(k, rp, inputs);
}
+ public POCounter(POCounter copy) {
+ super(copy);
+ this.counterPlans = copy.counterPlans;
+ this.mAscCols = copy.mAscCols;
+ this.isDenseRank = copy.isDenseRank;
+ this.isRowNumber = copy.isRowNumber;
+ this.operationID = copy.operationID;
+ }
+
@SuppressWarnings({ "rawtypes", "unchecked" })
public POCounter(OperatorKey operatorKey, int requestedParallelism,
List inp, List<PhysicalPlan> counterPlans,
@@ -156,22 +164,21 @@ public class POCounter extends PhysicalO
Tuple in = (Tuple) input.result;
Tuple out = mTupleFactory.newTuple(in.getAll().size() + 2);
Long sizeBag = 0L;
- int positionBag, i = 2;
+ int positionBag, i = 1;
// Tuples are added by two stamps before the tuple content:
- // 1.- At position 0: Current taskId
- out.set(0, getTaskId());
+ // 1.- At position 0: counter value
+ // 2.- At position last: Current taskId
- // 2.- At position 1: counter value
//On this case, each tuple is analyzed independently of the tuples grouped
if(isRowNumber() || isDenseRank()) {
//Only when is Dense Rank (attached to a reduce phase) it is incremented on this way
//Otherwise, the increment is done at mapper automatically
if(isDenseRank())
- PigMapReduceCounter.PigReduceCounter.incrementCounter(POCounter.ONE);
+ incrementReduceCounter(POCounter.ONE);
- out.set(1, getLocalCounter());
+ out.set(0, getLocalCounter());
//and the local incrementer is sequentially increased.
incrementLocalCounter();
@@ -186,9 +193,9 @@ public class POCounter extends PhysicalO
//This value (the size of the tuples on the bag) is used to increment
//the current global counter and
- PigMapReduceCounter.PigReduceCounter.incrementCounter(sizeBag);
+ incrementReduceCounter(sizeBag);
- out.set(1, getLocalCounter());
+ out.set(0, getLocalCounter());
//the value for the next tuple on the current task
addToLocalCounter(sizeBag);
@@ -199,11 +206,18 @@ public class POCounter extends PhysicalO
out.set(i++, o);
}
+ // At position last: Current taskId
+ out.set(i++, getTaskId());
+
input.result = illustratorMarkup(in, out, 0);
return input;
}
+ protected void incrementReduceCounter(Long increment) {
+ PigMapReduceCounter.PigReduceCounter.incrementCounter(increment);
+ }
+
@Override
public boolean supportsMultipleInputs() {
return false;
@@ -248,7 +262,7 @@ public class POCounter extends PhysicalO
/**
* Sequential counter used at ROW NUMBER and RANK BY DENSE mode
**/
- public Long incrementLocalCounter() {
+ protected Long incrementLocalCounter() {
return localCount++;
}
@@ -260,18 +274,18 @@ public class POCounter extends PhysicalO
return this.localCount;
}
- public void addToLocalCounter(Long sizeBag) {
+ protected void addToLocalCounter(Long sizeBag) {
this.localCount += sizeBag;
}
/**
* Task ID: identifier of the task (map or reducer)
**/
- public void setTaskId(String taskID) {
+ public void setTaskId(int taskID) {
this.taskID = taskID;
}
- public String getTaskId() {
+ public int getTaskId() {
return this.taskID;
}
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PORank.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PORank.java?rev=1582443&r1=1582442&r2=1582443&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PORank.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PORank.java Thu Mar 27 18:59:41 2014
@@ -85,6 +85,13 @@ public class PORank extends PhysicalOper
super(k, rp, inp);
}
+ public PORank(PORank copy) {
+ super(copy);
+ this.rankPlans = copy.rankPlans;
+ this.mAscCols = copy.mAscCols;
+ this.ExprOutputTypes = copy.ExprOutputTypes;
+ }
+
@SuppressWarnings({ "rawtypes", "unchecked" })
public PORank(OperatorKey operatorKey, int requestedParallelism, List inp,
List<PhysicalPlan> rankPlans, List<Boolean> ascendingCol) {
@@ -142,20 +149,32 @@ public class PORank extends PhysicalOper
* Here is read the task identifier in order to get the corresponding cumulative sum,
* and the local counter at the tuple. These values are summed and prepended to the tuple.
* @param input processed by POCounter
- * @return input as Result. The input.result tuple owns the prepend rank value
+ * @return input as Result. The input.result tuple owns the prepend rank value
**/
public Result addRank(Result input) throws ExecException {
- int i = 1;
Tuple in = (Tuple) input.result;
- Tuple out = mTupleFactory.newTuple(in.getAll().size() - 1);
- Long taskId = Long.valueOf(in.get(0).toString());
- Long localCounter = (Long) in.get(1);
+ Long localCounter = (Long) in.get(0);
+ Integer taskId = (Integer) in.getAll().remove(in.getAll().size() - 1);
- String nameCounter = JobControlCompiler.PIG_MAP_COUNTER + getOperationID() + JobControlCompiler.PIG_MAP_SEPARATOR + String.valueOf(taskId);
+ Long rank = getRankCounterOffset(taskId);
+
+ in.set(0, rank + localCounter);
+
+ if(localCountIllustrator > 2)
+ localCountIllustrator = 0;
+
+ input.result = illustratorMarkup(in, in, localCountIllustrator);
+
+ localCountIllustrator++;
+
+ return input;
+ }
+ protected Long getRankCounterOffset(Integer taskId) {
+ String nameCounter = JobControlCompiler.PIG_MAP_COUNTER + getOperationID() + JobControlCompiler.PIG_MAP_SEPARATOR + String.valueOf(taskId);
Long rank = PigMapReduce.sJobConfInternal.get().getLong( nameCounter , -1L );
-
+
if(illustrator != null) {
rank = 0L;
}
@@ -164,23 +183,7 @@ public class PORank extends PhysicalOper
log.error("Error on reading counter "+ nameCounter);
throw new RuntimeException("Unable to read counter "+ nameCounter);
}
-
- out.set(0, rank + localCounter);
-
- //Add the content of the tuple
- List<Object> sub = in.getAll().subList(2, in.getAll().size());
-
- for (Object o : sub)
- out.set(i++, o);
-
- if(localCountIllustrator > 2)
- localCountIllustrator = 0;
-
- input.result = illustratorMarkup(in, out, localCountIllustrator);
-
- localCountIllustrator++;
-
- return input;
+ return rank;
}
@Override
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POValueOutputTez.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POValueOutputTez.java?rev=1582443&r1=1582442&r2=1582443&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POValueOutputTez.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POValueOutputTez.java Thu Mar 27 18:59:41 2014
@@ -52,7 +52,7 @@ public class POValueOutputTez extends Ph
// value only input output
protected transient List<KeyValueWriter> writers;
- private static EmptyWritable EMPTY_KEY = new EmptyWritable();
+ public static EmptyWritable EMPTY_KEY = new EmptyWritable();
public POValueOutputTez(OperatorKey k) {
super(k);
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/PigProcessor.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/PigProcessor.java?rev=1582443&r1=1582442&r2=1582443&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/PigProcessor.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/PigProcessor.java Thu Mar 27 18:59:41 2014
@@ -100,6 +100,11 @@ public class PigProcessor implements Log
// Set the job conf as a thread-local member of PigMapReduce
// for backwards compatibility with the existing code base.
PigMapReduce.sJobConfInternal.set(conf);
+
+ LinkedList<TezTaskConfigurable> tezTCs = PlanHelper.getPhysicalOperators(execPlan, TezTaskConfigurable.class);
+ for (TezTaskConfigurable tezTC : tezTCs){
+ tezTC.initialize(processorContext);
+ }
}
@Override
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java?rev=1582443&r1=1582442&r2=1582443&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java Thu Mar 27 18:59:41 2014
@@ -31,6 +31,9 @@ import java.util.Stack;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;
import org.apache.pig.FuncSpec;
import org.apache.pig.IndexableLoadFunc;
import org.apache.pig.LoadFunc;
@@ -77,7 +80,11 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.Packager.PackageType;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
import org.apache.pig.backend.hadoop.executionengine.tez.POLocalRearrangeTezFactory.LocalRearrangeType;
+import org.apache.pig.backend.hadoop.executionengine.tez.operators.POCounterStatsTez;
+import org.apache.pig.backend.hadoop.executionengine.tez.operators.POCounterTez;
+import org.apache.pig.backend.hadoop.executionengine.tez.operators.PORankTez;
import org.apache.pig.backend.hadoop.executionengine.tez.util.TezCompilerUtil;
+import org.apache.pig.data.BinSedesTuple;
import org.apache.pig.data.DataType;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.builtin.DefaultIndexableLoader;
@@ -565,9 +572,16 @@ public class TezCompiler extends PhyPlan
@Override
public void visitCounter(POCounter op) throws VisitorException {
- int errCode = 2034;
- String msg = "Cannot compile " + op.getClass().getSimpleName();
- throw new TezCompilerException(msg, errCode, PigException.BUG);
+ // Refer to visitRank(PORank) for more details
+ try{
+ POCounterTez counterTez = new POCounterTez(op);
+ nonBlocking(counterTez);
+ phyToTezOpMap.put(op, curTezOp);
+ } catch (Exception e) {
+ int errCode = 2034;
+ String msg = "Error compiling operator " + op.getClass().getSimpleName();
+ throw new TezCompilerException(msg, errCode, PigException.BUG, e);
+ }
}
@Override
@@ -1097,9 +1111,71 @@ public class TezCompiler extends PhyPlan
@Override
public void visitRank(PORank op) throws VisitorException {
- int errCode = 2034;
- String msg = "Cannot compile " + op.getClass().getSimpleName();
- throw new TezCompilerException(msg, errCode, PigException.BUG);
+ try{
+ // Rank implementation has 3 vertices
+ // Vertex 1 has POCounterTez, which produces output tuples and sends them to Vertex 3 via a 1-1 edge.
+ // Vertex 1 also sends the count of tuples of each task in Vertex 1 to Vertex 2, which is a single reducer.
+ // Vertex 3 has PORankTez, which consumes the output of Vertex 2 as broadcast input along with the tuples from Vertex 1 and
+ // produces tuples with updated ranks based on the count of tuples from Vertex 2.
+ // This is different from MR implementation where POCounter updates job counters, and that is
+ // copied by JobControlCompiler into the PORank job's jobconf.
+
+ // Previous operator is always POCounterTez (Vertex 1)
+ TezOperator counterOper = curTezOp;
+ POCounterTez counterTez = (POCounterTez) counterOper.plan.getLeaves().get(0);
+
+ //Construct Vertex 2
+ TezOperator statsOper = getTezOp();
+ tezPlan.add(statsOper);
+ POCounterStatsTez counterStatsTez = new POCounterStatsTez(OperatorKey.genOpKey(scope));
+ statsOper.plan.addAsLeaf(counterStatsTez);
+ statsOper.setRequestedParallelism(1);
+
+ //Construct Vertex 3
+ TezOperator rankOper = getTezOp();
+ tezPlan.add(rankOper);
+ PORankTez rankTez = new PORankTez(op);
+ rankOper.plan.addAsLeaf(rankTez);
+ curTezOp = rankOper;
+
+ // Connect counterOper vertex to rankOper vertex by 1-1 edge
+ rankOper.setRequestedParallelismByReference(counterOper);
+ TezEdgeDescriptor edge = TezCompilerUtil.connect(tezPlan, counterOper, rankOper);
+ edge.dataMovementType = DataMovementType.ONE_TO_ONE;
+ edge.outputClassName = OnFileUnorderedKVOutput.class.getName();
+ edge.inputClassName = ShuffledUnorderedKVInput.class.getName();
+ edge.setIntermediateOutputKeyClass(POValueOutputTez.EmptyWritable.class.getName());
+ edge.setIntermediateOutputValueClass(BinSedesTuple.class.getName());
+ counterTez.setTuplesOutputKey(rankOper.getOperatorKey().toString());
+ rankTez.setTuplesInputKey(counterOper.getOperatorKey().toString());
+
+ // Connect counterOper vertex to statsOper vertex by Shuffle edge
+ edge = TezCompilerUtil.connect(tezPlan, counterOper, statsOper);
+ // Task id
+ edge.setIntermediateOutputKeyClass(IntWritable.class.getName());
+ edge.partitionerClass = HashPartitioner.class;
+ // Number of records in that task
+ edge.setIntermediateOutputValueClass(LongWritable.class.getName());
+ counterTez.setStatsOutputKey(statsOper.getOperatorKey().toString());
+ counterStatsTez.setInputKey(counterOper.getOperatorKey().toString());
+
+ // Connect statsOper vertex to rankOper vertex by Broadcast edge
+ edge = TezCompilerUtil.connect(tezPlan, statsOper, rankOper);
+ edge.dataMovementType = DataMovementType.BROADCAST;
+ edge.outputClassName = OnFileUnorderedKVOutput.class.getName();
+ edge.inputClassName = ShuffledUnorderedKVInput.class.getName();
+ edge.setIntermediateOutputKeyClass(POValueOutputTez.EmptyWritable.class.getName());
+ // Map of task id, offset count based on total number of records
+ edge.setIntermediateOutputValueClass(BinSedesTuple.class.getName());
+ counterStatsTez.setOutputKey(rankOper.getOperatorKey().toString());
+ rankTez.setStatsInputKey(statsOper.getOperatorKey().toString());
+
+ phyToTezOpMap.put(op, rankOper);
+ } catch (Exception e) {
+ int errCode = 2034;
+ String msg = "Error compiling operator " + op.getClass().getSimpleName();
+ throw new TezCompilerException(msg, errCode, PigException.BUG, e);
+ }
}
@Override
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java?rev=1582443&r1=1582442&r2=1582443&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java Thu Mar 27 18:59:41 2014
@@ -249,6 +249,7 @@ public class TezDagBuilder extends TezOp
}
}
+ //TODO: Remove this and set the classes on edge in TezCompiler
List<POValueOutputTez> valueOutputs = PlanHelper.getPhysicalOperators(from.plan,
POValueOutputTez.class);
if (!valueOutputs.isEmpty()) {
@@ -267,6 +268,23 @@ public class TezDagBuilder extends TezOp
}
}
+ conf.setIfUnset(TezJobConfig.TEZ_RUNTIME_PARTITIONER_CLASS,
+ MRPartitioner.class.getName());
+
+ if (edge.getIntermediateOutputKeyClass() != null) {
+ conf.set(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_KEY_CLASS,
+ edge.getIntermediateOutputKeyClass());
+ conf.set(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_KEY_CLASS,
+ edge.getIntermediateOutputKeyClass());
+ }
+
+ if (edge.getIntermediateOutputValueClass() != null) {
+ conf.set(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_VALUE_CLASS,
+ edge.getIntermediateOutputValueClass());
+ conf.set(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_VALUE_CLASS,
+ edge.getIntermediateOutputValueClass());
+ }
+
conf.setBoolean("mapred.mapper.new-api", true);
conf.set("pig.pigContext", ObjectSerializer.serialize(pc));
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezEdgeDescriptor.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezEdgeDescriptor.java?rev=1582443&r1=1582442&r2=1582443&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezEdgeDescriptor.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezEdgeDescriptor.java Thu Mar 27 18:59:41 2014
@@ -47,6 +47,9 @@ public class TezEdgeDescriptor {
// Sort order for secondary keys;
private boolean[] secondarySortOrder;
+ private String intermediateOutputKeyClass;
+ private String intermediateOutputValueClass;
+
public TezEdgeDescriptor() {
combinePlan = new PhysicalPlan();
@@ -79,4 +82,20 @@ public class TezEdgeDescriptor {
}
}
+ public String getIntermediateOutputKeyClass() {
+ return intermediateOutputKeyClass;
+ }
+
+ public void setIntermediateOutputKeyClass(String intermediateOutputKeyClass) {
+ this.intermediateOutputKeyClass = intermediateOutputKeyClass;
+ }
+
+ public String getIntermediateOutputValueClass() {
+ return intermediateOutputValueClass;
+ }
+
+ public void setIntermediateOutputValueClass(String intermediateOutputValueClass) {
+ this.intermediateOutputValueClass = intermediateOutputValueClass;
+ }
+
}
Added: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezTaskConfigurable.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezTaskConfigurable.java?rev=1582443&view=auto
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezTaskConfigurable.java (added)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezTaskConfigurable.java Thu Mar 27 18:59:41 2014
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.backend.hadoop.executionengine.tez;
+
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.tez.runtime.api.TezProcessorContext;
+
+/**
+ * This interface is implemented by PhysicalOperators that need access to the
+ * TezProcessorContext of a Tez task.
+ */
+
+public interface TezTaskConfigurable {
+
+ public void initialize(TezProcessorContext processorContext) throws ExecException;
+
+}
Added: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/operators/POCounterStatsTez.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/operators/POCounterStatsTez.java?rev=1582443&view=auto
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/operators/POCounterStatsTez.java (added)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/operators/POCounterStatsTez.java Thu Mar 27 18:59:41 2014
@@ -0,0 +1,170 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.backend.hadoop.executionengine.tez.operators;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
+import org.apache.pig.backend.hadoop.executionengine.tez.POValueOutputTez;
+import org.apache.pig.backend.hadoop.executionengine.tez.TezLoad;
+import org.apache.pig.backend.hadoop.executionengine.tez.TezOutput;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.plan.VisitorException;
+import org.apache.tez.runtime.api.LogicalInput;
+import org.apache.tez.runtime.api.LogicalOutput;
+import org.apache.tez.runtime.library.api.KeyValueWriter;
+import org.apache.tez.runtime.library.api.KeyValuesReader;
+
+/**
+ * POCounterStatsTez is used to group counters from previous vertex POCounterTez tasks
+ */
+public class POCounterStatsTez extends PhysicalOperator implements TezLoad, TezOutput {
+
+ private static final long serialVersionUID = 1L;
+ private static final Log LOG = LogFactory.getLog(POCounterStatsTez.class);
+ private String inputKey;
+ private String outputKey;
+ // TODO: Even though we expect only one record from POCounter, because of Shuffle we have
+ // KeyValuesReader. After TEZ-661, switch to unsorted shuffle
+ private transient KeyValuesReader reader;
+ private transient KeyValueWriter writer;
+
+ public POCounterStatsTez(OperatorKey k) {
+ super(k);
+ }
+
+ @Override
+ public void addInputsToSkip(Set<String> inputsToSkip) {
+ }
+
+ @Override
+ public void attachInputs(Map<String, LogicalInput> inputs,
+ Configuration conf)
+ throws ExecException {
+ LogicalInput input = inputs.get(inputKey);
+ if (input == null) {
+ throw new ExecException("Input from vertex " + inputKey + " is missing");
+ }
+ try {
+ reader = (KeyValuesReader) input.getReader();
+ LOG.info("Attached input from vertex " + inputKey + " : input=" + input + ", reader=" + reader);
+ } catch (Exception e) {
+ throw new ExecException(e);
+ }
+ }
+
+ @Override
+ public void attachOutputs(Map<String, LogicalOutput> outputs,
+ Configuration conf) throws ExecException {
+ LogicalOutput output = outputs.get(outputKey);
+ if (output == null) {
+ throw new ExecException("Output to vertex " + outputKey + " is missing");
+ }
+ try {
+ writer = (KeyValueWriter) output.getWriter();
+ LOG.info("Attached output to vertex " + outputKey + " : output=" + output + ", writer=" + writer);
+ } catch (Exception e) {
+ throw new ExecException(e);
+ }
+ }
+
+ @Override
+ public Result getNextTuple() throws ExecException {
+ try {
+ Map<Integer, Long> counterRecords = new HashMap<Integer, Long>();
+ Integer key = null;
+ Long value = null;
+ // Read count of records per task
+ while (reader.next()) {
+ key = ((IntWritable)reader.getCurrentKey()).get();
+ for (Object val : reader.getCurrentValues()) {
+ value = ((LongWritable)val).get();
+ counterRecords.put(key, value);
+ }
+ }
+
+ // BinInterSedes only takes String for map key
+ Map<String, Long> counterOffsets = new HashMap<String, Long>();
+ // Create a map to contain task ids and beginning offset of record count
+ // based on total record count of all tasks
+ // For example: if Task 0 has 5 records, Task 1 has 10 records and Task 2 has 3 records,
+ // the map will contain {0=0, 1=5, 2=15}
+ Long prevTasksCount = counterRecords.get(0);
+ counterOffsets.put("0", 0L);
+ for (int i = 1; i < counterRecords.size(); i++) {
+ counterOffsets.put("" + i, prevTasksCount);
+ prevTasksCount += counterRecords.get(i);
+ }
+
+ Tuple tuple = TupleFactory.getInstance().newTuple(1);
+ tuple.set(0, counterOffsets);
+ writer.write(POValueOutputTez.EMPTY_KEY, tuple);
+ return RESULT_EOP;
+ } catch (IOException e) {
+ throw new ExecException(e);
+ }
+ }
+
+ public void setInputKey(String inputKey) {
+ this.inputKey = inputKey;
+ }
+
+ public void setOutputKey(String outputKey) {
+ this.outputKey = outputKey;
+ }
+
+ @Override
+ public Tuple illustratorMarkup(Object in, Object out, int eqClassIndex) {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public void visit(PhyPlanVisitor v) throws VisitorException {
+ v.visit(this);
+ }
+
+ @Override
+ public boolean supportsMultipleInputs() {
+ return false;
+ }
+
+ @Override
+ public boolean supportsMultipleOutputs() {
+ return false;
+ }
+
+ @Override
+ public String name() {
+ return "PORankStatsTez - " + mKey.toString() + "\t<-\t " + inputKey + "\t->\t " + outputKey;
+ }
+}
Added: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/operators/POCounterTez.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/operators/POCounterTez.java?rev=1582443&view=auto
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/operators/POCounterTez.java (added)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/operators/POCounterTez.java Thu Mar 27 18:59:41 2014
@@ -0,0 +1,150 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.tez.operators;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCounter;
+import org.apache.pig.backend.hadoop.executionengine.tez.POValueOutputTez;
+import org.apache.pig.backend.hadoop.executionengine.tez.TezOutput;
+import org.apache.pig.backend.hadoop.executionengine.tez.TezTaskConfigurable;
+import org.apache.pig.impl.plan.VisitorException;
+import org.apache.tez.runtime.api.LogicalOutput;
+import org.apache.tez.runtime.api.TezProcessorContext;
+import org.apache.tez.runtime.library.api.KeyValueWriter;
+
+ /**
+  * Tez variant of {@link POCounter}.
+  * <p>
+  * Each input tuple is passed through {@code addCounterValue} and written to the
+  * output registered under {@code tuplesOutputKey}. When the input is exhausted,
+  * one (task id, number of records counted by this task) pair is written to the
+  * output registered under {@code statsOutputKey}.
+  */
+ public class POCounterTez extends POCounter implements TezOutput, TezTaskConfigurable {
+
+     private static final long serialVersionUID = 1L;
+     private static final Log LOG = LogFactory.getLog(POCounterTez.class);
+
+     private String tuplesOutputKey;
+     private String statsOutputKey;
+
+     private transient KeyValueWriter tuplesWriter;
+     private transient KeyValueWriter statsWriter;
+     // Running record count of this task; updated by the three counter hooks below.
+     private transient long totalTaskRecords = 0;
+
+     /**
+      * Creates a Tez counter operator from an existing {@link POCounter}.
+      *
+      * @param copy operator handed to the {@link POCounter} copy constructor
+      */
+     public POCounterTez(POCounter copy) {
+         super(copy);
+     }
+
+     /**
+      * @param tuplesOutputKey key under which the output for the counted tuples
+      *        is looked up in {@link #attachOutputs(Map, Configuration)}
+      */
+     public void setTuplesOutputKey(String tuplesOutputKey) {
+         this.tuplesOutputKey = tuplesOutputKey;
+     }
+
+     /**
+      * @param statsOutputKey key under which the output for the per-task record
+      *        count is looked up in {@link #attachOutputs(Map, Configuration)}
+      */
+     public void setStatsOutputKey(String statsOutputKey) {
+         this.statsOutputKey = statsOutputKey;
+     }
+
+     /**
+      * Uses the task index supplied by the processor context as this operator's task id.
+      */
+     @Override
+     public void initialize(TezProcessorContext processorContext)
+             throws ExecException {
+         this.setTaskId(processorContext.getTaskIndex());
+     }
+
+     /**
+      * Resolves the writers for the tuples output and the stats output.
+      *
+      * @param outputs logical outputs keyed by output key
+      * @param conf not used by this operator
+      * @throws ExecException if either output is missing or its writer cannot be obtained
+      */
+     @Override
+     public void attachOutputs(Map<String, LogicalOutput> outputs,
+             Configuration conf) throws ExecException {
+         tuplesWriter = attachWriter(outputs, tuplesOutputKey);
+         statsWriter = attachWriter(outputs, statsOutputKey);
+     }
+
+     /**
+      * Looks up one logical output and returns its writer as a {@link KeyValueWriter}.
+      */
+     private static KeyValueWriter attachWriter(Map<String, LogicalOutput> outputs,
+             String outputKey) throws ExecException {
+         LogicalOutput output = outputs.get(outputKey);
+         if (output == null) {
+             throw new ExecException("Output to vertex " + outputKey + " is missing");
+         }
+         try {
+             KeyValueWriter writer = (KeyValueWriter) output.getWriter();
+             LOG.info("Attached output to vertex " + outputKey + " : output=" + output + ", writer=" + writer);
+             return writer;
+         } catch (Exception e) {
+             throw new ExecException(e);
+         }
+     }
+
+     /**
+      * Drains the input, writing every counted tuple to the tuples output, and
+      * then writes this task's (task id, record count) pair to the stats output.
+      *
+      * @return the error result if the input reports {@code STATUS_ERR};
+      *         otherwise {@code RESULT_EOP} once the input is exhausted
+      * @throws ExecException if writing to either output fails
+      */
+     @Override
+     public Result getNextTuple() throws ExecException {
+         try {
+             while (true) {
+                 Result inp = processInput();
+                 if (inp.returnStatus == POStatus.STATUS_ERR) {
+                     // Hand the failure to the caller instead of reporting a clean
+                     // end of input with a partial record count.
+                     return inp;
+                 }
+                 if (inp.returnStatus == POStatus.STATUS_EOP) {
+                     break;
+                 }
+                 if (inp.returnStatus == POStatus.STATUS_NULL) {
+                     continue;
+                 }
+
+                 tuplesWriter.write(POValueOutputTez.EMPTY_KEY,
+                         addCounterValue(inp).result);
+             }
+
+             statsWriter.write(new IntWritable(this.getTaskId()), new LongWritable(totalTaskRecords));
+
+         } catch (IOException e) {
+             throw new ExecException(e);
+         }
+         return RESULT_EOP;
+     }
+
+     /** Counts one more record for this task, then delegates to the parent. */
+     @Override
+     protected Long incrementLocalCounter() {
+         totalTaskRecords++;
+         return super.incrementLocalCounter();
+     }
+
+     /** Delegates to the parent, then adds {@code sizeBag} to this task's record count. */
+     @Override
+     protected void addToLocalCounter(Long sizeBag) {
+         super.addToLocalCounter(sizeBag);
+         totalTaskRecords += sizeBag;
+     }
+
+     /**
+      * Adds {@code increment} to this task's record count.
+      * NOTE(review): the parent implementation is not invoked here - presumably
+      * because it updates a MapReduce counter that has no Tez equivalent; confirm.
+      */
+     @Override
+     protected void incrementReduceCounter(Long increment) {
+         totalTaskRecords += increment;
+     }
+
+     /** Runs the parent's visit and then passes this operator to {@code v.visit}. */
+     @Override
+     public void visit(PhyPlanVisitor v) throws VisitorException {
+         super.visit(v);
+         v.visit(this);
+     }
+
+     /** Label: fixed prefix, operator key, then both output keys after "->". */
+     @Override
+     public String name() {
+         return "POCounterTez - " + mKey.toString() + "\t->\t " + tuplesOutputKey + "," + statsOutputKey;
+     }
+
+ }
Added: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/operators/PORankTez.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/operators/PORankTez.java?rev=1582443&view=auto
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/operators/PORankTez.java (added)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/operators/PORankTez.java Thu Mar 27 18:59:41 2014
@@ -0,0 +1,152 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.tez.operators;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PORank;
+import org.apache.pig.backend.hadoop.executionengine.tez.ObjectCache;
+import org.apache.pig.backend.hadoop.executionengine.tez.TezLoad;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.plan.VisitorException;
+import org.apache.tez.runtime.api.LogicalInput;
+import org.apache.tez.runtime.library.api.KeyValueReader;
+
+ /**
+  * Tez variant of {@link PORank}.
+  * <p>
+  * Tuples are read from the input registered under {@code tuplesInputKey} and
+  * passed through {@code addRank}. The per-task counter offsets used for ranking
+  * are read once from the input registered under {@code statsInputKey}, converted
+  * to a task-id keyed map and stored in {@link ObjectCache}; when the cache
+  * already holds them, the stats input is added to the inputs to skip.
+  */
+ public class PORankTez extends PORank implements TezLoad {
+
+     private static final long serialVersionUID = 1L;
+     private static final Log LOG = LogFactory.getLog(PORankTez.class);
+
+     private String tuplesInputKey;
+     private String statsInputKey;
+     private transient boolean isInputCached;
+     private transient KeyValueReader reader;
+     // task id -> offset returned by getRankCounterOffset for that task
+     private transient Map<Integer, Long> counterOffsets;
+
+     /**
+      * Creates a Tez rank operator from an existing {@link PORank}.
+      *
+      * @param copy operator handed to the {@link PORank} copy constructor
+      */
+     public PORankTez(PORank copy) {
+         super(copy);
+     }
+
+     /**
+      * @param tuplesInputKey key under which the tuple input is looked up in
+      *        {@link #attachInputs(Map, Configuration)}
+      */
+     public void setTuplesInputKey(String tuplesInputKey) {
+         this.tuplesInputKey = tuplesInputKey;
+     }
+
+     /**
+      * @param statsInputKey key under which the counter-stats input is looked up
+      *        in {@link #attachInputs(Map, Configuration)}
+      */
+     public void setStatsInputKey(String statsInputKey) {
+         this.statsInputKey = statsInputKey;
+     }
+
+     /** Key under which the counter offsets of this operator are kept in {@link ObjectCache}. */
+     private String getStatsCacheKey() {
+         return "rankstats-" + getOperatorKey().toString();
+     }
+
+     /**
+      * Marks the stats input as skippable when the counter offsets are already cached.
+      *
+      * @param inputsToSkip set that receives {@code statsInputKey} on a cache hit
+      */
+     @Override
+     public void addInputsToSkip(Set<String> inputsToSkip) {
+         Object cacheValue = ObjectCache.getInstance().retrieve(getStatsCacheKey());
+         if (cacheValue != null) {
+             isInputCached = true;
+             inputsToSkip.add(statsInputKey);
+         }
+     }
+
+     /**
+      * Attaches the tuple reader and loads the counter offsets, either from the
+      * cache or from the stats input.
+      *
+      * @param inputs logical inputs keyed by input key
+      * @param conf not used by this operator
+      * @throws ExecException if a required input is missing, its reader cannot be
+      *         obtained, or the stats input delivers no record
+      */
+     @SuppressWarnings("unchecked")
+     @Override
+     public void attachInputs(Map<String, LogicalInput> inputs,
+             Configuration conf) throws ExecException {
+         LogicalInput input = inputs.get(tuplesInputKey);
+         if (input == null) {
+             throw new ExecException("Input from vertex " + tuplesInputKey + " is missing");
+         }
+         try {
+             reader = (KeyValueReader) input.getReader();
+             LOG.info("Attached input from vertex " + tuplesInputKey + " : input=" + input + ", reader=" + reader);
+         } catch (Exception e) {
+             throw new ExecException(e);
+         }
+
+         String cacheKey = getStatsCacheKey();
+         if (isInputCached) {
+             counterOffsets = (Map<Integer, Long>) ObjectCache.getInstance().retrieve(cacheKey);
+             LOG.info("Found counter stats for PORankTez in Tez cache. cachekey=" + cacheKey);
+             return;
+         }
+         input = inputs.get(statsInputKey);
+         if (input == null) {
+             throw new ExecException("Input from vertex " + statsInputKey + " is missing");
+         }
+         try {
+             // Named differently from the field so the tuple reader is not shadowed.
+             KeyValueReader statsReader = (KeyValueReader) input.getReader();
+             LOG.info("Attached input from vertex " + statsInputKey + " : input=" + input + ", reader=" + statsReader);
+             if (!statsReader.next()) {
+                 // Without a record there is no current value to read offsets from.
+                 throw new ExecException("No counter stats received from vertex " + statsInputKey);
+             }
+             // POCounterStatsTez produces a HashMap which contains
+             // mapping of task id and the offset of record count in each task based on total record count
+             Map<String, Long> counterOffsetsTemp = (Map<String, Long>) ((Tuple) statsReader.getCurrentValue()).get(0);
+             counterOffsets = new HashMap<Integer, Long>(counterOffsetsTemp.size(), 1);
+             for (Entry<String, Long> entry : counterOffsetsTemp.entrySet()) {
+                 counterOffsets.put(Integer.valueOf(entry.getKey()), entry.getValue());
+             }
+             ObjectCache.getInstance().cache(cacheKey, counterOffsets);
+             LOG.info("Cached PORankTez counter stats in Tez ObjectRegistry with vertex scope. cachekey=" + cacheKey);
+         } catch (ExecException e) {
+             // Already the declared exception type; do not wrap it a second time.
+             throw e;
+         } catch (Exception e) {
+             throw new ExecException(e);
+         }
+     }
+
+     /**
+      * Reads the next tuple from the tuple input and returns it with its rank added.
+      *
+      * @return the result of {@code addRank} for the next tuple, or
+      *         {@code RESULT_EOP} when the reader has no more records
+      * @throws ExecException if reading from the input fails
+      */
+     @Override
+     public Result getNextTuple() throws ExecException {
+         try {
+             if (reader.next()) {
+                 Result inp = new Result(POStatus.STATUS_OK, reader.getCurrentValue());
+                 return addRank(inp);
+             }
+         } catch (IOException e) {
+             throw new ExecException(e);
+         }
+
+         return RESULT_EOP;
+     }
+
+     /**
+      * @param taskId task id to look up
+      * @return {@code 0} when an illustrator is set; otherwise the offset stored
+      *         for {@code taskId} ({@code null} if the map has no entry for it)
+      */
+     @Override
+     protected Long getRankCounterOffset(Integer taskId) {
+         if (illustrator != null) {
+             return 0L;
+         }
+         return counterOffsets.get(taskId);
+     }
+
+     /** Runs the parent's visit and then passes this operator to {@code v.visit}. */
+     @Override
+     public void visit(PhyPlanVisitor v) throws VisitorException {
+         super.visit(v);
+         v.visit(this);
+     }
+
+     /** Label: fixed prefix, operator key, then both input keys after "<-". */
+     @Override
+     public String name() {
+         return "PORankTez - " + mKey.toString() + "\t<-\t " + tuplesInputKey + "," + statsInputKey;
+     }
+
+ }
Modified: pig/branches/tez/test/e2e/pig/drivers/TestDriverPig.pm
URL: http://svn.apache.org/viewvc/pig/branches/tez/test/e2e/pig/drivers/TestDriverPig.pm?rev=1582443&r1=1582442&r2=1582443&view=diff
==============================================================================
--- pig/branches/tez/test/e2e/pig/drivers/TestDriverPig.pm (original)
+++ pig/branches/tez/test/e2e/pig/drivers/TestDriverPig.pm Thu Mar 27 18:59:41 2014
@@ -1001,15 +1001,17 @@ sub wrongExecutionMode($$)
# Check that we should run this test. If the current execution type
# doesn't match the execonly flag, then skip this one.
- my $wrong = ((defined $testCmd->{'execonly'} &&
- $testCmd->{'execonly'} ne $testCmd->{'exectype'}));
+ my $wrong = 0;
- if ($wrong) {
- print $log "Skipping test $testCmd->{'group'}" . "_" .
- $testCmd->{'num'} . " since it is executed only in " .
- $testCmd->{'execonly'} . " mode and we are executing in " .
- $testCmd->{'exectype'} . " mode.\n";
- return $wrong;
+ if (defined $testCmd->{'execonly'}) {
+     # 'execonly' may name several execution types separated by commas, e.g. 'mapred,tez'.
+     my @exectypes = split(',', $testCmd->{'execonly'});
+     # Skip unless the current exectype is one of the listed types. The membership
+     # test is parenthesised so that "!" negates the whole test, not just its left operand.
+     if (!(grep { $_ eq $testCmd->{'exectype'} } @exectypes)) {
+         print $log "Skipping test $testCmd->{'group'}" . "_" .
+             $testCmd->{'num'} . " since it is executed only in " .
+             $testCmd->{'execonly'} . " mode and we are executing in " .
+             $testCmd->{'exectype'} . " mode.\n";
+         return 1;
+     }
}
if (defined $testCmd->{'ignore23'} && $testCmd->{'hadoopversion'}=='23') {
Modified: pig/branches/tez/test/e2e/pig/tests/nightly.conf
URL: http://svn.apache.org/viewvc/pig/branches/tez/test/e2e/pig/tests/nightly.conf?rev=1582443&r1=1582442&r2=1582443&view=diff
==============================================================================
--- pig/branches/tez/test/e2e/pig/tests/nightly.conf (original)
+++ pig/branches/tez/test/e2e/pig/tests/nightly.conf Thu Mar 27 18:59:41 2014
@@ -2658,7 +2658,7 @@ store c into ':OUTPATH:';\,
# Merge-join with one file across multiple blocks
{
'num' => 8,
- 'execonly' => 'mapred', # since this join will run out of memory in local mode
+ 'execonly' => 'mapred,tez', # since this join will run out of memory in local mode
'floatpostprocess' => 1,
'delimiter' => ' ',
'pig' => q\a = load ':INPATH:/singlefile/votertab10k';
@@ -3249,7 +3249,7 @@ store b into ':OUTPATH:';\,
'tests' => [
{
'num' => 1,
- 'execonly' => 'mapred', # studenttab20m not available in local mode
+ 'execonly' => 'mapred,tez', # studenttab20m not available in local mode
'pig' => q\
a = load ':INPATH:/singlefile/studenttab20m' using PigStorage() as (name, age, gpa);
b = foreach a generate age;
@@ -3925,7 +3925,7 @@ store b into ':OUTPATH:';\,
{
# test group
'num' => 1,
- 'execonly' => 'mapred', # since this join will run out of memory in local mode
+ 'execonly' => 'mapred,tez', # since this join will run out of memory in local mode
'pig' => q\register :FUNCPATH:/testudf.jar;
a = load ':INPATH:/singlefile/studenttab10k' using PigStorage() as (name, age:int, gpa);
b = group a by age PARTITION BY org.apache.pig.test.utils.SimpleCustomPartitioner2 parallel 2;
@@ -4531,7 +4531,7 @@ store C into ':OUTPATH:';\,
{
'num' => 1,
'java_params' => ['-Dopt.fetch=false'],
- 'execonly' => 'mapred', # since distributed cache is not supported in local mode
+ 'execonly' => 'mapred,tez', # since distributed cache is not supported in local mode
'pig' => q?
register :FUNCPATH:/testudf.jar;
define udfdc org.apache.pig.test.udf.evalfunc.Udfcachetest(':INPATH:/singlefile/votertab10k#foodle');
@@ -4769,7 +4769,7 @@ store C into ':OUTPATH:';\,
}, {
# PIG-2576
'num' => 4,
- 'execonly' => 'mapred',
+ 'execonly' => 'mapred,tez',
'pig' => q?register :FUNCPATH:/testudf.jar;
define printconf org.apache.pig.test.udf.evalfunc.UdfContextFrontend('dummy');
a = load ':INPATH:/singlefile/studenttab10k' as (name, age, gpa);
@@ -4824,7 +4824,7 @@ store C into ':OUTPATH:';\,
],
},{
'name' => 'Bloom',
- 'execonly' => 'mapred', # distributed cache does not work in local mode
+ 'execonly' => 'mapred,tez', # distributed cache does not work in local mode
'tests' => [
{
'num' => 1,
@@ -5225,7 +5225,7 @@ store a into ':OUTPATH:';\,
'tests' => [
{
'num' => 1,
- 'execonly' => 'mapred',
+ 'execonly' => 'mapred,tez',
'pig' => q\
SET default_parallel 9;
A = LOAD ':INPATH:/singlefile/prerank' using PigStorage(',') as (rownumber:long,rankcabd:long,rankbdaa:long,rankbdca:long,rankaacd:long,rankaaba:long,a:int,b:int,c:int,tail:bytearray);
@@ -5240,7 +5240,7 @@ store a into ':OUTPATH:';\,
\,
}, {
'num' => 2,
- 'execonly' => 'mapred',
+ 'execonly' => 'mapred,tez',
'pig' => q\
SET default_parallel 9;
A = LOAD ':INPATH:/singlefile/prerank' using PigStorage(',') as (rownumber:long,rankcabd:long,rankbdaa:long,rankbdca:long,rankaacd:long,rankaaba:long,a:int,b:int,c:int,tail:bytearray);
@@ -5255,7 +5255,7 @@ store a into ':OUTPATH:';\,
\,
}, {
'num' => 3,
- 'execonly' => 'mapred',
+ 'execonly' => 'mapred,tez',
'pig' => q\
SET default_parallel 7;
A = LOAD ':INPATH:/singlefile/prerank' using PigStorage(',') as (rownumber:long,rankcabd:long,rankbdaa:long,rankbdca:long,rankaacd:long,rankaaba:long,a:int,b:int,c:int,tail:bytearray);
@@ -5270,7 +5270,7 @@ store a into ':OUTPATH:';\,
\,
}, {
'num' => 4,
- 'execonly' => 'mapred',
+ 'execonly' => 'mapred,tez',
'pig' => q\
SET default_parallel 7;
A = LOAD ':INPATH:/singlefile/prerank' using PigStorage(',') as (rownumber:long,rankcabd:long,rankbdaa:long,rankbdca:long,rankaacd:long,rankaaba:long,a:int,b:int,c:int,tail:bytearray);
@@ -5285,7 +5285,7 @@ store a into ':OUTPATH:';\,
\,
}, {
'num' =>5,
- 'execonly' => 'mapred',
+ 'execonly' => 'mapred,tez',
'pig' => q\
SET default_parallel 9;
A = LOAD ':INPATH:/singlefile/prerank' using PigStorage(',') as (rownumber:long,rankcabd:long,rankbdaa:long,rankbdca:long,rankaacd:long,rankaaba:long,a:int,b:int,c:int,tail:bytearray);
@@ -5300,7 +5300,7 @@ store a into ':OUTPATH:';\,
\,
}, {
'num' =>6,
- 'execonly' => 'mapred',
+ 'execonly' => 'mapred,tez',
'pig' => q\
SET default_parallel 7;
A = LOAD ':INPATH:/singlefile/prerank' using PigStorage(',') as (rownumber:long,rankcabd:long,rankbdaa:long,rankbdca:long,rankaacd:long,rankaaba:long,a:int,b:int,c:int,tail:bytearray);
@@ -5315,7 +5315,7 @@ store a into ':OUTPATH:';\,
\,
}, {
'num' => 7,
- 'execonly' => 'mapred',
+ 'execonly' => 'mapred,tez',
'pig' => q\
SET default_parallel 7;
A = LOAD ':INPATH:/singlefile/prerank' using PigStorage(',') as (rownumber:long,rankcabd:long,rankbdaa:long,rankbdca:long,rankaacd:long,rankaaba:long,a:int,b:int,c:int,tail:bytearray);
@@ -5333,7 +5333,7 @@ store a into ':OUTPATH:';\,
\,
}, {
'num' => 8,
- 'execonly' => 'mapred',
+ 'execonly' => 'mapred,tez',
'pig' => q\
SET default_parallel 9;
SET pig.splitCombination false;
@@ -5358,7 +5358,7 @@ store a into ':OUTPATH:';\,
\,
}, {
'num' => 9,
- 'execonly' => 'mapred',
+ 'execonly' => 'mapred,tez',
'pig' => q\
SET default_parallel 25;
A = LOAD ':INPATH:/singlefile/biggish' using PigStorage(',') as (rownumber:long,idx:long,tail:bytearray);
@@ -5374,7 +5374,7 @@ store a into ':OUTPATH:';\,
\,
}, {
'num' => 10,
- 'execonly' => 'mapred',
+ 'execonly' => 'mapred,tez',
'pig' => q\
SET default_parallel 11;
SET pig.splitCombination false;
@@ -5396,7 +5396,7 @@ store a into ':OUTPATH:';\,
\,
}, {
'num' => 11,
- 'execonly' => 'mapred',
+ 'execonly' => 'mapred,tez',
'pig' => q\
A = LOAD ':INPATH:/singlefile/prerank' using PigStorage(',') as (rownumber:long,rankcabd:long,rankbdaa:long,rankbdca:long,rankaacd:long,rankaaba:long,a:int,b:int,c:int,tail:bytearray);
split A into M if rownumber > 15, N if rownumber < 25;
Modified: pig/branches/tez/test/org/apache/pig/test/TestCombiner.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/test/org/apache/pig/test/TestCombiner.java?rev=1582443&r1=1582442&r2=1582443&view=diff
==============================================================================
--- pig/branches/tez/test/org/apache/pig/test/TestCombiner.java (original)
+++ pig/branches/tez/test/org/apache/pig/test/TestCombiner.java Thu Mar 27 18:59:41 2014
@@ -38,9 +38,7 @@ import org.apache.pig.data.DefaultDataBa
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.io.FileLocalizer;
-import org.apache.pig.tools.pigstats.PigStats;
import org.apache.pig.tools.pigstats.ScriptState;
-import org.apache.pig.tools.pigstats.mapreduce.MRScriptState;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
@@ -216,7 +214,11 @@ public class TestCombiner {
assertFalse(it.hasNext());
Util.deleteFile(cluster, "MultiCombinerUseInput.txt");
// Reset io.sort.mb to the original value before exit
- properties.setProperty("io.sort.mb", oldValue);
+ if (oldValue == null) {
+ properties.remove("io.sort.mb");
+ } else {
+ properties.setProperty("io.sort.mb", oldValue);
+ }
pigServer.shutdown();
}
Modified: pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/TEZC19.gld
URL: http://svn.apache.org/viewvc/pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/TEZC19.gld?rev=1582443&r1=1582442&r2=1582443&view=diff
==============================================================================
--- pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/TEZC19.gld (original)
+++ pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/TEZC19.gld Thu Mar 27 18:59:41 2014
@@ -11,9 +11,9 @@ Tez vertex scope-25
Tez vertex scope-18
# Plan on vertex
-Local Rearrange[tuple]{bytearray}(false) - scope-22 -> scope-25
+Local Rearrange[tuple]{tuple}(false) - scope-22 -> scope-25
| |
-| Constant(DummyVal) - scope-21
+| Project[tuple][*] - scope-21
|
|---a: New For Each(false,false)[bag] - scope-7
| |
@@ -28,9 +28,9 @@ Local Rearrange[tuple]{bytearray}(false)
|---a: Load(file:///tmp/input:org.apache.pig.builtin.PigStorage) - scope-0
Tez vertex scope-19
# Plan on vertex
-Local Rearrange[tuple]{bytearray}(false) - scope-24 -> scope-25
+Local Rearrange[tuple]{tuple}(false) - scope-24 -> scope-25
| |
-| Constant(DummyVal) - scope-23
+| Project[tuple][*] - scope-23
|
|---c: New For Each(false,false)[bag] - scope-15
| |
Added: pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/TEZC20.gld
URL: http://svn.apache.org/viewvc/pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/TEZC20.gld?rev=1582443&view=auto
==============================================================================
--- pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/TEZC20.gld (added)
+++ pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/TEZC20.gld Thu Mar 27 18:59:41 2014
@@ -0,0 +1,33 @@
+#--------------------------------------------------
+# There are 1 DAGs in the session
+#--------------------------------------------------
+#--------------------------------------------------
+# TEZ DAG plan: scope-15
+#--------------------------------------------------
+Tez vertex scope-11 -> Tez vertex scope-14,Tez vertex scope-12,
+Tez vertex scope-12 -> Tez vertex scope-14,
+Tez vertex scope-14
+
+Tez vertex scope-11
+# Plan on vertex
+POCounterTez - scope-8 -> scope-14,scope-12
+|
+|---a: New For Each(false,false)[bag] - scope-7
+ | |
+ | Cast[int] - scope-2
+ | |
+ | |---Project[bytearray][0] - scope-1
+ | |
+ | Cast[int] - scope-5
+ | |
+ | |---Project[bytearray][1] - scope-4
+ |
+ |---a: Load(file:///tmp/input1:org.apache.pig.builtin.PigStorage) - scope-0
+Tez vertex scope-12
+# Plan on vertex
+PORankStatsTez - scope-13 <- scope-11 -> scope-14
+Tez vertex scope-14
+# Plan on vertex
+b: Store(file:///tmp/output/d:org.apache.pig.builtin.PigStorage) - scope-10
+|
+|---PORankTez - scope-9 <- scope-11,scope-12
Added: pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/TEZC21.gld
URL: http://svn.apache.org/viewvc/pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/TEZC21.gld?rev=1582443&view=auto
==============================================================================
--- pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/TEZC21.gld (added)
+++ pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/TEZC21.gld Thu Mar 27 18:59:41 2014
@@ -0,0 +1,103 @@
+#--------------------------------------------------
+# There are 1 DAGs in the session
+#--------------------------------------------------
+#--------------------------------------------------
+# TEZ DAG plan: scope-51
+#--------------------------------------------------
+Tez vertex scope-23 -> Tez vertex scope-24,
+Tez vertex scope-24 -> Tez vertex scope-41,Tez vertex scope-31,
+Tez vertex scope-31 -> Tez vertex scope-41,
+Tez vertex scope-41 -> Tez vertex scope-43,
+Tez vertex scope-43 -> Tez vertex scope-49,Tez vertex scope-47,
+Tez vertex scope-47 -> Tez vertex scope-49,
+Tez vertex scope-49
+
+Tez vertex scope-23
+# Plan on vertex
+b: Local Rearrange[tuple]{int}(false) - scope-10 -> scope-24
+| |
+| Project[int][0] - scope-11
+|
+|---a: New For Each(false,false)[bag] - scope-7
+ | |
+ | Cast[int] - scope-2
+ | |
+ | |---Project[bytearray][0] - scope-1
+ | |
+ | Cast[int] - scope-5
+ | |
+ | |---Project[bytearray][1] - scope-4
+ |
+ |---a: Load(file:///tmp/input1:org.apache.pig.builtin.PigStorage) - scope-0
+Tez vertex scope-24
+# Plan on vertex
+Local Rearrange[tuple]{tuple}(false) - scope-27 -> scope-31
+| |
+| Constant(DummyVal) - scope-26
+|
+|---ReservoirSample - scope-30
+ |
+ |---New For Each(false)[tuple] - scope-29
+ | |
+ | Project[int][0] - scope-28
+ |
+ |---b: Local Rearrange[tuple]{int}(false) - scope-25 -> scope-41
+ | |
+ | Project[int][0] - scope-15
+ |
+ |---New For Each(true,false)[tuple] - scope-14
+ | |
+ | Project[int][0] - scope-12
+ | |
+ | Project[bag][1] - scope-13
+ |
+ |---b: Package(Packager)[tuple]{int} - scope-9
+Tez vertex scope-31
+# Plan on vertex
+POValueOutputTez - scope-40 -> [scope-41]
+|
+|---New For Each(false)[tuple] - scope-39
+ | |
+ | POUserFunc(org.apache.pig.impl.builtin.FindQuantiles)[tuple] - scope-38
+ | |
+ | |---Project[tuple][*] - scope-37
+ |
+ |---New For Each(false,false)[tuple] - scope-36
+ | |
+ | Constant(1) - scope-35
+ | |
+ | Project[bag][1] - scope-33
+ |
+ |---Package(Packager)[tuple]{bytearray} - scope-32
+Tez vertex scope-41
+# Plan on vertex
+POIdentityInOutTez - scope-42 -> scope-43
+| |
+| Project[int][0] - scope-15
+Tez vertex scope-43
+# Plan on vertex
+POCounterTez - scope-17 -> scope-49,scope-47
+| |
+| Project[int][0] - scope-15
+|
+|---New For Each(true)[tuple] - scope-46
+ | |
+ | Project[bag][1] - scope-45
+ |
+ |---Package(LitePackager)[tuple]{int} - scope-44
+Tez vertex scope-47
+# Plan on vertex
+PORankStatsTez - scope-48 <- scope-43 -> scope-49
+Tez vertex scope-49
+# Plan on vertex
+b: Store(file:///tmp/output/d:org.apache.pig.builtin.PigStorage) - scope-22
+|
+|---New For Each(false,true)[tuple] - scope-21
+ | |
+ | Project[long][0] - scope-19
+ | |
+ | Project[bag][2] - scope-20
+ |
+ |---PORankTez - scope-18 <- scope-43,scope-47
+ | |
+ | Project[int][0] - scope-15
Modified: pig/branches/tez/test/org/apache/pig/tez/TestTezCompiler.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/test/org/apache/pig/tez/TestTezCompiler.java?rev=1582443&r1=1582442&r2=1582443&view=diff
==============================================================================
--- pig/branches/tez/test/org/apache/pig/tez/TestTezCompiler.java (original)
+++ pig/branches/tez/test/org/apache/pig/tez/TestTezCompiler.java Thu Mar 27 18:59:41 2014
@@ -324,6 +324,27 @@ public class TestTezCompiler {
run(query, "test/org/apache/pig/test/data/GoldenFiles/TEZC19.gld");
}
+ /**
+  * Passes a load / plain RANK / store script to {@code run} together with the
+  * path of golden file TEZC20.gld.
+  */
+ @Test
+ public void testRank() throws Exception {
+     final String goldenFile = "test/org/apache/pig/test/data/GoldenFiles/TEZC20.gld";
+
+     StringBuilder script = new StringBuilder();
+     script.append("a = load 'file:///tmp/input1' as (x:int, y:int);");
+     script.append("b = rank a;");
+     script.append("store b into 'file:///tmp/output/d';");
+
+     run(script.toString(), goldenFile);
+ }
+
+ /**
+  * Passes a load / RANK BY x / store script to {@code run} together with the
+  * path of golden file TEZC21.gld.
+  */
+ @Test
+ public void testRankBy() throws Exception {
+     //TODO: Physical plan (affects both MR and Tez) has extra job before order by. Does not look right.
+     final String goldenFile = "test/org/apache/pig/test/data/GoldenFiles/TEZC21.gld";
+
+     StringBuilder script = new StringBuilder();
+     script.append("a = load 'file:///tmp/input1' as (x:int, y:int);");
+     script.append("b = rank a by x;");
+     script.append("store b into 'file:///tmp/output/d';");
+
+     run(script.toString(), goldenFile);
+ }
+
private void run(String query, String expectedFile) throws Exception {
PhysicalPlan pp = Util.buildPp(pigServer, query);
TezLauncher launcher = new TezLauncher();