You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by da...@apache.org on 2015/05/12 08:59:45 UTC

svn commit: r1678876 - in /pig/branches/branch-0.15: CHANGES.txt lib-src/bzip2/org/apache/tools/bzip2r/CBZip2InputStream.java test/org/apache/pig/test/TestBZip.java

Author: daijy
Date: Tue May 12 06:59:45 2015
New Revision: 1678876

URL: http://svn.apache.org/r1678876
Log:
PIG-4496: Fix CBZip2InputStream to close underlying stream

Modified:
    pig/branches/branch-0.15/CHANGES.txt
    pig/branches/branch-0.15/lib-src/bzip2/org/apache/tools/bzip2r/CBZip2InputStream.java
    pig/branches/branch-0.15/test/org/apache/pig/test/TestBZip.java

Modified: pig/branches/branch-0.15/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.15/CHANGES.txt?rev=1678876&r1=1678875&r2=1678876&view=diff
==============================================================================
--- pig/branches/branch-0.15/CHANGES.txt (original)
+++ pig/branches/branch-0.15/CHANGES.txt Tue May 12 06:59:45 2015
@@ -66,6 +66,8 @@ PIG-4333: Split BigData tests into multi
  
 BUG FIXES
 
+PIG-4496: Fix CBZip2InputStream to close underlying stream (petersla via daijy)
+
 PIG-4528: Fix a typo in src/docs/src/documentation/content/xdocs/basic.xml (namusyaka via daijy)
 
 PIG-4532: Pig Documentation contains typo for AvroStorage (fredericschmaljohann via daijy)

Modified: pig/branches/branch-0.15/lib-src/bzip2/org/apache/tools/bzip2r/CBZip2InputStream.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.15/lib-src/bzip2/org/apache/tools/bzip2r/CBZip2InputStream.java?rev=1678876&r1=1678875&r2=1678876&view=diff
==============================================================================
--- pig/branches/branch-0.15/lib-src/bzip2/org/apache/tools/bzip2r/CBZip2InputStream.java (original)
+++ pig/branches/branch-0.15/lib-src/bzip2/org/apache/tools/bzip2r/CBZip2InputStream.java Tue May 12 06:59:45 2015
@@ -223,6 +223,10 @@ public class CBZip2InputStream extends I
 
     @Override
     public int read() throws IOException {
+        if (this.innerBsStream == null) {
+            throw new IOException("stream closed");
+        }
+
         if (streamEnd) {
             return -1;
         } else {
@@ -264,6 +268,18 @@ public class CBZip2InputStream extends I
         }
     }
 
+    @Override
+    public void close() throws IOException {
+        if (this.innerBsStream == null) {
+            return;
+        }
+        try {
+            innerBsStream.close();
+        } finally {
+            this.innerBsStream = null;
+        }
+    }
+
     /**
      * getPos is used by the caller to know when the processing of the current 
      * {@link InputSplit} is complete. In this method, as we read each bzip
@@ -291,7 +307,6 @@ public class CBZip2InputStream extends I
             magic4 = bsGetUChar();
             if (magic1 != 'B' || magic2 != 'Z' || 
                     magic3 != 'h' || magic4 < '1' || magic4 > '9') {
-                bsFinishedWithStream();
                 streamEnd = true;
                 return;
             }
@@ -308,7 +323,6 @@ public class CBZip2InputStream extends I
     
     private void initBlock(boolean searchForMagic) throws IOException {
         if (readCount >= readLimit) {
-            bsFinishedWithStream();
             streamEnd = true;
             return;
         }
@@ -408,7 +422,6 @@ public class CBZip2InputStream extends I
         	throw new IOException("Encountered additional bytes in the filesplit past the crc block. "
         			+ "Loading of concatenated bz2 files is not supported");
         }
-        bsFinishedWithStream();
         streamEnd = true;
     }
 
@@ -424,14 +437,6 @@ public class CBZip2InputStream extends I
         cadvise("CRC error");
     }
 
-    private void bsFinishedWithStream() {
-        if (this.innerBsStream != null) {
-            if (this.innerBsStream != System.in) {
-                this.innerBsStream = null;
-            }
-        }
-    }
-
     private void bsSetStream(FSDataInputStream f) {
         innerBsStream = f;
         bsLive = 0;

Modified: pig/branches/branch-0.15/test/org/apache/pig/test/TestBZip.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.15/test/org/apache/pig/test/TestBZip.java?rev=1678876&r1=1678875&r2=1678876&view=diff
==============================================================================
--- pig/branches/branch-0.15/test/org/apache/pig/test/TestBZip.java (original)
+++ pig/branches/branch-0.15/test/org/apache/pig/test/TestBZip.java Tue May 12 06:59:45 2015
@@ -45,16 +45,23 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.data.DataType;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.PigContext;
+import org.apache.pig.test.utils.CloseAwareFSDataInputStream;
+import org.apache.pig.test.utils.CloseAwareOutputStream;
 import org.apache.tools.bzip2r.CBZip2InputStream;
 import org.apache.tools.bzip2r.CBZip2OutputStream;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
 
 public class TestBZip {
     private static Properties properties;
     private static MiniGenericCluster cluster;
 
+    @Rule
+    public TemporaryFolder folder = new TemporaryFolder();
+
     @BeforeClass
     public static void oneTimeSetUp() throws Exception {
         cluster = MiniGenericCluster.buildCluster();
@@ -73,10 +80,9 @@ public class TestBZip {
     public void testBzipInPig() throws Exception {
         PigServer pig = new PigServer(cluster.getExecType(), properties);
 
-        File in = File.createTempFile("junit", ".bz2");
-        in.deleteOnExit();
+        File in = folder.newFile("junit-in.bz2");
 
-        File out = File.createTempFile("junit", ".bz2");
+        File out = folder.newFile("junit-out.bz2");
         out.delete();
         String clusterOutput = Util.removeColon(out.getAbsolutePath());
 
@@ -121,9 +127,6 @@ public class TestBZip {
         for (int j = 1; j < 100; j++) {
             assertEquals(new Integer(j), map.get(j));
         }
-
-        in.delete();
-        Util.deleteFile(cluster, clusterOutput);
     }
 
    /**
@@ -133,10 +136,9 @@ public class TestBZip {
     public void testBzipInPig2() throws Exception {
         PigServer pig = new PigServer(cluster.getExecType(), properties);
 
-        File in = File.createTempFile("junit", ".bz2");
-        in.deleteOnExit();
+        File in = folder.newFile("junit-in.bz2");
 
-        File out = File.createTempFile("junit", ".bz2");
+        File out = folder.newFile("junit-out.bz2");
         out.delete();
         String clusterOutput = Util.removeColon(out.getAbsolutePath());
 
@@ -181,9 +183,6 @@ public class TestBZip {
         for (int j = 1; j < 100; j++) {
             assertEquals(new Integer(j), map.get(j));
         }
-
-        in.delete();
-        out.delete();
     }
 
     //see PIG-2391
@@ -197,10 +196,9 @@ public class TestBZip {
         };
 
         // bzip compressed input
-        File in = File.createTempFile("junit", ".bz2");
+        File in = folder.newFile("junit-in.bz2");
         String compressedInputFileName = in.getAbsolutePath();
         String clusterCompressedFilePath = Util.removeColon(compressedInputFileName);
-        in.deleteOnExit();
 
         try {
             CBZip2OutputStream cos =
@@ -230,7 +228,6 @@ public class TestBZip {
                 it2.next();
             }
         } finally {
-            in.delete();
             Util.deleteFile(cluster, "intermediate.bz");
             Util.deleteFile(cluster, "final.bz");
         }
@@ -249,9 +246,8 @@ public class TestBZip {
         };
 
         // bzip compressed input
-        File in = File.createTempFile("junit", ".bz2");
+        File in = folder.newFile("junit-in.bz2");
         String compressedInputFileName = in.getAbsolutePath();
-        in.deleteOnExit();
         String clusterCompressedFilePath = Util.removeColon(compressedInputFileName);
 
         String unCompressedInputFileName = "testRecordDelims-uncomp.txt";
@@ -291,7 +287,6 @@ public class TestBZip {
             assertFalse(it2.hasNext());
 
         } finally {
-            in.delete();
             Util.deleteFile(cluster, unCompressedInputFileName);
             Util.deleteFile(cluster, clusterCompressedFilePath);
         }
@@ -305,10 +300,9 @@ public class TestBZip {
      public void testEmptyBzipInPig() throws Exception {
         PigServer pig = new PigServer(cluster.getExecType(), properties);
 
-        File in = File.createTempFile("junit", ".tmp");
-        in.deleteOnExit();
+        File in = folder.newFile("junit-in.tmp");
 
-        File out = File.createTempFile("junit", ".bz2");
+        File out = folder.newFile("junit-out.bz2");
         out.delete();
         String clusterOutputFilePath = Util.removeColon(out.getAbsolutePath());
 
@@ -336,10 +330,6 @@ public class TestBZip {
 
         pig.registerQuery("B = load '" + Util.encodeEscape(clusterOutputFilePath) + "';");
         pig.openIterator("B");
-
-        in.delete();
-        Util.deleteFile(cluster, clusterOutputFilePath);
-
     }
 
     /**
@@ -347,8 +337,7 @@ public class TestBZip {
      */
     @Test
     public void testEmptyBzip() throws Exception {
-        File tmp = File.createTempFile("junit", ".tmp");
-        tmp.deleteOnExit();
+        File tmp = folder.newFile("junit.tmp");
         CBZip2OutputStream cos = new CBZip2OutputStream(new FileOutputStream(
                 tmp));
         cos.close();
@@ -358,7 +347,25 @@ public class TestBZip {
                 fs.open(new Path(tmp.getAbsolutePath())), -1, tmp.length());
         assertEquals(-1, cis.read(new byte[100]));
         cis.close();
-        tmp.delete();
+    }
+
+    @Test
+    public void testInnerStreamGetsClosed() throws Exception {
+        File tmp = folder.newFile("junit.tmp");
+
+        CloseAwareOutputStream out = new CloseAwareOutputStream(new FileOutputStream(tmp));
+        CBZip2OutputStream cos = new CBZip2OutputStream(out);
+        assertFalse(out.isClosed());
+        cos.close();
+        assertTrue(out.isClosed());
+
+        FileSystem fs = FileSystem.getLocal(new Configuration(false));
+        Path path = new Path(tmp.getAbsolutePath());
+        CloseAwareFSDataInputStream in = new CloseAwareFSDataInputStream(fs.open(path));
+        CBZip2InputStream cis = new CBZip2InputStream(in, -1, tmp.length());
+        assertFalse(in.isClosed());
+        cis.close();
+        assertTrue(in.isClosed());
     }
 
     /**
@@ -556,14 +563,12 @@ public class TestBZip {
         };
 
         // bzip compressed input file1
-        File in1 = File.createTempFile("junit", ".bz2");
+        File in1 = folder.newFile("junit-in1.bz2");
         String compressedInputFileName1 = in1.getAbsolutePath();
-        in1.deleteOnExit();
 
         // file2
-        File in2 = File.createTempFile("junit", ".bz2");
+        File in2 = folder.newFile("junit-in2.bz2");
         String compressedInputFileName2 = in2.getAbsolutePath();
-        in1.deleteOnExit();
 
         String unCompressedInputFileName = "testRecordDelims-uncomp.txt";
         Util.createInputFile(cluster, unCompressedInputFileName, inputDataMerged);
@@ -614,8 +619,6 @@ public class TestBZip {
             assertFalse(it2.hasNext());
 
         } finally {
-            in1.delete();
-            in2.delete();
             Util.deleteFile(cluster, unCompressedInputFileName);
         }