You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@orc.apache.org by om...@apache.org on 2019/05/07 01:37:20 UTC
[orc] 01/01: ORC-498: ReaderImpl and RecordReaderImpl open separate
file handles.
This is an automated email from the ASF dual-hosted git repository.
omalley pushed a commit to branch branch-1.4
in repository https://gitbox.apache.org/repos/asf/orc.git
commit c43172e765e9575259d52e1655b495ea64c5c8b0
Author: Owen O'Malley <om...@apache.org>
AuthorDate: Mon May 6 14:29:25 2019 -0700
ORC-498: ReaderImpl and RecordReaderImpl open separate file handles.
---
java/core/src/java/org/apache/orc/Reader.java | 3 +-
.../org/apache/orc/impl/DataReaderProperties.java | 21 +++
.../src/java/org/apache/orc/impl/ReaderImpl.java | 34 ++++-
.../java/org/apache/orc/impl/RecordReaderImpl.java | 10 +-
.../org/apache/orc/impl/RecordReaderUtils.java | 30 +++-
.../test/org/apache/orc/impl/TestReaderImpl.java | 153 +++++++++++++++++++++
6 files changed, 239 insertions(+), 12 deletions(-)
diff --git a/java/core/src/java/org/apache/orc/Reader.java b/java/core/src/java/org/apache/orc/Reader.java
index 2ef64d7..3b41251 100644
--- a/java/core/src/java/org/apache/orc/Reader.java
+++ b/java/core/src/java/org/apache/orc/Reader.java
@@ -18,6 +18,7 @@
package org.apache.orc;
+import java.io.Closeable;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;
@@ -30,7 +31,7 @@ import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
*
* One Reader can support multiple concurrent RecordReader.
*/
-public interface Reader {
+public interface Reader extends Closeable {
/**
* Get the number of rows in the file.
diff --git a/java/core/src/java/org/apache/orc/impl/DataReaderProperties.java b/java/core/src/java/org/apache/orc/impl/DataReaderProperties.java
index fbdc145..5bbb8a6 100644
--- a/java/core/src/java/org/apache/orc/impl/DataReaderProperties.java
+++ b/java/core/src/java/org/apache/orc/impl/DataReaderProperties.java
@@ -17,6 +17,7 @@
*/
package org.apache.orc.impl;
+import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.orc.CompressionKind;
@@ -25,6 +26,8 @@ public final class DataReaderProperties {
private final FileSystem fileSystem;
private final Path path;
+ private final FSDataInputStream file;
+ private final ReaderImpl reader;
private final CompressionKind compression;
private final boolean zeroCopy;
private final int typeCount;
@@ -33,6 +36,8 @@ public final class DataReaderProperties {
private DataReaderProperties(Builder builder) {
this.fileSystem = builder.fileSystem;
this.path = builder.path;
+ this.reader = builder.reader;
+ this.file = builder.file;
this.compression = builder.compression;
this.zeroCopy = builder.zeroCopy;
this.typeCount = builder.typeCount;
@@ -47,6 +52,14 @@ public final class DataReaderProperties {
return path;
}
+ /**
+  * Get the reader that was registered together with the shared stream.
+  * @return the ReaderImpl given to Builder.withFile, or null when withFile
+  *         was not called on the builder
+  */
+ public ReaderImpl getReader() {
+ return reader;
+ }
+
+ /**
+  * Get the stream that was registered through Builder.withFile.
+  * @return the supplied FSDataInputStream, or null when withFile was not
+  *         called on the builder
+  */
+ public FSDataInputStream getFile() {
+ return file;
+ }
+
public CompressionKind getCompression() {
return compression;
}
@@ -71,6 +84,8 @@ public final class DataReaderProperties {
private FileSystem fileSystem;
private Path path;
+ private ReaderImpl reader;
+ private FSDataInputStream file;
private CompressionKind compression;
private boolean zeroCopy;
private int typeCount;
@@ -90,6 +105,12 @@ public final class DataReaderProperties {
return this;
}
+ /**
+  * Register an existing stream and the reader it belongs to, so that the
+  * built properties expose both through getReader() and getFile().
+  * @param reader the ReaderImpl associated with the stream
+  * @param file the stream to hand to the data reader
+  * @return this builder
+  */
+ public Builder withFile(ReaderImpl reader, FSDataInputStream file) {
+ this.reader = reader;
+ this.file = file;
+ return this;
+ }
+
public Builder withCompression(CompressionKind value) {
this.compression = value;
return this;
diff --git a/java/core/src/java/org/apache/orc/impl/ReaderImpl.java b/java/core/src/java/org/apache/orc/impl/ReaderImpl.java
index 80b7a60..5159732 100644
--- a/java/core/src/java/org/apache/orc/impl/ReaderImpl.java
+++ b/java/core/src/java/org/apache/orc/impl/ReaderImpl.java
@@ -63,6 +63,8 @@ public class ReaderImpl implements Reader {
private final long maxLength;
protected final Path path;
protected final org.apache.orc.CompressionKind compressionKind;
+ protected FSDataInputStream file;
+ protected int fileReferenceCount = 0;
protected int bufferSize;
protected OrcProto.Metadata metadata;
private List<OrcProto.StripeStatistics> stripeStats;
@@ -519,7 +521,9 @@ public class ReaderImpl implements Reader {
OrcProto.PostScript ps;
OrcProto.FileTail.Builder fileTailBuilder = OrcProto.FileTail.newBuilder();
long modificationTime;
- try (FSDataInputStream file = fs.open(path)) {
+ file = fs.open(path);
+ addFileReference();
+ try {
// figure out the size of the file using the option or filesystem
long size;
if (maxFileLength == Long.MAX_VALUE) {
@@ -536,9 +540,9 @@ public class ReaderImpl implements Reader {
return buildEmptyTail();
} else if (size <= OrcFile.MAGIC.length()) {
// Anything smaller than MAGIC header cannot be valid (valid ORC files
- // are actually around 40 bytes, this is more conservative)
+ // are actually around 40 bytes, this is more conservative)
throw new FileFormatException("Not a valid ORC file " + path
- + " (maxFileLength= " + maxFileLength + ")");
+ + " (maxFileLength= " + maxFileLength + ")");
}
fileTailBuilder.setFileLength(size);
@@ -598,6 +602,14 @@ public class ReaderImpl implements Reader {
OrcCodecPool.returnCodec(compressionKind, codec);
}
fileTailBuilder.setFooter(footer);
+ } catch (Throwable thr) {
+ try {
+ removeFileReference();
+ } catch (IOException ignore) {
+ // ignore
+ }
+ throw thr instanceof IOException ? (IOException) thr :
+ new IOException("Problem reading file footer " + path, thr);
}
ByteBuffer serializedTail = ByteBuffer.allocate(buffer.remaining());
@@ -823,4 +835,20 @@ public class ReaderImpl implements Reader {
buffer.append(")");
return buffer.toString();
}
+
+  /**
+   * Record one more user of the file handle held in {@code file}.
+   * The counter is a plain int and this method is not synchronized.
+   */
+  void addFileReference() {
+    fileReferenceCount += 1;
+  }
+
+  /**
+   * Release one reference to the file handle. When the count reaches zero
+   * the handle is closed and forgotten, so that it is never handed out again
+   * after it has been closed.
+   * @throws IOException if closing the underlying stream fails
+   */
+  void removeFileReference() throws IOException {
+    fileReferenceCount -= 1;
+    if (fileReferenceCount == 0 && file != null) {
+      FSDataInputStream closing = file;
+      // Clear the field first: a closed stream must not be picked up by a
+      // record reader that is created later.
+      file = null;
+      closing.close();
+    }
+  }
+
+  // True once close() has released the reference taken when the file was
+  // opened; keeps repeated close() calls from releasing it again.
+  private boolean readerClosed = false;
+
+  /**
+   * Release this reader's own reference to the file handle. Calling close
+   * more than once has no further effect, as required by
+   * {@link java.io.Closeable#close()}; references held by record readers
+   * are left untouched.
+   * @throws IOException if closing the underlying stream fails
+   */
+  @Override
+  public void close() throws IOException {
+    if (!readerClosed) {
+      readerClosed = true;
+      removeFileReference();
+    }
+  }
}
diff --git a/java/core/src/java/org/apache/orc/impl/RecordReaderImpl.java b/java/core/src/java/org/apache/orc/impl/RecordReaderImpl.java
index 0dacc70..83b987d 100644
--- a/java/core/src/java/org/apache/orc/impl/RecordReaderImpl.java
+++ b/java/core/src/java/org/apache/orc/impl/RecordReaderImpl.java
@@ -241,15 +241,19 @@ public class RecordReaderImpl implements RecordReader {
if (options.getDataReader() != null) {
this.dataReader = options.getDataReader().clone();
} else {
- this.dataReader = RecordReaderUtils.createDefaultDataReader(
+ DataReaderProperties.Builder builder =
DataReaderProperties.builder()
.withBufferSize(bufferSize)
.withCompression(fileReader.compressionKind)
.withFileSystem(fileReader.fileSystem)
.withPath(fileReader.path)
.withTypeCount(types.size())
- .withZeroCopy(zeroCopy)
- .build());
+ .withZeroCopy(zeroCopy);
+ if (fileReader.file != null) {
+ builder.withFile(fileReader, fileReader.file);
+ }
+ this.dataReader = RecordReaderUtils.createDefaultDataReader(
+ builder.build());
}
this.dataReader.open();
firstRow = skippedRows;
diff --git a/java/core/src/java/org/apache/orc/impl/RecordReaderUtils.java b/java/core/src/java/org/apache/orc/impl/RecordReaderUtils.java
index 6006634..3d6699b 100644
--- a/java/core/src/java/org/apache/orc/impl/RecordReaderUtils.java
+++ b/java/core/src/java/org/apache/orc/impl/RecordReaderUtils.java
@@ -142,20 +142,24 @@ public class RecordReaderUtils {
}
private static class DefaultDataReader implements DataReader {
- private FSDataInputStream file = null;
private final ByteBufferAllocatorPool pool;
private HadoopShims.ZeroCopyReaderShim zcr = null;
private final FileSystem fs;
private final Path path;
+ private final ReaderImpl reader;
+ private FSDataInputStream file;
private final boolean useZeroCopy;
private final CompressionCodec codec;
private final int bufferSize;
private final int typeCount;
private CompressionKind compressionKind;
+ private boolean isOpen = false;
private DefaultDataReader(DataReaderProperties properties) {
this.fs = properties.getFileSystem();
this.path = properties.getPath();
+ this.reader = properties.getReader();
+ this.file = properties.getFile();
this.useZeroCopy = properties.getZeroCopy();
this.compressionKind = properties.getCompression();
this.codec = OrcCodecPool.getCodec(compressionKind);
@@ -170,12 +174,17 @@ public class RecordReaderUtils {
@Override
public void open() throws IOException {
- this.file = fs.open(path);
+ if (file == null) {
+ this.file = fs.open(path);
+ } else if (reader != null) {
+ reader.addFileReference();
+ }
if (useZeroCopy) {
zcr = RecordReaderUtils.createZeroCopyShim(file, codec, pool);
} else {
zcr = null;
}
+ isOpen = true;
}
@Override
@@ -190,7 +199,7 @@ public class RecordReaderUtils {
OrcProto.Stream.Kind[] bloomFilterKinds,
OrcProto.BloomFilterIndex[] bloomFilterIndices
) throws IOException {
- if (file == null) {
+ if (!isOpen) {
open();
}
if (footer == null) {
@@ -258,7 +267,7 @@ public class RecordReaderUtils {
@Override
public OrcProto.StripeFooter readStripeFooter(StripeInformation stripe) throws IOException {
- if (file == null) {
+ if (!isOpen) {
open();
}
long offset = stripe.getOffset() + stripe.getIndexLength() + stripe.getDataLength();
@@ -289,7 +298,11 @@ public class RecordReaderUtils {
// close both zcr and file
try (HadoopShims.ZeroCopyReaderShim myZcr = zcr) {
if (file != null) {
- file.close();
+ if (reader == null) {
+ file.close();
+ } else {
+ reader.removeFileReference();
+ }
}
}
}
@@ -307,6 +320,13 @@ public class RecordReaderUtils {
@Override
public DataReader clone() {
try {
+ if (file != null) {
+ if (reader != null) {
+ reader.addFileReference();
+ } else {
+ throw new IllegalStateException("Can't clone open data reader");
+ }
+ }
return (DataReader) super.clone();
} catch (CloneNotSupportedException e) {
throw new UnsupportedOperationException("uncloneable", e);
diff --git a/java/core/src/test/org/apache/orc/impl/TestReaderImpl.java b/java/core/src/test/org/apache/orc/impl/TestReaderImpl.java
index 23cdc89..7cc50a0 100644
--- a/java/core/src/test/org/apache/orc/impl/TestReaderImpl.java
+++ b/java/core/src/test/org/apache/orc/impl/TestReaderImpl.java
@@ -17,15 +17,26 @@ package org.apache.orc.impl;
import java.io.ByteArrayInputStream;
import java.io.EOFException;
+import java.io.FileNotFoundException;
import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+import java.net.URISyntaxException;
import java.nio.ByteBuffer;
import java.nio.charset.CharacterCodingException;
+import java.util.ArrayList;
+import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PositionedReadable;
import org.apache.hadoop.fs.Seekable;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.util.Progressable;
import org.apache.orc.FileFormatException;
import org.apache.hadoop.io.Text;
import org.apache.orc.OrcFile;
@@ -168,4 +179,146 @@ public class TestReaderImpl {
readFully(position, buffer, 0, buffer.length);
}
}
+
+  /**
+   * An empty input stream that reports its own closing to the
+   * MockFileSystem that created it, so tests can count open streams.
+   */
+  static class MockInputStream extends FSDataInputStream {
+    // the file system to notify on close; never reassigned
+    private final MockFileSystem fs;
+
+    public MockInputStream(MockFileSystem fs) {
+      super(new SeekableByteArrayInputStream(new byte[0]));
+      this.fs = fs;
+    }
+
+    /**
+     * Unregister this stream from its file system. The wrapped in-memory
+     * stream is deliberately not closed; only the bookkeeping matters here.
+     */
+    @Override
+    public void close() {
+      fs.removeStream(this);
+    }
+  }
+
+  /**
+   * A stub FileSystem for the stream-counting tests. Every open() hands out
+   * a fresh MockInputStream and remembers it until that stream is closed;
+   * all other operations are inert or refuse to work.
+   */
+  static class MockFileSystem extends FileSystem {
+    // streams handed out by open() that have not been closed yet
+    final List<MockInputStream> streams = new ArrayList<>();
+
+    public MockFileSystem(Configuration conf) {
+      setConf(conf);
+    }
+
+    @Override
+    public URI getUri() {
+      final URI mockUri;
+      try {
+        mockUri = new URI("mock:///");
+      } catch (URISyntaxException e) {
+        throw new IllegalArgumentException("bad uri", e);
+      }
+      return mockUri;
+    }
+
+    /** Create and track a new mock stream; both arguments are ignored. */
+    @Override
+    public FSDataInputStream open(Path path, int bufferSize) {
+      MockInputStream opened = new MockInputStream(this);
+      streams.add(opened);
+      return opened;
+    }
+
+    /** Stop tracking a stream; called from MockInputStream.close(). */
+    void removeStream(MockInputStream stream) {
+      streams.remove(stream);
+    }
+
+    /** Number of streams that were opened and not yet closed. */
+    int streamCount() {
+      return streams.size();
+    }
+
+    // ---- write operations are not supported by the mock ----
+
+    @Override
+    public FSDataOutputStream create(Path path, FsPermission permission,
+                                     boolean overwrite, int bufferSize,
+                                     short replication, long blockSize,
+                                     Progressable progress) throws IOException {
+      throw new IOException("Can't create");
+    }
+
+    @Override
+    public FSDataOutputStream append(Path path, int bufferSize,
+                                     Progressable progress) throws IOException {
+      throw new IOException("Can't append");
+    }
+
+    // ---- namespace operations do nothing and report failure ----
+
+    @Override
+    public boolean rename(Path source, Path target) {
+      return false;
+    }
+
+    @Override
+    public boolean delete(Path path, boolean recursive) {
+      return false;
+    }
+
+    @Override
+    public FileStatus[] listStatus(Path path) {
+      return new FileStatus[0];
+    }
+
+    @Override
+    public void setWorkingDirectory(Path path) {
+      // ignore
+    }
+
+    @Override
+    public Path getWorkingDirectory() {
+      return new Path("/");
+    }
+
+    @Override
+    public boolean mkdirs(Path path, FsPermission permission) {
+      return false;
+    }
+
+    /** Every path gets the same default-constructed status. */
+    @Override
+    public FileStatus getFileStatus(Path path) {
+      return new FileStatus();
+    }
+  }
+
+  /**
+   * Two RecordReaders share the stream opened by the Reader. Closing both of
+   * them first leaves the stream open; closing the Reader releases it.
+   */
+  @Test
+  public void testClosingRowsFirst() throws Exception {
+    Configuration conf = new Configuration();
+    MockFileSystem mockFs = new MockFileSystem(conf);
+    Reader orcReader = OrcFile.createReader(new Path("/foo"),
+        OrcFile.readerOptions(conf).filesystem(mockFs));
+    // creating the reader opens the one and only stream
+    assertEquals(1, mockFs.streamCount());
+
+    // neither record reader opens a stream of its own
+    RecordReader firstRows = orcReader.rows();
+    assertEquals(1, mockFs.streamCount());
+    RecordReader secondRows = orcReader.rows();
+    assertEquals(1, mockFs.streamCount());
+
+    // the reader still holds the stream after both record readers are closed
+    firstRows.close();
+    assertEquals(1, mockFs.streamCount());
+    secondRows.close();
+    assertEquals(1, mockFs.streamCount());
+
+    orcReader.close();
+    assertEquals(0, mockFs.streamCount());
+  }
+
+  /**
+   * Closing the Reader while a RecordReader is still open keeps the stream
+   * open; it is released once the RecordReader is closed as well.
+   */
+  @Test
+  public void testClosingReaderFirst() throws Exception {
+    Configuration conf = new Configuration();
+    MockFileSystem mockFs = new MockFileSystem(conf);
+    Reader orcReader = OrcFile.createReader(new Path("/foo"),
+        OrcFile.readerOptions(conf).filesystem(mockFs));
+    assertEquals(1, mockFs.streamCount());
+
+    // the record reader reuses the reader's stream
+    RecordReader openRows = orcReader.rows();
+    assertEquals(1, mockFs.streamCount());
+
+    // the record reader still needs the stream, so it stays open
+    orcReader.close();
+    assertEquals(1, mockFs.streamCount());
+
+    openRows.close();
+    assertEquals(0, mockFs.streamCount());
+  }
+
+  /**
+   * Each Reader opens its own stream, and closing one Reader releases only
+   * the stream that belongs to it.
+   */
+  @Test
+  public void testClosingMultiple() throws Exception {
+    Configuration conf = new Configuration();
+    MockFileSystem mockFs = new MockFileSystem(conf);
+    Reader fooReader = OrcFile.createReader(new Path("/foo"),
+        OrcFile.readerOptions(conf).filesystem(mockFs));
+    Reader barReader = OrcFile.createReader(new Path("/bar"),
+        OrcFile.readerOptions(conf).filesystem(mockFs));
+    // one stream per reader
+    assertEquals(2, mockFs.streamCount());
+
+    fooReader.close();
+    assertEquals(1, mockFs.streamCount());
+
+    barReader.close();
+    assertEquals(0, mockFs.streamCount());
+  }
}