You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by th...@apache.org on 2012/08/16 22:13:38 UTC

svn commit: r1374030 - in /pig/trunk: CHANGES.txt src/org/apache/pig/impl/builtin/PoissonSampleLoader.java test/org/apache/pig/test/TestPoissonSampleLoader.java

Author: thejas
Date: Thu Aug 16 20:13:37 2012
New Revision: 1374030

URL: http://svn.apache.org/viewvc?rev=1374030&view=rev
Log:
PIG-2662: skew join does not honor its config parameters (rajesh.balamohan via thejas)

Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/src/org/apache/pig/impl/builtin/PoissonSampleLoader.java
    pig/trunk/test/org/apache/pig/test/TestPoissonSampleLoader.java

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1374030&r1=1374029&r2=1374030&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Thu Aug 16 20:13:37 2012
@@ -24,6 +24,8 @@ INCOMPATIBLE CHANGES
 
 IMPROVEMENTS
 
+PIG-2662: skew join does not honor its config parameters (rajesh.balamohan via thejas)
+
 PIG-2871: Refactor signature for PigReducerEstimator (billgraham)
 
 PIG-2851: Add flag to ant to run tests with a debugger port (billgraham)

Modified: pig/trunk/src/org/apache/pig/impl/builtin/PoissonSampleLoader.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/builtin/PoissonSampleLoader.java?rev=1374030&r1=1374029&r2=1374030&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/builtin/PoissonSampleLoader.java (original)
+++ pig/trunk/src/org/apache/pig/impl/builtin/PoissonSampleLoader.java Thu Aug 16 20:13:37 2012
@@ -22,6 +22,7 @@ import java.util.ArrayList;
 import java.util.Properties;
 
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
@@ -171,36 +172,6 @@ public class PoissonSampleLoader extends
         return t;
     }
 
-    /**
-     * Computes the number of samples for the loader
-     * 
-     * @param inputs : Set to pig inputs
-     * @param pc : PigContext object
-     * 
-     */
-    @Override
-    public void computeSamples(ArrayList<Pair<FileSpec, Boolean>> inputs, 
-            PigContext pc) throws ExecException {
-        Properties pcProps = pc.getProperties();
-
-        // % of memory available for the records
-        heapPerc = PartitionSkewedKeys.DEFAULT_PERCENT_MEMUSAGE;
-        if (pcProps.getProperty(PERC_MEM_AVAIL) != null) {
-            try {
-                heapPerc = Float.valueOf(pcProps.getProperty(PERC_MEM_AVAIL));
-            }catch(NumberFormatException e) {
-                // ignore, use default value
-            }
-        }
-
-        try {
-            sampleRate = Integer.valueOf(pcProps.getProperty(SAMPLE_RATE));
-        } catch (NumberFormatException e) {
-            sampleRate = DEFAULT_SAMPLE_RATE;
-        }
-
-    }
-    
     @Override
     public void prepareToRead(RecordReader reader, PigSplit split) throws IOException {
         super.prepareToRead(reader, split);
@@ -210,9 +181,11 @@ public class PoissonSampleLoader extends
         skipInterval = -1;
         memToSkipPerSample = 0;
         numRowSplTupleReturned = false;
-        sampleRate = DEFAULT_SAMPLE_RATE;
-        heapPerc = PartitionSkewedKeys.DEFAULT_PERCENT_MEMUSAGE;
         newSample = null;
+
+        Configuration conf = split.getConf();
+        sampleRate = conf.getInt(SAMPLE_RATE, DEFAULT_SAMPLE_RATE);
+        heapPerc = conf.getFloat(PERC_MEM_AVAIL, PartitionSkewedKeys.DEFAULT_PERCENT_MEMUSAGE);
     }
 
 }

Modified: pig/trunk/test/org/apache/pig/test/TestPoissonSampleLoader.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestPoissonSampleLoader.java?rev=1374030&r1=1374029&r2=1374030&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestPoissonSampleLoader.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestPoissonSampleLoader.java Thu Aug 16 20:13:37 2012
@@ -90,8 +90,9 @@ public class TestPoissonSampleLoader ext
         Util.deleteFile(cluster, INPUT_FILE1);
     }
 
-    @Test
-    public void testNumSamples() throws IOException {
+    private int testNumSamples(String memUsage, String sampleRate) throws IOException {
+        pigServer.getPigContext().getProperties().setProperty("pig.skewedjoin.reduce.memusage", memUsage);
+        pigServer.getPigContext().getProperties().setProperty("pig.sksampler.samplerate", sampleRate);
         pigServer.registerQuery("A = Load '"+INPUT_FILE1+"' Using PoissonSampleLoader('PigStorage()', '100');");
         Iterator<Tuple> iter = pigServer.openIterator("A");
         int count = 0;
@@ -99,7 +100,29 @@ public class TestPoissonSampleLoader ext
             count++;
             iter.next();
         }
-        assertEquals(count, 1);
+        return count;
+    }
+
+    @Test
+    public void testNumSamples() throws IOException {
+        //PoissonSampleLoader.DEFAULT_SAMPLE_RATE is 17
+        int count = testNumSamples("0.0001", "17");
+        assertEquals(count, 12);
+
+        count = testNumSamples("0.001", "17");
+        assertEquals(count, 6);
+
+        count = testNumSamples("0.001", "10");
+        assertEquals(count, 4);
+
+        count = testNumSamples("0.001", "100");
+        assertEquals(count, 9);
+
+        count = testNumSamples("0.005", "17");
+        assertEquals(count, 2);
+
+        count = testNumSamples("0.0001", "100");
+        assertEquals(count, 42);
     }
 
     /*
@@ -110,7 +133,19 @@ public class TestPoissonSampleLoader ext
         pigServer.registerQuery("A = Load '"+INPUT_FILE1+"' Using PoissonSampleLoader('PigStorage(\\\\\\':\\\\\\')', '100');");
         Iterator<Tuple> iter = pigServer.openIterator("A");
         assertTrue(iter.hasNext());
-        assertEquals(5, iter.next().size());
+        
+        Tuple t = iter.next();
+        //Check the tuple size. It has to be 3.
+        assertEquals(3, t.size());
+
+        while(iter.hasNext()){
+            t = iter.next();
+        }
+        // Last tuple's size has to be 5
+        // 3 datum  (ex: 100:apple1:aaa)
+        // + PoissonSampleLoader.NUMROWS_TUPLE_MARKER ??_pig_inTeRnal-spEcial_roW_num_tuple3kt579CFLehkblah 
+        // + numRow 300
+        assertEquals(5, t.size());
     }
 
 }
\ No newline at end of file