Posted to commits@pig.apache.org by ch...@apache.org on 2014/01/16 03:25:35 UTC

svn commit: r1558674 [2/2] - in /pig/branches/tez: src/org/apache/pig/ src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/ src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/ src/org/apache/pi...

Modified: pig/branches/tez/src/org/apache/pig/impl/builtin/PartitionSkewedKeys.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/impl/builtin/PartitionSkewedKeys.java?rev=1558674&r1=1558673&r2=1558674&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/impl/builtin/PartitionSkewedKeys.java (original)
+++ pig/branches/tez/src/org/apache/pig/impl/builtin/PartitionSkewedKeys.java Thu Jan 16 02:25:35 2014
@@ -40,278 +40,268 @@ import org.apache.pig.impl.util.Pair;
  * sampling process. It figures out how many reducers are required to process a
  * skewed key without causing a spill and allocates this number of reducers to
  * that key. This UDF outputs a map which contains 2 keys:
- * 
+ *
  * <li>&quot;totalreducers&quot;: the value is an integer which indicates the
  *         total number of reducers for this join job </li>
  * <li>&quot;partition.list&quot;: the value is a bag which contains a
  * list of tuples with each tuple representing partitions for a skewed key.
- * The tuple has format of &lt;join key&gt;,&lt;min index of reducer&gt;, 
+ * The tuple has the format &lt;join key&gt;, &lt;min index of reducer&gt;,
  * &lt;max index of reducer&gt; </li>
- * 
- * For example, a join job configures 10 reducers, and the sampling process 
+ *
+ * For example, a join job configures 10 reducers, and the sampling process
  * finds out 2 skewed keys, &quot;swpv&quot; needs 4 reducers and &quot;swps&quot;
  * needs 2 reducers. The output file would be like the following:
- * 
+ *
  * {totalreducers=10, partition.list={(swpv,0,3), (swps,4,5)}}
  *
- * The name of this file is set into next MR job which does the actual join. 
+ * The name of this file is passed to the next MR job, which does the actual join.
  * That job uses this information to partition skewed keys properly
- * 
+ *
  */
 
 public class PartitionSkewedKeys extends EvalFunc<Map<String, Object>> {
 
-	public static final String PARTITION_LIST = "partition.list";
+    public static final String PARTITION_LIST = "partition.list";
 
-	public static final String TOTAL_REDUCERS = "totalreducers";
+    public static final String TOTAL_REDUCERS = "totalreducers";
 
-	public static final float DEFAULT_PERCENT_MEMUSAGE = 0.3f;
+    public static final float DEFAULT_PERCENT_MEMUSAGE = 0.3f;
 
-	private Log log = LogFactory.getLog(getClass());
+    private Log log = LogFactory.getLog(getClass());
 
-	BagFactory mBagFactory = BagFactory.getInstance();
+    BagFactory mBagFactory = BagFactory.getInstance();
 
-	TupleFactory mTupleFactory = TupleFactory.getInstance();
+    TupleFactory mTupleFactory = TupleFactory.getInstance();
 
-	private int currentIndex_;
+    private int currentIndex_;
 
-	private int totalReducers_;
+    private int totalReducers_;
 
-	private long totalMemory_;
+    private long totalMemory_;
 
-	private String inputFile_;
+    private long totalSampleCount_;
 
-	private long totalSampleCount_;
+    private double heapPercentage_;
 
-	private double heapPercentage_;
-	
     // specify how many tuples a reducer can hold for a key
     // this is for testing purposes. If not specified, then
     // it is calculated based on memory size and the size of a tuple
-	private int tupleMCount_; 
+    private int tupleMCount_;
 
-	public PartitionSkewedKeys() {
-		this(null);
-	}
-
-	public PartitionSkewedKeys(String[] args) {
-		totalReducers_ = -1;
-		currentIndex_ = 0;
-
-		if (args != null && args.length > 0) {
-			heapPercentage_ = Double.parseDouble(args[0]);
-			tupleMCount_ = Integer.parseInt(args[1]);
-			inputFile_ = args[2];			
-		} else {
-			heapPercentage_ = DEFAULT_PERCENT_MEMUSAGE;
-		}
-		
-		if (log.isDebugEnabled()) {
-			log.debug("pig.skewedjoin.reduce.memusage=" + heapPercentage_);
-			log.debug("input file: " + inputFile_);
-		}
-		
-		log.info("input file: " + inputFile_);
-
-	}
-
-	/**
-	 * first field in the input tuple is the number of reducers
-	 * 
-	 * second field is the *sorted* bag of samples
-	 * this should be called only once
-	 */
-	public Map<String, Object> exec(Tuple in) throws IOException {
-	    if (in == null || in.size() == 0) {                     
-	        return null;
-	    }
-	    Map<String, Object> output = new HashMap<String, Object>();
-
-	    totalMemory_ = (long) (Runtime.getRuntime().maxMemory() * heapPercentage_);
-	    log.info("Maximum of available memory is " + totalMemory_);
-
-	    ArrayList<Tuple> reducerList = new ArrayList<Tuple>();
-
-	    Tuple currentTuple = null;
-	    long count = 0;
-	    
-	    // total size in memory for tuples in sample 
-	    long totalSampleMSize = 0;
-	    
-	    //total input rows for the join
-	    long totalInputRows = 0;
-	    
-	    try {
-	        totalReducers_ = (Integer) in.get(0);
-	        DataBag samples = (DataBag) in.get(1);
-
-	        totalSampleCount_ = samples.size();
-
-	        log.info("totalSample: " + totalSampleCount_);
-	        log.info("totalReducers: " + totalReducers_);			
-
-	        int maxReducers = 0;
-
-	        // first iterate the samples to find total number of rows
-	        Iterator<Tuple> iter1 = samples.iterator();
-	        while (iter1.hasNext()) {
-	            Tuple t = iter1.next();
-	            totalInputRows += (Long)t.get(t.size() - 1);
-	        }
-	                        
-	        // now iterate samples to do the reducer calculation
-	        Iterator<Tuple> iter2 = samples.iterator();
-	        while (iter2.hasNext()) {
-	            Tuple t = iter2.next();
-	            if (hasSameKey(currentTuple, t) || currentTuple == null) {
-	                count++;
-	                totalSampleMSize += getMemorySize(t);
-	            } else {
-	                Pair<Tuple, Integer> p = calculateReducers(currentTuple,
-	                        count, totalSampleMSize, totalInputRows);
-	                Tuple rt = p.first;
-	                if (rt != null) {
-	                    reducerList.add(rt);
-	                }
-	                if (maxReducers < p.second) {
-	                    maxReducers = p.second;
-	                }
-	                count = 1;
-	                totalSampleMSize = getMemorySize(t);
-	            }
-
-	            currentTuple = t;
-	        }
-
-	        // add last key
-	        if (count > 0) {
-	            Pair<Tuple, Integer> p = calculateReducers(currentTuple, count,
-	                    totalSampleMSize, totalInputRows);
-	            Tuple rt = p.first;
-	            if (rt != null) {
-	                reducerList.add(rt);
-	            }
-	            if (maxReducers < p.second) {
-	                maxReducers = p.second;
-	            }
-	        }
-
-	        if (maxReducers > totalReducers_) {
-	            if(pigLogger != null) {
-	                pigLogger.warn(this,"You need at least " + maxReducers
-	                        + " reducers to avoid spillage and run this job efficiently.", PigWarning.REDUCER_COUNT_LOW);
-	            } else {
-	                log.warn("You need at least " + maxReducers
-	                        + " reducers to avoid spillage and run this job efficiently.");
-	            }
-	        }
-
-	        output.put(PARTITION_LIST, mBagFactory.newDefaultBag(reducerList));
-	        output.put(TOTAL_REDUCERS, Integer.valueOf(totalReducers_));
-
-	        log.info(output.toString());
-	        if (log.isDebugEnabled()) {
-	            log.debug(output.toString());
-	        }
-
-	        return output;
-	    } catch (Exception e) {
-	        e.printStackTrace();
-	        throw new RuntimeException(e);
-	    }
-	}
-
-	private Pair<Tuple, Integer> calculateReducers(Tuple currentTuple,
-	        long count, long totalMSize, long totalTuples) {
-	    // get average memory size per tuple
-	    double avgM = totalMSize / (double) count;
-
-	    // get the number of tuples that can fit into memory
-	    long tupleMCount = (tupleMCount_ <= 0)?(long) (totalMemory_ / avgM): tupleMCount_;
-
-	    // estimate the number of total tuples for this key
-            long keyTupleCount = (long)  ( ((double) count/ totalSampleCount_) * 
-                        totalTuples);
-
-	    
-	    int redCount = (int) Math.round(Math.ceil((double) keyTupleCount
-	            / tupleMCount));
-
-	    if (log.isDebugEnabled()) 
-	    {
-	        log.debug("avgM: " + avgM);
-	        log.debug("tuple count: " + keyTupleCount);
-	        log.debug("count: " + count);
-	        log.debug("A reducer can take " + tupleMCount + " tuples and "
-	                + keyTupleCount + " tuples are find for " + currentTuple);
-	        log.debug("key " + currentTuple + " need " + redCount + " reducers");
-	    }
-
-	    // this is not a skewed key
-	    if (redCount <= 1) {
-	        return new Pair<Tuple, Integer>(null, 1);
-	    }
-
-	    Tuple t = this.mTupleFactory.newTuple(currentTuple.size());
-	    int i = 0;
-	    try {
-	        // set keys
-	        for (; i < currentTuple.size() - 2; i++) {
-	            t.set(i, currentTuple.get(i));
-	        }
-
-	        int effectiveRedCount = redCount > totalReducers_? totalReducers_:redCount;
-	        // set the min index of reducer for this key
-	        t.set(i++, currentIndex_);
-	        currentIndex_ = (currentIndex_ + effectiveRedCount) % totalReducers_ - 1;
-	        if (currentIndex_ < 0) {
-	            currentIndex_ += totalReducers_;
-	        }
-	        // set the max index of reducer for this key
-	        t.set(i++, currentIndex_);
-	    } catch (ExecException e) {
-	        throw new RuntimeException("Failed to set value to tuple." + e);
-	    }
-
-	    currentIndex_ = (currentIndex_ + 1) % totalReducers_;
-
-	    Pair<Tuple, Integer> p = new Pair<Tuple, Integer>(t, redCount);
-
-	    return p;
-	}
-
-	// the last field of the tuple is a tuple for memory size and disk size
-	private long getMemorySize(Tuple t) {
-	    int s = t.size();
-	    try {
-	        return (Long) t.get(s - 2);
-	    } catch (ExecException e) {
-	        throw new RuntimeException(
-	                "Unable to retrive the size field from tuple.", e);
-	    }
-	}
-
-
-	private boolean hasSameKey(Tuple t1, Tuple t2) {
-		// Have to break the tuple down and compare it field to field.
-		int sz1 = t1 == null ? 0 : t1.size();
-		int sz2 = t2 == null ? 0 : t2.size();
-		if (sz2 != sz1) {
-			return false;
-		}
-
-		for (int i = 0; i < sz1 - 2; i++) {
-			try {
-				int c = DataType.compare(t1.get(i), t2.get(i));
-				if (c != 0) {
-					return false;
-				}
-			} catch (ExecException e) {
-				throw new RuntimeException("Unable to compare tuples", e);
-			}
-		}
+    public PartitionSkewedKeys() {
+        this(null);
+    }
+
+    public PartitionSkewedKeys(String[] args) {
+        totalReducers_ = -1;
+        currentIndex_ = 0;
+
+        if (args != null && args.length > 0) {
+            heapPercentage_ = Double.parseDouble(args[0]);
+            tupleMCount_ = Integer.parseInt(args[1]);
+        } else {
+            heapPercentage_ = DEFAULT_PERCENT_MEMUSAGE;
+        }
+
+        if (log.isDebugEnabled()) {
+            log.debug("pig.skewedjoin.reduce.memusage=" + heapPercentage_);
+        }
+    }
+
+    /**
+     * The first field in the input tuple is the number of reducers.
+     *
+     * The second field is the *sorted* bag of samples.
+     * This method should be called only once.
+     */
+    public Map<String, Object> exec(Tuple in) throws IOException {
+        if (in == null || in.size() == 0) {
+            return null;
+        }
+        Map<String, Object> output = new HashMap<String, Object>();
+
+        totalMemory_ = (long) (Runtime.getRuntime().maxMemory() * heapPercentage_);
+        log.info("Maximum of available memory is " + totalMemory_);
+
+        ArrayList<Tuple> reducerList = new ArrayList<Tuple>();
+
+        Tuple currentTuple = null;
+        long count = 0;
+
+        // total size in memory for tuples in sample
+        long totalSampleMSize = 0;
+
+        //total input rows for the join
+        long totalInputRows = 0;
+
+        try {
+            totalReducers_ = (Integer) in.get(0);
+            DataBag samples = (DataBag) in.get(1);
+
+            totalSampleCount_ = samples.size();
+
+            log.info("totalSample: " + totalSampleCount_);
+            log.info("totalReducers: " + totalReducers_);
+
+            int maxReducers = 0;
+
+            // first iterate the samples to find total number of rows
+            Iterator<Tuple> iter1 = samples.iterator();
+            while (iter1.hasNext()) {
+                Tuple t = iter1.next();
+                totalInputRows += (Long)t.get(t.size() - 1);
+            }
+
+            // now iterate samples to do the reducer calculation
+            Iterator<Tuple> iter2 = samples.iterator();
+            while (iter2.hasNext()) {
+                Tuple t = iter2.next();
+                if (hasSameKey(currentTuple, t) || currentTuple == null) {
+                    count++;
+                    totalSampleMSize += getMemorySize(t);
+                } else {
+                    Pair<Tuple, Integer> p = calculateReducers(currentTuple,
+                            count, totalSampleMSize, totalInputRows);
+                    Tuple rt = p.first;
+                    if (rt != null) {
+                        reducerList.add(rt);
+                    }
+                    if (maxReducers < p.second) {
+                        maxReducers = p.second;
+                    }
+                    count = 1;
+                    totalSampleMSize = getMemorySize(t);
+                }
+
+                currentTuple = t;
+            }
+
+            // add last key
+            if (count > 0) {
+                Pair<Tuple, Integer> p = calculateReducers(currentTuple, count,
+                        totalSampleMSize, totalInputRows);
+                Tuple rt = p.first;
+                if (rt != null) {
+                    reducerList.add(rt);
+                }
+                if (maxReducers < p.second) {
+                    maxReducers = p.second;
+                }
+            }
+
+            if (maxReducers > totalReducers_) {
+                if(pigLogger != null) {
+                    pigLogger.warn(this,"You need at least " + maxReducers
+                            + " reducers to avoid spillage and run this job efficiently.", PigWarning.REDUCER_COUNT_LOW);
+                } else {
+                    log.warn("You need at least " + maxReducers
+                            + " reducers to avoid spillage and run this job efficiently.");
+                }
+            }
+
+            output.put(PARTITION_LIST, mBagFactory.newDefaultBag(reducerList));
+            output.put(TOTAL_REDUCERS, Integer.valueOf(totalReducers_));
+
+            log.info(output.toString());
+            if (log.isDebugEnabled()) {
+                log.debug(output.toString());
+            }
+
+            return output;
+        } catch (Exception e) {
+            e.printStackTrace();
+            throw new RuntimeException(e);
+        }
+    }
+
+    private Pair<Tuple, Integer> calculateReducers(Tuple currentTuple,
+            long count, long totalMSize, long totalTuples) {
+        // get average memory size per tuple
+        double avgM = totalMSize / (double) count;
+
+        // get the number of tuples that can fit into memory
+        long tupleMCount = (tupleMCount_ <= 0) ? (long) (totalMemory_ / avgM) : tupleMCount_;
+
+        // estimate the number of total tuples for this key
+        long keyTupleCount = (long) (((double) count / totalSampleCount_) * totalTuples);
+
+        int redCount = (int) Math.round(Math.ceil((double) keyTupleCount / tupleMCount));
+
+        if (log.isDebugEnabled()) {
+            log.debug("avgM: " + avgM);
+            log.debug("tuple count: " + keyTupleCount);
+            log.debug("count: " + count);
+            log.debug("A reducer can take " + tupleMCount + " tuples and "
+                    + keyTupleCount + " tuples are find for " + currentTuple);
+            log.debug("key " + currentTuple + " need " + redCount + " reducers");
+        }
+
+        // this is not a skewed key
+        if (redCount <= 1) {
+            return new Pair<Tuple, Integer>(null, 1);
+        }
+
+        Tuple t = this.mTupleFactory.newTuple(currentTuple.size());
+        int i = 0;
+        try {
+            // set keys
+            for (; i < currentTuple.size() - 2; i++) {
+                t.set(i, currentTuple.get(i));
+            }
+
+            int effectiveRedCount = redCount > totalReducers_ ? totalReducers_ : redCount;
+            // set the min index of reducer for this key
+            t.set(i++, currentIndex_);
+            currentIndex_ = (currentIndex_ + effectiveRedCount) % totalReducers_ - 1;
+            if (currentIndex_ < 0) {
+                currentIndex_ += totalReducers_;
+            }
+            // set the max index of reducer for this key
+            t.set(i++, currentIndex_);
+        } catch (ExecException e) {
+            throw new RuntimeException("Failed to set value to tuple." + e);
+        }
+
+        currentIndex_ = (currentIndex_ + 1) % totalReducers_;
+
+        Pair<Tuple, Integer> p = new Pair<Tuple, Integer>(t, redCount);
+
+        return p;
+    }
+
+    // the second to last field of the tuple holds the in-memory size of the sampled tuple
+    private long getMemorySize(Tuple t) {
+        int s = t.size();
+        try {
+            return (Long) t.get(s - 2);
+        } catch (ExecException e) {
+            throw new RuntimeException(
+                    "Unable to retrive the size field from tuple.", e);
+        }
+    }
+
+
+    private boolean hasSameKey(Tuple t1, Tuple t2) {
+        // Have to break the tuple down and compare it field to field.
+        int sz1 = t1 == null ? 0 : t1.size();
+        int sz2 = t2 == null ? 0 : t2.size();
+        if (sz2 != sz1) {
+            return false;
+        }
+
+        for (int i = 0; i < sz1 - 2; i++) {
+            try {
+                int c = DataType.compare(t1.get(i), t2.get(i));
+                if (c != 0) {
+                    return false;
+                }
+            } catch (ExecException e) {
+                throw new RuntimeException("Unable to compare tuples", e);
+            }
+        }
 
-		return true;
-	}
+        return true;
+    }
 
 }
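
For readers following the math in calculateReducers() above: the method scales a key's sample count up to the full input and divides by how many tuples one reducer can hold in memory. Below is a minimal, self-contained sketch of that arithmetic; the class name and all input figures are assumed for illustration only and are not taken from this commit.

    // Sketch of the reducer estimate in PartitionSkewedKeys.calculateReducers().
    // All concrete numbers here are hypothetical.
    public class SkewedKeyEstimateSketch {
        public static void main(String[] args) {
            long totalInputRows   = 1000000L; // rows in the join input (assumed)
            long totalSampleCount = 1700L;    // tuples in the sample bag (assumed)
            long samplesForKey    = 680L;     // sample tuples seen for one key (assumed)
            long tuplesPerReducer = 100000L;  // tuples one reducer can hold in memory

            // Scale the per-key sample count up to the full input.
            long keyTupleCount =
                    (long) (((double) samplesForKey / totalSampleCount) * totalInputRows);

            // Reducers needed so that each holds at most tuplesPerReducer tuples.
            int redCount = (int) Math.ceil((double) keyTupleCount / tuplesPerReducer);

            System.out.println("estimated tuples for key: " + keyTupleCount); // 400000
            System.out.println("reducers needed: " + redCount);               // 4
        }
    }

With the javadoc's example of 10 total reducers, a key estimated to need 4 reducers is assigned the index range (0,3) and the next skewed key, needing 2, gets (4,5); calculateReducers() produces exactly these ranges by advancing currentIndex_ modulo totalReducers_.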

Modified: pig/branches/tez/src/org/apache/pig/impl/builtin/PoissonSampleLoader.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/impl/builtin/PoissonSampleLoader.java?rev=1558674&r1=1558673&r2=1558674&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/impl/builtin/PoissonSampleLoader.java (original)
+++ pig/branches/tez/src/org/apache/pig/impl/builtin/PoissonSampleLoader.java Thu Jan 16 02:25:35 2014
@@ -18,71 +18,57 @@
 package org.apache.pig.impl.builtin;
 
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Properties;
-
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.pig.PigConfiguration;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.data.TupleFactory;
-import org.apache.pig.impl.PigContext;
-import org.apache.pig.impl.io.FileSpec;
-import org.apache.pig.impl.util.Pair;
 
 /**
  * See "Skewed Join sampler" in http://wiki.apache.org/pig/PigSampler
  */
 public class PoissonSampleLoader extends SampleLoader {
-    
+
     // marker string to mark the last sample row, which has the total number of rows
     // seen by this map instance
     // this string will be in the 2nd last column of the last sample row
     // it is used by GetMemNumRows
-    public static final String NUMROWS_TUPLE_MARKER = 
+    public static final String NUMROWS_TUPLE_MARKER =
         "\u4956\u3838_pig_inTeRnal-spEcial_roW_num_tuple3kt579CFLehkblah";
-    
+
     //num of rows sampled so far
     private int numRowsSampled = 0;
-    
+
     //average size of tuple in memory, for tuples sampled
     private long avgTupleMemSz = 0;
-    
-    //current row number 
+
+    //current row number
     private long rowNum = 0;
-    
+
     // number of tuples to skip after each sample
-    long skipInterval = -1;
+    private long skipInterval = -1;
 
-    // bytes in input to skip after every sample. 
-    // divide this by avgTupleMemSize to get skipInterval 
+    // bytes in input to skip after every sample.
+    // divide this by avgTupleMemSize to get skipInterval
     private long memToSkipPerSample = 0;
-    
+
     // has the special row with row number information been returned
     private boolean numRowSplTupleReturned = false;
-    
-    /// For a given mean and a confidence, a sample rate is obtained from a poisson cdf
-    private static final String SAMPLE_RATE = "pig.sksampler.samplerate";
-    
+
     // 17 is not a magic number. It can be obtained by using a Poisson cumulative distribution function with the mean
     // set to 10 (empirically, the minimum number of samples) and the confidence set to 95%
     private static final int DEFAULT_SAMPLE_RATE = 17;
-        
+
     private int sampleRate = DEFAULT_SAMPLE_RATE;
-    
-    /// % of memory available for the input data. This is currenty equal to the memory available
-    /// for the skewed join
-    private static final String PERC_MEM_AVAIL = "pig.skewedjoin.reduce.memusage";
 
     private double heapPerc = PartitionSkewedKeys.DEFAULT_PERCENT_MEMUSAGE;
-    
+
     // new Sample tuple
     private Tuple newSample = null;
-        
-//  private final Log log = LogFactory.getLog(getClass());
-    
+
     public PoissonSampleLoader(String funcSpec, String ns) {
         super(funcSpec);
         super.setNumSamples(Integer.valueOf(ns)); // will be overridden
@@ -91,18 +77,18 @@ public class PoissonSampleLoader extends
     @Override
     public Tuple getNext() throws IOException {
         if(numRowSplTupleReturned){
-            // row num special row has been returned after all inputs 
-            // were read, nothing more to read 
+            // row num special row has been returned after all inputs
+            // were read, nothing more to read
             return null;
         }
 
-
         if(skipInterval == -1){
             //select first tuple as sample and calculate
-            // number of tuples to be skipped 
+            // number of tuples to be skipped
             Tuple t = loader.getNext();
-            if(t == null)
+            if(t == null) {
                 return createNumRowTuple(null);
+            }
             long availRedMem = (long) (Runtime.getRuntime().maxMemory() * heapPerc);
             memToSkipPerSample = availRedMem/sampleRate;
             updateSkipInterval(t);
@@ -121,8 +107,9 @@ public class PoissonSampleLoader extends
 
         // skipped enough, get new sample
         Tuple t = loader.getNext();
-        if(t == null)
+        if(t == null) {
             return createNumRowTuple(newSample);
+        }
         updateSkipInterval(t);
         rowNum++;
         Tuple currentSample = newSample;
@@ -136,14 +123,14 @@ public class PoissonSampleLoader extends
      * @param t - tuple
      */
     private void updateSkipInterval(Tuple t) {
-        avgTupleMemSz = 
+        avgTupleMemSz =
             ((avgTupleMemSz*numRowsSampled) + t.getMemorySize())/(numRowsSampled + 1);
         skipInterval = memToSkipPerSample/avgTupleMemSz;
-        
-            // skipping fewer number of rows the first few times, to reduce 
-            // the probability of first tuples size (if much smaller than rest) 
-        // resulting in 
-            // very few samples being sampled. Sampling a little extra is OK
+
+        // skipping fewer number of rows the first few times, to reduce the
+        // probability of first tuples size (if much smaller than rest)
+        // resulting in very few samples being sampled. Sampling a little extra
+        // is OK
         if(numRowsSampled < 5)
             skipInterval = skipInterval/(10-numRowsSampled);
         ++numRowsSampled;
@@ -157,21 +144,22 @@ public class PoissonSampleLoader extends
      */
     private Tuple createNumRowTuple(Tuple sample) throws ExecException {
         int sz = (sample == null) ? 0 : sample.size();
-        TupleFactory factory = TupleFactory.getInstance();       
+        TupleFactory factory = TupleFactory.getInstance();
         Tuple t = factory.newTuple(sz + 2);
- 
+
         if (sample != null) {
             for(int i=0; i<sample.size(); i++){
                 t.set(i, sample.get(i));
             }
         }
-        
+
         t.set(sz, NUMROWS_TUPLE_MARKER);
         t.set(sz + 1, rowNum);
         numRowSplTupleReturned = true;
         return t;
     }
 
+    @SuppressWarnings("rawtypes")
     @Override
     public void prepareToRead(RecordReader reader, PigSplit split) throws IOException {
         super.prepareToRead(reader, split);
@@ -184,8 +172,9 @@ public class PoissonSampleLoader extends
         newSample = null;
 
         Configuration conf = split.getConf();
-        sampleRate = conf.getInt(SAMPLE_RATE, DEFAULT_SAMPLE_RATE);
-        heapPerc = conf.getFloat(PERC_MEM_AVAIL, PartitionSkewedKeys.DEFAULT_PERCENT_MEMUSAGE);
+        sampleRate = conf.getInt(PigConfiguration.SAMPLE_RATE, DEFAULT_SAMPLE_RATE);
+        heapPerc = conf.getFloat(PigConfiguration.PERC_MEM_AVAIL,
+                PartitionSkewedKeys.DEFAULT_PERCENT_MEMUSAGE);
     }
 
 }
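
For readers following the sampling math above: PoissonSampleLoader takes one sample roughly every memToSkipPerSample bytes of input, recomputing the skip interval from a running average of sampled tuple sizes and damping the first few intervals so an unusually small first tuple cannot starve the sampler. Below is a minimal, self-contained sketch of that update; the class name and memory figures are assumed for illustration only.

    // Sketch of PoissonSampleLoader.updateSkipInterval(); numbers are hypothetical.
    public class SkipIntervalSketch {
        private long avgTupleMemSz = 0;  // running average tuple size, in bytes
        private int numRowsSampled = 0;
        private long skipInterval = -1;
        private final long memToSkipPerSample;

        SkipIntervalSketch(long availRedMem, int sampleRate) {
            // One sample per this many bytes of input, as set up in prepareToRead().
            this.memToSkipPerSample = availRedMem / sampleRate;
        }

        void updateSkipInterval(long tupleMemSize) {
            // Incremental mean over all tuples sampled so far.
            avgTupleMemSz =
                    ((avgTupleMemSz * numRowsSampled) + tupleMemSize) / (numRowsSampled + 1);
            skipInterval = memToSkipPerSample / avgTupleMemSz;
            // Skip fewer rows for the first few samples, as in the loader above.
            if (numRowsSampled < 5) {
                skipInterval = skipInterval / (10 - numRowsSampled);
            }
            ++numRowsSampled;
        }

        public static void main(String[] args) {
            // Assume ~300 MB available to the reducer and the default rate of 17.
            SkipIntervalSketch s = new SkipIntervalSketch(300L * 1024 * 1024, 17);
            s.updateSkipInterval(512); // first sampled tuple is 512 bytes
            System.out.println("skip " + s.skipInterval + " tuples before next sample");
        }
    }

The marker row appended by createNumRowTuple() then carries the total row count seen by this map in its last field; PartitionSkewedKeys reads that field (see totalInputRows in its exec() above) to scale the sample back up to the full input.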

Modified: pig/branches/tez/test/e2e/pig/tests/tez.conf
URL: http://svn.apache.org/viewvc/pig/branches/tez/test/e2e/pig/tests/tez.conf?rev=1558674&r1=1558673&r2=1558674&view=diff
==============================================================================
--- pig/branches/tez/test/e2e/pig/tests/tez.conf (original)
+++ pig/branches/tez/test/e2e/pig/tests/tez.conf Thu Jan 16 02:25:35 2014
@@ -171,6 +171,7 @@ d = filter b by age < 20;
 e = join c by name, d by name;
 store e into ':OUTPATH:';\,
                         },
+                        # Replicated inner join
                         {
                         'num' => 2,
                         'pig' => q\set pig.tez.session.reuse false;
@@ -181,6 +182,7 @@ d = filter b by age < 20;
 e = join c by name, d by name using 'replicated';
 store e into ':OUTPATH:';\,
                         },
+                        # Replicated outer join
                         {
                         'num' => 3,
                         'pig' => q\set pig.tez.session.reuse false;
@@ -242,6 +244,28 @@ store e into ':OUTPATH:';\,
                                         store g into ':OUTPATH:';\,
                         'notmq' => 1,
                         },
+                        # Skewed inner join
+                        {
+                        'num' => 7,
+                        'pig' => q\set pig.tez.session.reuse false;
+a = load ':INPATH:/singlefile/studenttab20m' as (name, age, gpa);
+b = load ':INPATH:/singlefile/votertab10k' as (name, age, registration, contributions);
+c = filter a by age < 20;
+d = filter b by age < 20;
+e = join c by name, d by name using 'skewed';
+store e into ':OUTPATH:';\,
+                        },
+                        # Skewed outer join
+                        {
+                        'num' => 8,
+                        'pig' => q\set pig.tez.session.reuse false;
+a = load ':INPATH:/singlefile/studenttab20m' as (name, age, gpa);
+b = load ':INPATH:/singlefile/votertab10k' as (name, age, registration, contributions);
+c = filter a by age < 20;
+d = filter b by age < 20;
+e = join c by name LEFT OUTER, d by name using 'skewed';
+store e into ':OUTPATH:';\,
+                        },
                   ]
                 },
                 {