You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by xu...@apache.org on 2015/09/09 09:08:47 UTC

[17/50] [abbrv] hive git commit: HIVE-11595 : refactor ORC footer reading to make it usable from outside (Sergey Shelukhin, reviewed by Prasanth Jayachandran)

HIVE-11595 : refactor ORC footer reading to make it usable from outside (Sergey Shelukhin, reviewed by Prasanth Jayachandran)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/22fa9216
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/22fa9216
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/22fa9216

Branch: refs/heads/beeline-cli
Commit: 22fa9216d4e32d7681d3c1be8cbedc8c7999e56d
Parents: 97bf32a
Author: Sergey Shelukhin <se...@apache.org>
Authored: Fri Aug 28 18:23:05 2015 -0700
Committer: Sergey Shelukhin <se...@apache.org>
Committed: Fri Aug 28 18:23:05 2015 -0700

----------------------------------------------------------------------
 .../apache/hadoop/hive/ql/io/orc/Reader.java    |   6 +
 .../hadoop/hive/ql/io/orc/ReaderImpl.java       | 281 +++++++++++++------
 2 files changed, 204 insertions(+), 83 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/22fa9216/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Reader.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Reader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Reader.java
index 7bddefc..187924d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Reader.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Reader.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.List;
 
+import org.apache.hadoop.hive.ql.io.orc.OrcProto.Footer;
 import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 
@@ -358,4 +359,9 @@ public interface Reader {
                     String[] neededColumns) throws IOException;
 
   MetadataReader metadata() throws IOException;
+
+  /** Gets serialized file metadata read from disk for the purposes of caching, etc. */
+  ByteBuffer getSerializedFileFooter();
+
+  Footer getFooter();
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/22fa9216/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java
index c990d85..ab539c4 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java
@@ -35,6 +35,8 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.DiskRange;
 import org.apache.hadoop.hive.ql.io.FileFormatException;
+import org.apache.hadoop.hive.ql.io.orc.OrcFile.WriterVersion;
+import org.apache.hadoop.hive.ql.io.orc.OrcProto.Footer;
 import org.apache.hadoop.hive.ql.io.orc.OrcProto.Type;
 import org.apache.hadoop.hive.ql.io.orc.OrcProto.UserMetadataItem;
 import org.apache.hadoop.hive.ql.io.orc.RecordReaderImpl.BufferChunk;
@@ -74,6 +76,9 @@ public class ReaderImpl implements Reader {
   // will help avoid cpu cycles spend in deserializing at cost of increased
   // memory footprint.
   private final ByteBuffer footerByteBuffer;
+  // Same for metastore cache - shares the same backing buffer, but includes postscript.
+  // This will only be set if the file footer/metadata was read from disk.
+  private final ByteBuffer footerMetaAndPsBuffer;
 
   static class StripeInformationImpl
       implements StripeInformation {
@@ -166,11 +171,7 @@ public class ReaderImpl implements Reader {
 
   @Override
   public List<StripeInformation> getStripes() {
-    List<StripeInformation> result = new ArrayList<StripeInformation>();
-    for(OrcProto.StripeInformation info: footer.getStripesList()) {
-      result.add(new StripeInformationImpl(info));
-    }
-    return result;
+    return convertProtoStripesToStripes(footer.getStripesList());
   }
 
   @Override
@@ -274,7 +275,7 @@ public class ReaderImpl implements Reader {
    * Check to see if this ORC file is from a future version and if so,
    * warn the user that we may not be able to read all of the column encodings.
    * @param log the logger to write any error message to
-   * @param path the filename for error messages
+   * @param path the data source path for error messages
    * @param version the version of hive that wrote the file.
    */
   static void checkOrcVersion(Log log, Path path, List<Integer> version) {
@@ -287,8 +288,7 @@ public class ReaderImpl implements Reader {
       if (major > OrcFile.Version.CURRENT.getMajor() ||
           (major == OrcFile.Version.CURRENT.getMajor() &&
            minor > OrcFile.Version.CURRENT.getMinor())) {
-        log.warn("ORC file " + path +
-                 " was written by a future Hive version " +
+        log.warn(path + " was written by a future Hive version " +
                  versionString(version) +
                  ". This file may not be readable by this version of Hive.");
       }
@@ -313,9 +313,11 @@ public class ReaderImpl implements Reader {
     FileMetaInfo footerMetaData;
     if (options.getFileMetaInfo() != null) {
       footerMetaData = options.getFileMetaInfo();
+      this.footerMetaAndPsBuffer = null;
     } else {
       footerMetaData = extractMetaInfoFromFooter(fs, path,
           options.getMaxLength());
+      this.footerMetaAndPsBuffer = footerMetaData.footerMetaAndPsBuffer;
     }
     MetaInfoObjExtractor rInfo =
         new MetaInfoObjExtractor(footerMetaData.compressionType,
@@ -349,6 +351,111 @@ public class ReaderImpl implements Reader {
     return OrcFile.WriterVersion.ORIGINAL;
   }
 
+  /** Extracts the necessary metadata from an externally stored buffer (fullFooterBuffer). */
+  public static FooterInfo extractMetaInfoFromFooter(
+      ByteBuffer bb, Path srcPath) throws IOException {
+    // Read the PostScript. Be very careful as some parts of this historically use bb position
+    // and some use absolute offsets that have to take position into account.
+    int baseOffset = bb.position();
+    int lastByteAbsPos = baseOffset + bb.remaining() - 1;
+    int psLen = bb.get(lastByteAbsPos) & 0xff;
+    int psAbsPos = lastByteAbsPos - psLen;
+    OrcProto.PostScript ps = extractPostScript(bb, srcPath, psLen, psAbsPos);
+    assert baseOffset == bb.position();
+
+    // Extract PS information.
+    int footerSize = (int)ps.getFooterLength(), metadataSize = (int)ps.getMetadataLength(),
+        footerAbsPos = psAbsPos - footerSize, metadataAbsPos = footerAbsPos - metadataSize;
+    String compressionType = ps.getCompression().toString();
+    CompressionCodec codec = WriterImpl.createCodec(CompressionKind.valueOf(compressionType));
+    int bufferSize = (int)ps.getCompressionBlockSize();
+    bb.position(metadataAbsPos);
+    bb.mark();
+
+    // Extract metadata and footer.
+    Metadata metadata = new Metadata(extractMetadata(
+        bb, metadataAbsPos, metadataSize, codec, bufferSize));
+    OrcProto.Footer footer = extractFooter(bb, footerAbsPos, footerSize, codec, bufferSize);
+    bb.position(metadataAbsPos);
+    bb.limit(psAbsPos);
+    // TODO: do we need footer buffer here? FileInfo/FileMetaInfo is a mess...
+    FileMetaInfo fmi = new FileMetaInfo(
+        compressionType, bufferSize, metadataSize, bb, extractWriterVersion(ps));
+    return new FooterInfo(metadata, footer, fmi);
+  }
+
+  private static OrcProto.Footer extractFooter(ByteBuffer bb, int footerAbsPos,
+      int footerSize, CompressionCodec codec, int bufferSize) throws IOException {
+    bb.position(footerAbsPos);
+    bb.limit(footerAbsPos + footerSize);
+    InputStream instream = InStream.create("footer", Lists.<DiskRange>newArrayList(
+          new BufferChunk(bb, 0)), footerSize, codec, bufferSize);
+    return OrcProto.Footer.parseFrom(instream);
+  }
+
+  private static OrcProto.Metadata extractMetadata(ByteBuffer bb, int metadataAbsPos,
+      int metadataSize, CompressionCodec codec, int bufferSize) throws IOException {
+    bb.position(metadataAbsPos);
+    bb.limit(metadataAbsPos + metadataSize);
+    InputStream instream = InStream.create("metadata", Lists.<DiskRange>newArrayList(
+        new BufferChunk(bb, 0)), metadataSize, codec, bufferSize);
+    CodedInputStream in = CodedInputStream.newInstance(instream);
+    int msgLimit = DEFAULT_PROTOBUF_MESSAGE_LIMIT;
+    OrcProto.Metadata meta = null;
+    do {
+      try {
+        in.setSizeLimit(msgLimit);
+        meta = OrcProto.Metadata.parseFrom(in);
+      } catch (InvalidProtocolBufferException e) {
+        if (e.getMessage().contains("Protocol message was too large")) {
+          LOG.warn("Metadata section is larger than " + msgLimit + " bytes. Increasing the max" +
+              " size of the coded input stream." );
+
+          msgLimit = msgLimit << 1;
+          if (msgLimit > PROTOBUF_MESSAGE_MAX_LIMIT) {
+            LOG.error("Metadata section exceeds max protobuf message size of " +
+                PROTOBUF_MESSAGE_MAX_LIMIT + " bytes.");
+            throw e;
+          }
+
+          // we must have failed in the middle of reading instream and instream doesn't support
+          // resetting the stream
+          instream = InStream.create("metadata", Lists.<DiskRange>newArrayList(
+              new BufferChunk(bb, 0)), metadataSize, codec, bufferSize);
+          in = CodedInputStream.newInstance(instream);
+        } else {
+          throw e;
+        }
+      }
+    } while (meta == null);
+    return meta;
+  }
+
+  private static OrcProto.PostScript extractPostScript(ByteBuffer bb, Path path,
+      int psLen, int psAbsOffset) throws IOException {
+    // TODO: when PB is upgraded to 2.6, newInstance(ByteBuffer) method should be used here.
+    assert bb.hasArray();
+    CodedInputStream in = CodedInputStream.newInstance(
+        bb.array(), bb.arrayOffset() + psAbsOffset, psLen);
+    OrcProto.PostScript ps = OrcProto.PostScript.parseFrom(in);
+    checkOrcVersion(LOG, path, ps.getVersionList());
+
+    // Check compression codec.
+    switch (ps.getCompression()) {
+      case NONE:
+        break;
+      case ZLIB:
+        break;
+      case SNAPPY:
+        break;
+      case LZO:
+        break;
+      default:
+        throw new IllegalArgumentException("Unknown compression");
+    }
+    return ps;
+  }
+
   private static FileMetaInfo extractMetaInfoFromFooter(FileSystem fs,
                                                         Path path,
                                                         long maxFileLength
@@ -367,44 +474,24 @@ public class ReaderImpl implements Reader {
     int readSize = (int) Math.min(size, DIRECTORY_SIZE_GUESS);
     file.seek(size - readSize);
     ByteBuffer buffer = ByteBuffer.allocate(readSize);
-    file.readFully(buffer.array(), buffer.arrayOffset() + buffer.position(),
-      buffer.remaining());
+    assert buffer.position() == 0;
+    file.readFully(buffer.array(), buffer.arrayOffset(), readSize);
+    buffer.position(0);
 
     //read the PostScript
     //get length of PostScript
     int psLen = buffer.get(readSize - 1) & 0xff;
     ensureOrcFooter(file, path, psLen, buffer);
     int psOffset = readSize - 1 - psLen;
-    CodedInputStream in = CodedInputStream.newInstance(buffer.array(),
-      buffer.arrayOffset() + psOffset, psLen);
-    OrcProto.PostScript ps = OrcProto.PostScript.parseFrom(in);
-
-    checkOrcVersion(LOG, path, ps.getVersionList());
+    OrcProto.PostScript ps = extractPostScript(buffer, path, psLen, psOffset);
 
     int footerSize = (int) ps.getFooterLength();
     int metadataSize = (int) ps.getMetadataLength();
-    OrcFile.WriterVersion writerVersion;
-    if (ps.hasWriterVersion()) {
-      writerVersion =  getWriterVersion(ps.getWriterVersion());
-    } else {
-      writerVersion = OrcFile.WriterVersion.ORIGINAL;
-    }
+    OrcFile.WriterVersion writerVersion = extractWriterVersion(ps);
 
-    //check compression codec
-    switch (ps.getCompression()) {
-      case NONE:
-        break;
-      case ZLIB:
-        break;
-      case SNAPPY:
-        break;
-      case LZO:
-        break;
-      default:
-        throw new IllegalArgumentException("Unknown compression");
-    }
 
     //check if extra bytes need to be read
+    ByteBuffer fullFooterBuffer = null;
     int extra = Math.max(0, psLen + 1 + footerSize + metadataSize - readSize);
     if (extra > 0) {
       //more bytes need to be read, seek back to the right place and read extra bytes
@@ -417,10 +504,12 @@ public class ReaderImpl implements Reader {
       extraBuf.put(buffer);
       buffer = extraBuf;
       buffer.position(0);
+      fullFooterBuffer = buffer.slice();
       buffer.limit(footerSize + metadataSize);
     } else {
       //footer is already in the bytes in buffer, just adjust position, length
       buffer.position(psOffset - footerSize - metadataSize);
+      fullFooterBuffer = buffer.slice();
       buffer.limit(psOffset);
     }
 
@@ -435,11 +524,24 @@ public class ReaderImpl implements Reader {
         (int) ps.getMetadataLength(),
         buffer,
         ps.getVersionList(),
-        writerVersion
+        writerVersion,
+        fullFooterBuffer
         );
   }
 
+  private static OrcFile.WriterVersion extractWriterVersion(OrcProto.PostScript ps) {
+    return (ps.hasWriterVersion()
+        ? getWriterVersion(ps.getWriterVersion()) : OrcFile.WriterVersion.ORIGINAL);
+  }
 
+  private static List<StripeInformation> convertProtoStripesToStripes(
+      List<OrcProto.StripeInformation> stripes) {
+    List<StripeInformation> result = new ArrayList<StripeInformation>(stripes.size());
+    for (OrcProto.StripeInformation info : stripes) {
+      result.add(new StripeInformationImpl(info));
+    }
+    return result;
+  }
 
   /**
    * MetaInfoObjExtractor - has logic to create the values for the fields in ReaderImpl
@@ -467,46 +569,10 @@ public class ReaderImpl implements Reader {
 
       int position = footerBuffer.position();
       int footerBufferSize = footerBuffer.limit() - footerBuffer.position() - metadataSize;
-      footerBuffer.limit(position + metadataSize);
-
-      InputStream instream = InStream.create("metadata", Lists.<DiskRange>newArrayList(
-          new BufferChunk(footerBuffer, 0)), metadataSize, codec, bufferSize);
-      CodedInputStream in = CodedInputStream.newInstance(instream);
-      int msgLimit = DEFAULT_PROTOBUF_MESSAGE_LIMIT;
-      OrcProto.Metadata meta = null;
-      do {
-        try {
-          in.setSizeLimit(msgLimit);
-          meta = OrcProto.Metadata.parseFrom(in);
-        } catch (InvalidProtocolBufferException e) {
-          if (e.getMessage().contains("Protocol message was too large")) {
-            LOG.warn("Metadata section is larger than " + msgLimit + " bytes. Increasing the max" +
-                " size of the coded input stream." );
-
-            msgLimit = msgLimit << 1;
-            if (msgLimit > PROTOBUF_MESSAGE_MAX_LIMIT) {
-              LOG.error("Metadata section exceeds max protobuf message size of " +
-                  PROTOBUF_MESSAGE_MAX_LIMIT + " bytes.");
-              throw e;
-            }
-
-            // we must have failed in the middle of reading instream and instream doesn't support
-            // resetting the stream
-            instream = InStream.create("metadata", Lists.<DiskRange>newArrayList(
-                new BufferChunk(footerBuffer, 0)), metadataSize, codec, bufferSize);
-            in = CodedInputStream.newInstance(instream);
-          } else {
-            throw e;
-          }
-        }
-      } while (meta == null);
-      this.metadata = meta;
 
-      footerBuffer.position(position + metadataSize);
-      footerBuffer.limit(position + metadataSize + footerBufferSize);
-      instream = InStream.create("footer", Lists.<DiskRange>newArrayList(
-          new BufferChunk(footerBuffer, 0)), footerBufferSize, codec, bufferSize);
-      this.footer = OrcProto.Footer.parseFrom(instream);
+      this.metadata = extractMetadata(footerBuffer, position, metadataSize, codec, bufferSize);
+      this.footer = extractFooter(
+          footerBuffer, position + metadataSize, footerBufferSize, codec, bufferSize);
 
       footerBuffer.position(position);
       this.inspector = OrcStruct.createObjectInspector(0, footer.getTypesList());
@@ -518,7 +584,8 @@ public class ReaderImpl implements Reader {
    * that is useful for Reader implementation
    *
    */
-  static class FileMetaInfo{
+  static class FileMetaInfo {
+    private ByteBuffer footerMetaAndPsBuffer;
     final String compressionType;
     final int bufferSize;
     final int metadataSize;
@@ -526,30 +593,68 @@ public class ReaderImpl implements Reader {
     final List<Integer> versionList;
     final OrcFile.WriterVersion writerVersion;
 
+    /** Ctor used when reading splits - no version list or full footer buffer. */
     FileMetaInfo(String compressionType, int bufferSize, int metadataSize,
         ByteBuffer footerBuffer, OrcFile.WriterVersion writerVersion) {
       this(compressionType, bufferSize, metadataSize, footerBuffer, null,
-          writerVersion);
+          writerVersion, null);
     }
 
-    FileMetaInfo(String compressionType, int bufferSize, int metadataSize,
-                 ByteBuffer footerBuffer, List<Integer> versionList,
-                 OrcFile.WriterVersion writerVersion){
+    /** Ctor used when creating file info during init and when getting a new one. */
+    public FileMetaInfo(String compressionType, int bufferSize, int metadataSize,
+        ByteBuffer footerBuffer, List<Integer> versionList, WriterVersion writerVersion,
+        ByteBuffer fullFooterBuffer) {
       this.compressionType = compressionType;
       this.bufferSize = bufferSize;
       this.metadataSize = metadataSize;
       this.footerBuffer = footerBuffer;
       this.versionList = versionList;
       this.writerVersion = writerVersion;
+      this.footerMetaAndPsBuffer = fullFooterBuffer;
     }
   }
 
-  public FileMetaInfo getFileMetaInfo(){
+  public FileMetaInfo getFileMetaInfo() {
     return new FileMetaInfo(compressionKind.toString(), bufferSize,
-        metadataSize, footerByteBuffer, versionList, writerVersion);
+        metadataSize, footerByteBuffer, versionList, writerVersion, footerMetaAndPsBuffer);
   }
 
+  /** Same as FileMetaInfo, but with extra fields. FileMetaInfo is serialized for splits
+   * and so we don't just add fields to it, it's already messy and confusing. */
+  public static final class FooterInfo {
+    private final OrcProto.Footer footer;
+    private final Metadata metadata;
+    private final List<StripeInformation> stripes;
+    private final FileMetaInfo fileMetaInfo;
 
+    private FooterInfo(Metadata metadata, OrcProto.Footer footer, FileMetaInfo fileMetaInfo) {
+      this.metadata = metadata;
+      this.footer = footer;
+      this.fileMetaInfo = fileMetaInfo;
+      this.stripes = convertProtoStripesToStripes(footer.getStripesList());
+    }
+
+    public OrcProto.Footer getFooter() {
+      return footer;
+    }
+
+    public Metadata getMetadata() {
+      return metadata;
+    }
+
+    public FileMetaInfo getFileMetaInfo() {
+      return fileMetaInfo;
+    }
+
+    public List<StripeInformation> getStripes() {
+      return stripes;
+    }
+  }
+
+  @Override
+  public ByteBuffer getSerializedFileFooter() {
+    return footerMetaAndPsBuffer;
+  }
 
   @Override
   public RecordReader rows() throws IOException {
@@ -609,14 +714,19 @@ public class ReaderImpl implements Reader {
 
   @Override
   public long getRawDataSizeFromColIndices(List<Integer> colIndices) {
+    return getRawDataSizeFromColIndices(colIndices, footer);
+  }
+
+  public static long getRawDataSizeFromColIndices(
+      List<Integer> colIndices, OrcProto.Footer footer) {
     long result = 0;
     for (int colIdx : colIndices) {
-      result += getRawDataSizeOfColumn(colIdx);
+      result += getRawDataSizeOfColumn(colIdx, footer);
     }
     return result;
   }
 
-  private long getRawDataSizeOfColumn(int colIdx) {
+  private static long getRawDataSizeOfColumn(int colIdx, OrcProto.Footer footer) {
     OrcProto.ColumnStatistics colStat = footer.getStatistics(colIdx);
     long numVals = colStat.getNumberOfValues();
     Type type = footer.getTypes(colIdx);
@@ -738,4 +848,9 @@ public class ReaderImpl implements Reader {
   public MetadataReader metadata() throws IOException {
     return new MetadataReader(fileSystem, path, codec, bufferSize, footer.getTypesCount());
   }
+
+  @Override
+  public Footer getFooter() {
+    return footer;
+  }
 }