You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ch...@apache.org on 2014/01/16 03:25:35 UTC
svn commit: r1558674 [2/2] - in /pig/branches/tez: src/org/apache/pig/
src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/
src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/
src/org/apache/pi...
Modified: pig/branches/tez/src/org/apache/pig/impl/builtin/PartitionSkewedKeys.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/impl/builtin/PartitionSkewedKeys.java?rev=1558674&r1=1558673&r2=1558674&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/impl/builtin/PartitionSkewedKeys.java (original)
+++ pig/branches/tez/src/org/apache/pig/impl/builtin/PartitionSkewedKeys.java Thu Jan 16 02:25:35 2014
@@ -40,278 +40,268 @@ import org.apache.pig.impl.util.Pair;
* sampling process. It figures out how many reducers required to process a
* skewed key without causing spill and allocate this number of reducers to this
* key. This UDF outputs a map which contains 2 keys:
- *
+ *
* <li>"totalreducers": the value is an integer which indicates the
* number of total reducers for this join job </li>
* <li>"partition.list": the value is a bag which contains a
* list of tuples with each tuple representing partitions for a skewed key.
- * The tuple has format of <join key>,<min index of reducer>,
+ * The tuple has format of <join key>,<min index of reducer>,
* <max index of reducer> </li>
- *
- * For example, a join job configures 10 reducers, and the sampling process
+ *
+ * For example, a join job configures 10 reducers, and the sampling process
* finds out 2 skewed keys, "swpv" needs 4 reducers and "swps"
* needs 2 reducers. The output file would be like following:
- *
+ *
* {totalreducers=10, partition.list={(swpv,0,3), (swps,4,5)}}
*
- * The name of this file is set into next MR job which does the actual join.
+ * The name of this file is set into next MR job which does the actual join.
* That job uses this information to partition skewed keys properly
- *
+ *
*/
public class PartitionSkewedKeys extends EvalFunc<Map<String, Object>> {
- public static final String PARTITION_LIST = "partition.list";
+ public static final String PARTITION_LIST = "partition.list";
- public static final String TOTAL_REDUCERS = "totalreducers";
+ public static final String TOTAL_REDUCERS = "totalreducers";
- public static final float DEFAULT_PERCENT_MEMUSAGE = 0.3f;
+ public static final float DEFAULT_PERCENT_MEMUSAGE = 0.3f;
- private Log log = LogFactory.getLog(getClass());
+ private Log log = LogFactory.getLog(getClass());
- BagFactory mBagFactory = BagFactory.getInstance();
+ BagFactory mBagFactory = BagFactory.getInstance();
- TupleFactory mTupleFactory = TupleFactory.getInstance();
+ TupleFactory mTupleFactory = TupleFactory.getInstance();
- private int currentIndex_;
+ private int currentIndex_;
- private int totalReducers_;
+ private int totalReducers_;
- private long totalMemory_;
+ private long totalMemory_;
- private String inputFile_;
+ private long totalSampleCount_;
- private long totalSampleCount_;
+ private double heapPercentage_;
- private double heapPercentage_;
-
// specify how many tuple a reducer can hold for a key
// this is for testing purpose. If not specified, then
// it is calculated based on memory size and size of tuple
- private int tupleMCount_;
+ private int tupleMCount_;
- public PartitionSkewedKeys() {
- this(null);
- }
-
- public PartitionSkewedKeys(String[] args) {
- totalReducers_ = -1;
- currentIndex_ = 0;
-
- if (args != null && args.length > 0) {
- heapPercentage_ = Double.parseDouble(args[0]);
- tupleMCount_ = Integer.parseInt(args[1]);
- inputFile_ = args[2];
- } else {
- heapPercentage_ = DEFAULT_PERCENT_MEMUSAGE;
- }
-
- if (log.isDebugEnabled()) {
- log.debug("pig.skewedjoin.reduce.memusage=" + heapPercentage_);
- log.debug("input file: " + inputFile_);
- }
-
- log.info("input file: " + inputFile_);
-
- }
-
- /**
- * first field in the input tuple is the number of reducers
- *
- * second field is the *sorted* bag of samples
- * this should be called only once
- */
- public Map<String, Object> exec(Tuple in) throws IOException {
- if (in == null || in.size() == 0) {
- return null;
- }
- Map<String, Object> output = new HashMap<String, Object>();
-
- totalMemory_ = (long) (Runtime.getRuntime().maxMemory() * heapPercentage_);
- log.info("Maximum of available memory is " + totalMemory_);
-
- ArrayList<Tuple> reducerList = new ArrayList<Tuple>();
-
- Tuple currentTuple = null;
- long count = 0;
-
- // total size in memory for tuples in sample
- long totalSampleMSize = 0;
-
- //total input rows for the join
- long totalInputRows = 0;
-
- try {
- totalReducers_ = (Integer) in.get(0);
- DataBag samples = (DataBag) in.get(1);
-
- totalSampleCount_ = samples.size();
-
- log.info("totalSample: " + totalSampleCount_);
- log.info("totalReducers: " + totalReducers_);
-
- int maxReducers = 0;
-
- // first iterate the samples to find total number of rows
- Iterator<Tuple> iter1 = samples.iterator();
- while (iter1.hasNext()) {
- Tuple t = iter1.next();
- totalInputRows += (Long)t.get(t.size() - 1);
- }
-
- // now iterate samples to do the reducer calculation
- Iterator<Tuple> iter2 = samples.iterator();
- while (iter2.hasNext()) {
- Tuple t = iter2.next();
- if (hasSameKey(currentTuple, t) || currentTuple == null) {
- count++;
- totalSampleMSize += getMemorySize(t);
- } else {
- Pair<Tuple, Integer> p = calculateReducers(currentTuple,
- count, totalSampleMSize, totalInputRows);
- Tuple rt = p.first;
- if (rt != null) {
- reducerList.add(rt);
- }
- if (maxReducers < p.second) {
- maxReducers = p.second;
- }
- count = 1;
- totalSampleMSize = getMemorySize(t);
- }
-
- currentTuple = t;
- }
-
- // add last key
- if (count > 0) {
- Pair<Tuple, Integer> p = calculateReducers(currentTuple, count,
- totalSampleMSize, totalInputRows);
- Tuple rt = p.first;
- if (rt != null) {
- reducerList.add(rt);
- }
- if (maxReducers < p.second) {
- maxReducers = p.second;
- }
- }
-
- if (maxReducers > totalReducers_) {
- if(pigLogger != null) {
- pigLogger.warn(this,"You need at least " + maxReducers
- + " reducers to avoid spillage and run this job efficiently.", PigWarning.REDUCER_COUNT_LOW);
- } else {
- log.warn("You need at least " + maxReducers
- + " reducers to avoid spillage and run this job efficiently.");
- }
- }
-
- output.put(PARTITION_LIST, mBagFactory.newDefaultBag(reducerList));
- output.put(TOTAL_REDUCERS, Integer.valueOf(totalReducers_));
-
- log.info(output.toString());
- if (log.isDebugEnabled()) {
- log.debug(output.toString());
- }
-
- return output;
- } catch (Exception e) {
- e.printStackTrace();
- throw new RuntimeException(e);
- }
- }
-
- private Pair<Tuple, Integer> calculateReducers(Tuple currentTuple,
- long count, long totalMSize, long totalTuples) {
- // get average memory size per tuple
- double avgM = totalMSize / (double) count;
-
- // get the number of tuples that can fit into memory
- long tupleMCount = (tupleMCount_ <= 0)?(long) (totalMemory_ / avgM): tupleMCount_;
-
- // estimate the number of total tuples for this key
- long keyTupleCount = (long) ( ((double) count/ totalSampleCount_) *
- totalTuples);
-
-
- int redCount = (int) Math.round(Math.ceil((double) keyTupleCount
- / tupleMCount));
-
- if (log.isDebugEnabled())
- {
- log.debug("avgM: " + avgM);
- log.debug("tuple count: " + keyTupleCount);
- log.debug("count: " + count);
- log.debug("A reducer can take " + tupleMCount + " tuples and "
- + keyTupleCount + " tuples are find for " + currentTuple);
- log.debug("key " + currentTuple + " need " + redCount + " reducers");
- }
-
- // this is not a skewed key
- if (redCount <= 1) {
- return new Pair<Tuple, Integer>(null, 1);
- }
-
- Tuple t = this.mTupleFactory.newTuple(currentTuple.size());
- int i = 0;
- try {
- // set keys
- for (; i < currentTuple.size() - 2; i++) {
- t.set(i, currentTuple.get(i));
- }
-
- int effectiveRedCount = redCount > totalReducers_? totalReducers_:redCount;
- // set the min index of reducer for this key
- t.set(i++, currentIndex_);
- currentIndex_ = (currentIndex_ + effectiveRedCount) % totalReducers_ - 1;
- if (currentIndex_ < 0) {
- currentIndex_ += totalReducers_;
- }
- // set the max index of reducer for this key
- t.set(i++, currentIndex_);
- } catch (ExecException e) {
- throw new RuntimeException("Failed to set value to tuple." + e);
- }
-
- currentIndex_ = (currentIndex_ + 1) % totalReducers_;
-
- Pair<Tuple, Integer> p = new Pair<Tuple, Integer>(t, redCount);
-
- return p;
- }
-
- // the last field of the tuple is a tuple for memory size and disk size
- private long getMemorySize(Tuple t) {
- int s = t.size();
- try {
- return (Long) t.get(s - 2);
- } catch (ExecException e) {
- throw new RuntimeException(
- "Unable to retrive the size field from tuple.", e);
- }
- }
-
-
- private boolean hasSameKey(Tuple t1, Tuple t2) {
- // Have to break the tuple down and compare it field to field.
- int sz1 = t1 == null ? 0 : t1.size();
- int sz2 = t2 == null ? 0 : t2.size();
- if (sz2 != sz1) {
- return false;
- }
-
- for (int i = 0; i < sz1 - 2; i++) {
- try {
- int c = DataType.compare(t1.get(i), t2.get(i));
- if (c != 0) {
- return false;
- }
- } catch (ExecException e) {
- throw new RuntimeException("Unable to compare tuples", e);
- }
- }
+ public PartitionSkewedKeys() {
+ this(null);
+ }
+
+ public PartitionSkewedKeys(String[] args) {
+ totalReducers_ = -1;
+ currentIndex_ = 0;
+
+ if (args != null && args.length > 0) {
+ heapPercentage_ = Double.parseDouble(args[0]);
+ tupleMCount_ = Integer.parseInt(args[1]);
+ } else {
+ heapPercentage_ = DEFAULT_PERCENT_MEMUSAGE;
+ }
+
+ if (log.isDebugEnabled()) {
+ log.debug("pig.skewedjoin.reduce.memusage=" + heapPercentage_);
+ }
+ }
+
+ /**
+ * first field in the input tuple is the number of reducers
+ *
+ * second field is the *sorted* bag of samples
+ * this should be called only once
+ */
+ public Map<String, Object> exec(Tuple in) throws IOException {
+ if (in == null || in.size() == 0) {
+ return null;
+ }
+ Map<String, Object> output = new HashMap<String, Object>();
+
+ totalMemory_ = (long) (Runtime.getRuntime().maxMemory() * heapPercentage_);
+ log.info("Maximum of available memory is " + totalMemory_);
+
+ ArrayList<Tuple> reducerList = new ArrayList<Tuple>();
+
+ Tuple currentTuple = null;
+ long count = 0;
+
+ // total size in memory for tuples in sample
+ long totalSampleMSize = 0;
+
+ //total input rows for the join
+ long totalInputRows = 0;
+
+ try {
+ totalReducers_ = (Integer) in.get(0);
+ DataBag samples = (DataBag) in.get(1);
+
+ totalSampleCount_ = samples.size();
+
+ log.info("totalSample: " + totalSampleCount_);
+ log.info("totalReducers: " + totalReducers_);
+
+ int maxReducers = 0;
+
+ // first iterate the samples to find total number of rows
+ Iterator<Tuple> iter1 = samples.iterator();
+ while (iter1.hasNext()) {
+ Tuple t = iter1.next();
+ totalInputRows += (Long)t.get(t.size() - 1);
+ }
+
+ // now iterate samples to do the reducer calculation
+ Iterator<Tuple> iter2 = samples.iterator();
+ while (iter2.hasNext()) {
+ Tuple t = iter2.next();
+ if (hasSameKey(currentTuple, t) || currentTuple == null) {
+ count++;
+ totalSampleMSize += getMemorySize(t);
+ } else {
+ Pair<Tuple, Integer> p = calculateReducers(currentTuple,
+ count, totalSampleMSize, totalInputRows);
+ Tuple rt = p.first;
+ if (rt != null) {
+ reducerList.add(rt);
+ }
+ if (maxReducers < p.second) {
+ maxReducers = p.second;
+ }
+ count = 1;
+ totalSampleMSize = getMemorySize(t);
+ }
+
+ currentTuple = t;
+ }
+
+ // add last key
+ if (count > 0) {
+ Pair<Tuple, Integer> p = calculateReducers(currentTuple, count,
+ totalSampleMSize, totalInputRows);
+ Tuple rt = p.first;
+ if (rt != null) {
+ reducerList.add(rt);
+ }
+ if (maxReducers < p.second) {
+ maxReducers = p.second;
+ }
+ }
+
+ if (maxReducers > totalReducers_) {
+ if(pigLogger != null) {
+ pigLogger.warn(this,"You need at least " + maxReducers
+ + " reducers to avoid spillage and run this job efficiently.", PigWarning.REDUCER_COUNT_LOW);
+ } else {
+ log.warn("You need at least " + maxReducers
+ + " reducers to avoid spillage and run this job efficiently.");
+ }
+ }
+
+ output.put(PARTITION_LIST, mBagFactory.newDefaultBag(reducerList));
+ output.put(TOTAL_REDUCERS, Integer.valueOf(totalReducers_));
+
+ log.info(output.toString());
+ if (log.isDebugEnabled()) {
+ log.debug(output.toString());
+ }
+
+ return output;
+ } catch (Exception e) {
+ e.printStackTrace();
+ throw new RuntimeException(e);
+ }
+ }
+
+ private Pair<Tuple, Integer> calculateReducers(Tuple currentTuple,
+ long count, long totalMSize, long totalTuples) {
+ // get average memory size per tuple
+ double avgM = totalMSize / (double) count;
+
+ // get the number of tuples that can fit into memory
+ long tupleMCount = (tupleMCount_ <= 0)?(long) (totalMemory_ / avgM): tupleMCount_;
+
+ // estimate the number of total tuples for this key
+ long keyTupleCount = (long) ( ((double) count/ totalSampleCount_) * totalTuples);
+
+ int redCount = (int) Math.round(Math.ceil((double) keyTupleCount / tupleMCount));
+
+ if (log.isDebugEnabled())
+ {
+ log.debug("avgM: " + avgM);
+ log.debug("tuple count: " + keyTupleCount);
+ log.debug("count: " + count);
+ log.debug("A reducer can take " + tupleMCount + " tuples and "
+ + keyTupleCount + " tuples are find for " + currentTuple);
+ log.debug("key " + currentTuple + " need " + redCount + " reducers");
+ }
+
+ // this is not a skewed key
+ if (redCount <= 1) {
+ return new Pair<Tuple, Integer>(null, 1);
+ }
+
+ Tuple t = this.mTupleFactory.newTuple(currentTuple.size());
+ int i = 0;
+ try {
+ // set keys
+ for (; i < currentTuple.size() - 2; i++) {
+ t.set(i, currentTuple.get(i));
+ }
+
+ int effectiveRedCount = redCount > totalReducers_? totalReducers_:redCount;
+ // set the min index of reducer for this key
+ t.set(i++, currentIndex_);
+ currentIndex_ = (currentIndex_ + effectiveRedCount) % totalReducers_ - 1;
+ if (currentIndex_ < 0) {
+ currentIndex_ += totalReducers_;
+ }
+ // set the max index of reducer for this key
+ t.set(i++, currentIndex_);
+ } catch (ExecException e) {
+ throw new RuntimeException("Failed to set value to tuple." + e);
+ }
+
+ currentIndex_ = (currentIndex_ + 1) % totalReducers_;
+
+ Pair<Tuple, Integer> p = new Pair<Tuple, Integer>(t, redCount);
+
+ return p;
+ }
+
+ // the second-to-last field of the tuple is its memory size (the last field is the row count summed in exec)
+ private long getMemorySize(Tuple t) {
+ int s = t.size();
+ try {
+ return (Long) t.get(s - 2);
+ } catch (ExecException e) {
+ throw new RuntimeException(
+ "Unable to retrive the size field from tuple.", e);
+ }
+ }
+
+
+ private boolean hasSameKey(Tuple t1, Tuple t2) {
+ // Have to break the tuple down and compare it field to field.
+ int sz1 = t1 == null ? 0 : t1.size();
+ int sz2 = t2 == null ? 0 : t2.size();
+ if (sz2 != sz1) {
+ return false;
+ }
+
+ for (int i = 0; i < sz1 - 2; i++) {
+ try {
+ int c = DataType.compare(t1.get(i), t2.get(i));
+ if (c != 0) {
+ return false;
+ }
+ } catch (ExecException e) {
+ throw new RuntimeException("Unable to compare tuples", e);
+ }
+ }
- return true;
- }
+ return true;
+ }
}
Modified: pig/branches/tez/src/org/apache/pig/impl/builtin/PoissonSampleLoader.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/impl/builtin/PoissonSampleLoader.java?rev=1558674&r1=1558673&r2=1558674&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/impl/builtin/PoissonSampleLoader.java (original)
+++ pig/branches/tez/src/org/apache/pig/impl/builtin/PoissonSampleLoader.java Thu Jan 16 02:25:35 2014
@@ -18,71 +18,57 @@
package org.apache.pig.impl.builtin;
import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Properties;
-
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.pig.PigConfiguration;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
-import org.apache.pig.impl.PigContext;
-import org.apache.pig.impl.io.FileSpec;
-import org.apache.pig.impl.util.Pair;
/**
* See "Skewed Join sampler" in http://wiki.apache.org/pig/PigSampler
*/
public class PoissonSampleLoader extends SampleLoader {
-
+
// marker string to mark the last sample row, which has total number of rows
// seen by this map instance
// this string will be in the 2nd last column of the last sample row
// it is used by GetMemNumRows
- public static final String NUMROWS_TUPLE_MARKER =
+ public static final String NUMROWS_TUPLE_MARKER =
"\u4956\u3838_pig_inTeRnal-spEcial_roW_num_tuple3kt579CFLehkblah";
-
+
//num of rows sampled so far
private int numRowsSampled = 0;
-
+
//average size of tuple in memory, for tuples sampled
private long avgTupleMemSz = 0;
-
- //current row number
+
+ //current row number
private long rowNum = 0;
-
+
// number of tuples to skip after each sample
- long skipInterval = -1;
+ private long skipInterval = -1;
- // bytes in input to skip after every sample.
- // divide this by avgTupleMemSize to get skipInterval
+ // bytes in input to skip after every sample.
+ // divide this by avgTupleMemSize to get skipInterval
private long memToSkipPerSample = 0;
-
+
// has the special row with row number information been returned
private boolean numRowSplTupleReturned = false;
-
- /// For a given mean and a confidence, a sample rate is obtained from a poisson cdf
- private static final String SAMPLE_RATE = "pig.sksampler.samplerate";
-
+
// 17 is not a magic number. It can be obtained by using a poisson cumulative distribution function with the mean
// set to 10 (empirically, minimum number of samples) and the confidence set to 95%
private static final int DEFAULT_SAMPLE_RATE = 17;
-
+
private int sampleRate = DEFAULT_SAMPLE_RATE;
-
- /// % of memory available for the input data. This is currenty equal to the memory available
- /// for the skewed join
- private static final String PERC_MEM_AVAIL = "pig.skewedjoin.reduce.memusage";
private double heapPerc = PartitionSkewedKeys.DEFAULT_PERCENT_MEMUSAGE;
-
+
// new Sample tuple
private Tuple newSample = null;
-
-// private final Log log = LogFactory.getLog(getClass());
-
+
public PoissonSampleLoader(String funcSpec, String ns) {
super(funcSpec);
super.setNumSamples(Integer.valueOf(ns)); // will be overridden
@@ -91,18 +77,18 @@ public class PoissonSampleLoader extends
@Override
public Tuple getNext() throws IOException {
if(numRowSplTupleReturned){
- // row num special row has been returned after all inputs
- // were read, nothing more to read
+ // row num special row has been returned after all inputs
+ // were read, nothing more to read
return null;
}
-
if(skipInterval == -1){
//select first tuple as sample and calculate
- // number of tuples to be skipped
+ // number of tuples to be skipped
Tuple t = loader.getNext();
- if(t == null)
+ if(t == null) {
return createNumRowTuple(null);
+ }
long availRedMem = (long) (Runtime.getRuntime().maxMemory() * heapPerc);
memToSkipPerSample = availRedMem/sampleRate;
updateSkipInterval(t);
@@ -121,8 +107,9 @@ public class PoissonSampleLoader extends
// skipped enough, get new sample
Tuple t = loader.getNext();
- if(t == null)
+ if(t == null) {
return createNumRowTuple(newSample);
+ }
updateSkipInterval(t);
rowNum++;
Tuple currentSample = newSample;
@@ -136,14 +123,14 @@ public class PoissonSampleLoader extends
* @param t - tuple
*/
private void updateSkipInterval(Tuple t) {
- avgTupleMemSz =
+ avgTupleMemSz =
((avgTupleMemSz*numRowsSampled) + t.getMemorySize())/(numRowsSampled + 1);
skipInterval = memToSkipPerSample/avgTupleMemSz;
-
- // skipping fewer number of rows the first few times, to reduce
- // the probability of first tuples size (if much smaller than rest)
- // resulting in
- // very few samples being sampled. Sampling a little extra is OK
+
+ // skip fewer rows the first few times, to reduce the probability of
+ // the first tuple's size (if much smaller than the rest) resulting
+ // in very few samples being sampled. Sampling a little extra
+ // is OK
if(numRowsSampled < 5)
skipInterval = skipInterval/(10-numRowsSampled);
++numRowsSampled;
@@ -157,21 +144,22 @@ public class PoissonSampleLoader extends
*/
private Tuple createNumRowTuple(Tuple sample) throws ExecException {
int sz = (sample == null) ? 0 : sample.size();
- TupleFactory factory = TupleFactory.getInstance();
+ TupleFactory factory = TupleFactory.getInstance();
Tuple t = factory.newTuple(sz + 2);
-
+
if (sample != null) {
for(int i=0; i<sample.size(); i++){
t.set(i, sample.get(i));
}
}
-
+
t.set(sz, NUMROWS_TUPLE_MARKER);
t.set(sz + 1, rowNum);
numRowSplTupleReturned = true;
return t;
}
+ @SuppressWarnings("rawtypes")
@Override
public void prepareToRead(RecordReader reader, PigSplit split) throws IOException {
super.prepareToRead(reader, split);
@@ -184,8 +172,9 @@ public class PoissonSampleLoader extends
newSample = null;
Configuration conf = split.getConf();
- sampleRate = conf.getInt(SAMPLE_RATE, DEFAULT_SAMPLE_RATE);
- heapPerc = conf.getFloat(PERC_MEM_AVAIL, PartitionSkewedKeys.DEFAULT_PERCENT_MEMUSAGE);
+ sampleRate = conf.getInt(PigConfiguration.SAMPLE_RATE, DEFAULT_SAMPLE_RATE);
+ heapPerc = conf.getFloat(PigConfiguration.PERC_MEM_AVAIL,
+ PartitionSkewedKeys.DEFAULT_PERCENT_MEMUSAGE);
}
}
Modified: pig/branches/tez/test/e2e/pig/tests/tez.conf
URL: http://svn.apache.org/viewvc/pig/branches/tez/test/e2e/pig/tests/tez.conf?rev=1558674&r1=1558673&r2=1558674&view=diff
==============================================================================
--- pig/branches/tez/test/e2e/pig/tests/tez.conf (original)
+++ pig/branches/tez/test/e2e/pig/tests/tez.conf Thu Jan 16 02:25:35 2014
@@ -171,6 +171,7 @@ d = filter b by age < 20;
e = join c by name, d by name;
store e into ':OUTPATH:';\,
},
+ # Replicated inner join
{
'num' => 2,
'pig' => q\set pig.tez.session.reuse false;
@@ -181,6 +182,7 @@ d = filter b by age < 20;
e = join c by name, d by name using 'replicated';
store e into ':OUTPATH:';\,
},
+ # Replicated outer join
{
'num' => 3,
'pig' => q\set pig.tez.session.reuse false;
@@ -242,6 +244,28 @@ store e into ':OUTPATH:';\,
store g into ':OUTPATH:';\,
'notmq' => 1,
},
+ # Skewed inner join
+ {
+ 'num' => 7,
+ 'pig' => q\set pig.tez.session.reuse false;
+a = load ':INPATH:/singlefile/studenttab20m' as (name, age, gpa);
+b = load ':INPATH:/singlefile/votertab10k' as (name, age, registration, contributions);
+c = filter a by age < 20;
+d = filter b by age < 20;
+e = join c by name, d by name using 'skewed';
+store e into ':OUTPATH:';\,
+ },
+ # Skewed outer join
+ {
+ 'num' => 8,
+ 'pig' => q\set pig.tez.session.reuse false;
+a = load ':INPATH:/singlefile/studenttab20m' as (name, age, gpa);
+b = load ':INPATH:/singlefile/votertab10k' as (name, age, registration, contributions);
+c = filter a by age < 20;
+d = filter b by age < 20;
+e = join c by name LEFT OUTER, d by name using 'skewed';
+store e into ':OUTPATH:';\,
+ },
]
},
{