You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by pr...@apache.org on 2016/04/18 22:54:58 UTC
hive git commit: HIVE-13523: Fix connection leak in ORC RecordReader
and refactor for unit testing (Thomas Poepping reviewed by Prasanth
Jayachandran)
Repository: hive
Updated Branches:
refs/heads/master fcc2e7951 -> 72b6bcd78
HIVE-13523: Fix connection leak in ORC RecordReader and refactor for unit testing (Thomas Poepping reviewed by Prasanth Jayachandran)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/72b6bcd7
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/72b6bcd7
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/72b6bcd7
Branch: refs/heads/master
Commit: 72b6bcd78867aa26bf0d220be98d1e7850e0d00e
Parents: fcc2e79
Author: Prasanth Jayachandran <pr...@apache.org>
Authored: Mon Apr 18 15:53:34 2016 -0500
Committer: Prasanth Jayachandran <pr...@apache.org>
Committed: Mon Apr 18 15:53:52 2016 -0500
----------------------------------------------------------------------
orc/src/java/org/apache/orc/DataReader.java | 6 +-
.../java/org/apache/orc/DataReaderFactory.java | 9 ++
.../org/apache/orc/MetadataReaderFactory.java | 12 ++
.../apache/orc/impl/DataReaderProperties.java | 84 ++++++++++
.../orc/impl/DefaultMetadataReaderFactory.java | 14 ++
.../org/apache/orc/impl/MetadataReader.java | 5 +-
.../org/apache/orc/impl/MetadataReaderImpl.java | 17 +-
.../orc/impl/MetadataReaderProperties.java | 96 ++++++++++++
.../orc/impl/TestDataReaderProperties.java | 69 +++++++++
.../orc/impl/TestMetadataReaderProperties.java | 72 +++++++++
.../ql/io/orc/DefaultDataReaderFactory.java | 14 ++
.../hadoop/hive/ql/io/orc/ReaderImpl.java | 40 ++++-
.../hadoop/hive/ql/io/orc/RecordReaderImpl.java | 155 ++++++++++++++++---
.../hive/ql/io/orc/RecordReaderUtils.java | 17 +-
.../hive/ql/io/orc/TestRecordReaderImpl.java | 145 ++++++++++++-----
15 files changed, 658 insertions(+), 97 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/72b6bcd7/orc/src/java/org/apache/orc/DataReader.java
----------------------------------------------------------------------
diff --git a/orc/src/java/org/apache/orc/DataReader.java b/orc/src/java/org/apache/orc/DataReader.java
index 3a5f854..b70f26b 100644
--- a/orc/src/java/org/apache/orc/DataReader.java
+++ b/orc/src/java/org/apache/orc/DataReader.java
@@ -18,20 +18,18 @@
package org.apache.orc;
+import java.io.Closeable;
import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.hadoop.hive.common.io.DiskRangeList;
/** An abstract data reader that IO formats can use to read bytes from underlying storage. */
-public interface DataReader {
+public interface DataReader extends Closeable {
/** Opens the DataReader, making it ready to use. */
void open() throws IOException;
- /** Closes the DataReader. */
- void close() throws IOException;
-
/** Reads the data.
*
* Note that for the cases such as zero-copy read, caller must release the disk ranges
http://git-wip-us.apache.org/repos/asf/hive/blob/72b6bcd7/orc/src/java/org/apache/orc/DataReaderFactory.java
----------------------------------------------------------------------
diff --git a/orc/src/java/org/apache/orc/DataReaderFactory.java b/orc/src/java/org/apache/orc/DataReaderFactory.java
new file mode 100644
index 0000000..ec3a0e9
--- /dev/null
+++ b/orc/src/java/org/apache/orc/DataReaderFactory.java
@@ -0,0 +1,9 @@
+package org.apache.orc;
+
+import org.apache.orc.impl.DataReaderProperties;
+/** Factory for DataReader instances, so callers such as RecordReaderImpl can be handed a substitute reader in unit tests. */
+public interface DataReaderFactory {
+ /** Creates a DataReader for the file described by the given properties. */
+ DataReader create(DataReaderProperties properties);
+
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/72b6bcd7/orc/src/java/org/apache/orc/MetadataReaderFactory.java
----------------------------------------------------------------------
diff --git a/orc/src/java/org/apache/orc/MetadataReaderFactory.java b/orc/src/java/org/apache/orc/MetadataReaderFactory.java
new file mode 100644
index 0000000..64629da
--- /dev/null
+++ b/orc/src/java/org/apache/orc/MetadataReaderFactory.java
@@ -0,0 +1,12 @@
+package org.apache.orc;
+
+import org.apache.orc.impl.MetadataReader;
+import org.apache.orc.impl.MetadataReaderProperties;
+
+import java.io.IOException;
+/** Factory for MetadataReader instances; the stock implementation is DefaultMetadataReaderFactory. */
+public interface MetadataReaderFactory {
+ /** Creates a MetadataReader for the file described by the given properties; an IOException signals that the reader could not be set up. */
+ MetadataReader create(MetadataReaderProperties properties) throws IOException;
+
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/72b6bcd7/orc/src/java/org/apache/orc/impl/DataReaderProperties.java
----------------------------------------------------------------------
diff --git a/orc/src/java/org/apache/orc/impl/DataReaderProperties.java b/orc/src/java/org/apache/orc/impl/DataReaderProperties.java
new file mode 100644
index 0000000..49f47d6
--- /dev/null
+++ b/orc/src/java/org/apache/orc/impl/DataReaderProperties.java
@@ -0,0 +1,84 @@
+package org.apache.orc.impl;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.orc.CompressionCodec;
+
+import javax.annotation.Nullable;
+/** Immutable settings handed to a DataReaderFactory: file system and path are required, codec and zero-copy are optional. */
+public final class DataReaderProperties {
+
+  private final FileSystem fileSystem;
+  private final Path path;
+  private final CompressionCodec codec;
+  private final boolean zeroCopy;
+
+  private DataReaderProperties(Builder builder) {
+    this.fileSystem = builder.fileSystem;
+    this.path = builder.path;
+    this.codec = builder.codec;
+    this.zeroCopy = builder.zeroCopy;
+  }
+  /** Returns the file system holding the file; never null because build() rejects a missing value. */
+  public FileSystem getFileSystem() {
+    return fileSystem;
+  }
+  /** Returns the path of the file to read; never null because build() rejects a missing value. */
+  public Path getPath() {
+    return path;
+  }
+  /** Returns the compression codec, or null when none was supplied to the builder. */
+  @Nullable
+  public CompressionCodec getCodec() {
+    return codec;
+  }
+  /** Returns the zero-copy flag; false unless withZeroCopy(true) was called on the builder. */
+  public boolean getZeroCopy() {
+    return zeroCopy;
+  }
+  /** Starts a new, empty builder. */
+  public static Builder builder() {
+    return new Builder();
+  }
+  /** Fluent builder; withFileSystem and withPath must be called before build(). */
+  public static class Builder {
+
+    private FileSystem fileSystem;
+    private Path path;
+    private CompressionCodec codec;
+    private boolean zeroCopy;
+
+    private Builder() {
+      // Obtain instances through DataReaderProperties.builder().
+    }
+    /** Sets the file system (required). */
+    public Builder withFileSystem(FileSystem fileSystem) {
+      this.fileSystem = fileSystem;
+      return this;
+    }
+    /** Sets the path of the file to read (required). */
+    public Builder withPath(Path path) {
+      this.path = path;
+      return this;
+    }
+    /** Sets the compression codec (optional; stays null when not called). */
+    public Builder withCodec(CompressionCodec codec) {
+      this.codec = codec;
+      return this;
+    }
+    /** Sets the zero-copy flag (optional; defaults to false). */
+    public Builder withZeroCopy(boolean zeroCopy) {
+      this.zeroCopy = zeroCopy;
+      return this;
+    }
+    /** Builds the properties; throws NullPointerException naming the missing field when file system or path was not set. */
+    public DataReaderProperties build() {
+      Preconditions.checkNotNull(fileSystem, "fileSystem must not be null");
+      Preconditions.checkNotNull(path, "path must not be null");
+
+      return new DataReaderProperties(this);
+    }
+
+  }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/72b6bcd7/orc/src/java/org/apache/orc/impl/DefaultMetadataReaderFactory.java
----------------------------------------------------------------------
diff --git a/orc/src/java/org/apache/orc/impl/DefaultMetadataReaderFactory.java b/orc/src/java/org/apache/orc/impl/DefaultMetadataReaderFactory.java
new file mode 100644
index 0000000..fc0d141
--- /dev/null
+++ b/orc/src/java/org/apache/orc/impl/DefaultMetadataReaderFactory.java
@@ -0,0 +1,14 @@
+package org.apache.orc.impl;
+
+import org.apache.orc.MetadataReaderFactory;
+
+import java.io.IOException;
+/** Default MetadataReaderFactory: hands the properties straight to MetadataReaderImpl. */
+public final class DefaultMetadataReaderFactory implements MetadataReaderFactory {
+ /** Returns a new MetadataReaderImpl; its constructor opens the file at properties.getPath(), which is why this can throw IOException. */
+ @Override
+ public MetadataReader create(MetadataReaderProperties properties) throws IOException {
+ return new MetadataReaderImpl(properties);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/72b6bcd7/orc/src/java/org/apache/orc/impl/MetadataReader.java
----------------------------------------------------------------------
diff --git a/orc/src/java/org/apache/orc/impl/MetadataReader.java b/orc/src/java/org/apache/orc/impl/MetadataReader.java
index 670a81a..500239d 100644
--- a/orc/src/java/org/apache/orc/impl/MetadataReader.java
+++ b/orc/src/java/org/apache/orc/impl/MetadataReader.java
@@ -17,18 +17,17 @@
*/
package org.apache.orc.impl;
+import java.io.Closeable;
import java.io.IOException;
import org.apache.orc.OrcProto;
import org.apache.orc.StripeInformation;
-public interface MetadataReader {
+public interface MetadataReader extends Closeable {
OrcIndex readRowIndex(StripeInformation stripe,
OrcProto.StripeFooter footer,
boolean[] included, OrcProto.RowIndex[] indexes, boolean[] sargColumns,
OrcProto.BloomFilterIndex[] bloomFilterIndices) throws IOException;
OrcProto.StripeFooter readStripeFooter(StripeInformation stripe) throws IOException;
-
- void close() throws IOException;
}
http://git-wip-us.apache.org/repos/asf/hive/blob/72b6bcd7/orc/src/java/org/apache/orc/impl/MetadataReaderImpl.java
----------------------------------------------------------------------
diff --git a/orc/src/java/org/apache/orc/impl/MetadataReaderImpl.java b/orc/src/java/org/apache/orc/impl/MetadataReaderImpl.java
index d0ded52..c3ea74f 100644
--- a/orc/src/java/org/apache/orc/impl/MetadataReaderImpl.java
+++ b/orc/src/java/org/apache/orc/impl/MetadataReaderImpl.java
@@ -21,6 +21,7 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;
+import com.google.common.base.Preconditions;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -37,17 +38,11 @@ public class MetadataReaderImpl implements MetadataReader {
private final int bufferSize;
private final int typeCount;
- public MetadataReaderImpl(FileSystem fileSystem, Path path,
- CompressionCodec codec, int bufferSize, int typeCount) throws IOException {
- this(fileSystem.open(path), codec, bufferSize, typeCount);
- }
-
- public MetadataReaderImpl(FSDataInputStream file,
- CompressionCodec codec, int bufferSize, int typeCount) {
- this.file = file;
- this.codec = codec;
- this.bufferSize = bufferSize;
- this.typeCount = typeCount;
+ MetadataReaderImpl(MetadataReaderProperties properties) throws IOException {
+ this.file = properties.getFileSystem().open(properties.getPath());
+ this.codec = properties.getCodec();
+ this.bufferSize = properties.getBufferSize();
+ this.typeCount = properties.getTypeCount();
}
@Override
http://git-wip-us.apache.org/repos/asf/hive/blob/72b6bcd7/orc/src/java/org/apache/orc/impl/MetadataReaderProperties.java
----------------------------------------------------------------------
diff --git a/orc/src/java/org/apache/orc/impl/MetadataReaderProperties.java b/orc/src/java/org/apache/orc/impl/MetadataReaderProperties.java
new file mode 100644
index 0000000..321931c
--- /dev/null
+++ b/orc/src/java/org/apache/orc/impl/MetadataReaderProperties.java
@@ -0,0 +1,96 @@
+package org.apache.orc.impl;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.orc.CompressionCodec;
+
+import javax.annotation.Nullable;
+/** Immutable settings handed to a MetadataReaderFactory: file system and path are required, the remaining values are optional. */
+public final class MetadataReaderProperties {
+
+  private final FileSystem fileSystem;
+  private final Path path;
+  private final CompressionCodec codec;
+  private final int bufferSize;
+  private final int typeCount;
+
+  private MetadataReaderProperties(Builder builder) {
+    this.fileSystem = builder.fileSystem;
+    this.path = builder.path;
+    this.codec = builder.codec;
+    this.bufferSize = builder.bufferSize;
+    this.typeCount = builder.typeCount;
+  }
+  /** Returns the file system holding the file; never null because build() rejects a missing value. */
+  public FileSystem getFileSystem() {
+    return fileSystem;
+  }
+  /** Returns the path of the file to read; never null because build() rejects a missing value. */
+  public Path getPath() {
+    return path;
+  }
+  /** Returns the compression codec, or null when none was supplied to the builder. */
+  @Nullable
+  public CompressionCodec getCodec() {
+    return codec;
+  }
+  /** Returns the buffer size; 0 unless withBufferSize was called on the builder. */
+  public int getBufferSize() {
+    return bufferSize;
+  }
+  /** Returns the type count; 0 unless withTypeCount was called on the builder. */
+  public int getTypeCount() {
+    return typeCount;
+  }
+  /** Starts a new, empty builder. */
+  public static Builder builder() {
+    return new Builder();
+  }
+  /** Fluent builder; withFileSystem and withPath must be called before build(). */
+  public static class Builder {
+
+    private FileSystem fileSystem;
+    private Path path;
+    private CompressionCodec codec;
+    private int bufferSize;
+    private int typeCount;
+
+    private Builder() {
+      // Obtain instances through MetadataReaderProperties.builder().
+    }
+    /** Sets the file system (required). */
+    public Builder withFileSystem(FileSystem fileSystem) {
+      this.fileSystem = fileSystem;
+      return this;
+    }
+    /** Sets the path of the file to read (required). */
+    public Builder withPath(Path path) {
+      this.path = path;
+      return this;
+    }
+    /** Sets the compression codec (optional; stays null when not called). */
+    public Builder withCodec(CompressionCodec codec) {
+      this.codec = codec;
+      return this;
+    }
+    /** Sets the buffer size (optional; defaults to 0). */
+    public Builder withBufferSize(int bufferSize) {
+      this.bufferSize = bufferSize;
+      return this;
+    }
+    /** Sets the type count (optional; defaults to 0). */
+    public Builder withTypeCount(int typeCount) {
+      this.typeCount = typeCount;
+      return this;
+    }
+    /** Builds the properties; throws NullPointerException naming the missing field when file system or path was not set. */
+    public MetadataReaderProperties build() {
+      Preconditions.checkNotNull(fileSystem, "fileSystem must not be null");
+      Preconditions.checkNotNull(path, "path must not be null");
+
+      return new MetadataReaderProperties(this);
+    }
+
+  }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/72b6bcd7/orc/src/test/org/apache/orc/impl/TestDataReaderProperties.java
----------------------------------------------------------------------
diff --git a/orc/src/test/org/apache/orc/impl/TestDataReaderProperties.java b/orc/src/test/org/apache/orc/impl/TestDataReaderProperties.java
new file mode 100644
index 0000000..9ec08f3
--- /dev/null
+++ b/orc/src/test/org/apache/orc/impl/TestDataReaderProperties.java
@@ -0,0 +1,69 @@
+package org.apache.orc.impl;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.orc.CompressionCodec;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.mockito.Mockito.mock;
+
+public class TestDataReaderProperties {
+  // Fixtures. zeroCopy is deliberately the non-default value (true) so testCompleteBuild fails if withZeroCopy() is ignored.
+  private FileSystem mockedFileSystem = mock(FileSystem.class);
+  private Path mockedPath = mock(Path.class);
+  private CompressionCodec mockedCodec = mock(CompressionCodec.class);
+  private boolean mockedZeroCopy = true;
+  /** Every value given to the builder must come back unchanged from the getters. */
+  @Test
+  public void testCompleteBuild() {
+    DataReaderProperties properties = DataReaderProperties.builder()
+        .withFileSystem(mockedFileSystem)
+        .withPath(mockedPath)
+        .withCodec(mockedCodec)
+        .withZeroCopy(mockedZeroCopy)
+        .build();
+    assertEquals(mockedFileSystem, properties.getFileSystem());
+    assertEquals(mockedPath, properties.getPath());
+    assertEquals(mockedCodec, properties.getCodec());
+    assertEquals(mockedZeroCopy, properties.getZeroCopy());
+  }
+  /** Optional values fall back to their defaults: null codec, zero-copy off. */
+  @Test
+  public void testMissingNonRequiredArgs() {
+    DataReaderProperties properties = DataReaderProperties.builder()
+        .withFileSystem(mockedFileSystem)
+        .withPath(mockedPath)
+        .build();
+    assertEquals(mockedFileSystem, properties.getFileSystem());
+    assertEquals(mockedPath, properties.getPath());
+    assertNull(properties.getCodec());
+    assertFalse(properties.getZeroCopy());
+  }
+  /** build() with nothing set must be rejected. */
+  @Test(expected = java.lang.NullPointerException.class)
+  public void testEmptyBuild() {
+    DataReaderProperties.builder().build();
+  }
+  /** build() without a path must be rejected. */
+  @Test(expected = java.lang.NullPointerException.class)
+  public void testMissingPath() {
+    DataReaderProperties.builder()
+        .withFileSystem(mockedFileSystem)
+        .withCodec(mockedCodec)
+        .withZeroCopy(mockedZeroCopy)
+        .build();
+  }
+  /** build() without a file system must be rejected. */
+  @Test(expected = java.lang.NullPointerException.class)
+  public void testMissingFileSystem() {
+    DataReaderProperties.builder()
+        .withPath(mockedPath)
+        .withCodec(mockedCodec)
+        .withZeroCopy(mockedZeroCopy)
+        .build();
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/72b6bcd7/orc/src/test/org/apache/orc/impl/TestMetadataReaderProperties.java
----------------------------------------------------------------------
diff --git a/orc/src/test/org/apache/orc/impl/TestMetadataReaderProperties.java b/orc/src/test/org/apache/orc/impl/TestMetadataReaderProperties.java
new file mode 100644
index 0000000..12e8eb4
--- /dev/null
+++ b/orc/src/test/org/apache/orc/impl/TestMetadataReaderProperties.java
@@ -0,0 +1,72 @@
+package org.apache.orc.impl;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.orc.CompressionCodec;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.mockito.Mockito.mock;
+
+public class TestMetadataReaderProperties {
+  // Fixtures. Buffer size and type count are deliberately non-zero (non-default) so testCompleteBuild fails if a setter is ignored.
+  private FileSystem mockedFileSystem = mock(FileSystem.class);
+  private Path mockedPath = mock(Path.class);
+  private CompressionCodec mockedCodec = mock(CompressionCodec.class);
+  private int mockedBufferSize = 256 * 1024;
+  private int mockedTypeCount = 3;
+  /** Every value given to the builder must come back unchanged from the getters. */
+  @Test
+  public void testCompleteBuild() {
+    MetadataReaderProperties properties = MetadataReaderProperties.builder()
+        .withFileSystem(mockedFileSystem)
+        .withPath(mockedPath)
+        .withCodec(mockedCodec)
+        .withBufferSize(mockedBufferSize)
+        .withTypeCount(mockedTypeCount)
+        .build();
+    assertEquals(mockedFileSystem, properties.getFileSystem());
+    assertEquals(mockedPath, properties.getPath());
+    assertEquals(mockedCodec, properties.getCodec());
+    assertEquals(mockedBufferSize, properties.getBufferSize());
+    assertEquals(mockedTypeCount, properties.getTypeCount());
+  }
+  /** Optional values fall back to their defaults: null codec, zero buffer size, zero type count. */
+  @Test
+  public void testMissingNonRequiredArgs() {
+    MetadataReaderProperties properties = MetadataReaderProperties.builder()
+        .withFileSystem(mockedFileSystem)
+        .withPath(mockedPath)
+        .build();
+    assertEquals(mockedFileSystem, properties.getFileSystem());
+    assertEquals(mockedPath, properties.getPath());
+    assertNull(properties.getCodec());
+    assertEquals(0, properties.getBufferSize());
+    assertEquals(0, properties.getTypeCount());
+  }
+  /** build() with nothing set must be rejected. */
+  @Test(expected = java.lang.NullPointerException.class)
+  public void testEmptyBuild() {
+    MetadataReaderProperties.builder().build();
+  }
+  /** build() without a path must be rejected. */
+  @Test(expected = java.lang.NullPointerException.class)
+  public void testMissingPath() {
+    MetadataReaderProperties.builder()
+        .withFileSystem(mockedFileSystem)
+        .withCodec(mockedCodec)
+        .withBufferSize(mockedBufferSize)
+        .build();
+  }
+  /** build() without a file system must be rejected. */
+  @Test(expected = java.lang.NullPointerException.class)
+  public void testMissingFileSystem() {
+    MetadataReaderProperties.builder()
+        .withPath(mockedPath)
+        .withCodec(mockedCodec)
+        .withBufferSize(mockedBufferSize)
+        .build();
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/72b6bcd7/ql/src/java/org/apache/hadoop/hive/ql/io/orc/DefaultDataReaderFactory.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/DefaultDataReaderFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/DefaultDataReaderFactory.java
new file mode 100644
index 0000000..de3471c
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/DefaultDataReaderFactory.java
@@ -0,0 +1,14 @@
+package org.apache.hadoop.hive.ql.io.orc;
+
+import org.apache.orc.DataReader;
+import org.apache.orc.DataReaderFactory;
+import org.apache.orc.impl.DataReaderProperties;
+/** Default DataReaderFactory: delegates to RecordReaderUtils.createDefaultDataReader. */
+public final class DefaultDataReaderFactory implements DataReaderFactory {
+ /** Returns the reader built by RecordReaderUtils.createDefaultDataReader(properties). */
+ @Override
+ public DataReader create(DataReaderProperties properties) {
+ return RecordReaderUtils.createDefaultDataReader(properties);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/72b6bcd7/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java
index a031a92..822ef14 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java
@@ -26,6 +26,8 @@ import java.util.Collections;
import java.util.List;
import java.util.Set;
+import org.apache.orc.DataReaderFactory;
+import org.apache.orc.MetadataReaderFactory;
import org.apache.orc.impl.BufferChunk;
import org.apache.orc.ColumnStatistics;
import org.apache.orc.impl.ColumnStatisticsImpl;
@@ -33,9 +35,11 @@ import org.apache.orc.CompressionCodec;
import org.apache.orc.DataReader;
import org.apache.orc.FileMetaInfo;
import org.apache.orc.FileMetadata;
+import org.apache.orc.impl.DataReaderProperties;
+import org.apache.orc.impl.DefaultMetadataReaderFactory;
import org.apache.orc.impl.InStream;
import org.apache.orc.impl.MetadataReader;
-import org.apache.orc.impl.MetadataReaderImpl;
+import org.apache.orc.impl.MetadataReaderProperties;
import org.apache.orc.StripeInformation;
import org.apache.orc.StripeStatistics;
import org.slf4j.Logger;
@@ -76,7 +80,8 @@ public class ReaderImpl implements Reader {
private final List<StripeInformation> stripes;
protected final int rowIndexStride;
private final long contentLength, numberOfRows;
-
+ private final MetadataReaderFactory metadataReaderFactory = new DefaultMetadataReaderFactory();
+ private final DataReaderFactory dataReaderFactory = new DefaultDataReaderFactory();
private final ObjectInspector inspector;
private long deserializedSize = -1;
@@ -667,8 +672,20 @@ public class ReaderImpl implements Reader {
Arrays.fill(include, true);
options.include(include);
}
- return new RecordReaderImpl(this.getStripes(), fileSystem, path,
- options, types, codec, bufferSize, rowIndexStride, conf);
+
+ return RecordReaderImpl.builder()
+ .withMetadataReaderFactory(metadataReaderFactory)
+ .withDataReaderFactory(dataReaderFactory)
+ .withStripes(this.getStripes())
+ .withFileSystem(fileSystem)
+ .withPath(path)
+ .withOptions(options)
+ .withTypes(types)
+ .withCodec(codec)
+ .withBufferSize(bufferSize)
+ .withStrideRate(rowIndexStride)
+ .withConf(conf)
+ .build();
}
@@ -852,7 +869,13 @@ public class ReaderImpl implements Reader {
@Override
public MetadataReader metadata() throws IOException {
- return new MetadataReaderImpl(fileSystem, path, codec, bufferSize, types.size());
+ return metadataReaderFactory.create(MetadataReaderProperties.builder()
+ .withBufferSize(bufferSize)
+ .withCodec(codec)
+ .withFileSystem(fileSystem)
+ .withPath(path)
+ .withTypeCount(types.size())
+ .build());
}
@Override
@@ -867,7 +890,12 @@ public class ReaderImpl implements Reader {
@Override
public DataReader createDefaultDataReader(boolean useZeroCopy) {
- return RecordReaderUtils.createDefaultDataReader(fileSystem, path, useZeroCopy, codec);
+ return dataReaderFactory.create(DataReaderProperties.builder()
+ .withFileSystem(fileSystem)
+ .withPath(path)
+ .withCodec(codec)
+ .withZeroCopy(useZeroCopy)
+ .build());
}
@Override
http://git-wip-us.apache.org/repos/asf/hive/blob/72b6bcd7/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
index 3975409..9cfcc0e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
@@ -27,8 +27,11 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import org.apache.commons.lang3.exception.ExceptionUtils;
+import com.google.common.base.Preconditions;
+import com.google.common.io.Closer;
import org.apache.orc.BooleanColumnStatistics;
+import org.apache.orc.DataReaderFactory;
+import org.apache.orc.MetadataReaderFactory;
import org.apache.orc.OrcUtils;
import org.apache.orc.impl.BufferChunk;
import org.apache.orc.ColumnStatistics;
@@ -38,11 +41,13 @@ import org.apache.orc.DataReader;
import org.apache.orc.DateColumnStatistics;
import org.apache.orc.DecimalColumnStatistics;
import org.apache.orc.DoubleColumnStatistics;
+import org.apache.orc.impl.DataReaderProperties;
+import org.apache.orc.impl.DefaultMetadataReaderFactory;
import org.apache.orc.impl.InStream;
import org.apache.orc.IntegerColumnStatistics;
import org.apache.orc.impl.MetadataReader;
-import org.apache.orc.impl.MetadataReaderImpl;
import org.apache.orc.OrcConf;
+import org.apache.orc.impl.MetadataReaderProperties;
import org.apache.orc.impl.OrcIndex;
import org.apache.orc.impl.PositionProvider;
import org.apache.orc.impl.StreamName;
@@ -141,17 +146,99 @@ public class RecordReaderImpl implements RecordReader {
return result;
}
- protected RecordReaderImpl(List<StripeInformation> stripes,
- FileSystem fileSystem,
- Path path,
- Reader.Options options,
- List<OrcProto.Type> types,
- CompressionCodec codec,
- int bufferSize,
- long strideRate,
- Configuration conf
- ) throws IOException {
+ public static Builder builder() { // entry point: configure the returned Builder, then call build()
+ return new Builder();
+ }
+  /** Fluent builder for RecordReaderImpl; everything except codec, bufferSize and strideRate is required, and both factories default to the stock implementations. */
+  public static class Builder {
+    private Reader.Options options;
+    private CompressionCodec codec;
+    private List<OrcProto.Type> types;
+    private List<StripeInformation> stripes;
+    private int bufferSize;
+    private FileSystem fileSystem;
+    private Path path;
+    private Configuration conf;
+    private long strideRate;
+    private MetadataReaderFactory metadataReaderFactory = new DefaultMetadataReaderFactory();
+    private DataReaderFactory dataReaderFactory = new DefaultDataReaderFactory();
+
+    private Builder() {
+    }
+    /** Sets the reader options (required). */
+    public Builder withOptions(Reader.Options options) {
+      this.options = options;
+      return this;
+    }
+    /** Sets the compression codec (optional; build() does not check it, so it may stay null). */
+    public Builder withCodec(CompressionCodec codec) {
+      this.codec = codec;
+      return this;
+    }
+    /** Sets the file's type list (required). */
+    public Builder withTypes(List<OrcProto.Type> types) {
+      this.types = types;
+      return this;
+    }
+    /** Sets the file's stripe list (required). */
+    public Builder withStripes(List<StripeInformation> stripes) {
+      this.stripes = stripes;
+      return this;
+    }
+    /** Sets the buffer size forwarded to the metadata reader (optional; defaults to 0). */
+    public Builder withBufferSize(int bufferSize) {
+      this.bufferSize = bufferSize;
+      return this;
+    }
+    /** Sets the file system used by both the metadata reader and the data reader (required). */
+    public Builder withFileSystem(FileSystem fileSystem) {
+      this.fileSystem = fileSystem;
+      return this;
+    }
+    /** Sets the path of the file to read (required). */
+    public Builder withPath(Path path) {
+      this.path = path;
+      return this;
+    }
+    /** Sets the configuration (required). */
+    public Builder withConf(Configuration conf) {
+      this.conf = conf;
+      return this;
+    }
+    /** Sets the row index stride (optional; the default 0 means no SargApplier is created even when a search argument is present). */
+    public Builder withStrideRate(long strideRate) {
+      this.strideRate = strideRate;
+      return this;
+    }
+    /** Replaces the default MetadataReaderFactory, e.g. with a test double; must not be null. */
+    public Builder withMetadataReaderFactory(MetadataReaderFactory metadataReaderFactory) {
+      this.metadataReaderFactory = metadataReaderFactory;
+      return this;
+    }
+    /** Replaces the default DataReaderFactory, e.g. with a test double; must not be null. */
+    public Builder withDataReaderFactory(DataReaderFactory dataReaderFactory) {
+      this.dataReaderFactory = dataReaderFactory;
+      return this;
+    }
+    /** Validates the required fields (NullPointerException names the missing one) and constructs the reader, which may throw IOException. */
+    public RecordReaderImpl build() throws IOException {
+      Preconditions.checkNotNull(metadataReaderFactory, "metadataReaderFactory must not be null");
+      Preconditions.checkNotNull(dataReaderFactory, "dataReaderFactory must not be null");
+      Preconditions.checkNotNull(options, "options must not be null");
+      Preconditions.checkNotNull(types, "types must not be null");
+      Preconditions.checkNotNull(stripes, "stripes must not be null");
+      Preconditions.checkNotNull(fileSystem, "fileSystem must not be null");
+      Preconditions.checkNotNull(path, "path must not be null");
+      Preconditions.checkNotNull(conf, "conf must not be null");
+
+      return new RecordReaderImpl(this);
+    }
+  }
+
+ private RecordReaderImpl(Builder builder) throws IOException {
+ Reader.Options options = builder.options;
+ this.types = builder.types;
TreeReaderFactory.TreeReaderSchema treeReaderSchema;
if (options.getSchema() == null) {
if (LOG.isInfoEnabled()) {
@@ -166,18 +253,23 @@ public class RecordReaderImpl implements RecordReader {
List<OrcProto.Type> schemaTypes = OrcUtils.getOrcTypes(options.getSchema());
treeReaderSchema = SchemaEvolution.validateAndCreate(types, schemaTypes);
}
- this.path = path;
- this.codec = codec;
- this.types = types;
- this.bufferSize = bufferSize;
+ this.path = builder.path;
+ this.codec = builder.codec;
+ this.bufferSize = builder.bufferSize;
this.included = options.getInclude();
- this.conf = conf;
- this.rowIndexStride = strideRate;
- this.metadata = new MetadataReaderImpl(fileSystem, path, codec, bufferSize, types.size());
+ this.conf = builder.conf;
+ this.rowIndexStride = builder.strideRate;
+ this.metadata = builder.metadataReaderFactory.create(MetadataReaderProperties.builder()
+ .withFileSystem(builder.fileSystem)
+ .withPath(path)
+ .withCodec(codec)
+ .withBufferSize(bufferSize)
+ .withTypeCount(types.size())
+ .build());
SearchArgument sarg = options.getSearchArgument();
- if (sarg != null && strideRate != 0) {
+ if (sarg != null && builder.strideRate != 0) {
sargApp = new SargApplier(
- sarg, options.getColumnNames(), strideRate, types, included.length);
+ sarg, options.getColumnNames(), builder.strideRate, types, included.length);
} else {
sargApp = null;
}
@@ -185,7 +277,7 @@ public class RecordReaderImpl implements RecordReader {
long skippedRows = 0;
long offset = options.getOffset();
long maxOffset = options.getMaxOffset();
- for(StripeInformation stripe: stripes) {
+ for(StripeInformation stripe: builder.stripes) {
long stripeStart = stripe.getOffset();
if (offset > stripeStart) {
skippedRows += stripe.getNumberOfRows();
@@ -200,7 +292,12 @@ public class RecordReaderImpl implements RecordReader {
zeroCopy = OrcConf.USE_ZEROCOPY.getBoolean(conf);
}
// TODO: we could change the ctor to pass this externally
- this.dataReader = RecordReaderUtils.createDefaultDataReader(fileSystem, path, zeroCopy, codec);
+ this.dataReader = builder.dataReaderFactory.create(DataReaderProperties.builder()
+ .withFileSystem(builder.fileSystem)
+ .withCodec(codec)
+ .withPath(path)
+ .withZeroCopy(zeroCopy)
+ .build());
this.dataReader.open();
firstRow = skippedRows;
@@ -1119,8 +1216,16 @@ public class RecordReaderImpl implements RecordReader {
@Override
public void close() throws IOException {
- clearStreams();
- dataReader.close();
+ Closer closer = Closer.create();
+ try {
+ closer.register(metadata);
+ closer.register(dataReader);
+ clearStreams();
+ } catch (IOException e) {
+ throw closer.rethrow(e);
+ } finally {
+ closer.close();
+ }
}
@Override
http://git-wip-us.apache.org/repos/asf/hive/blob/72b6bcd7/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderUtils.java
index 8a73948..177721d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderUtils.java
@@ -39,6 +39,7 @@ import org.apache.hadoop.hive.shims.HadoopShims.ZeroCopyReaderShim;
import org.apache.orc.impl.BufferChunk;
import org.apache.orc.CompressionCodec;
import org.apache.orc.DataReader;
+import org.apache.orc.impl.DataReaderProperties;
import org.apache.orc.impl.DirectDecompressionCodec;
import org.apache.orc.OrcProto;
@@ -60,12 +61,11 @@ public class RecordReaderUtils {
private boolean useZeroCopy;
private CompressionCodec codec;
- public DefaultDataReader(
- FileSystem fs, Path path, boolean useZeroCopy, CompressionCodec codec) {
- this.fs = fs;
- this.path = path;
- this.useZeroCopy = useZeroCopy;
- this.codec = codec;
+ private DefaultDataReader(DataReaderProperties properties) {
+ this.fs = properties.getFileSystem();
+ this.path = properties.getPath();
+ this.useZeroCopy = properties.getZeroCopy();
+ this.codec = properties.getCodec();
}
@Override
@@ -108,9 +108,8 @@ public class RecordReaderUtils {
}
- static DataReader createDefaultDataReader(
- FileSystem fs, Path path, boolean useZeroCopy, CompressionCodec codec) {
- return new DefaultDataReader(fs, path, useZeroCopy, codec);
+ static DataReader createDefaultDataReader(DataReaderProperties properties) {
+ return new DefaultDataReader(properties);
}
public static boolean[] findPresentStreamsByColumn(
http://git-wip-us.apache.org/repos/asf/hive/blob/72b6bcd7/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestRecordReaderImpl.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestRecordReaderImpl.java b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestRecordReaderImpl.java
index 6803abd..cc7182f 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestRecordReaderImpl.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestRecordReaderImpl.java
@@ -21,11 +21,18 @@ package org.apache.hadoop.hive.ql.io.orc;
import static junit.framework.Assert.assertEquals;
import static org.hamcrest.core.Is.is;
import static org.junit.Assert.*;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.atLeastOnce;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
import java.io.IOException;
import java.io.InputStream;
import java.sql.Timestamp;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
@@ -46,9 +53,17 @@ import org.apache.hadoop.hive.serde2.io.DateWritable;
import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.orc.ColumnStatistics;
+import org.apache.orc.CompressionCodec;
+import org.apache.orc.DataReader;
+import org.apache.orc.DataReaderFactory;
+import org.apache.orc.MetadataReaderFactory;
+import org.apache.orc.StripeInformation;
import org.apache.orc.impl.ColumnStatisticsImpl;
import org.apache.orc.OrcProto;
+import org.apache.orc.impl.DataReaderProperties;
+import org.apache.orc.impl.MetadataReader;
+import org.apache.orc.impl.MetadataReaderProperties;
import org.junit.Test;
import org.mockito.MockSettings;
import org.mockito.Mockito;
@@ -148,16 +163,16 @@ public class TestRecordReaderImpl {
footer.writeTo(buffer);
ps.writeTo(buffer);
buffer.write(ps.getSerializedSize());
- FileSystem fs = Mockito.mock(FileSystem.class, settings);
+ FileSystem fs = mock(FileSystem.class, settings);
FSDataInputStream file =
new FSDataInputStream(new BufferInStream(buffer.getData(),
buffer.getLength()));
Path p = new Path("/dir/file.orc");
- Mockito.when(fs.open(p)).thenReturn(file);
+ when(fs.open(p)).thenReturn(file);
OrcFile.ReaderOptions options = OrcFile.readerOptions(conf);
options.filesystem(fs);
options.maxLength(buffer.getLength());
- Mockito.when(fs.getFileStatus(p))
+ when(fs.getFileStatus(p))
.thenReturn(new FileStatus(10, false, 3, 3000, 0, p));
Reader reader = OrcFile.createReader(p, options);
}
@@ -165,21 +180,21 @@ public class TestRecordReaderImpl {
@Test
public void testCompareToRangeInt() throws Exception {
assertEquals(Location.BEFORE,
- RecordReaderImpl.compareToRange(19L, 20L, 40L));
+ RecordReaderImpl.compareToRange(19L, 20L, 40L));
assertEquals(Location.AFTER,
- RecordReaderImpl.compareToRange(41L, 20L, 40L));
+ RecordReaderImpl.compareToRange(41L, 20L, 40L));
assertEquals(Location.MIN,
RecordReaderImpl.compareToRange(20L, 20L, 40L));
assertEquals(Location.MIDDLE,
RecordReaderImpl.compareToRange(21L, 20L, 40L));
assertEquals(Location.MAX,
- RecordReaderImpl.compareToRange(40L, 20L, 40L));
+ RecordReaderImpl.compareToRange(40L, 20L, 40L));
assertEquals(Location.BEFORE,
- RecordReaderImpl.compareToRange(0L, 1L, 1L));
+ RecordReaderImpl.compareToRange(0L, 1L, 1L));
assertEquals(Location.MIN,
- RecordReaderImpl.compareToRange(1L, 1L, 1L));
+ RecordReaderImpl.compareToRange(1L, 1L, 1L));
assertEquals(Location.AFTER,
- RecordReaderImpl.compareToRange(2L, 1L, 1L));
+ RecordReaderImpl.compareToRange(2L, 1L, 1L));
}
@Test
@@ -205,43 +220,43 @@ public class TestRecordReaderImpl {
@Test
public void testCompareToCharNeedConvert() throws Exception {
assertEquals(Location.BEFORE,
- RecordReaderImpl.compareToRange("apple", "hello", "world"));
+ RecordReaderImpl.compareToRange("apple", "hello", "world"));
assertEquals(Location.AFTER,
- RecordReaderImpl.compareToRange("zombie", "hello", "world"));
+ RecordReaderImpl.compareToRange("zombie", "hello", "world"));
assertEquals(Location.MIN,
RecordReaderImpl.compareToRange("hello", "hello", "world"));
assertEquals(Location.MIDDLE,
RecordReaderImpl.compareToRange("pilot", "hello", "world"));
assertEquals(Location.MAX,
- RecordReaderImpl.compareToRange("world", "hello", "world"));
+ RecordReaderImpl.compareToRange("world", "hello", "world"));
assertEquals(Location.BEFORE,
- RecordReaderImpl.compareToRange("apple", "hello", "hello"));
+ RecordReaderImpl.compareToRange("apple", "hello", "hello"));
assertEquals(Location.MIN,
- RecordReaderImpl.compareToRange("hello", "hello", "hello"));
+ RecordReaderImpl.compareToRange("hello", "hello", "hello"));
assertEquals(Location.AFTER,
- RecordReaderImpl.compareToRange("zombie", "hello", "hello"));
+ RecordReaderImpl.compareToRange("zombie", "hello", "hello"));
}
@Test
public void testGetMin() throws Exception {
assertEquals(10L, RecordReaderImpl.getMin(
- ColumnStatisticsImpl.deserialize(createIntStats(10L, 100L))));
+ ColumnStatisticsImpl.deserialize(createIntStats(10L, 100L))));
assertEquals(10.0d, RecordReaderImpl.getMin(ColumnStatisticsImpl.deserialize(
- OrcProto.ColumnStatistics.newBuilder()
- .setDoubleStatistics(OrcProto.DoubleStatistics.newBuilder()
- .setMinimum(10.0d).setMaximum(100.0d).build()).build())));
+ OrcProto.ColumnStatistics.newBuilder()
+ .setDoubleStatistics(OrcProto.DoubleStatistics.newBuilder()
+ .setMinimum(10.0d).setMaximum(100.0d).build()).build())));
assertEquals(null, RecordReaderImpl.getMin(ColumnStatisticsImpl.deserialize(
- OrcProto.ColumnStatistics.newBuilder()
- .setStringStatistics(OrcProto.StringStatistics.newBuilder().build())
- .build())));
+ OrcProto.ColumnStatistics.newBuilder()
+ .setStringStatistics(OrcProto.StringStatistics.newBuilder().build())
+ .build())));
assertEquals("a", RecordReaderImpl.getMin(ColumnStatisticsImpl.deserialize(
- OrcProto.ColumnStatistics.newBuilder()
- .setStringStatistics(OrcProto.StringStatistics.newBuilder()
- .setMinimum("a").setMaximum("b").build()).build())));
+ OrcProto.ColumnStatistics.newBuilder()
+ .setStringStatistics(OrcProto.StringStatistics.newBuilder()
+ .setMinimum("a").setMaximum("b").build()).build())));
assertEquals("hello", RecordReaderImpl.getMin(ColumnStatisticsImpl
- .deserialize(createStringStats("hello", "world"))));
+ .deserialize(createStringStats("hello", "world"))));
assertEquals(HiveDecimal.create("111.1"), RecordReaderImpl.getMin(ColumnStatisticsImpl
- .deserialize(createDecimalStats("111.1", "112.1"))));
+ .deserialize(createDecimalStats("111.1", "112.1"))));
}
private static OrcProto.ColumnStatistics createIntStats(Long min,
@@ -262,7 +277,7 @@ public class TestRecordReaderImpl {
OrcProto.BucketStatistics.Builder boolStats = OrcProto.BucketStatistics.newBuilder();
boolStats.addCount(trueCount);
return OrcProto.ColumnStatistics.newBuilder().setNumberOfValues(n).setBucketStatistics(
- boolStats.build()).build();
+ boolStats.build()).build();
}
private static OrcProto.ColumnStatistics createIntStats(int min, int max) {
@@ -341,9 +356,9 @@ public class TestRecordReaderImpl {
.setStringStatistics(OrcProto.StringStatistics.newBuilder()
.setMinimum("a").setMaximum("b").build()).build())));
assertEquals("world", RecordReaderImpl.getMax(ColumnStatisticsImpl
- .deserialize(createStringStats("hello", "world"))));
+ .deserialize(createStringStats("hello", "world"))));
assertEquals(HiveDecimal.create("112.1"), RecordReaderImpl.getMax(ColumnStatisticsImpl
- .deserialize(createDecimalStats("111.1", "112.1"))));
+ .deserialize(createDecimalStats("111.1", "112.1"))));
}
@Test
@@ -365,15 +380,15 @@ public class TestRecordReaderImpl {
pred = TestSearchArgumentImpl.createPredicateLeaf(
PredicateLeaf.Operator.NULL_SAFE_EQUALS, PredicateLeaf.Type.BOOLEAN, "x", false, null);
assertEquals(TruthValue.NO,
- RecordReaderImpl.evaluatePredicateProto(createBooleanStats(10, 10), pred, null));
+ RecordReaderImpl.evaluatePredicateProto(createBooleanStats(10, 10), pred, null));
assertEquals(TruthValue.YES_NO,
- RecordReaderImpl.evaluatePredicateProto(createBooleanStats(10, 0), pred, null));
+ RecordReaderImpl.evaluatePredicateProto(createBooleanStats(10, 0), pred, null));
}
@Test
public void testPredEvalWithIntStats() throws Exception {
PredicateLeaf pred = TestSearchArgumentImpl.createPredicateLeaf(
- PredicateLeaf.Operator.NULL_SAFE_EQUALS, PredicateLeaf.Type.LONG, "x", 15L, null);
+ PredicateLeaf.Operator.NULL_SAFE_EQUALS, PredicateLeaf.Type.LONG, "x", 15L, null);
assertEquals(TruthValue.YES_NO,
RecordReaderImpl.evaluatePredicateProto(createIntStats(10, 100), pred, null));
@@ -402,7 +417,7 @@ public class TestRecordReaderImpl {
pred = TestSearchArgumentImpl.createPredicateLeaf(PredicateLeaf.Operator.NULL_SAFE_EQUALS,
PredicateLeaf.Type.TIMESTAMP, "x", new Timestamp(15), null);
assertEquals(TruthValue.YES_NO,
- RecordReaderImpl.evaluatePredicateProto(createIntStats(10, 100), pred, null));
+ RecordReaderImpl.evaluatePredicateProto(createIntStats(10, 100), pred, null));
}
@Test
@@ -620,7 +635,7 @@ public class TestRecordReaderImpl {
RecordReaderImpl.evaluatePredicateProto(createTimestampStats(10, 100), pred, null));
assertEquals(TruthValue.YES_NO,
RecordReaderImpl.evaluatePredicateProto(createTimestampStats(10 * 24L * 60L * 60L * 1000L,
- 100 * 24L * 60L * 60L * 1000L), pred, null));
+ 100 * 24L * 60L * 60L * 1000L), pred, null));
pred = TestSearchArgumentImpl.createPredicateLeaf(PredicateLeaf.Operator.NULL_SAFE_EQUALS,
PredicateLeaf.Type.DECIMAL, "x", new HiveDecimalWritable("15"), null);
@@ -738,9 +753,9 @@ public class TestRecordReaderImpl {
assertEquals(TruthValue.NO_NULL,
RecordReaderImpl.evaluatePredicateProto(createIntStats(0L, 5L), pred, null));
assertEquals(TruthValue.NO_NULL,
- RecordReaderImpl.evaluatePredicateProto(createIntStats(30L, 40L), pred, null));
+ RecordReaderImpl.evaluatePredicateProto(createIntStats(30L, 40L), pred, null));
assertEquals(TruthValue.YES_NO_NULL,
- RecordReaderImpl.evaluatePredicateProto(createIntStats(5L, 15L), pred, null));
+ RecordReaderImpl.evaluatePredicateProto(createIntStats(5L, 15L), pred, null));
assertEquals(TruthValue.YES_NO_NULL,
RecordReaderImpl.evaluatePredicateProto(createIntStats(15L, 25L), pred, null));
assertEquals(TruthValue.YES_NO_NULL,
@@ -876,10 +891,10 @@ public class TestRecordReaderImpl {
assertEquals(TruthValue.YES_NO_NULL, // before & min
RecordReaderImpl.evaluatePredicateProto(createStringStats("f", "g", true), pred, null));
assertEquals(TruthValue.YES_NO_NULL, // before & middle
- RecordReaderImpl.evaluatePredicateProto(createStringStats("e", "g", true), pred, null));
+ RecordReaderImpl.evaluatePredicateProto(createStringStats("e", "g", true), pred, null));
assertEquals(TruthValue.YES_NULL, // min & after
- RecordReaderImpl.evaluatePredicateProto(createStringStats("c", "e", true), pred, null));
+ RecordReaderImpl.evaluatePredicateProto(createStringStats("c", "e", true), pred, null));
assertEquals(TruthValue.YES_NULL, // min & max
RecordReaderImpl.evaluatePredicateProto(createStringStats("c", "f", true), pred, null));
assertEquals(TruthValue.YES_NO_NULL, // min & middle
@@ -1623,4 +1638,56 @@ public class TestRecordReaderImpl {
bf.addString(HiveDecimal.create(15).toString());
assertEquals(TruthValue.YES_NO_NULL, RecordReaderImpl.evaluatePredicate(cs, pred, bf));
}
+
+ @Test
+ public void testClose() throws Exception { // closing the record reader must close both of its underlying readers
+ DataReader mockedDataReader = mock(DataReader.class);
+ MetadataReader mockedMetadataReader = mock(MetadataReader.class);
+
+ closeMockedRecordReader(mockedDataReader, mockedMetadataReader); // builds a RecordReader around the mocks and calls close() on it
+
+ verify(mockedDataReader, atLeastOnce()).close(); // atLeastOnce(): repeated close() calls are tolerated
+ verify(mockedMetadataReader, atLeastOnce()).close();
+ }
+
+ @Test
+ public void testCloseWithException() throws Exception { // a failing DataReader.close() must not prevent MetadataReader.close()
+ DataReader mockedDataReader = mock(DataReader.class);
+ MetadataReader mockedMetadataReader = mock(MetadataReader.class);
+ doThrow(IOException.class).when(mockedDataReader).close(); // stub the data reader so that closing it fails
+
+ try {
+ closeMockedRecordReader(mockedDataReader, mockedMetadataReader);
+ fail("Exception should have been thrown when Record Reader was closed");
+ } catch (IOException expected) {
+ // Intentionally empty: an IOException is the expected outcome here (see the doThrow stub above).
+ }
+
+ verify(mockedMetadataReader, atLeastOnce()).close(); // both readers must have been asked to close despite the exception
+ verify(mockedDataReader, atLeastOnce()).close();
+ }
+
+ private void closeMockedRecordReader(DataReader mockedDataReader, // helper: build a RecordReader from mocks, then close it
+ MetadataReader mockedMetadataReader) throws IOException {
+ DataReaderFactory mockedDataReaderFactory = mock(DataReaderFactory.class); // mocked factories return the caller's readers for any properties argument
+ MetadataReaderFactory mockedMetadataReaderFactory = mock(MetadataReaderFactory.class);
+ when(mockedDataReaderFactory.create(any(DataReaderProperties.class))).thenReturn(mockedDataReader);
+ when(mockedMetadataReaderFactory.create(any(MetadataReaderProperties.class))).thenReturn(mockedMetadataReader);
+
+ RecordReader recordReader = RecordReaderImpl.builder() // every collaborator is a mock or a trivial placeholder value
+ .withBufferSize(0)
+ .withCodec(mock(CompressionCodec.class))
+ .withConf(mock(Configuration.class))
+ .withFileSystem(mock(FileSystem.class))
+ .withOptions(mock(Reader.Options.class))
+ .withPath(mock(Path.class))
+ .withStrideRate(0)
+ .withStripes(Collections.singletonList(mock(StripeInformation.class)))
+ .withTypes(Collections.singletonList(OrcProto.Type.getDefaultInstance()))
+ .withDataReaderFactory(mockedDataReaderFactory)
+ .withMetadataReaderFactory(mockedMetadataReaderFactory)
+ .build();
+
+ recordReader.close(); // the call under test; callers verify the mocks afterwards
+ }
}