You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ro...@apache.org on 2014/02/17 09:02:44 UTC

svn commit: r1568895 - in /pig/branches/tez: src/org/apache/pig/backend/hadoop/executionengine/tez/ test/org/apache/pig/test/data/GoldenFiles/ test/org/apache/pig/tez/

Author: rohini
Date: Mon Feb 17 08:02:44 2014
New Revision: 1568895

URL: http://svn.apache.org/r1568895
Log:
PIG-3766: Use ONE_TO_ONE edge and IdentityInOut in skewed join intermediate vertex (rohini)

Added:
    pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/TEZC17.gld
Modified:
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POIdentityInOutTez.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POLocalRearrangeTez.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/PigProcessor.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java
    pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/TEZC16.gld
    pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/TEZC7.gld
    pig/branches/tez/test/org/apache/pig/tez/TestTezCompiler.java

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POIdentityInOutTez.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POIdentityInOutTez.java?rev=1568895&r1=1568894&r2=1568895&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POIdentityInOutTez.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POIdentityInOutTez.java Mon Feb 17 08:02:44 2014
@@ -26,6 +26,8 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
+import org.apache.pig.impl.io.NullablePartitionWritable;
+import org.apache.pig.impl.io.PigNullableWritable;
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.tez.runtime.api.LogicalInput;
 import org.apache.tez.runtime.api.LogicalOutput;
@@ -99,14 +101,31 @@ public class POIdentityInOutTez extends 
                 while (shuffleReader.next()) {
                     Object curKey = shuffleReader.getCurrentKey();
                     Iterable<Object> vals = shuffleReader.getCurrentValues();
+                    if (isSkewedJoin) {
+                        NullablePartitionWritable wrappedKey = new NullablePartitionWritable(
+                                (PigNullableWritable) curKey);
+                        wrappedKey.setPartition(-1);
+                        curKey = wrappedKey;
+                    }
                     for (Object val : vals) {
                         writer.write(curKey, val);
                     }
                 }
             } else {
                 while (reader.next()) {
-                    writer.write(reader.getCurrentKey(),
-                            reader.getCurrentValue());
+                    if (isSkewedJoin) {
+                        NullablePartitionWritable wrappedKey = new NullablePartitionWritable(
+                                (PigNullableWritable) reader.getCurrentKey());
+                        // Skewed join wraps the key with NullablePartitionWritable.
+                        // The partitionIndex in NullablePartitionWritable is not serialized,
+                        // so it is set here instead of in the previous vertex's POLocalRearrangeTez.
+                        // Serializing it would add overhead for MR as well.
+                        wrappedKey.setPartition(-1);
+                        writer.write(wrappedKey, reader.getCurrentValue());
+                    } else {
+                        writer.write(reader.getCurrentKey(),
+                                reader.getCurrentValue());
+                    }
                 }
             }
             return RESULT_EOP;

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POLocalRearrangeTez.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POLocalRearrangeTez.java?rev=1568895&r1=1568894&r2=1568895&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POLocalRearrangeTez.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POLocalRearrangeTez.java Mon Feb 17 08:02:44 2014
@@ -28,7 +28,6 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
 import org.apache.pig.data.Tuple;
-import org.apache.pig.impl.io.NullablePartitionWritable;
 import org.apache.pig.impl.io.NullableTuple;
 import org.apache.pig.impl.io.PigNullableWritable;
 import org.apache.pig.impl.plan.NodeIdGenerator;
@@ -50,8 +49,8 @@ public class POLocalRearrangeTez extends
     protected transient KeyValueWriter writer;
 
     // Tez union is implemented as LR + Pkg
-    private boolean isUnion = false;
-    private boolean isSkewedJoin = false;
+    protected boolean isUnion = false;
+    protected boolean isSkewedJoin = false;
 
     public POLocalRearrangeTez(OperatorKey k) {
         super(k);
@@ -63,6 +62,13 @@ public class POLocalRearrangeTez extends
 
     public POLocalRearrangeTez(POLocalRearrange copy) {
         super(copy);
+        if (copy instanceof POLocalRearrangeTez) {
+            POLocalRearrangeTez copyTez = (POLocalRearrangeTez) copy;
+            this.isUnion = copyTez.isUnion;
+            this.isSkewedJoin = copyTez.isSkewedJoin;
+            this.outputKey = copyTez.outputKey;
+        }
+
     }
 
     public String getOutputKey() {
@@ -140,12 +146,6 @@ public class POLocalRearrangeTez extends
                         val = new NullableTuple((Tuple)result.get(1));
                     } else {
                         key = HDataType.getWritableComparableTypes(result.get(1), keyType);
-                        if (isSkewedJoin) {
-                            // Skewed join wraps key with NullablePartitionWritable
-                            NullablePartitionWritable wrappedKey = new NullablePartitionWritable(key);
-                            wrappedKey.setPartition(-1);
-                            key = wrappedKey;
-                        }
                         val = new NullableTuple((Tuple)result.get(2));
                     }
 

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/PigProcessor.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/PigProcessor.java?rev=1568895&r1=1568894&r2=1568895&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/PigProcessor.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/PigProcessor.java Mon Feb 17 08:02:44 2014
@@ -36,10 +36,9 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
 import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
+import org.apache.pig.data.BinSedesTuple;
 import org.apache.pig.data.SchemaTupleBackend;
-import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.PigContext;
-import org.apache.pig.impl.io.NullableTuple;
 import org.apache.pig.impl.util.ObjectSerializer;
 import org.apache.pig.impl.util.UDFContext;
 import org.apache.tez.common.TezUtils;
@@ -229,8 +228,7 @@ public class PigProcessor implements Log
         Object val = reader.getCurrentValue();
         if (val != null) {
             // Sample is not empty
-            NullableTuple nTup = (NullableTuple) val;
-            Tuple t = (Tuple) nTup.getValueAsPigType();
+            BinSedesTuple t = (BinSedesTuple) val;
             sampleMap = (Map<String, Object>) t.get(0);
         }
     }

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java?rev=1568895&r1=1568894&r2=1568895&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java Mon Feb 17 08:02:44 2014
@@ -1238,8 +1238,12 @@ public class TezCompiler extends PhyPlan
     public void visitSkewedJoin(POSkewedJoin op) throws VisitorException {
         //TODO: handle split and connect
         try {
+
+            // The first vertex (prevOp) loads the left table and sends a sample of the join keys to
+            // vertex 2 (sampler vertex), and sends all data to vertex 3 (partition vertex) via a 1-1 edge.
+
             // LR that transfers loaded input to partition vertex
-            POLocalRearrangeTez lrTez = localRearrangeFactory.create(LocalRearrangeType.NULL);
+            POLocalRearrangeTez lrTez = new POLocalRearrangeTez(OperatorKey.genOpKey(scope));
             // LR that broadcasts sampled input to sampling aggregation vertex
             POLocalRearrangeTez lrTezSample = localRearrangeFactory.create(LocalRearrangeType.NULL);
 
@@ -1311,8 +1315,8 @@ public class TezCompiler extends PhyPlan
             String mc = pigProperties.getProperty("pig.skewedjoin.reduce.maxtuple", "0");
 
             int rp = Math.max(op.getRequestedParallelism(), 1);
-            Pair<TezOperator, Integer> sampleJobPair = getSamplingAggregationJobs(sort, rp, null,
-                    PartitionSkewedKeys.class.getName(), new String[]{per, mc}, 2);
+            Pair<TezOperator, Integer> sampleJobPair = getSamplingAggregationJob(sort, rp, null,
+                    PartitionSkewedKeys.class.getName(), new String[]{per, mc});
             rp = sampleJobPair.second;
 
             // Set parallelism of SkewedJoin as the value calculated by sampling
@@ -1329,17 +1333,8 @@ public class TezCompiler extends PhyPlan
 
             blocking();
 
-            // Then add a POPackage and a POForEach to the start of the new tezOp.
-            POPackage pkg = getPackage(1, DataType.BYTEARRAY);
-            curTezOp.plan.add(pkg);
-
-            POProject project = new POProject(OperatorKey.genOpKey(scope));
-            project.setResultType(DataType.BAG);
-            project.setStar(false);
-            project.setColumn(1);
-            POForEach forEach =
-                    TezCompilerUtil.getForEach(project, sort.getRequestedParallelism(), scope, nig);
-            curTezOp.plan.addAsLeaf(forEach);
+            // Add a POIdentityInOutTez to joinJobs[0], which is the partition vertex.
+            // It just partitions the data from the first vertex based on the quantiles from the sample vertex.
             joinJobs[0] = curTezOp;
 
             // Run POLocalRearrange for first join table. Note we set the
@@ -1347,11 +1342,9 @@ public class TezCompiler extends PhyPlan
             // its parallelism will be determined by the size of skewed table.
             //TODO: Check if this really works as load vertex parallelism
             // is determined during vertex construction.
-            POLocalRearrangeTez lr =
-                    new POLocalRearrangeTez(new OperatorKey(scope,nig.getNextNodeId(scope)),
-                            Math.max(prevOp.getRequestedParallelism(), 1));
+            lrTez.setRequestedParallelism(prevOp.getRequestedParallelism());
             try {
-                lr.setIndex(0);
+                lrTez.setIndex(0);
             } catch (ExecException e) {
                 int errCode = 2058;
                 String msg = "Unable to set index on newly created POLocalRearrange.";
@@ -1363,13 +1356,15 @@ public class TezCompiler extends PhyPlan
             if (groups.size() == 1) {
                 type = groups.get(0).getLeaves().get(0).getResultType();
             }
-
-            // Run POLocalRearrange for first join table
-            lr.setKeyType(type);
-            lr.setPlans(groups);
-            lr.setSkewedJoin(true);
-            lr.setResultType(DataType.TUPLE);
-            joinJobs[0].plan.addAsLeaf(lr);
+            lrTez.setKeyType(type);
+            lrTez.setPlans(groups);
+            lrTez.setSkewedJoin(true);
+            lrTez.setResultType(DataType.TUPLE);
+
+            POIdentityInOutTez identityInOutTez = new POIdentityInOutTez(
+                    OperatorKey.genOpKey(scope), lrTez);
+            identityInOutTez.setInputKey(prevOp.getOperatorKey().toString());
+            joinJobs[0].plan.addAsLeaf(identityInOutTez);
             joinJobs[0].setClosed(true);
             rearrangeOutputs[0] = joinJobs[0];
 
@@ -1413,7 +1408,7 @@ public class TezCompiler extends PhyPlan
             compiledInputs = new TezOperator[] {joinJobs[2]};
 
             // Create POPakcage
-            pkg = getPackage(2, type);
+            POPackage pkg = getPackage(2, type);
             pkg.setResultType(DataType.TUPLE);
             boolean [] inner = op.getInnerFlags();
             pkg.getPkgr().setInner(inner);
@@ -1449,33 +1444,37 @@ public class TezCompiler extends PhyPlan
             // Connect vertices
             lrTez.setOutputKey(joinJobs[0].getOperatorKey().toString());
             lrTezSample.setOutputKey(sampleJobPair.first.getOperatorKey().toString());
+            identityInOutTez.setOutputKey(joinJobs[2].getOperatorKey().toString());
+            pr.setOutputKey(joinJobs[2].getOperatorKey().toString());
 
             TezEdgeDescriptor edge = joinJobs[0].inEdges.get(prevOp.getOperatorKey());
             // TODO: Convert to unsorted shuffle after TEZ-661
-            // edge.outputClassName = OnFileUnorderedKVOutput.class.getName();
-            // edge.inputClassName = ShuffledUnorderedKVInput.class.getName();
-            edge.partitionerClass = RoundRobinPartitioner.class;
+            // Use 1-1 edge
+            edge.dataMovementType = DataMovementType.ONE_TO_ONE;
+            edge.outputClassName = OnFileUnorderedKVOutput.class.getName();
+            edge.inputClassName = ShuffledUnorderedKVInput.class.getName();
+            // If prevOp.requestedParallelism changes based on the number of input splits,
+            // the change is reflected in joinJobs[0] so that the 1-1 edge still works.
+            joinJobs[0].setRequestedParallelismByReference(prevOp);
 
             TezCompilerUtil.connect(tezPlan, prevOp, sampleJobPair.first);
 
-            POSplit split = (POSplit) sampleJobPair.first.plan.getLeaves().get(0);
-            List<PhysicalPlan> pp = split.getPlans();
-            for (int i = 0; i < pp.size(); i++) {
-                TezCompilerUtil.connect(tezPlan, sampleJobPair.first, joinJobs[i]);
+            POValueOutputTez sampleOut = (POValueOutputTez) sampleJobPair.first.plan.getLeaves().get(0);
+            for (int i = 0; i < 2; i++) {
+                joinJobs[i].sampleOperator = sampleJobPair.first;
 
                 // Configure broadcast edges for distribution map
-                edge = joinJobs[i].inEdges.get(sampleJobPair.first.getOperatorKey());
+                edge = TezCompilerUtil.connect(tezPlan, sampleJobPair.first, joinJobs[i]);
                 edge.dataMovementType = DataMovementType.BROADCAST;
                 edge.outputClassName = OnFileUnorderedKVOutput.class.getName();
                 edge.inputClassName = ShuffledUnorderedKVInput.class.getName();
-                lrTez = (POLocalRearrangeTez) pp.get(i).getLeaves().get(0);
-                lrTez.setOutputKey(joinJobs[i].getOperatorKey().toString());
-                joinJobs[i].sampleOperator = sampleJobPair.first;
+                sampleOut.addOutputKey(joinJobs[i].getOperatorKey().toString());
 
                 // Configure skewed partitioner for join
                 edge = joinJobs[2].inEdges.get(joinJobs[i].getOperatorKey());
                 edge.partitionerClass = SkewedPartitionerTez.class;
             }
+
             joinJobs[2].setSkewedJoin(true);
 
             phyToTezOpMap.put(op, curTezOp);
@@ -1609,7 +1608,7 @@ public class TezCompiler extends PhyPlan
         return oper;
     }
 
-    private Pair<TezOperator,Integer> getQuantileJobs(
+    private Pair<TezOperator,Integer> getOrderbySamplingAggregationJob(
             POSort inpSort,
             int rp) throws PlanException, VisitorException, ExecException {
 
@@ -1636,7 +1635,7 @@ public class TezCompiler extends PhyPlan
             }
         }
 
-        return getSamplingAggregationJobs(sort, rp, null, FindQuantiles.class.getName(), ctorArgs, 1);
+        return getSamplingAggregationJob(sort, rp, null, FindQuantiles.class.getName(), ctorArgs);
     }
 
     /**
@@ -1659,8 +1658,8 @@ public class TezCompiler extends PhyPlan
      * @throws VisitorException
      * @throws ExecException
      */
-    private Pair<TezOperator,Integer> getSamplingAggregationJobs(POSort sort, int rp,
-            List<PhysicalPlan> sortKeyPlans, String udfClassName, String[] udfArgs, int numLRs)
+    private Pair<TezOperator,Integer> getSamplingAggregationJob(POSort sort, int rp,
+            List<PhysicalPlan> sortKeyPlans, String udfClassName, String[] udfArgs)
                     throws PlanException, VisitorException, ExecException {
 
         TezOperator oper = getTezOp();
@@ -1791,26 +1790,9 @@ public class TezCompiler extends PhyPlan
         oper.plan.add(nfe3);
         oper.plan.connect(nfe2, nfe3);
 
-        if (numLRs > 1) {
-            // Skewed join broadcast sample map to multiple vertices, so we need
-            // to add POSplit to the plan and attach LRs to POSplit.
-            POSplit split = new POSplit(new OperatorKey(scope,nig.getNextNodeId(scope)));
-            oper.setSplitOperatorKey(split.getOperatorKey());
-            oper.plan.add(split);
-            oper.plan.connect(nfe3, split);
-            splitsSeen.put(split.getOperatorKey(), oper);
-
-            for (int i = 0; i < numLRs; i++) {
-                POLocalRearrangeTez lr = localRearrangeFactory.create(LocalRearrangeType.NULL);
-                PhysicalPlan pp = new PhysicalPlan();
-                pp.add(lr);
-                split.addPlan(pp);
-            }
-        } else {
-            POLocalRearrangeTez lr = localRearrangeFactory.create(LocalRearrangeType.NULL);
-            oper.plan.add(lr);
-            oper.plan.connect(nfe3, lr);
-        }
+        POValueOutputTez sampleOut = new POValueOutputTez(OperatorKey.genOpKey(scope));
+        oper.plan.add(sampleOut);
+        oper.plan.connect(nfe3, sampleOut);
         oper.setClosed(true);
 
         oper.setRequestedParallelism(1);
@@ -1998,7 +1980,7 @@ public class TezCompiler extends PhyPlan
             // pigContext.defaultParallel to be taken into account
             int rp = Math.max(op.getRequestedParallelism(), 1);
 
-            Pair<TezOperator, Integer> quantJobParallelismPair = getQuantileJobs(op, rp);
+            Pair<TezOperator, Integer> quantJobParallelismPair = getOrderbySamplingAggregationJob(op, rp);
             TezOperator[] sortOpers = getSortJobs(prevOper, lr, op, keyType, fields);
 
             TezEdgeDescriptor edge = handleSplitAndConnect(tezPlan, prevOper, sortOpers[0]);
@@ -2029,8 +2011,8 @@ public class TezCompiler extends PhyPlan
             edge.dataMovementType = DataMovementType.BROADCAST;
             edge.outputClassName = OnFileUnorderedKVOutput.class.getName();
             edge.inputClassName = ShuffledUnorderedKVInput.class.getName();
-            lr = (POLocalRearrangeTez)quantJobParallelismPair.first.plan.getLeaves().get(0);
-            lr.setOutputKey(sortOpers[0].getOperatorKey().toString());
+            POValueOutputTez sampleOut = (POValueOutputTez)quantJobParallelismPair.first.plan.getLeaves().get(0);
+            sampleOut.addOutputKey(sortOpers[0].getOperatorKey().toString());
             sortOpers[0].sampleOperator = quantJobParallelismPair.first;
 
             edge = TezCompilerUtil.connect(tezPlan, sortOpers[0], sortOpers[1]);

Modified: pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/TEZC16.gld
URL: http://svn.apache.org/viewvc/pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/TEZC16.gld?rev=1568895&r1=1568894&r2=1568895&view=diff
==============================================================================
--- pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/TEZC16.gld (original)
+++ pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/TEZC16.gld Mon Feb 17 08:02:44 2014
@@ -2,12 +2,12 @@
 # There are 1 DAGs in the session
 #--------------------------------------------------
 #--------------------------------------------------
-# TEZ DAG plan: scope-36
+# TEZ DAG plan: scope-35
 #--------------------------------------------------
-Tez vertex scope-11	->	Tez vertex scope-29,Tez vertex scope-18,
-Tez vertex scope-18	->	Tez vertex scope-29,
-Tez vertex scope-29	->	Tez vertex scope-31,
-Tez vertex scope-31
+Tez vertex scope-11	->	Tez vertex scope-28,Tez vertex scope-18,
+Tez vertex scope-18	->	Tez vertex scope-28,
+Tez vertex scope-28	->	Tez vertex scope-30,
+Tez vertex scope-30
 
 Tez vertex scope-11
 # Plan on vertex
@@ -21,7 +21,7 @@ Local Rearrange[tuple]{tuple}(false) - s
         |   |
         |   Project[int][0] - scope-15
         |
-        |---b: Local Rearrange[tuple]{int}(false) - scope-12	->	 scope-29
+        |---b: Local Rearrange[tuple]{int}(false) - scope-12	->	 scope-28
             |   |
             |   Project[int][0] - scope-8
             |
@@ -38,9 +38,7 @@ Local Rearrange[tuple]{tuple}(false) - s
                 |---a: Load(file:///tmp/input:PigStorage(',')) - scope-0
 Tez vertex scope-18
 # Plan on vertex
-Local Rearrange[tuple]{bytearray}(false) - scope-28	->	 scope-29
-|   |
-|   Constant(DummyVal) - scope-27
+POValueOutputTez - scope-27	->	 [scope-28]
 |
 |---New For Each(false)[tuple] - scope-26
     |   |
@@ -55,17 +53,17 @@ Local Rearrange[tuple]{bytearray}(false)
         |   Project[bag][1] - scope-20
         |
         |---Package(Packager)[tuple]{bytearray} - scope-19
-Tez vertex scope-29
+Tez vertex scope-28
 # Plan on vertex
-POIdentityInOutTez - scope-30	->	 scope-31
+POIdentityInOutTez - scope-29	->	 scope-30
 |   |
 |   Project[int][0] - scope-8
-Tez vertex scope-31
+Tez vertex scope-30
 # Plan on vertex
 b: Store(file:///tmp/output:org.apache.pig.builtin.PigStorage) - scope-10
 |
-|---New For Each(true)[tuple] - scope-34
+|---New For Each(true)[tuple] - scope-33
     |   |
-    |   Project[bag][1] - scope-33
+    |   Project[bag][1] - scope-32
     |
-    |---Package(LitePackager)[tuple]{int} - scope-32
\ No newline at end of file
+    |---Package(LitePackager)[tuple]{int} - scope-31
\ No newline at end of file

Added: pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/TEZC17.gld
URL: http://svn.apache.org/viewvc/pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/TEZC17.gld?rev=1568895&view=auto
==============================================================================
--- pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/TEZC17.gld (added)
+++ pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/TEZC17.gld Mon Feb 17 08:02:44 2014
@@ -0,0 +1,101 @@
+#--------------------------------------------------
+# There are 1 DAGs in the session
+#--------------------------------------------------
+#--------------------------------------------------
+# TEZ DAG plan: scope-56
+#--------------------------------------------------
+Tez vertex scope-27	->	Tez vertex scope-46,Tez vertex scope-36,
+Tez vertex scope-36	->	Tez vertex scope-46,Tez vertex scope-28,
+Tez vertex scope-46	->	Tez vertex scope-50,
+Tez vertex scope-28	->	Tez vertex scope-50,
+Tez vertex scope-50
+
+Tez vertex scope-27
+# Plan on vertex
+Local Rearrange[tuple]{tuple}(false) - scope-31	->	 scope-36
+|   |
+|   Constant(DummyVal) - scope-30
+|
+|---New For Each(true,true)[tuple] - scope-35
+    |   |
+    |   Project[int][0] - scope-16
+    |   |
+    |   POUserFunc(org.apache.pig.impl.builtin.GetMemNumRows)[tuple] - scope-34
+    |   |
+    |   |---Project[tuple][*] - scope-33
+    |
+    |---PoissonSample - scope-32
+        |
+        |---Local Rearrange[tuple]{int}(false) - scope-29	->	 scope-46
+            |   |
+            |   Project[int][0] - scope-16
+            |
+            |---a: New For Each(false,false)[bag] - scope-7
+                |   |
+                |   Cast[int] - scope-2
+                |   |
+                |   |---Project[bytearray][0] - scope-1
+                |   |
+                |   Cast[int] - scope-5
+                |   |
+                |   |---Project[bytearray][1] - scope-4
+                |
+                |---a: Load(file:///tmp/input1:org.apache.pig.builtin.PigStorage) - scope-0
+Tez vertex scope-36
+# Plan on vertex
+POValueOutputTez - scope-45	->	 [scope-46, scope-28]
+|
+|---New For Each(false)[tuple] - scope-44
+    |   |
+    |   POUserFunc(org.apache.pig.impl.builtin.PartitionSkewedKeys)[tuple] - scope-43
+    |   |
+    |   |---Project[tuple][*] - scope-42
+    |
+    |---New For Each(false,false)[tuple] - scope-41
+        |   |
+        |   Constant(1) - scope-40
+        |   |
+        |   Project[bag][1] - scope-38
+        |
+        |---Package(Packager)[tuple]{bytearray} - scope-37
+Tez vertex scope-46
+# Plan on vertex
+POIdentityInOutTez - scope-47	->	 scope-50
+|   |
+|   Project[int][0] - scope-16
+Tez vertex scope-28
+# Plan on vertex
+Partition Rearrange[tuple]{int}(false) - scope-48	->	 scope-50
+|   |
+|   Project[int][0] - scope-17
+|
+|---b: New For Each(false,false)[bag] - scope-15
+    |   |
+    |   Cast[int] - scope-10
+    |   |
+    |   |---Project[bytearray][0] - scope-9
+    |   |
+    |   Cast[int] - scope-13
+    |   |
+    |   |---Project[bytearray][1] - scope-12
+    |
+    |---b: Load(file:///tmp/input2:org.apache.pig.builtin.PigStorage) - scope-8
+Tez vertex scope-50
+# Plan on vertex
+d: Store(file:///tmp/output:org.apache.pig.builtin.PigStorage) - scope-26
+|
+|---d: New For Each(false,false,false)[bag] - scope-25
+    |   |
+    |   Project[int][0] - scope-19
+    |   |
+    |   Project[int][1] - scope-21
+    |   |
+    |   Project[int][3] - scope-23
+    |
+    |---New For Each(true,true)[tuple] - scope-54
+        |   |
+        |   Project[bag][1] - scope-52
+        |   |
+        |   Project[bag][2] - scope-53
+        |
+        |---Package(Packager)[tuple]{int} - scope-51
\ No newline at end of file

Modified: pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/TEZC7.gld
URL: http://svn.apache.org/viewvc/pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/TEZC7.gld?rev=1568895&r1=1568894&r2=1568895&view=diff
==============================================================================
--- pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/TEZC7.gld (original)
+++ pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/TEZC7.gld Mon Feb 17 08:02:44 2014
@@ -2,17 +2,17 @@
 # There are 1 DAGs in the session
 #--------------------------------------------------
 #--------------------------------------------------
-# TEZ DAG plan: scope-210
+# TEZ DAG plan: scope-209
 #--------------------------------------------------
-Tez vertex scope-106	->	Tez vertex scope-109,Tez vertex scope-122,Tez vertex scope-122,Tez vertex scope-126,Tez vertex scope-151,Tez vertex scope-140,Tez vertex scope-163,Tez vertex scope-188,
+Tez vertex scope-106	->	Tez vertex scope-109,Tez vertex scope-122,Tez vertex scope-122,Tez vertex scope-126,Tez vertex scope-150,Tez vertex scope-140,Tez vertex scope-162,Tez vertex scope-187,
 Tez vertex scope-122
-Tez vertex scope-140	->	Tez vertex scope-151,
-Tez vertex scope-151	->	Tez vertex scope-153,
-Tez vertex scope-153
+Tez vertex scope-162	->	Tez vertex scope-187,
+Tez vertex scope-187
 Tez vertex scope-126
+Tez vertex scope-140	->	Tez vertex scope-150,
+Tez vertex scope-150	->	Tez vertex scope-152,
+Tez vertex scope-152
 Tez vertex scope-109
-Tez vertex scope-163	->	Tez vertex scope-188,
-Tez vertex scope-188
 
 Tez vertex scope-106
 # Plan on vertex
@@ -40,7 +40,7 @@ Tez vertex scope-106
 |   |   |           |   |
 |   |   |           |   Project[int][0] - scope-137
 |   |   |           |
-|   |   |           |---e1: Local Rearrange[tuple]{int}(false) - scope-134	->	 scope-151
+|   |   |           |---e1: Local Rearrange[tuple]{int}(false) - scope-134	->	 scope-150
 |   |   |               |   |
 |   |   |               |   Project[int][0] - scope-88
 |   |   |               |
@@ -52,9 +52,9 @@ Tez vertex scope-106
 |   |   |                   |   |
 |   |   |                   |   |---Constant(3) - scope-86
 |   |   |   |
-|   |   |   f1: Local Rearrange[tuple]{tuple}(false) - scope-162	->	 scope-163
+|   |   |   f1: Local Rearrange[tuple]{tuple}(false) - scope-161	->	 scope-162
 |   |   |   |   |
-|   |   |   |   Project[tuple][*] - scope-161
+|   |   |   |   Project[tuple][*] - scope-160
 |   |   |   |
 |   |   |   |---f1: Limit - scope-95
 |   |   |       |
@@ -80,21 +80,21 @@ Tez vertex scope-106
 |   |   |   |
 |   |   |   Project[int][0] - scope-48
 |   |   |
-|   |   c2: Local Rearrange[tuple]{int}(false) - scope-202	->	 scope-126
+|   |   c2: Local Rearrange[tuple]{int}(false) - scope-201	->	 scope-126
 |   |   |   |
-|   |   |   Project[int][0] - scope-204
+|   |   |   Project[int][0] - scope-203
 |   |   |
-|   |   |---c3: New For Each(false,false)[bag] - scope-190
+|   |   |---c3: New For Each(false,false)[bag] - scope-189
 |   |       |   |
-|   |       |   Project[int][0] - scope-191
+|   |       |   Project[int][0] - scope-190
 |   |       |   |
-|   |       |   POUserFunc(org.apache.pig.builtin.AlgebraicMathBase$Initial)[tuple] - scope-192
+|   |       |   POUserFunc(org.apache.pig.builtin.AlgebraicMathBase$Initial)[tuple] - scope-191
 |   |       |   |
-|   |       |   |---Project[bag][0] - scope-193
+|   |       |   |---Project[bag][0] - scope-192
 |   |       |       |
-|   |       |       |---Project[bag][1] - scope-194
+|   |       |       |---Project[bag][1] - scope-193
 |   |       |
-|   |       |---Pre Combiner Local Rearrange[tuple]{Unknown} - scope-205
+|   |       |---Pre Combiner Local Rearrange[tuple]{Unknown} - scope-204
 |   |
 |   |---c: Filter[bag] - scope-34
 |       |   |
@@ -108,9 +108,9 @@ Tez vertex scope-106
 |   |   |
 |   |   d1: Store(file:///tmp/output/d1:org.apache.pig.builtin.PigStorage) - scope-80
 |   |   |
-|   |   f2: Local Rearrange[tuple]{tuple}(false) - scope-185	->	 scope-188
+|   |   f2: Local Rearrange[tuple]{tuple}(false) - scope-184	->	 scope-187
 |   |   |   |
-|   |   |   Project[tuple][*] - scope-184
+|   |   |   Project[tuple][*] - scope-183
 |   |
 |   |---d1: Filter[bag] - scope-73
 |       |   |
@@ -150,54 +150,43 @@ c1: Store(file:///tmp/output/c1:org.apac
     |   Project[bag][2] - scope-52
     |
     |---c1: Package(Packager)[tuple]{int} - scope-46
-Tez vertex scope-140
+Tez vertex scope-162
 # Plan on vertex
-Local Rearrange[tuple]{bytearray}(false) - scope-150	->	 scope-151
+f1: Split - scope-96
+|   |
+|   f1: Store(file:///tmp/output/f1:org.apache.pig.builtin.PigStorage) - scope-99
 |   |
-|   Constant(DummyVal) - scope-149
+|   f2: Local Rearrange[tuple]{tuple}(false) - scope-186	->	 scope-187
+|   |   |
+|   |   Project[tuple][*] - scope-185
 |
-|---New For Each(false)[tuple] - scope-148
-    |   |
-    |   POUserFunc(org.apache.pig.impl.builtin.FindQuantiles)[tuple] - scope-147
-    |   |
-    |   |---Project[tuple][*] - scope-146
+|---f1: Limit - scope-166
     |
-    |---New For Each(false,false)[tuple] - scope-145
-        |   |
-        |   Constant(1) - scope-144
+    |---f1: New For Each(true)[bag] - scope-165
         |   |
-        |   Project[bag][1] - scope-142
+        |   Project[tuple][1] - scope-164
         |
-        |---Package(Packager)[tuple]{bytearray} - scope-141
-Tez vertex scope-151
+        |---f1: Package(Packager)[tuple]{tuple} - scope-163
+Tez vertex scope-187
 # Plan on vertex
-POIdentityInOutTez - scope-152	->	 scope-153
-|   |
-|   Project[int][0] - scope-88
-Tez vertex scope-153
-# Plan on vertex
-e1: Store(file:///tmp/output/e1:org.apache.pig.builtin.PigStorage) - scope-90
+f2: Store(file:///tmp/output/f2:org.apache.pig.builtin.PigStorage) - scope-105
 |
-|---New For Each(true)[tuple] - scope-156
-    |   |
-    |   Project[bag][1] - scope-155
-    |
-    |---Package(LitePackager)[tuple]{int} - scope-154
+|---f2: Package(Packager)[tuple]{tuple} - scope-188
 Tez vertex scope-126
 # Combine plan on edge <scope-106>
-c2: Local Rearrange[tuple]{int}(false) - scope-206	->	 scope-126
+c2: Local Rearrange[tuple]{int}(false) - scope-205	->	 scope-126
 |   |
-|   Project[int][0] - scope-208
+|   Project[int][0] - scope-207
 |
-|---c3: New For Each(false,false)[bag] - scope-195
+|---c3: New For Each(false,false)[bag] - scope-194
     |   |
-    |   Project[int][0] - scope-196
+    |   Project[int][0] - scope-195
     |   |
-    |   POUserFunc(org.apache.pig.builtin.LongSum$Intermediate)[tuple] - scope-197
+    |   POUserFunc(org.apache.pig.builtin.LongSum$Intermediate)[tuple] - scope-196
     |   |
-    |   |---Project[bag][1] - scope-198
+    |   |---Project[bag][1] - scope-197
     |
-    |---c2: Package(CombinerPackager)[tuple]{int} - scope-201
+    |---c2: Package(CombinerPackager)[tuple]{int} - scope-200
 # Plan on vertex
 c3: Store(file:///tmp/output/c1:org.apache.pig.builtin.PigStorage) - scope-68
 |
@@ -207,9 +196,40 @@ c3: Store(file:///tmp/output/c1:org.apac
     |   |
     |   POUserFunc(org.apache.pig.builtin.LongSum$Final)[long] - scope-65
     |   |
-    |   |---Project[bag][1] - scope-199
+    |   |---Project[bag][1] - scope-198
     |
     |---c2: Package(CombinerPackager)[tuple]{int} - scope-58
+Tez vertex scope-140
+# Plan on vertex
+POValueOutputTez - scope-149	->	 [scope-150]
+|
+|---New For Each(false)[tuple] - scope-148
+    |   |
+    |   POUserFunc(org.apache.pig.impl.builtin.FindQuantiles)[tuple] - scope-147
+    |   |
+    |   |---Project[tuple][*] - scope-146
+    |
+    |---New For Each(false,false)[tuple] - scope-145
+        |   |
+        |   Constant(1) - scope-144
+        |   |
+        |   Project[bag][1] - scope-142
+        |
+        |---Package(Packager)[tuple]{bytearray} - scope-141
+Tez vertex scope-150
+# Plan on vertex
+POIdentityInOutTez - scope-151	->	 scope-152
+|   |
+|   Project[int][0] - scope-88
+Tez vertex scope-152
+# Plan on vertex
+e1: Store(file:///tmp/output/e1:org.apache.pig.builtin.PigStorage) - scope-90
+|
+|---New For Each(true)[tuple] - scope-155
+    |   |
+    |   Project[bag][1] - scope-154
+    |
+    |---Package(LitePackager)[tuple]{int} - scope-153
 Tez vertex scope-109
 # Plan on vertex
 b1: Split - scope-20
@@ -228,26 +248,4 @@ b1: Split - scope-20
 |       |       |
 |       |       |---Project[bag][1] - scope-28
 |
-|---b1: Package(Packager)[tuple]{int} - scope-17
-Tez vertex scope-163
-# Plan on vertex
-f1: Split - scope-96
-|   |
-|   f1: Store(file:///tmp/output/f1:org.apache.pig.builtin.PigStorage) - scope-99
-|   |
-|   f2: Local Rearrange[tuple]{tuple}(false) - scope-187	->	 scope-188
-|   |   |
-|   |   Project[tuple][*] - scope-186
-|
-|---f1: Limit - scope-167
-    |
-    |---f1: New For Each(true)[bag] - scope-166
-        |   |
-        |   Project[tuple][1] - scope-165
-        |
-        |---f1: Package(Packager)[tuple]{tuple} - scope-164
-Tez vertex scope-188
-# Plan on vertex
-f2: Store(file:///tmp/output/f2:org.apache.pig.builtin.PigStorage) - scope-105
-|
-|---f2: Package(Packager)[tuple]{tuple} - scope-189
\ No newline at end of file
+|---b1: Package(Packager)[tuple]{int} - scope-17
\ No newline at end of file

Modified: pig/branches/tez/test/org/apache/pig/tez/TestTezCompiler.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/test/org/apache/pig/tez/TestTezCompiler.java?rev=1568895&r1=1568894&r2=1568895&view=diff
==============================================================================
--- pig/branches/tez/test/org/apache/pig/tez/TestTezCompiler.java (original)
+++ pig/branches/tez/test/org/apache/pig/tez/TestTezCompiler.java Mon Feb 17 08:02:44 2014
@@ -106,6 +106,18 @@ public class TestTezCompiler {
     }
 
     @Test
+    public void testSkewedJoin() throws Exception {
+        String query =
+                "a = load 'file:///tmp/input1' as (x:int, y:int);" +
+                "b = load 'file:///tmp/input2' as (x:int, z:int);" +
+                "c = join a by x, b by x using 'skewed';" +
+                "d = foreach c generate a::x as x, y, z;" +
+                "store d into 'file:///tmp/output';";
+
+        run(query, "test/org/apache/pig/test/data/GoldenFiles/TEZC17.gld");
+    }
+
+    @Test
     public void testLimit() throws Exception {
         String query =
                 "a = load 'file:///tmp/input' as (x:int, y:int);" +