Posted to commits@hive.apache.org by om...@apache.org on 2016/05/20 21:23:03 UTC
[25/27] hive git commit: HIVE-11417. Move the ReaderImpl and RowReaderImpl to the ORC module, by making shims for the row by row reader. (omalley reviewed by prasanth_j)
http://git-wip-us.apache.org/repos/asf/hive/blob/ffb79509/orc/src/java/org/apache/orc/impl/HadoopShims.java
----------------------------------------------------------------------
diff --git a/orc/src/java/org/apache/orc/impl/HadoopShims.java b/orc/src/java/org/apache/orc/impl/HadoopShims.java
index 2980d71..ef7d70f 100644
--- a/orc/src/java/org/apache/orc/impl/HadoopShims.java
+++ b/orc/src/java/org/apache/orc/impl/HadoopShims.java
@@ -18,9 +18,13 @@
package org.apache.orc.impl;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.io.Text;
import org.apache.hadoop.util.VersionInfo;
+import java.io.Closeable;
import java.io.IOException;
+import java.io.InputStream;
import java.nio.ByteBuffer;
public interface HadoopShims {
@@ -43,6 +47,81 @@ public interface HadoopShims {
*/
DirectDecompressor getDirectDecompressor(DirectCompressionType codec);
+ /**
+ * a hadoop.io ByteBufferPool shim.
+ */
+ public interface ByteBufferPoolShim {
+ /**
+ * Get a new ByteBuffer from the pool. The pool can provide this by
+ * removing a buffer from its internal cache, or by allocating a
+ * new buffer.
+ *
+ * @param direct Whether the buffer should be direct.
+ * @param length The minimum length the buffer will have.
+ * @return A new ByteBuffer. Its capacity can be less
+ * than what was requested, but must be at
+ * least 1 byte.
+ */
+ ByteBuffer getBuffer(boolean direct, int length);
+
+ /**
+ * Release a buffer back to the pool.
+ * The pool may choose to put this buffer into its cache/free it.
+ *
+ * @param buffer a direct bytebuffer
+ */
+ void putBuffer(ByteBuffer buffer);
+ }
+
+ /**
+ * Provides an HDFS ZeroCopyReader shim.
+ * @param in FSDataInputStream to read from (where the cached/mmap buffers are tied to)
+ * @param pool ByteBufferPoolShim to allocate fallback buffers with
+ *
+ * @return a zero copy reader shim, or null if not supported
+ */
+ public ZeroCopyReaderShim getZeroCopyReader(FSDataInputStream in, ByteBufferPoolShim pool) throws IOException;
+
+ public interface ZeroCopyReaderShim extends Closeable {
+ /**
+ * Get a ByteBuffer from the FSDataInputStream - this can be either a HeapByteBuffer or a MappedByteBuffer.
+ * Also advances the stream by the number of bytes read. The data read can be smaller than maxLength.
+ *
+ * @return the ByteBuffer read from the stream
+ */
+ public ByteBuffer readBuffer(int maxLength, boolean verifyChecksums) throws IOException;
+ /**
+ * Release a ByteBuffer obtained from a read on the underlying stream.
+ *
+ * @param buffer the buffer to release
+ */
+ public void releaseBuffer(ByteBuffer buffer);
+
+ /**
+ * Close the underlying stream.
+ * @throws IOException
+ */
+ public void close() throws IOException;
+ }
+ /**
+ * Read data into a Text object in the fastest way possible
+ */
+ public interface TextReaderShim {
+ /**
+ * Read exactly size bytes from the stream into the given Text object.
+ * @param txt the Text object to fill
+ * @param size the number of bytes to read
+ * @throws IOException if the stream ends before size bytes are read
+ */
+ void read(Text txt, int size) throws IOException;
+ }
+
+ /**
+ * Wrap a TextReaderShim around an input stream. The reader shim will not
+ * buffer any reads from the underlying stream and will only consume bytes
+ * which are required for TextReaderShim.read() input.
+ */
+ public TextReaderShim getTextReaderShim(InputStream input) throws IOException;
class Factory {
private static HadoopShims SHIMS = null;
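
A minimal caller-side sketch of the new ZeroCopyReaderShim interface (not part of
this patch; the HadoopShims instance and the ByteBufferPoolShim implementation are
assumed to come from elsewhere):

    import java.io.IOException;
    import java.nio.ByteBuffer;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.orc.impl.HadoopShims;

    class ZeroCopyReadSketch {
      // Read up to maxLength bytes, preferring the zero-copy path when available.
      // Any buffer returned by the zero-copy path must eventually be handed back
      // via zcr.releaseBuffer() once it has been consumed.
      static ByteBuffer readChunk(HadoopShims shims, FSDataInputStream file,
                                  HadoopShims.ByteBufferPoolShim pool,
                                  int maxLength) throws IOException {
        HadoopShims.ZeroCopyReaderShim zcr = shims.getZeroCopyReader(file, pool);
        if (zcr != null) {
          // may return a heap or mmap'ed buffer, possibly shorter than maxLength
          return zcr.readBuffer(maxLength, false);
        }
        // zero copy not supported: fall back to an ordinary read
        byte[] bytes = new byte[maxLength];
        int n = file.read(bytes, 0, maxLength);
        return ByteBuffer.wrap(bytes, 0, Math.max(n, 0));
      }
    }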
http://git-wip-us.apache.org/repos/asf/hive/blob/ffb79509/orc/src/java/org/apache/orc/impl/HadoopShimsCurrent.java
----------------------------------------------------------------------
diff --git a/orc/src/java/org/apache/orc/impl/HadoopShimsCurrent.java b/orc/src/java/org/apache/orc/impl/HadoopShimsCurrent.java
index 3b9371d..5c53f74 100644
--- a/orc/src/java/org/apache/orc/impl/HadoopShimsCurrent.java
+++ b/orc/src/java/org/apache/orc/impl/HadoopShimsCurrent.java
@@ -18,10 +18,14 @@
package org.apache.orc.impl;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.snappy.SnappyDecompressor;
import org.apache.hadoop.io.compress.zlib.ZlibDecompressor;
+import java.io.DataInputStream;
import java.io.IOException;
+import java.io.InputStream;
import java.nio.ByteBuffer;
/**
@@ -59,4 +63,30 @@ public class HadoopShimsCurrent implements HadoopShims {
return null;
}
}
+
+ @Override
+ public ZeroCopyReaderShim getZeroCopyReader(FSDataInputStream in,
+ ByteBufferPoolShim pool
+ ) throws IOException {
+ return ZeroCopyShims.getZeroCopyReader(in, pool);
+ }
+
+ private final class FastTextReaderShim implements TextReaderShim {
+ private final DataInputStream din;
+
+ public FastTextReaderShim(InputStream in) {
+ this.din = new DataInputStream(in);
+ }
+
+ @Override
+ public void read(Text txt, int len) throws IOException {
+ txt.readWithKnownLength(din, len);
+ }
+ }
+
+ @Override
+ public TextReaderShim getTextReaderShim(InputStream in) throws IOException {
+ return new FastTextReaderShim(in);
+ }
+
}
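
A short sketch of TextReaderShim usage on current Hadoop versions (not part of this
patch; it assumes a Hadoop whose Text class provides readWithKnownLength, which is
what FastTextReaderShim relies on):

    import java.io.ByteArrayInputStream;
    import java.io.IOException;
    import java.nio.charset.StandardCharsets;
    import org.apache.hadoop.io.Text;
    import org.apache.orc.impl.HadoopShims;
    import org.apache.orc.impl.HadoopShimsCurrent;

    class TextShimSketch {
      public static void main(String[] args) throws IOException {
        HadoopShims shims = new HadoopShimsCurrent();
        byte[] payload = "hello orc".getBytes(StandardCharsets.UTF_8);
        HadoopShims.TextReaderShim reader =
            shims.getTextReaderShim(new ByteArrayInputStream(payload));
        Text txt = new Text();
        reader.read(txt, payload.length);   // fills txt directly from the stream
        System.out.println(txt);            // prints: hello orc
      }
    }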
http://git-wip-us.apache.org/repos/asf/hive/blob/ffb79509/orc/src/java/org/apache/orc/impl/HadoopShims_2_2.java
----------------------------------------------------------------------
diff --git a/orc/src/java/org/apache/orc/impl/HadoopShims_2_2.java b/orc/src/java/org/apache/orc/impl/HadoopShims_2_2.java
index ac46836..3f65e74 100644
--- a/orc/src/java/org/apache/orc/impl/HadoopShims_2_2.java
+++ b/orc/src/java/org/apache/orc/impl/HadoopShims_2_2.java
@@ -18,19 +18,84 @@
package org.apache.orc.impl;
-import org.apache.hadoop.io.compress.snappy.SnappyDecompressor;
-import org.apache.hadoop.io.compress.zlib.ZlibDecompressor;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.io.Text;
+import java.io.EOFException;
import java.io.IOException;
-import java.nio.ByteBuffer;
+import java.io.InputStream;
+import java.lang.reflect.Method;
/**
* Shims for versions of Hadoop up to and including 2.2.x
*/
public class HadoopShims_2_2 implements HadoopShims {
+ final boolean zeroCopy;
+ final boolean fastRead;
+
+ HadoopShims_2_2() {
+ boolean zcr = false;
+ try {
+ Class.forName("org.apache.hadoop.fs.CacheFlag", false,
+ HadoopShims_2_2.class.getClassLoader());
+ zcr = true;
+ } catch (ClassNotFoundException ce) {
+ }
+ zeroCopy = zcr;
+ boolean fastRead = false;
+ if (zcr) {
+ for (Method m : Text.class.getMethods()) {
+ if ("readWithKnownLength".equals(m.getName())) {
+ fastRead = true;
+ }
+ }
+ }
+ this.fastRead = fastRead;
+ }
+
public DirectDecompressor getDirectDecompressor(
DirectCompressionType codec) {
return null;
}
+
+ @Override
+ public ZeroCopyReaderShim getZeroCopyReader(FSDataInputStream in,
+ ByteBufferPoolShim pool
+ ) throws IOException {
+ if(zeroCopy) {
+ return ZeroCopyShims.getZeroCopyReader(in, pool);
+ }
+ /* not supported */
+ return null;
+ }
+
+ private final class BasicTextReaderShim implements TextReaderShim {
+ private final InputStream in;
+
+ public BasicTextReaderShim(InputStream in) {
+ this.in = in;
+ }
+
+ @Override
+ public void read(Text txt, int len) throws IOException {
+ int offset = 0;
+ byte[] bytes = new byte[len];
+ while (len > 0) {
+ int written = in.read(bytes, offset, len);
+ if (written < 0) {
+ throw new EOFException("Can't finish read from " + in + " read "
+ + (offset) + " bytes out of " + bytes.length);
+ }
+ len -= written;
+ offset += written;
+ }
+ txt.set(bytes);
+ }
+ }
+
+ @Override
+ public TextReaderShim getTextReaderShim(InputStream in) throws IOException {
+ return new BasicTextReaderShim(in);
+ }
}
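
The constructor above detects optional Hadoop features by probing for classes and
methods rather than parsing version strings. A standalone sketch of the same pattern
(illustration only, using the same probe targets as the shim):

    import java.lang.reflect.Method;
    import org.apache.hadoop.io.Text;

    class HadoopFeatureProbe {
      // zero-copy reads arrived together with org.apache.hadoop.fs.CacheFlag
      static boolean hasZeroCopy() {
        try {
          Class.forName("org.apache.hadoop.fs.CacheFlag", false,
              HadoopFeatureProbe.class.getClassLoader());
          return true;
        } catch (ClassNotFoundException e) {
          return false;
        }
      }

      // Text.readWithKnownLength enables the fast text reader path
      static boolean hasFastTextRead() {
        for (Method m : Text.class.getMethods()) {
          if ("readWithKnownLength".equals(m.getName())) {
            return true;
          }
        }
        return false;
      }
    }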
http://git-wip-us.apache.org/repos/asf/hive/blob/ffb79509/orc/src/java/org/apache/orc/impl/IntegerReader.java
----------------------------------------------------------------------
diff --git a/orc/src/java/org/apache/orc/impl/IntegerReader.java b/orc/src/java/org/apache/orc/impl/IntegerReader.java
index 8bef0f1..3e64d54 100644
--- a/orc/src/java/org/apache/orc/impl/IntegerReader.java
+++ b/orc/src/java/org/apache/orc/impl/IntegerReader.java
@@ -78,4 +78,5 @@ public interface IntegerReader {
void nextVector(ColumnVector column,
int[] data,
int length
- ) throws IOException;}
+ ) throws IOException;
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/ffb79509/orc/src/java/org/apache/orc/impl/OrcAcidUtils.java
----------------------------------------------------------------------
diff --git a/orc/src/java/org/apache/orc/impl/OrcAcidUtils.java b/orc/src/java/org/apache/orc/impl/OrcAcidUtils.java
new file mode 100644
index 0000000..72c7f54
--- /dev/null
+++ b/orc/src/java/org/apache/orc/impl/OrcAcidUtils.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.impl;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.orc.Reader;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.charset.CharacterCodingException;
+import java.nio.charset.Charset;
+import java.nio.charset.CharsetDecoder;
+
+public class OrcAcidUtils {
+ public static final String ACID_STATS = "hive.acid.stats";
+ public static final String DELTA_SIDE_FILE_SUFFIX = "_flush_length";
+
+ /**
+ * Get the filename of the ORC ACID side file that contains the lengths
+ * of the intermediate footers.
+ * @param main the main ORC filename
+ * @return the name of the side file
+ */
+ public static Path getSideFile(Path main) {
+ return new Path(main + DELTA_SIDE_FILE_SUFFIX);
+ }
+
+ /**
+ * Read the side file to get the last flush length.
+ * @param fs the file system to use
+ * @param deltaFile the path of the delta file
+ * @return the maximum size of the file to use
+ * @throws IOException
+ */
+ public static long getLastFlushLength(FileSystem fs,
+ Path deltaFile) throws IOException {
+ Path lengths = getSideFile(deltaFile);
+ long result = Long.MAX_VALUE;
+ try (FSDataInputStream stream = fs.open(lengths)) {
+ result = -1;
+ while (stream.available() > 0) {
+ result = stream.readLong();
+ }
+ return result;
+ } catch (IOException ioe) {
+ return result;
+ }
+ }
+
+ private static final Charset utf8 = Charset.forName("UTF-8");
+ private static final CharsetDecoder utf8Decoder = utf8.newDecoder();
+
+ public static AcidStats parseAcidStats(Reader reader) {
+ if (reader.hasMetadataValue(ACID_STATS)) {
+ try {
+ ByteBuffer val = reader.getMetadataValue(ACID_STATS).duplicate();
+ return new AcidStats(utf8Decoder.decode(val).toString());
+ } catch (CharacterCodingException e) {
+ throw new IllegalArgumentException("Bad string encoding for " +
+ ACID_STATS, e);
+ }
+ } else {
+ return null;
+ }
+ }
+
+}
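
A quick sketch of how the side-file helpers might be called (not part of this patch;
the delta path is a placeholder):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.orc.impl.OrcAcidUtils;

    class SideFileSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Path delta = new Path(args[0]);   // path to an ACID delta bucket file
        FileSystem fs = delta.getFileSystem(conf);
        // the side file is simply "<delta>_flush_length"
        System.out.println("side file: " + OrcAcidUtils.getSideFile(delta));
        // Long.MAX_VALUE means no side file was found, so the whole file is usable;
        // otherwise this is the last flushed footer position.
        System.out.println("usable length: " + OrcAcidUtils.getLastFlushLength(fs, delta));
      }
    }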
http://git-wip-us.apache.org/repos/asf/hive/blob/ffb79509/orc/src/java/org/apache/orc/impl/ReaderImpl.java
----------------------------------------------------------------------
diff --git a/orc/src/java/org/apache/orc/impl/ReaderImpl.java b/orc/src/java/org/apache/orc/impl/ReaderImpl.java
new file mode 100644
index 0000000..2da590e
--- /dev/null
+++ b/orc/src/java/org/apache/orc/impl/ReaderImpl.java
@@ -0,0 +1,758 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.impl;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.orc.OrcFile;
+import org.apache.orc.OrcUtils;
+import org.apache.orc.Reader;
+import org.apache.orc.RecordReader;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.ColumnStatistics;
+import org.apache.orc.CompressionCodec;
+import org.apache.orc.FileFormatException;
+import org.apache.orc.FileMetaInfo;
+import org.apache.orc.FileMetadata;
+import org.apache.orc.StripeInformation;
+import org.apache.orc.StripeStatistics;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.io.DiskRange;
+import org.apache.hadoop.hive.ql.util.JavaDataModel;
+import org.apache.hadoop.io.Text;
+import org.apache.orc.OrcProto;
+
+import com.google.common.collect.Lists;
+import com.google.protobuf.CodedInputStream;
+
+public class ReaderImpl implements Reader {
+
+ private static final Logger LOG = LoggerFactory.getLogger(ReaderImpl.class);
+
+ private static final int DIRECTORY_SIZE_GUESS = 16 * 1024;
+
+ protected final FileSystem fileSystem;
+ private final long maxLength;
+ protected final Path path;
+ protected final org.apache.orc.CompressionKind compressionKind;
+ protected final CompressionCodec codec;
+ protected final int bufferSize;
+ private final List<OrcProto.StripeStatistics> stripeStats;
+ private final int metadataSize;
+ protected final List<OrcProto.Type> types;
+ private final TypeDescription schema;
+ private final List<OrcProto.UserMetadataItem> userMetadata;
+ private final List<OrcProto.ColumnStatistics> fileStats;
+ private final List<StripeInformation> stripes;
+ protected final int rowIndexStride;
+ private final long contentLength, numberOfRows;
+
+
+ private long deserializedSize = -1;
+ protected final Configuration conf;
+ private final List<Integer> versionList;
+ private final OrcFile.WriterVersion writerVersion;
+
+ // Same for metastore cache - maintains the same background buffer, but includes postscript.
+ // This will only be set if the file footer/metadata was read from disk.
+ private final ByteBuffer footerMetaAndPsBuffer;
+
+ public static class StripeInformationImpl
+ implements StripeInformation {
+ private final OrcProto.StripeInformation stripe;
+
+ public StripeInformationImpl(OrcProto.StripeInformation stripe) {
+ this.stripe = stripe;
+ }
+
+ @Override
+ public long getOffset() {
+ return stripe.getOffset();
+ }
+
+ @Override
+ public long getLength() {
+ return stripe.getDataLength() + getIndexLength() + getFooterLength();
+ }
+
+ @Override
+ public long getDataLength() {
+ return stripe.getDataLength();
+ }
+
+ @Override
+ public long getFooterLength() {
+ return stripe.getFooterLength();
+ }
+
+ @Override
+ public long getIndexLength() {
+ return stripe.getIndexLength();
+ }
+
+ @Override
+ public long getNumberOfRows() {
+ return stripe.getNumberOfRows();
+ }
+
+ @Override
+ public String toString() {
+ return "offset: " + getOffset() + " data: " + getDataLength() +
+ " rows: " + getNumberOfRows() + " tail: " + getFooterLength() +
+ " index: " + getIndexLength();
+ }
+ }
+
+ @Override
+ public long getNumberOfRows() {
+ return numberOfRows;
+ }
+
+ @Override
+ public List<String> getMetadataKeys() {
+ List<String> result = new ArrayList<String>();
+ for(OrcProto.UserMetadataItem item: userMetadata) {
+ result.add(item.getName());
+ }
+ return result;
+ }
+
+ @Override
+ public ByteBuffer getMetadataValue(String key) {
+ for(OrcProto.UserMetadataItem item: userMetadata) {
+ if (item.hasName() && item.getName().equals(key)) {
+ return item.getValue().asReadOnlyByteBuffer();
+ }
+ }
+ throw new IllegalArgumentException("Can't find user metadata " + key);
+ }
+
+ public boolean hasMetadataValue(String key) {
+ for(OrcProto.UserMetadataItem item: userMetadata) {
+ if (item.hasName() && item.getName().equals(key)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ @Override
+ public org.apache.orc.CompressionKind getCompressionKind() {
+ return compressionKind;
+ }
+
+ @Override
+ public int getCompressionSize() {
+ return bufferSize;
+ }
+
+ @Override
+ public List<StripeInformation> getStripes() {
+ return stripes;
+ }
+
+ @Override
+ public long getContentLength() {
+ return contentLength;
+ }
+
+ @Override
+ public List<OrcProto.Type> getTypes() {
+ return types;
+ }
+
+ @Override
+ public OrcFile.Version getFileVersion() {
+ for (OrcFile.Version version: OrcFile.Version.values()) {
+ if ((versionList != null && !versionList.isEmpty()) &&
+ version.getMajor() == versionList.get(0) &&
+ version.getMinor() == versionList.get(1)) {
+ return version;
+ }
+ }
+ return OrcFile.Version.V_0_11;
+ }
+
+ @Override
+ public OrcFile.WriterVersion getWriterVersion() {
+ return writerVersion;
+ }
+
+ @Override
+ public int getRowIndexStride() {
+ return rowIndexStride;
+ }
+
+ @Override
+ public ColumnStatistics[] getStatistics() {
+ ColumnStatistics[] result = new ColumnStatistics[types.size()];
+ for(int i=0; i < result.length; ++i) {
+ result[i] = ColumnStatisticsImpl.deserialize(fileStats.get(i));
+ }
+ return result;
+ }
+
+ @Override
+ public TypeDescription getSchema() {
+ return schema;
+ }
+
+ /**
+ * Ensure this is an ORC file to prevent users from trying to read text
+ * files or RC files as ORC files.
+ * @param in the file being read
+ * @param path the filename for error messages
+ * @param psLen the postscript length
+ * @param buffer the tail of the file
+ * @throws IOException
+ */
+ protected static void ensureOrcFooter(FSDataInputStream in,
+ Path path,
+ int psLen,
+ ByteBuffer buffer) throws IOException {
+ int magicLength = OrcFile.MAGIC.length();
+ int fullLength = magicLength + 1;
+ if (psLen < fullLength || buffer.remaining() < fullLength) {
+ throw new FileFormatException("Malformed ORC file " + path +
+ ". Invalid postscript length " + psLen);
+ }
+ int offset = buffer.arrayOffset() + buffer.position() + buffer.limit() - fullLength;
+ byte[] array = buffer.array();
+ // now look for the magic string at the end of the postscript.
+ if (!Text.decode(array, offset, magicLength).equals(OrcFile.MAGIC)) {
+ // If it isn't there, this may be the 0.11.0 version of ORC.
+ // Read the first 3 bytes of the file to check for the header
+ byte[] header = new byte[magicLength];
+ in.readFully(0, header, 0, magicLength);
+ // if it isn't there, this isn't an ORC file
+ if (!Text.decode(header, 0 , magicLength).equals(OrcFile.MAGIC)) {
+ throw new FileFormatException("Malformed ORC file " + path +
+ ". Invalid postscript.");
+ }
+ }
+ }
+
+ /**
+ * Build a version string out of an array.
+ * @param version the version number as a list
+ * @return the human readable form of the version string
+ */
+ private static String versionString(List<Integer> version) {
+ StringBuilder buffer = new StringBuilder();
+ for(int i=0; i < version.size(); ++i) {
+ if (i != 0) {
+ buffer.append('.');
+ }
+ buffer.append(version.get(i));
+ }
+ return buffer.toString();
+ }
+
+ /**
+ * Check to see if this ORC file is from a future version and if so,
+ * warn the user that we may not be able to read all of the column encodings.
+ * @param log the logger to write any error message to
+ * @param path the data source path for error messages
+ * @param version the version of hive that wrote the file.
+ */
+ protected static void checkOrcVersion(Logger log, Path path,
+ List<Integer> version) {
+ if (version.size() >= 1) {
+ int major = version.get(0);
+ int minor = 0;
+ if (version.size() >= 2) {
+ minor = version.get(1);
+ }
+ if (major > OrcFile.Version.CURRENT.getMajor() ||
+ (major == OrcFile.Version.CURRENT.getMajor() &&
+ minor > OrcFile.Version.CURRENT.getMinor())) {
+ log.warn(path + " was written by a future Hive version " +
+ versionString(version) +
+ ". This file may not be readable by this version of Hive.");
+ }
+ }
+ }
+
+ /**
+ * Constructor that lets the user specify additional options.
+ * @param path pathname for file
+ * @param options options for reading
+ * @throws IOException
+ */
+ public ReaderImpl(Path path, OrcFile.ReaderOptions options) throws IOException {
+ FileSystem fs = options.getFilesystem();
+ if (fs == null) {
+ fs = path.getFileSystem(options.getConfiguration());
+ }
+ this.fileSystem = fs;
+ this.path = path;
+ this.conf = options.getConfiguration();
+ this.maxLength = options.getMaxLength();
+
+ FileMetadata fileMetadata = options.getFileMetadata();
+ if (fileMetadata != null) {
+ this.compressionKind = fileMetadata.getCompressionKind();
+ this.bufferSize = fileMetadata.getCompressionBufferSize();
+ this.codec = WriterImpl.createCodec(compressionKind);
+ this.metadataSize = fileMetadata.getMetadataSize();
+ this.stripeStats = fileMetadata.getStripeStats();
+ this.versionList = fileMetadata.getVersionList();
+ this.writerVersion =
+ OrcFile.WriterVersion.from(fileMetadata.getWriterVersionNum());
+ this.types = fileMetadata.getTypes();
+ this.rowIndexStride = fileMetadata.getRowIndexStride();
+ this.contentLength = fileMetadata.getContentLength();
+ this.numberOfRows = fileMetadata.getNumberOfRows();
+ this.fileStats = fileMetadata.getFileStats();
+ this.stripes = fileMetadata.getStripes();
+ this.userMetadata = null; // not cached and not needed here
+ this.footerMetaAndPsBuffer = null;
+ } else {
+ FileMetaInfo footerMetaData;
+ if (options.getFileMetaInfo() != null) {
+ footerMetaData = options.getFileMetaInfo();
+ this.footerMetaAndPsBuffer = null;
+ } else {
+ footerMetaData = extractMetaInfoFromFooter(fs, path,
+ options.getMaxLength());
+ this.footerMetaAndPsBuffer = footerMetaData.footerMetaAndPsBuffer;
+ }
+ MetaInfoObjExtractor rInfo =
+ new MetaInfoObjExtractor(footerMetaData.compressionType,
+ footerMetaData.bufferSize,
+ footerMetaData.metadataSize,
+ footerMetaData.footerBuffer
+ );
+ this.compressionKind = rInfo.compressionKind;
+ this.codec = rInfo.codec;
+ this.bufferSize = rInfo.bufferSize;
+ this.metadataSize = rInfo.metadataSize;
+ this.stripeStats = rInfo.metadata.getStripeStatsList();
+ this.types = rInfo.footer.getTypesList();
+ this.rowIndexStride = rInfo.footer.getRowIndexStride();
+ this.contentLength = rInfo.footer.getContentLength();
+ this.numberOfRows = rInfo.footer.getNumberOfRows();
+ this.userMetadata = rInfo.footer.getMetadataList();
+ this.fileStats = rInfo.footer.getStatisticsList();
+ this.versionList = footerMetaData.versionList;
+ this.writerVersion = footerMetaData.writerVersion;
+ this.stripes = convertProtoStripesToStripes(rInfo.footer.getStripesList());
+ }
+ this.schema = OrcUtils.convertTypeFromProtobuf(this.types, 0);
+ }
+
+ /**
+ * Get the WriterVersion based on the ORC file postscript.
+ * @param writerVersion the integer writer version
+ * @return the version of the software that produced the file
+ */
+ public static OrcFile.WriterVersion getWriterVersion(int writerVersion) {
+ for(OrcFile.WriterVersion version: OrcFile.WriterVersion.values()) {
+ if (version.getId() == writerVersion) {
+ return version;
+ }
+ }
+ return OrcFile.WriterVersion.FUTURE;
+ }
+
+ private static OrcProto.Footer extractFooter(ByteBuffer bb, int footerAbsPos,
+ int footerSize, CompressionCodec codec, int bufferSize) throws IOException {
+ bb.position(footerAbsPos);
+ bb.limit(footerAbsPos + footerSize);
+ return OrcProto.Footer.parseFrom(InStream.createCodedInputStream("footer",
+ Lists.<DiskRange>newArrayList(new BufferChunk(bb, 0)), footerSize, codec, bufferSize));
+ }
+
+ private static OrcProto.Metadata extractMetadata(ByteBuffer bb, int metadataAbsPos,
+ int metadataSize, CompressionCodec codec, int bufferSize) throws IOException {
+ bb.position(metadataAbsPos);
+ bb.limit(metadataAbsPos + metadataSize);
+ return OrcProto.Metadata.parseFrom(InStream.createCodedInputStream("metadata",
+ Lists.<DiskRange>newArrayList(new BufferChunk(bb, 0)), metadataSize, codec, bufferSize));
+ }
+
+ private static OrcProto.PostScript extractPostScript(ByteBuffer bb, Path path,
+ int psLen, int psAbsOffset) throws IOException {
+ // TODO: when PB is upgraded to 2.6, newInstance(ByteBuffer) method should be used here.
+ assert bb.hasArray();
+ CodedInputStream in = CodedInputStream.newInstance(
+ bb.array(), bb.arrayOffset() + psAbsOffset, psLen);
+ OrcProto.PostScript ps = OrcProto.PostScript.parseFrom(in);
+ checkOrcVersion(LOG, path, ps.getVersionList());
+
+ // Check compression codec.
+ switch (ps.getCompression()) {
+ case NONE:
+ break;
+ case ZLIB:
+ break;
+ case SNAPPY:
+ break;
+ case LZO:
+ break;
+ default:
+ throw new IllegalArgumentException("Unknown compression");
+ }
+ return ps;
+ }
+
+ private static FileMetaInfo extractMetaInfoFromFooter(FileSystem fs,
+ Path path,
+ long maxFileLength
+ ) throws IOException {
+ FSDataInputStream file = fs.open(path);
+ ByteBuffer buffer = null, fullFooterBuffer = null;
+ OrcProto.PostScript ps = null;
+ OrcFile.WriterVersion writerVersion = null;
+ try {
+ // figure out the size of the file using the option or filesystem
+ long size;
+ if (maxFileLength == Long.MAX_VALUE) {
+ size = fs.getFileStatus(path).getLen();
+ } else {
+ size = maxFileLength;
+ }
+
+ //read last bytes into buffer to get PostScript
+ int readSize = (int) Math.min(size, DIRECTORY_SIZE_GUESS);
+ buffer = ByteBuffer.allocate(readSize);
+ assert buffer.position() == 0;
+ file.readFully((size - readSize),
+ buffer.array(), buffer.arrayOffset(), readSize);
+ buffer.position(0);
+
+ //read the PostScript
+ //get length of PostScript
+ int psLen = buffer.get(readSize - 1) & 0xff;
+ ensureOrcFooter(file, path, psLen, buffer);
+ int psOffset = readSize - 1 - psLen;
+ ps = extractPostScript(buffer, path, psLen, psOffset);
+
+ int footerSize = (int) ps.getFooterLength();
+ int metadataSize = (int) ps.getMetadataLength();
+ writerVersion = extractWriterVersion(ps);
+
+ //check if extra bytes need to be read
+ int extra = Math.max(0, psLen + 1 + footerSize + metadataSize - readSize);
+ if (extra > 0) {
+ //more bytes need to be read, seek back to the right place and read extra bytes
+ ByteBuffer extraBuf = ByteBuffer.allocate(extra + readSize);
+ file.readFully((size - readSize - extra), extraBuf.array(),
+ extraBuf.arrayOffset() + extraBuf.position(), extra);
+ extraBuf.position(extra);
+ //append with already read bytes
+ extraBuf.put(buffer);
+ buffer = extraBuf;
+ buffer.position(0);
+ fullFooterBuffer = buffer.slice();
+ buffer.limit(footerSize + metadataSize);
+ } else {
+ //footer is already in the bytes in buffer, just adjust position, length
+ buffer.position(psOffset - footerSize - metadataSize);
+ fullFooterBuffer = buffer.slice();
+ buffer.limit(psOffset);
+ }
+
+ // remember position for later TODO: what later? this comment is useless
+ buffer.mark();
+ } finally {
+ try {
+ file.close();
+ } catch (IOException ex) {
+ LOG.error("Failed to close the file after another error", ex);
+ }
+ }
+
+ return new FileMetaInfo(
+ ps.getCompression().toString(),
+ (int) ps.getCompressionBlockSize(),
+ (int) ps.getMetadataLength(),
+ buffer,
+ ps.getVersionList(),
+ writerVersion,
+ fullFooterBuffer
+ );
+ }
+
+ protected static OrcFile.WriterVersion extractWriterVersion(OrcProto.PostScript ps) {
+ return (ps.hasWriterVersion()
+ ? getWriterVersion(ps.getWriterVersion()) : OrcFile.WriterVersion.ORIGINAL);
+ }
+
+ protected static List<StripeInformation> convertProtoStripesToStripes(
+ List<OrcProto.StripeInformation> stripes) {
+ List<StripeInformation> result = new ArrayList<StripeInformation>(stripes.size());
+ for (OrcProto.StripeInformation info : stripes) {
+ result.add(new StripeInformationImpl(info));
+ }
+ return result;
+ }
+
+ /**
+ * MetaInfoObjExtractor - creates the values for the final fields in ReaderImpl
+ * from their serialized forms.
+ * Because the fields are final, they must be initialized in the constructor and
+ * cannot be set from a helper method, so this helper class is used instead.
+ *
+ */
+ private static class MetaInfoObjExtractor{
+ final org.apache.orc.CompressionKind compressionKind;
+ final CompressionCodec codec;
+ final int bufferSize;
+ final int metadataSize;
+ final OrcProto.Metadata metadata;
+ final OrcProto.Footer footer;
+
+ MetaInfoObjExtractor(String codecStr, int bufferSize, int metadataSize,
+ ByteBuffer footerBuffer) throws IOException {
+
+ this.compressionKind = org.apache.orc.CompressionKind.valueOf(codecStr.toUpperCase());
+ this.bufferSize = bufferSize;
+ this.codec = WriterImpl.createCodec(compressionKind);
+ this.metadataSize = metadataSize;
+
+ int position = footerBuffer.position();
+ int footerBufferSize = footerBuffer.limit() - footerBuffer.position() - metadataSize;
+
+ this.metadata = extractMetadata(footerBuffer, position, metadataSize, codec, bufferSize);
+ this.footer = extractFooter(
+ footerBuffer, position + metadataSize, footerBufferSize, codec, bufferSize);
+
+ footerBuffer.position(position);
+ }
+ }
+
+ @Override
+ public ByteBuffer getSerializedFileFooter() {
+ return footerMetaAndPsBuffer;
+ }
+
+ @Override
+ public RecordReader rows() throws IOException {
+ return rows(new Options());
+ }
+
+ @Override
+ public RecordReader rows(Options options) throws IOException {
+ LOG.info("Reading ORC rows from " + path + " with " + options);
+ boolean[] include = options.getInclude();
+ // if included columns is null, then include all columns
+ if (include == null) {
+ include = new boolean[types.size()];
+ Arrays.fill(include, true);
+ options.include(include);
+ }
+ return new RecordReaderImpl(this, options);
+ }
+
+
+ @Override
+ public long getRawDataSize() {
+ // if the deserializedSize is not computed, then compute it, else
+ // return the already computed size. since we are reading from the footer
+ // we don't have to compute deserialized size repeatedly
+ if (deserializedSize == -1) {
+ List<Integer> indices = Lists.newArrayList();
+ for (int i = 0; i < fileStats.size(); ++i) {
+ indices.add(i);
+ }
+ deserializedSize = getRawDataSizeFromColIndices(indices);
+ }
+ return deserializedSize;
+ }
+
+ @Override
+ public long getRawDataSizeFromColIndices(List<Integer> colIndices) {
+ return getRawDataSizeFromColIndices(colIndices, types, fileStats);
+ }
+
+ public static long getRawDataSizeFromColIndices(
+ List<Integer> colIndices, List<OrcProto.Type> types,
+ List<OrcProto.ColumnStatistics> stats) {
+ long result = 0;
+ for (int colIdx : colIndices) {
+ result += getRawDataSizeOfColumn(colIdx, types, stats);
+ }
+ return result;
+ }
+
+ private static long getRawDataSizeOfColumn(int colIdx, List<OrcProto.Type> types,
+ List<OrcProto.ColumnStatistics> stats) {
+ OrcProto.ColumnStatistics colStat = stats.get(colIdx);
+ long numVals = colStat.getNumberOfValues();
+ OrcProto.Type type = types.get(colIdx);
+
+ switch (type.getKind()) {
+ case BINARY:
+ // old orc format doesn't support binary statistics. checking for binary
+ // statistics is not required as protocol buffers takes care of it.
+ return colStat.getBinaryStatistics().getSum();
+ case STRING:
+ case CHAR:
+ case VARCHAR:
+ // old orc format doesn't support sum for string statistics. checking for
+ // existence is not required as protocol buffers takes care of it.
+
+ // ORC strings are deserialized to java strings. so use java data model's
+ // string size
+ numVals = numVals == 0 ? 1 : numVals;
+ int avgStrLen = (int) (colStat.getStringStatistics().getSum() / numVals);
+ return numVals * JavaDataModel.get().lengthForStringOfLength(avgStrLen);
+ case TIMESTAMP:
+ return numVals * JavaDataModel.get().lengthOfTimestamp();
+ case DATE:
+ return numVals * JavaDataModel.get().lengthOfDate();
+ case DECIMAL:
+ return numVals * JavaDataModel.get().lengthOfDecimal();
+ case DOUBLE:
+ case LONG:
+ return numVals * JavaDataModel.get().primitive2();
+ case FLOAT:
+ case INT:
+ case SHORT:
+ case BOOLEAN:
+ case BYTE:
+ return numVals * JavaDataModel.get().primitive1();
+ default:
+ LOG.debug("Unknown primitive category: " + type.getKind());
+ break;
+ }
+
+ return 0;
+ }
+
+ @Override
+ public long getRawDataSizeOfColumns(List<String> colNames) {
+ List<Integer> colIndices = getColumnIndicesFromNames(colNames);
+ return getRawDataSizeFromColIndices(colIndices);
+ }
+
+ private List<Integer> getColumnIndicesFromNames(List<String> colNames) {
+ // top level struct
+ OrcProto.Type type = types.get(0);
+ List<Integer> colIndices = Lists.newArrayList();
+ List<String> fieldNames = type.getFieldNamesList();
+ int fieldIdx;
+ for (String colName : colNames) {
+ if (fieldNames.contains(colName)) {
+ fieldIdx = fieldNames.indexOf(colName);
+ } else {
+ String s = "Cannot find field for: " + colName + " in ";
+ for (String fn : fieldNames) {
+ s += fn + ", ";
+ }
+ LOG.warn(s);
+ continue;
+ }
+
+ // a single field may span multiple columns. find start and end column
+ // index for the requested field
+ int idxStart = type.getSubtypes(fieldIdx);
+
+ int idxEnd;
+
+ // if the specified field is the last field, then the end index will be the
+ // last column index
+ if (fieldIdx + 1 > fieldNames.size() - 1) {
+ idxEnd = getLastIdx() + 1;
+ } else {
+ idxEnd = type.getSubtypes(fieldIdx + 1);
+ }
+
+ // if the start index and end index are the same, the field is a primitive
+ // field; otherwise it is a complex field (like map, list, struct, union)
+ if (idxStart == idxEnd) {
+ // simple field
+ colIndices.add(idxStart);
+ } else {
+ // complex fields span multiple columns
+ for (int i = idxStart; i < idxEnd; i++) {
+ colIndices.add(i);
+ }
+ }
+ }
+ return colIndices;
+ }
+
+ private int getLastIdx() {
+ Set<Integer> indices = new HashSet<>();
+ for (OrcProto.Type type : types) {
+ indices.addAll(type.getSubtypesList());
+ }
+ return Collections.max(indices);
+ }
+
+ @Override
+ public List<OrcProto.StripeStatistics> getOrcProtoStripeStatistics() {
+ return stripeStats;
+ }
+
+ @Override
+ public List<OrcProto.ColumnStatistics> getOrcProtoFileStatistics() {
+ return fileStats;
+ }
+
+ @Override
+ public List<StripeStatistics> getStripeStatistics() {
+ List<StripeStatistics> result = new ArrayList<>();
+ for (OrcProto.StripeStatistics ss : stripeStats) {
+ result.add(new StripeStatistics(ss.getColStatsList()));
+ }
+ return result;
+ }
+
+ public List<OrcProto.UserMetadataItem> getOrcProtoUserMetadata() {
+ return userMetadata;
+ }
+
+ @Override
+ public List<Integer> getVersionList() {
+ return versionList;
+ }
+
+ @Override
+ public int getMetadataSize() {
+ return metadataSize;
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder buffer = new StringBuilder();
+ buffer.append("ORC Reader(");
+ buffer.append(path);
+ if (maxLength != -1) {
+ buffer.append(", ");
+ buffer.append(maxLength);
+ }
+ buffer.append(")");
+ return buffer.toString();
+ }
+}
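
A minimal sketch of opening the reader moved by this patch (assuming the usual
OrcFile factory entry points; the file path is a placeholder):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.orc.OrcFile;
    import org.apache.orc.Reader;
    import org.apache.orc.StripeInformation;

    class ReaderSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Reader reader = OrcFile.createReader(new Path(args[0]),
            OrcFile.readerOptions(conf));
        System.out.println("schema: " + reader.getSchema());
        System.out.println("rows:   " + reader.getNumberOfRows());
        System.out.println("codec:  " + reader.getCompressionKind());
        for (StripeInformation stripe : reader.getStripes()) {
          System.out.println(stripe);   // offset/data/rows/tail/index, see toString above
        }
      }
    }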
http://git-wip-us.apache.org/repos/asf/hive/blob/ffb79509/orc/src/java/org/apache/orc/impl/RecordReaderImpl.java
----------------------------------------------------------------------
diff --git a/orc/src/java/org/apache/orc/impl/RecordReaderImpl.java b/orc/src/java/org/apache/orc/impl/RecordReaderImpl.java
new file mode 100644
index 0000000..36a802e
--- /dev/null
+++ b/orc/src/java/org/apache/orc/impl/RecordReaderImpl.java
@@ -0,0 +1,1215 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.orc.impl;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.sql.Date;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.orc.BooleanColumnStatistics;
+import org.apache.orc.Reader;
+import org.apache.orc.RecordReader;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.ColumnStatistics;
+import org.apache.orc.CompressionCodec;
+import org.apache.orc.DataReader;
+import org.apache.orc.DateColumnStatistics;
+import org.apache.orc.DecimalColumnStatistics;
+import org.apache.orc.DoubleColumnStatistics;
+import org.apache.orc.IntegerColumnStatistics;
+import org.apache.orc.OrcConf;
+import org.apache.orc.StringColumnStatistics;
+import org.apache.orc.StripeInformation;
+import org.apache.orc.TimestampColumnStatistics;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.io.DiskRange;
+import org.apache.hadoop.hive.common.io.DiskRangeList;
+import org.apache.hadoop.hive.common.io.DiskRangeList.CreateHelper;
+import org.apache.hadoop.hive.common.type.HiveDecimal;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.orc.BloomFilterIO;
+import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgument.TruthValue;
+import org.apache.hadoop.hive.serde2.io.DateWritable;
+import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
+import org.apache.hadoop.hive.ql.util.TimestampUtils;
+import org.apache.hadoop.io.Text;
+import org.apache.orc.OrcProto;
+
+public class RecordReaderImpl implements RecordReader {
+ static final Logger LOG = LoggerFactory.getLogger(RecordReaderImpl.class);
+ private static final boolean isLogDebugEnabled = LOG.isDebugEnabled();
+ private static final Object UNKNOWN_VALUE = new Object();
+ protected final Path path;
+ private final long firstRow;
+ private final List<StripeInformation> stripes =
+ new ArrayList<StripeInformation>();
+ private OrcProto.StripeFooter stripeFooter;
+ private final long totalRowCount;
+ private final CompressionCodec codec;
+ protected final TypeDescription schema;
+ private final List<OrcProto.Type> types;
+ private final int bufferSize;
+ private final boolean[] included;
+ private final long rowIndexStride;
+ private long rowInStripe = 0;
+ private int currentStripe = -1;
+ private long rowBaseInStripe = 0;
+ private long rowCountInStripe = 0;
+ private final Map<StreamName, InStream> streams =
+ new HashMap<StreamName, InStream>();
+ DiskRangeList bufferChunks = null;
+ private final TreeReaderFactory.TreeReader reader;
+ private final OrcProto.RowIndex[] indexes;
+ private final OrcProto.BloomFilterIndex[] bloomFilterIndices;
+ private final SargApplier sargApp;
+ // an array recording which row groups are not skipped
+ private boolean[] includedRowGroups = null;
+ private final DataReader dataReader;
+
+ /**
+ * Given a list of column names, find the given column and return the index.
+ *
+ * @param columnNames the list of potential column names
+ * @param columnName the column name to look for
+ * @param rootColumn offset the result with the rootColumn
+ * @return the column number or -1 if the column wasn't found
+ */
+ static int findColumns(String[] columnNames,
+ String columnName,
+ int rootColumn) {
+ for(int i=0; i < columnNames.length; ++i) {
+ if (columnName.equals(columnNames[i])) {
+ return i + rootColumn;
+ }
+ }
+ return -1;
+ }
+
+ /**
+ * Find the mapping from predicate leaves to columns.
+ * @param sargLeaves the search argument that we need to map
+ * @param columnNames the names of the columns
+ * @param rootColumn the offset of the top level row, which offsets the
+ * result
+ * @return an array mapping the sarg leaves to concrete column numbers
+ */
+ public static int[] mapSargColumnsToOrcInternalColIdx(List<PredicateLeaf> sargLeaves,
+ String[] columnNames,
+ int rootColumn) {
+ int[] result = new int[sargLeaves.size()];
+ Arrays.fill(result, -1);
+ for(int i=0; i < result.length; ++i) {
+ String colName = sargLeaves.get(i).getColumnName();
+ result[i] = findColumns(columnNames, colName, rootColumn);
+ }
+ return result;
+ }
+
+ protected RecordReaderImpl(ReaderImpl fileReader,
+ Reader.Options options) throws IOException {
+ SchemaEvolution treeReaderSchema;
+ this.included = options.getInclude();
+ included[0] = true;
+ if (options.getSchema() == null) {
+ if (LOG.isInfoEnabled()) {
+ LOG.info("Schema on read not provided -- using file schema " +
+ fileReader.getSchema());
+ }
+ treeReaderSchema = new SchemaEvolution(fileReader.getSchema(), included);
+ } else {
+
+ // Now that we are creating a record reader for a file, validate that the schema to read
+ // is compatible with the file schema.
+ //
+ treeReaderSchema = new SchemaEvolution(fileReader.getSchema(),
+ options.getSchema(),included);
+ }
+ this.schema = treeReaderSchema.getReaderSchema();
+ this.path = fileReader.path;
+ this.codec = fileReader.codec;
+ this.types = fileReader.types;
+ this.bufferSize = fileReader.bufferSize;
+ this.rowIndexStride = fileReader.rowIndexStride;
+ SearchArgument sarg = options.getSearchArgument();
+ if (sarg != null && rowIndexStride != 0) {
+ sargApp = new SargApplier(
+ sarg, options.getColumnNames(), rowIndexStride, types,
+ included.length);
+ } else {
+ sargApp = null;
+ }
+ long rows = 0;
+ long skippedRows = 0;
+ long offset = options.getOffset();
+ long maxOffset = options.getMaxOffset();
+ for(StripeInformation stripe: fileReader.getStripes()) {
+ long stripeStart = stripe.getOffset();
+ if (offset > stripeStart) {
+ skippedRows += stripe.getNumberOfRows();
+ } else if (stripeStart < maxOffset) {
+ this.stripes.add(stripe);
+ rows += stripe.getNumberOfRows();
+ }
+ }
+
+ Boolean zeroCopy = options.getUseZeroCopy();
+ if (zeroCopy == null) {
+ zeroCopy = OrcConf.USE_ZEROCOPY.getBoolean(fileReader.conf);
+ }
+ if (options.getDataReader() != null) {
+ this.dataReader = options.getDataReader();
+ } else {
+ this.dataReader = RecordReaderUtils.createDefaultDataReader(
+ DataReaderProperties.builder()
+ .withBufferSize(bufferSize)
+ .withCompression(fileReader.compressionKind)
+ .withFileSystem(fileReader.fileSystem)
+ .withPath(fileReader.path)
+ .withTypeCount(types.size())
+ .withZeroCopy(zeroCopy)
+ .build());
+ }
+ this.dataReader.open();
+
+ firstRow = skippedRows;
+ totalRowCount = rows;
+ Boolean skipCorrupt = options.getSkipCorruptRecords();
+ if (skipCorrupt == null) {
+ skipCorrupt = OrcConf.SKIP_CORRUPT_DATA.getBoolean(fileReader.conf);
+ }
+
+ reader = TreeReaderFactory.createTreeReader(treeReaderSchema.getReaderSchema(),
+ treeReaderSchema, included, skipCorrupt);
+ indexes = new OrcProto.RowIndex[types.size()];
+ bloomFilterIndices = new OrcProto.BloomFilterIndex[types.size()];
+ advanceToNextRow(reader, 0L, true);
+ }
+
+ public static final class PositionProviderImpl implements PositionProvider {
+ private final OrcProto.RowIndexEntry entry;
+ private int index;
+
+ public PositionProviderImpl(OrcProto.RowIndexEntry entry) {
+ this(entry, 0);
+ }
+
+ public PositionProviderImpl(OrcProto.RowIndexEntry entry, int startPos) {
+ this.entry = entry;
+ this.index = startPos;
+ }
+
+ @Override
+ public long getNext() {
+ return entry.getPositions(index++);
+ }
+ }
+
+ public OrcProto.StripeFooter readStripeFooter(StripeInformation stripe
+ ) throws IOException {
+ return dataReader.readStripeFooter(stripe);
+ }
+
+ enum Location {
+ BEFORE, MIN, MIDDLE, MAX, AFTER
+ }
+
+ /**
+ * Given a point and min and max, determine if the point is before, at the
+ * min, in the middle, at the max, or after the range.
+ * @param point the point to test
+ * @param min the minimum point
+ * @param max the maximum point
+ * @param <T> the type of the comparison
+ * @return the location of the point
+ */
+ static <T> Location compareToRange(Comparable<T> point, T min, T max) {
+ int minCompare = point.compareTo(min);
+ if (minCompare < 0) {
+ return Location.BEFORE;
+ } else if (minCompare == 0) {
+ return Location.MIN;
+ }
+ int maxCompare = point.compareTo(max);
+ if (maxCompare > 0) {
+ return Location.AFTER;
+ } else if (maxCompare == 0) {
+ return Location.MAX;
+ }
+ return Location.MIDDLE;
+ }
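+
+ // For example (illustration only, not part of this change): with min=1 and max=10,
+ // compareToRange(0, 1, 10) -> BEFORE, compareToRange(1, 1, 10) -> MIN,
+ // compareToRange(5, 1, 10) -> MIDDLE, compareToRange(10, 1, 10) -> MAX,
+ // and compareToRange(11, 1, 10) -> AFTER.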
+
+ /**
+ * Get the maximum value out of an index entry.
+ * @param index
+ * the index entry
+ * @return the object for the maximum value or null if there isn't one
+ */
+ static Object getMax(ColumnStatistics index) {
+ if (index instanceof IntegerColumnStatistics) {
+ return ((IntegerColumnStatistics) index).getMaximum();
+ } else if (index instanceof DoubleColumnStatistics) {
+ return ((DoubleColumnStatistics) index).getMaximum();
+ } else if (index instanceof StringColumnStatistics) {
+ return ((StringColumnStatistics) index).getMaximum();
+ } else if (index instanceof DateColumnStatistics) {
+ return ((DateColumnStatistics) index).getMaximum();
+ } else if (index instanceof DecimalColumnStatistics) {
+ return ((DecimalColumnStatistics) index).getMaximum();
+ } else if (index instanceof TimestampColumnStatistics) {
+ return ((TimestampColumnStatistics) index).getMaximum();
+ } else if (index instanceof BooleanColumnStatistics) {
+ if (((BooleanColumnStatistics)index).getTrueCount()!=0) {
+ return Boolean.TRUE;
+ } else {
+ return Boolean.FALSE;
+ }
+ } else {
+ return null;
+ }
+ }
+
+ /**
+ * Get the minimum value out of an index entry.
+ * @param index
+ * the index entry
+ * @return the object for the minimum value or null if there isn't one
+ */
+ static Object getMin(ColumnStatistics index) {
+ if (index instanceof IntegerColumnStatistics) {
+ return ((IntegerColumnStatistics) index).getMinimum();
+ } else if (index instanceof DoubleColumnStatistics) {
+ return ((DoubleColumnStatistics) index).getMinimum();
+ } else if (index instanceof StringColumnStatistics) {
+ return ((StringColumnStatistics) index).getMinimum();
+ } else if (index instanceof DateColumnStatistics) {
+ return ((DateColumnStatistics) index).getMinimum();
+ } else if (index instanceof DecimalColumnStatistics) {
+ return ((DecimalColumnStatistics) index).getMinimum();
+ } else if (index instanceof TimestampColumnStatistics) {
+ return ((TimestampColumnStatistics) index).getMinimum();
+ } else if (index instanceof BooleanColumnStatistics) {
+ if (((BooleanColumnStatistics)index).getFalseCount()!=0) {
+ return Boolean.FALSE;
+ } else {
+ return Boolean.TRUE;
+ }
+ } else {
+ return UNKNOWN_VALUE; // null is not safe here
+ }
+ }
+
+ /**
+ * Evaluate a predicate with respect to the statistics from the column
+ * that is referenced in the predicate.
+ * @param statsProto the statistics for the column mentioned in the predicate
+ * @param predicate the leaf predicate we need to evaluate
+ * @param bloomFilter
+ * @return the set of truth values that may be returned for the given
+ * predicate.
+ */
+ static TruthValue evaluatePredicateProto(OrcProto.ColumnStatistics statsProto,
+ PredicateLeaf predicate, OrcProto.BloomFilter bloomFilter) {
+ ColumnStatistics cs = ColumnStatisticsImpl.deserialize(statsProto);
+ Object minValue = getMin(cs);
+ Object maxValue = getMax(cs);
+ BloomFilterIO bf = null;
+ if (bloomFilter != null) {
+ bf = new BloomFilterIO(bloomFilter);
+ }
+ return evaluatePredicateRange(predicate, minValue, maxValue, cs.hasNull(), bf);
+ }
+
+ /**
+ * Evaluate a predicate with respect to the statistics from the column
+ * that is referenced in the predicate.
+ * @param stats the statistics for the column mentioned in the predicate
+ * @param predicate the leaf predicate we need to evaluate
+ * @return the set of truth values that may be returned for the given
+ * predicate.
+ */
+ public static TruthValue evaluatePredicate(ColumnStatistics stats,
+ PredicateLeaf predicate,
+ BloomFilterIO bloomFilter) {
+ Object minValue = getMin(stats);
+ Object maxValue = getMax(stats);
+ return evaluatePredicateRange(predicate, minValue, maxValue, stats.hasNull(), bloomFilter);
+ }
+
+ static TruthValue evaluatePredicateRange(PredicateLeaf predicate, Object min,
+ Object max, boolean hasNull, BloomFilterIO bloomFilter) {
+ // if we didn't have any values, everything must have been null
+ if (min == null) {
+ if (predicate.getOperator() == PredicateLeaf.Operator.IS_NULL) {
+ return TruthValue.YES;
+ } else {
+ return TruthValue.NULL;
+ }
+ } else if (min == UNKNOWN_VALUE) {
+ return TruthValue.YES_NO_NULL;
+ }
+
+ TruthValue result;
+ Object baseObj = predicate.getLiteral();
+ try {
+ // Predicate object and stats objects are converted to the type of the predicate object.
+ Object minValue = getBaseObjectForComparison(predicate.getType(), min);
+ Object maxValue = getBaseObjectForComparison(predicate.getType(), max);
+ Object predObj = getBaseObjectForComparison(predicate.getType(), baseObj);
+
+ result = evaluatePredicateMinMax(predicate, predObj, minValue, maxValue, hasNull);
+ if (shouldEvaluateBloomFilter(predicate, result, bloomFilter)) {
+ result = evaluatePredicateBloomFilter(predicate, predObj, bloomFilter, hasNull);
+ }
+ // in case failed conversion, return the default YES_NO_NULL truth value
+ } catch (Exception e) {
+ if (LOG.isWarnEnabled()) {
+ final String statsType = min == null ?
+ (max == null ? "null" : max.getClass().getSimpleName()) :
+ min.getClass().getSimpleName();
+ final String predicateType = baseObj == null ? "null" : baseObj.getClass().getSimpleName();
+ final String reason = e.getClass().getSimpleName() + " when evaluating predicate." +
+ " Skipping ORC PPD." +
+ " Exception: " + e.getMessage() +
+ " StatsType: " + statsType +
+ " PredicateType: " + predicateType;
+ LOG.warn(reason);
+ LOG.debug(reason, e);
+ }
+ if (predicate.getOperator().equals(PredicateLeaf.Operator.NULL_SAFE_EQUALS) || !hasNull) {
+ result = TruthValue.YES_NO;
+ } else {
+ result = TruthValue.YES_NO_NULL;
+ }
+ }
+ return result;
+ }
+
+ private static boolean shouldEvaluateBloomFilter(PredicateLeaf predicate,
+ TruthValue result, BloomFilterIO bloomFilter) {
+ // evaluate bloom filter only when
+ // 1) Bloom filter is available
+ // 2) Min/Max evaluation yield YES or MAYBE
+ // 3) Predicate is EQUALS or IN list
+ if (bloomFilter != null
+ && result != TruthValue.NO_NULL && result != TruthValue.NO
+ && (predicate.getOperator().equals(PredicateLeaf.Operator.EQUALS)
+ || predicate.getOperator().equals(PredicateLeaf.Operator.NULL_SAFE_EQUALS)
+ || predicate.getOperator().equals(PredicateLeaf.Operator.IN))) {
+ return true;
+ }
+ return false;
+ }
+
+ private static TruthValue evaluatePredicateMinMax(PredicateLeaf predicate, Object predObj,
+ Object minValue,
+ Object maxValue,
+ boolean hasNull) {
+ Location loc;
+
+ switch (predicate.getOperator()) {
+ case NULL_SAFE_EQUALS:
+ loc = compareToRange((Comparable) predObj, minValue, maxValue);
+ if (loc == Location.BEFORE || loc == Location.AFTER) {
+ return TruthValue.NO;
+ } else {
+ return TruthValue.YES_NO;
+ }
+ case EQUALS:
+ loc = compareToRange((Comparable) predObj, minValue, maxValue);
+ if (minValue.equals(maxValue) && loc == Location.MIN) {
+ return hasNull ? TruthValue.YES_NULL : TruthValue.YES;
+ } else if (loc == Location.BEFORE || loc == Location.AFTER) {
+ return hasNull ? TruthValue.NO_NULL : TruthValue.NO;
+ } else {
+ return hasNull ? TruthValue.YES_NO_NULL : TruthValue.YES_NO;
+ }
+ case LESS_THAN:
+ loc = compareToRange((Comparable) predObj, minValue, maxValue);
+ if (loc == Location.AFTER) {
+ return hasNull ? TruthValue.YES_NULL : TruthValue.YES;
+ } else if (loc == Location.BEFORE || loc == Location.MIN) {
+ return hasNull ? TruthValue.NO_NULL : TruthValue.NO;
+ } else {
+ return hasNull ? TruthValue.YES_NO_NULL : TruthValue.YES_NO;
+ }
+ case LESS_THAN_EQUALS:
+ loc = compareToRange((Comparable) predObj, minValue, maxValue);
+ if (loc == Location.AFTER || loc == Location.MAX) {
+ return hasNull ? TruthValue.YES_NULL : TruthValue.YES;
+ } else if (loc == Location.BEFORE) {
+ return hasNull ? TruthValue.NO_NULL : TruthValue.NO;
+ } else {
+ return hasNull ? TruthValue.YES_NO_NULL : TruthValue.YES_NO;
+ }
+ case IN:
+ if (minValue.equals(maxValue)) {
+ // for a single value, look through to see if that value is in the
+ // set
+ for (Object arg : predicate.getLiteralList()) {
+ predObj = getBaseObjectForComparison(predicate.getType(), arg);
+ loc = compareToRange((Comparable) predObj, minValue, maxValue);
+ if (loc == Location.MIN) {
+ return hasNull ? TruthValue.YES_NULL : TruthValue.YES;
+ }
+ }
+ return hasNull ? TruthValue.NO_NULL : TruthValue.NO;
+ } else {
+ // are all of the values outside of the range?
+ for (Object arg : predicate.getLiteralList()) {
+ predObj = getBaseObjectForComparison(predicate.getType(), arg);
+ loc = compareToRange((Comparable) predObj, minValue, maxValue);
+ if (loc == Location.MIN || loc == Location.MIDDLE ||
+ loc == Location.MAX) {
+ return hasNull ? TruthValue.YES_NO_NULL : TruthValue.YES_NO;
+ }
+ }
+ return hasNull ? TruthValue.NO_NULL : TruthValue.NO;
+ }
+ case BETWEEN:
+ List<Object> args = predicate.getLiteralList();
+ Object predObj1 = getBaseObjectForComparison(predicate.getType(), args.get(0));
+
+ loc = compareToRange((Comparable) predObj1, minValue, maxValue);
+ if (loc == Location.BEFORE || loc == Location.MIN) {
+ Object predObj2 = getBaseObjectForComparison(predicate.getType(), args.get(1));
+
+ Location loc2 = compareToRange((Comparable) predObj2, minValue, maxValue);
+ if (loc2 == Location.AFTER || loc2 == Location.MAX) {
+ return hasNull ? TruthValue.YES_NULL : TruthValue.YES;
+ } else if (loc2 == Location.BEFORE) {
+ return hasNull ? TruthValue.NO_NULL : TruthValue.NO;
+ } else {
+ return hasNull ? TruthValue.YES_NO_NULL : TruthValue.YES_NO;
+ }
+ } else if (loc == Location.AFTER) {
+ return hasNull ? TruthValue.NO_NULL : TruthValue.NO;
+ } else {
+ return hasNull ? TruthValue.YES_NO_NULL : TruthValue.YES_NO;
+ }
+ case IS_NULL:
+ // min = null condition above handles the all-nulls YES case
+ return hasNull ? TruthValue.YES_NO : TruthValue.NO;
+ default:
+ return hasNull ? TruthValue.YES_NO_NULL : TruthValue.YES_NO;
+ }
+ }
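+
+ // Example (illustration only): for a column with minValue=10, maxValue=20 and
+ // hasNull=false, an EQUALS 5 predicate yields TruthValue.NO (the row group can be
+ // skipped), while EQUALS 15 yields TruthValue.YES_NO (the row group must be read).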
+
+ private static TruthValue evaluatePredicateBloomFilter(PredicateLeaf predicate,
+ final Object predObj, BloomFilterIO bloomFilter, boolean hasNull) {
+ switch (predicate.getOperator()) {
+ case NULL_SAFE_EQUALS:
+ // null safe equals does not return *_NULL variant. So set hasNull to false
+ return checkInBloomFilter(bloomFilter, predObj, false);
+ case EQUALS:
+ return checkInBloomFilter(bloomFilter, predObj, hasNull);
+ case IN:
+ for (Object arg : predicate.getLiteralList()) {
+ // if at least one value in the IN list exists in the bloom filter, qualify the row group/stripe
+ Object predObjItem = getBaseObjectForComparison(predicate.getType(), arg);
+ TruthValue result = checkInBloomFilter(bloomFilter, predObjItem, hasNull);
+ if (result == TruthValue.YES_NO_NULL || result == TruthValue.YES_NO) {
+ return result;
+ }
+ }
+ return hasNull ? TruthValue.NO_NULL : TruthValue.NO;
+ default:
+ return hasNull ? TruthValue.YES_NO_NULL : TruthValue.YES_NO;
+ }
+ }
+
+ private static TruthValue checkInBloomFilter(BloomFilterIO bf, Object predObj, boolean hasNull) {
+ TruthValue result = hasNull ? TruthValue.NO_NULL : TruthValue.NO;
+
+ if (predObj instanceof Long) {
+ if (bf.testLong(((Long) predObj).longValue())) {
+ result = TruthValue.YES_NO_NULL;
+ }
+ } else if (predObj instanceof Double) {
+ if (bf.testDouble(((Double) predObj).doubleValue())) {
+ result = TruthValue.YES_NO_NULL;
+ }
+ } else if (predObj instanceof String || predObj instanceof Text ||
+ predObj instanceof HiveDecimalWritable ||
+ predObj instanceof BigDecimal) {
+ if (bf.testString(predObj.toString())) {
+ result = TruthValue.YES_NO_NULL;
+ }
+ } else if (predObj instanceof Timestamp) {
+ if (bf.testLong(((Timestamp) predObj).getTime())) {
+ result = TruthValue.YES_NO_NULL;
+ }
+ } else if (predObj instanceof Date) {
+ if (bf.testLong(DateWritable.dateToDays((Date) predObj))) {
+ result = TruthValue.YES_NO_NULL;
+ }
+ } else {
+ // if the predicate object is null and if hasNull says there are no nulls then return NO
+ if (predObj == null && !hasNull) {
+ result = TruthValue.NO;
+ } else {
+ result = TruthValue.YES_NO_NULL;
+ }
+ }
+
+ if (result == TruthValue.YES_NO_NULL && !hasNull) {
+ result = TruthValue.YES_NO;
+ }
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Bloom filter evaluation: " + result.toString());
+ }
+
+ return result;
+ }
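
The helper above can never return a plain YES or YES_NULL: a bloom filter hit only means the value may be present, while a miss is definitive. Ignoring the null-literal branch, the possible outcomes reduce to the sketch below (hypothetical names, not part of the patch).

    // Sketch of the outcome matrix of checkInBloomFilter(), assuming a non-null literal.
    static String bloomOutcome(boolean bloomHit, boolean columnHasNulls) {
      if (bloomHit) {
        return columnHasNulls ? "YES_NO_NULL" : "YES_NO";  // value may be present
      } else {
        return columnHasNulls ? "NO_NULL" : "NO";          // value is definitely absent
      }
    }
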
+
+ private static Object getBaseObjectForComparison(PredicateLeaf.Type type, Object obj) {
+ if (obj == null) {
+ return null;
+ }
+ switch (type) {
+ case BOOLEAN:
+ if (obj instanceof Boolean) {
+ return obj;
+ } else {
+ // the result is true only if the string conversion yields "true"; all other
+ // values are considered false
+ return Boolean.valueOf(obj.toString());
+ }
+ case DATE:
+ if (obj instanceof Date) {
+ return obj;
+ } else if (obj instanceof String) {
+ return Date.valueOf((String) obj);
+ } else if (obj instanceof Timestamp) {
+ return DateWritable.timeToDate(((Timestamp) obj).getTime() / 1000L);
+ }
+ // always string, but prevent the comparison to numbers (are they days/seconds/milliseconds?)
+ break;
+ case DECIMAL:
+ if (obj instanceof Boolean) {
+ return new HiveDecimalWritable(((Boolean) obj).booleanValue() ?
+ HiveDecimal.ONE : HiveDecimal.ZERO);
+ } else if (obj instanceof Integer) {
+ return new HiveDecimalWritable(((Integer) obj).intValue());
+ } else if (obj instanceof Long) {
+ return new HiveDecimalWritable(((Long) obj));
+ } else if (obj instanceof Float || obj instanceof Double ||
+ obj instanceof String) {
+ return new HiveDecimalWritable(obj.toString());
+ } else if (obj instanceof BigDecimal) {
+ return new HiveDecimalWritable(HiveDecimal.create((BigDecimal) obj));
+ } else if (obj instanceof HiveDecimal) {
+ return new HiveDecimalWritable((HiveDecimal) obj);
+ } else if (obj instanceof HiveDecimalWritable) {
+ return obj;
+ } else if (obj instanceof Timestamp) {
+ return new HiveDecimalWritable(Double.toString(
+ TimestampUtils.getDouble((Timestamp) obj)));
+ }
+ break;
+ case FLOAT:
+ if (obj instanceof Number) {
+ // widening conversion
+ return ((Number) obj).doubleValue();
+ } else if (obj instanceof HiveDecimal) {
+ return ((HiveDecimal) obj).doubleValue();
+ } else if (obj instanceof String) {
+ return Double.valueOf(obj.toString());
+ } else if (obj instanceof Timestamp) {
+ return TimestampUtils.getDouble((Timestamp) obj);
+ } else if (obj instanceof BigDecimal) {
+ return ((BigDecimal) obj).doubleValue();
+ }
+ break;
+ case LONG:
+ if (obj instanceof Number) {
+ // widening conversion
+ return ((Number) obj).longValue();
+ } else if (obj instanceof HiveDecimal) {
+ return ((HiveDecimal) obj).longValue();
+ } else if (obj instanceof String) {
+ return Long.valueOf(obj.toString());
+ }
+ break;
+ case STRING:
+ if (obj != null) {
+ return (obj.toString());
+ }
+ break;
+ case TIMESTAMP:
+ if (obj instanceof Timestamp) {
+ return obj;
+ } else if (obj instanceof Integer) {
+ return new Timestamp(((Number) obj).longValue());
+ } else if (obj instanceof Float) {
+ return TimestampUtils.doubleToTimestamp(((Float) obj).doubleValue());
+ } else if (obj instanceof Double) {
+ return TimestampUtils.doubleToTimestamp(((Double) obj).doubleValue());
+ } else if (obj instanceof HiveDecimal) {
+ return TimestampUtils.decimalToTimestamp((HiveDecimal) obj);
+ } else if (obj instanceof HiveDecimalWritable) {
+ return TimestampUtils.decimalToTimestamp(((HiveDecimalWritable) obj).getHiveDecimal());
+ } else if (obj instanceof Date) {
+ return new Timestamp(((Date) obj).getTime());
+ }
+ // float/double conversion to timestamp is interpreted as seconds whereas integer conversion
+ // to timestamp is interpreted as milliseconds by default. The integer to timestamp casting
+ // is also config driven. The filter operator changes its promotion based on config:
+ // "int.timestamp.conversion.in.seconds". Disable PPD for integer cases.
+ break;
+ default:
+ break;
+ }
+
+ throw new IllegalArgumentException(String.format(
+ "ORC SARGS could not convert from %s to %s", obj == null ? "(null)" : obj.getClass()
+ .getSimpleName(), type));
+ }
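
The conversions above normalize SARG literals to a small set of comparison types before they are matched against column statistics or bloom filters. The snippet below (not part of the patch) illustrates a few of those normalizations using plain JDK calls only; the real method additionally covers Hive types such as HiveDecimal, HiveDecimalWritable and Timestamp.

    // Hypothetical illustration of getBaseObjectForComparison()-style normalization.
    public class SargLiteralNormalization {
      public static void main(String[] args) {
        Object asLong   = Long.valueOf("42");                          // LONG  + String  -> Long
        Object asDouble = ((Number) Integer.valueOf(7)).doubleValue(); // FLOAT + Integer -> Double (widening)
        Object asDate   = java.sql.Date.valueOf("2016-05-20");         // DATE  + String  -> java.sql.Date
        System.out.println(asLong + " " + asDouble + " " + asDate);
      }
    }
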
+
+ public static class SargApplier {
+ public final static boolean[] READ_ALL_RGS = null;
+ public final static boolean[] READ_NO_RGS = new boolean[0];
+
+ private final SearchArgument sarg;
+ private final List<PredicateLeaf> sargLeaves;
+ private final int[] filterColumns;
+ private final long rowIndexStride;
+ // a boolean mask over the ORC column ids referenced by the filterColumns array above
+ private final boolean[] sargColumns;
+
+ public SargApplier(SearchArgument sarg, String[] columnNames, long rowIndexStride,
+ List<OrcProto.Type> types, int includedCount) {
+ this.sarg = sarg;
+ sargLeaves = sarg.getLeaves();
+ filterColumns = mapSargColumnsToOrcInternalColIdx(sargLeaves, columnNames, 0);
+ this.rowIndexStride = rowIndexStride;
+ // included will not be null; the reader options fill the array with true values when it is null
+ sargColumns = new boolean[includedCount];
+ for (int i : filterColumns) {
+ // a filter column may have an index of -1, which happens when the SARG references a partition column
+ if (i > 0) {
+ sargColumns[i] = true;
+ }
+ }
+ }
+
+ /**
+ * Pick the row groups that we need to load from the current stripe.
+ *
+ * @return an array with a boolean for each row group or null if all of the
+ * row groups must be read.
+ * @throws IOException
+ */
+ public boolean[] pickRowGroups(StripeInformation stripe, OrcProto.RowIndex[] indexes,
+ OrcProto.BloomFilterIndex[] bloomFilterIndices, boolean returnNone) throws IOException {
+ long rowsInStripe = stripe.getNumberOfRows();
+ int groupsInStripe = (int) ((rowsInStripe + rowIndexStride - 1) / rowIndexStride);
+ boolean[] result = new boolean[groupsInStripe]; // TODO: avoid alloc?
+ TruthValue[] leafValues = new TruthValue[sargLeaves.size()];
+ boolean hasSelected = false, hasSkipped = false;
+ for (int rowGroup = 0; rowGroup < result.length; ++rowGroup) {
+ for (int pred = 0; pred < leafValues.length; ++pred) {
+ int columnIx = filterColumns[pred];
+ if (columnIx != -1) {
+ if (indexes[columnIx] == null) {
+ throw new AssertionError("Index is not populated for " + columnIx);
+ }
+ OrcProto.RowIndexEntry entry = indexes[columnIx].getEntry(rowGroup);
+ if (entry == null) {
+ throw new AssertionError("RG is not populated for " + columnIx + " rg " + rowGroup);
+ }
+ OrcProto.ColumnStatistics stats = entry.getStatistics();
+ OrcProto.BloomFilter bf = null;
+ if (bloomFilterIndices != null && bloomFilterIndices[filterColumns[pred]] != null) {
+ bf = bloomFilterIndices[filterColumns[pred]].getBloomFilter(rowGroup);
+ }
+ leafValues[pred] = evaluatePredicateProto(stats, sargLeaves.get(pred), bf);
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Stats = " + stats);
+ LOG.trace("Setting " + sargLeaves.get(pred) + " to " + leafValues[pred]);
+ }
+ } else {
+ // the column is a virtual column
+ leafValues[pred] = TruthValue.YES_NO_NULL;
+ }
+ }
+ result[rowGroup] = sarg.evaluate(leafValues).isNeeded();
+ hasSelected = hasSelected || result[rowGroup];
+ hasSkipped = hasSkipped || (!result[rowGroup]);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Row group " + (rowIndexStride * rowGroup) + " to " +
+ (rowIndexStride * (rowGroup + 1) - 1) + " is " +
+ (result[rowGroup] ? "" : "not ") + "included.");
+ }
+ }
+
+ return hasSkipped ? ((hasSelected || !returnNone) ? result : READ_NO_RGS) : READ_ALL_RGS;
+ }
+ }
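
A caller of SargApplier.pickRowGroups() has to distinguish the two sentinel values declared at the top of the class from an ordinary per-row-group mask. The fragment below is a sketch of that interpretation, not part of the patch; it assumes an applier plus the stripe, indexes and bloomFilterIndices arguments are already in scope, and it passes true for returnNone so that READ_NO_RGS can actually be returned.

    boolean[] rgs = applier.pickRowGroups(stripe, indexes, bloomFilterIndices, true);
    if (rgs == SargApplier.READ_ALL_RGS) {            // null: every row group is needed
      // read the whole stripe
    } else if (rgs == SargApplier.READ_NO_RGS) {      // zero-length: nothing survives the SARG
      // skip the stripe entirely
    } else {
      for (int rg = 0; rg < rgs.length; ++rg) {
        // rgs[rg] == true means rows [rg * rowIndexStride, (rg + 1) * rowIndexStride)
        // of the stripe are candidates and must be read
      }
    }
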
+
+ /**
+ * Pick the row groups that we need to load from the current stripe.
+ *
+ * @return an array with a boolean for each row group or null if all of the
+ * row groups must be read.
+ * @throws IOException
+ */
+ protected boolean[] pickRowGroups() throws IOException {
+ // if we don't have a sarg or indexes, we read everything
+ if (sargApp == null) {
+ return null;
+ }
+ readRowIndex(currentStripe, included, sargApp.sargColumns);
+ return sargApp.pickRowGroups(stripes.get(currentStripe), indexes, bloomFilterIndices, false);
+ }
+
+ private void clearStreams() {
+ // explicit close of all streams to de-ref ByteBuffers
+ for (InStream is : streams.values()) {
+ is.close();
+ }
+ if (bufferChunks != null) {
+ if (dataReader.isTrackingDiskRanges()) {
+ for (DiskRangeList range = bufferChunks; range != null; range = range.next) {
+ if (!(range instanceof BufferChunk)) {
+ continue;
+ }
+ dataReader.releaseBuffer(((BufferChunk) range).getChunk());
+ }
+ }
+ }
+ bufferChunks = null;
+ streams.clear();
+ }
+
+ /**
+ * Read the current stripe into memory.
+ *
+ * @throws IOException
+ */
+ private void readStripe() throws IOException {
+ StripeInformation stripe = beginReadStripe();
+ includedRowGroups = pickRowGroups();
+
+ // move forward to the first unskipped row
+ if (includedRowGroups != null) {
+ while (rowInStripe < rowCountInStripe &&
+ !includedRowGroups[(int) (rowInStripe / rowIndexStride)]) {
+ rowInStripe = Math.min(rowCountInStripe, rowInStripe + rowIndexStride);
+ }
+ }
+
+ // if we haven't skipped the whole stripe, read the data
+ if (rowInStripe < rowCountInStripe) {
+ // if we aren't projecting columns or filtering rows, just read it all
+ if (included == null && includedRowGroups == null) {
+ readAllDataStreams(stripe);
+ } else {
+ readPartialDataStreams(stripe);
+ }
+ reader.startStripe(streams, stripeFooter);
+ // if we skipped the first row group, move the pointers forward
+ if (rowInStripe != 0) {
+ seekToRowEntry(reader, (int) (rowInStripe / rowIndexStride));
+ }
+ }
+ }
+
+ private StripeInformation beginReadStripe() throws IOException {
+ StripeInformation stripe = stripes.get(currentStripe);
+ stripeFooter = readStripeFooter(stripe);
+ clearStreams();
+ // setup the position in the stripe
+ rowCountInStripe = stripe.getNumberOfRows();
+ rowInStripe = 0;
+ rowBaseInStripe = 0;
+ for (int i = 0; i < currentStripe; ++i) {
+ rowBaseInStripe += stripes.get(i).getNumberOfRows();
+ }
+ // reset all of the indexes
+ for (int i = 0; i < indexes.length; ++i) {
+ indexes[i] = null;
+ }
+ return stripe;
+ }
+
+ private void readAllDataStreams(StripeInformation stripe) throws IOException {
+ long start = stripe.getIndexLength();
+ long end = start + stripe.getDataLength();
+ // explicitly trigger 1 big read
+ DiskRangeList toRead = new DiskRangeList(start, end);
+ bufferChunks = dataReader.readFileData(toRead, stripe.getOffset(), false);
+ List<OrcProto.Stream> streamDescriptions = stripeFooter.getStreamsList();
+ createStreams(streamDescriptions, bufferChunks, null, codec, bufferSize, streams);
+ }
+
+ /**
+ * Plan the ranges of the file that we need to read given the list of
+ * columns and row groups.
+ *
+ * @param streamList the list of streams available
+ * @param indexes the indexes that have been loaded
+ * @param includedColumns which columns are needed
+ * @param includedRowGroups which row groups are needed
+ * @param isCompressed does the file have generic compression
+ * @param encodings the encodings for each column
+ * @param types the types of the columns
+ * @param compressionSize the compression block size
+ * @param doMergeBuffers whether adjacent disk ranges should be merged into larger reads
+ * @return the list of disk ranges that will be loaded
+ */
+ static DiskRangeList planReadPartialDataStreams
+ (List<OrcProto.Stream> streamList,
+ OrcProto.RowIndex[] indexes,
+ boolean[] includedColumns,
+ boolean[] includedRowGroups,
+ boolean isCompressed,
+ List<OrcProto.ColumnEncoding> encodings,
+ List<OrcProto.Type> types,
+ int compressionSize,
+ boolean doMergeBuffers) {
+ long offset = 0;
+ // figure out which columns have a present stream
+ boolean[] hasNull = RecordReaderUtils.findPresentStreamsByColumn(streamList, types);
+ CreateHelper list = new CreateHelper();
+ for (OrcProto.Stream stream : streamList) {
+ long length = stream.getLength();
+ int column = stream.getColumn();
+ OrcProto.Stream.Kind streamKind = stream.getKind();
+ // since stream kind is optional, first check if it exists
+ if (stream.hasKind() &&
+ (StreamName.getArea(streamKind) == StreamName.Area.DATA) &&
+ (column < includedColumns.length && includedColumns[column])) {
+ // if we aren't filtering or it is a dictionary, load it.
+ if (includedRowGroups == null
+ || RecordReaderUtils.isDictionary(streamKind, encodings.get(column))) {
+ RecordReaderUtils.addEntireStreamToRanges(offset, length, list, doMergeBuffers);
+ } else {
+ RecordReaderUtils.addRgFilteredStreamToRanges(stream, includedRowGroups,
+ isCompressed, indexes[column], encodings.get(column), types.get(column),
+ compressionSize, hasNull[column], offset, length, list, doMergeBuffers);
+ }
+ }
+ offset += length;
+ }
+ return list.extract();
+ }
+
+ void createStreams(List<OrcProto.Stream> streamDescriptions,
+ DiskRangeList ranges,
+ boolean[] includeColumn,
+ CompressionCodec codec,
+ int bufferSize,
+ Map<StreamName, InStream> streams) throws IOException {
+ long streamOffset = 0;
+ for (OrcProto.Stream streamDesc : streamDescriptions) {
+ int column = streamDesc.getColumn();
+ if ((includeColumn != null &&
+ (column < included.length && !includeColumn[column])) ||
+ streamDesc.hasKind() &&
+ (StreamName.getArea(streamDesc.getKind()) != StreamName.Area.DATA)) {
+ streamOffset += streamDesc.getLength();
+ continue;
+ }
+ List<DiskRange> buffers = RecordReaderUtils.getStreamBuffers(
+ ranges, streamOffset, streamDesc.getLength());
+ StreamName name = new StreamName(column, streamDesc.getKind());
+ streams.put(name, InStream.create(name.toString(), buffers,
+ streamDesc.getLength(), codec, bufferSize));
+ streamOffset += streamDesc.getLength();
+ }
+ }
+
+ private void readPartialDataStreams(StripeInformation stripe) throws IOException {
+ List<OrcProto.Stream> streamList = stripeFooter.getStreamsList();
+ DiskRangeList toRead = planReadPartialDataStreams(streamList,
+ indexes, included, includedRowGroups, codec != null,
+ stripeFooter.getColumnsList(), types, bufferSize, true);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("chunks = " + RecordReaderUtils.stringifyDiskRanges(toRead));
+ }
+ bufferChunks = dataReader.readFileData(toRead, stripe.getOffset(), false);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("merge = " + RecordReaderUtils.stringifyDiskRanges(bufferChunks));
+ }
+
+ createStreams(streamList, bufferChunks, included, codec, bufferSize, streams);
+ }
+
+ /**
+ * Read the next stripe until we find a row that we don't skip.
+ *
+ * @throws IOException
+ */
+ private void advanceStripe() throws IOException {
+ rowInStripe = rowCountInStripe;
+ while (rowInStripe >= rowCountInStripe &&
+ currentStripe < stripes.size() - 1) {
+ currentStripe += 1;
+ readStripe();
+ }
+ }
+
+ /**
+ * Skip over rows that we aren't selecting, so that the next row is
+ * one that we will read.
+ *
+ * @param reader the tree reader to reposition
+ * @param nextRow the row we want to go to
+ * @param canAdvanceStripe whether moving on to the next stripe is allowed
+ * @return false if the requested row is past the selected rows of this stripe
+ * and advancing to the next stripe was not allowed; true otherwise
+ * @throws IOException
+ */
+ private boolean advanceToNextRow(
+ TreeReaderFactory.TreeReader reader, long nextRow, boolean canAdvanceStripe)
+ throws IOException {
+ long nextRowInStripe = nextRow - rowBaseInStripe;
+ // check for row skipping
+ if (rowIndexStride != 0 &&
+ includedRowGroups != null &&
+ nextRowInStripe < rowCountInStripe) {
+ int rowGroup = (int) (nextRowInStripe / rowIndexStride);
+ if (!includedRowGroups[rowGroup]) {
+ while (rowGroup < includedRowGroups.length && !includedRowGroups[rowGroup]) {
+ rowGroup += 1;
+ }
+ if (rowGroup >= includedRowGroups.length) {
+ if (canAdvanceStripe) {
+ advanceStripe();
+ }
+ return canAdvanceStripe;
+ }
+ nextRowInStripe = Math.min(rowCountInStripe, rowGroup * rowIndexStride);
+ }
+ }
+ if (nextRowInStripe >= rowCountInStripe) {
+ if (canAdvanceStripe) {
+ advanceStripe();
+ }
+ return canAdvanceStripe;
+ }
+ if (nextRowInStripe != rowInStripe) {
+ if (rowIndexStride != 0) {
+ int rowGroup = (int) (nextRowInStripe / rowIndexStride);
+ seekToRowEntry(reader, rowGroup);
+ reader.skipRows(nextRowInStripe - rowGroup * rowIndexStride);
+ } else {
+ reader.skipRows(nextRowInStripe - rowInStripe);
+ }
+ rowInStripe = nextRowInStripe;
+ }
+ return true;
+ }
+
+ @Override
+ public boolean nextBatch(VectorizedRowBatch batch) throws IOException {
+ try {
+ if (rowInStripe >= rowCountInStripe) {
+ currentStripe += 1;
+ if (currentStripe >= stripes.size()) {
+ batch.size = 0;
+ return false;
+ }
+ readStripe();
+ }
+
+ int batchSize = computeBatchSize(batch.getMaxSize());
+
+ rowInStripe += batchSize;
+ reader.setVectorColumnCount(batch.getDataColumnCount());
+ reader.nextBatch(batch, batchSize);
+ batch.selectedInUse = false;
+ batch.size = batchSize;
+ advanceToNextRow(reader, rowInStripe + rowBaseInStripe, true);
+ return batch.size != 0;
+ } catch (IOException e) {
+ // Rethrow exception with file name in log message
+ throw new IOException("Error reading file: " + path, e);
+ }
+ }
+
+ private int computeBatchSize(long targetBatchSize) {
+ final int batchSize;
+ // With PPD, the batch size must respect row group boundaries. If only a subset of the row
+ // groups is selected, the marker position is set to the end of the contiguous range of
+ // selected row groups within the stripe. Computing the batch size from the marker position
+ // keeps batches aligned to row group boundaries and prevents reading past the selected rows.
+ // An illustration of this case is in https://issues.apache.org/jira/browse/HIVE-6287
+ if (rowIndexStride != 0 && includedRowGroups != null && rowInStripe < rowCountInStripe) {
+ int startRowGroup = (int) (rowInStripe / rowIndexStride);
+ if (!includedRowGroups[startRowGroup]) {
+ while (startRowGroup < includedRowGroups.length && !includedRowGroups[startRowGroup]) {
+ startRowGroup += 1;
+ }
+ }
+
+ int endRowGroup = startRowGroup;
+ while (endRowGroup < includedRowGroups.length && includedRowGroups[endRowGroup]) {
+ endRowGroup += 1;
+ }
+
+ final long markerPosition =
+ (endRowGroup * rowIndexStride) < rowCountInStripe ? (endRowGroup * rowIndexStride)
+ : rowCountInStripe;
+ batchSize = (int) Math.min(targetBatchSize, (markerPosition - rowInStripe));
+
+ if (isLogDebugEnabled && batchSize < targetBatchSize) {
+ LOG.debug("markerPosition: " + markerPosition + " batchSize: " + batchSize);
+ }
+ } else {
+ batchSize = (int) Math.min(targetBatchSize, (rowCountInStripe - rowInStripe));
+ }
+ return batchSize;
+ }
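
To make the marker-position arithmetic above concrete, here is a worked example with hypothetical values (not part of the patch): a stripe of 50,000 rows, a row index stride of 10,000, row groups {skip, read, read, skip, read}, and a reader positioned at row 29,000 of the stripe.

    public class BatchSizeExample {
      public static void main(String[] args) {
        long rowIndexStride = 10000;
        long rowCountInStripe = 50000;
        boolean[] includedRowGroups = {false, true, true, false, true};
        long rowInStripe = 29000;      // currently inside row group 2
        long targetBatchSize = 1024;

        int startRowGroup = (int) (rowInStripe / rowIndexStride);   // 2, already included
        int endRowGroup = startRowGroup;
        while (endRowGroup < includedRowGroups.length && includedRowGroups[endRowGroup]) {
          endRowGroup += 1;                                         // stops at 3, which is skipped
        }
        long markerPosition = Math.min(endRowGroup * rowIndexStride, rowCountInStripe); // 30000
        long batchSize = Math.min(targetBatchSize, markerPosition - rowInStripe);       // 1000

        // the batch is trimmed to 1000 rows so it stops exactly at the skipped row group 3
        System.out.println("batchSize = " + batchSize);
      }
    }
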
+
+ @Override
+ public void close() throws IOException {
+ clearStreams();
+ dataReader.close();
+ }
+
+ @Override
+ public long getRowNumber() {
+ return rowInStripe + rowBaseInStripe + firstRow;
+ }
+
+ /**
+ * Return the fraction of rows that have been read from the selected
+ * section of the file.
+ *
+ * @return fraction between 0.0 and 1.0 of rows consumed
+ */
+ @Override
+ public float getProgress() {
+ return ((float) rowBaseInStripe + rowInStripe) / totalRowCount;
+ }
+
+ private int findStripe(long rowNumber) {
+ for (int i = 0; i < stripes.size(); i++) {
+ StripeInformation stripe = stripes.get(i);
+ if (stripe.getNumberOfRows() > rowNumber) {
+ return i;
+ }
+ rowNumber -= stripe.getNumberOfRows();
+ }
+ throw new IllegalArgumentException("Seek after the end of reader range");
+ }
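
A quick worked example of the scan above (hypothetical numbers, not part of the patch): with stripes of 1,000, 2,000 and 500 rows, row 2,500 of the reader's range lands in stripe 1, at offset 1,500 within that stripe.

    long[] stripeRows = {1000, 2000, 500};
    long rowNumber = 2500;                 // relative to the start of the reader's range
    int stripe = -1;
    for (int i = 0; i < stripeRows.length; i++) {
      if (stripeRows[i] > rowNumber) {
        stripe = i;                        // found: rowNumber is now the offset within stripe i
        break;
      }
      rowNumber -= stripeRows[i];
    }
    // stripe == 1, rowNumber == 1500
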
+
+ public OrcIndex readRowIndex(int stripeIndex, boolean[] included,
+ boolean[] sargColumns) throws IOException {
+ return readRowIndex(stripeIndex, included, null, null, sargColumns);
+ }
+
+ public OrcIndex readRowIndex(int stripeIndex, boolean[] included,
+ OrcProto.RowIndex[] indexes,
+ OrcProto.BloomFilterIndex[] bloomFilterIndex,
+ boolean[] sargColumns) throws IOException {
+ StripeInformation stripe = stripes.get(stripeIndex);
+ OrcProto.StripeFooter stripeFooter = null;
+ // if this is the current stripe, use the cached objects.
+ if (stripeIndex == currentStripe) {
+ stripeFooter = this.stripeFooter;
+ indexes = indexes == null ? this.indexes : indexes;
+ bloomFilterIndex = bloomFilterIndex == null ? this.bloomFilterIndices : bloomFilterIndex;
+ sargColumns = sargColumns == null ?
+ (sargApp == null ? null : sargApp.sargColumns) : sargColumns;
+ }
+ return dataReader.readRowIndex(stripe, stripeFooter, included, indexes, sargColumns,
+ bloomFilterIndex);
+ }
+
+ private void seekToRowEntry(TreeReaderFactory.TreeReader reader, int rowEntry)
+ throws IOException {
+ PositionProvider[] index = new PositionProvider[indexes.length];
+ for (int i = 0; i < indexes.length; ++i) {
+ if (indexes[i] != null) {
+ index[i] = new PositionProviderImpl(indexes[i].getEntry(rowEntry));
+ }
+ }
+ reader.seek(index);
+ }
+
+ @Override
+ public void seekToRow(long rowNumber) throws IOException {
+ if (rowNumber < 0) {
+ throw new IllegalArgumentException("Seek to a negative row number " +
+ rowNumber);
+ } else if (rowNumber < firstRow) {
+ throw new IllegalArgumentException("Seek before reader range " +
+ rowNumber);
+ }
+ // convert to our internal form (rows from the beginning of slice)
+ rowNumber -= firstRow;
+
+ // move to the right stripe
+ int rightStripe = findStripe(rowNumber);
+ if (rightStripe != currentStripe) {
+ currentStripe = rightStripe;
+ readStripe();
+ }
+ readRowIndex(currentStripe, included, sargApp == null ? null : sargApp.sargColumns);
+
+ // if we aren't to the right row yet, advance in the stripe.
+ advanceToNextRow(reader, rowNumber, true);
+ }
+
+ private static final String TRANSLATED_SARG_SEPARATOR = "_";
+ public static String encodeTranslatedSargColumn(int rootColumn, Integer indexInSourceTable) {
+ return rootColumn + TRANSLATED_SARG_SEPARATOR
+ + ((indexInSourceTable == null) ? -1 : indexInSourceTable);
+ }
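
For reference, the encoding produced above is simply "<rootColumn>_<indexInSourceTable>", with -1 standing in when the column maps to nothing; mapTranslatedSargColumns() below splits it on the same separator. A small usage sketch (not part of the patch):

    String withIndex = encodeTranslatedSargColumn(3, 1);    // "3_1": root type 3, table column 1
    String noMapping = encodeTranslatedSargColumn(3, null); // "3_-1": the column maps to nothing
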
+
+ public static int[] mapTranslatedSargColumns(
+ List<OrcProto.Type> types, List<PredicateLeaf> sargLeaves) {
+ int[] result = new int[sargLeaves.size()];
+ OrcProto.Type lastRoot = null; // Root will be the same for everyone as of now.
+ String lastRootStr = null;
+ for (int i = 0; i < result.length; ++i) {
+ String[] rootAndIndex = sargLeaves.get(i).getColumnName().split(TRANSLATED_SARG_SEPARATOR);
+ assert rootAndIndex.length == 2;
+ String rootStr = rootAndIndex[0], indexStr = rootAndIndex[1];
+ int index = Integer.parseInt(indexStr);
+ // First, check if the column even maps to anything.
+ if (index == -1) {
+ result[i] = -1;
+ continue;
+ }
+ assert index >= 0;
+ // Then, find the root type if needed.
+ if (!rootStr.equals(lastRootStr)) {
+ lastRoot = types.get(Integer.parseInt(rootStr));
+ lastRootStr = rootStr;
+ }
+ // Subtypes of the root type correspond, in order, to the columns in the table schema
+ // (disregarding schema evolution, which does not presently work here). Get the index of the
+ // corresponding subtype.
+ result[i] = lastRoot.getSubtypes(index);
+ }
+ return result;
+ }
+}