You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by zs...@apache.org on 2010/02/23 00:14:34 UTC

svn commit: r915106 - in /hadoop/hive/branches/branch-0.5: CHANGES.txt ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java ql/src/test/org/apache/hadoop/hive/ql/io/TestRCFile.java

Author: zshao
Date: Mon Feb 22 23:14:34 2010
New Revision: 915106

URL: http://svn.apache.org/viewvc?rev=915106&view=rev
Log:
HIVE-1185. Fix RCFile resource leak when opening a non-RCFile. (He Yongqiang via zshao)

Modified:
    hadoop/hive/branches/branch-0.5/CHANGES.txt
    hadoop/hive/branches/branch-0.5/ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java
    hadoop/hive/branches/branch-0.5/ql/src/test/org/apache/hadoop/hive/ql/io/TestRCFile.java

Modified: hadoop/hive/branches/branch-0.5/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hive/branches/branch-0.5/CHANGES.txt?rev=915106&r1=915105&r2=915106&view=diff
==============================================================================
--- hadoop/hive/branches/branch-0.5/CHANGES.txt (original)
+++ hadoop/hive/branches/branch-0.5/CHANGES.txt Mon Feb 22 23:14:34 2010
@@ -486,6 +486,9 @@
     HIVE-1183. hive.hwi.war.file vanished from hive-default.xml
     (Zheng Shao via namit)
 
+    HIVE-1185. Fix RCFile resource leak when opening a non-RCFile.
+    (He Yongqiang via zshao)
+
 Release 0.4.0 -  Unreleased
 
   INCOMPATIBLE CHANGES

Modified: hadoop/hive/branches/branch-0.5/ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java
URL: http://svn.apache.org/viewvc/hadoop/hive/branches/branch-0.5/ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java?rev=915106&r1=915105&r2=915106&view=diff
==============================================================================
--- hadoop/hive/branches/branch-0.5/ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java (original)
+++ hadoop/hive/branches/branch-0.5/ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java Mon Feb 22 23:14:34 2010
@@ -955,16 +955,33 @@
         long start, long length) throws IOException {
       conf.setInt("io.file.buffer.size", bufferSize);
       this.file = file;
-      in = fs.open(file, bufferSize);
+      in = openFile(fs, file, bufferSize, length);
       this.conf = conf;
       end = start + length;
-      if (start > 0) {
-        seek(0);
-        init();
-        seek(start);
-      } else {
-        init();
+      boolean succeed = false;
+      try {
+        if (start > 0) {
+          seek(0);
+          init();
+          seek(start);
+        } else {
+          init();
+        }
+        succeed = true;
+      } finally {
+        if (!succeed) {
+          if (in != null) {
+            try {
+              in.close();
+            } catch(IOException e) {
+              if (LOG != null && LOG.isDebugEnabled()) {
+                LOG.debug("Exception in closing " + in, e);
+              }
+            }
+          }
+        }
       }
+
       columnNumber = Integer.parseInt(metadata.get(
           new Text(COLUMN_NUMBER_METADATA_STR)).toString());
 
@@ -1018,6 +1035,15 @@
       currentKey = createKeyBuffer();
       currentValue = new ValueBuffer(null, columnNumber, skippedColIDs, codec);
     }
+    
+    /**
+     * Override this method to specialize the type of
+     * {@link FSDataInputStream} returned.
+     */
+    protected FSDataInputStream openFile(FileSystem fs, Path file,
+        int bufferSize, long length) throws IOException {
+      return fs.open(file, bufferSize);
+    }
 
     private void init() throws IOException {
       byte[] versionBlock = new byte[VERSION.length];

Modified: hadoop/hive/branches/branch-0.5/ql/src/test/org/apache/hadoop/hive/ql/io/TestRCFile.java
URL: http://svn.apache.org/viewvc/hadoop/hive/branches/branch-0.5/ql/src/test/org/apache/hadoop/hive/ql/io/TestRCFile.java?rev=915106&r1=915105&r2=915106&view=diff
==============================================================================
--- hadoop/hive/branches/branch-0.5/ql/src/test/org/apache/hadoop/hive/ql/io/TestRCFile.java (original)
+++ hadoop/hive/branches/branch-0.5/ql/src/test/org/apache/hadoop/hive/ql/io/TestRCFile.java Mon Feb 22 23:14:34 2010
@@ -19,6 +19,7 @@
 package org.apache.hadoop.hive.ql.io;
 
 import java.io.IOException;
+import java.io.InputStream;
 import java.io.UnsupportedEncodingException;
 import java.util.List;
 import java.util.Properties;
@@ -28,7 +29,9 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.ql.io.RCFile;
 import org.apache.hadoop.hive.ql.io.RCFileInputFormat;
@@ -401,6 +404,54 @@
   private void splitAfterSync() throws IOException {
     writeThenReadByRecordReader(500, 1000, 2, 19950, null);
   }
+  
+
+  // adapted HADOOP-5476 (calling new SequenceFile.Reader(...) leaves an
+  // InputStream open, if the given sequence file is broken) to RCFile 
+  private static class TestFSDataInputStream extends FSDataInputStream {
+    private boolean closed = false;
+
+    private TestFSDataInputStream(InputStream in) throws IOException {
+      super(in);
+    }
+
+    public void close() throws IOException {
+      closed = true;
+      super.close();
+    }
+
+    public boolean isClosed() {
+      return closed;
+    }
+  }
+
+  public void testCloseForErroneousRCFile() throws IOException {
+    Configuration conf = new Configuration();
+    LocalFileSystem fs = FileSystem.getLocal(conf);
+    // create an empty file (which is not a valid rcfile)
+    Path path = new Path(System.getProperty("test.build.data", ".")
+        + "/broken.rcfile");
+    fs.create(path).close();
+    // try to create RCFile.Reader
+    final TestFSDataInputStream[] openedFile = new TestFSDataInputStream[1];
+    try {
+      new RCFile.Reader(fs, path, conf) {
+        // this method is called by the RCFile.Reader constructor; overridden
+        // here so we can access the opened file
+        protected FSDataInputStream openFile(FileSystem fs, Path file,
+            int bufferSize, long length) throws IOException {
+          final InputStream in = super.openFile(fs, file, bufferSize, length);
+          openedFile[0] = new TestFSDataInputStream(in);
+          return openedFile[0];
+        }
+      };
+      fail("IOException expected.");
+    } catch (IOException expected) {
+    }
+    assertNotNull(path + " should have been opened.", openedFile[0]);
+    assertTrue("InputStream for " + path + " should have been closed.",
+        openedFile[0].isClosed());
+  }
 
   private void writeThenReadByRecordReader(int intervalRecordCount,
       int writeCount, int splitNumber, long minSplitSize, CompressionCodec codec)