Posted to commits@orc.apache.org by om...@apache.org on 2019/05/07 02:59:15 UTC

[orc] branch master updated: ORC-498: ReaderImpl and RecordReaderImpl open separate file handles.

This is an automated email from the ASF dual-hosted git repository.

omalley pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/orc.git


The following commit(s) were added to refs/heads/master by this push:
     new 608064f  ORC-498: ReaderImpl and RecordReaderImpl open separate file handles.
608064f is described below

commit 608064f59ef2cb4a825667afe4a420f550d2b193
Author: Owen O'Malley <om...@apache.org>
AuthorDate: Mon May 6 14:29:25 2019 -0700

    ORC-498: ReaderImpl and RecordReaderImpl open separate file handles.
    
    Fixes #391
    
    Signed-off-by: Owen O'Malley <om...@apache.org>
---
 java/core/src/java/org/apache/orc/Reader.java      |   3 +-
 .../org/apache/orc/impl/DataReaderProperties.java  |  13 ++
 .../src/java/org/apache/orc/impl/ReaderImpl.java   |  35 ++++-
 .../java/org/apache/orc/impl/RecordReaderImpl.java |  12 +-
 .../org/apache/orc/impl/RecordReaderUtils.java     |  15 +-
 .../test/org/apache/orc/impl/TestReaderImpl.java   | 172 +++++++++++++++++++++
 6 files changed, 237 insertions(+), 13 deletions(-)
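
In short: before this change, ReaderImpl opened an FSDataInputStream just
long enough to read the file tail and closed it again, and every
RecordReaderImpl then opened its own handle. After this change the Reader
keeps its handle open, the first RecordReader takes it over via takeFile(),
and only additional RecordReaders open new handles; Reader itself is now
Closeable. A minimal sketch of the resulting lifecycle, assuming a local
ORC file path passed on the command line (the handle counts mirror
testClosingRowsFirst in the new tests below):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.orc.OrcFile;
    import org.apache.orc.Reader;
    import org.apache.orc.RecordReader;

    public class HandleLifecycle {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Reading the file tail now leaves one handle open on the Reader.
        Reader reader = OrcFile.createReader(new Path(args[0]),
            OrcFile.readerOptions(conf));
        RecordReader rows = reader.rows();   // takes over the Reader's handle
        RecordReader rows2 = reader.rows();  // opens a second handle
        rows.close();                        // one handle remains
        rows2.close();                       // zero handles remain
        reader.close();                      // no-op: its handle was already taken
      }
    }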

diff --git a/java/core/src/java/org/apache/orc/Reader.java b/java/core/src/java/org/apache/orc/Reader.java
index a90b6c4..6d6e04b 100644
--- a/java/core/src/java/org/apache/orc/Reader.java
+++ b/java/core/src/java/org/apache/orc/Reader.java
@@ -18,6 +18,7 @@
 
 package org.apache.orc;
 
+import java.io.Closeable;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.List;
@@ -30,7 +31,7 @@ import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
  *
  * One Reader can support multiple concurrent RecordReader.
  */
-public interface Reader {
+public interface Reader extends Closeable {
 
   /**
    * Get the number of rows in the file.
diff --git a/java/core/src/java/org/apache/orc/impl/DataReaderProperties.java b/java/core/src/java/org/apache/orc/impl/DataReaderProperties.java
index 8420149..386324c 100644
--- a/java/core/src/java/org/apache/orc/impl/DataReaderProperties.java
+++ b/java/core/src/java/org/apache/orc/impl/DataReaderProperties.java
@@ -17,6 +17,7 @@
  */
 package org.apache.orc.impl;
 
+import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.orc.CompressionKind;
@@ -26,6 +27,7 @@ public final class DataReaderProperties {
 
   private final FileSystem fileSystem;
   private final Path path;
+  private final FSDataInputStream file;
   private final CompressionKind compression;
   private final boolean zeroCopy;
   private final int typeCount;
@@ -35,6 +37,7 @@ public final class DataReaderProperties {
   private DataReaderProperties(Builder builder) {
     this.fileSystem = builder.fileSystem;
     this.path = builder.path;
+    this.file = builder.file;
     this.compression = builder.compression;
     this.zeroCopy = builder.zeroCopy;
     this.typeCount = builder.typeCount;
@@ -50,6 +53,10 @@ public final class DataReaderProperties {
     return path;
   }
 
+  public FSDataInputStream getFile() {
+    return file;
+  }
+
   public CompressionKind getCompression() {
     return compression;
   }
@@ -78,6 +85,7 @@ public final class DataReaderProperties {
 
     private FileSystem fileSystem;
     private Path path;
+    private FSDataInputStream file;
     private CompressionKind compression;
     private boolean zeroCopy;
     private int typeCount;
@@ -98,6 +106,11 @@ public final class DataReaderProperties {
       return this;
     }
 
+    public Builder withFile(FSDataInputStream file) {
+      this.file = file;
+      return this;
+    }
+
     public Builder withCompression(CompressionKind value) {
       this.compression = value;
       return this;
diff --git a/java/core/src/java/org/apache/orc/impl/ReaderImpl.java b/java/core/src/java/org/apache/orc/impl/ReaderImpl.java
index a7c8aed..79ed82b 100644
--- a/java/core/src/java/org/apache/orc/impl/ReaderImpl.java
+++ b/java/core/src/java/org/apache/orc/impl/ReaderImpl.java
@@ -63,6 +63,7 @@ public class ReaderImpl implements Reader {
   protected final Path path;
   protected final OrcFile.ReaderOptions options;
   protected final org.apache.orc.CompressionKind compressionKind;
+  protected FSDataInputStream file;
   protected int bufferSize;
   protected OrcProto.Metadata metadata;
   private List<OrcProto.StripeStatistics> stripeStats;
@@ -464,7 +465,8 @@ public class ReaderImpl implements Reader {
     OrcProto.PostScript ps;
     OrcProto.FileTail.Builder fileTailBuilder = OrcProto.FileTail.newBuilder();
     long modificationTime;
-    try (FSDataInputStream file = fs.open(path)) {
+    file = fs.open(path);
+    try {
       // figure out the size of the file using the option or filesystem
       long size;
       if (maxFileLength == Long.MAX_VALUE) {
@@ -481,9 +483,9 @@ public class ReaderImpl implements Reader {
         return buildEmptyTail();
       } else if (size <= OrcFile.MAGIC.length()) {
         // Anything smaller than MAGIC header cannot be valid (valid ORC files
-	// are actually around 40 bytes, this is more conservative)
+        // are actually around 40 bytes, this is more conservative)
         throw new FileFormatException("Not a valid ORC file " + path
-          + " (maxFileLength= " + maxFileLength + ")");
+                                          + " (maxFileLength= " + maxFileLength + ")");
       }
       fileTailBuilder.setFileLength(size);
 
@@ -544,6 +546,14 @@ public class ReaderImpl implements Reader {
         OrcCodecPool.returnCodec(compressionKind, codec);
       }
       fileTailBuilder.setFooter(footer);
+    } catch (Throwable thr) {
+      try {
+        close();
+      } catch (IOException ignore) {
+        LOG.info("Ignoring secondary exception in close of " + path, ignore);
+      }
+      throw thr instanceof IOException ? (IOException) thr :
+                new IOException("Problem reading file footer " + path, thr);
     }
 
     ByteBuffer serializedTail = ByteBuffer.allocate(buffer.remaining());
@@ -766,4 +776,23 @@ public class ReaderImpl implements Reader {
     buffer.append(")");
     return buffer.toString();
   }
+
+  @Override
+  public void close() throws IOException {
+    if (file != null) {
+      file.close();
+    }
+  }
+
+  /**
+   * Take the file from the reader.
+   * This allows the first RecordReader to use the same file, but additional
+   * RecordReaders will open new handles.
+   * @return a file handle, if one is available
+   */
+  FSDataInputStream takeFile() {
+    FSDataInputStream result = file;
+    file = null;
+    return result;
+  }
 }
diff --git a/java/core/src/java/org/apache/orc/impl/RecordReaderImpl.java b/java/core/src/java/org/apache/orc/impl/RecordReaderImpl.java
index 9b84e77..fc17fc3 100644
--- a/java/core/src/java/org/apache/orc/impl/RecordReaderImpl.java
+++ b/java/core/src/java/org/apache/orc/impl/RecordReaderImpl.java
@@ -17,6 +17,7 @@
  */
 package org.apache.orc.impl;
 
+import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.io.DiskRangeList;
 import org.apache.hadoop.hive.common.io.DiskRangeList.CreateHelper;
@@ -236,16 +237,21 @@ public class RecordReaderImpl implements RecordReader {
     if (options.getDataReader() != null) {
       this.dataReader = options.getDataReader().clone();
     } else {
-      this.dataReader = RecordReaderUtils.createDefaultDataReader(
+      DataReaderProperties.Builder builder =
           DataReaderProperties.builder()
               .withBufferSize(bufferSize)
               .withCompression(fileReader.compressionKind)
               .withFileSystem(fileReader.getFileSystem())
               .withPath(fileReader.path)
               .withTypeCount(types.size())
-              .withZeroCopy(zeroCopy)
               .withMaxDiskRangeChunkLimit(maxDiskRangeChunkLimit)
-              .build());
+              .withZeroCopy(zeroCopy);
+      FSDataInputStream file = fileReader.takeFile();
+      if (file != null) {
+        builder.withFile(file);
+      }
+      this.dataReader = RecordReaderUtils.createDefaultDataReader(
+          builder.build());
     }
     firstRow = skippedRows;
     totalRowCount = rows;
diff --git a/java/core/src/java/org/apache/orc/impl/RecordReaderUtils.java b/java/core/src/java/org/apache/orc/impl/RecordReaderUtils.java
index a089e39..d4c57e5 100644
--- a/java/core/src/java/org/apache/orc/impl/RecordReaderUtils.java
+++ b/java/core/src/java/org/apache/orc/impl/RecordReaderUtils.java
@@ -22,7 +22,6 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
@@ -30,7 +29,6 @@ import org.apache.commons.lang.builder.HashCodeBuilder;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.common.io.DiskRange;
 import org.apache.hadoop.hive.common.io.DiskRangeList;
 import org.apache.orc.CompressionCodec;
 import org.apache.orc.CompressionKind;
@@ -144,7 +142,7 @@ public class RecordReaderUtils {
   }
 
   private static class DefaultDataReader implements DataReader {
-    private FSDataInputStream file = null;
+    private FSDataInputStream file;
     private ByteBufferAllocatorPool pool;
     private HadoopShims.ZeroCopyReaderShim zcr = null;
     private final FileSystem fs;
@@ -154,10 +152,12 @@ public class RecordReaderUtils {
     private final int typeCount;
     private CompressionKind compressionKind;
     private final int maxDiskRangeChunkLimit;
+    private boolean isOpen = false;
 
     private DefaultDataReader(DataReaderProperties properties) {
       this.fs = properties.getFileSystem();
       this.path = properties.getPath();
+      this.file = properties.getFile();
       this.useZeroCopy = properties.getZeroCopy();
       this.compressionKind = properties.getCompression();
       options.withCodec(OrcCodecPool.getCodec(compressionKind))
@@ -168,7 +168,9 @@ public class RecordReaderUtils {
 
     @Override
     public void open() throws IOException {
-      this.file = fs.open(path);
+      if (file == null) {
+        this.file = fs.open(path);
+      }
       if (useZeroCopy) {
         // ZCR only uses codec for boolean checks.
         pool = new ByteBufferAllocatorPool();
@@ -176,6 +178,7 @@ public class RecordReaderUtils {
       } else {
         zcr = null;
       }
+      isOpen = true;
     }
 
     @Override
@@ -190,7 +193,7 @@ public class RecordReaderUtils {
                                  OrcProto.Stream.Kind[] bloomFilterKinds,
                                  OrcProto.BloomFilterIndex[] bloomFilterIndices
                                  ) throws IOException {
-      if (file == null) {
+      if (!isOpen) {
         open();
       }
       if (footer == null) {
@@ -257,7 +260,7 @@ public class RecordReaderUtils {
 
     @Override
     public OrcProto.StripeFooter readStripeFooter(StripeInformation stripe) throws IOException {
-      if (file == null) {
+      if (!isOpen) {
         open();
       }
       long offset = stripe.getOffset() + stripe.getIndexLength() + stripe.getDataLength();
diff --git a/java/core/src/test/org/apache/orc/impl/TestReaderImpl.java b/java/core/src/test/org/apache/orc/impl/TestReaderImpl.java
index 63cdefd..8471a1e 100644
--- a/java/core/src/test/org/apache/orc/impl/TestReaderImpl.java
+++ b/java/core/src/test/org/apache/orc/impl/TestReaderImpl.java
@@ -17,15 +17,26 @@ package org.apache.orc.impl;
 
 import java.io.ByteArrayInputStream;
 import java.io.EOFException;
+import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+import java.net.URISyntaxException;
 import java.nio.ByteBuffer;
 import java.nio.charset.CharacterCodingException;
+import java.util.ArrayList;
+import java.util.List;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PositionedReadable;
 import org.apache.hadoop.fs.Seekable;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.util.Progressable;
 import org.apache.orc.FileFormatException;
 import org.apache.hadoop.io.Text;
 import org.apache.orc.OrcFile;
@@ -168,4 +179,165 @@ public class TestReaderImpl {
       readFully(position, buffer, 0, buffer.length);
     }
   }
+
+  static byte[] byteArray(int... input) {
+    byte[] result = new byte[input.length];
+    for(int i=0; i < result.length; ++i) {
+      result[i] = (byte) input[i];
+    }
+    return result;
+  }
+
+  static class MockInputStream extends FSDataInputStream {
+    MockFileSystem fs;
+    // A single row ORC file
+    static final byte[] SIMPLE_ORC = byteArray(
+        0x4f, 0x52, 0x43, 0x42, 0x00, 0x80, 0x0a, 0x06, 0x08, 0x01, 0x10, 0x01, 0x18, 0x03, 0x12, 0x02,
+        0x08, 0x00, 0x12, 0x02, 0x08, 0x02, 0x0a, 0x12, 0x0a, 0x04, 0x08, 0x00, 0x50, 0x00, 0x0a, 0x0a,
+        0x08, 0x00, 0x12, 0x02, 0x18, 0x00, 0x50, 0x00, 0x58, 0x03, 0x08, 0x03, 0x10, 0x16, 0x1a, 0x0a,
+        0x08, 0x03, 0x10, 0x00, 0x18, 0x03, 0x20, 0x10, 0x28, 0x01, 0x22, 0x08, 0x08, 0x0c, 0x12, 0x01,
+        0x01, 0x1a, 0x01, 0x78, 0x22, 0x02, 0x08, 0x03, 0x30, 0x01, 0x3a, 0x04, 0x08, 0x00, 0x50, 0x00,
+        0x3a, 0x0a, 0x08, 0x00, 0x12, 0x02, 0x18, 0x00, 0x50, 0x00, 0x58, 0x03, 0x40, 0x00, 0x48, 0x00,
+        0x08, 0x36, 0x10, 0x00, 0x22, 0x02, 0x00, 0x0c, 0x28, 0x14, 0x30, 0x07, 0x82, 0xf4, 0x03, 0x03,
+        0x4f, 0x52, 0x43, 0x13);
+
+    public MockInputStream(MockFileSystem fs) throws IOException {
+      super(new SeekableByteArrayInputStream(SIMPLE_ORC));
+      this.fs = fs;
+    }
+
+    public void close() {
+      fs.removeStream(this);
+    }
+  }
+
+  static class MockFileSystem extends FileSystem {
+    final List<MockInputStream> streams = new ArrayList<>();
+
+    public MockFileSystem(Configuration conf) {
+      setConf(conf);
+    }
+
+    @Override
+    public URI getUri() {
+      try {
+        return new URI("mock:///");
+      } catch (URISyntaxException e) {
+        throw new IllegalArgumentException("bad uri", e);
+      }
+    }
+
+    @Override
+    public FSDataInputStream open(Path path, int i) throws IOException {
+      MockInputStream result = new MockInputStream(this);
+      streams.add(result);
+      return result;
+    }
+
+    void removeStream(MockInputStream stream) {
+      streams.remove(stream);
+    }
+
+    int streamCount() {
+      return streams.size();
+    }
+
+    @Override
+    public FSDataOutputStream create(Path path, FsPermission fsPermission,
+                                     boolean b, int i, short i1, long l,
+                                     Progressable progressable) throws IOException {
+      throw new IOException("Can't create");
+    }
+
+    @Override
+    public FSDataOutputStream append(Path path, int i,
+                                     Progressable progressable) throws IOException {
+      throw new IOException("Can't append");
+    }
+
+    @Override
+    public boolean rename(Path path, Path path1) {
+      return false;
+    }
+
+    @Override
+    public boolean delete(Path path, boolean b) {
+      return false;
+    }
+
+    @Override
+    public FileStatus[] listStatus(Path path) {
+      return new FileStatus[0];
+    }
+
+    @Override
+    public void setWorkingDirectory(Path path) {
+      // ignore
+    }
+
+    @Override
+    public Path getWorkingDirectory() {
+      return new Path("/");
+    }
+
+    @Override
+    public boolean mkdirs(Path path, FsPermission fsPermission) {
+      return false;
+    }
+
+    @Override
+    public FileStatus getFileStatus(Path path) {
+      return new FileStatus(MockInputStream.SIMPLE_ORC.length, false, 1, 4096,
+          0, path);
+    }
+  }
+
+  @Test
+  public void testClosingRowsFirst() throws Exception {
+    Configuration conf = new Configuration();
+    MockFileSystem fs = new MockFileSystem(conf);
+    Reader reader = OrcFile.createReader(new Path("/foo"),
+        OrcFile.readerOptions(conf).filesystem(fs));
+    assertEquals(1, fs.streamCount());
+    RecordReader rows = reader.rows();
+    assertEquals(1, fs.streamCount());
+    RecordReader rows2 = reader.rows();
+    assertEquals(2, fs.streamCount());
+    rows.close();
+    assertEquals(1, fs.streamCount());
+    rows2.close();
+    assertEquals(0, fs.streamCount());
+    reader.close();
+    assertEquals(0, fs.streamCount());
+  }
+
+  @Test
+  public void testClosingReaderFirst() throws Exception {
+    Configuration conf = new Configuration();
+    MockFileSystem fs = new MockFileSystem(conf);
+    Reader reader = OrcFile.createReader(new Path("/foo"),
+        OrcFile.readerOptions(conf).filesystem(fs));
+    assertEquals(1, fs.streamCount());
+    RecordReader rows = reader.rows();
+    assertEquals(1, fs.streamCount());
+    reader.close();
+    assertEquals(1, fs.streamCount());
+    rows.close();
+    assertEquals(0, fs.streamCount());
+  }
+
+  @Test
+  public void testClosingMultiple() throws Exception {
+    Configuration conf = new Configuration();
+    MockFileSystem fs = new MockFileSystem(conf);
+    Reader reader = OrcFile.createReader(new Path("/foo"),
+        OrcFile.readerOptions(conf).filesystem(fs));
+    Reader reader2 = OrcFile.createReader(new Path("/bar"),
+        OrcFile.readerOptions(conf).filesystem(fs));
+    assertEquals(2, fs.streamCount());
+    reader.close();
+    assertEquals(1, fs.streamCount());
+    reader2.close();
+    assertEquals(0, fs.streamCount());
+  }
 }
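
Since Reader now extends java.io.Closeable, a Reader can also be managed
with try-with-resources. A hedged sketch of a full read loop under that
pattern (the RecordReader is closed explicitly in a finally block, since
only Reader is declared Closeable by this commit, and the tests above show
either close order is safe):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
    import org.apache.orc.OrcFile;
    import org.apache.orc.Reader;
    import org.apache.orc.RecordReader;

    public class ReadWholeFile {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        try (Reader reader = OrcFile.createReader(new Path(args[0]),
                 OrcFile.readerOptions(conf))) {
          RecordReader rows = reader.rows();
          try {
            VectorizedRowBatch batch = reader.getSchema().createRowBatch();
            while (rows.nextBatch(batch)) {
              // batch.size rows are available per iteration
            }
          } finally {
            rows.close();
          }
        }
      }
    }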