You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ch...@apache.org on 2014/01/16 03:25:35 UTC

svn commit: r1558674 [1/2] - in /pig/branches/tez: src/org/apache/pig/ src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/ src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/ src/org/apache/pi...

Author: cheolsoo
Date: Thu Jan 16 02:25:35 2014
New Revision: 1558674

URL: http://svn.apache.org/r1558674
Log:
PIG-3644: Implement skewed join in Tez (cheolsoo)

Added:
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPoissonSample.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POPartitionRearrangeTez.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/SkewedPartitionerTez.java
Modified:
    pig/branches/tez/src/org/apache/pig/PigConfiguration.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/SkewedPartitioner.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POReservoirSample.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POLocalRearrangeTez.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POShuffleTezLoad.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/PigProcessor.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperator.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/WeightedRangePartitionerTez.java
    pig/branches/tez/src/org/apache/pig/impl/builtin/PartitionSkewedKeys.java
    pig/branches/tez/src/org/apache/pig/impl/builtin/PoissonSampleLoader.java
    pig/branches/tez/test/e2e/pig/tests/tez.conf

Modified: pig/branches/tez/src/org/apache/pig/PigConfiguration.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/PigConfiguration.java?rev=1558674&r1=1558673&r2=1558674&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/PigConfiguration.java (original)
+++ pig/branches/tez/src/org/apache/pig/PigConfiguration.java Thu Jan 16 02:25:35 2014
@@ -149,6 +149,20 @@ public class PigConfiguration {
     public static final String PIG_DELETE_TEMP_FILE = "pig.delete.temp.files";
 
     /**
+     * Sample rate for the skewed-join Poisson sampler. The rate is obtained from
+     * a Poisson cumulative distribution function for a given mean (the minimum
+     * number of samples) and confidence level.
+     */
+    public static final String SAMPLE_RATE = "pig.sksampler.samplerate";
+
+    /**
+     * Fraction of the heap (a value between 0 and 1, not a percentage) available
+     * for the input data. This is currently equal to the memory available for
+     * the skewed join.
+     */
+    public static final String PERC_MEM_AVAIL = "pig.skewedjoin.reduce.memusage";
+
+    /**
       * This key used to control the maximum size loaded into
       * the distributed cache when doing fragment-replicated join
       */

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/SkewedPartitioner.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/SkewedPartitioner.java?rev=1558674&r1=1558673&r2=1558674&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/SkewedPartitioner.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/SkewedPartitioner.java Thu Jan 16 02:25:35 2014
@@ -18,7 +18,6 @@
 package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.partitioners;
 
 
-import java.util.HashMap;
 import java.util.Map;
 
 import org.apache.hadoop.conf.Configurable;
@@ -36,26 +35,35 @@ import org.apache.pig.impl.io.NullableTu
 import org.apache.pig.impl.io.PigNullableWritable;
 import org.apache.pig.impl.util.Pair;
 
+import com.google.common.collect.Maps;
+
 
 /**
   * This class is used by skewed join. For the partitioned table, the skewedpartitioner reads the key
   * distribution data from the sampler file and returns the reducer index in a round robin fashion.
-  * For ex: if the key distribution file contains (k1, 5, 3) as an entry, reducers from 5 to 3 are returned 
+  * For ex: if the key distribution file contains (k1, 5, 3) as an entry, reducers from 5 to 3 are returned
   * in a round robin manner.
-  */ 
+  */
 public class SkewedPartitioner extends Partitioner<PigNullableWritable, Writable> implements Configurable {
-    Map<Tuple, Pair<Integer, Integer> > reducerMap = new HashMap<Tuple, Pair<Integer, Integer> >();
-    static Map<Tuple, Integer> currentIndexMap = new HashMap<Tuple, Integer> ();
-    Integer totalReducers;
-    Configuration conf;
+
+    protected Map<Tuple, Pair<Integer, Integer>> reducerMap = Maps.newHashMap();
+    protected Integer totalReducers;
+    protected boolean inited = false;
+
+    private static Map<Tuple, Integer> currentIndexMap = Maps.newHashMap();
+    private Configuration conf;
 
     @Override
-    public int getPartition(PigNullableWritable wrappedKey, Writable value,
-            int numPartitions) {
-		// for streaming tables, return the partition index blindly
-		if (wrappedKey instanceof NullablePartitionWritable && (((NullablePartitionWritable)wrappedKey).getPartition()) != -1) {
-			return ((NullablePartitionWritable)wrappedKey).getPartition();
-		}
+    public int getPartition(PigNullableWritable wrappedKey, Writable value, int numPartitions) {
+        if (!inited) {
+            init();
+        }
+
+        // for streaming tables, return the partition index blindly
+        if (wrappedKey instanceof NullablePartitionWritable &&
+                (((NullablePartitionWritable)wrappedKey).getPartition()) != -1) {
+            return ((NullablePartitionWritable)wrappedKey).getPartition();
+        }
 
         // for partition table, compute the index based on the sampler output
         Pair <Integer, Integer> indexes;
@@ -78,7 +86,7 @@ public class SkewedPartitioner extends P
 
         // if the partition file is empty, use numPartitions
         totalReducers = (totalReducers > 0) ? totalReducers : numPartitions;
-        
+
         indexes = reducerMap.get(keyTuple);
         // if the reducerMap does not contain the key, do the default hash based partitioning
         if (indexes == null) {
@@ -105,24 +113,39 @@ public class SkewedPartitioner extends P
         conf = job;
         PigMapReduce.sJobConfInternal.set(conf);
         PigMapReduce.sJobConf = conf;
-        String keyDistFile = job.get("pig.keyDistFile", "");
-        if (keyDistFile.length() == 0)
-            throw new RuntimeException(this.getClass().getSimpleName() + " used but no key distribution found");
+    }
+
+    @Override
+    public Configuration getConf() {
+        return conf;
+    }
+
+    /**
+     * Loads the key distribution named by {@code pig.keyDistFile} into
+     * {@link #reducerMap} and {@link #totalReducers}. Invoked from
+     * getPartition() until it has completed successfully once.
+     *
+     * @throws RuntimeException if no key distribution file is configured or
+     *         the partition file cannot be loaded
+     */
+    protected void init() {
+        String keyDistFile = conf.get("pig.keyDistFile", "");
+        if (keyDistFile.length() == 0) {
+            throw new RuntimeException(this.getClass().getSimpleName() +
+                    " used but no key distribution found");
+        }
 
         try {
             Integer [] redCnt = new Integer[1]; 
             reducerMap = MapRedUtil.loadPartitionFileFromLocalCache(
-                    keyDistFile, redCnt, DataType.TUPLE, job);
+                    keyDistFile, redCnt, DataType.TUPLE, conf);
             // check if the partition file is empty
             totalReducers = (redCnt[0] == null) ? -1 : redCnt[0];
+            // Remember that the distribution is loaded so getPartition() does
+            // not reload the partition file for every record.
+            inited = true;
         } catch (Exception e) {
             throw new RuntimeException(e);
         }
     }
-
-    @Override
-    public Configuration getConf() {
-        return conf;
-    }
-
 }

Added: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPoissonSample.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPoissonSample.java?rev=1558674&view=auto
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPoissonSample.java (added)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPoissonSample.java Thu Jan 16 02:25:35 2014
@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators;
+
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.plan.VisitorException;
+
+/**
+ * Sampling operator used by skewed join. The first input row is sampled, and
+ * after each sample {@code skipInterval} rows are skipped. The interval is
+ * recomputed after every sample from the heap fraction available for the
+ * join, the sample rate, and the running average in-memory size of the
+ * sampled tuples. When the input ends, the pending sample is returned with two
+ * extra trailing columns: {@link #NUMROWS_TUPLE_MARKER} and the number of rows
+ * read.
+ */
+public class POPoissonSample extends PhysicalOperator {
+
+    private static final long serialVersionUID = 1L;
+
+    // Marker string for the last sample row, which carries the total number of
+    // rows seen by this task. The marker is in the second-to-last column of the
+    // last sample row; it is used by GetMemNumRows.
+    public static final String NUMROWS_TUPLE_MARKER =
+        "\u4956\u3838_pig_inTeRnal-spEcial_roW_num_tuple3kt579CFLehkblah";
+
+    private static final TupleFactory tf = TupleFactory.getInstance();
+    private static final Result eop = new Result(POStatus.STATUS_EOP, null);
+
+    // num of rows sampled so far
+    private int numRowsSampled = 0;
+
+    // average size of tuple in memory, for tuples sampled
+    private long avgTupleMemSz = 0;
+
+    // number of input rows read so far
+    private long rowNum = 0;
+
+    // number of tuples to skip after each sample; -1 until the first sample
+    private long skipInterval = -1;
+
+    // bytes in input to skip after every sample.
+    // divide this by avgTupleMemSz to get skipInterval
+    private long memToSkipPerSample = 0;
+
+    // has the special row with row number information been returned
+    private boolean numRowSplTupleReturned = false;
+
+    // 17 is not a magic number. It can be obtained by using a poisson
+    // cumulative distribution function with the mean set to 10 (empirically,
+    // minimum number of samples) and the confidence set to 95%
+    public static final int DEFAULT_SAMPLE_RATE = 17;
+
+    // number of samples to take per heapPerc * max heap bytes of input
+    private int sampleRate = 0;
+
+    // fraction of the max heap used to size the skip interval
+    private float heapPerc = 0f;
+
+    // sample read ahead; returned when the next sample or the end of input is seen
+    private Result newSample = null;
+
+    /**
+     * @param k operator key
+     * @param rp requested parallelism
+     * @param sr sample rate: samples per {@code hp * maxMemory()} bytes of input
+     * @param hp fraction of the max heap available for the join
+     */
+    public POPoissonSample(OperatorKey k, int rp, int sr, float hp) {
+        super(k, rp, null);
+        sampleRate = sr;
+        heapPerc = hp;
+    }
+
+    @Override
+    public Tuple illustratorMarkup(Object in, Object out, int eqClassIndex) {
+        // Illustrate is not supported by this operator.
+        return null;
+    }
+
+    @Override
+    public void visit(PhyPlanVisitor v) throws VisitorException {
+        // NOTE(review): no visitor callback is invoked, so plan visitors do not
+        // see this operator. Add a PhyPlanVisitor hook if one is needed.
+    }
+
+    /**
+     * Returns the next sample. Samples are returned one call late so that the
+     * last one can be tagged with the row count once the input ends.
+     *
+     * @return a sample, the tagged final sample, EOP after the tagged sample has
+     *         been returned (or when there is no input), or an error result
+     *         propagated from the input
+     * @throws ExecException if reading the input or building a tuple fails
+     */
+    @Override
+    public Result getNextTuple() throws ExecException {
+        if (numRowSplTupleReturned) {
+            // row num special row has been returned after all inputs
+            // were read, nothing more to read
+            return eop;
+        }
+
+        Result res;
+        if (skipInterval == -1) {
+            // select first tuple as sample and calculate
+            // number of tuples to be skipped
+            do {
+                res = processInput();
+            } while (res.returnStatus == POStatus.STATUS_NULL);
+            if (res.returnStatus == POStatus.STATUS_EOP
+                    || res.returnStatus == POStatus.STATUS_ERR) {
+                return res;
+            }
+            if (res.result == null) {
+                return createNumRowTuple(null);
+            }
+            long availRedMem = (long) (Runtime.getRuntime().maxMemory() * heapPerc);
+            memToSkipPerSample = availRedMem / sampleRate;
+            updateSkipInterval((Tuple) res.result);
+
+            rowNum++;
+            newSample = res;
+        }
+
+        // skip tuples; only rows actually read count towards the interval
+        long numSkipped = 0;
+        while (numSkipped < skipInterval) {
+            res = processInput();
+            if (res.returnStatus == POStatus.STATUS_NULL) {
+                continue;
+            } else if (res.returnStatus == POStatus.STATUS_ERR) {
+                // propagate the error instead of reporting it as end of input
+                return res;
+            } else if (res.returnStatus == POStatus.STATUS_EOP) {
+                return createNumRowTuple((Tuple) newSample.result);
+            }
+            rowNum++;
+            numSkipped++;
+        }
+
+        // skipped enough, get new sample
+        while (true) {
+            res = processInput();
+            if (res.returnStatus == POStatus.STATUS_NULL) {
+                continue;
+            } else if (res.returnStatus == POStatus.STATUS_ERR) {
+                return res;
+            } else if (res.returnStatus == POStatus.STATUS_EOP || res.result == null) {
+                return createNumRowTuple((Tuple) newSample.result);
+            }
+
+            updateSkipInterval((Tuple) res.result);
+            // hand back the previous sample and keep this one pending
+            Result currentSample = newSample;
+            rowNum++;
+            newSample = res;
+            return currentSample;
+        }
+    }
+
+    @Override
+    public boolean supportsMultipleInputs() {
+        return false;
+    }
+
+    @Override
+    public boolean supportsMultipleOutputs() {
+        return false;
+    }
+
+    @Override
+    public String name() {
+        return getAliasString() + "PoissonSample - " + mKey.toString();
+    }
+
+    /**
+     * Update the average tuple size based on the newly sampled tuple t
+     * and recalculate skipInterval.
+     * @param t - tuple
+     */
+    private void updateSkipInterval(Tuple t) {
+        avgTupleMemSz =
+            ((avgTupleMemSz*numRowsSampled) + t.getMemorySize())/(numRowsSampled + 1);
+        skipInterval = memToSkipPerSample/avgTupleMemSz;
+
+        // skipping fewer number of rows the first few times, to reduce the
+        // probability of first tuples size (if much smaller than rest)
+        // resulting in very few samples being sampled. Sampling a little extra
+        // is OK
+        if(numRowsSampled < 5) {
+            skipInterval = skipInterval/(10-numRowsSampled);
+        }
+        ++numRowsSampled;
+    }
+
+    /**
+     * Appends the marker and row-count columns to the given sample and records
+     * that the special row has been returned, so later calls return EOP.
+     * @param sample - sample tuple, or null if there is none
+     * @return - Tuple appended with special marker string column, num-rows column
+     * @throws ExecException if setting a tuple field fails
+     */
+    private Result createNumRowTuple(Tuple sample) throws ExecException {
+        int sz = (sample == null) ? 0 : sample.size();
+        Tuple t = tf.newTuple(sz + 2);
+
+        for (int i = 0; i < sz; i++) {
+            t.set(i, sample.get(i));
+        }
+
+        t.set(sz, NUMROWS_TUPLE_MARKER);
+        t.set(sz + 1, rowNum);
+        numRowSplTupleReturned = true;
+        return new Result(POStatus.STATUS_OK, t);
+    }
+}

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POReservoirSample.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POReservoirSample.java?rev=1558674&r1=1558673&r2=1558674&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POReservoirSample.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POReservoirSample.java Thu Jan 16 02:25:35 2014
@@ -90,8 +90,7 @@ public class POReservoirSample extends P
                 rowProcessed++;
             } else if (res.returnStatus == POStatus.STATUS_NULL) {
                 continue;
-            }
-            else {
+            } else {
                 break;
             }
         }

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POLocalRearrangeTez.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POLocalRearrangeTez.java?rev=1558674&r1=1558673&r2=1558674&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POLocalRearrangeTez.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POLocalRearrangeTez.java Thu Jan 16 02:25:35 2014
@@ -28,6 +28,7 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
 import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.io.NullablePartitionWritable;
 import org.apache.pig.impl.io.NullableTuple;
 import org.apache.pig.impl.io.PigNullableWritable;
 import org.apache.pig.impl.plan.NodeIdGenerator;
@@ -44,12 +45,14 @@ import org.apache.tez.runtime.library.ou
 public class POLocalRearrangeTez extends POLocalRearrange implements TezOutput {
 
     private static final long serialVersionUID = 1L;
-    private static Result empty = new Result(POStatus.STATUS_NULL, null);
+    protected static Result empty = new Result(POStatus.STATUS_NULL, null);
+
+    protected String outputKey;
+    protected transient KeyValueWriter writer;
 
-    private String outputKey;
-    private transient KeyValueWriter writer;
     // Tez union is implemented as LR + Pkg
     private boolean isUnion = false;
+    private boolean isSkewedJoin = false;
 
     public POLocalRearrangeTez(OperatorKey k) {
         super(k);
@@ -79,6 +82,14 @@ public class POLocalRearrangeTez extends
         this.isUnion = isUnion;
     }
 
+    public boolean isSkewedJoin() {
+        return isSkewedJoin;
+    }
+
+    public void setSkewedJoin(boolean isSkewedJoin) {
+        this.isSkewedJoin = isSkewedJoin;
+    }
+
     @Override
     public void attachOutputs(Map<String, LogicalOutput> outputs,
             Configuration conf) throws ExecException {
@@ -124,10 +135,20 @@ public class POLocalRearrangeTez extends
                     Byte index = (Byte)result.get(0);
                     PigNullableWritable key = null;
                     NullableTuple val = null;
-                    if (isUnion()) {
+                    if (isUnion) {
                         // Use the entire tuple as both key and value
                         key = HDataType.getWritableComparableTypes(result.get(1), keyType);
                         val = new NullableTuple((Tuple)result.get(1));
+                    } else if (isSkewedJoin) {
+                        // Skewed join wraps the key in a NullablePartitionWritable. The
+                        // partition is -1 (not pre-assigned); SkewedPartitioner then looks
+                        // the key up in the key distribution instead of using it blindly.
+                        key = HDataType.getWritableComparableTypes(result.get(1), keyType);
+                        NullablePartitionWritable wrappedKey = new NullablePartitionWritable(key);
+                        wrappedKey.setIndex(index);
+                        wrappedKey.setPartition(-1);
+                        key = wrappedKey;
+                        val = new NullableTuple((Tuple)result.get(2));
                     } else {
                         key = HDataType.getWritableComparableTypes(result.get(1), keyType);
                         val = new NullableTuple((Tuple)result.get(2));

Added: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POPartitionRearrangeTez.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POPartitionRearrangeTez.java?rev=1558674&view=auto
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POPartitionRearrangeTez.java (added)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POPartitionRearrangeTez.java Thu Jan 16 02:25:35 2014
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.tez;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.HDataType;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.ExpressionOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.data.BagFactory;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.builtin.PartitionSkewedKeys;
+import org.apache.pig.impl.io.NullablePartitionWritable;
+import org.apache.pig.impl.io.NullableTuple;
+import org.apache.pig.impl.io.PigNullableWritable;
+import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.util.Pair;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+/**
+ * The partition rearrange operator is a part of the skewed join implementation.
+ * For every input tuple it evaluates the key plans and emits one (key, value)
+ * pair per reducer assigned to that key in the sampled key distribution. The
+ * key is wrapped in a {@link NullablePartitionWritable} carrying the target
+ * partition; keys that are not in the distribution get partition -1.
+ */
+public class POPartitionRearrangeTez extends POLocalRearrangeTez {
+    private static final long serialVersionUID = 1L;
+    private static final TupleFactory tf = TupleFactory.getInstance();
+    private static final BagFactory mBagFactory = BagFactory.getInstance();
+
+    // Field positions in the tuples built by constructPROutput()
+    private static final int PARTITION_IDX = 1;
+    private static final int KEY_IDX = 2;
+    private static final int VALUE_IDX = 3;
+
+    // Maps a skewed key to (first reducer index, number of additional reducers)
+    private Map<Object, Pair<Integer, Integer>> reducerMap = Maps.newHashMap();
+    // Total number of reducers from the key distribution; -1 until init() runs
+    private Integer totalReducers = -1;
+    private boolean inited;
+
+    public POPartitionRearrangeTez(OperatorKey k, int rp) {
+        super(k, rp);
+        index = -1;
+        leafOps = Lists.newArrayList();
+    }
+
+    @Override
+    public String name() {
+        return getAliasString() + "Partition Rearrange" + "["
+                + DataType.findTypeName(resultType) + "]" + "{"
+                + DataType.findTypeName(keyType) + "}" + "(" + mIsDistinct
+                + ") - " + mKey.toString() + "\t->\t " + outputKey;
+    }
+
+    /**
+     * Reads input tuples, evaluates the key plans and writes one
+     * (partitioned key, value) pair per assigned reducer to the output.
+     * When no writer is attached (the combiner case), the bag of rearranged
+     * tuples for the current input is returned instead.
+     *
+     * @return the input's EOP or error result once the input is exhausted, an
+     *         empty Result if a key plan does not return STATUS_OK, or the bag
+     *         of rearranged tuples when no writer is attached
+     * @throws ExecException if the output writer fails
+     */
+    @Override
+    public Result getNextTuple() throws ExecException {
+        if (!inited) {
+            init();
+        }
+
+        while (true) {
+            inp = processInput();
+            if (inp.returnStatus == POStatus.STATUS_EOP || inp.returnStatus == POStatus.STATUS_ERR) {
+                break;
+            }
+            if (inp.returnStatus == POStatus.STATUS_NULL) {
+                continue;
+            }
+
+            for (PhysicalPlan ep : plans) {
+                ep.attachInput((Tuple)inp.result);
+            }
+
+            Result res = null;
+            List<Result> resLst = new ArrayList<Result>();
+            for (ExpressionOperator op : leafOps){
+                res = op.getNext(op.getResultType());
+                if (res.returnStatus != POStatus.STATUS_OK) {
+                    return new Result();
+                }
+                resLst.add(res);
+            }
+            res.result = constructPROutput(resLst,(Tuple)inp.result);
+            if (writer == null) { // In the case of combiner
+                return res;
+            }
+
+            Iterator<Tuple> its = ((DataBag)res.result).iterator();
+            while (its.hasNext()) {
+                Tuple result = its.next();
+                int partitionIndex = (Integer) result.get(PARTITION_IDX);
+                PigNullableWritable key =
+                        HDataType.getWritableComparableTypes(result.get(KEY_IDX), keyType);
+                NullableTuple val = new NullableTuple((Tuple) result.get(VALUE_IDX));
+
+                NullablePartitionWritable wrappedKey = new NullablePartitionWritable(key);
+                wrappedKey.setIndex(index);
+                wrappedKey.setPartition(partitionIndex);
+                val.setIndex(index);
+
+                try {
+                    writer.write(wrappedKey, val);
+                } catch (IOException ioe) {
+                    int errCode = 2135;
+                    String msg = "Received error from POPartitionRearrange function. " + ioe.getMessage();
+                    throw new ExecException(msg, errCode, ioe);
+                }
+            }
+        }
+        return inp;
+    }
+
+    /**
+     * Builds the rearranged output for one input tuple.
+     *
+     * @param resLst results of the key plans
+     * @param value the input tuple
+     * @return a bag of (index, partition, key, value) tuples: one per reducer in
+     *         the key's range for skewed keys, or a single tuple with partition
+     *         -1 for keys that are not in the key distribution
+     * @throws ExecException if building a tuple fails
+     */
+    protected DataBag constructPROutput(List<Result> resLst, Tuple value) throws ExecException{
+        Tuple t = super.constructLROutput(resLst, null, value);
+
+        //Construct key
+        Object key = t.get(1);
+
+        // Construct an output bag and feed in the tuples
+        DataBag opBag = mBagFactory.newDefaultBag();
+
+        // first -> first reducer index, second -> number of additional reducers
+        Pair <Integer, Integer> indexes = reducerMap.get(key);
+
+        // For non skewed keys, we set the partition index to be -1
+        if (indexes == null) {
+            indexes = new Pair <Integer, Integer>(-1,0);
+        }
+
+        // The range may wrap around past the last reducer back to reducer 0
+        for (Integer reducerIdx=indexes.first, cnt=0; cnt <= indexes.second; reducerIdx++, cnt++) {
+            if (reducerIdx >= totalReducers) {
+                reducerIdx = 0;
+            }
+            Tuple opTuple = mTupleFactory.newTuple(4);
+            opTuple.set(0, t.get(0));
+            // set the partition index
+            opTuple.set(PARTITION_IDX, reducerIdx.intValue());
+            opTuple.set(KEY_IDX, key);
+            opTuple.set(VALUE_IDX, t.get(2));
+
+            opBag.add(opTuple);
+        }
+
+        return opBag;
+    }
+
+    /**
+     * Loads the skewed-key distribution collected by PigProcessor into
+     * {@code reducerMap} and {@code totalReducers}.
+     *
+     * @throws RuntimeException if no key distribution is available or it
+     *         cannot be read
+     */
+    private void init() throws RuntimeException {
+        // The distribution has already been collected into sampleMap by PigProcessor
+        Map<String, Object> distMap = PigProcessor.sampleMap;
+        if (distMap == null) {
+            throw new RuntimeException(this.getClass().getSimpleName() +
+                    " used but no key distribution found");
+        }
+
+        try {
+            // Each tuple in the partition list is (key fields..., minIndex, maxIndex)
+            DataBag partitionList = (DataBag) distMap.get(PartitionSkewedKeys.PARTITION_LIST);
+            // Assign the field (not a local): constructPROutput() compares against it
+            totalReducers = Integer.valueOf("" + distMap.get(PartitionSkewedKeys.TOTAL_REDUCERS));
+            Iterator<Tuple> it = partitionList.iterator();
+            while (it.hasNext()) {
+                Tuple idxTuple = it.next();
+                Integer maxIndex = (Integer) idxTuple.get(idxTuple.size() - 1);
+                Integer minIndex = (Integer) idxTuple.get(idxTuple.size() - 2);
+                // A range that wraps past the last reducer is unwrapped so that
+                // maxIndex - minIndex is the number of additional reducers
+                if (maxIndex < minIndex) {
+                    maxIndex = totalReducers + maxIndex;
+                }
+
+                Tuple keyT;
+                // if the join is on more than 1 key
+                if (idxTuple.size() > 3) {
+                    // remove the last 2 fields of the tuple, i.e: minIndex
+                    // and maxIndex and store it in the reducer map
+                    Tuple keyTuple = tf.newTuple();
+                    for (int i=0; i < idxTuple.size() - 2; i++) {
+                        keyTuple.append(idxTuple.get(i));
+                    }
+                    keyT = keyTuple;
+                } else {
+                    keyT = tf.newTuple(1);
+                    keyT.set(0,idxTuple.get(0));
+                }
+                // constructPROutput() uses reducers minIndex..minIndex + cnt (inclusive)
+                Integer cnt = maxIndex - minIndex;
+                reducerMap.put(keyT, new Pair<Integer, Integer>(minIndex, cnt));
+            }
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+        inited = true;
+    }
+}

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POShuffleTezLoad.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POShuffleTezLoad.java?rev=1558674&r1=1558673&r2=1558674&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POShuffleTezLoad.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POShuffleTezLoad.java Thu Jan 16 02:25:35 2014
@@ -53,6 +53,7 @@ public class POShuffleTezLoad extends PO
     private boolean[] readOnce;
 
     private WritableComparator comparator = null;
+    private boolean isSkewedJoin = false;
 
     public POShuffleTezLoad(POPackage pack) {
         super(pack);
@@ -63,7 +64,7 @@ public class POShuffleTezLoad extends PO
             throws ExecException {
         try {
             comparator = ReflectionUtils.newInstance(
-                    TezDagBuilder.comparatorForKeyType(pkgr.getKeyType()), conf);
+                    TezDagBuilder.comparatorForKeyType(pkgr.getKeyType(), isSkewedJoin), conf);
         } catch (JobCreationException e) {
             throw new ExecException(e);
         }
@@ -195,6 +196,33 @@ public class POShuffleTezLoad extends PO
         inputKeys.add(inputKey);
     }
 
+    /**
+     * Marks whether this load belongs to a skewed-join vertex. The flag is
+     * passed to {@code TezDagBuilder.comparatorForKeyType} when the key
+     * comparator is created.
+     *
+     * @param isSkewedJoin true for a skewed-join vertex
+     */
+    public void setSkewedJoin(boolean isSkewedJoin) {
+        this.isSkewedJoin = isSkewedJoin;
+    }
+
+    /**
+     * Same as {@link #setSkewedJoin(boolean)}; kept for existing callers.
+     *
+     * @param isSkewedJoin true for a skewed-join vertex
+     */
+    public void setSkewedJoins(boolean isSkewedJoin) {
+        setSkewedJoin(isSkewedJoin);
+    }
+
+    /**
+     * @return true if this load belongs to a skewed-join vertex
+     */
+    public boolean isSkewedJoin() {
+        return isSkewedJoin;
+    }
+
     @Override
     public boolean supportsMultipleInputs() {
         return true;

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/PigProcessor.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/PigProcessor.java?rev=1558674&r1=1558673&r2=1558674&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/PigProcessor.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/PigProcessor.java Thu Jan 16 02:25:35 2014
@@ -63,8 +63,8 @@ public class PigProcessor implements Log
     private PhysicalOperator leaf;
 
     private Configuration conf;
-    
-    public static Map<String, Object> quantileMap = null;
+
+    public static Map<String, Object> sampleMap = null;
 
     @Override
     public void initialize(TezProcessorContext processorContext)
@@ -98,15 +98,16 @@ public class PigProcessor implements Log
 
     }
 
+    @SuppressWarnings("rawtypes")
     @Override
     public void run(Map<String, LogicalInput> inputs,
             Map<String, LogicalOutput> outputs) throws Exception {
         initializeInputs(inputs);
 
         initializeOutputs(outputs);
-        
+
         if (conf.get("pig.sampleVertex") != null) {
-            collectQuantile((BroadcastKVReader)inputs.get(conf.get("pig.sampleVertex")).getReader());
+            collectSample((BroadcastKVReader)inputs.get(conf.get("pig.sampleVertex")).getReader());
         }
 
         List<PhysicalOperator> leaves = null;
@@ -198,16 +199,24 @@ public class PigProcessor implements Log
             }
         }
     }
-    
-    private void collectQuantile(BroadcastKVReader reader) throws IOException {
+
+    /**
+     * Reads the first record of the broadcast sample input and stores its
+     * first field, the sample map, in {@link #sampleMap}.
+     *
+     * @param reader reader over the sample vertex's broadcast output
+     * @throws IOException if the sample input is empty or cannot be read
+     */
+    @SuppressWarnings({ "unchecked", "rawtypes" })
+    private void collectSample(BroadcastKVReader reader) throws IOException {
-        reader.next();
+        if (!reader.next()) {
+            throw new IOException("No sample map received from the sample vertex");
+        }
         Object val = reader.getCurrentValue();
         NullableTuple nTup = (NullableTuple) val;
         Tuple t = (Tuple) nTup.getValueAsPigType();
-        // the Quantiles file has a tuple as under:
-        // (numQuantiles, bag of samples)
-        // numQuantiles here is the reduce parallelism
-        quantileMap = (Map<String, Object>) t.get(0);
+        // The first field of the sample tuple holds the sample map
+        sampleMap = (Map<String, Object>) t.get(0);
     }
 
 }

Added: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/SkewedPartitionerTez.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/SkewedPartitionerTez.java?rev=1558674&view=auto
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/SkewedPartitionerTez.java (added)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/SkewedPartitionerTez.java Thu Jan 16 02:25:35 2014
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.tez;
+
+import java.util.Map;
+
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.partitioners.SkewedPartitioner;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.builtin.PartitionSkewedKeys;
+import org.apache.pig.impl.util.Pair;
+
+/**
+ * Skewed-join partitioner for Tez. It builds its key-to-reducer map from
+ * {@link PigProcessor#sampleMap}, which {@link PigProcessor} fills from the
+ * broadcast output of the sampling vertex.
+ */
+public class SkewedPartitionerTez extends SkewedPartitioner {
+    private static final TupleFactory tf = TupleFactory.getInstance();
+
+    /**
+     * Populates the inherited {@code reducerMap} from
+     * {@link PigProcessor#sampleMap} and marks the partitioner initialized.
+     * <p>
+     * Each tuple in the {@link PartitionSkewedKeys#PARTITION_LIST} bag is laid
+     * out as {@code (key fields..., minIndex, maxIndex)}, where both indexes
+     * are reducer indexes. A maxIndex below minIndex means the range wraps
+     * around, so it is shifted by {@link PartitionSkewedKeys#TOTAL_REDUCERS}.
+     *
+     * @throws RuntimeException if no key distribution has been collected or
+     *         it cannot be read
+     */
+    @Override
+    protected void init() {
+        Map<String, Object> distMap = PigProcessor.sampleMap;
+        if (distMap == null) {
+            throw new RuntimeException(this.getClass().getSimpleName() +
+                    " used but no key distribution found");
+        }
+
+        try {
+            // Parse via the string form, accepting e.g. Integer, Long or String values
+            int totalReducers = Integer.parseInt(
+                    String.valueOf(distMap.get(PartitionSkewedKeys.TOTAL_REDUCERS)));
+            DataBag partitionList = (DataBag) distMap.get(PartitionSkewedKeys.PARTITION_LIST);
+            for (Tuple idxTuple : partitionList) {
+                int size = idxTuple.size();
+                int maxIndex = (Integer) idxTuple.get(size - 1);
+                int minIndex = (Integer) idxTuple.get(size - 2);
+                // maxIndex < minIndex: the reducer range wraps past the last reducer
+                if (maxIndex < minIndex) {
+                    maxIndex += totalReducers;
+                }
+                // Store (first reducer, maxIndex - minIndex); no 1 is added here.
+                // NOTE(review): presumably getPartition treats maxIndex as part
+                // of the range (inclusive) -- confirm in SkewedPartitioner.
+                reducerMap.put(keyOf(idxTuple),
+                        new Pair<Integer, Integer>(minIndex, maxIndex - minIndex));
+            }
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+        inited = true;
+    }
+
+    /**
+     * Returns the join key of a partition-list tuple, i.e. its fields without
+     * the trailing minIndex and maxIndex.
+     */
+    private static Tuple keyOf(Tuple idxTuple) throws ExecException {
+        if (idxTuple.size() > 3) {
+            // Join on more than one key: copy all fields but the last two
+            Tuple keyTuple = tf.newTuple();
+            for (int i = 0; i < idxTuple.size() - 2; i++) {
+                keyTuple.append(idxTuple.get(i));
+            }
+            return keyTuple;
+        }
+        // Single join key
+        Tuple keyTuple = tf.newTuple(1);
+        keyTuple.set(0, idxTuple.get(0));
+        return keyTuple;
+    }
+}

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java?rev=1558674&r1=1558673&r2=1558674&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java Thu Jan 16 02:25:35 2014
@@ -25,6 +25,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Properties;
 import java.util.Set;
 import java.util.Stack;
 
@@ -34,6 +35,7 @@ import org.apache.pig.FuncSpec;
 import org.apache.pig.IndexableLoadFunc;
 import org.apache.pig.LoadFunc;
 import org.apache.pig.OrderedLoadFunc;
+import org.apache.pig.PigConfiguration;
 import org.apache.pig.PigException;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRCompilerException;
@@ -63,6 +65,7 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeJoin;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PONative;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPoissonSample;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PORank;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POReservoirSample;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSkewedJoin;
@@ -78,6 +81,8 @@ import org.apache.pig.data.DataType;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.builtin.DefaultIndexableLoader;
 import org.apache.pig.impl.builtin.FindQuantiles;
+import org.apache.pig.impl.builtin.GetMemNumRows;
+import org.apache.pig.impl.builtin.PartitionSkewedKeys;
 import org.apache.pig.impl.io.FileLocalizer;
 import org.apache.pig.impl.io.FileSpec;
 import org.apache.pig.impl.plan.DepthFirstWalker;
@@ -88,6 +93,8 @@ import org.apache.pig.impl.plan.Operator
 import org.apache.pig.impl.plan.PlanException;
 import org.apache.pig.impl.plan.VisitorException;
 import org.apache.pig.impl.util.ObjectSerializer;
+import org.apache.pig.impl.util.CompilerUtils;
+import org.apache.pig.impl.util.MultiMap;
 import org.apache.pig.impl.util.Pair;
 import org.apache.pig.impl.util.Utils;
 import org.apache.pig.newplan.logical.relational.LOJoin;
@@ -107,6 +114,7 @@ public class TezCompiler extends PhyPlan
     private static final Log LOG = LogFactory.getLog(TezCompiler.class);
 
     private PigContext pigContext;
+    private Properties pigProperties;
 
     //The plan that is being compiled
     private PhysicalPlan plan;
@@ -147,6 +155,7 @@ public class TezCompiler extends PhyPlan
         super(plan, new DepthFirstWalker<PhysicalOperator, PhysicalPlan>(plan));
         this.plan = plan;
         this.pigContext = pigContext;
+        pigProperties = pigContext.getProperties();
         splitsSeen = Maps.newHashMap();
         tezPlan = new TezOperPlan();
         nig = NodeIdGenerator.getGenerator();
@@ -161,9 +170,9 @@ public class TezCompiler extends PhyPlan
         localRearrangeFactory = new POLocalRearrangeTezFactory(scope, nig);
         phyToTezOpMap = Maps.newHashMap();
 
-        fileConcatenationThreshold = Integer.parseInt(pigContext.getProperties()
+        fileConcatenationThreshold = Integer.parseInt(pigProperties
                 .getProperty(FILE_CONCATENATION_THRESHOLD, "100"));
-        optimisticFileConcatenation = pigContext.getProperties().getProperty(
+        optimisticFileConcatenation = pigProperties.getProperty(
                 OPTIMISTIC_FILE_CONCATENATION, "false").equals("true");
         LOG.info("File concatenation threshold: " + fileConcatenationThreshold
                 + " optimistic? " + optimisticFileConcatenation);
@@ -1170,9 +1179,249 @@ public class TezCompiler extends PhyPlan
 
     @Override
     public void visitSkewedJoin(POSkewedJoin op) throws VisitorException {
-        int errCode = 2034;
-        String msg = "Cannot compile " + op.getClass().getSimpleName();
-        throw new TezCompilerException(msg, errCode, PigException.BUG);
+        try {
+            // LR that sends the first join input to the partition vertex (joinJobs[0])
+            POLocalRearrangeTez lrTez = localRearrangeFactory.create(LocalRearrangeType.NULL);
+            // LR that sends sampled input to the sampling aggregation vertex
+            POLocalRearrangeTez lrTezSample = localRearrangeFactory.create(LocalRearrangeType.NULL);
+            // POPoissonSample settings; pig properties override the defaults
+            int sampleRate = POPoissonSample.DEFAULT_SAMPLE_RATE;
+            if (pigProperties.containsKey(PigConfiguration.SAMPLE_RATE)) {
+                sampleRate = Integer.valueOf(pigProperties.getProperty(PigConfiguration.SAMPLE_RATE));
+            }
+            float heapPerc =  PartitionSkewedKeys.DEFAULT_PERCENT_MEMUSAGE;
+            if (pigProperties.containsKey(PigConfiguration.PERC_MEM_AVAIL)) {
+                heapPerc = Float.valueOf(pigProperties.getProperty(PigConfiguration.PERC_MEM_AVAIL));
+            }
+            POPoissonSample poSample = new POPoissonSample(new OperatorKey(scope,nig.getNextNodeId(scope)),
+                    -1, sampleRate, heapPerc);
+            // First input's plan: lrTez, then poSample -> nfe1 -> lrTezSample (the last two below)
+            TezOperator prevOp = compiledInputs[0];
+            prevOp.plan.addAsLeaf(lrTez);
+            prevOp.plan.addAsLeaf(poSample);
+
+            MultiMap<PhysicalOperator, PhysicalPlan> joinPlans = op.getJoinPlans();
+            List<PhysicalOperator> l = plan.getPredecessors(op);
+            List<PhysicalPlan> groups = joinPlans.get(l.get(0));
+            List<Boolean> ascCol = new ArrayList<Boolean>();
+            for (int i=0; i< groups.size(); i++) {
+                ascCol.add(false);
+            }
+
+            // Set up transform plan to get keys and memory size of input
+            // tuples. It first adds all the plans to get key columns.
+            List<PhysicalPlan> transformPlans = new ArrayList<PhysicalPlan>();
+            transformPlans.addAll(groups);
+
+            // Then it adds a column for memory size
+            POProject prjStar = new POProject(new OperatorKey(scope,nig.getNextNodeId(scope)));
+            prjStar.setResultType(DataType.TUPLE);
+            prjStar.setStar(true);
+
+            List<PhysicalOperator> ufInps = new ArrayList<PhysicalOperator>();
+            ufInps.add(prjStar);
+
+            PhysicalPlan ep = new PhysicalPlan();
+            POUserFunc uf = new POUserFunc(new OperatorKey(scope,nig.getNextNodeId(scope)),
+                    -1, ufInps, new FuncSpec(GetMemNumRows.class.getName(), (String[])null));
+            uf.setResultType(DataType.TUPLE);
+            ep.add(uf);
+            ep.add(prjStar);
+            ep.connect(prjStar, uf);
+
+            transformPlans.add(ep);
+
+            List<Boolean> flat1 = new ArrayList<Boolean>();
+            List<PhysicalPlan> eps1 = new ArrayList<PhysicalPlan>();
+
+            for (int i=0; i<transformPlans.size(); i++) {
+                eps1.add(transformPlans.get(i));
+                flat1.add(true);
+            }
+
+            // This foreach picks the join key columns and the GetMemNumRows tuple from each sample
+            POForEach nfe1 = new POForEach(new OperatorKey(scope,nig.getNextNodeId(scope)),
+                    -1, eps1, flat1);
+            prevOp.plan.addAsLeaf(nfe1);
+            prevOp.plan.addAsLeaf(lrTezSample);
+            prevOp.setClosed(true);
+
+            POSort sort = new POSort(op.getOperatorKey(), op.getRequestedParallelism(),
+                    null, groups, ascCol, null);
+            String per = pigProperties.getProperty("pig.skewedjoin.reduce.memusage",
+                    String.valueOf(PartitionSkewedKeys.DEFAULT_PERCENT_MEMUSAGE));
+            String mc = pigProperties.getProperty("pig.skewedjoin.reduce.maxtuple", "0");
+
+            int rp = Math.max(op.getRequestedParallelism(), 1);
+            Pair<TezOperator, Integer> sampleJobPair = getSamplingAggregationJobs(sort, rp, null,
+                    PartitionSkewedKeys.class.getName(), new String[]{per, mc}, 2);
+            rp = sampleJobPair.second;
+
+            // Set the SkewedJoin parallelism to the value calculated by the
+            // sampling job. If "parallel" is specified in the join statement,
+            // rp equals that number; otherwise it is the value the sampling
+            // process calculated from the default.
+            op.setRequestedParallelism(rp);
+            // joinJobs: [0] partition vertex, [1] second input, [2] join vertex
+            TezOperator[] joinJobs = new TezOperator[] {null, compiledInputs[1], null};
+            TezOperator[] joinInputs = new TezOperator[] {compiledInputs[0], compiledInputs[1]};
+            TezOperator[] rearrangeOutputs = new TezOperator[2];
+
+            compiledInputs = new TezOperator[] {joinInputs[0]};
+
+            blocking();
+
+            // Then add a POPackage and a POForEach to the start of the new tezOp.
+            POPackage pkg = getPackage(1, DataType.BYTEARRAY);
+            curTezOp.plan.add(pkg);
+
+            POProject project = new POProject(new OperatorKey(scope, nig.getNextNodeId(scope)));
+            project.setResultType(DataType.BAG);
+            project.setStar(false);
+            project.setColumn(1);
+            POForEach forEach =
+                    TezCompilerUtil.getForEach(project, sort.getRequestedParallelism(), scope, nig);
+            curTezOp.plan.addAsLeaf(forEach);
+            joinJobs[0] = curTezOp;
+
+            // Create the POLocalRearrange for the first join table
+            POLocalRearrangeTez lr =
+                    new POLocalRearrangeTez(new OperatorKey(scope,nig.getNextNodeId(scope)), rp);
+            try {
+                lr.setIndex(0);
+            } catch (ExecException e) {
+                int errCode = 2058;
+                String msg = "Unable to set index on newly created POLocalRearrange.";
+                throw new PlanException(msg, errCode, PigException.BUG, e);
+            }
+
+            // Check the type of group keys, if there are more than one field, the key is TUPLE.
+            byte type = DataType.TUPLE;
+            if (groups.size() == 1) {
+                type = groups.get(0).getLeaves().get(0).getResultType();
+            }
+
+            // Configure it and append it to the partition vertex (joinJobs[0])
+            lr.setKeyType(type);
+            lr.setPlans(groups);
+            lr.setSkewedJoin(true);
+            lr.setResultType(DataType.TUPLE);
+            joinJobs[0].plan.addAsLeaf(lr);
+            joinJobs[0].setClosed(true);
+            if (lr.getRequestedParallelism() > joinJobs[0].requestedParallelism) {
+                joinJobs[0].requestedParallelism = lr.getRequestedParallelism();
+            }
+            rearrangeOutputs[0] = joinJobs[0];
+
+            compiledInputs = new TezOperator[] {joinInputs[1]};
+
+            // Create the POPartitionRearrange for the second join table
+            POPartitionRearrangeTez pr =
+                    new POPartitionRearrangeTez(new OperatorKey(scope, nig.getNextNodeId(scope)), rp);
+            try {
+                pr.setIndex(1);
+            } catch (ExecException e) {
+                int errCode = 2058;
+                String msg = "Unable to set index on newly created POPartitionRearrange.";
+                throw new PlanException(msg, errCode, PigException.BUG, e);
+            }
+
+            groups = joinPlans.get(l.get(1));
+            pr.setPlans(groups);
+            pr.setKeyType(type);
+            pr.setSkewedJoin(true);
+            pr.setResultType(DataType.BAG);
+            joinJobs[1].plan.addAsLeaf(pr);
+            joinJobs[1].setClosed(true);
+            if (pr.getRequestedParallelism() > joinJobs[1].requestedParallelism) {
+                joinJobs[1].requestedParallelism = pr.getRequestedParallelism();
+            }
+            rearrangeOutputs[1] = joinJobs[1];
+
+            compiledInputs = rearrangeOutputs;
+
+            // Create POGlobalRearrange
+            POGlobalRearrange gr =
+                    new POGlobalRearrange(new OperatorKey(scope, nig.getNextNodeId(scope)), rp);
+            // Skewed join uses its own partitioner, set on the join vertex's in-edges below
+            gr.setResultType(DataType.TUPLE);
+            gr.visit(this);
+            joinJobs[2] = curTezOp;
+            if (gr.getRequestedParallelism() > joinJobs[2].requestedParallelism) {
+                joinJobs[2].requestedParallelism = gr.getRequestedParallelism();
+            }
+
+            compiledInputs = new TezOperator[] {joinJobs[2]};
+
+            // Create POPackage
+            pkg = getPackage(2, DataType.TUPLE);
+            boolean [] inner = op.getInnerFlags();
+            pkg.getPkgr().setInner(inner);
+            pkg.visit(this);
+
+            // Create POForEach
+            List<PhysicalPlan> eps = new ArrayList<PhysicalPlan>();
+            List<Boolean> flat = new ArrayList<Boolean>();
+
+            // Add corresponding POProjects
+            for (int i=0; i < 2; i++) {
+                ep = new PhysicalPlan();
+                POProject prj = new POProject(new OperatorKey(scope, nig.getNextNodeId(scope)));
+                prj.setColumn(i+1);
+                prj.setOverloaded(false);
+                prj.setResultType(DataType.BAG);
+                ep.add(prj);
+                eps.add(ep);
+                if (!inner[i]) {
+                    // Add an empty bag for outer join
+                    CompilerUtils.addEmptyBagOuterJoin(ep, op.getSchema(i));
+                }
+                flat.add(true);
+            }
+
+            POForEach fe =
+                    new POForEach(new OperatorKey(scope, nig.getNextNodeId(scope)), -1, eps, flat);
+            fe.setResultType(DataType.TUPLE);
+            fe.visit(this);
+
+            // Connect vertices
+            lrTez.setOutputKey(joinJobs[0].getOperatorKey().toString());
+            lrTezSample.setOutputKey(sampleJobPair.first.getOperatorKey().toString());
+
+            TezEdgeDescriptor edge = joinJobs[0].inEdges.get(prevOp.getOperatorKey());
+            // TODO: Convert to unsorted shuffle after TEZ-661
+            // edge.outputClassName = OnFileUnorderedKVOutput.class.getName();
+            // edge.inputClassName = ShuffledUnorderedKVInput.class.getName();
+            edge.partitionerClass = RoundRobinPartitioner.class;
+
+            TezCompilerUtil.connect(tezPlan, prevOp, sampleJobPair.first);
+            // The sampling vertex ends in a POSplit with one LR per join input
+            POSplit split = (POSplit) sampleJobPair.first.plan.getLeaves().get(0);
+            List<PhysicalPlan> pp = split.getPlans();
+            for (int i = 0; i < pp.size(); i++) {
+                TezCompilerUtil.connect(tezPlan, sampleJobPair.first, joinJobs[i]);
+
+                // Configure broadcast edges for distribution map
+                edge = joinJobs[i].inEdges.get(sampleJobPair.first.getOperatorKey());
+                edge.dataMovementType = DataMovementType.BROADCAST;
+                edge.outputClassName = OnFileUnorderedKVOutput.class.getName();
+                edge.inputClassName = ShuffledUnorderedKVInput.class.getName();
+                lrTez = (POLocalRearrangeTez) pp.get(i).getLeaves().get(0);
+                lrTez.setOutputKey(joinJobs[i].getOperatorKey().toString());
+                joinJobs[i].sampleOperator = sampleJobPair.first;
+
+                // Configure skewed partitioner for join
+                edge = joinJobs[2].inEdges.get(joinJobs[i].getOperatorKey());
+                edge.partitionerClass = SkewedPartitionerTez.class;
+            }
+            joinJobs[2].setSkewedJoin(true);
+
+            phyToTezOpMap.put(op, curTezOp);
+        } catch (Exception e) {
+            int errCode = 2034;
+            String msg = "Error compiling operator " + op.getClass().getSimpleName();
+            throw new TezCompilerException(msg, errCode, PigException.BUG, e);
+        }
     }
 
     /**
@@ -1190,7 +1439,8 @@ public class TezCompiler extends PhyPlan
      * @return Tez operator that now is finished with a store.
      * @throws PlanException
      */
-    private TezOperator endSingleInputWithStoreAndSample(POSort sort, POLocalRearrangeTez lr, POLocalRearrangeTez lrSample) throws PlanException{
+    private TezOperator endSingleInputWithStoreAndSample(POSort sort, POLocalRearrangeTez lr,
+            POLocalRearrangeTez lrSample) throws PlanException {
         if(compiledInputs.length>1) {
             int errCode = 2023;
             String msg = "Received a multi input plan when expecting only a single input one.";
@@ -1204,7 +1454,6 @@ public class TezCompiler extends PhyPlan
             List<Boolean> flat1 = new ArrayList<Boolean>();
             List<PhysicalPlan> eps1 = new ArrayList<PhysicalPlan>();
 
-
             Pair<POProject, Byte>[] sortProjs = null;
             try{
                 sortProjs = getSortCols(sort.getSortPlans());
@@ -1259,7 +1508,7 @@ public class TezCompiler extends PhyPlan
             oper.plan.addAsLeaf(lrSample);
         } else {
             int errCode = 2022;
-            String msg = "Both map and reduce phases have been done. This is unexpected while compiling.";
+            String msg = "The current operator is closed. This is unexpected while compiling.";
             throw new PlanException(msg, errCode, PigException.BUG);
         }
         return oper;
@@ -1292,13 +1541,12 @@ public class TezCompiler extends PhyPlan
             }
         }
 
-        return getSamplingAggregationJobs(sort, rp, null, FindQuantiles.class.getName(), ctorArgs);
+        return getSamplingAggregationJobs(sort, rp, null, FindQuantiles.class.getName(), ctorArgs, 1);
     }
 
     /**
      * Create a sampling job to collect statistics by sampling input data. The
      * sequence of operations is as following:
-     * <li>Transform input sample tuples into another tuple.</li>
      * <li>Add an extra field &quot;all&quot; into the tuple </li>
      * <li>Package all tuples into one bag </li>
      * <li>Add constant field for number of reducers. </li>
@@ -1317,7 +1565,7 @@ public class TezCompiler extends PhyPlan
      * @throws ExecException
      */
     private Pair<TezOperator,Integer> getSamplingAggregationJobs(POSort sort, int rp,
-            List<PhysicalPlan> sortKeyPlans, String udfClassName, String[] udfArgs )
+            List<PhysicalPlan> sortKeyPlans, String udfClassName, String[] udfArgs, int numLRs)
                     throws PlanException, VisitorException, ExecException {
 
         TezOperator oper = getTezOp();
@@ -1448,12 +1696,28 @@ public class TezCompiler extends PhyPlan
         oper.plan.add(nfe3);
         oper.plan.connect(nfe2, nfe3);
 
-        POLocalRearrangeTez lr = localRearrangeFactory.create(LocalRearrangeType.NULL);
-
-        oper.plan.add(lr);
-        oper.plan.connect(nfe3, lr);
-
+        if (numLRs > 1) {
+            // Skewed join broadcast sample map to multiple vertices, so we need
+            // to add POSplit to the plan and attach LRs to POSplit.
+            POSplit split = new POSplit(new OperatorKey(scope,nig.getNextNodeId(scope)));
+            oper.setSplitOperatorKey(split.getOperatorKey());
+            oper.plan.add(split);
+            oper.plan.connect(nfe3, split);
+            splitsSeen.put(split.getOperatorKey(), oper);
+
+            for (int i = 0; i < numLRs; i++) {
+                POLocalRearrangeTez lr = localRearrangeFactory.create(LocalRearrangeType.NULL);
+                PhysicalPlan pp = new PhysicalPlan();
+                pp.add(lr);
+                split.addPlan(pp);
+            }
+        } else {
+            POLocalRearrangeTez lr = localRearrangeFactory.create(LocalRearrangeType.NULL);
+            oper.plan.add(lr);
+            oper.plan.connect(nfe3, lr);
+        }
         oper.setClosed(true);
+
         oper.requestedParallelism = 1;
         oper.markSampler();
         return new Pair<TezOperator, Integer>(oper, rp);
@@ -1672,8 +1936,7 @@ public class TezCompiler extends PhyPlan
 
             int rp = Math.max(op.getRequestedParallelism(), 1);
 
-            Pair<TezOperator, Integer> quantJobParallelismPair =
-                getQuantileJobs(op, rp);
+            Pair<TezOperator, Integer> quantJobParallelismPair = getQuantileJobs(op, rp);
             TezOperator[] sortOpers = getSortJobs(op, quantJobParallelismPair.second, keyType, fields);
 
             lr.setOutputKey(sortOpers[0].getOperatorKey().toString());

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java?rev=1558674&r1=1558673&r2=1558674&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java Thu Jan 16 02:25:35 2014
@@ -81,6 +81,7 @@ import org.apache.pig.data.DataType;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.io.FileLocalizer;
 import org.apache.pig.impl.io.FileSpec;
+import org.apache.pig.impl.io.NullablePartitionWritable;
 import org.apache.pig.impl.io.NullableTuple;
 import org.apache.pig.impl.plan.DependencyOrderWalker;
 import org.apache.pig.impl.plan.NodeIdGenerator;
@@ -192,8 +193,8 @@ public class TezDagBuilder extends TezOp
         for (POLocalRearrangeTez lr : lrs) {
             if (lr.getOutputKey().equals(to.getOperatorKey().toString())) {
                 byte keyType = lr.getKeyType();
-                setIntermediateInputKeyValue(keyType, conf);
-                setIntermediateOutputKeyValue(keyType, conf);
+                setIntermediateInputKeyValue(keyType, conf, to.isSkewedJoin());
+                setIntermediateOutputKeyValue(keyType, conf, to.isSkewedJoin());
                 conf.set("pig.reduce.key.type", Byte.toString(keyType));
                 break;
             }
@@ -201,7 +202,7 @@ public class TezDagBuilder extends TezOp
 
         conf.setBoolean("mapred.mapper.new-api", true);
         conf.set("pig.pigContext", ObjectSerializer.serialize(pc));
-        
+
         if (edge.partitionerClass != null) {
             conf.setClass("mapreduce.job.partitioner.class",
                     edge.partitionerClass, Partitioner.class);
@@ -234,11 +235,11 @@ public class TezDagBuilder extends TezOp
     private void addCombiner(PhysicalPlan combinePlan, TezOperator pkgTezOp,
             Configuration conf) throws IOException {
         POPackage combPack = (POPackage) combinePlan.getRoots().get(0);
-        setIntermediateInputKeyValue(combPack.getPkgr().getKeyType(), conf);
+        setIntermediateInputKeyValue(combPack.getPkgr().getKeyType(), conf, pkgTezOp.isSkewedJoin());
 
         POLocalRearrange combRearrange = (POLocalRearrange) combinePlan
                 .getLeaves().get(0);
-        setIntermediateOutputKeyValue(combRearrange.getKeyType(), conf);
+        setIntermediateOutputKeyValue(combRearrange.getKeyType(), conf, pkgTezOp.isSkewedJoin());
 
         LoRearrangeDiscoverer lrDiscoverer = new LoRearrangeDiscoverer(
                 combinePlan, pkgTezOp, combPack);
@@ -324,13 +325,16 @@ public class TezDagBuilder extends TezOp
             tezOp.plan.remove(pack);
             payloadConf.set("pig.reduce.package", ObjectSerializer.serialize(pack));
             payloadConf.set("pig.reduce.key.type", Byte.toString(keyType));
-            setIntermediateInputKeyValue(keyType, payloadConf);
+            setIntermediateInputKeyValue(keyType, payloadConf, tezOp.isSkewedJoin());
             POShuffleTezLoad newPack;
             if (tezOp.isUnion()) {
                 newPack = new POUnionTezLoad(pack);
             } else {
                 newPack = new POShuffleTezLoad(pack);
             }
+            if (tezOp.isSkewedJoin()) {
+                newPack.setSkewedJoins(true);
+            }
             tezOp.plan.add(newPack);
 
             // Set input keys for POShuffleTezLoad. This is used to identify
@@ -350,24 +354,15 @@ public class TezDagBuilder extends TezOp
                 }
             }
 
-            @SuppressWarnings("rawtypes")
-            Class<? extends WritableComparable> keyClass = HDataType
-                    .getWritableComparableTypes(pack.getPkgr().getKeyType())
-                    .getClass();
-            payloadConf.set(
-                    TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_KEY_CLASS,
-                    keyClass.getName());
-            payloadConf.set(
-                    TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_VALUE_CLASS,
-                    NullableTuple.class.getName());
-            selectInputComparator(payloadConf, pack.getPkgr().getKeyType());
+            setIntermediateInputKeyValue(pack.getPkgr().getKeyType(), payloadConf,
+                    tezOp.isSkewedJoin());
         }
 
         payloadConf.setClass("mapreduce.outputformat.class",
                 PigOutputFormat.class, OutputFormat.class);
 
-        // set parent plan in all operators.
-        // currently the parent plan is really used only when POStream, POSplit are present in the plan
+        // set parent plan in all operators. currently the parent plan is really
+        // used only when POStream, POSplit are present in the plan
         new PhyPlanSetter(tezOp.plan).visit();
 
         // Serialize the execution plan
@@ -606,35 +601,49 @@ public class TezDagBuilder extends TezOp
     }
 
     @SuppressWarnings("rawtypes")
-    private void setIntermediateInputKeyValue(byte keyType, Configuration conf)
+    private void setIntermediateInputKeyValue(byte keyType, Configuration conf, boolean isSkewedJoin)
             throws JobCreationException, ExecException {
         Class<? extends WritableComparable> keyClass = HDataType
                 .getWritableComparableTypes(keyType).getClass();
-        conf.set(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_KEY_CLASS,
-                keyClass.getName());
+        if (isSkewedJoin) {
+            conf.set(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_KEY_CLASS,
+                    NullablePartitionWritable.class.getName());
+        } else {
+            conf.set(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_KEY_CLASS,
+                    keyClass.getName());
+        }
         conf.set(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_VALUE_CLASS,
                 NullableTuple.class.getName());
-        selectInputComparator(conf, keyType);
+        selectInputComparator(conf, keyType, isSkewedJoin);
     }
 
     @SuppressWarnings("rawtypes")
-    private void setIntermediateOutputKeyValue(byte keyType, Configuration conf)
+    private void setIntermediateOutputKeyValue(byte keyType, Configuration conf, boolean isSkewedJoin)
             throws JobCreationException, ExecException {
         Class<? extends WritableComparable> keyClass = HDataType
                 .getWritableComparableTypes(keyType).getClass();
-        conf.set(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_KEY_CLASS,
-                keyClass.getName());
+        if (isSkewedJoin) {
+            conf.set(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_KEY_CLASS,
+                    NullablePartitionWritable.class.getName());
+        } else {
+            conf.set(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_KEY_CLASS,
+                    keyClass.getName());
+        }
         conf.set(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_VALUE_CLASS,
                 NullableTuple.class.getName());
         conf.set(TezJobConfig.TEZ_RUNTIME_PARTITIONER_CLASS,
                 MRPartitioner.class.getName());
-        selectOutputComparator(keyType, conf);
+        selectOutputComparator(keyType, conf, isSkewedJoin);
     }
 
-    static Class<? extends WritableComparator> comparatorForKeyType(byte keyType)
+    static Class<? extends WritableComparator> comparatorForKeyType(byte keyType, boolean isSkewedJoin)
             throws JobCreationException {
         // TODO: Handle sorting like in JobControlCompiler
 
+        if (isSkewedJoin) {
+            return JobControlCompiler.PigGroupingPartitionWritableComparator.class;
+        }
+
         switch (keyType) {
         case DataType.BOOLEAN:
             return PigBooleanRawComparator.class;
@@ -686,19 +695,19 @@ public class TezDagBuilder extends TezOp
         }
     }
 
-    void selectInputComparator(Configuration conf, byte keyType)
+    void selectInputComparator(Configuration conf, byte keyType, boolean isSkewedJoin)
             throws JobCreationException {
         // TODO: Handle sorting like in JobControlCompiler
         conf.setClass(
                 TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_KEY_COMPARATOR_CLASS,
-                comparatorForKeyType(keyType), RawComparator.class);
+                comparatorForKeyType(keyType, isSkewedJoin), RawComparator.class);
     }
 
-    void selectOutputComparator(byte keyType, Configuration conf)
+    void selectOutputComparator(byte keyType, Configuration conf, boolean isSkewedJoin)
             throws JobCreationException {
         // TODO: Handle sorting like in JobControlCompiler
         conf.setClass(
                 TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_KEY_COMPARATOR_CLASS,
-                comparatorForKeyType(keyType), RawComparator.class);
+                comparatorForKeyType(keyType, isSkewedJoin), RawComparator.class);
     }
 }

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperator.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperator.java?rev=1558674&r1=1558673&r2=1558674&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperator.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperator.java Thu Jan 16 02:25:35 2014
@@ -90,6 +90,8 @@ public class TezOperator extends Operato
     // to add additional map reduce operator with 1 reducer after this
     long limit = -1;
 
+    private boolean skewedJoin = false;
+
     // Flag to indicate if the small input splits need to be combined to form a larger
     // one in order to reduce the number of mappers. For merge join, both tables
     // are NOT combinable for correctness.
@@ -287,10 +289,22 @@ public class TezOperator extends Operato
         return globalSort;
     }
 
+    public void setLimitAfterSort(boolean limitAfterSort) {
+        this.limitAfterSort = limitAfterSort;
+    }
+
     public boolean isLimitAfterSort() {
         return limitAfterSort;
     }
 
+    public void setSkewedJoin(boolean skewedJoin) {
+        this.skewedJoin = skewedJoin;
+    }
+
+    public boolean isSkewedJoin() {
+        return skewedJoin;
+    }
+
     protected void noCombineSmallSplits() {
         combineSmallSplits = false;
     }

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/WeightedRangePartitionerTez.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/WeightedRangePartitionerTez.java?rev=1558674&r1=1558673&r2=1558674&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/WeightedRangePartitionerTez.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/WeightedRangePartitionerTez.java Thu Jan 16 02:25:35 2014
@@ -36,9 +36,9 @@ public class WeightedRangePartitionerTez
     @Override
     public void init() {
         Map<String, Object> quantileMap = null;
-        if (PigProcessor.quantileMap!=null){
-            // We've already collect quantileMap in PigProcessor.quantileMap
-            quantileMap = PigProcessor.quantileMap;
+        if (PigProcessor.sampleMap != null) {
+            // We've collected sampleMap in PigProcessor
+            quantileMap = PigProcessor.sampleMap;
         } else {
             throw new RuntimeException(this.getClass().getSimpleName()
                     + " used but no quantiles found");