You are viewing a plain-text version of this message; the hyperlink to its canonical (archived) version was not preserved in this copy.
Posted to commits@pig.apache.org by rd...@apache.org on 2011/04/11 18:52:12 UTC

svn commit: r1091123 - in /pig/branches/branch-0.8: CHANGES.txt src/org/apache/pig/impl/io/TFileRecordReader.java test/org/apache/pig/test/TestTmpFileCompression.java

Author: rding
Date: Mon Apr 11 16:52:11 2011
New Revision: 1091123

URL: http://svn.apache.org/viewvc?rev=1091123&view=rev
Log:
PIG-1977: 'Stream closed' error while reading Pig temp files

Modified:
    pig/branches/branch-0.8/CHANGES.txt
    pig/branches/branch-0.8/src/org/apache/pig/impl/io/TFileRecordReader.java
    pig/branches/branch-0.8/test/org/apache/pig/test/TestTmpFileCompression.java

Modified: pig/branches/branch-0.8/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.8/CHANGES.txt?rev=1091123&r1=1091122&r2=1091123&view=diff
==============================================================================
--- pig/branches/branch-0.8/CHANGES.txt (original)
+++ pig/branches/branch-0.8/CHANGES.txt Mon Apr 11 16:52:11 2011
@@ -34,6 +34,8 @@ PIG-1886: Add zookeeper jar to list of j
 
 BUG FIXES
 
+PIG-1977: "Stream closed" error while reading Pig temp files (results of intermediate jobs) (rding)
+
 PIG-1911: Infinite loop with accumulator function in nested foreach (thejas)
 
 PIG-1964: PigStorageSchema fails if a column value is null (thejas)

Modified: pig/branches/branch-0.8/src/org/apache/pig/impl/io/TFileRecordReader.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.8/src/org/apache/pig/impl/io/TFileRecordReader.java?rev=1091123&r1=1091122&r2=1091123&view=diff
==============================================================================
--- pig/branches/branch-0.8/src/org/apache/pig/impl/io/TFileRecordReader.java (original)
+++ pig/branches/branch-0.8/src/org/apache/pig/impl/io/TFileRecordReader.java Mon Apr 11 16:52:11 2011
@@ -78,12 +78,13 @@ public class TFileRecordReader extends R
             // if we got here, we have seen RECORD_1-RECORD_2-RECORD_3-TUPLE_MARKER
             // sequence - lets now read the contents of the tuple 
             value = (Tuple) sedes.readDatum(in);
-            scanner.advance();
-            return true;
         }
         finally {
             in.close();
         }
+        
+        scanner.advance();
+        return true;
     }
 
     @Override

Modified: pig/branches/branch-0.8/test/org/apache/pig/test/TestTmpFileCompression.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.8/test/org/apache/pig/test/TestTmpFileCompression.java?rev=1091123&r1=1091122&r2=1091123&view=diff
==============================================================================
--- pig/branches/branch-0.8/test/org/apache/pig/test/TestTmpFileCompression.java (original)
+++ pig/branches/branch-0.8/test/org/apache/pig/test/TestTmpFileCompression.java Mon Apr 11 16:52:11 2011
@@ -20,19 +20,26 @@ package org.apache.pig.test;
 import static org.junit.Assert.*;
 
 import java.io.File;
+import java.io.FileWriter;
 import java.io.IOException;
+import java.io.PrintWriter;
 import java.util.HashMap;
 import java.util.Iterator;
+import java.util.Properties;
 
 import java.io.BufferedReader;
 import java.io.FileReader;
 
 
 import org.apache.pig.ExecType;
+import org.apache.pig.PigRunner;
 import org.apache.pig.PigServer;
+import org.apache.pig.data.DataBag;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.io.TFileStorage;
 import org.apache.pig.impl.io.InterStorage;
+import org.apache.pig.tools.pigstats.OutputStats;
+import org.apache.pig.tools.pigstats.PigStats;
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Before;
@@ -267,4 +274,57 @@ public class TestTmpFileCompression {
         }));
     }
 
+    
+    // PIG-1977
+    @Test
+    public void testTFileRecordReader() throws Exception {
+        PrintWriter w = new PrintWriter(new FileWriter("1.txt"));
+        for (int i = 0; i < 30; i++) {
+            w.println("1\tthis is a test for compression of temp files");
+        }
+        w.close();
+        
+        Util.copyFromLocalToCluster(cluster, "1.txt", "1.txt");
+        
+        PrintWriter w1 = new PrintWriter(new FileWriter("tfile.pig"));
+        w1.println("A = load '1.txt' as (a0:int, a1:chararray);");
+        w1.println("B = group A by a0;");
+        w1.println("store B into 'tfile' using org.apache.pig.impl.io.TFileStorage();");
+        w1.close();
+        
+        PrintWriter w2 = new PrintWriter(new FileWriter("tfile2.pig"));
+        w2.println("A = load 'tfile' using org.apache.pig.impl.io.TFileStorage() as (a:int, b:bag{(b0:int, b1:chararray)});");
+        w2.println("B = foreach A generate flatten($1);");
+        w2.println("store B into '2.txt';");
+        w2.close();
+        
+        try {
+            String[] args = { "-Dpig.tmpfilecompression.codec=gz",
+                    "-Dtfile.io.chunk.size=100", "tfile.pig" };
+            PigStats stats = PigRunner.run(args, null);
+     
+            assertTrue(stats.isSuccessful());
+ 
+            String[] args2 = { "-Dpig.tmpfilecompression.codec=gz",
+                    "-Dtfile.io.chunk.size=100", "tfile2.pig" };
+            PigStats stats2 = PigRunner.run(args2, null);
+
+            assertTrue(stats2.isSuccessful());
+            
+            OutputStats os = stats2.result("B");
+            Iterator<Tuple> iter = os.iterator();
+            int count = 0;
+            String expected = "(1,this is a test for compression of temp files)";
+            while (iter.hasNext()) {
+                count++;
+                assertEquals(expected, iter.next().toString());
+            }
+            assertEquals(30, count);
+            
+        } finally {
+            new File("tfile.pig").delete(); 
+            new File("tfile2.pig").delete(); 
+            new File("1.txt").delete(); 
+        }
+    }
 }