Posted to commits@hive.apache.org by om...@apache.org on 2017/07/19 16:58:53 UTC

[30/37] hive git commit: HIVE-17118. Move the hive-orc source files to make the package names unique.

http://git-wip-us.apache.org/repos/asf/hive/blob/df8921d8/orc/src/java/org/apache/hive/orc/impl/PhysicalFsWriter.java
----------------------------------------------------------------------
diff --git a/orc/src/java/org/apache/hive/orc/impl/PhysicalFsWriter.java b/orc/src/java/org/apache/hive/orc/impl/PhysicalFsWriter.java
new file mode 100644
index 0000000..47c33bb
--- /dev/null
+++ b/orc/src/java/org/apache/hive/orc/impl/PhysicalFsWriter.java
@@ -0,0 +1,529 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.orc.impl;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hive.orc.CompressionCodec;
+import org.apache.hive.orc.CompressionCodec.Modifier;
+import org.apache.hive.orc.CompressionKind;
+import org.apache.hive.orc.OrcFile;
+import org.apache.hive.orc.OrcFile.CompressionStrategy;
+import org.apache.hive.orc.OrcProto;
+import org.apache.hive.orc.OrcProto.BloomFilterIndex;
+import org.apache.hive.orc.OrcProto.Footer;
+import org.apache.hive.orc.OrcProto.Metadata;
+import org.apache.hive.orc.OrcProto.PostScript;
+import org.apache.hive.orc.OrcProto.Stream.Kind;
+import org.apache.hive.orc.OrcProto.StripeFooter;
+import org.apache.hive.orc.OrcProto.StripeInformation;
+import org.apache.hive.orc.OrcProto.RowIndex.Builder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.protobuf.CodedOutputStream;
+
+public class PhysicalFsWriter implements PhysicalWriter {
+  private static final Logger LOG = LoggerFactory.getLogger(PhysicalFsWriter.class);
+
+  private static final int HDFS_BUFFER_SIZE = 256 * 1024;
+
+  private FSDataOutputStream rawWriter = null;
+  // the compressed metadata information outStream
+  private OutStream writer = null;
+  // a protobuf outStream around streamFactory
+  private CodedOutputStream protobufWriter = null;
+
+  private final FileSystem fs;
+  private final Path path;
+  private final long blockSize;
+  private final int bufferSize;
+  private final CompressionCodec codec;
+  private final double paddingTolerance;
+  private final long defaultStripeSize;
+  private final CompressionKind compress;
+  private final boolean addBlockPadding;
+  private final CompressionStrategy compressionStrategy;
+
+  // the streams that make up the current stripe
+  private final Map<StreamName, BufferedStream> streams =
+    new TreeMap<StreamName, BufferedStream>();
+
+  private long adjustedStripeSize;
+  private long headerLength;
+  private long stripeStart;
+  private int metadataLength;
+  private int footerLength;
+
+  public PhysicalFsWriter(FileSystem fs, Path path, int numColumns, OrcFile.WriterOptions opts) {
+    this.fs = fs;
+    this.path = path;
+    this.defaultStripeSize = this.adjustedStripeSize = opts.getStripeSize();
+    this.addBlockPadding = opts.getBlockPadding();
+    if (opts.isEnforceBufferSize()) {
+      this.bufferSize = opts.getBufferSize();
+    } else {
+      this.bufferSize = getEstimatedBufferSize(defaultStripeSize, numColumns, opts.getBufferSize());
+    }
+    this.compress = opts.getCompress();
+    this.compressionStrategy = opts.getCompressionStrategy();
+    codec = createCodec(compress);
+    this.paddingTolerance = opts.getPaddingTolerance();
+    this.blockSize = opts.getBlockSize();
+    LOG.info("ORC writer created for path: {} with stripeSize: {} blockSize: {}" +
+        " compression: {} bufferSize: {}", path, defaultStripeSize, blockSize,
+        compress, bufferSize);
+  }
+
+  @Override
+  public void initialize() throws IOException {
+    if (rawWriter != null) return;
+    rawWriter = fs.create(path, false, HDFS_BUFFER_SIZE,
+                          fs.getDefaultReplication(path), blockSize);
+    rawWriter.writeBytes(OrcFile.MAGIC);
+    headerLength = rawWriter.getPos();
+    writer = new OutStream("metadata", bufferSize, codec,
+                           new DirectStream(rawWriter));
+    protobufWriter = CodedOutputStream.newInstance(writer);
+  }
+
+  private void padStripe(long indexSize, long dataSize, int footerSize) throws IOException {
+    this.stripeStart = rawWriter.getPos();
+    final long currentStripeSize = indexSize + dataSize + footerSize;
+    final long available = blockSize - (stripeStart % blockSize);
+    final long overflow = currentStripeSize - adjustedStripeSize;
+    final float availRatio = (float) available / (float) defaultStripeSize;
+
+    if (availRatio > 0.0f && availRatio < 1.0f
+        && availRatio > paddingTolerance) {
+      // Adjust the default stripe size to fit into the remaining space, and
+      // also adjust the next stripe for correction based on the current stripe
+      // size and the user-specified padding tolerance. Since the stripe size
+      // can overflow the default stripe size, we apply this correction to
+      // avoid writing a portion of the last stripe to the next HDFS block.
+      double correction = overflow > 0 ? (double) overflow
+          / (double) adjustedStripeSize : 0.0;
+
+      // correction should not be greater than user specified padding
+      // tolerance
+      correction = correction > paddingTolerance ? paddingTolerance
+          : correction;
+
+      // adjust next stripe size based on current stripe estimate correction
+      adjustedStripeSize = (long) ((1.0f - correction) * (availRatio * defaultStripeSize));
+    } else if (availRatio >= 1.0) {
+      adjustedStripeSize = defaultStripeSize;
+    }
+
+    if (availRatio < paddingTolerance && addBlockPadding) {
+      long padding = blockSize - (stripeStart % blockSize);
+      byte[] pad = new byte[(int) Math.min(HDFS_BUFFER_SIZE, padding)];
+      LOG.info(String.format("Padding ORC by %d bytes (<=  %.2f * %d)",
+          padding, availRatio, defaultStripeSize));
+      stripeStart += padding;
+      while (padding > 0) {
+        int writeLen = (int) Math.min(padding, pad.length);
+        rawWriter.write(pad, 0, writeLen);
+        padding -= writeLen;
+      }
+      adjustedStripeSize = defaultStripeSize;
+    } else if (currentStripeSize < blockSize
+        && (stripeStart % blockSize) + currentStripeSize > blockSize) {
+      // even if you don't pad, reset the default stripe size when crossing a
+      // block boundary
+      adjustedStripeSize = defaultStripeSize;
+    }
+  }
+
+  /**
+   * An output receiver that writes the ByteBuffers to the output stream
+   * as they are received.
+   */
+  private class DirectStream implements OutStream.OutputReceiver {
+    private final FSDataOutputStream output;
+
+    DirectStream(FSDataOutputStream output) {
+      this.output = output;
+    }
+
+    @Override
+    public void output(ByteBuffer buffer) throws IOException {
+      output.write(buffer.array(), buffer.arrayOffset() + buffer.position(), buffer.remaining());
+    }
+  }
+
+  @Override
+  public long getPhysicalStripeSize() {
+    return adjustedStripeSize;
+  }
+
+  @Override
+  public boolean isCompressed() {
+    return codec != null;
+  }
+
+
+  public static CompressionCodec createCodec(CompressionKind kind) {
+    switch (kind) {
+      case NONE:
+        return null;
+      case ZLIB:
+        return new ZlibCodec();
+      case SNAPPY:
+        return new SnappyCodec();
+      case LZO:
+        try {
+          ClassLoader loader = Thread.currentThread().getContextClassLoader();
+          if (loader == null) {
+            loader = WriterImpl.class.getClassLoader();
+          }
+          @SuppressWarnings("unchecked")
+          Class<? extends CompressionCodec> lzo =
+              (Class<? extends CompressionCodec>)
+              loader.loadClass("org.apache.hadoop.hive.ql.io.orc.LzoCodec");
+          return lzo.newInstance();
+        } catch (ClassNotFoundException e) {
+          throw new IllegalArgumentException("LZO is not available.", e);
+        } catch (InstantiationException e) {
+          throw new IllegalArgumentException("Problem initializing LZO", e);
+        } catch (IllegalAccessException e) {
+          throw new IllegalArgumentException("Insufficient access to LZO", e);
+        }
+      default:
+        throw new IllegalArgumentException("Unknown compression codec: " +
+            kind);
+    }
+  }
+
+  private void writeStripeFooter(StripeFooter footer, long dataSize, long indexSize,
+      StripeInformation.Builder dirEntry) throws IOException {
+    footer.writeTo(protobufWriter);
+    protobufWriter.flush();
+    writer.flush();
+    dirEntry.setOffset(stripeStart);
+    dirEntry.setFooterLength(rawWriter.getPos() - stripeStart - dataSize - indexSize);
+  }
+
+  @VisibleForTesting
+  public static int getEstimatedBufferSize(long stripeSize, int numColumns,
+                                           int bs) {
+    // The worst case is that there are 2 big streams per column and
+    // we want to guarantee that each stream gets ~10 buffers.
+    // This keeps buffers small enough that we don't get really small stripe
+    // sizes.
+    int estBufferSize = (int) (stripeSize / (20 * numColumns));
+    estBufferSize = getClosestBufferSize(estBufferSize);
+    return estBufferSize > bs ? bs : estBufferSize;
+  }
+
+  private static int getClosestBufferSize(int estBufferSize) {
+    final int kb4 = 4 * 1024;
+    final int kb8 = 8 * 1024;
+    final int kb16 = 16 * 1024;
+    final int kb32 = 32 * 1024;
+    final int kb64 = 64 * 1024;
+    final int kb128 = 128 * 1024;
+    final int kb256 = 256 * 1024;
+    if (estBufferSize <= kb4) {
+      return kb4;
+    } else if (estBufferSize > kb4 && estBufferSize <= kb8) {
+      return kb8;
+    } else if (estBufferSize > kb8 && estBufferSize <= kb16) {
+      return kb16;
+    } else if (estBufferSize > kb16 && estBufferSize <= kb32) {
+      return kb32;
+    } else if (estBufferSize > kb32 && estBufferSize <= kb64) {
+      return kb64;
+    } else if (estBufferSize > kb64 && estBufferSize <= kb128) {
+      return kb128;
+    } else {
+      return kb256;
+    }
+  }
+
+  @Override
+  public void writeFileMetadata(Metadata.Builder builder) throws IOException {
+    long startPosn = rawWriter.getPos();
+    Metadata metadata = builder.build();
+    metadata.writeTo(protobufWriter);
+    protobufWriter.flush();
+    writer.flush();
+    this.metadataLength = (int) (rawWriter.getPos() - startPosn);
+  }
+
+  @Override
+  public void writeFileFooter(Footer.Builder builder) throws IOException {
+    long bodyLength = rawWriter.getPos() - metadataLength;
+    builder.setContentLength(bodyLength);
+    builder.setHeaderLength(headerLength);
+    long startPosn = rawWriter.getPos();
+    Footer footer = builder.build();
+    footer.writeTo(protobufWriter);
+    protobufWriter.flush();
+    writer.flush();
+    this.footerLength = (int) (rawWriter.getPos() - startPosn);
+  }
+
+  @Override
+  public void writePostScript(PostScript.Builder builder) throws IOException {
+    builder.setCompression(writeCompressionKind(compress));
+    builder.setFooterLength(footerLength);
+    builder.setMetadataLength(metadataLength);
+    if (compress != CompressionKind.NONE) {
+      builder.setCompressionBlockSize(bufferSize);
+    }
+    PostScript ps = builder.build();
+    // need to write this uncompressed
+    long startPosn = rawWriter.getPos();
+    ps.writeTo(rawWriter);
+    long length = rawWriter.getPos() - startPosn;
+    if (length > 255) {
+      throw new IllegalArgumentException("PostScript too large at " + length);
+    }
+    rawWriter.writeByte((int)length);
+  }
+
+  @Override
+  public void close() throws IOException {
+    rawWriter.close();
+  }
+
+  private OrcProto.CompressionKind writeCompressionKind(CompressionKind kind) {
+    switch (kind) {
+      case NONE: return OrcProto.CompressionKind.NONE;
+      case ZLIB: return OrcProto.CompressionKind.ZLIB;
+      case SNAPPY: return OrcProto.CompressionKind.SNAPPY;
+      case LZO: return OrcProto.CompressionKind.LZO;
+      default:
+        throw new IllegalArgumentException("Unknown compression " + kind);
+    }
+  }
+
+  @Override
+  public void flush() throws IOException {
+    rawWriter.hflush();
+    // TODO: reset?
+  }
+
+  @Override
+  public long getRawWriterPosition() throws IOException {
+    return rawWriter.getPos();
+  }
+
+  @Override
+  public void appendRawStripe(byte[] stripe, int offset, int length,
+      StripeInformation.Builder dirEntry) throws IOException {
+    long start = rawWriter.getPos();
+    long availBlockSpace = blockSize - (start % blockSize);
+
+    // see if stripe can fit in the current hdfs block, else pad the remaining
+    // space in the block
+    if (length < blockSize && length > availBlockSpace &&
+        addBlockPadding) {
+      byte[] pad = new byte[(int) Math.min(HDFS_BUFFER_SIZE, availBlockSpace)];
+      LOG.info(String.format("Padding ORC by %d bytes while merging..",
+          availBlockSpace));
+      start += availBlockSpace;
+      while (availBlockSpace > 0) {
+        int writeLen = (int) Math.min(availBlockSpace, pad.length);
+        rawWriter.write(pad, 0, writeLen);
+        availBlockSpace -= writeLen;
+      }
+    }
+
+    rawWriter.write(stripe);
+    dirEntry.setOffset(start);
+  }
+
+
+  /**
+   * This class is used to hold the contents of streams as they are buffered.
+   * The TreeWriters write to the outStream and the codec compresses the
+   * data as buffers fill up and stores them in the output list. When the
+   * stripe is being written, the whole stream is written to the file.
+   */
+  private class BufferedStream implements OutStream.OutputReceiver {
+    private final OutStream outStream;
+    private final List<ByteBuffer> output = new ArrayList<ByteBuffer>();
+
+    BufferedStream(String name, int bufferSize,
+                   CompressionCodec codec) throws IOException {
+      outStream = new OutStream(name, bufferSize, codec, this);
+    }
+
+    /**
+     * Receive a buffer from the compression codec.
+     * @param buffer the buffer to save
+     */
+    @Override
+    public void output(ByteBuffer buffer) {
+      output.add(buffer);
+    }
+
+    /**
+     * @return the number of bytes in buffers that are allocated to this stream.
+     */
+    public long getBufferSize() {
+      long result = 0;
+      for (ByteBuffer buf: output) {
+        result += buf.capacity();
+      }
+      return outStream.getBufferSize() + result;
+    }
+
+    /**
+     * Writes any saved buffers to the OutputStream if needed, and clears all the buffers.
+     */
+    public void spillToDiskAndClear() throws IOException {
+      if (!outStream.isSuppressed()) {
+        for (ByteBuffer buffer: output) {
+          rawWriter.write(buffer.array(), buffer.arrayOffset() + buffer.position(),
+            buffer.remaining());
+        }
+      }
+      outStream.clear();
+      output.clear();
+    }
+
+    /**
+     * @return The number of bytes that will be written to the output. Assumes the stream writing
+     *         into this receiver has already been flushed.
+     */
+    public long getOutputSize() {
+      long result = 0;
+      for (ByteBuffer buffer: output) {
+        result += buffer.remaining();
+      }
+      return result;
+    }
+
+    @Override
+    public String toString() {
+      return outStream.toString();
+    }
+  }
+
+  @Override
+  public OutStream getOrCreatePhysicalStream(StreamName name) throws IOException {
+    BufferedStream result = streams.get(name);
+    if (result == null) {
+      EnumSet<Modifier> modifiers = createCompressionModifiers(name.getKind());
+      result = new BufferedStream(name.toString(), bufferSize,
+          codec == null ? null : codec.modify(modifiers));
+      streams.put(name, result);
+    }
+    return result.outStream;
+  }
+
+  private EnumSet<Modifier> createCompressionModifiers(Kind kind) {
+    switch (kind) {
+      case BLOOM_FILTER:
+      case DATA:
+      case DICTIONARY_DATA:
+        return EnumSet.of(Modifier.TEXT,
+            compressionStrategy == CompressionStrategy.SPEED ? Modifier.FAST : Modifier.DEFAULT);
+      case LENGTH:
+      case DICTIONARY_COUNT:
+      case PRESENT:
+      case ROW_INDEX:
+      case SECONDARY:
+        // easily compressed using the fastest modes
+        return EnumSet.of(CompressionCodec.Modifier.FASTEST, CompressionCodec.Modifier.BINARY);
+      default:
+        LOG.warn("Missing ORC compression modifiers for " + kind);
+        return null;
+    }
+  }
+
+  @Override
+  public void finalizeStripe(StripeFooter.Builder footerBuilder,
+      StripeInformation.Builder dirEntry) throws IOException {
+    long indexSize = 0;
+    long dataSize = 0;
+    for (Map.Entry<StreamName, BufferedStream> pair: streams.entrySet()) {
+      BufferedStream receiver = pair.getValue();
+      OutStream outStream = receiver.outStream;
+      if (!outStream.isSuppressed()) {
+        outStream.flush();
+        long streamSize = receiver.getOutputSize();
+        StreamName name = pair.getKey();
+        footerBuilder.addStreams(OrcProto.Stream.newBuilder().setColumn(name.getColumn())
+            .setKind(name.getKind()).setLength(streamSize));
+        if (StreamName.Area.INDEX == name.getArea()) {
+          indexSize += streamSize;
+        } else {
+          dataSize += streamSize;
+        }
+      }
+    }
+    dirEntry.setIndexLength(indexSize).setDataLength(dataSize);
+
+    OrcProto.StripeFooter footer = footerBuilder.build();
+    // Do we need to pad the file so the stripe doesn't straddle a block boundary?
+    padStripe(indexSize, dataSize, footer.getSerializedSize());
+
+    // write out the data streams
+    for (Map.Entry<StreamName, BufferedStream> pair : streams.entrySet()) {
+      pair.getValue().spillToDiskAndClear();
+    }
+    // Write out the footer.
+    writeStripeFooter(footer, dataSize, indexSize, dirEntry);
+  }
+
+  @Override
+  public long estimateMemory() {
+    long result = 0;
+    for (BufferedStream stream: streams.values()) {
+      result += stream.getBufferSize();
+    }
+    return result;
+  }
+
+  @Override
+  public void writeIndexStream(StreamName name, Builder rowIndex) throws IOException {
+    OutStream stream = getOrCreatePhysicalStream(name);
+    rowIndex.build().writeTo(stream);
+    stream.flush();
+  }
+
+  @Override
+  public void writeBloomFilterStream(
+      StreamName name, BloomFilterIndex.Builder bloomFilterIndex) throws IOException {
+    OutStream stream = getOrCreatePhysicalStream(name);
+    bloomFilterIndex.build().writeTo(stream);
+    stream.flush();
+  }
+
+  @VisibleForTesting
+  public OutputStream getStream() throws IOException {
+    initialize();
+    return rawWriter;
+  }
+}
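
The block-padding logic in padStripe() is easiest to follow with concrete numbers. The
stand-alone sketch below (all sample values are hypothetical, not taken from the patch)
replays the stripe-size correction: when the previous stripe overflowed its target and the
space left in the current HDFS block is a usable fraction of the default stripe size, the
next stripe target is shrunk, with the shrinkage capped by the padding tolerance.

// Illustration only; mirrors the arithmetic in PhysicalFsWriter.padStripe().
public class StripePaddingSketch {
  public static void main(String[] args) {
    long blockSize = 256L * 1024 * 1024;        // hypothetical HDFS block size
    long defaultStripeSize = 64L * 1024 * 1024; // requested stripe size
    long adjustedStripeSize = defaultStripeSize;
    double paddingTolerance = 0.05;

    long stripeStart = 230L * 1024 * 1024;      // current position in the file
    long currentStripeSize = 70L * 1024 * 1024; // the stripe overflowed its target

    long available = blockSize - (stripeStart % blockSize);   // 26 MB left in the block
    long overflow = currentStripeSize - adjustedStripeSize;   // 6 MB over the target
    float availRatio = (float) available / (float) defaultStripeSize;  // ~0.41

    if (availRatio > 0.0f && availRatio < 1.0f && availRatio > paddingTolerance) {
      // shrink the next stripe so it fits the remaining space, capped by the tolerance
      double correction = overflow > 0 ? (double) overflow / (double) adjustedStripeSize : 0.0;
      correction = Math.min(correction, paddingTolerance);
      adjustedStripeSize = (long) ((1.0 - correction) * (availRatio * defaultStripeSize));
    } else if (availRatio >= 1.0f) {
      adjustedStripeSize = defaultStripeSize;
    }
    System.out.println("next stripe target ~= " + adjustedStripeSize + " bytes");
  }
}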

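The buffer-size heuristic in getEstimatedBufferSize() can be tried the same way. This is
only a sketch of the same rounding with made-up inputs: the estimate assumes roughly two
large streams per column with ~10 buffers each, rounds the result up to the next bucketed
size between 4 KB and 256 KB, and never exceeds the configured buffer size.

// Illustration only; behaviorally equivalent to getEstimatedBufferSize()/getClosestBufferSize().
public class BufferSizeSketch {
  // Round up to the next bucket used by PhysicalFsWriter (4 KB .. 256 KB).
  static int closestBufferSize(int estimate) {
    for (int size = 4 * 1024; size < 256 * 1024; size *= 2) {
      if (estimate <= size) {
        return size;
      }
    }
    return 256 * 1024;
  }

  public static void main(String[] args) {
    long stripeSize = 64L * 1024 * 1024;   // hypothetical 64 MB stripe
    int numColumns = 10;                   // hypothetical column count
    int configuredBufferSize = 256 * 1024;

    // ~2 big streams per column, ~10 buffers per stream => divide by 20 * columns.
    int estimate = (int) (stripeSize / (20 * numColumns));
    int rounded = closestBufferSize(estimate);
    System.out.println(Math.min(rounded, configuredBufferSize) + " bytes per buffer");
  }
}
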
http://git-wip-us.apache.org/repos/asf/hive/blob/df8921d8/orc/src/java/org/apache/hive/orc/impl/PhysicalWriter.java
----------------------------------------------------------------------
diff --git a/orc/src/java/org/apache/hive/orc/impl/PhysicalWriter.java b/orc/src/java/org/apache/hive/orc/impl/PhysicalWriter.java
new file mode 100644
index 0000000..dc0089a
--- /dev/null
+++ b/orc/src/java/org/apache/hive/orc/impl/PhysicalWriter.java
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.orc.impl;
+
+import java.io.IOException;
+
+import org.apache.hive.orc.OrcProto.BloomFilterIndex;
+import org.apache.hive.orc.OrcProto.Footer;
+import org.apache.hive.orc.OrcProto.Metadata;
+import org.apache.hive.orc.OrcProto.PostScript;
+import org.apache.hive.orc.OrcProto.RowIndex;
+import org.apache.hive.orc.OrcProto.StripeFooter;
+import org.apache.hive.orc.OrcProto.StripeInformation;
+
+public interface PhysicalWriter {
+
+  /**
+   * Creates all the streams/connections/etc. necessary to write.
+   */
+  void initialize() throws IOException;
+
+  /**
+   * Writes out the file metadata.
+   * @param builder Metadata builder to finalize and write.
+   */
+  void writeFileMetadata(Metadata.Builder builder) throws IOException;
+
+  /**
+   * Writes out the file footer.
+   * @param builder Footer builder to finalize and write.
+   */
+  void writeFileFooter(Footer.Builder builder) throws IOException;
+
+  /**
+   * Writes out the postscript (including the size byte if needed).
+   * @param builder Postscript builder to finalize and write.
+   */
+  void writePostScript(PostScript.Builder builder) throws IOException;
+
+  /**
+   * Creates (or returns the previously created) physical stream to write data to.
+   * @param name Stream name.
+   * @return The output stream.
+   */
+  OutStream getOrCreatePhysicalStream(StreamName name) throws IOException;
+
+  /**
+   * Flushes the data in all the streams, spills them to disk, and writes out the stripe footer.
+   * @param footer Stripe footer to be updated with relevant data and written out.
+   * @param dirEntry File metadata entry for the stripe, to be updated with relevant data.
+   */
+  void finalizeStripe(StripeFooter.Builder footer,
+      StripeInformation.Builder dirEntry) throws IOException;
+
+  /**
+   * Writes out the row index for the stripe column.
+   * @param name Stream name.
+   * @param rowIndex Row index entries to write.
+   */
+  void writeIndexStream(StreamName name, RowIndex.Builder rowIndex) throws IOException;
+
+  /**
+   * Writes out the bloom filter index for the stripe column.
+   * @param streamName Stream name.
+   * @param bloomFilterIndex Bloom filter index to write.
+   */
+  void writeBloomFilterStream(StreamName streamName,
+      BloomFilterIndex.Builder bloomFilterIndex) throws IOException;
+
+  /**
+   * Closes the writer.
+   */
+  void close() throws IOException;
+
+  /**
+   * Force-flushes the writer.
+   */
+  void flush() throws IOException;
+
+  /**
+   * @return the physical writer position (e.g. for updater).
+   */
+  long getRawWriterPosition() throws IOException;
+
+  /** @return physical stripe size, taking padding into account. */
+  long getPhysicalStripeSize();
+
+  /** @return whether the writer is compressed. */
+  boolean isCompressed();
+
+  /**
+   * Appends raw stripe data (e.g. for file merger).
+   * @param stripe Stripe data buffer.
+   * @param offset Stripe data buffer offset.
+   * @param length Stripe data buffer length.
+   * @param dirEntry File metadata entry for the stripe, to be updated with relevant data.
+   * @throws IOException
+   */
+  void appendRawStripe(byte[] stripe, int offset, int length,
+      StripeInformation.Builder dirEntry) throws IOException;
+
+  /**
+   * @return the estimated memory usage for the stripe.
+   */
+  long estimateMemory();
+}
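
Read together, the javadoc above implies a rough call order for anyone driving this
interface. The sketch below is not taken from WriterImpl; it only strings the calls
together in the order PhysicalFsWriter expects them, and it assumes the caller has
already populated the protobuf builders and the stream data it passes in.

import java.io.IOException;

import org.apache.hive.orc.OrcProto;
import org.apache.hive.orc.impl.PhysicalWriter;
import org.apache.hive.orc.impl.StreamName;

public final class PhysicalWriterUsageSketch {
  // Hypothetical single-stripe walk-through of the PhysicalWriter lifecycle.
  public static void writeOneStripeFile(PhysicalWriter writer,
                                        StreamName dataStream,
                                        byte[] data,
                                        OrcProto.StripeFooter.Builder stripeFooter,
                                        OrcProto.StripeInformation.Builder dirEntry,
                                        OrcProto.Metadata.Builder metadata,
                                        OrcProto.Footer.Builder footer,
                                        OrcProto.PostScript.Builder postScript)
      throws IOException {
    writer.initialize();                                       // open the file, write the ORC magic
    writer.getOrCreatePhysicalStream(dataStream).write(data);  // buffer column data for the stripe
    writer.finalizeStripe(stripeFooter, dirEntry);             // spill streams, write stripe footer
    writer.writeFileMetadata(metadata);                        // stripe statistics section
    writer.writeFileFooter(footer);                            // file footer
    writer.writePostScript(postScript);                        // postscript plus its length byte
    writer.close();
  }
}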

http://git-wip-us.apache.org/repos/asf/hive/blob/df8921d8/orc/src/java/org/apache/hive/orc/impl/PositionProvider.java
----------------------------------------------------------------------
diff --git a/orc/src/java/org/apache/hive/orc/impl/PositionProvider.java b/orc/src/java/org/apache/hive/orc/impl/PositionProvider.java
new file mode 100644
index 0000000..36c2654
--- /dev/null
+++ b/orc/src/java/org/apache/hive/orc/impl/PositionProvider.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.orc.impl;
+
+/**
+ * An interface used for seeking to a row index.
+ */
+public interface PositionProvider {
+  long getNext();
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/df8921d8/orc/src/java/org/apache/hive/orc/impl/PositionRecorder.java
----------------------------------------------------------------------
diff --git a/orc/src/java/org/apache/hive/orc/impl/PositionRecorder.java b/orc/src/java/org/apache/hive/orc/impl/PositionRecorder.java
new file mode 100644
index 0000000..11eb9cc
--- /dev/null
+++ b/orc/src/java/org/apache/hive/orc/impl/PositionRecorder.java
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hive.orc.impl;
+
+/**
+ * An interface for recording positions in a stream.
+ */
+public interface PositionRecorder {
+  void addPosition(long offset);
+}
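
PositionProvider (in the previous file) and PositionRecorder are two sides of the same
bookkeeping: the writer records stream offsets for each row-index entry, and the reader
replays them in the same order when seeking. A toy in-memory pairing, not part of the
patch, might look like this:

import java.util.ArrayList;
import java.util.List;

import org.apache.hive.orc.impl.PositionProvider;
import org.apache.hive.orc.impl.PositionRecorder;

// Collects offsets while writing and hands them back, in order, while reading.
public class InMemoryPositions implements PositionRecorder, PositionProvider {
  private final List<Long> positions = new ArrayList<>();
  private int next = 0;

  @Override
  public void addPosition(long offset) {
    positions.add(offset);
  }

  @Override
  public long getNext() {
    return positions.get(next++);
  }
}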

http://git-wip-us.apache.org/repos/asf/hive/blob/df8921d8/orc/src/java/org/apache/hive/orc/impl/PositionedOutputStream.java
----------------------------------------------------------------------
diff --git a/orc/src/java/org/apache/hive/orc/impl/PositionedOutputStream.java b/orc/src/java/org/apache/hive/orc/impl/PositionedOutputStream.java
new file mode 100644
index 0000000..1b8f7c2
--- /dev/null
+++ b/orc/src/java/org/apache/hive/orc/impl/PositionedOutputStream.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hive.orc.impl;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+public abstract class PositionedOutputStream extends OutputStream {
+
+  /**
+   * Record the current position to the recorder.
+   * @param recorder the object that receives the position
+   * @throws IOException
+   */
+  public abstract void getPosition(PositionRecorder recorder
+                                   ) throws IOException;
+
+  /**
+   * Get the memory size currently allocated as buffer associated with this
+   * stream.
+   * @return the number of bytes used by buffers.
+   */
+  public abstract long getBufferSize();
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/df8921d8/orc/src/java/org/apache/hive/orc/impl/ReaderImpl.java
----------------------------------------------------------------------
diff --git a/orc/src/java/org/apache/hive/orc/impl/ReaderImpl.java b/orc/src/java/org/apache/hive/orc/impl/ReaderImpl.java
new file mode 100644
index 0000000..8ab9e92
--- /dev/null
+++ b/orc/src/java/org/apache/hive/orc/impl/ReaderImpl.java
@@ -0,0 +1,763 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.orc.impl;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hive.orc.ColumnStatistics;
+import org.apache.hive.orc.CompressionCodec;
+import org.apache.hive.orc.FileFormatException;
+import org.apache.hive.orc.FileMetadata;
+import org.apache.hive.orc.OrcFile;
+import org.apache.hive.orc.CompressionKind;
+import org.apache.hive.orc.OrcUtils;
+import org.apache.hive.orc.Reader;
+import org.apache.hive.orc.RecordReader;
+import org.apache.hive.orc.TypeDescription;
+import org.apache.hive.orc.StripeInformation;
+import org.apache.hive.orc.StripeStatistics;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.io.DiskRange;
+import org.apache.hadoop.hive.ql.util.JavaDataModel;
+import org.apache.hadoop.io.Text;
+import org.apache.hive.orc.OrcProto;
+
+import com.google.common.collect.Lists;
+import com.google.protobuf.CodedInputStream;
+
+public class ReaderImpl implements Reader {
+
+  private static final Logger LOG = LoggerFactory.getLogger(ReaderImpl.class);
+
+  private static final int DIRECTORY_SIZE_GUESS = 16 * 1024;
+
+  protected final FileSystem fileSystem;
+  private final long maxLength;
+  protected final Path path;
+  protected final CompressionKind compressionKind;
+  protected CompressionCodec codec;
+  protected int bufferSize;
+  protected OrcProto.Metadata metadata;
+  private List<OrcProto.StripeStatistics> stripeStats;
+  private final int metadataSize;
+  protected final List<OrcProto.Type> types;
+  private TypeDescription schema;
+  private final List<OrcProto.UserMetadataItem> userMetadata;
+  private final List<OrcProto.ColumnStatistics> fileStats;
+  private final List<StripeInformation> stripes;
+  protected final int rowIndexStride;
+  private final long contentLength, numberOfRows;
+
+  private long deserializedSize = -1;
+  protected final Configuration conf;
+  private final List<Integer> versionList;
+  private final OrcFile.WriterVersion writerVersion;
+
+  protected OrcTail tail;
+
+  public static class StripeInformationImpl
+      implements StripeInformation {
+    private final OrcProto.StripeInformation stripe;
+
+    public StripeInformationImpl(OrcProto.StripeInformation stripe) {
+      this.stripe = stripe;
+    }
+
+    @Override
+    public long getOffset() {
+      return stripe.getOffset();
+    }
+
+    @Override
+    public long getLength() {
+      return stripe.getDataLength() + getIndexLength() + getFooterLength();
+    }
+
+    @Override
+    public long getDataLength() {
+      return stripe.getDataLength();
+    }
+
+    @Override
+    public long getFooterLength() {
+      return stripe.getFooterLength();
+    }
+
+    @Override
+    public long getIndexLength() {
+      return stripe.getIndexLength();
+    }
+
+    @Override
+    public long getNumberOfRows() {
+      return stripe.getNumberOfRows();
+    }
+
+    @Override
+    public String toString() {
+      return "offset: " + getOffset() + " data: " + getDataLength() +
+        " rows: " + getNumberOfRows() + " tail: " + getFooterLength() +
+        " index: " + getIndexLength();
+    }
+  }
+
+  @Override
+  public long getNumberOfRows() {
+    return numberOfRows;
+  }
+
+  @Override
+  public List<String> getMetadataKeys() {
+    List<String> result = new ArrayList<String>();
+    for(OrcProto.UserMetadataItem item: userMetadata) {
+      result.add(item.getName());
+    }
+    return result;
+  }
+
+  @Override
+  public ByteBuffer getMetadataValue(String key) {
+    for(OrcProto.UserMetadataItem item: userMetadata) {
+      if (item.hasName() && item.getName().equals(key)) {
+        return item.getValue().asReadOnlyByteBuffer();
+      }
+    }
+    throw new IllegalArgumentException("Can't find user metadata " + key);
+  }
+
+  public boolean hasMetadataValue(String key) {
+    for(OrcProto.UserMetadataItem item: userMetadata) {
+      if (item.hasName() && item.getName().equals(key)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  @Override
+  public CompressionKind getCompressionKind() {
+    return compressionKind;
+  }
+
+  @Override
+  public int getCompressionSize() {
+    return bufferSize;
+  }
+
+  @Override
+  public List<StripeInformation> getStripes() {
+    return stripes;
+  }
+
+  @Override
+  public long getContentLength() {
+    return contentLength;
+  }
+
+  @Override
+  public List<OrcProto.Type> getTypes() {
+    return types;
+  }
+
+  @Override
+  public OrcFile.Version getFileVersion() {
+    for (OrcFile.Version version: OrcFile.Version.values()) {
+      if ((versionList != null && !versionList.isEmpty()) &&
+          version.getMajor() == versionList.get(0) &&
+          version.getMinor() == versionList.get(1)) {
+        return version;
+      }
+    }
+    return OrcFile.Version.V_0_11;
+  }
+
+  @Override
+  public OrcFile.WriterVersion getWriterVersion() {
+    return writerVersion;
+  }
+
+  @Override
+  public OrcProto.FileTail getFileTail() {
+    return tail.getFileTail();
+  }
+
+  @Override
+  public int getRowIndexStride() {
+    return rowIndexStride;
+  }
+
+  @Override
+  public ColumnStatistics[] getStatistics() {
+    ColumnStatistics[] result = new ColumnStatistics[types.size()];
+    for(int i=0; i < result.length; ++i) {
+      result[i] = ColumnStatisticsImpl.deserialize(fileStats.get(i));
+    }
+    return result;
+  }
+
+  @Override
+  public TypeDescription getSchema() {
+    return schema;
+  }
+
+  /**
+   * Ensure this is an ORC file to prevent users from trying to read text
+   * files or RC files as ORC files.
+   * @param in the file being read
+   * @param path the filename for error messages
+   * @param psLen the postscript length
+   * @param buffer the tail of the file
+   * @throws IOException
+   */
+  protected static void ensureOrcFooter(FSDataInputStream in,
+                                        Path path,
+                                        int psLen,
+                                        ByteBuffer buffer) throws IOException {
+    int magicLength = OrcFile.MAGIC.length();
+    int fullLength = magicLength + 1;
+    if (psLen < fullLength || buffer.remaining() < fullLength) {
+      throw new FileFormatException("Malformed ORC file " + path +
+          ". Invalid postscript length " + psLen);
+    }
+    int offset = buffer.arrayOffset() + buffer.position() + buffer.limit() - fullLength;
+    byte[] array = buffer.array();
+    // now look for the magic string at the end of the postscript.
+    if (!Text.decode(array, offset, magicLength).equals(OrcFile.MAGIC)) {
+      // If it isn't there, this may be the 0.11.0 version of ORC.
+      // Read the first 3 bytes of the file to check for the header
+      byte[] header = new byte[magicLength];
+      in.readFully(0, header, 0, magicLength);
+      // if it isn't there, this isn't an ORC file
+      if (!Text.decode(header, 0 , magicLength).equals(OrcFile.MAGIC)) {
+        throw new FileFormatException("Malformed ORC file " + path +
+            ". Invalid postscript.");
+      }
+    }
+  }
+
+  /**
+   * Ensure this is an ORC file to prevent users from trying to read text
+   * files or RC files as ORC files.
+   * @param psLen the postscript length
+   * @param buffer the tail of the file
+   * @throws IOException
+   */
+  protected static void ensureOrcFooter(ByteBuffer buffer, int psLen) throws IOException {
+    int magicLength = OrcFile.MAGIC.length();
+    int fullLength = magicLength + 1;
+    if (psLen < fullLength || buffer.remaining() < fullLength) {
+      throw new FileFormatException("Malformed ORC file. Invalid postscript length " + psLen);
+    }
+
+    int offset = buffer.arrayOffset() + buffer.position() + buffer.limit() - fullLength;
+    byte[] array = buffer.array();
+    // now look for the magic string at the end of the postscript.
+    if (!Text.decode(array, offset, magicLength).equals(OrcFile.MAGIC)) {
+      // if it isn't there, this may be the 0.11.0 version of ORC.
+      // Read the first 3 bytes from the buffer to check for the header
+      if (!Text.decode(buffer.array(), 0, magicLength).equals(OrcFile.MAGIC)) {
+        throw new FileFormatException("Malformed ORC file. Invalid postscript length " + psLen);
+      }
+    }
+  }
+
+  /**
+   * Build a version string out of an array.
+   * @param version the version number as a list
+   * @return the human readable form of the version string
+   */
+  private static String versionString(List<Integer> version) {
+    StringBuilder buffer = new StringBuilder();
+    for(int i=0; i < version.size(); ++i) {
+      if (i != 0) {
+        buffer.append('.');
+      }
+      buffer.append(version.get(i));
+    }
+    return buffer.toString();
+  }
+
+  /**
+   * Check to see if this ORC file is from a future version and if so,
+   * warn the user that we may not be able to read all of the column encodings.
+   * @param log the logger to write any error message to
+   * @param path the data source path for error messages
+   * @param version the version of hive that wrote the file.
+   */
+  protected static void checkOrcVersion(Logger log, Path path,
+                                        List<Integer> version) {
+    if (version.size() >= 1) {
+      int major = version.get(0);
+      int minor = 0;
+      if (version.size() >= 2) {
+        minor = version.get(1);
+      }
+      if (major > OrcFile.Version.CURRENT.getMajor() ||
+          (major == OrcFile.Version.CURRENT.getMajor() &&
+           minor > OrcFile.Version.CURRENT.getMinor())) {
+        log.warn(path + " was written by a future Hive version " +
+                 versionString(version) +
+                 ". This file may not be readable by this version of Hive.");
+      }
+    }
+  }
+
+  /**
+   * Constructor that lets the user specify additional options.
+   * @param path pathname for file
+   * @param options options for reading
+   * @throws IOException
+   */
+  public ReaderImpl(Path path, OrcFile.ReaderOptions options) throws IOException {
+    FileSystem fs = options.getFilesystem();
+    if (fs == null) {
+      fs = path.getFileSystem(options.getConfiguration());
+    }
+    this.fileSystem = fs;
+    this.path = path;
+    this.conf = options.getConfiguration();
+    this.maxLength = options.getMaxLength();
+    FileMetadata fileMetadata = options.getFileMetadata();
+    if (fileMetadata != null) {
+      this.compressionKind = fileMetadata.getCompressionKind();
+      this.bufferSize = fileMetadata.getCompressionBufferSize();
+      this.codec = PhysicalFsWriter.createCodec(compressionKind);
+      this.metadataSize = fileMetadata.getMetadataSize();
+      this.stripeStats = fileMetadata.getStripeStats();
+      this.versionList = fileMetadata.getVersionList();
+      this.writerVersion =
+          OrcFile.WriterVersion.from(fileMetadata.getWriterVersionNum());
+      this.types = fileMetadata.getTypes();
+      this.rowIndexStride = fileMetadata.getRowIndexStride();
+      this.contentLength = fileMetadata.getContentLength();
+      this.numberOfRows = fileMetadata.getNumberOfRows();
+      this.fileStats = fileMetadata.getFileStats();
+      this.stripes = fileMetadata.getStripes();
+      this.userMetadata = null; // not cached and not needed here
+    } else {
+      OrcTail orcTail = options.getOrcTail();
+      if (orcTail == null) {
+        tail = extractFileTail(fs, path, options.getMaxLength());
+        options.orcTail(tail);
+      } else {
+        tail = orcTail;
+      }
+      this.compressionKind = tail.getCompressionKind();
+      this.codec = tail.getCompressionCodec();
+      this.bufferSize = tail.getCompressionBufferSize();
+      this.metadataSize = tail.getMetadataSize();
+      this.versionList = tail.getPostScript().getVersionList();
+      this.types = tail.getFooter().getTypesList();
+      this.rowIndexStride = tail.getFooter().getRowIndexStride();
+      this.contentLength = tail.getFooter().getContentLength();
+      this.numberOfRows = tail.getFooter().getNumberOfRows();
+      this.userMetadata = tail.getFooter().getMetadataList();
+      this.fileStats = tail.getFooter().getStatisticsList();
+      this.writerVersion = tail.getWriterVersion();
+      this.stripes = tail.getStripes();
+      this.stripeStats = tail.getStripeStatisticsProto();
+    }
+    this.schema = OrcUtils.convertTypeFromProtobuf(this.types, 0);
+  }
+
+  /**
+   * Get the WriterVersion based on the ORC file postscript.
+   * @param writerVersion the integer writer version
+   * @return the version of the software that produced the file
+   */
+  public static OrcFile.WriterVersion getWriterVersion(int writerVersion) {
+    for(OrcFile.WriterVersion version: OrcFile.WriterVersion.values()) {
+      if (version.getId() == writerVersion) {
+        return version;
+      }
+    }
+    return OrcFile.WriterVersion.FUTURE;
+  }
+
+  private static OrcProto.Footer extractFooter(ByteBuffer bb, int footerAbsPos,
+      int footerSize, CompressionCodec codec, int bufferSize) throws IOException {
+    bb.position(footerAbsPos);
+    bb.limit(footerAbsPos + footerSize);
+    return OrcProto.Footer.parseFrom(InStream.createCodedInputStream("footer",
+        Lists.<DiskRange>newArrayList(new BufferChunk(bb, 0)), footerSize, codec, bufferSize));
+  }
+
+  public static OrcProto.Metadata extractMetadata(ByteBuffer bb, int metadataAbsPos,
+      int metadataSize, CompressionCodec codec, int bufferSize) throws IOException {
+    bb.position(metadataAbsPos);
+    bb.limit(metadataAbsPos + metadataSize);
+    return OrcProto.Metadata.parseFrom(InStream.createCodedInputStream("metadata",
+        Lists.<DiskRange>newArrayList(new BufferChunk(bb, 0)), metadataSize, codec, bufferSize));
+  }
+
+  private static OrcProto.PostScript extractPostScript(ByteBuffer bb, Path path,
+      int psLen, int psAbsOffset) throws IOException {
+    // TODO: when PB is upgraded to 2.6, newInstance(ByteBuffer) method should be used here.
+    assert bb.hasArray();
+    CodedInputStream in = CodedInputStream.newInstance(
+        bb.array(), bb.arrayOffset() + psAbsOffset, psLen);
+    OrcProto.PostScript ps = OrcProto.PostScript.parseFrom(in);
+    checkOrcVersion(LOG, path, ps.getVersionList());
+
+    // Check compression codec.
+    switch (ps.getCompression()) {
+      case NONE:
+        break;
+      case ZLIB:
+        break;
+      case SNAPPY:
+        break;
+      case LZO:
+        break;
+      default:
+        throw new IllegalArgumentException("Unknown compression");
+    }
+    return ps;
+  }
+
+  public static OrcTail extractFileTail(ByteBuffer buffer)
+      throws IOException {
+    return extractFileTail(buffer, -1, -1);
+  }
+
+  public static OrcTail extractFileTail(ByteBuffer buffer, long fileLength, long modificationTime)
+      throws IOException {
+    int readSize = buffer.limit();
+    int psLen = buffer.get(readSize - 1) & 0xff;
+    int psOffset = readSize - 1 - psLen;
+    ensureOrcFooter(buffer, psLen);
+    byte[] psBuffer = new byte[psLen];
+    System.arraycopy(buffer.array(), psOffset, psBuffer, 0, psLen);
+    OrcProto.PostScript ps = OrcProto.PostScript.parseFrom(psBuffer);
+    int footerSize = (int) ps.getFooterLength();
+    CompressionCodec codec = PhysicalFsWriter
+        .createCodec(CompressionKind.valueOf(ps.getCompression().name()));
+    OrcProto.Footer footer = extractFooter(buffer,
+        (int) (buffer.position() + ps.getMetadataLength()),
+        footerSize, codec, (int) ps.getCompressionBlockSize());
+    OrcProto.FileTail.Builder fileTailBuilder = OrcProto.FileTail.newBuilder()
+        .setPostscriptLength(psLen)
+        .setPostscript(ps)
+        .setFooter(footer)
+        .setFileLength(fileLength);
+    // clear does not clear the contents but sets position to 0 and limit = capacity
+    buffer.clear();
+    return new OrcTail(fileTailBuilder.build(), buffer.slice(), modificationTime);
+  }
+
+  protected OrcTail extractFileTail(FileSystem fs, Path path,
+      long maxFileLength) throws IOException {
+    FSDataInputStream file = fs.open(path);
+    ByteBuffer buffer;
+    OrcProto.PostScript ps;
+    OrcProto.FileTail.Builder fileTailBuilder = OrcProto.FileTail.newBuilder();
+    long modificationTime;
+    try {
+      // figure out the size of the file using the option or filesystem
+      long size;
+      if (maxFileLength == Long.MAX_VALUE) {
+        FileStatus fileStatus = fs.getFileStatus(path);
+        size = fileStatus.getLen();
+        modificationTime = fileStatus.getModificationTime();
+      } else {
+        size = maxFileLength;
+        modificationTime = -1;
+      }
+      fileTailBuilder.setFileLength(size);
+
+      //read last bytes into buffer to get PostScript
+      int readSize = (int) Math.min(size, DIRECTORY_SIZE_GUESS);
+      buffer = ByteBuffer.allocate(readSize);
+      assert buffer.position() == 0;
+      file.readFully((size - readSize),
+          buffer.array(), buffer.arrayOffset(), readSize);
+      buffer.position(0);
+
+      //read the PostScript
+      //get length of PostScript
+      int psLen = buffer.get(readSize - 1) & 0xff;
+      ensureOrcFooter(file, path, psLen, buffer);
+      int psOffset = readSize - 1 - psLen;
+      ps = extractPostScript(buffer, path, psLen, psOffset);
+      bufferSize = (int) ps.getCompressionBlockSize();
+      codec = PhysicalFsWriter.createCodec(CompressionKind.valueOf(ps.getCompression().name()));
+      fileTailBuilder.setPostscriptLength(psLen).setPostscript(ps);
+
+      int footerSize = (int) ps.getFooterLength();
+      int metadataSize = (int) ps.getMetadataLength();
+
+      //check if extra bytes need to be read
+      int extra = Math.max(0, psLen + 1 + footerSize + metadataSize - readSize);
+      int tailSize = 1 + psLen + footerSize + metadataSize;
+      if (extra > 0) {
+        //more bytes need to be read, seek back to the right place and read extra bytes
+        ByteBuffer extraBuf = ByteBuffer.allocate(extra + readSize);
+        file.readFully((size - readSize - extra), extraBuf.array(),
+            extraBuf.arrayOffset() + extraBuf.position(), extra);
+        extraBuf.position(extra);
+        //append with already read bytes
+        extraBuf.put(buffer);
+        buffer = extraBuf;
+        buffer.position(0);
+        buffer.limit(tailSize);
+        readSize += extra;
+        psOffset = readSize - 1 - psLen;
+      } else {
+        //footer is already in the bytes in buffer, just adjust position, length
+        buffer.position(psOffset - footerSize - metadataSize);
+        buffer.limit(buffer.position() + tailSize);
+      }
+
+      buffer.mark();
+      int footerOffset = psOffset - footerSize;
+      buffer.position(footerOffset);
+      ByteBuffer footerBuffer = buffer.slice();
+      buffer.reset();
+      OrcProto.Footer footer = extractFooter(footerBuffer, 0, footerSize,
+          codec, bufferSize);
+      fileTailBuilder.setFooter(footer);
+    } finally {
+      try {
+        file.close();
+      } catch (IOException ex) {
+        LOG.error("Failed to close the file after another error", ex);
+      }
+    }
+
+    ByteBuffer serializedTail = ByteBuffer.allocate(buffer.remaining());
+    serializedTail.put(buffer.slice());
+    serializedTail.rewind();
+    return new OrcTail(fileTailBuilder.build(), serializedTail, modificationTime);
+  }
+
+  @Override
+  public ByteBuffer getSerializedFileFooter() {
+    return tail.getSerializedTail();
+  }
+
+  @Override
+  public RecordReader rows() throws IOException {
+    return rows(new Options());
+  }
+
+  @Override
+  public RecordReader rows(Options options) throws IOException {
+    LOG.info("Reading ORC rows from " + path + " with " + options);
+    return new RecordReaderImpl(this, options);
+  }
+
+
+  @Override
+  public long getRawDataSize() {
+    // If the deserialized size has not been computed yet, compute it; otherwise
+    // return the already computed size. Since we read it from the footer, we
+    // don't have to compute the deserialized size repeatedly.
+    if (deserializedSize == -1) {
+      List<Integer> indices = Lists.newArrayList();
+      for (int i = 0; i < fileStats.size(); ++i) {
+        indices.add(i);
+      }
+      deserializedSize = getRawDataSizeFromColIndices(indices);
+    }
+    return deserializedSize;
+  }
+
+  @Override
+  public long getRawDataSizeFromColIndices(List<Integer> colIndices) {
+    return getRawDataSizeFromColIndices(colIndices, types, fileStats);
+  }
+
+  public static long getRawDataSizeFromColIndices(
+      List<Integer> colIndices, List<OrcProto.Type> types,
+      List<OrcProto.ColumnStatistics> stats) {
+    long result = 0;
+    for (int colIdx : colIndices) {
+      result += getRawDataSizeOfColumn(colIdx, types, stats);
+    }
+    return result;
+  }
+
+  private static long getRawDataSizeOfColumn(int colIdx, List<OrcProto.Type> types,
+      List<OrcProto.ColumnStatistics> stats) {
+    OrcProto.ColumnStatistics colStat = stats.get(colIdx);
+    long numVals = colStat.getNumberOfValues();
+    OrcProto.Type type = types.get(colIdx);
+
+    switch (type.getKind()) {
+    case BINARY:
+      // The old ORC format doesn't support binary statistics. Checking for binary
+      // statistics is not required as protocol buffers take care of it.
+      return colStat.getBinaryStatistics().getSum();
+    case STRING:
+    case CHAR:
+    case VARCHAR:
+      // The old ORC format doesn't support sum for string statistics. Checking for
+      // existence is not required as protocol buffers take care of it.
+
+      // ORC strings are deserialized to Java strings, so use the Java data
+      // model's string size.
+      numVals = numVals == 0 ? 1 : numVals;
+      int avgStrLen = (int) (colStat.getStringStatistics().getSum() / numVals);
+      return numVals * JavaDataModel.get().lengthForStringOfLength(avgStrLen);
+    case TIMESTAMP:
+      return numVals * JavaDataModel.get().lengthOfTimestamp();
+    case DATE:
+      return numVals * JavaDataModel.get().lengthOfDate();
+    case DECIMAL:
+      return numVals * JavaDataModel.get().lengthOfDecimal();
+    case DOUBLE:
+    case LONG:
+      return numVals * JavaDataModel.get().primitive2();
+    case FLOAT:
+    case INT:
+    case SHORT:
+    case BOOLEAN:
+    case BYTE:
+      return numVals * JavaDataModel.get().primitive1();
+    default:
+      LOG.debug("Unknown primitive category: " + type.getKind());
+      break;
+    }
+
+    return 0;
+  }
+
+  @Override
+  public long getRawDataSizeOfColumns(List<String> colNames) {
+    List<Integer> colIndices = getColumnIndicesFromNames(colNames);
+    return getRawDataSizeFromColIndices(colIndices);
+  }
+
+  private List<Integer> getColumnIndicesFromNames(List<String> colNames) {
+    // top level struct
+    OrcProto.Type type = types.get(0);
+    List<Integer> colIndices = Lists.newArrayList();
+    List<String> fieldNames = type.getFieldNamesList();
+    int fieldIdx;
+    for (String colName : colNames) {
+      if (fieldNames.contains(colName)) {
+        fieldIdx = fieldNames.indexOf(colName);
+      } else {
+        String s = "Cannot find field for: " + colName + " in ";
+        for (String fn : fieldNames) {
+          s += fn + ", ";
+        }
+        LOG.warn(s);
+        continue;
+      }
+
+      // a single field may span multiple columns. find start and end column
+      // index for the requested field
+      int idxStart = type.getSubtypes(fieldIdx);
+
+      int idxEnd;
+
+      // if the specified field is the last field, then the end index will be
+      // the last column index
+      if (fieldIdx + 1 > fieldNames.size() - 1) {
+        idxEnd = getLastIdx() + 1;
+      } else {
+        idxEnd = type.getSubtypes(fieldIdx + 1);
+      }
+
+      // if the start index and end index are the same, the field is a primitive
+      // field; otherwise it is a complex field (like map, list, struct, union)
+      if (idxStart == idxEnd) {
+        // simple field
+        colIndices.add(idxStart);
+      } else {
+        // complex fields span multiple columns
+        for (int i = idxStart; i < idxEnd; i++) {
+          colIndices.add(i);
+        }
+      }
+    }
+    return colIndices;
+  }
+
+  private int getLastIdx() {
+    Set<Integer> indices = new HashSet<>();
+    for (OrcProto.Type type : types) {
+      indices.addAll(type.getSubtypesList());
+    }
+    return Collections.max(indices);
+  }
+
+  @Override
+  public List<OrcProto.StripeStatistics> getOrcProtoStripeStatistics() {
+    return stripeStats;
+  }
+
+  @Override
+  public List<OrcProto.ColumnStatistics> getOrcProtoFileStatistics() {
+    return fileStats;
+  }
+
+  @Override
+  public List<StripeStatistics> getStripeStatistics() throws IOException {
+    if (stripeStats == null && metadata == null) {
+      metadata = extractMetadata(tail.getSerializedTail(), 0, metadataSize, codec, bufferSize);
+      stripeStats = metadata.getStripeStatsList();
+    }
+    List<StripeStatistics> result = new ArrayList<>();
+    for (OrcProto.StripeStatistics ss : stripeStats) {
+      result.add(new StripeStatistics(ss.getColStatsList()));
+    }
+    return result;
+  }
+
+  public List<OrcProto.UserMetadataItem> getOrcProtoUserMetadata() {
+    return userMetadata;
+  }
+
+  @Override
+  public List<Integer> getVersionList() {
+    return versionList;
+  }
+
+  @Override
+  public int getMetadataSize() {
+    return metadataSize;
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder buffer = new StringBuilder();
+    buffer.append("ORC Reader(");
+    buffer.append(path);
+    if (maxLength != -1) {
+      buffer.append(", ");
+      buffer.append(maxLength);
+    }
+    buffer.append(")");
+    return buffer.toString();
+  }
+}
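
extractFileTail() sizes its reads from a handful of lengths stored at the very end of the
file: the final byte holds the PostScript length, and the PostScript in turn carries the
footer and metadata lengths, with the tail laid out as [metadata][footer][postscript][length
byte]. A small stand-alone sketch of that arithmetic, with made-up lengths, may make the
offsets easier to follow.

// Illustration only; the lengths below are hypothetical.
public class OrcTailLayoutSketch {
  public static void main(String[] args) {
    int readSize = 16 * 1024;      // DIRECTORY_SIZE_GUESS worth of trailing bytes
    int psLen = 25;                // value of the file's final byte
    int footerSize = 300;          // PostScript.getFooterLength()
    int metadataSize = 120;        // PostScript.getMetadataLength()

    int psOffset = readSize - 1 - psLen;              // PostScript starts here in the buffer
    int footerOffset = psOffset - footerSize;         // footer sits just before the PostScript
    int tailSize = 1 + psLen + footerSize + metadataSize;
    int extra = Math.max(0, tailSize - readSize);     // bytes still missing from the first read

    System.out.println("postscript at " + psOffset + ", footer at " + footerOffset
        + ", tail is " + tailSize + " bytes, need " + extra + " more bytes");
  }
}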