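You are viewing a plain-text version of this content. The canonical (linked) version is available from the original mailing-list archive; the hyperlink is not preserved in this plain-text view.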
Posted to commits@pig.apache.org by ro...@apache.org on 2012/12/05 00:54:32 UTC
svn commit: r1417240 - in /pig/trunk: CHANGES.txt
src/org/apache/pig/impl/io/TFileRecordReader.java
test/org/apache/pig/test/TestTmpFileCompression.java
Author: rohini
Date: Tue Dec 4 23:54:31 2012
New Revision: 1417240
URL: http://svn.apache.org/viewvc?rev=1417240&view=rev
Log:
PIG-3072: Pig job reporting negative progress (knoguchi via rohini)
Modified:
pig/trunk/CHANGES.txt
pig/trunk/src/org/apache/pig/impl/io/TFileRecordReader.java
pig/trunk/test/org/apache/pig/test/TestTmpFileCompression.java
Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1417240&r1=1417239&r2=1417240&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Tue Dec 4 23:54:31 2012
@@ -58,6 +58,8 @@ PIG-3013: BinInterSedes improve chararra
BUG FIXES
+PIG-3072: Pig job reporting negative progress (knoguchi via rohini)
+
PIG-3014: CurrentTime() UDF has undesirable characteristics (jcoveney via cheolsoo)
PIG-2924: PigStats should not be assuming all Storage classes to be file-based storage (cheolsoo)
Modified: pig/trunk/src/org/apache/pig/impl/io/TFileRecordReader.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/io/TFileRecordReader.java?rev=1417240&r1=1417239&r2=1417240&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/io/TFileRecordReader.java (original)
+++ pig/trunk/src/org/apache/pig/impl/io/TFileRecordReader.java Tue Dec 4 23:54:31 2012
@@ -43,7 +43,6 @@ import org.apache.hadoop.io.file.tfile.T
public class TFileRecordReader extends RecordReader<Text, Tuple> {
private long start;
- private long pos;
private long end;
Reader reader = null;
Reader.Scanner scanner = null;
@@ -102,12 +101,15 @@ public class TFileRecordReader extends R
/**
* Get the progress within the split
*/
- public float getProgress() {
+ @Override
+ public float getProgress() throws IOException {
if (start == end) {
return 0.0f;
}
else {
- return Math.min(1.0f, (pos - start) / (float) (end - start));
+ //TFile.Reader reads into buffer so progress is updated in chunks.
+ return Math.min(1.0f,
+ (fileIn.getPos() - start) / (float) (end - start));
}
}
Modified: pig/trunk/test/org/apache/pig/test/TestTmpFileCompression.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestTmpFileCompression.java?rev=1417240&r1=1417239&r2=1417240&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestTmpFileCompression.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestTmpFileCompression.java Tue Dec 4 23:54:31 2012
@@ -31,11 +31,20 @@ import java.io.BufferedReader;
import java.io.FileReader;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.pig.ExecType;
import org.apache.pig.PigRunner;
import org.apache.pig.PigServer;
+import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.io.TFileRecordReader;
+import org.apache.pig.impl.io.TFileRecordWriter;
import org.apache.pig.impl.io.TFileStorage;
import org.apache.pig.impl.io.InterStorage;
import org.apache.pig.tools.pigstats.OutputStats;
@@ -327,4 +336,65 @@ public class TestTmpFileCompression {
new File("1.txt").delete();
}
}
+
+ @Test
+ public void testTFileRecordWriterReaderAndProgress() throws Exception {
+ // Create a small tFile by pig tfilewriter, read by tfilereader and
+ // make sure that data matches,
+ // progress is above zero and increasing
+ File tFile = File.createTempFile("test", "tfile");
+ Path basicTFile = new Path(tFile.getAbsolutePath());
+ //delete the empty file and let TFileRecordWriter create it again.
+ tFile.delete();
+ Configuration conf = new Configuration();
+ conf.set("tfile.io.chunk.size","100");
+ conf.set("fs.default.name", "file:///");
+
+ for (String codec: new String [] {"none", "gz"} ) {
+ System.err.println("Testing RecordWriter/Reader with codec: "
+ + codec);
+ try {
+ TFileRecordWriter writer = new TFileRecordWriter(basicTFile,
+ codec, conf);
+
+ Tuple tuple = TupleFactory.getInstance().newTuple(1);
+ int LOOP_SIZE = 25000;
+ for( int i=0; i <= LOOP_SIZE; i++) {
+ String key = String.format("%010d",i);
+ tuple.set(0,key);
+ writer.write(null, tuple);
+ }
+ writer.close(null);
+ int size = (int) tFile.length();
+ FileSplit split = new FileSplit(basicTFile, 0, size, null);
+ TFileRecordReader reader = new TFileRecordReader();
+ reader.initialize(split,
+ HadoopShims.createTaskAttemptContext(
+ conf,
+ HadoopShims.createTaskAttemptID("jt", 1, true, 1, 1)));
+
+ float progress = 0, lastprogress = 0;
+ int curval = 0, prevval = -1;
+ while (reader.nextKeyValue()) {
+ Tuple t = (Tuple) reader.getCurrentValue();
+ curval = Integer.valueOf((String) t.get(0));
+ assertEquals("Unexpected Value", curval, prevval + 1);
+ prevval = curval;
+
+ progress = reader.getProgress();
+ if( progress != lastprogress ) {
+ System.err.println("progress: " + progress);
+ }
+ assertTrue("Progress is not positive", progress > 0);
+ assertTrue("Progress is not increasing",
+ progress >= lastprogress);
+ lastprogress = progress;
+ }
+ assertEquals("Last value does not match",
+ curval, LOOP_SIZE );
+ } finally {
+ tFile.delete();
+ }
+ }
+ }
}