You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by zs...@apache.org on 2010/02/23 00:14:34 UTC
svn commit: r915106 - in /hadoop/hive/branches/branch-0.5: CHANGES.txt
ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java
ql/src/test/org/apache/hadoop/hive/ql/io/TestRCFile.java
Author: zshao
Date: Mon Feb 22 23:14:34 2010
New Revision: 915106
URL: http://svn.apache.org/viewvc?rev=915106&view=rev
Log:
HIVE-1185. Fix RCFile resource leak when opening a non-RCFile. (He Yongqiang via zshao)
Modified:
hadoop/hive/branches/branch-0.5/CHANGES.txt
hadoop/hive/branches/branch-0.5/ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java
hadoop/hive/branches/branch-0.5/ql/src/test/org/apache/hadoop/hive/ql/io/TestRCFile.java
Modified: hadoop/hive/branches/branch-0.5/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hive/branches/branch-0.5/CHANGES.txt?rev=915106&r1=915105&r2=915106&view=diff
==============================================================================
--- hadoop/hive/branches/branch-0.5/CHANGES.txt (original)
+++ hadoop/hive/branches/branch-0.5/CHANGES.txt Mon Feb 22 23:14:34 2010
@@ -486,6 +486,9 @@
HIVE-1183. hive.hwi.war.file vanished from hive-default.xml
(Zheng Shao via namit)
+ HIVE-1185. Fix RCFile resource leak when opening a non-RCFile.
+ (He Yongqiang via zshao)
+
Release 0.4.0 - Unreleased
INCOMPATIBLE CHANGES
Modified: hadoop/hive/branches/branch-0.5/ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java
URL: http://svn.apache.org/viewvc/hadoop/hive/branches/branch-0.5/ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java?rev=915106&r1=915105&r2=915106&view=diff
==============================================================================
--- hadoop/hive/branches/branch-0.5/ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java (original)
+++ hadoop/hive/branches/branch-0.5/ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java Mon Feb 22 23:14:34 2010
@@ -955,16 +955,33 @@
long start, long length) throws IOException {
conf.setInt("io.file.buffer.size", bufferSize);
this.file = file;
- in = fs.open(file, bufferSize);
+ in = openFile(fs, file, bufferSize, length);
this.conf = conf;
end = start + length;
- if (start > 0) {
- seek(0);
- init();
- seek(start);
- } else {
- init();
+ boolean succeed = false;
+ try {
+ if (start > 0) {
+ seek(0);
+ init();
+ seek(start);
+ } else {
+ init();
+ }
+ succeed = true;
+ } finally {
+ if (!succeed) {
+ if (in != null) {
+ try {
+ in.close();
+ } catch(IOException e) {
+ if (LOG != null && LOG.isDebugEnabled()) {
+ LOG.debug("Exception in closing " + in, e);
+ }
+ }
+ }
+ }
}
+
columnNumber = Integer.parseInt(metadata.get(
new Text(COLUMN_NUMBER_METADATA_STR)).toString());
@@ -1018,6 +1035,15 @@
currentKey = createKeyBuffer();
currentValue = new ValueBuffer(null, columnNumber, skippedColIDs, codec);
}
+
+ /**
+ * Override this method to specialize the type of
+ * {@link FSDataInputStream} returned.
+ */
+ protected FSDataInputStream openFile(FileSystem fs, Path file,
+ int bufferSize, long length) throws IOException {
+ return fs.open(file, bufferSize);
+ }
private void init() throws IOException {
byte[] versionBlock = new byte[VERSION.length];
Modified: hadoop/hive/branches/branch-0.5/ql/src/test/org/apache/hadoop/hive/ql/io/TestRCFile.java
URL: http://svn.apache.org/viewvc/hadoop/hive/branches/branch-0.5/ql/src/test/org/apache/hadoop/hive/ql/io/TestRCFile.java?rev=915106&r1=915105&r2=915106&view=diff
==============================================================================
--- hadoop/hive/branches/branch-0.5/ql/src/test/org/apache/hadoop/hive/ql/io/TestRCFile.java (original)
+++ hadoop/hive/branches/branch-0.5/ql/src/test/org/apache/hadoop/hive/ql/io/TestRCFile.java Mon Feb 22 23:14:34 2010
@@ -19,6 +19,7 @@
package org.apache.hadoop.hive.ql.io;
import java.io.IOException;
+import java.io.InputStream;
import java.io.UnsupportedEncodingException;
import java.util.List;
import java.util.Properties;
@@ -28,7 +29,9 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.io.RCFile;
import org.apache.hadoop.hive.ql.io.RCFileInputFormat;
@@ -401,6 +404,54 @@
private void splitAfterSync() throws IOException {
writeThenReadByRecordReader(500, 1000, 2, 19950, null);
}
+
+
+ // adapted HADOOP-5476 (calling new SequenceFile.Reader(...) leaves an
+ // InputStream open if the given sequence file is broken) to RCFile
+ private static class TestFSDataInputStream extends FSDataInputStream {
+ private boolean closed = false;
+
+ private TestFSDataInputStream(InputStream in) throws IOException {
+ super(in);
+ }
+
+ public void close() throws IOException {
+ closed = true;
+ super.close();
+ }
+
+ public boolean isClosed() {
+ return closed;
+ }
+ }
+
+ public void testCloseForErroneousRCFile() throws IOException {
+ Configuration conf = new Configuration();
+ LocalFileSystem fs = FileSystem.getLocal(conf);
+ // create an empty file (which is not a valid rcfile)
+ Path path = new Path(System.getProperty("test.build.data", ".")
+ + "/broken.rcfile");
+ fs.create(path).close();
+ // try to create RCFile.Reader
+ final TestFSDataInputStream[] openedFile = new TestFSDataInputStream[1];
+ try {
+ new RCFile.Reader(fs, path, conf) {
+ // this method is called by the RCFile.Reader constructor; overridden here
+ // so we can access the opened file
+ protected FSDataInputStream openFile(FileSystem fs, Path file,
+ int bufferSize, long length) throws IOException {
+ final InputStream in = super.openFile(fs, file, bufferSize, length);
+ openedFile[0] = new TestFSDataInputStream(in);
+ return openedFile[0];
+ }
+ };
+ fail("IOException expected.");
+ } catch (IOException expected) {
+ }
+ assertNotNull(path + " should have been opened.", openedFile[0]);
+ assertTrue("InputStream for " + path + " should have been closed.",
+ openedFile[0].isClosed());
+ }
private void writeThenReadByRecordReader(int intervalRecordCount,
int writeCount, int splitNumber, long minSplitSize, CompressionCodec codec)