You are viewing a plain-text version of this content; the link to its canonical version was not preserved in this copy.
Posted to commits@orc.apache.org by om...@apache.org on 2019/05/07 01:37:19 UTC

[orc] branch branch-1.4 updated (3e14b7c -> c43172e)

This is an automated email from the ASF dual-hosted git repository.

omalley pushed a change to branch branch-1.4
in repository https://gitbox.apache.org/repos/asf/orc.git.


    from 3e14b7c  ORC-476: Make SearchAgument kryo buffer size configurable
     add 4d89234  ORC-497 - Fix build failures for maven 3.6.x
     add 48c9f60  ORC-411: Update pom file to work with openjdk 10.
     new c43172e  ORC-498: ReaderImpl and RecordReaderImpl open separate file handles.

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 java/core/pom.xml                                  |   4 +
 java/core/src/java/org/apache/orc/Reader.java      |   3 +-
 .../org/apache/orc/impl/DataReaderProperties.java  |  21 +++
 .../src/java/org/apache/orc/impl/ReaderImpl.java   |  34 ++++-
 .../java/org/apache/orc/impl/RecordReaderImpl.java |  10 +-
 .../org/apache/orc/impl/RecordReaderUtils.java     |  30 +++-
 .../test/org/apache/orc/impl/TestReaderImpl.java   | 153 +++++++++++++++++++++
 java/pom.xml                                       |  79 ++++++++---
 8 files changed, 301 insertions(+), 33 deletions(-)


[orc] 01/01: ORC-498: ReaderImpl and RecordReaderImpl open separate file handles.

Posted by om...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

omalley pushed a commit to branch branch-1.4
in repository https://gitbox.apache.org/repos/asf/orc.git

commit c43172e765e9575259d52e1655b495ea64c5c8b0
Author: Owen O'Malley <om...@apache.org>
AuthorDate: Mon May 6 14:29:25 2019 -0700

    ORC-498: ReaderImpl and RecordReaderImpl open separate file handles.
---
 java/core/src/java/org/apache/orc/Reader.java      |   3 +-
 .../org/apache/orc/impl/DataReaderProperties.java  |  21 +++
 .../src/java/org/apache/orc/impl/ReaderImpl.java   |  34 ++++-
 .../java/org/apache/orc/impl/RecordReaderImpl.java |  10 +-
 .../org/apache/orc/impl/RecordReaderUtils.java     |  30 +++-
 .../test/org/apache/orc/impl/TestReaderImpl.java   | 153 +++++++++++++++++++++
 6 files changed, 239 insertions(+), 12 deletions(-)

diff --git a/java/core/src/java/org/apache/orc/Reader.java b/java/core/src/java/org/apache/orc/Reader.java
index 2ef64d7..3b41251 100644
--- a/java/core/src/java/org/apache/orc/Reader.java
+++ b/java/core/src/java/org/apache/orc/Reader.java
@@ -18,6 +18,7 @@
 
 package org.apache.orc;
 
+import java.io.Closeable;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.List;
@@ -30,7 +31,11 @@ import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
  *
  * One Reader can support multiple concurrent RecordReader.
+ *
+ * <p>Close a Reader when it is no longer needed. Closing it only releases
+ * the Reader's own hold on the file: RecordReaders already created by
+ * {@link #rows()} keep working until they are closed themselves.
  */
-public interface Reader {
+public interface Reader extends Closeable {
 
   /**
    * Get the number of rows in the file.
diff --git a/java/core/src/java/org/apache/orc/impl/DataReaderProperties.java b/java/core/src/java/org/apache/orc/impl/DataReaderProperties.java
index fbdc145..5bbb8a6 100644
--- a/java/core/src/java/org/apache/orc/impl/DataReaderProperties.java
+++ b/java/core/src/java/org/apache/orc/impl/DataReaderProperties.java
@@ -17,6 +17,7 @@
  */
 package org.apache.orc.impl;
 
+import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.orc.CompressionKind;
@@ -25,6 +26,8 @@ public final class DataReaderProperties {
 
   private final FileSystem fileSystem;
   private final Path path;
+  private final FSDataInputStream file;
+  private final ReaderImpl reader;
   private final CompressionKind compression;
   private final boolean zeroCopy;
   private final int typeCount;
@@ -33,6 +36,8 @@ public final class DataReaderProperties {
   private DataReaderProperties(Builder builder) {
     this.fileSystem = builder.fileSystem;
     this.path = builder.path;
+    this.reader = builder.reader;
+    this.file = builder.file;
     this.compression = builder.compression;
     this.zeroCopy = builder.zeroCopy;
     this.typeCount = builder.typeCount;
@@ -47,6 +52,22 @@ public final class DataReaderProperties {
     return path;
   }
 
+  /**
+   * @return the reader that owns {@link #getFile()} and counts references
+   *     to it, or null if none was supplied
+   */
+  public ReaderImpl getReader() {
+    return reader;
+  }
+
+  /**
+   * @return an already-open stream for the data reader to use, or null if
+   *     the data reader should open the file itself
+   */
+  public FSDataInputStream getFile() {
+    return file;
+  }
+
   public CompressionKind getCompression() {
     return compression;
   }
@@ -71,6 +92,8 @@ public final class DataReaderProperties {
 
     private FileSystem fileSystem;
     private Path path;
+    private ReaderImpl reader;
+    private FSDataInputStream file;
     private CompressionKind compression;
     private boolean zeroCopy;
     private int typeCount;
@@ -90,6 +113,28 @@ public final class DataReaderProperties {
       return this;
     }
 
+    /**
+     * Use an already-open stream instead of having the data reader open the
+     * file itself. When {@code reader} is non-null, the data reader takes
+     * and releases references to {@code file} through that reader instead of
+     * closing the stream directly.
+     *
+     * @param reader the reader that owns {@code file}; may be null
+     * @param file the open stream to read from; must not be null
+     * @return this builder
+     * @throws IllegalArgumentException if {@code file} is null
+     */
+    public Builder withFile(ReaderImpl reader, FSDataInputStream file) {
+      if (file == null) {
+        // A reader paired with no stream would make the data reader open its
+        // own file yet still release a reference on the reader when closed.
+        throw new IllegalArgumentException("file must not be null");
+      }
+      this.reader = reader;
+      this.file = file;
+      return this;
+    }
+
     public Builder withCompression(CompressionKind value) {
       this.compression = value;
       return this;
diff --git a/java/core/src/java/org/apache/orc/impl/ReaderImpl.java b/java/core/src/java/org/apache/orc/impl/ReaderImpl.java
index 80b7a60..5159732 100644
--- a/java/core/src/java/org/apache/orc/impl/ReaderImpl.java
+++ b/java/core/src/java/org/apache/orc/impl/ReaderImpl.java
@@ -63,6 +63,12 @@ public class ReaderImpl implements Reader {
   private final long maxLength;
   protected final Path path;
   protected final org.apache.orc.CompressionKind compressionKind;
+  // Stream opened to read the file tail. It is shared with the data readers
+  // of RecordReaders created from this reader and closed once every
+  // reference counted in fileReferenceCount has been released.
+  protected FSDataInputStream file;
+  protected int fileReferenceCount = 0;
+  private boolean isClosed = false;
   protected int bufferSize;
   protected OrcProto.Metadata metadata;
   private List<OrcProto.StripeStatistics> stripeStats;
@@ -519,7 +525,9 @@ public class ReaderImpl implements Reader {
     OrcProto.PostScript ps;
     OrcProto.FileTail.Builder fileTailBuilder = OrcProto.FileTail.newBuilder();
     long modificationTime;
-    try (FSDataInputStream file = fs.open(path)) {
+    file = fs.open(path);
+    addFileReference();
+    try {
       // figure out the size of the file using the option or filesystem
       long size;
       if (maxFileLength == Long.MAX_VALUE) {
@@ -536,9 +544,9 @@ public class ReaderImpl implements Reader {
         return buildEmptyTail();
       } else if (size <= OrcFile.MAGIC.length()) {
         // Anything smaller than MAGIC header cannot be valid (valid ORC files
-	// are actually around 40 bytes, this is more conservative)
+        // are actually around 40 bytes, this is more conservative)
         throw new FileFormatException("Not a valid ORC file " + path
           + " (maxFileLength= " + maxFileLength + ")");
       }
       fileTailBuilder.setFileLength(size);
 
@@ -598,6 +606,15 @@ public class ReaderImpl implements Reader {
         OrcCodecPool.returnCodec(compressionKind, codec);
       }
       fileTailBuilder.setFooter(footer);
+    } catch (IOException | RuntimeException | Error thr) {
+      // Release the reference taken above so that a failed open does not
+      // leak the stream, then rethrow the original exception unchanged.
+      try {
+        removeFileReference();
+      } catch (IOException ignore) {
+        // best effort: the original failure is the one worth reporting
+      }
+      throw thr;
     }
 
     ByteBuffer serializedTail = ByteBuffer.allocate(buffer.remaining());
@@ -823,4 +840,40 @@ public class ReaderImpl implements Reader {
     buffer.append(")");
     return buffer.toString();
   }
+
+  /**
+   * Record one more user of {@link #file}. Synchronized because a single
+   * Reader may back several concurrent RecordReaders.
+   */
+  synchronized void addFileReference() {
+    fileReferenceCount += 1;
+  }
+
+  /**
+   * Release one user of {@link #file}, closing the stream when none remain.
+   *
+   * @throws IOException if closing the stream fails
+   */
+  synchronized void removeFileReference() throws IOException {
+    fileReferenceCount -= 1;
+    if (fileReferenceCount == 0 && file != null) {
+      file.close();
+      // Forget the closed stream so RecordReaders created later open their
+      // own handle instead of sharing a dead one.
+      file = null;
+    }
+  }
+
+  /**
+   * Release this reader's own reference to the file. The stream stays open
+   * while RecordReaders created from this reader are still open. Closing
+   * an already closed reader has no effect.
+   */
+  @Override
+  public synchronized void close() throws IOException {
+    if (!isClosed) {
+      isClosed = true;
+      removeFileReference();
+    }
+  }
 }
diff --git a/java/core/src/java/org/apache/orc/impl/RecordReaderImpl.java b/java/core/src/java/org/apache/orc/impl/RecordReaderImpl.java
index 0dacc70..83b987d 100644
--- a/java/core/src/java/org/apache/orc/impl/RecordReaderImpl.java
+++ b/java/core/src/java/org/apache/orc/impl/RecordReaderImpl.java
@@ -241,15 +241,22 @@ public class RecordReaderImpl implements RecordReader {
     if (options.getDataReader() != null) {
       this.dataReader = options.getDataReader().clone();
     } else {
-      this.dataReader = RecordReaderUtils.createDefaultDataReader(
+      DataReaderProperties.Builder builder =
           DataReaderProperties.builder()
               .withBufferSize(bufferSize)
               .withCompression(fileReader.compressionKind)
               .withFileSystem(fileReader.fileSystem)
               .withPath(fileReader.path)
               .withTypeCount(types.size())
-              .withZeroCopy(zeroCopy)
-              .build());
+              .withZeroCopy(zeroCopy);
+      // If the file reader kept the stream it used to read the file tail,
+      // share it (reference counted through fileReader) instead of opening
+      // a second handle; otherwise the data reader opens its own stream.
+      if (fileReader.file != null) {
+        builder.withFile(fileReader, fileReader.file);
+      }
+      this.dataReader = RecordReaderUtils.createDefaultDataReader(
+          builder.build());
     }
     this.dataReader.open();
     firstRow = skippedRows;
diff --git a/java/core/src/java/org/apache/orc/impl/RecordReaderUtils.java b/java/core/src/java/org/apache/orc/impl/RecordReaderUtils.java
index 6006634..3d6699b 100644
--- a/java/core/src/java/org/apache/orc/impl/RecordReaderUtils.java
+++ b/java/core/src/java/org/apache/orc/impl/RecordReaderUtils.java
@@ -142,20 +142,28 @@ public class RecordReaderUtils {
   }
 
   private static class DefaultDataReader implements DataReader {
-    private FSDataInputStream file = null;
     private final ByteBufferAllocatorPool pool;
     private HadoopShims.ZeroCopyReaderShim zcr = null;
     private final FileSystem fs;
     private final Path path;
+    // Set when file is borrowed from a ReaderImpl: references are then
+    // counted on that reader instead of the stream being closed directly.
+    private final ReaderImpl reader;
+    private FSDataInputStream file;
     private final boolean useZeroCopy;
     private final CompressionCodec codec;
     private final int bufferSize;
     private final int typeCount;
     private CompressionKind compressionKind;
+    // True from the point open() holds the stream until close(), so the
+    // stream or reference is acquired and released exactly once.
+    private boolean isOpen = false;
 
     private DefaultDataReader(DataReaderProperties properties) {
       this.fs = properties.getFileSystem();
       this.path = properties.getPath();
+      this.reader = properties.getReader();
+      this.file = properties.getFile();
       this.useZeroCopy = properties.getZeroCopy();
       this.compressionKind = properties.getCompression();
       this.codec = OrcCodecPool.getCodec(compressionKind);
@@ -170,12 +178,24 @@ public class RecordReaderUtils {
 
     @Override
     public void open() throws IOException {
-      this.file = fs.open(path);
+      if (isOpen) {
+        // Opening again would leak a private stream or take a second
+        // reference to a shared one that close() never gives back.
+        return;
+      }
+      if (file == null) {
+        this.file = fs.open(path);
+      } else if (reader != null) {
+        reader.addFileReference();
+      }
+      // Mark open as soon as the stream is held so that close() releases it
+      // even if creating the zero-copy shim below fails.
+      isOpen = true;
       if (useZeroCopy) {
         zcr = RecordReaderUtils.createZeroCopyShim(file, codec, pool);
       } else {
         zcr = null;
       }
     }
 
     @Override
@@ -190,7 +210,7 @@ public class RecordReaderUtils {
                                  OrcProto.Stream.Kind[] bloomFilterKinds,
                                  OrcProto.BloomFilterIndex[] bloomFilterIndices
                                  ) throws IOException {
-      if (file == null) {
+      if (!isOpen) {
         open();
       }
       if (footer == null) {
@@ -258,7 +278,7 @@ public class RecordReaderUtils {
 
     @Override
     public OrcProto.StripeFooter readStripeFooter(StripeInformation stripe) throws IOException {
-      if (file == null) {
+      if (!isOpen) {
         open();
       }
       long offset = stripe.getOffset() + stripe.getIndexLength() + stripe.getDataLength();
@@ -289,7 +309,18 @@ public class RecordReaderUtils {
       // close both zcr and file
       try (HadoopShims.ZeroCopyReaderShim myZcr = zcr) {
-        if (file != null) {
-          file.close();
+        // Release only what open() acquired: closing twice, or closing a
+        // reader that was never opened, must not drop a reference owned by
+        // someone else sharing the stream.
+        if (isOpen && file != null) {
+          if (reader == null) {
+            file.close();
+            // Forget the private stream so a later open() starts afresh.
+            file = null;
+          } else {
+            reader.removeFileReference();
+          }
         }
+      } finally {
+        isOpen = false;
       }
     }
@@ -307,6 +338,15 @@ public class RecordReaderUtils {
     @Override
     public DataReader clone() {
       try {
-        return (DataReader) super.clone();
+        DefaultDataReader copy = (DefaultDataReader) super.clone();
+        // The copy starts out closed, so its own open() acquires exactly what
+        // its close() later releases: another reference to a shared stream,
+        // or a fresh private stream rather than this reader's one.
+        copy.isOpen = false;
+        copy.zcr = null;
+        if (reader == null) {
+          copy.file = null;
+        }
+        return copy;
       } catch (CloneNotSupportedException e) {
         throw new UnsupportedOperationException("uncloneable", e);
diff --git a/java/core/src/test/org/apache/orc/impl/TestReaderImpl.java b/java/core/src/test/org/apache/orc/impl/TestReaderImpl.java
index 23cdc89..7cc50a0 100644
--- a/java/core/src/test/org/apache/orc/impl/TestReaderImpl.java
+++ b/java/core/src/test/org/apache/orc/impl/TestReaderImpl.java
@@ -17,15 +17,24 @@ package org.apache.orc.impl;
 
 import java.io.ByteArrayInputStream;
 import java.io.EOFException;
 import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
 import java.nio.ByteBuffer;
 import java.nio.charset.CharacterCodingException;
+import java.util.ArrayList;
+import java.util.List;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PositionedReadable;
 import org.apache.hadoop.fs.Seekable;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.util.Progressable;
 import org.apache.orc.FileFormatException;
 import org.apache.hadoop.io.Text;
 import org.apache.orc.OrcFile;
@@ -168,4 +177,149 @@ public class TestReaderImpl {
       readFully(position, buffer, 0, buffer.length);
     }
   }
+
+  static class MockInputStream extends FSDataInputStream {
+    final MockFileSystem fs;
+
+    public MockInputStream(MockFileSystem fs) {
+      super(new SeekableByteArrayInputStream(new byte[0]));
+      this.fs = fs;
+    }
+
+    @Override
+    public void close() {
+      fs.removeStream(this);
+    }
+  }
+
+  static class MockFileSystem extends FileSystem {
+    final List<MockInputStream> streams = new ArrayList<>();
+
+    public MockFileSystem(Configuration conf) {
+      setConf(conf);
+    }
+
+    @Override
+    public URI getUri() {
+      try {
+        return new URI("mock:///");
+      } catch (URISyntaxException e) {
+        throw new IllegalArgumentException("bad uri", e);
+      }
+    }
+
+    @Override
+    public FSDataInputStream open(Path path, int i) {
+      MockInputStream result = new MockInputStream(this);
+      streams.add(result);
+      return result;
+    }
+
+    void removeStream(MockInputStream stream) {
+      streams.remove(stream);
+    }
+
+    int streamCount() {
+      return streams.size();
+    }
+
+    @Override
+    public FSDataOutputStream create(Path path, FsPermission fsPermission,
+                                     boolean b, int i, short i1, long l,
+                                     Progressable progressable) throws IOException {
+      throw new IOException("Can't create");
+    }
+
+    @Override
+    public FSDataOutputStream append(Path path, int i,
+                                     Progressable progressable) throws IOException {
+      throw new IOException("Can't append");
+    }
+
+    @Override
+    public boolean rename(Path path, Path path1) {
+      return false;
+    }
+
+    @Override
+    public boolean delete(Path path, boolean b) {
+      return false;
+    }
+
+    @Override
+    public FileStatus[] listStatus(Path path) {
+      return new FileStatus[0];
+    }
+
+    @Override
+    public void setWorkingDirectory(Path path) {
+      // ignore
+    }
+
+    @Override
+    public Path getWorkingDirectory() {
+      return new Path("/");
+    }
+
+    @Override
+    public boolean mkdirs(Path path, FsPermission fsPermission) {
+      return false;
+    }
+
+    @Override
+    public FileStatus getFileStatus(Path path) {
+      // A default FileStatus reports length 0, so every file looks empty and
+      // the reader builds an empty tail while still holding its stream.
+      return new FileStatus();
+    }
+  }
+
+  @Test
+  public void testClosingRowsFirst() throws Exception {
+    Configuration conf = new Configuration();
+    MockFileSystem fs = new MockFileSystem(conf);
+    Reader reader = OrcFile.createReader(new Path("/foo"),
+        OrcFile.readerOptions(conf).filesystem(fs));
+    assertEquals(1, fs.streamCount());
+    RecordReader rows = reader.rows();
+    assertEquals(1, fs.streamCount());
+    RecordReader rows2 = reader.rows();
+    assertEquals(1, fs.streamCount());
+    rows.close();
+    assertEquals(1, fs.streamCount());
+    rows2.close();
+    assertEquals(1, fs.streamCount());
+    reader.close();
+    assertEquals(0, fs.streamCount());
+  }
+
+  @Test
+  public void testClosingReaderFirst() throws Exception {
+    Configuration conf = new Configuration();
+    MockFileSystem fs = new MockFileSystem(conf);
+    Reader reader = OrcFile.createReader(new Path("/foo"),
+        OrcFile.readerOptions(conf).filesystem(fs));
+    assertEquals(1, fs.streamCount());
+    RecordReader rows = reader.rows();
+    assertEquals(1, fs.streamCount());
+    reader.close();
+    assertEquals(1, fs.streamCount());
+    rows.close();
+    assertEquals(0, fs.streamCount());
+  }
+
+  @Test
+  public void testClosingMultiple() throws Exception {
+    Configuration conf = new Configuration();
+    MockFileSystem fs = new MockFileSystem(conf);
+    Reader reader = OrcFile.createReader(new Path("/foo"),
+        OrcFile.readerOptions(conf).filesystem(fs));
+    Reader reader2 = OrcFile.createReader(new Path("/bar"),
+        OrcFile.readerOptions(conf).filesystem(fs));
+    assertEquals(2, fs.streamCount());
+    reader.close();
+    assertEquals(1, fs.streamCount());
+    reader2.close();
+    assertEquals(0, fs.streamCount());
+  }
 }