Posted to commits@orc.apache.org by om...@apache.org on 2019/05/07 03:06:53 UTC
[orc] branch branch-1.5 updated: ORC-498: ReaderImpl and RecordReaderImpl open separate file handles.
This is an automated email from the ASF dual-hosted git repository.
omalley pushed a commit to branch branch-1.5
in repository https://gitbox.apache.org/repos/asf/orc.git
The following commit(s) were added to refs/heads/branch-1.5 by this push:
new 6d3943e ORC-498: ReaderImpl and RecordReaderImpl open separate file handles.
6d3943e is described below
commit 6d3943eb456985a00973dc2e94ad3b3389c4ba05
Author: Owen O'Malley <om...@apache.org>
AuthorDate: Mon May 6 14:29:25 2019 -0700
ORC-498: ReaderImpl and RecordReaderImpl open separate file handles.
Fixes #391
Signed-off-by: Owen O'Malley <om...@apache.org>
---
java/core/src/java/org/apache/orc/Reader.java | 3 +-
.../org/apache/orc/impl/DataReaderProperties.java | 13 ++
.../src/java/org/apache/orc/impl/ReaderImpl.java | 35 ++++-
.../java/org/apache/orc/impl/RecordReaderImpl.java | 12 +-
.../org/apache/orc/impl/RecordReaderUtils.java | 15 +-
.../test/org/apache/orc/impl/TestReaderImpl.java | 172 +++++++++++++++++++++
6 files changed, 238 insertions(+), 12 deletions(-)
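
With this change, Reader extends Closeable and hands the stream it opened for the file tail to the first RecordReader, instead of Reader and RecordReader each opening their own handle. A minimal sketch of the resulting usage, assuming a placeholder path and default configuration (not part of this commit):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.orc.OrcFile;
import org.apache.orc.Reader;
import org.apache.orc.RecordReader;

public class ReadOrcExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Reader now extends Closeable, so try-with-resources works.
    try (Reader reader = OrcFile.createReader(new Path("/tmp/example.orc"),
                                              OrcFile.readerOptions(conf))) {
      // The first RecordReader reuses the handle the Reader already opened;
      // any additional RecordReaders open their own handles.
      RecordReader rows = reader.rows();
      VectorizedRowBatch batch = reader.getSchema().createRowBatch();
      while (rows.nextBatch(batch)) {
        // process batch.size rows
      }
      rows.close();
    }
  }
}
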
diff --git a/java/core/src/java/org/apache/orc/Reader.java b/java/core/src/java/org/apache/orc/Reader.java
index a90b6c4..6d6e04b 100644
--- a/java/core/src/java/org/apache/orc/Reader.java
+++ b/java/core/src/java/org/apache/orc/Reader.java
@@ -18,6 +18,7 @@
package org.apache.orc;
+import java.io.Closeable;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;
@@ -30,7 +31,7 @@ import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
*
* One Reader can support multiple concurrent RecordReader.
*/
-public interface Reader {
+public interface Reader extends Closeable {
/**
* Get the number of rows in the file.
diff --git a/java/core/src/java/org/apache/orc/impl/DataReaderProperties.java b/java/core/src/java/org/apache/orc/impl/DataReaderProperties.java
index 8420149..386324c 100644
--- a/java/core/src/java/org/apache/orc/impl/DataReaderProperties.java
+++ b/java/core/src/java/org/apache/orc/impl/DataReaderProperties.java
@@ -17,6 +17,7 @@
*/
package org.apache.orc.impl;
+import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.orc.CompressionKind;
@@ -26,6 +27,7 @@ public final class DataReaderProperties {
private final FileSystem fileSystem;
private final Path path;
+ private final FSDataInputStream file;
private final CompressionKind compression;
private final boolean zeroCopy;
private final int typeCount;
@@ -35,6 +37,7 @@ public final class DataReaderProperties {
private DataReaderProperties(Builder builder) {
this.fileSystem = builder.fileSystem;
this.path = builder.path;
+ this.file = builder.file;
this.compression = builder.compression;
this.zeroCopy = builder.zeroCopy;
this.typeCount = builder.typeCount;
@@ -50,6 +53,10 @@ public final class DataReaderProperties {
return path;
}
+ public FSDataInputStream getFile() {
+ return file;
+ }
+
public CompressionKind getCompression() {
return compression;
}
@@ -78,6 +85,7 @@ public final class DataReaderProperties {
private FileSystem fileSystem;
private Path path;
+ private FSDataInputStream file;
private CompressionKind compression;
private boolean zeroCopy;
private int typeCount;
@@ -98,6 +106,11 @@ public final class DataReaderProperties {
return this;
}
+ public Builder withFile(FSDataInputStream file) {
+ this.file = file;
+ return this;
+ }
+
public Builder withCompression(CompressionKind value) {
this.compression = value;
return this;
diff --git a/java/core/src/java/org/apache/orc/impl/ReaderImpl.java b/java/core/src/java/org/apache/orc/impl/ReaderImpl.java
index d086421..01b449b 100644
--- a/java/core/src/java/org/apache/orc/impl/ReaderImpl.java
+++ b/java/core/src/java/org/apache/orc/impl/ReaderImpl.java
@@ -64,6 +64,7 @@ public class ReaderImpl implements Reader {
protected final Path path;
protected final OrcFile.ReaderOptions options;
protected final org.apache.orc.CompressionKind compressionKind;
+ protected FSDataInputStream file;
protected int bufferSize;
protected OrcProto.Metadata metadata;
private List<OrcProto.StripeStatistics> stripeStats;
@@ -531,7 +532,8 @@ public class ReaderImpl implements Reader {
OrcProto.PostScript ps;
OrcProto.FileTail.Builder fileTailBuilder = OrcProto.FileTail.newBuilder();
long modificationTime;
- try (FSDataInputStream file = fs.open(path)) {
+ file = fs.open(path);
+ try {
// figure out the size of the file using the option or filesystem
long size;
if (maxFileLength == Long.MAX_VALUE) {
@@ -548,9 +550,9 @@ public class ReaderImpl implements Reader {
return buildEmptyTail();
} else if (size <= OrcFile.MAGIC.length()) {
// Anything smaller than MAGIC header cannot be valid (valid ORC files
- // are actually around 40 bytes, this is more conservative)
+ // are actually around 40 bytes, this is more conservative)
throw new FileFormatException("Not a valid ORC file " + path
- + " (maxFileLength= " + maxFileLength + ")");
+ + " (maxFileLength= " + maxFileLength + ")");
}
fileTailBuilder.setFileLength(size);
@@ -610,6 +612,14 @@ public class ReaderImpl implements Reader {
OrcCodecPool.returnCodec(compressionKind, codec);
}
fileTailBuilder.setFooter(footer);
+ } catch (Throwable thr) {
+ try {
+ close();
+ } catch (IOException ignore) {
+ LOG.info("Ignoring secondary exception in close of " + path, ignore);
+ }
+ throw thr instanceof IOException ? (IOException) thr :
+ new IOException("Problem reading file footer " + path, thr);
}
ByteBuffer serializedTail = ByteBuffer.allocate(buffer.remaining());
@@ -835,4 +845,23 @@ public class ReaderImpl implements Reader {
buffer.append(")");
return buffer.toString();
}
+
+ @Override
+ public void close() throws IOException {
+ if (file != null) {
+ file.close();
+ }
+ }
+
+ /**
+ * Take the file from the reader.
+ * This allows the first RecordReader to use the same file, but additional
+ * RecordReaders will open new handles.
+ * @return a file handle, if one is available
+ */
+ FSDataInputStream takeFile() {
+ FSDataInputStream result = file;
+ file = null;
+ return result;
+ }
}
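
The close()/takeFile() pair above is an ownership hand-off: ReaderImpl keeps the stream it opened while reading the file tail, the first caller of takeFile() takes it (leaving the field null, so Reader.close() becomes a no-op for that handle), and later callers get null and must open their own stream. A minimal standalone sketch of the pattern, with illustrative names rather than the commit's code verbatim:

import java.io.Closeable;
import java.io.IOException;
import org.apache.hadoop.fs.FSDataInputStream;

// Illustrative take-or-close sketch of the hand-off used above.
class TailOwner implements Closeable {
  private FSDataInputStream file;   // opened while reading the file tail

  TailOwner(FSDataInputStream file) { this.file = file; }

  FSDataInputStream takeFile() {    // first caller takes ownership
    FSDataInputStream result = file;
    file = null;
    return result;
  }

  @Override
  public void close() throws IOException {
    if (file != null) {             // only close what was not handed off
      file.close();
    }
  }
}
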
diff --git a/java/core/src/java/org/apache/orc/impl/RecordReaderImpl.java b/java/core/src/java/org/apache/orc/impl/RecordReaderImpl.java
index 3ee1b62..7f7965f 100644
--- a/java/core/src/java/org/apache/orc/impl/RecordReaderImpl.java
+++ b/java/core/src/java/org/apache/orc/impl/RecordReaderImpl.java
@@ -17,6 +17,7 @@
*/
package org.apache.orc.impl;
+import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.orc.CompressionKind;
import java.io.IOException;
@@ -240,16 +241,21 @@ public class RecordReaderImpl implements RecordReader {
if (options.getDataReader() != null) {
this.dataReader = options.getDataReader().clone();
} else {
- this.dataReader = RecordReaderUtils.createDefaultDataReader(
+ DataReaderProperties.Builder builder =
DataReaderProperties.builder()
.withBufferSize(bufferSize)
.withCompression(fileReader.compressionKind)
.withFileSystem(fileReader.getFileSystem())
.withPath(fileReader.path)
.withTypeCount(types.size())
- .withZeroCopy(zeroCopy)
.withMaxDiskRangeChunkLimit(maxDiskRangeChunkLimit)
- .build());
+ .withZeroCopy(zeroCopy);
+ FSDataInputStream file = fileReader.takeFile();
+ if (file != null) {
+ builder.withFile(file);
+ }
+ this.dataReader = RecordReaderUtils.createDefaultDataReader(
+ builder.build());
}
firstRow = skippedRows;
totalRowCount = rows;
diff --git a/java/core/src/java/org/apache/orc/impl/RecordReaderUtils.java b/java/core/src/java/org/apache/orc/impl/RecordReaderUtils.java
index 3c43ba4..36a11db 100644
--- a/java/core/src/java/org/apache/orc/impl/RecordReaderUtils.java
+++ b/java/core/src/java/org/apache/orc/impl/RecordReaderUtils.java
@@ -17,6 +17,7 @@
*/
package org.apache.orc.impl;
+import org.apache.hadoop.hive.common.io.DiskRange;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -30,7 +31,6 @@ import org.apache.commons.lang.builder.HashCodeBuilder;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.common.io.DiskRange;
import org.apache.hadoop.hive.common.io.DiskRangeList;
import org.apache.orc.CompressionCodec;
import org.apache.orc.CompressionKind;
@@ -144,7 +144,7 @@ public class RecordReaderUtils {
}
private static class DefaultDataReader implements DataReader {
- private FSDataInputStream file = null;
+ private FSDataInputStream file;
private ByteBufferAllocatorPool pool;
private HadoopShims.ZeroCopyReaderShim zcr = null;
private final FileSystem fs;
@@ -155,10 +155,12 @@ public class RecordReaderUtils {
private final int typeCount;
private CompressionKind compressionKind;
private final int maxDiskRangeChunkLimit;
+ private boolean isOpen = false;
private DefaultDataReader(DataReaderProperties properties) {
this.fs = properties.getFileSystem();
this.path = properties.getPath();
+ this.file = properties.getFile();
this.useZeroCopy = properties.getZeroCopy();
this.compressionKind = properties.getCompression();
this.codec = OrcCodecPool.getCodec(compressionKind);
@@ -169,7 +171,9 @@ public class RecordReaderUtils {
@Override
public void open() throws IOException {
- this.file = fs.open(path);
+ if (file == null) {
+ this.file = fs.open(path);
+ }
if (useZeroCopy) {
// ZCR only uses codec for boolean checks.
pool = new ByteBufferAllocatorPool();
@@ -177,6 +181,7 @@ public class RecordReaderUtils {
} else {
zcr = null;
}
+ isOpen = true;
}
@Override
@@ -191,7 +196,7 @@ public class RecordReaderUtils {
OrcProto.Stream.Kind[] bloomFilterKinds,
OrcProto.BloomFilterIndex[] bloomFilterIndices
) throws IOException {
- if (file == null) {
+ if (!isOpen) {
open();
}
if (footer == null) {
@@ -258,7 +263,7 @@ public class RecordReaderUtils {
@Override
public OrcProto.StripeFooter readStripeFooter(StripeInformation stripe) throws IOException {
- if (file == null) {
+ if (!isOpen) {
open();
}
long offset = stripe.getOffset() + stripe.getIndexLength() + stripe.getDataLength();
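
Note the switch from checking file == null to an explicit isOpen flag in DefaultDataReader: once a stream can be injected through DataReaderProperties.withFile(), a non-null file no longer proves that open() has run and the zero-copy state has been set up. A hedged sketch of the lazy-open shape this produces, trimmed to the relevant fields (illustrative names, not the commit's code):

import java.io.IOException;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Illustrative sketch of lazy open with an optionally injected handle.
class LazyDataReader {
  private final FileSystem fs;
  private final Path path;
  private FSDataInputStream file;   // may be supplied up front by the builder
  private boolean isOpen = false;

  LazyDataReader(FileSystem fs, Path path, FSDataInputStream file) {
    this.fs = fs;
    this.path = path;
    this.file = file;
  }

  void open() throws IOException {
    if (file == null) {
      file = fs.open(path);         // open a new handle only when none was injected
    }
    isOpen = true;                  // a non-null file alone no longer means "opened"
  }

  void ensureOpen() throws IOException {
    if (!isOpen) {
      open();
    }
  }
}
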
diff --git a/java/core/src/test/org/apache/orc/impl/TestReaderImpl.java b/java/core/src/test/org/apache/orc/impl/TestReaderImpl.java
index 63cdefd..8471a1e 100644
--- a/java/core/src/test/org/apache/orc/impl/TestReaderImpl.java
+++ b/java/core/src/test/org/apache/orc/impl/TestReaderImpl.java
@@ -17,15 +17,26 @@ package org.apache.orc.impl;
import java.io.ByteArrayInputStream;
import java.io.EOFException;
+import java.io.FileNotFoundException;
import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+import java.net.URISyntaxException;
import java.nio.ByteBuffer;
import java.nio.charset.CharacterCodingException;
+import java.util.ArrayList;
+import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PositionedReadable;
import org.apache.hadoop.fs.Seekable;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.util.Progressable;
import org.apache.orc.FileFormatException;
import org.apache.hadoop.io.Text;
import org.apache.orc.OrcFile;
@@ -168,4 +179,165 @@ public class TestReaderImpl {
readFully(position, buffer, 0, buffer.length);
}
}
+
+ static byte[] byteArray(int... input) {
+ byte[] result = new byte[input.length];
+ for(int i=0; i < result.length; ++i) {
+ result[i] = (byte) input[i];
+ }
+ return result;
+ }
+
+ static class MockInputStream extends FSDataInputStream {
+ MockFileSystem fs;
+ // A single row ORC file
+ static final byte[] SIMPLE_ORC = byteArray(
+ 0x4f, 0x52, 0x43, 0x42, 0x00, 0x80, 0x0a, 0x06, 0x08, 0x01, 0x10, 0x01, 0x18, 0x03, 0x12, 0x02,
+ 0x08, 0x00, 0x12, 0x02, 0x08, 0x02, 0x0a, 0x12, 0x0a, 0x04, 0x08, 0x00, 0x50, 0x00, 0x0a, 0x0a,
+ 0x08, 0x00, 0x12, 0x02, 0x18, 0x00, 0x50, 0x00, 0x58, 0x03, 0x08, 0x03, 0x10, 0x16, 0x1a, 0x0a,
+ 0x08, 0x03, 0x10, 0x00, 0x18, 0x03, 0x20, 0x10, 0x28, 0x01, 0x22, 0x08, 0x08, 0x0c, 0x12, 0x01,
+ 0x01, 0x1a, 0x01, 0x78, 0x22, 0x02, 0x08, 0x03, 0x30, 0x01, 0x3a, 0x04, 0x08, 0x00, 0x50, 0x00,
+ 0x3a, 0x0a, 0x08, 0x00, 0x12, 0x02, 0x18, 0x00, 0x50, 0x00, 0x58, 0x03, 0x40, 0x00, 0x48, 0x00,
+ 0x08, 0x36, 0x10, 0x00, 0x22, 0x02, 0x00, 0x0c, 0x28, 0x14, 0x30, 0x07, 0x82, 0xf4, 0x03, 0x03,
+ 0x4f, 0x52, 0x43, 0x13);
+
+ public MockInputStream(MockFileSystem fs) throws IOException {
+ super(new SeekableByteArrayInputStream(SIMPLE_ORC));
+ this.fs = fs;
+ }
+
+ public void close() {
+ fs.removeStream(this);
+ }
+ }
+
+ static class MockFileSystem extends FileSystem {
+ final List<MockInputStream> streams = new ArrayList<>();
+
+ public MockFileSystem(Configuration conf) {
+ setConf(conf);
+ }
+
+ @Override
+ public URI getUri() {
+ try {
+ return new URI("mock:///");
+ } catch (URISyntaxException e) {
+ throw new IllegalArgumentException("bad uri", e);
+ }
+ }
+
+ @Override
+ public FSDataInputStream open(Path path, int i) throws IOException {
+ MockInputStream result = new MockInputStream(this);
+ streams.add(result);
+ return result;
+ }
+
+ void removeStream(MockInputStream stream) {
+ streams.remove(stream);
+ }
+
+ int streamCount() {
+ return streams.size();
+ }
+
+ @Override
+ public FSDataOutputStream create(Path path, FsPermission fsPermission,
+ boolean b, int i, short i1, long l,
+ Progressable progressable) throws IOException {
+ throw new IOException("Can't create");
+ }
+
+ @Override
+ public FSDataOutputStream append(Path path, int i,
+ Progressable progressable) throws IOException {
+ throw new IOException("Can't append");
+ }
+
+ @Override
+ public boolean rename(Path path, Path path1) {
+ return false;
+ }
+
+ @Override
+ public boolean delete(Path path, boolean b) {
+ return false;
+ }
+
+ @Override
+ public FileStatus[] listStatus(Path path) {
+ return new FileStatus[0];
+ }
+
+ @Override
+ public void setWorkingDirectory(Path path) {
+ // ignore
+ }
+
+ @Override
+ public Path getWorkingDirectory() {
+ return new Path("/");
+ }
+
+ @Override
+ public boolean mkdirs(Path path, FsPermission fsPermission) {
+ return false;
+ }
+
+ @Override
+ public FileStatus getFileStatus(Path path) {
+ return new FileStatus(MockInputStream.SIMPLE_ORC.length, false, 1, 4096,
+ 0, path);
+ }
+ }
+
+ @Test
+ public void testClosingRowsFirst() throws Exception {
+ Configuration conf = new Configuration();
+ MockFileSystem fs = new MockFileSystem(conf);
+ Reader reader = OrcFile.createReader(new Path("/foo"),
+ OrcFile.readerOptions(conf).filesystem(fs));
+ assertEquals(1, fs.streamCount());
+ RecordReader rows = reader.rows();
+ assertEquals(1, fs.streamCount());
+ RecordReader rows2 = reader.rows();
+ assertEquals(2, fs.streamCount());
+ rows.close();
+ assertEquals(1, fs.streamCount());
+ rows2.close();
+ assertEquals(0, fs.streamCount());
+ reader.close();
+ assertEquals(0, fs.streamCount());
+ }
+
+ @Test
+ public void testClosingReaderFirst() throws Exception {
+ Configuration conf = new Configuration();
+ MockFileSystem fs = new MockFileSystem(conf);
+ Reader reader = OrcFile.createReader(new Path("/foo"),
+ OrcFile.readerOptions(conf).filesystem(fs));
+ assertEquals(1, fs.streamCount());
+ RecordReader rows = reader.rows();
+ assertEquals(1, fs.streamCount());
+ reader.close();
+ assertEquals(1, fs.streamCount());
+ rows.close();
+ assertEquals(0, fs.streamCount());
+ }
+
+ @Test
+ public void testClosingMultiple() throws Exception {
+ Configuration conf = new Configuration();
+ MockFileSystem fs = new MockFileSystem(conf);
+ Reader reader = OrcFile.createReader(new Path("/foo"),
+ OrcFile.readerOptions(conf).filesystem(fs));
+ Reader reader2 = OrcFile.createReader(new Path("/bar"),
+ OrcFile.readerOptions(conf).filesystem(fs));
+ assertEquals(2, fs.streamCount());
+ reader.close();
+ assertEquals(1, fs.streamCount());
+ reader2.close();
+ assertEquals(0, fs.streamCount());
+ }
}