You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by th...@apache.org on 2012/08/16 22:13:38 UTC
svn commit: r1374030 - in /pig/trunk: CHANGES.txt
src/org/apache/pig/impl/builtin/PoissonSampleLoader.java
test/org/apache/pig/test/TestPoissonSampleLoader.java
Author: thejas
Date: Thu Aug 16 20:13:37 2012
New Revision: 1374030
URL: http://svn.apache.org/viewvc?rev=1374030&view=rev
Log:
PIG-2662: skew join does not honor its config parameters (rajesh.balamohan via thejas)
Modified:
pig/trunk/CHANGES.txt
pig/trunk/src/org/apache/pig/impl/builtin/PoissonSampleLoader.java
pig/trunk/test/org/apache/pig/test/TestPoissonSampleLoader.java
Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1374030&r1=1374029&r2=1374030&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Thu Aug 16 20:13:37 2012
@@ -24,6 +24,8 @@ INCOMPATIBLE CHANGES
IMPROVEMENTS
+PIG-2662: skew join does not honor its config parameters (rajesh.balamohan via thejas)
+
PIG-2871: Refactor signature for PigReducerEstimator (billgraham)
PIG-2851: Add flag to ant to run tests with a debugger port (billgraham)
Modified: pig/trunk/src/org/apache/pig/impl/builtin/PoissonSampleLoader.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/builtin/PoissonSampleLoader.java?rev=1374030&r1=1374029&r2=1374030&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/builtin/PoissonSampleLoader.java (original)
+++ pig/trunk/src/org/apache/pig/impl/builtin/PoissonSampleLoader.java Thu Aug 16 20:13:37 2012
@@ -22,6 +22,7 @@ import java.util.ArrayList;
import java.util.Properties;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
@@ -171,36 +172,6 @@ public class PoissonSampleLoader extends
return t;
}
- /**
- * Computes the number of samples for the loader
- *
- * @param inputs : Set to pig inputs
- * @param pc : PigContext object
- *
- */
- @Override
- public void computeSamples(ArrayList<Pair<FileSpec, Boolean>> inputs,
- PigContext pc) throws ExecException {
- Properties pcProps = pc.getProperties();
-
- // % of memory available for the records
- heapPerc = PartitionSkewedKeys.DEFAULT_PERCENT_MEMUSAGE;
- if (pcProps.getProperty(PERC_MEM_AVAIL) != null) {
- try {
- heapPerc = Float.valueOf(pcProps.getProperty(PERC_MEM_AVAIL));
- }catch(NumberFormatException e) {
- // ignore, use default value
- }
- }
-
- try {
- sampleRate = Integer.valueOf(pcProps.getProperty(SAMPLE_RATE));
- } catch (NumberFormatException e) {
- sampleRate = DEFAULT_SAMPLE_RATE;
- }
-
- }
-
@Override
public void prepareToRead(RecordReader reader, PigSplit split) throws IOException {
super.prepareToRead(reader, split);
@@ -210,9 +181,11 @@ public class PoissonSampleLoader extends
skipInterval = -1;
memToSkipPerSample = 0;
numRowSplTupleReturned = false;
- sampleRate = DEFAULT_SAMPLE_RATE;
- heapPerc = PartitionSkewedKeys.DEFAULT_PERCENT_MEMUSAGE;
newSample = null;
+
+ Configuration conf = split.getConf();
+ sampleRate = conf.getInt(SAMPLE_RATE, DEFAULT_SAMPLE_RATE);
+ heapPerc = conf.getFloat(PERC_MEM_AVAIL, PartitionSkewedKeys.DEFAULT_PERCENT_MEMUSAGE);
}
}
Modified: pig/trunk/test/org/apache/pig/test/TestPoissonSampleLoader.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestPoissonSampleLoader.java?rev=1374030&r1=1374029&r2=1374030&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestPoissonSampleLoader.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestPoissonSampleLoader.java Thu Aug 16 20:13:37 2012
@@ -90,8 +90,9 @@ public class TestPoissonSampleLoader ext
Util.deleteFile(cluster, INPUT_FILE1);
}
- @Test
- public void testNumSamples() throws IOException {
+ private int testNumSamples(String memUsage, String sampleRate) throws IOException {
+ pigServer.getPigContext().getProperties().setProperty("pig.skewedjoin.reduce.memusage", memUsage);
+ pigServer.getPigContext().getProperties().setProperty("pig.sksampler.samplerate", sampleRate);
pigServer.registerQuery("A = Load '"+INPUT_FILE1+"' Using PoissonSampleLoader('PigStorage()', '100');");
Iterator<Tuple> iter = pigServer.openIterator("A");
int count = 0;
@@ -99,7 +100,29 @@ public class TestPoissonSampleLoader ext
count++;
iter.next();
}
- assertEquals(count, 1);
+ return count;
+ }
+
+ @Test
+ public void testNumSamples() throws IOException {
+ //PoissonSampleLoader.DEFAULT_SAMPLE_RATE is 17
+ int count = testNumSamples("0.0001", "17");
+ assertEquals(count, 12);
+
+ count = testNumSamples("0.001", "17");
+ assertEquals(count, 6);
+
+ count = testNumSamples("0.001", "10");
+ assertEquals(count, 4);
+
+ count = testNumSamples("0.001", "100");
+ assertEquals(count, 9);
+
+ count = testNumSamples("0.005", "17");
+ assertEquals(count, 2);
+
+ count = testNumSamples("0.0001", "100");
+ assertEquals(count, 42);
}
/*
@@ -110,7 +133,19 @@ public class TestPoissonSampleLoader ext
pigServer.registerQuery("A = Load '"+INPUT_FILE1+"' Using PoissonSampleLoader('PigStorage(\\\\\\':\\\\\\')', '100');");
Iterator<Tuple> iter = pigServer.openIterator("A");
assertTrue(iter.hasNext());
- assertEquals(5, iter.next().size());
+
+ Tuple t = iter.next();
+ //Check the tuple size. It has to be 3.
+ assertEquals(3, t.size());
+
+ while(iter.hasNext()){
+ t = iter.next();
+ }
+ // Last tuple's size has to be 5
+ // 3 data fields (e.g. 100:apple1:aaa)
+ // + PoissonSampleLoader.NUMROWS_TUPLE_MARKER ??_pig_inTeRnal-spEcial_roW_num_tuple3kt579CFLehkblah
+ // + numRow 300
+ assertEquals(5, t.size());
}
}
\ No newline at end of file