You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by da...@apache.org on 2015/05/12 08:59:45 UTC
svn commit: r1678876 - in /pig/branches/branch-0.15: CHANGES.txt
lib-src/bzip2/org/apache/tools/bzip2r/CBZip2InputStream.java
test/org/apache/pig/test/TestBZip.java
Author: daijy
Date: Tue May 12 06:59:45 2015
New Revision: 1678876
URL: http://svn.apache.org/r1678876
Log:
PIG-4496: Fix CBZip2InputStream to close underlying stream
Modified:
pig/branches/branch-0.15/CHANGES.txt
pig/branches/branch-0.15/lib-src/bzip2/org/apache/tools/bzip2r/CBZip2InputStream.java
pig/branches/branch-0.15/test/org/apache/pig/test/TestBZip.java
Modified: pig/branches/branch-0.15/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.15/CHANGES.txt?rev=1678876&r1=1678875&r2=1678876&view=diff
==============================================================================
--- pig/branches/branch-0.15/CHANGES.txt (original)
+++ pig/branches/branch-0.15/CHANGES.txt Tue May 12 06:59:45 2015
@@ -66,6 +66,8 @@ PIG-4333: Split BigData tests into multi
BUG FIXES
+PIG-4496: Fix CBZip2InputStream to close underlying stream (petersla via daijy)
+
PIG-4528: Fix a typo in src/docs/src/documentation/content/xdocs/basic.xml (namusyaka via daijy)
PIG-4532: Pig Documentation contains typo for AvroStorage (fredericschmaljohann via daijy)
Modified: pig/branches/branch-0.15/lib-src/bzip2/org/apache/tools/bzip2r/CBZip2InputStream.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.15/lib-src/bzip2/org/apache/tools/bzip2r/CBZip2InputStream.java?rev=1678876&r1=1678875&r2=1678876&view=diff
==============================================================================
--- pig/branches/branch-0.15/lib-src/bzip2/org/apache/tools/bzip2r/CBZip2InputStream.java (original)
+++ pig/branches/branch-0.15/lib-src/bzip2/org/apache/tools/bzip2r/CBZip2InputStream.java Tue May 12 06:59:45 2015
@@ -223,6 +223,10 @@ public class CBZip2InputStream extends I
@Override
public int read() throws IOException {
+ if (this.innerBsStream == null) {
+ throw new IOException("stream closed");
+ }
+
if (streamEnd) {
return -1;
} else {
@@ -264,6 +268,18 @@ public class CBZip2InputStream extends I
}
}
+ @Override
+ public void close() throws IOException {
+ if (this.innerBsStream == null) {
+ return;
+ }
+ try {
+ innerBsStream.close();
+ } finally {
+ this.innerBsStream = null;
+ }
+ }
+
/**
* getPos is used by the caller to know when the processing of the current
* {@link InputSplit} is complete. In this method, as we read each bzip
@@ -291,7 +307,6 @@ public class CBZip2InputStream extends I
magic4 = bsGetUChar();
if (magic1 != 'B' || magic2 != 'Z' ||
magic3 != 'h' || magic4 < '1' || magic4 > '9') {
- bsFinishedWithStream();
streamEnd = true;
return;
}
@@ -308,7 +323,6 @@ public class CBZip2InputStream extends I
private void initBlock(boolean searchForMagic) throws IOException {
if (readCount >= readLimit) {
- bsFinishedWithStream();
streamEnd = true;
return;
}
@@ -408,7 +422,6 @@ public class CBZip2InputStream extends I
throw new IOException("Encountered additional bytes in the filesplit past the crc block. "
+ "Loading of concatenated bz2 files is not supported");
}
- bsFinishedWithStream();
streamEnd = true;
}
@@ -424,14 +437,6 @@ public class CBZip2InputStream extends I
cadvise("CRC error");
}
- private void bsFinishedWithStream() {
- if (this.innerBsStream != null) {
- if (this.innerBsStream != System.in) {
- this.innerBsStream = null;
- }
- }
- }
-
private void bsSetStream(FSDataInputStream f) {
innerBsStream = f;
bsLive = 0;
Modified: pig/branches/branch-0.15/test/org/apache/pig/test/TestBZip.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.15/test/org/apache/pig/test/TestBZip.java?rev=1678876&r1=1678875&r2=1678876&view=diff
==============================================================================
--- pig/branches/branch-0.15/test/org/apache/pig/test/TestBZip.java (original)
+++ pig/branches/branch-0.15/test/org/apache/pig/test/TestBZip.java Tue May 12 06:59:45 2015
@@ -45,16 +45,23 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.PigContext;
+import org.apache.pig.test.utils.CloseAwareFSDataInputStream;
+import org.apache.pig.test.utils.CloseAwareOutputStream;
import org.apache.tools.bzip2r.CBZip2InputStream;
import org.apache.tools.bzip2r.CBZip2OutputStream;
import org.junit.AfterClass;
import org.junit.BeforeClass;
+import org.junit.Rule;
import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
public class TestBZip {
private static Properties properties;
private static MiniGenericCluster cluster;
+ @Rule
+ public TemporaryFolder folder = new TemporaryFolder();
+
@BeforeClass
public static void oneTimeSetUp() throws Exception {
cluster = MiniGenericCluster.buildCluster();
@@ -73,10 +80,9 @@ public class TestBZip {
public void testBzipInPig() throws Exception {
PigServer pig = new PigServer(cluster.getExecType(), properties);
- File in = File.createTempFile("junit", ".bz2");
- in.deleteOnExit();
+ File in = folder.newFile("junit-in.bz2");
- File out = File.createTempFile("junit", ".bz2");
+ File out = folder.newFile("junit-out.bz2");
out.delete();
String clusterOutput = Util.removeColon(out.getAbsolutePath());
@@ -121,9 +127,6 @@ public class TestBZip {
for (int j = 1; j < 100; j++) {
assertEquals(new Integer(j), map.get(j));
}
-
- in.delete();
- Util.deleteFile(cluster, clusterOutput);
}
/**
@@ -133,10 +136,9 @@ public class TestBZip {
public void testBzipInPig2() throws Exception {
PigServer pig = new PigServer(cluster.getExecType(), properties);
- File in = File.createTempFile("junit", ".bz2");
- in.deleteOnExit();
+ File in = folder.newFile("junit-in.bz2");
- File out = File.createTempFile("junit", ".bz2");
+ File out = folder.newFile("junit-out.bz2");
out.delete();
String clusterOutput = Util.removeColon(out.getAbsolutePath());
@@ -181,9 +183,6 @@ public class TestBZip {
for (int j = 1; j < 100; j++) {
assertEquals(new Integer(j), map.get(j));
}
-
- in.delete();
- out.delete();
}
//see PIG-2391
@@ -197,10 +196,9 @@ public class TestBZip {
};
// bzip compressed input
- File in = File.createTempFile("junit", ".bz2");
+ File in = folder.newFile("junit-in.bz2");
String compressedInputFileName = in.getAbsolutePath();
String clusterCompressedFilePath = Util.removeColon(compressedInputFileName);
- in.deleteOnExit();
try {
CBZip2OutputStream cos =
@@ -230,7 +228,6 @@ public class TestBZip {
it2.next();
}
} finally {
- in.delete();
Util.deleteFile(cluster, "intermediate.bz");
Util.deleteFile(cluster, "final.bz");
}
@@ -249,9 +246,8 @@ public class TestBZip {
};
// bzip compressed input
- File in = File.createTempFile("junit", ".bz2");
+ File in = folder.newFile("junit-in.bz2");
String compressedInputFileName = in.getAbsolutePath();
- in.deleteOnExit();
String clusterCompressedFilePath = Util.removeColon(compressedInputFileName);
String unCompressedInputFileName = "testRecordDelims-uncomp.txt";
@@ -291,7 +287,6 @@ public class TestBZip {
assertFalse(it2.hasNext());
} finally {
- in.delete();
Util.deleteFile(cluster, unCompressedInputFileName);
Util.deleteFile(cluster, clusterCompressedFilePath);
}
@@ -305,10 +300,9 @@ public class TestBZip {
public void testEmptyBzipInPig() throws Exception {
PigServer pig = new PigServer(cluster.getExecType(), properties);
- File in = File.createTempFile("junit", ".tmp");
- in.deleteOnExit();
+ File in = folder.newFile("junit-in.tmp");
- File out = File.createTempFile("junit", ".bz2");
+ File out = folder.newFile("junit-out.bz2");
out.delete();
String clusterOutputFilePath = Util.removeColon(out.getAbsolutePath());
@@ -336,10 +330,6 @@ public class TestBZip {
pig.registerQuery("B = load '" + Util.encodeEscape(clusterOutputFilePath) + "';");
pig.openIterator("B");
-
- in.delete();
- Util.deleteFile(cluster, clusterOutputFilePath);
-
}
/**
@@ -347,8 +337,7 @@ public class TestBZip {
*/
@Test
public void testEmptyBzip() throws Exception {
- File tmp = File.createTempFile("junit", ".tmp");
- tmp.deleteOnExit();
+ File tmp = folder.newFile("junit.tmp");
CBZip2OutputStream cos = new CBZip2OutputStream(new FileOutputStream(
tmp));
cos.close();
@@ -358,7 +347,25 @@ public class TestBZip {
fs.open(new Path(tmp.getAbsolutePath())), -1, tmp.length());
assertEquals(-1, cis.read(new byte[100]));
cis.close();
- tmp.delete();
+ }
+
+ @Test
+ public void testInnerStreamGetsClosed() throws Exception {
+ File tmp = folder.newFile("junit.tmp");
+
+ CloseAwareOutputStream out = new CloseAwareOutputStream(new FileOutputStream(tmp));
+ CBZip2OutputStream cos = new CBZip2OutputStream(out);
+ assertFalse(out.isClosed());
+ cos.close();
+ assertTrue(out.isClosed());
+
+ FileSystem fs = FileSystem.getLocal(new Configuration(false));
+ Path path = new Path(tmp.getAbsolutePath());
+ CloseAwareFSDataInputStream in = new CloseAwareFSDataInputStream(fs.open(path));
+ CBZip2InputStream cis = new CBZip2InputStream(in, -1, tmp.length());
+ assertFalse(in.isClosed());
+ cis.close();
+ assertTrue(in.isClosed());
}
/**
@@ -556,14 +563,12 @@ public class TestBZip {
};
// bzip compressed input file1
- File in1 = File.createTempFile("junit", ".bz2");
+ File in1 = folder.newFile("junit-in1.bz2");
String compressedInputFileName1 = in1.getAbsolutePath();
- in1.deleteOnExit();
// file2
- File in2 = File.createTempFile("junit", ".bz2");
+ File in2 = folder.newFile("junit-in2.bz2");
String compressedInputFileName2 = in2.getAbsolutePath();
- in1.deleteOnExit();
String unCompressedInputFileName = "testRecordDelims-uncomp.txt";
Util.createInputFile(cluster, unCompressedInputFileName, inputDataMerged);
@@ -614,8 +619,6 @@ public class TestBZip {
assertFalse(it2.hasNext());
} finally {
- in1.delete();
- in2.delete();
Util.deleteFile(cluster, unCompressedInputFileName);
}