You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by pr...@apache.org on 2009/07/07 19:26:13 UTC

svn commit: r791916 - in /hadoop/pig/trunk/src/org/apache/pig: SamplableLoader.java builtin/BinStorage.java builtin/PigStorage.java impl/builtin/RandomSampleLoader.java

Author: pradeepkth
Date: Tue Jul  7 17:26:13 2009
New Revision: 791916

URL: http://svn.apache.org/viewvc?rev=791916&view=rev
Log:
PIG-820: additional fixes to original patch (Ashutosh Chauhan via pradeepkth)

Modified:
    hadoop/pig/trunk/src/org/apache/pig/SamplableLoader.java
    hadoop/pig/trunk/src/org/apache/pig/builtin/BinStorage.java
    hadoop/pig/trunk/src/org/apache/pig/builtin/PigStorage.java
    hadoop/pig/trunk/src/org/apache/pig/impl/builtin/RandomSampleLoader.java

Modified: hadoop/pig/trunk/src/org/apache/pig/SamplableLoader.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/SamplableLoader.java?rev=791916&r1=791915&r2=791916&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/SamplableLoader.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/SamplableLoader.java Tue Jul  7 17:26:13 2009
@@ -20,6 +20,8 @@
 
 import java.io.IOException;
 
+import org.apache.pig.data.Tuple;
+
 /**
  * Implementing this interface indicates to Pig that a given loader can be 
  * used by a sampling loader.  The requirement for this is that the loader
@@ -46,4 +48,16 @@
      * @return position in the stream.
      */
     public long getPosition() throws IOException;
+    
+    /**
+     * Get the next tuple from the stream starting from the current 
+     * read position.
+     * The loader implementation should not assume that the current read position
+     * in the stream is at the beginning of a record, since this method is called
+     * for sampling and the current read position could be anywhere in the
+     * stream.
+     * @return the next tuple from underlying input stream or null if there are no more tuples
+     * to be processed.
+     */
+    public Tuple getSampledTuple() throws IOException;
 }

Modified: hadoop/pig/trunk/src/org/apache/pig/builtin/BinStorage.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/builtin/BinStorage.java?rev=791916&r1=791915&r2=791916&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/builtin/BinStorage.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/builtin/BinStorage.java Tue Jul  7 17:26:13 2009
@@ -414,4 +414,9 @@
         // TODO Auto-generated method stub
         return null;
     }
+
+    @Override
+    public Tuple getSampledTuple() throws IOException {
+        return this.getNext(); // delegates to getNext(); NOTE(review): presumably getNext() can resync from a mid-record position - confirm
+    }
 }

Modified: hadoop/pig/trunk/src/org/apache/pig/builtin/PigStorage.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/builtin/PigStorage.java?rev=791916&r1=791915&r2=791916&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/builtin/PigStorage.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/builtin/PigStorage.java Tue Jul  7 17:26:13 2009
@@ -59,6 +59,7 @@
     private static final int OS_UNIX = 0;
     private static final int OS_WINDOWS = 1;
     private static final String UTF8 = "UTF-8";
+    private Byte prevByte = null;
     
     public PigStorage() {
         os = OS_UNIX;
@@ -104,7 +105,13 @@
 
     @Override
     public long skip(long n) throws IOException {
-        return in.skip(n);
+        if (n <= 0) return 0; // nothing to skip; avoids in.skip(n-1) plus a stray read()
+        long skipped = in.skip(n-1);
+        int lastRead = in.read(); // keep as int so EOF (-1) is distinct from data byte 0xFF
+        prevByte = (lastRead == -1) ? null : Byte.valueOf((byte)lastRead);
+        if(lastRead == -1) // End of stream.
+            return skipped;
+        return skipped+1;
     }
 
     public Tuple getNext() throws IOException {
@@ -136,6 +143,20 @@
         }
     }
 
+    @Override
+    public Tuple getSampledTuple() throws IOException {
+        // prevByte is null when no skip() preceded this call (e.g. the first call; per the
+        // original note, bindTo has then already called getNext() if that was required).
+        boolean atRecordStart = (prevByte == null || prevByte == recordDel);
+        // The byte is only valid for one read; clear it so a later call does not reuse it.
+        prevByte = null;
+        if (!atRecordStart) {
+            // We are in the middle of a record, so discard the partial remainder.
+            getNext();
+        }
+        return getNext();
+    }
+
     public void bindTo(String fileName, BufferedPositionedInputStream in, long offset, long end) throws IOException {
         this.in = in;
         this.end = end;
@@ -341,5 +362,4 @@
         // TODO Auto-generated method stub
         return null;
     }
-
-}
+ }

Modified: hadoop/pig/trunk/src/org/apache/pig/impl/builtin/RandomSampleLoader.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/builtin/RandomSampleLoader.java?rev=791916&r1=791915&r2=791916&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/builtin/RandomSampleLoader.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/builtin/RandomSampleLoader.java Tue Jul  7 17:26:13 2009
@@ -64,7 +64,7 @@
     @Override
     public Tuple getNext() throws IOException {
         long initialPos = loader.getPosition();
-        Tuple t = loader.getNext();
+        Tuple t = loader.getSampledTuple();
         long finalPos = loader.getPosition();
         
         long toSkip = skipInterval - (finalPos - initialPos);