You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ro...@apache.org on 2012/12/05 00:54:32 UTC

svn commit: r1417240 - in /pig/trunk: CHANGES.txt src/org/apache/pig/impl/io/TFileRecordReader.java test/org/apache/pig/test/TestTmpFileCompression.java

Author: rohini
Date: Tue Dec  4 23:54:31 2012
New Revision: 1417240

URL: http://svn.apache.org/viewvc?rev=1417240&view=rev
Log:
PIG-3072: Pig job reporting negative progress (knoguchi via rohini)

Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/src/org/apache/pig/impl/io/TFileRecordReader.java
    pig/trunk/test/org/apache/pig/test/TestTmpFileCompression.java

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1417240&r1=1417239&r2=1417240&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Tue Dec  4 23:54:31 2012
@@ -58,6 +58,8 @@ PIG-3013: BinInterSedes improve chararra
 
 BUG FIXES
 
+PIG-3072: Pig job reporting negative progress (knoguchi via rohini)
+
 PIG-3014: CurrentTime() UDF has undesirable characteristics (jcoveney via cheolsoo)
 
 PIG-2924: PigStats should not be assuming all Storage classes to be file-based storage (cheolsoo)

Modified: pig/trunk/src/org/apache/pig/impl/io/TFileRecordReader.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/io/TFileRecordReader.java?rev=1417240&r1=1417239&r2=1417240&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/io/TFileRecordReader.java (original)
+++ pig/trunk/src/org/apache/pig/impl/io/TFileRecordReader.java Tue Dec  4 23:54:31 2012
@@ -43,7 +43,6 @@ import org.apache.hadoop.io.file.tfile.T
 public class TFileRecordReader extends RecordReader<Text, Tuple> {
 
     private long start;
-    private long pos;
     private long end;
     Reader reader = null;
     Reader.Scanner scanner = null;
@@ -102,12 +101,15 @@ public class TFileRecordReader extends R
     /**
      * Get the progress within the split
      */
-    public float getProgress() {
+    @Override
+    public float getProgress() throws IOException {
         if (start == end) {
             return 0.0f;
         }
         else {
-            return Math.min(1.0f, (pos - start) / (float) (end - start));
+            // TFile.Reader reads into a buffer, so progress advances in chunks.
+            return Math.min(1.0f,
+                      (fileIn.getPos() - start) / (float) (end - start));
         }
     }
 

Modified: pig/trunk/test/org/apache/pig/test/TestTmpFileCompression.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestTmpFileCompression.java?rev=1417240&r1=1417239&r2=1417240&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestTmpFileCompression.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestTmpFileCompression.java Tue Dec  4 23:54:31 2012
@@ -31,11 +31,20 @@ import java.io.BufferedReader;
 import java.io.FileReader;
 
 
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
 import org.apache.pig.ExecType;
 import org.apache.pig.PigRunner;
 import org.apache.pig.PigServer;
+import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.io.TFileRecordReader;
+import org.apache.pig.impl.io.TFileRecordWriter;
 import org.apache.pig.impl.io.TFileStorage;
 import org.apache.pig.impl.io.InterStorage;
 import org.apache.pig.tools.pigstats.OutputStats;
@@ -327,4 +336,65 @@ public class TestTmpFileCompression {
             new File("1.txt").delete(); 
         }
     }
+
+    @Test
+    public void testTFileRecordWriterReaderAndProgress() throws Exception {
+        // Create a small TFile with Pig's TFileRecordWriter, read it back with
+        // TFileRecordReader, and make sure that the data matches and that
+        // progress is above zero and never decreases.
+        File tFile = File.createTempFile("test", "tfile");
+        Path basicTFile = new Path(tFile.getAbsolutePath());
+        //delete the empty file and let TFileRecordWriter create it again.
+        tFile.delete();
+        Configuration conf = new Configuration();
+        conf.set("tfile.io.chunk.size","100");
+        conf.set("fs.default.name", "file:///");
+
+        for (String codec: new String [] {"none", "gz"} ) {
+            System.err.println("Testing RecordWriter/Reader with codec: "
+                               + codec);
+            try {
+                TFileRecordWriter writer = new TFileRecordWriter(basicTFile,
+                                                                 codec, conf);
+
+                Tuple tuple = TupleFactory.getInstance().newTuple(1);
+                int LOOP_SIZE = 25000;
+                for( int i=0; i <= LOOP_SIZE; i++) {
+                    String key = String.format("%010d",i);
+                    tuple.set(0,key);
+                    writer.write(null, tuple);
+                }
+                writer.close(null);
+                int size = (int) tFile.length();
+                FileSplit split = new FileSplit(basicTFile, 0, size, null);
+                TFileRecordReader reader = new TFileRecordReader();
+                reader.initialize(split,
+                    HadoopShims.createTaskAttemptContext(
+                        conf,
+                        HadoopShims.createTaskAttemptID("jt", 1, true, 1, 1)));
+
+                float progress = 0,  lastprogress = 0;
+                int curval = 0, prevval = -1;
+                while (reader.nextKeyValue()) {
+                    Tuple t = (Tuple) reader.getCurrentValue();
+                    curval = Integer.valueOf((String) t.get(0));
+                    assertEquals("Unexpected Value", curval, prevval + 1);
+                    prevval = curval;
+
+                    progress = reader.getProgress();
+                    if( progress != lastprogress ) {
+                        System.err.println("progress: " + progress);
+                    }
+                    assertTrue("Progress is not positive", progress > 0);
+                    assertTrue("Progress is not increasing",
+                               progress >= lastprogress);
+                    lastprogress = progress;
+                }
+                assertEquals("Last value does not match",
+                            curval, LOOP_SIZE );
+            } finally {
+                tFile.delete();
+            }
+        }
+    }
 }