You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by se...@apache.org on 2015/08/29 03:24:59 UTC
hive git commit: HIVE-11595 : refactor ORC footer reading to make it
usable from outside (Sergey Shelukhin, reviewed by Prasanth Jayachandran)
Repository: hive
Updated Branches:
refs/heads/master 97bf32a12 -> 22fa9216d
HIVE-11595 : refactor ORC footer reading to make it usable from outside (Sergey Shelukhin, reviewed by Prasanth Jayachandran)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/22fa9216
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/22fa9216
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/22fa9216
Branch: refs/heads/master
Commit: 22fa9216d4e32d7681d3c1be8cbedc8c7999e56d
Parents: 97bf32a
Author: Sergey Shelukhin <se...@apache.org>
Authored: Fri Aug 28 18:23:05 2015 -0700
Committer: Sergey Shelukhin <se...@apache.org>
Committed: Fri Aug 28 18:23:05 2015 -0700
----------------------------------------------------------------------
.../apache/hadoop/hive/ql/io/orc/Reader.java | 6 +
.../hadoop/hive/ql/io/orc/ReaderImpl.java | 281 +++++++++++++------
2 files changed, 204 insertions(+), 83 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/22fa9216/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Reader.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Reader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Reader.java
index 7bddefc..187924d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Reader.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Reader.java
@@ -22,6 +22,7 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;
+import org.apache.hadoop.hive.ql.io.orc.OrcProto.Footer;
import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
@@ -358,4 +359,9 @@ public interface Reader {
String[] neededColumns) throws IOException;
MetadataReader metadata() throws IOException;
+
+ /** Gets serialized file metadata read from disk for the purposes of caching, etc. */
+ ByteBuffer getSerializedFileFooter();
+
+ Footer getFooter();
}
http://git-wip-us.apache.org/repos/asf/hive/blob/22fa9216/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java
index c990d85..ab539c4 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java
@@ -35,6 +35,8 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.DiskRange;
import org.apache.hadoop.hive.ql.io.FileFormatException;
+import org.apache.hadoop.hive.ql.io.orc.OrcFile.WriterVersion;
+import org.apache.hadoop.hive.ql.io.orc.OrcProto.Footer;
import org.apache.hadoop.hive.ql.io.orc.OrcProto.Type;
import org.apache.hadoop.hive.ql.io.orc.OrcProto.UserMetadataItem;
import org.apache.hadoop.hive.ql.io.orc.RecordReaderImpl.BufferChunk;
@@ -74,6 +76,9 @@ public class ReaderImpl implements Reader {
// will help avoid cpu cycles spent in deserializing at cost of increased
// memory footprint.
private final ByteBuffer footerByteBuffer;
+ // Same for metastore cache - maintains the same background buffer, but includes postscript.
+ // This will only be set if the file footer/metadata was read from disk.
+ private final ByteBuffer footerMetaAndPsBuffer;
static class StripeInformationImpl
implements StripeInformation {
@@ -166,11 +171,7 @@ public class ReaderImpl implements Reader {
@Override
public List<StripeInformation> getStripes() {
- List<StripeInformation> result = new ArrayList<StripeInformation>();
- for(OrcProto.StripeInformation info: footer.getStripesList()) {
- result.add(new StripeInformationImpl(info));
- }
- return result;
+ return convertProtoStripesToStripes(footer.getStripesList());
}
@Override
@@ -274,7 +275,7 @@ public class ReaderImpl implements Reader {
* Check to see if this ORC file is from a future version and if so,
* warn the user that we may not be able to read all of the column encodings.
* @param log the logger to write any error message to
- * @param path the filename for error messages
+ * @param path the data source path for error messages
* @param version the version of hive that wrote the file.
*/
static void checkOrcVersion(Log log, Path path, List<Integer> version) {
@@ -287,8 +288,7 @@ public class ReaderImpl implements Reader {
if (major > OrcFile.Version.CURRENT.getMajor() ||
(major == OrcFile.Version.CURRENT.getMajor() &&
minor > OrcFile.Version.CURRENT.getMinor())) {
- log.warn("ORC file " + path +
- " was written by a future Hive version " +
+ log.warn(path + " was written by a future Hive version " +
versionString(version) +
". This file may not be readable by this version of Hive.");
}
@@ -313,9 +313,11 @@ public class ReaderImpl implements Reader {
FileMetaInfo footerMetaData;
if (options.getFileMetaInfo() != null) {
footerMetaData = options.getFileMetaInfo();
+ this.footerMetaAndPsBuffer = null;
} else {
footerMetaData = extractMetaInfoFromFooter(fs, path,
options.getMaxLength());
+ this.footerMetaAndPsBuffer = footerMetaData.footerMetaAndPsBuffer;
}
MetaInfoObjExtractor rInfo =
new MetaInfoObjExtractor(footerMetaData.compressionType,
@@ -349,6 +351,111 @@ public class ReaderImpl implements Reader {
return OrcFile.WriterVersion.ORIGINAL;
}
+ /** Extracts the necessary metadata from an externally stored buffer (fullFooterBuffer). */
+ public static FooterInfo extractMetaInfoFromFooter(
+ ByteBuffer bb, Path srcPath) throws IOException {
+ // Read the PostScript. Be very careful as some parts of this historically use bb position
+ // and some use absolute offsets that have to take position into account.
+ int baseOffset = bb.position();
+ int lastByteAbsPos = baseOffset + bb.remaining() - 1;
+ int psLen = bb.get(lastByteAbsPos) & 0xff;
+ int psAbsPos = lastByteAbsPos - psLen;
+ OrcProto.PostScript ps = extractPostScript(bb, srcPath, psLen, psAbsPos);
+ assert baseOffset == bb.position();
+
+ // Extract PS information.
+ int footerSize = (int)ps.getFooterLength(), metadataSize = (int)ps.getMetadataLength(),
+ footerAbsPos = psAbsPos - footerSize, metadataAbsPos = footerAbsPos - metadataSize;
+ String compressionType = ps.getCompression().toString();
+ CompressionCodec codec = WriterImpl.createCodec(CompressionKind.valueOf(compressionType));
+ int bufferSize = (int)ps.getCompressionBlockSize();
+ bb.position(metadataAbsPos);
+ bb.mark();
+
+ // Extract metadata and footer.
+ Metadata metadata = new Metadata(extractMetadata(
+ bb, metadataAbsPos, metadataSize, codec, bufferSize));
+ OrcProto.Footer footer = extractFooter(bb, footerAbsPos, footerSize, codec, bufferSize);
+ bb.position(metadataAbsPos);
+ bb.limit(psAbsPos);
+ // TODO: do we need footer buffer here? FileInfo/FileMetaInfo is a mess...
+ FileMetaInfo fmi = new FileMetaInfo(
+ compressionType, bufferSize, metadataSize, bb, extractWriterVersion(ps));
+ return new FooterInfo(metadata, footer, fmi);
+ }
+
+ private static OrcProto.Footer extractFooter(ByteBuffer bb, int footerAbsPos,
+ int footerSize, CompressionCodec codec, int bufferSize) throws IOException {
+ bb.position(footerAbsPos);
+ bb.limit(footerAbsPos + footerSize);
+ InputStream instream = InStream.create("footer", Lists.<DiskRange>newArrayList(
+ new BufferChunk(bb, 0)), footerSize, codec, bufferSize);
+ return OrcProto.Footer.parseFrom(instream);
+ }
+
+ private static OrcProto.Metadata extractMetadata(ByteBuffer bb, int metadataAbsPos,
+ int metadataSize, CompressionCodec codec, int bufferSize) throws IOException {
+ bb.position(metadataAbsPos);
+ bb.limit(metadataAbsPos + metadataSize);
+ InputStream instream = InStream.create("metadata", Lists.<DiskRange>newArrayList(
+ new BufferChunk(bb, 0)), metadataSize, codec, bufferSize);
+ CodedInputStream in = CodedInputStream.newInstance(instream);
+ int msgLimit = DEFAULT_PROTOBUF_MESSAGE_LIMIT;
+ OrcProto.Metadata meta = null;
+ do {
+ try {
+ in.setSizeLimit(msgLimit);
+ meta = OrcProto.Metadata.parseFrom(in);
+ } catch (InvalidProtocolBufferException e) {
+ if (e.getMessage().contains("Protocol message was too large")) {
+ LOG.warn("Metadata section is larger than " + msgLimit + " bytes. Increasing the max" +
+ " size of the coded input stream." );
+
+ msgLimit = msgLimit << 1;
+ if (msgLimit > PROTOBUF_MESSAGE_MAX_LIMIT) {
+ LOG.error("Metadata section exceeds max protobuf message size of " +
+ PROTOBUF_MESSAGE_MAX_LIMIT + " bytes.");
+ throw e;
+ }
+
+ // we must have failed in the middle of reading instream and instream doesn't support
+ // resetting the stream
+ instream = InStream.create("metadata", Lists.<DiskRange>newArrayList(
+ new BufferChunk(bb, 0)), metadataSize, codec, bufferSize);
+ in = CodedInputStream.newInstance(instream);
+ } else {
+ throw e;
+ }
+ }
+ } while (meta == null);
+ return meta;
+ }
+
+ private static OrcProto.PostScript extractPostScript(ByteBuffer bb, Path path,
+ int psLen, int psAbsOffset) throws IOException {
+ // TODO: when PB is upgraded to 2.6, newInstance(ByteBuffer) method should be used here.
+ assert bb.hasArray();
+ CodedInputStream in = CodedInputStream.newInstance(
+ bb.array(), bb.arrayOffset() + psAbsOffset, psLen);
+ OrcProto.PostScript ps = OrcProto.PostScript.parseFrom(in);
+ checkOrcVersion(LOG, path, ps.getVersionList());
+
+ // Check compression codec.
+ switch (ps.getCompression()) {
+ case NONE:
+ break;
+ case ZLIB:
+ break;
+ case SNAPPY:
+ break;
+ case LZO:
+ break;
+ default:
+ throw new IllegalArgumentException("Unknown compression");
+ }
+ return ps;
+ }
+
private static FileMetaInfo extractMetaInfoFromFooter(FileSystem fs,
Path path,
long maxFileLength
@@ -367,44 +474,24 @@ public class ReaderImpl implements Reader {
int readSize = (int) Math.min(size, DIRECTORY_SIZE_GUESS);
file.seek(size - readSize);
ByteBuffer buffer = ByteBuffer.allocate(readSize);
- file.readFully(buffer.array(), buffer.arrayOffset() + buffer.position(),
- buffer.remaining());
+ assert buffer.position() == 0;
+ file.readFully(buffer.array(), buffer.arrayOffset(), readSize);
+ buffer.position(0);
//read the PostScript
//get length of PostScript
int psLen = buffer.get(readSize - 1) & 0xff;
ensureOrcFooter(file, path, psLen, buffer);
int psOffset = readSize - 1 - psLen;
- CodedInputStream in = CodedInputStream.newInstance(buffer.array(),
- buffer.arrayOffset() + psOffset, psLen);
- OrcProto.PostScript ps = OrcProto.PostScript.parseFrom(in);
-
- checkOrcVersion(LOG, path, ps.getVersionList());
+ OrcProto.PostScript ps = extractPostScript(buffer, path, psLen, psOffset);
int footerSize = (int) ps.getFooterLength();
int metadataSize = (int) ps.getMetadataLength();
- OrcFile.WriterVersion writerVersion;
- if (ps.hasWriterVersion()) {
- writerVersion = getWriterVersion(ps.getWriterVersion());
- } else {
- writerVersion = OrcFile.WriterVersion.ORIGINAL;
- }
+ OrcFile.WriterVersion writerVersion = extractWriterVersion(ps);
- //check compression codec
- switch (ps.getCompression()) {
- case NONE:
- break;
- case ZLIB:
- break;
- case SNAPPY:
- break;
- case LZO:
- break;
- default:
- throw new IllegalArgumentException("Unknown compression");
- }
//check if extra bytes need to be read
+ ByteBuffer fullFooterBuffer = null;
int extra = Math.max(0, psLen + 1 + footerSize + metadataSize - readSize);
if (extra > 0) {
//more bytes need to be read, seek back to the right place and read extra bytes
@@ -417,10 +504,12 @@ public class ReaderImpl implements Reader {
extraBuf.put(buffer);
buffer = extraBuf;
buffer.position(0);
+ fullFooterBuffer = buffer.slice();
buffer.limit(footerSize + metadataSize);
} else {
//footer is already in the bytes in buffer, just adjust position, length
buffer.position(psOffset - footerSize - metadataSize);
+ fullFooterBuffer = buffer.slice();
buffer.limit(psOffset);
}
@@ -435,11 +524,24 @@ public class ReaderImpl implements Reader {
(int) ps.getMetadataLength(),
buffer,
ps.getVersionList(),
- writerVersion
+ writerVersion,
+ fullFooterBuffer
);
}
+ private static OrcFile.WriterVersion extractWriterVersion(OrcProto.PostScript ps) {
+ return (ps.hasWriterVersion()
+ ? getWriterVersion(ps.getWriterVersion()) : OrcFile.WriterVersion.ORIGINAL);
+ }
+ private static List<StripeInformation> convertProtoStripesToStripes(
+ List<OrcProto.StripeInformation> stripes) {
+ List<StripeInformation> result = new ArrayList<StripeInformation>(stripes.size());
+ for (OrcProto.StripeInformation info : stripes) {
+ result.add(new StripeInformationImpl(info));
+ }
+ return result;
+ }
/**
* MetaInfoObjExtractor - has logic to create the values for the fields in ReaderImpl
@@ -467,46 +569,10 @@ public class ReaderImpl implements Reader {
int position = footerBuffer.position();
int footerBufferSize = footerBuffer.limit() - footerBuffer.position() - metadataSize;
- footerBuffer.limit(position + metadataSize);
-
- InputStream instream = InStream.create("metadata", Lists.<DiskRange>newArrayList(
- new BufferChunk(footerBuffer, 0)), metadataSize, codec, bufferSize);
- CodedInputStream in = CodedInputStream.newInstance(instream);
- int msgLimit = DEFAULT_PROTOBUF_MESSAGE_LIMIT;
- OrcProto.Metadata meta = null;
- do {
- try {
- in.setSizeLimit(msgLimit);
- meta = OrcProto.Metadata.parseFrom(in);
- } catch (InvalidProtocolBufferException e) {
- if (e.getMessage().contains("Protocol message was too large")) {
- LOG.warn("Metadata section is larger than " + msgLimit + " bytes. Increasing the max" +
- " size of the coded input stream." );
-
- msgLimit = msgLimit << 1;
- if (msgLimit > PROTOBUF_MESSAGE_MAX_LIMIT) {
- LOG.error("Metadata section exceeds max protobuf message size of " +
- PROTOBUF_MESSAGE_MAX_LIMIT + " bytes.");
- throw e;
- }
-
- // we must have failed in the middle of reading instream and instream doesn't support
- // resetting the stream
- instream = InStream.create("metadata", Lists.<DiskRange>newArrayList(
- new BufferChunk(footerBuffer, 0)), metadataSize, codec, bufferSize);
- in = CodedInputStream.newInstance(instream);
- } else {
- throw e;
- }
- }
- } while (meta == null);
- this.metadata = meta;
- footerBuffer.position(position + metadataSize);
- footerBuffer.limit(position + metadataSize + footerBufferSize);
- instream = InStream.create("footer", Lists.<DiskRange>newArrayList(
- new BufferChunk(footerBuffer, 0)), footerBufferSize, codec, bufferSize);
- this.footer = OrcProto.Footer.parseFrom(instream);
+ this.metadata = extractMetadata(footerBuffer, position, metadataSize, codec, bufferSize);
+ this.footer = extractFooter(
+ footerBuffer, position + metadataSize, footerBufferSize, codec, bufferSize);
footerBuffer.position(position);
this.inspector = OrcStruct.createObjectInspector(0, footer.getTypesList());
@@ -518,7 +584,8 @@ public class ReaderImpl implements Reader {
* that is useful for Reader implementation
*
*/
- static class FileMetaInfo{
+ static class FileMetaInfo {
+ private ByteBuffer footerMetaAndPsBuffer;
final String compressionType;
final int bufferSize;
final int metadataSize;
@@ -526,30 +593,68 @@ public class ReaderImpl implements Reader {
final List<Integer> versionList;
final OrcFile.WriterVersion writerVersion;
+ /** Ctor used when reading splits - no version list or full footer buffer. */
FileMetaInfo(String compressionType, int bufferSize, int metadataSize,
ByteBuffer footerBuffer, OrcFile.WriterVersion writerVersion) {
this(compressionType, bufferSize, metadataSize, footerBuffer, null,
- writerVersion);
+ writerVersion, null);
}
- FileMetaInfo(String compressionType, int bufferSize, int metadataSize,
- ByteBuffer footerBuffer, List<Integer> versionList,
- OrcFile.WriterVersion writerVersion){
+ /** Ctor used when creating file info during init and when getting a new one. */
+ public FileMetaInfo(String compressionType, int bufferSize, int metadataSize,
+ ByteBuffer footerBuffer, List<Integer> versionList, WriterVersion writerVersion,
+ ByteBuffer fullFooterBuffer) {
this.compressionType = compressionType;
this.bufferSize = bufferSize;
this.metadataSize = metadataSize;
this.footerBuffer = footerBuffer;
this.versionList = versionList;
this.writerVersion = writerVersion;
+ this.footerMetaAndPsBuffer = fullFooterBuffer;
}
}
- public FileMetaInfo getFileMetaInfo(){
+ public FileMetaInfo getFileMetaInfo() {
return new FileMetaInfo(compressionKind.toString(), bufferSize,
- metadataSize, footerByteBuffer, versionList, writerVersion);
+ metadataSize, footerByteBuffer, versionList, writerVersion, footerMetaAndPsBuffer);
}
+ /** Same as FileMetaInfo, but with extra fields. FileMetaInfo is serialized for splits
+ * and so we don't just add fields to it, it's already messy and confusing. */
+ public static final class FooterInfo {
+ private final OrcProto.Footer footer;
+ private final Metadata metadata;
+ private final List<StripeInformation> stripes;
+ private final FileMetaInfo fileMetaInfo;
+ private FooterInfo(Metadata metadata, OrcProto.Footer footer, FileMetaInfo fileMetaInfo) {
+ this.metadata = metadata;
+ this.footer = footer;
+ this.fileMetaInfo = fileMetaInfo;
+ this.stripes = convertProtoStripesToStripes(footer.getStripesList());
+ }
+
+ public OrcProto.Footer getFooter() {
+ return footer;
+ }
+
+ public Metadata getMetadata() {
+ return metadata;
+ }
+
+ public FileMetaInfo getFileMetaInfo() {
+ return fileMetaInfo;
+ }
+
+ public List<StripeInformation> getStripes() {
+ return stripes;
+ }
+ }
+
+ @Override
+ public ByteBuffer getSerializedFileFooter() {
+ return footerMetaAndPsBuffer;
+ }
@Override
public RecordReader rows() throws IOException {
@@ -609,14 +714,19 @@ public class ReaderImpl implements Reader {
@Override
public long getRawDataSizeFromColIndices(List<Integer> colIndices) {
+ return getRawDataSizeFromColIndices(colIndices, footer);
+ }
+
+ public static long getRawDataSizeFromColIndices(
+ List<Integer> colIndices, OrcProto.Footer footer) {
long result = 0;
for (int colIdx : colIndices) {
- result += getRawDataSizeOfColumn(colIdx);
+ result += getRawDataSizeOfColumn(colIdx, footer);
}
return result;
}
- private long getRawDataSizeOfColumn(int colIdx) {
+ private static long getRawDataSizeOfColumn(int colIdx, OrcProto.Footer footer) {
OrcProto.ColumnStatistics colStat = footer.getStatistics(colIdx);
long numVals = colStat.getNumberOfValues();
Type type = footer.getTypes(colIdx);
@@ -738,4 +848,9 @@ public class ReaderImpl implements Reader {
public MetadataReader metadata() throws IOException {
return new MetadataReader(fileSystem, path, codec, bufferSize, footer.getTypesCount());
}
+
+ @Override
+ public Footer getFooter() {
+ return footer;
+ }
}