You are viewing a plain-text version of this content. The canonical link for it is here (the hyperlink itself was not preserved in this plain-text copy).
Posted to commits@pig.apache.org by rd...@apache.org on 2011/04/11 18:51:17 UTC
svn commit: r1091120 - in /pig/trunk: CHANGES.txt
src/org/apache/pig/impl/io/TFileRecordReader.java
test/org/apache/pig/test/TestTmpFileCompression.java
Author: rding
Date: Mon Apr 11 16:51:16 2011
New Revision: 1091120
URL: http://svn.apache.org/viewvc?rev=1091120&view=rev
Log:
PIG-1977: 'Stream closed' error while reading Pig temp files
Modified:
pig/trunk/CHANGES.txt
pig/trunk/src/org/apache/pig/impl/io/TFileRecordReader.java
pig/trunk/test/org/apache/pig/test/TestTmpFileCompression.java
Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1091120&r1=1091119&r2=1091120&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Mon Apr 11 16:51:16 2011
@@ -142,6 +142,8 @@ PIG-1696: Performance: Use System.arrayc
BUG FIXES
+PIG-1977: "Stream closed" error while reading Pig temp files (results of intermediate jobs) (rding)
+
PIG-1963: in nested foreach, accumutive udf taking input from order-by does not get results in order (thejas)
PIG-1911: Infinite loop with accumulator function in nested foreach (thejas)
Modified: pig/trunk/src/org/apache/pig/impl/io/TFileRecordReader.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/io/TFileRecordReader.java?rev=1091120&r1=1091119&r2=1091120&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/io/TFileRecordReader.java (original)
+++ pig/trunk/src/org/apache/pig/impl/io/TFileRecordReader.java Mon Apr 11 16:51:16 2011
@@ -78,12 +78,13 @@ public class TFileRecordReader extends R
// if we got here, we have seen RECORD_1-RECORD_2-RECORD_3-TUPLE_MARKER
// sequence - lets now read the contents of the tuple
value = (Tuple) sedes.readDatum(in);
- scanner.advance();
- return true;
}
finally {
in.close();
}
+
+ scanner.advance();
+ return true;
}
@Override
Modified: pig/trunk/test/org/apache/pig/test/TestTmpFileCompression.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestTmpFileCompression.java?rev=1091120&r1=1091119&r2=1091120&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestTmpFileCompression.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestTmpFileCompression.java Mon Apr 11 16:51:16 2011
@@ -20,19 +20,26 @@ package org.apache.pig.test;
import static org.junit.Assert.*;
import java.io.File;
+import java.io.FileWriter;
import java.io.IOException;
+import java.io.PrintWriter;
import java.util.HashMap;
import java.util.Iterator;
+import java.util.Properties;
import java.io.BufferedReader;
import java.io.FileReader;
import org.apache.pig.ExecType;
+import org.apache.pig.PigRunner;
import org.apache.pig.PigServer;
+import org.apache.pig.data.DataBag;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.io.TFileStorage;
import org.apache.pig.impl.io.InterStorage;
+import org.apache.pig.tools.pigstats.OutputStats;
+import org.apache.pig.tools.pigstats.PigStats;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
@@ -267,4 +274,57 @@ public class TestTmpFileCompression {
}));
}
+
+ // PIG-1977
+ @Test
+ public void testTFileRecordReader() throws Exception {
+ PrintWriter w = new PrintWriter(new FileWriter("1.txt"));
+ for (int i = 0; i < 30; i++) {
+ w.println("1\tthis is a test for compression of temp files");
+ }
+ w.close();
+
+ Util.copyFromLocalToCluster(cluster, "1.txt", "1.txt");
+
+ PrintWriter w1 = new PrintWriter(new FileWriter("tfile.pig"));
+ w1.println("A = load '1.txt' as (a0:int, a1:chararray);");
+ w1.println("B = group A by a0;");
+ w1.println("store B into 'tfile' using org.apache.pig.impl.io.TFileStorage();");
+ w1.close();
+
+ PrintWriter w2 = new PrintWriter(new FileWriter("tfile2.pig"));
+ w2.println("A = load 'tfile' using org.apache.pig.impl.io.TFileStorage() as (a:int, b:bag{(b0:int, b1:chararray)});");
+ w2.println("B = foreach A generate flatten($1);");
+ w2.println("store B into '2.txt';");
+ w2.close();
+
+ try {
+ String[] args = { "-Dpig.tmpfilecompression.codec=gz",
+ "-Dtfile.io.chunk.size=100", "tfile.pig" };
+ PigStats stats = PigRunner.run(args, null);
+
+ assertTrue(stats.isSuccessful());
+
+ String[] args2 = { "-Dpig.tmpfilecompression.codec=gz",
+ "-Dtfile.io.chunk.size=100", "tfile2.pig" };
+ PigStats stats2 = PigRunner.run(args2, null);
+
+ assertTrue(stats2.isSuccessful());
+
+ OutputStats os = stats2.result("B");
+ Iterator<Tuple> iter = os.iterator();
+ int count = 0;
+ String expected = "(1,this is a test for compression of temp files)";
+ while (iter.hasNext()) {
+ count++;
+ assertEquals(expected, iter.next().toString());
+ }
+ assertEquals(30, count);
+
+ } finally {
+ new File("tfile.pig").delete();
+ new File("tfile2.pig").delete();
+ new File("1.txt").delete();
+ }
+ }
}