Posted to commits@orc.apache.org by om...@apache.org on 2019/05/07 01:37:20 UTC

[orc] 01/01: ORC-498: ReaderImpl and RecordReaderImpl open separate file handles.

This is an automated email from the ASF dual-hosted git repository.

omalley pushed a commit to branch branch-1.4
in repository https://gitbox.apache.org/repos/asf/orc.git

commit c43172e765e9575259d52e1655b495ea64c5c8b0
Author: Owen O'Malley <om...@apache.org>
AuthorDate: Mon May 6 14:29:25 2019 -0700

    ORC-498: ReaderImpl and RecordReaderImpl open separate file handles.
---
 java/core/src/java/org/apache/orc/Reader.java      |   3 +-
 .../org/apache/orc/impl/DataReaderProperties.java  |  21 +++
 .../src/java/org/apache/orc/impl/ReaderImpl.java   |  34 ++++-
 .../java/org/apache/orc/impl/RecordReaderImpl.java |  10 +-
 .../org/apache/orc/impl/RecordReaderUtils.java     |  30 +++-
 .../test/org/apache/orc/impl/TestReaderImpl.java   | 153 +++++++++++++++++++++
 6 files changed, 239 insertions(+), 12 deletions(-)
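
For context on what the patch does: Reader now extends Closeable, and a RecordReader created from a Reader reuses the file handle that ReaderImpl opened while reading the file tail, guarded by a reference count so the underlying stream is closed only when the last holder releases it. Below is a minimal usage sketch of the resulting lifecycle; it is illustrative only and assumes /tmp/example.orc is an existing ORC file (the path and class name are not part of this commit).

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.orc.OrcFile;
    import org.apache.orc.Reader;
    import org.apache.orc.RecordReader;

    public class SharedHandleExample {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Creating the Reader opens a single file handle and takes the first reference on it.
        Reader reader = OrcFile.createReader(new Path("/tmp/example.orc"),
                                             OrcFile.readerOptions(conf));
        // Each RecordReader shares that handle instead of opening its own.
        RecordReader rows = reader.rows();
        RecordReader rows2 = reader.rows();
        rows.close();   // stream stays open: reader and rows2 still hold references
        rows2.close();  // stream stays open: reader still holds a reference
        reader.close(); // last reference released, the underlying stream is closed
      }
    }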

diff --git a/java/core/src/java/org/apache/orc/Reader.java b/java/core/src/java/org/apache/orc/Reader.java
index 2ef64d7..3b41251 100644
--- a/java/core/src/java/org/apache/orc/Reader.java
+++ b/java/core/src/java/org/apache/orc/Reader.java
@@ -18,6 +18,7 @@
 
 package org.apache.orc;
 
+import java.io.Closeable;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.List;
@@ -30,7 +31,7 @@ import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
  *
  * One Reader can support multiple concurrent RecordReader.
  */
-public interface Reader {
+public interface Reader extends Closeable {
 
   /**
    * Get the number of rows in the file.
diff --git a/java/core/src/java/org/apache/orc/impl/DataReaderProperties.java b/java/core/src/java/org/apache/orc/impl/DataReaderProperties.java
index fbdc145..5bbb8a6 100644
--- a/java/core/src/java/org/apache/orc/impl/DataReaderProperties.java
+++ b/java/core/src/java/org/apache/orc/impl/DataReaderProperties.java
@@ -17,6 +17,7 @@
  */
 package org.apache.orc.impl;
 
+import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.orc.CompressionKind;
@@ -25,6 +26,8 @@ public final class DataReaderProperties {
 
   private final FileSystem fileSystem;
   private final Path path;
+  private final FSDataInputStream file;
+  private final ReaderImpl reader;
   private final CompressionKind compression;
   private final boolean zeroCopy;
   private final int typeCount;
@@ -33,6 +36,8 @@ public final class DataReaderProperties {
   private DataReaderProperties(Builder builder) {
     this.fileSystem = builder.fileSystem;
     this.path = builder.path;
+    this.reader = builder.reader;
+    this.file = builder.file;
     this.compression = builder.compression;
     this.zeroCopy = builder.zeroCopy;
     this.typeCount = builder.typeCount;
@@ -47,6 +52,14 @@ public final class DataReaderProperties {
     return path;
   }
 
+  public ReaderImpl getReader() {
+    return reader;
+  }
+
+  public FSDataInputStream getFile() {
+    return file;
+  }
+
   public CompressionKind getCompression() {
     return compression;
   }
@@ -71,6 +84,8 @@ public final class DataReaderProperties {
 
     private FileSystem fileSystem;
     private Path path;
+    private ReaderImpl reader;
+    private FSDataInputStream file;
     private CompressionKind compression;
     private boolean zeroCopy;
     private int typeCount;
@@ -90,6 +105,12 @@ public final class DataReaderProperties {
       return this;
     }
 
+    public Builder withFile(ReaderImpl reader, FSDataInputStream file) {
+      this.reader = reader;
+      this.file = file;
+      return this;
+    }
+
     public Builder withCompression(CompressionKind value) {
       this.compression = value;
       return this;
diff --git a/java/core/src/java/org/apache/orc/impl/ReaderImpl.java b/java/core/src/java/org/apache/orc/impl/ReaderImpl.java
index 80b7a60..5159732 100644
--- a/java/core/src/java/org/apache/orc/impl/ReaderImpl.java
+++ b/java/core/src/java/org/apache/orc/impl/ReaderImpl.java
@@ -63,6 +63,8 @@ public class ReaderImpl implements Reader {
   private final long maxLength;
   protected final Path path;
   protected final org.apache.orc.CompressionKind compressionKind;
+  protected FSDataInputStream file;
+  protected int fileReferenceCount = 0;
   protected int bufferSize;
   protected OrcProto.Metadata metadata;
   private List<OrcProto.StripeStatistics> stripeStats;
@@ -519,7 +521,9 @@ public class ReaderImpl implements Reader {
     OrcProto.PostScript ps;
     OrcProto.FileTail.Builder fileTailBuilder = OrcProto.FileTail.newBuilder();
     long modificationTime;
-    try (FSDataInputStream file = fs.open(path)) {
+    file = fs.open(path);
+    addFileReference();
+    try {
       // figure out the size of the file using the option or filesystem
       long size;
       if (maxFileLength == Long.MAX_VALUE) {
@@ -536,9 +540,9 @@ public class ReaderImpl implements Reader {
         return buildEmptyTail();
       } else if (size <= OrcFile.MAGIC.length()) {
         // Anything smaller than MAGIC header cannot be valid (valid ORC files
-	// are actually around 40 bytes, this is more conservative)
+        // are actually around 40 bytes, this is more conservative)
         throw new FileFormatException("Not a valid ORC file " + path
-          + " (maxFileLength= " + maxFileLength + ")");
+                                          + " (maxFileLength= " + maxFileLength + ")");
       }
       fileTailBuilder.setFileLength(size);
 
@@ -598,6 +602,14 @@ public class ReaderImpl implements Reader {
         OrcCodecPool.returnCodec(compressionKind, codec);
       }
       fileTailBuilder.setFooter(footer);
+    } catch (Throwable thr) {
+      try {
+        removeFileReference();
+      } catch (IOException ignore) {
+        // ignore
+      }
+      throw thr instanceof IOException ? (IOException) thr :
+                new IOException("Problem reading file footer " + path, thr);
     }
 
     ByteBuffer serializedTail = ByteBuffer.allocate(buffer.remaining());
@@ -823,4 +835,20 @@ public class ReaderImpl implements Reader {
     buffer.append(")");
     return buffer.toString();
   }
+
+  void addFileReference() {
+    fileReferenceCount += 1;
+  }
+
+  void removeFileReference() throws IOException {
+    fileReferenceCount -= 1;
+    if (fileReferenceCount == 0 && file != null) {
+      file.close();
+    }
+  }
+
+  @Override
+  public void close() throws IOException {
+    removeFileReference();
+  }
 }
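
The core of the ReaderImpl change is a plain reference count around a single FSDataInputStream: each sharer calls addFileReference() when it opens and removeFileReference() when it closes, and the stream is closed only when the count reaches zero. A stripped-down sketch of the same pattern follows; the class and method names are illustrative, not the ORC code, and like the commit it assumes callers coordinate open/close calls (there is no synchronization).

    import java.io.Closeable;
    import java.io.IOException;

    // Illustrative only: close the wrapped resource when the last holder releases it.
    class RefCountedHandle implements Closeable {
      private final Closeable resource;
      private int referenceCount = 1;   // the creator holds the first reference

      RefCountedHandle(Closeable resource) {
        this.resource = resource;
      }

      void addReference() {
        referenceCount += 1;
      }

      void removeReference() throws IOException {
        referenceCount -= 1;
        if (referenceCount == 0) {
          // Only the last release actually closes the underlying resource.
          resource.close();
        }
      }

      @Override
      public void close() throws IOException {
        removeReference();
      }
    }
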
diff --git a/java/core/src/java/org/apache/orc/impl/RecordReaderImpl.java b/java/core/src/java/org/apache/orc/impl/RecordReaderImpl.java
index 0dacc70..83b987d 100644
--- a/java/core/src/java/org/apache/orc/impl/RecordReaderImpl.java
+++ b/java/core/src/java/org/apache/orc/impl/RecordReaderImpl.java
@@ -241,15 +241,19 @@ public class RecordReaderImpl implements RecordReader {
     if (options.getDataReader() != null) {
       this.dataReader = options.getDataReader().clone();
     } else {
-      this.dataReader = RecordReaderUtils.createDefaultDataReader(
+      DataReaderProperties.Builder builder =
           DataReaderProperties.builder()
               .withBufferSize(bufferSize)
               .withCompression(fileReader.compressionKind)
               .withFileSystem(fileReader.fileSystem)
               .withPath(fileReader.path)
               .withTypeCount(types.size())
-              .withZeroCopy(zeroCopy)
-              .build());
+              .withZeroCopy(zeroCopy);
+      if (fileReader.file != null) {
+        builder.withFile(fileReader, fileReader.file);
+      }
+      this.dataReader = RecordReaderUtils.createDefaultDataReader(
+          builder.build());
     }
     this.dataReader.open();
     firstRow = skippedRows;
diff --git a/java/core/src/java/org/apache/orc/impl/RecordReaderUtils.java b/java/core/src/java/org/apache/orc/impl/RecordReaderUtils.java
index 6006634..3d6699b 100644
--- a/java/core/src/java/org/apache/orc/impl/RecordReaderUtils.java
+++ b/java/core/src/java/org/apache/orc/impl/RecordReaderUtils.java
@@ -142,20 +142,24 @@ public class RecordReaderUtils {
   }
 
   private static class DefaultDataReader implements DataReader {
-    private FSDataInputStream file = null;
     private final ByteBufferAllocatorPool pool;
     private HadoopShims.ZeroCopyReaderShim zcr = null;
     private final FileSystem fs;
     private final Path path;
+    private final ReaderImpl reader;
+    private FSDataInputStream file;
     private final boolean useZeroCopy;
     private final CompressionCodec codec;
     private final int bufferSize;
     private final int typeCount;
     private CompressionKind compressionKind;
+    private boolean isOpen = false;
 
     private DefaultDataReader(DataReaderProperties properties) {
       this.fs = properties.getFileSystem();
       this.path = properties.getPath();
+      this.reader = properties.getReader();
+      this.file = properties.getFile();
       this.useZeroCopy = properties.getZeroCopy();
       this.compressionKind = properties.getCompression();
       this.codec = OrcCodecPool.getCodec(compressionKind);
@@ -170,12 +174,17 @@ public class RecordReaderUtils {
 
     @Override
     public void open() throws IOException {
-      this.file = fs.open(path);
+      if (file == null) {
+        this.file = fs.open(path);
+      } else if (reader != null) {
+        reader.addFileReference();
+      }
       if (useZeroCopy) {
         zcr = RecordReaderUtils.createZeroCopyShim(file, codec, pool);
       } else {
         zcr = null;
       }
+      isOpen = true;
     }
 
     @Override
@@ -190,7 +199,7 @@ public class RecordReaderUtils {
                                  OrcProto.Stream.Kind[] bloomFilterKinds,
                                  OrcProto.BloomFilterIndex[] bloomFilterIndices
                                  ) throws IOException {
-      if (file == null) {
+      if (!isOpen) {
         open();
       }
       if (footer == null) {
@@ -258,7 +267,7 @@ public class RecordReaderUtils {
 
     @Override
     public OrcProto.StripeFooter readStripeFooter(StripeInformation stripe) throws IOException {
-      if (file == null) {
+      if (!isOpen) {
         open();
       }
       long offset = stripe.getOffset() + stripe.getIndexLength() + stripe.getDataLength();
@@ -289,7 +298,11 @@ public class RecordReaderUtils {
       // close both zcr and file
       try (HadoopShims.ZeroCopyReaderShim myZcr = zcr) {
         if (file != null) {
-          file.close();
+          if (reader == null) {
+            file.close();
+          } else {
+            reader.removeFileReference();
+          }
         }
       }
     }
@@ -307,6 +320,13 @@ public class RecordReaderUtils {
     @Override
     public DataReader clone() {
       try {
+        if (file != null) {
+          if (reader != null) {
+            reader.addFileReference();
+          } else {
+            throw new IllegalStateException("Can't clone open data reader");
+          }
+        }
         return (DataReader) super.clone();
       } catch (CloneNotSupportedException e) {
         throw new UnsupportedOperationException("uncloneable", e);
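
One subtle consequence of the DefaultDataReader changes above: clone() now takes an extra reference on the shared handle when the DataReader is backed by a ReaderImpl, so either copy can be closed independently, while cloning a DataReader that has already opened its own stream (no owning reader) is rejected. A rough sketch of that behavior follows; it assumes RecordReaderUtils.createDefaultDataReader and DataReaderProperties are accessible from your code and that /tmp/example.orc exists, and the builder values are illustrative.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.orc.CompressionKind;
    import org.apache.orc.DataReader;
    import org.apache.orc.impl.DataReaderProperties;
    import org.apache.orc.impl.RecordReaderUtils;

    public class CloneBehaviorSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.getLocal(conf);
        // Build a stand-alone DataReader; no ReaderImpl handle is supplied via withFile().
        DataReader dataReader = RecordReaderUtils.createDefaultDataReader(
            DataReaderProperties.builder()
                .withFileSystem(fs)
                .withPath(new Path("/tmp/example.orc"))
                .withCompression(CompressionKind.NONE)
                .withBufferSize(256 * 1024)
                .withTypeCount(1)
                .withZeroCopy(false)
                .build());
        dataReader.open();        // opens its own stream because there is no shared handle
        try {
          dataReader.clone();     // after this commit: "Can't clone open data reader"
        } catch (IllegalStateException expected) {
          System.out.println(expected.getMessage());
        }
        dataReader.close();       // closes the stream it opened itself
      }
    }
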
diff --git a/java/core/src/test/org/apache/orc/impl/TestReaderImpl.java b/java/core/src/test/org/apache/orc/impl/TestReaderImpl.java
index 23cdc89..7cc50a0 100644
--- a/java/core/src/test/org/apache/orc/impl/TestReaderImpl.java
+++ b/java/core/src/test/org/apache/orc/impl/TestReaderImpl.java
@@ -17,15 +17,26 @@ package org.apache.orc.impl;
 
 import java.io.ByteArrayInputStream;
 import java.io.EOFException;
+import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+import java.net.URISyntaxException;
 import java.nio.ByteBuffer;
 import java.nio.charset.CharacterCodingException;
+import java.util.ArrayList;
+import java.util.List;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PositionedReadable;
 import org.apache.hadoop.fs.Seekable;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.util.Progressable;
 import org.apache.orc.FileFormatException;
 import org.apache.hadoop.io.Text;
 import org.apache.orc.OrcFile;
@@ -168,4 +179,146 @@ public class TestReaderImpl {
       readFully(position, buffer, 0, buffer.length);
     }
   }
+
+  static class MockInputStream extends FSDataInputStream {
+    MockFileSystem fs;
+
+    public MockInputStream(MockFileSystem fs) {
+      super(new SeekableByteArrayInputStream(new byte[0]));
+      this.fs = fs;
+    }
+
+    public void close() {
+      fs.removeStream(this);
+    }
+  }
+
+  static class MockFileSystem extends FileSystem {
+    final List<MockInputStream> streams = new ArrayList<>();
+
+    public MockFileSystem(Configuration conf) {
+      setConf(conf);
+    }
+
+    @Override
+    public URI getUri() {
+      try {
+        return new URI("mock:///");
+      } catch (URISyntaxException e) {
+        throw new IllegalArgumentException("bad uri", e);
+      }
+    }
+
+    @Override
+    public FSDataInputStream open(Path path, int i) {
+      MockInputStream result = new MockInputStream(this);
+      streams.add(result);
+      return result;
+    }
+
+    void removeStream(MockInputStream stream) {
+      streams.remove(stream);
+    }
+
+    int streamCount() {
+      return streams.size();
+    }
+
+    @Override
+    public FSDataOutputStream create(Path path, FsPermission fsPermission,
+                                     boolean b, int i, short i1, long l,
+                                     Progressable progressable) throws IOException {
+      throw new IOException("Can't create");
+    }
+
+    @Override
+    public FSDataOutputStream append(Path path, int i,
+                                     Progressable progressable) throws IOException {
+      throw new IOException("Can't append");
+    }
+
+    @Override
+    public boolean rename(Path path, Path path1) {
+      return false;
+    }
+
+    @Override
+    public boolean delete(Path path, boolean b) {
+      return false;
+    }
+
+    @Override
+    public FileStatus[] listStatus(Path path) {
+      return new FileStatus[0];
+    }
+
+    @Override
+    public void setWorkingDirectory(Path path) {
+      // ignore
+    }
+
+    @Override
+    public Path getWorkingDirectory() {
+      return new Path("/");
+    }
+
+    @Override
+    public boolean mkdirs(Path path, FsPermission fsPermission) {
+      return false;
+    }
+
+    @Override
+    public FileStatus getFileStatus(Path path) {
+      return new FileStatus();
+    }
+  }
+
+  @Test
+  public void testClosingRowsFirst() throws Exception {
+    Configuration conf = new Configuration();
+    MockFileSystem fs = new MockFileSystem(conf);
+    Reader reader = OrcFile.createReader(new Path("/foo"),
+        OrcFile.readerOptions(conf).filesystem(fs));
+    assertEquals(1, fs.streamCount());
+    RecordReader rows = reader.rows();
+    assertEquals(1, fs.streamCount());
+    RecordReader rows2 = reader.rows();
+    assertEquals(1, fs.streamCount());
+    rows.close();
+    assertEquals(1, fs.streamCount());
+    rows2.close();
+    assertEquals(1, fs.streamCount());
+    reader.close();
+    assertEquals(0, fs.streamCount());
+  }
+
+  @Test
+  public void testClosingReaderFirst() throws Exception {
+    Configuration conf = new Configuration();
+    MockFileSystem fs = new MockFileSystem(conf);
+    Reader reader = OrcFile.createReader(new Path("/foo"),
+        OrcFile.readerOptions(conf).filesystem(fs));
+    assertEquals(1, fs.streamCount());
+    RecordReader rows = reader.rows();
+    assertEquals(1, fs.streamCount());
+    reader.close();
+    assertEquals(1, fs.streamCount());
+    rows.close();
+    assertEquals(0, fs.streamCount());
+  }
+
+  @Test
+  public void testClosingMultiple() throws Exception {
+    Configuration conf = new Configuration();
+    MockFileSystem fs = new MockFileSystem(conf);
+    Reader reader = OrcFile.createReader(new Path("/foo"),
+        OrcFile.readerOptions(conf).filesystem(fs));
+    Reader reader2 = OrcFile.createReader(new Path("/bar"),
+        OrcFile.readerOptions(conf).filesystem(fs));
+    assertEquals(2, fs.streamCount());
+    reader.close();
+    assertEquals(1, fs.streamCount());
+    reader2.close();
+    assertEquals(0, fs.streamCount());
+  }
 }