Posted to commits@hive.apache.org by om...@apache.org on 2015/12/12 00:28:03 UTC

[06/16] hive git commit: HIVE-11890. Create ORC submodule. (omalley reviewed by prasanthj)

http://git-wip-us.apache.org/repos/asf/hive/blob/9c7a78ee/ql/src/java/org/apache/hadoop/hive/ql/io/orc/InStream.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/InStream.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/InStream.java
deleted file mode 100644
index 227cfca..0000000
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/InStream.java
+++ /dev/null
@@ -1,497 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hive.ql.io.orc;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.ListIterator;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.hadoop.hive.common.io.DiskRange;
-import org.apache.hadoop.hive.ql.io.orc.RecordReaderImpl.BufferChunk;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.protobuf.CodedInputStream;
-
-public abstract class InStream extends InputStream {
-
-  private static final Logger LOG = LoggerFactory.getLogger(InStream.class);
-  private static final int PROTOBUF_MESSAGE_MAX_LIMIT = 1024 << 20; // 1GB
-
-  protected final String name;
-  protected long length;
-
-  public InStream(String name, long length) {
-    this.name = name;
-    this.length = length;
-  }
-
-  public String getStreamName() {
-    return name;
-  }
-
-  public long getStreamLength() {
-    return length;
-  }
-
-  static class UncompressedStream extends InStream {
-    private List<DiskRange> bytes;
-    private long length;
-    protected long currentOffset;
-    private ByteBuffer range;
-    private int currentRange;
-
-    public UncompressedStream(String name, List<DiskRange> input, long length) {
-      super(name, length);
-      reset(input, length);
-    }
-
-    protected void reset(List<DiskRange> input, long length) {
-      this.bytes = input;
-      this.length = length;
-      currentRange = 0;
-      currentOffset = 0;
-      range = null;
-    }
-
-    @Override
-    public int read() {
-      if (range == null || range.remaining() == 0) {
-        if (currentOffset == length) {
-          return -1;
-        }
-        seek(currentOffset);
-      }
-      currentOffset += 1;
-      return 0xff & range.get();
-    }
-
-    @Override
-    public int read(byte[] data, int offset, int length) {
-      if (range == null || range.remaining() == 0) {
-        if (currentOffset == this.length) {
-          return -1;
-        }
-        seek(currentOffset);
-      }
-      int actualLength = Math.min(length, range.remaining());
-      range.get(data, offset, actualLength);
-      currentOffset += actualLength;
-      return actualLength;
-    }
-
-    @Override
-    public int available() {
-      if (range != null && range.remaining() > 0) {
-        return range.remaining();
-      }
-      return (int) (length - currentOffset);
-    }
-
-    @Override
-    public void close() {
-      currentRange = bytes.size();
-      currentOffset = length;
-      // explicit de-ref of bytes[]
-      bytes.clear();
-    }
-
-    @Override
-    public void seek(PositionProvider index) throws IOException {
-      seek(index.getNext());
-    }
-
-    public void seek(long desired) {
-      if (desired == 0 && bytes.isEmpty()) {
-        logEmptySeek(name);
-        return;
-      }
-      int i = 0;
-      for (DiskRange curRange : bytes) {
-        if (desired == 0 && curRange.getData().remaining() == 0) {
-          logEmptySeek(name);
-          return;
-        }
-        if (curRange.getOffset() <= desired &&
-            (desired - curRange.getOffset()) < curRange.getLength()) {
-          currentOffset = desired;
-          currentRange = i;
-          this.range = curRange.getData().duplicate();
-          int pos = range.position();
-          pos += (int)(desired - curRange.getOffset()); // this is why we duplicate
-          this.range.position(pos);
-          return;
-        }
-        ++i;
-      }
-      // if they are seeking to the precise end, go ahead and let them go there
-      int segments = bytes.size();
-      if (segments != 0 && desired == bytes.get(segments - 1).getEnd()) {
-        currentOffset = desired;
-        currentRange = segments - 1;
-        DiskRange curRange = bytes.get(currentRange);
-        this.range = curRange.getData().duplicate();
-        int pos = range.position();
-        pos += (int)(desired - curRange.getOffset()); // this is why we duplicate
-        this.range.position(pos);
-        return;
-      }
-      throw new IllegalArgumentException("Seek in " + name + " to " +
-        desired + " is outside of the data");
-    }
-
-    @Override
-    public String toString() {
-      return "uncompressed stream " + name + " position: " + currentOffset +
-          " length: " + length + " range: " + currentRange +
-          " offset: " + (range == null ? 0 : range.position()) + " limit: " + (range == null ? 0 : range.limit());
-    }
-  }
-
-  private static ByteBuffer allocateBuffer(int size, boolean isDirect) {
-    // TODO: use the same pool as the ORC readers
-    if (isDirect) {
-      return ByteBuffer.allocateDirect(size);
-    } else {
-      return ByteBuffer.allocate(size);
-    }
-  }
-
-  private static class CompressedStream extends InStream {
-    private final List<DiskRange> bytes;
-    private final int bufferSize;
-    private ByteBuffer uncompressed;
-    private final CompressionCodec codec;
-    private ByteBuffer compressed;
-    private long currentOffset;
-    private int currentRange;
-    private boolean isUncompressedOriginal;
-
-    public CompressedStream(String name, List<DiskRange> input, long length,
-                            CompressionCodec codec, int bufferSize) {
-      super(name, length);
-      this.bytes = input;
-      this.codec = codec;
-      this.bufferSize = bufferSize;
-      currentOffset = 0;
-      currentRange = 0;
-    }
-
-    private void allocateForUncompressed(int size, boolean isDirect) {
-      uncompressed = allocateBuffer(size, isDirect);
-    }
-
-    private void readHeader() throws IOException {
-      if (compressed == null || compressed.remaining() <= 0) {
-        seek(currentOffset);
-      }
-      if (compressed.remaining() > OutStream.HEADER_SIZE) {
-        int b0 = compressed.get() & 0xff;
-        int b1 = compressed.get() & 0xff;
-        int b2 = compressed.get() & 0xff;
-        boolean isOriginal = (b0 & 0x01) == 1;
-        int chunkLength = (b2 << 15) | (b1 << 7) | (b0 >> 1);
-
-        if (chunkLength > bufferSize) {
-          throw new IllegalArgumentException("Buffer size too small. size = " +
-              bufferSize + " needed = " + chunkLength);
-        }
-        // read 3 bytes, which should be equal to OutStream.HEADER_SIZE always
-        assert OutStream.HEADER_SIZE == 3 : "The Orc HEADER_SIZE must be the same in OutStream and InStream";
-        currentOffset += OutStream.HEADER_SIZE;
-
-        ByteBuffer slice = this.slice(chunkLength);
-
-        if (isOriginal) {
-          uncompressed = slice;
-          isUncompressedOriginal = true;
-        } else {
-          if (isUncompressedOriginal) {
-            allocateForUncompressed(bufferSize, slice.isDirect());
-            isUncompressedOriginal = false;
-          } else if (uncompressed == null) {
-            allocateForUncompressed(bufferSize, slice.isDirect());
-          } else {
-            uncompressed.clear();
-          }
-          codec.decompress(slice, uncompressed);
-         }
-      } else {
-        throw new IllegalStateException("Can't read header at " + this);
-      }
-    }
-
-    @Override
-    public int read() throws IOException {
-      if (uncompressed == null || uncompressed.remaining() == 0) {
-        if (currentOffset == length) {
-          return -1;
-        }
-        readHeader();
-      }
-      return 0xff & uncompressed.get();
-    }
-
-    @Override
-    public int read(byte[] data, int offset, int length) throws IOException {
-      if (uncompressed == null || uncompressed.remaining() == 0) {
-        if (currentOffset == this.length) {
-          return -1;
-        }
-        readHeader();
-      }
-      int actualLength = Math.min(length, uncompressed.remaining());
-      uncompressed.get(data, offset, actualLength);
-      return actualLength;
-    }
-
-    @Override
-    public int available() throws IOException {
-      if (uncompressed == null || uncompressed.remaining() == 0) {
-        if (currentOffset == length) {
-          return 0;
-        }
-        readHeader();
-      }
-      return uncompressed.remaining();
-    }
-
-    @Override
-    public void close() {
-      uncompressed = null;
-      compressed = null;
-      currentRange = bytes.size();
-      currentOffset = length;
-      bytes.clear();
-    }
-
-    @Override
-    public void seek(PositionProvider index) throws IOException {
-      seek(index.getNext());
-      long uncompressedBytes = index.getNext();
-      if (uncompressedBytes != 0) {
-        readHeader();
-        uncompressed.position(uncompressed.position() +
-                              (int) uncompressedBytes);
-      } else if (uncompressed != null) {
-        // mark the uncompressed buffer as done
-        uncompressed.position(uncompressed.limit());
-      }
-    }
-
-    /* slices a read only contiguous buffer of chunkLength */
-    private ByteBuffer slice(int chunkLength) throws IOException {
-      int len = chunkLength;
-      final long oldOffset = currentOffset;
-      ByteBuffer slice;
-      if (compressed.remaining() >= len) {
-        slice = compressed.slice();
-        // simple case
-        slice.limit(len);
-        currentOffset += len;
-        compressed.position(compressed.position() + len);
-        return slice;
-      } else if (currentRange >= (bytes.size() - 1)) {
-        // nothing has been modified yet
-        throw new IOException("EOF in " + this + " while trying to read " +
-            chunkLength + " bytes");
-      }
-
-      if (LOG.isDebugEnabled()) {
-        LOG.debug(String.format(
-            "Crossing into next BufferChunk because compressed only has %d bytes (needs %d)",
-            compressed.remaining(), len));
-      }
-
-      // we need to consolidate 2 or more buffers into 1
-      // first copy out compressed buffers
-      ByteBuffer copy = allocateBuffer(chunkLength, compressed.isDirect());
-      currentOffset += compressed.remaining();
-      len -= compressed.remaining();
-      copy.put(compressed);
-      ListIterator<DiskRange> iter = bytes.listIterator(currentRange);
-
-      while (len > 0 && iter.hasNext()) {
-        ++currentRange;
-        if (LOG.isDebugEnabled()) {
-          LOG.debug(String.format("Read slow-path, >1 cross block reads with %s", this.toString()));
-        }
-        DiskRange range = iter.next();
-        compressed = range.getData().duplicate();
-        if (compressed.remaining() >= len) {
-          slice = compressed.slice();
-          slice.limit(len);
-          copy.put(slice);
-          currentOffset += len;
-          compressed.position(compressed.position() + len);
-          return copy;
-        }
-        currentOffset += compressed.remaining();
-        len -= compressed.remaining();
-        copy.put(compressed);
-      }
-
-      // restore offsets for exception clarity
-      seek(oldOffset);
-      throw new IOException("EOF in " + this + " while trying to read " +
-          chunkLength + " bytes");
-    }
-
-    private void seek(long desired) throws IOException {
-      if (desired == 0 && bytes.isEmpty()) {
-        logEmptySeek(name);
-        return;
-      }
-      int i = 0;
-      for (DiskRange range : bytes) {
-        if (range.getOffset() <= desired && desired < range.getEnd()) {
-          currentRange = i;
-          compressed = range.getData().duplicate();
-          int pos = compressed.position();
-          pos += (int)(desired - range.getOffset());
-          compressed.position(pos);
-          currentOffset = desired;
-          return;
-        }
-        ++i;
-      }
-      // if they are seeking to the precise end, go ahead and let them go there
-      int segments = bytes.size();
-      if (segments != 0 && desired == bytes.get(segments - 1).getEnd()) {
-        DiskRange range = bytes.get(segments - 1);
-        currentRange = segments - 1;
-        compressed = range.getData().duplicate();
-        compressed.position(compressed.limit());
-        currentOffset = desired;
-        return;
-      }
-      throw new IOException("Seek outside of data in " + this + " to " + desired);
-    }
-
-    private String rangeString() {
-      StringBuilder builder = new StringBuilder();
-      int i = 0;
-      for (DiskRange range : bytes) {
-        if (i != 0) {
-          builder.append("; ");
-        }
-        builder.append(" range " + i + " = " + range.getOffset()
-            + " to " + (range.getEnd() - range.getOffset()));
-        ++i;
-      }
-      return builder.toString();
-    }
-
-    @Override
-    public String toString() {
-      return "compressed stream " + name + " position: " + currentOffset +
-          " length: " + length + " range: " + currentRange +
-          " offset: " + (compressed == null ? 0 : compressed.position()) + " limit: " + (compressed == null ? 0 : compressed.limit()) +
-          rangeString() +
-          (uncompressed == null ? "" :
-              " uncompressed: " + uncompressed.position() + " to " +
-                  uncompressed.limit());
-    }
-  }
-
-  public abstract void seek(PositionProvider index) throws IOException;
-
-  private static void logEmptySeek(String name) {
-    if (LOG.isWarnEnabled()) {
-      LOG.warn("Attempting seek into empty stream (" + name + ") Skipping stream.");
-    }
-  }
-
-  /**
-   * Create an input stream from a list of buffers.
-   * @param fileName name of the file
-   * @param streamName the name of the stream
-   * @param buffers the list of ranges of bytes for the stream
-   * @param offsets a list of offsets (the same length as input) that must
- *                contain the first offset of each set of bytes in input
-   * @param length the length in bytes of the stream
-   * @param codec the compression codec
-   * @param bufferSize the compression buffer size
-   * @return an input stream
-   * @throws IOException
-   */
-  @VisibleForTesting
-  @Deprecated
-  public static InStream create(String streamName,
-                                ByteBuffer[] buffers,
-                                long[] offsets,
-                                long length,
-                                CompressionCodec codec,
-                                int bufferSize) throws IOException {
-    List<DiskRange> input = new ArrayList<DiskRange>(buffers.length);
-    for (int i = 0; i < buffers.length; ++i) {
-      input.add(new BufferChunk(buffers[i], offsets[i]));
-    }
-    return create(streamName, input, length, codec, bufferSize);
-  }
-
-  /**
-   * Create an input stream from a list of disk ranges with data.
-   * @param name the name of the stream
-   * @param input the list of ranges of bytes for the stream; from disk or cache
-   * @param length the length in bytes of the stream
-   * @param codec the compression codec
-   * @param bufferSize the compression buffer size
-   * @param cache Low-level cache to use to put data, if any. Only works with compressed streams.
-   * @return an input stream
-   * @throws IOException
-   */
-  public static InStream create(String name,
-                                List<DiskRange> input,
-                                long length,
-                                CompressionCodec codec,
-                                int bufferSize) throws IOException {
-    if (codec == null) {
-      return new UncompressedStream(name, input, length);
-    } else {
-      return new CompressedStream(name, input, length, codec, bufferSize);
-    }
-  }
-
-  /**
-   * Creates coded input stream (used for protobuf message parsing) with higher message size limit.
-   *
-   * @param name       the name of the stream
-   * @param input      the list of ranges of bytes for the stream; from disk or cache
-   * @param length     the length in bytes of the stream
-   * @param codec      the compression codec
-   * @param bufferSize the compression buffer size
-   * @return coded input stream
-   * @throws IOException
-   */
-  public static CodedInputStream createCodedInputStream(
-      String name,
-      List<DiskRange> input,
-      long length,
-      CompressionCodec codec,
-      int bufferSize) throws IOException {
-    InStream inStream = create(name, input, length, codec, bufferSize);
-    CodedInputStream codedInputStream = CodedInputStream.newInstance(inStream);
-    codedInputStream.setSizeLimit(PROTOBUF_MESSAGE_MAX_LIMIT);
-    return codedInputStream;
-  }
-}
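
A minimal, self-contained sketch of the 3-byte chunk header decoding done by
CompressedStream.readHeader above. Only the bit layout ((b2 << 15) | (b1 << 7) | (b0 >> 1)
plus the low "original" bit of b0) comes from the code; the class and method names below
are made up for illustration:

    // Sketch: decode an ORC compression chunk header the way readHeader does.
    // Byte 0, bit 0 is the "original" (uncompressed) flag; the remaining 23 bits
    // spread across the three bytes are the chunk length.
    public class ChunkHeaderSketch {
      static boolean isOriginal(byte[] header) {
        return (header[0] & 0x01) == 1;
      }

      static int chunkLength(byte[] header) {
        int b0 = header[0] & 0xff;
        int b1 = header[1] & 0xff;
        int b2 = header[2] & 0xff;
        return (b2 << 15) | (b1 << 7) | (b0 >> 1);
      }

      public static void main(String[] args) {
        byte[] header = {(byte) 0x40, (byte) 0x0d, (byte) 0x03};
        System.out.println(isOriginal(header));   // false: the chunk is compressed
        System.out.println(chunkLength(header));  // 100000
      }
    }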

http://git-wip-us.apache.org/repos/asf/hive/blob/9c7a78ee/ql/src/java/org/apache/hadoop/hive/ql/io/orc/IntegerColumnStatistics.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/IntegerColumnStatistics.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/IntegerColumnStatistics.java
deleted file mode 100644
index 15625fa..0000000
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/IntegerColumnStatistics.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hive.ql.io.orc;
-
-/**
- * Statistics for all of the integer columns, such as byte, short, int, and
- * long.
- */
-public interface IntegerColumnStatistics extends ColumnStatistics {
-  /**
-   * Get the smallest value in the column. Only defined if getNumberOfValues
-   * is non-zero.
-   * @return the minimum
-   */
-  long getMinimum();
-
-  /**
-   * Get the largest value in the column. Only defined if getNumberOfValues
-   * is non-zero.
-   * @return the maximum
-   */
-  long getMaximum();
-
-  /**
-   * Is the sum defined? If the sum overflowed the counter this will be false.
-   * @return is the sum available
-   */
-  boolean isSumDefined();
-
-  /**
-   * Get the sum of the column. Only valid if isSumDefined returns true.
-   * @return the sum of the column
-   */
-  long getSum();
-}
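
A hedged usage sketch of the interface above; it assumes the post-move package
org.apache.orc (the one referenced by the JsonFileDump imports later in this patch), and
the summarize() helper is hypothetical:

    import org.apache.orc.ColumnStatistics;
    import org.apache.orc.IntegerColumnStatistics;

    public class IntegerStatsSketch {
      // Summarize an integer column's statistics, guarding the overflow case.
      static String summarize(ColumnStatistics stats) {
        if (!(stats instanceof IntegerColumnStatistics)) {
          return "not an integer column";
        }
        IntegerColumnStatistics ints = (IntegerColumnStatistics) stats;
        if (ints.getNumberOfValues() == 0) {
          return "empty column";                 // minimum/maximum are undefined here
        }
        String sum = ints.isSumDefined()
            ? Long.toString(ints.getSum())       // only valid when the counter did not overflow
            : "overflowed";
        return "min=" + ints.getMinimum() + " max=" + ints.getMaximum() + " sum=" + sum;
      }
    }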

http://git-wip-us.apache.org/repos/asf/hive/blob/9c7a78ee/ql/src/java/org/apache/hadoop/hive/ql/io/orc/IntegerReader.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/IntegerReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/IntegerReader.java
deleted file mode 100644
index b1c7f0a..0000000
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/IntegerReader.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hive.ql.io.orc;
-
-import java.io.IOException;
-
-import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
-
-/**
- * Interface for reading integers.
- */
-public interface IntegerReader {
-
-  /**
-   * Seek to the position provided by index.
-   * @param index
-   * @throws IOException
-   */
-  void seek(PositionProvider index) throws IOException;
-
-  /**
-   * Skip number of specified rows.
-   * @param numValues
-   * @throws IOException
-   */
-  void skip(long numValues) throws IOException;
-
-  /**
-   * Check if there are any more values left.
-   * @return
-   * @throws IOException
-   */
-  boolean hasNext() throws IOException;
-
-  /**
-   * Return the next available value.
-   * @return
-   * @throws IOException
-   */
-  long next() throws IOException;
-
-  /**
-   * Return the next available vector for values.
-   * @return
-   * @throws IOException
-   */
-   void nextVector(LongColumnVector previous, long previousLen)
-      throws IOException;
-
-  void setInStream(InStream data);
-}
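
A hedged sketch of how a caller might drive the contract above on the scalar path; it
assumes it compiles in the same package as IntegerReader, and the readSome() helper as
well as the origin of the reader instance are hypothetical:

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;

    class IntegerReaderSketch {
      // Skip past `skip` values, then pull up to `limit` values one at a time.
      static List<Long> readSome(IntegerReader reader, long skip, int limit)
          throws IOException {
        reader.skip(skip);                 // move past values the caller does not need
        List<Long> out = new ArrayList<Long>();
        while (out.size() < limit && reader.hasNext()) {
          out.add(reader.next());          // nextVector() is the batched alternative
        }
        return out;
      }
    }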

http://git-wip-us.apache.org/repos/asf/hive/blob/9c7a78ee/ql/src/java/org/apache/hadoop/hive/ql/io/orc/IntegerWriter.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/IntegerWriter.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/IntegerWriter.java
deleted file mode 100644
index 775d02e..0000000
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/IntegerWriter.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hive.ql.io.orc;
-
-import java.io.IOException;
-
-/**
- * Interface for writing integers.
- */
-interface IntegerWriter {
-
-  /**
-   * Get position from the stream.
-   * @param recorder
-   * @throws IOException
-   */
-  void getPosition(PositionRecorder recorder) throws IOException;
-
-  /**
-   * Write the integer value
-   * @param value
-   * @throws IOException
-   */
-  void write(long value) throws IOException;
-
-  /**
-   * Flush the buffer
-   * @throws IOException
-   */
-  void flush() throws IOException;
-}
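
A similarly hedged sketch for the writer side; the writeRange() helper is hypothetical
and assumes the same package as IntegerWriter:

    import java.io.IOException;

    class IntegerWriterSketch {
      // Write a run of values and force the buffered bytes out.
      static void writeRange(IntegerWriter writer, long from, long to) throws IOException {
        for (long v = from; v < to; ++v) {
          writer.write(v);                 // buffered by the concrete RLE writer
        }
        writer.flush();
      }
    }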

http://git-wip-us.apache.org/repos/asf/hive/blob/9c7a78ee/ql/src/java/org/apache/hadoop/hive/ql/io/orc/JsonFileDump.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/JsonFileDump.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/JsonFileDump.java
index f9a6d9e..b746390 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/JsonFileDump.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/JsonFileDump.java
@@ -26,6 +26,20 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.ql.io.filters.BloomFilterIO;
 import org.codehaus.jettison.json.JSONArray;
+import org.apache.orc.BinaryColumnStatistics;
+import org.apache.orc.BooleanColumnStatistics;
+import org.apache.orc.ColumnStatistics;
+import org.apache.orc.impl.ColumnStatisticsImpl;
+import org.apache.orc.DateColumnStatistics;
+import org.apache.orc.DecimalColumnStatistics;
+import org.apache.orc.DoubleColumnStatistics;
+import org.apache.orc.IntegerColumnStatistics;
+import org.apache.orc.impl.OrcIndex;
+import org.apache.orc.OrcProto;
+import org.apache.orc.StringColumnStatistics;
+import org.apache.orc.StripeInformation;
+import org.apache.orc.StripeStatistics;
+import org.apache.orc.TimestampColumnStatistics;
 import org.codehaus.jettison.json.JSONException;
 import org.codehaus.jettison.json.JSONObject;
 import org.codehaus.jettison.json.JSONStringer;
@@ -151,7 +165,7 @@ public class JsonFileDump {
             for (int colIdx : rowIndexCols) {
               sargColumns[colIdx] = true;
             }
-            RecordReaderImpl.Index indices = rows.readRowIndex(stripeIx, null, sargColumns);
+            OrcIndex indices = rows.readRowIndex(stripeIx, null, sargColumns);
             writer.key("indexes").array();
             for (int col : rowIndexCols) {
               writer.object();

http://git-wip-us.apache.org/repos/asf/hive/blob/9c7a78ee/ql/src/java/org/apache/hadoop/hive/ql/io/orc/MemoryManager.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/MemoryManager.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/MemoryManager.java
deleted file mode 100644
index bb35b13..0000000
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/MemoryManager.java
+++ /dev/null
@@ -1,213 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hive.ql.io.orc;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-
-import com.google.common.base.Preconditions;
-
-import java.io.IOException;
-import java.lang.management.ManagementFactory;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.locks.ReentrantLock;
-
-/**
- * Implements a memory manager that keeps a global context of how many ORC
- * writers there are and manages the memory between them. For use cases with
- * dynamic partitions, it is easy to end up with many writers in the same task.
- * By managing the size of each allocation, we try to cut down the size of each
- * allocation and keep the task from running out of memory.
- * 
- * This class is not thread safe, but is re-entrant - ensure creation and all
- * invocations are triggered from the same thread.
- */
-class MemoryManager {
-
-  private static final Logger LOG = LoggerFactory.getLogger(MemoryManager.class);
-
-  /**
-   * How often should we check the memory sizes? Measured in rows added
-   * to all of the writers.
-   */
-  private static final int ROWS_BETWEEN_CHECKS = 5000;
-  private final long totalMemoryPool;
-  private final Map<Path, WriterInfo> writerList =
-      new HashMap<Path, WriterInfo>();
-  private long totalAllocation = 0;
-  private double currentScale = 1;
-  private int rowsAddedSinceCheck = 0;
-  private final OwnedLock ownerLock = new OwnedLock();
-
-  @SuppressWarnings("serial")
-  private static class OwnedLock extends ReentrantLock {
-    public Thread getOwner() {
-      return super.getOwner();
-    }
-  }
-
-  private static class WriterInfo {
-    long allocation;
-    Callback callback;
-    WriterInfo(long allocation, Callback callback) {
-      this.allocation = allocation;
-      this.callback = callback;
-    }
-  }
-
-  public interface Callback {
-    /**
-     * The writer needs to check its memory usage
-     * @param newScale the current scale factor for memory allocations
-     * @return true if the writer was over the limit
-     * @throws IOException
-     */
-    boolean checkMemory(double newScale) throws IOException;
-  }
-
-  /**
-   * Create the memory manager.
-   * @param conf use the configuration to find the maximum size of the memory
-   *             pool.
-   */
-  MemoryManager(Configuration conf) {
-    double maxLoad = OrcConf.MEMORY_POOL.getDouble(conf);
-    totalMemoryPool = Math.round(ManagementFactory.getMemoryMXBean().
-        getHeapMemoryUsage().getMax() * maxLoad);
-    ownerLock.lock();
-  }
-
-  /**
-   * Light weight thread-safety check for multi-threaded access patterns
-   */
-  private void checkOwner() {
-    Preconditions.checkArgument(ownerLock.isHeldByCurrentThread(),
-        "Owner thread expected %s, got %s",
-        ownerLock.getOwner(),
-        Thread.currentThread());
-  }
-
-  /**
-   * Add a new writer's memory allocation to the pool. We use the path
-   * as a unique key to ensure that we don't get duplicates.
-   * @param path the file that is being written
-   * @param requestedAllocation the requested buffer size
-   */
-  void addWriter(Path path, long requestedAllocation,
-                              Callback callback) throws IOException {
-    checkOwner();
-    WriterInfo oldVal = writerList.get(path);
-    // this should always be null, but we handle the case where the memory
-    // manager wasn't told that a writer wasn't still in use and the task
-    // starts writing to the same path.
-    if (oldVal == null) {
-      oldVal = new WriterInfo(requestedAllocation, callback);
-      writerList.put(path, oldVal);
-      totalAllocation += requestedAllocation;
-    } else {
-      // handle a new writer that is writing to the same path
-      totalAllocation += requestedAllocation - oldVal.allocation;
-      oldVal.allocation = requestedAllocation;
-      oldVal.callback = callback;
-    }
-    updateScale(true);
-  }
-
-  /**
-   * Remove the given writer from the pool.
-   * @param path the file that has been closed
-   */
-  void removeWriter(Path path) throws IOException {
-    checkOwner();
-    WriterInfo val = writerList.get(path);
-    if (val != null) {
-      writerList.remove(path);
-      totalAllocation -= val.allocation;
-      if (writerList.isEmpty()) {
-        rowsAddedSinceCheck = 0;
-      }
-      updateScale(false);
-    }
-    if(writerList.isEmpty()) {
-      rowsAddedSinceCheck = 0;
-    }
-  }
-
-  /**
-   * Get the total pool size that is available for ORC writers.
-   * @return the number of bytes in the pool
-   */
-  long getTotalMemoryPool() {
-    return totalMemoryPool;
-  }
-
-  /**
-   * The scaling factor for each allocation to ensure that the pool isn't
-   * oversubscribed.
-   * @return a fraction between 0.0 and 1.0 of the requested size that is
-   * available for each writer.
-   */
-  double getAllocationScale() {
-    return currentScale;
-  }
-
-  /**
-   * Give the memory manager an opportunity for doing a memory check.
-   * @param rows number of rows added
-   * @throws IOException
-   */
-  void addedRow(int rows) throws IOException {
-    rowsAddedSinceCheck += rows;
-    if (rowsAddedSinceCheck >= ROWS_BETWEEN_CHECKS) {
-      notifyWriters();
-    }
-  }
-
-  /**
-   * Notify all of the writers that they should check their memory usage.
-   * @throws IOException
-   */
-  void notifyWriters() throws IOException {
-    checkOwner();
-    LOG.debug("Notifying writers after " + rowsAddedSinceCheck);
-    for(WriterInfo writer: writerList.values()) {
-      boolean flushed = writer.callback.checkMemory(currentScale);
-      if (LOG.isDebugEnabled() && flushed) {
-        LOG.debug("flushed " + writer.toString());
-      }
-    }
-    rowsAddedSinceCheck = 0;
-  }
-
-  /**
-   * Update the currentScale based on the current allocation and pool size.
-   * This also updates the notificationTrigger.
-   * @param isAllocate is this an allocation?
-   */
-  private void updateScale(boolean isAllocate) throws IOException {
-    if (totalAllocation <= totalMemoryPool) {
-      currentScale = 1;
-    } else {
-      currentScale = (double) totalMemoryPool / totalAllocation;
-    }
-  }
-}
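
A small arithmetic sketch of the scaling rule in updateScale() above, with made-up
numbers: given a 1 GB pool and two writers asking for 768 MB each, totalAllocation is
1.5 GB, so each writer is asked to work within roughly 0.67 of its requested size.

    // Sketch: the same rule as MemoryManager.updateScale(), with hypothetical numbers.
    public class MemoryScaleSketch {
      static double scale(long totalMemoryPool, long totalAllocation) {
        return totalAllocation <= totalMemoryPool
            ? 1.0
            : (double) totalMemoryPool / totalAllocation;
      }

      public static void main(String[] args) {
        long pool = 1L << 30;                  // 1 GB memory pool
        long twoWriters = 2 * (768L << 20);    // two writers requesting 768 MB each
        System.out.println(scale(pool, twoWriters));  // ~0.667
      }
    }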

http://git-wip-us.apache.org/repos/asf/hive/blob/9c7a78ee/ql/src/java/org/apache/hadoop/hive/ql/io/orc/MetadataReader.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/MetadataReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/MetadataReader.java
deleted file mode 100644
index cea324c..0000000
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/MetadataReader.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hive.ql.io.orc;
-
-import java.io.IOException;
-
-import org.apache.hadoop.hive.ql.io.orc.OrcProto.BloomFilterIndex;
-import org.apache.hadoop.hive.ql.io.orc.OrcProto.RowIndex;
-import org.apache.hadoop.hive.ql.io.orc.OrcProto.StripeFooter;
-
-public interface MetadataReader {
-  RecordReaderImpl.Index readRowIndex(StripeInformation stripe, StripeFooter footer,
-      boolean[] included, RowIndex[] indexes, boolean[] sargColumns,
-      BloomFilterIndex[] bloomFilterIndices) throws IOException;
-
-  StripeFooter readStripeFooter(StripeInformation stripe) throws IOException;
-
-  void close() throws IOException;
-}

http://git-wip-us.apache.org/repos/asf/hive/blob/9c7a78ee/ql/src/java/org/apache/hadoop/hive/ql/io/orc/MetadataReaderImpl.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/MetadataReaderImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/MetadataReaderImpl.java
deleted file mode 100644
index 4624927..0000000
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/MetadataReaderImpl.java
+++ /dev/null
@@ -1,123 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hive.ql.io.orc;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.List;
-
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.common.io.DiskRange;
-import org.apache.hadoop.hive.ql.io.orc.RecordReaderImpl.BufferChunk;
-
-import com.google.common.collect.Lists;
-
-public class MetadataReaderImpl implements MetadataReader {
-  private final FSDataInputStream file;
-  private final CompressionCodec codec;
-  private final int bufferSize;
-  private final int typeCount;
-
-  public MetadataReaderImpl(FileSystem fileSystem, Path path,
-      CompressionCodec codec, int bufferSize, int typeCount) throws IOException {
-    this(fileSystem.open(path), codec, bufferSize, typeCount);
-  }
-
-  public MetadataReaderImpl(FSDataInputStream file,
-      CompressionCodec codec, int bufferSize, int typeCount) {
-    this.file = file;
-    this.codec = codec;
-    this.bufferSize = bufferSize;
-    this.typeCount = typeCount;
-  }
-
-  @Override
-  public RecordReaderImpl.Index readRowIndex(StripeInformation stripe,
-      OrcProto.StripeFooter footer, boolean[] included, OrcProto.RowIndex[] indexes,
-      boolean[] sargColumns, OrcProto.BloomFilterIndex[] bloomFilterIndices) throws IOException {
-    if (footer == null) {
-      footer = readStripeFooter(stripe);
-    }
-    if (indexes == null) {
-      indexes = new OrcProto.RowIndex[typeCount];
-    }
-    if (bloomFilterIndices == null) {
-      bloomFilterIndices = new OrcProto.BloomFilterIndex[typeCount];
-    }
-    long offset = stripe.getOffset();
-    List<OrcProto.Stream> streams = footer.getStreamsList();
-    for (int i = 0; i < streams.size(); i++) {
-      OrcProto.Stream stream = streams.get(i);
-      OrcProto.Stream nextStream = null;
-      if (i < streams.size() - 1) {
-        nextStream = streams.get(i+1);
-      }
-      int col = stream.getColumn();
-      int len = (int) stream.getLength();
-      // row index stream and bloom filter are interlaced, check if the sarg column contains bloom
-      // filter and combine the io to read row index and bloom filters for that column together
-      if (stream.hasKind() && (stream.getKind() == OrcProto.Stream.Kind.ROW_INDEX)) {
-        boolean readBloomFilter = false;
-        if (sargColumns != null && sargColumns[col] &&
-            nextStream.getKind() == OrcProto.Stream.Kind.BLOOM_FILTER) {
-          len += nextStream.getLength();
-          i += 1;
-          readBloomFilter = true;
-        }
-        if ((included == null || included[col]) && indexes[col] == null) {
-          byte[] buffer = new byte[len];
-          file.readFully(offset, buffer, 0, buffer.length);
-          ByteBuffer bb = ByteBuffer.wrap(buffer);
-          indexes[col] = OrcProto.RowIndex.parseFrom(InStream.create("index",
-              Lists.<DiskRange>newArrayList(new BufferChunk(bb, 0)), stream.getLength(),
-               codec, bufferSize));
-          if (readBloomFilter) {
-            bb.position((int) stream.getLength());
-            bloomFilterIndices[col] = OrcProto.BloomFilterIndex.parseFrom(InStream.create(
-                "bloom_filter", Lists.<DiskRange>newArrayList(new BufferChunk(bb, 0)),
-                nextStream.getLength(), codec, bufferSize));
-          }
-        }
-      }
-      offset += len;
-    }
-
-    RecordReaderImpl.Index index = new RecordReaderImpl.Index(indexes, bloomFilterIndices);
-    return index;
-  }
-
-  @Override
-  public OrcProto.StripeFooter readStripeFooter(StripeInformation stripe) throws IOException {
-    long offset = stripe.getOffset() + stripe.getIndexLength() + stripe.getDataLength();
-    int tailLength = (int) stripe.getFooterLength();
-
-    // read the footer
-    ByteBuffer tailBuf = ByteBuffer.allocate(tailLength);
-    file.readFully(offset, tailBuf.array(), tailBuf.arrayOffset(), tailLength);
-    return OrcProto.StripeFooter.parseFrom(InStream.createCodedInputStream("footer",
-        Lists.<DiskRange>newArrayList(new BufferChunk(tailBuf, 0)),
-        tailLength, codec, bufferSize));
-  }
-
-  @Override
-  public void close() throws IOException {
-    file.close();
-  }
-}
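
A simplified, self-contained sketch of the interlacing logic in readRowIndex() above:
walk the stripe's streams in order and, when a ROW_INDEX stream is immediately followed
by its column's BLOOM_FILTER stream and that column is selected in sargColumns, issue
one combined read covering both. The Stream class and the numbers below are made up;
only the offset and length bookkeeping mirrors the code.

    import java.util.Arrays;
    import java.util.List;

    public class InterlacedIndexScanSketch {
      enum Kind { ROW_INDEX, BLOOM_FILTER, DATA }

      static class Stream {
        final int column; final Kind kind; final long length;
        Stream(int column, Kind kind, long length) {
          this.column = column; this.kind = kind; this.length = length;
        }
      }

      public static void main(String[] args) {
        List<Stream> streams = Arrays.asList(
            new Stream(1, Kind.ROW_INDEX, 120),
            new Stream(1, Kind.BLOOM_FILTER, 300),
            new Stream(2, Kind.ROW_INDEX, 90));
        boolean[] sargColumns = {false, true, false};  // only column 1 needs its bloom filter
        long offset = 0;                               // stripe-relative offset of the next stream
        for (int i = 0; i < streams.size(); ++i) {
          Stream s = streams.get(i);
          long len = s.length;
          if (s.kind == Kind.ROW_INDEX) {
            Stream next = i + 1 < streams.size() ? streams.get(i + 1) : null;
            if (next != null && sargColumns[s.column] && next.kind == Kind.BLOOM_FILTER) {
              len += next.length;                      // single read for row index + bloom filter
              ++i;
            }
            System.out.println("col " + s.column + ": read " + len + " bytes at offset " + offset);
          }
          offset += len;
        }
      }
    }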

http://git-wip-us.apache.org/repos/asf/hive/blob/9c7a78ee/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcConf.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcConf.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcConf.java
deleted file mode 100644
index 132889c..0000000
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcConf.java
+++ /dev/null
@@ -1,191 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hive.ql.io.orc;
-
-import org.apache.hadoop.conf.Configuration;
-
-import java.util.Properties;
-
-/**
- * Define the configuration properties that Orc understands.
- */
-public enum OrcConf {
-  STRIPE_SIZE("orc.stripe.size", "hive.exec.orc.default.stripe.size",
-      64L * 1024 * 1024,
-      "Define the default ORC stripe size, in bytes."),
-  BLOCK_SIZE("orc.block.size", "hive.exec.orc.default.block.size",
-      256L * 1024 * 1024,
-      "Define the default file system block size for ORC files."),
-  ENABLE_INDEXES("orc.create.index", "orc.create.index", true,
-      "Should the ORC writer create indexes as part of the file."),
-  ROW_INDEX_STRIDE("orc.row.index.stride",
-      "hive.exec.orc.default.row.index.stride", 10000,
-      "Define the default ORC index stride in number of rows. (Stride is the\n"+
-          " number of rows an index entry represents.)"),
-  BUFFER_SIZE("orc.compress.size", "hive.exec.orc.default.buffer.size",
-      256 * 1024, "Define the default ORC buffer size, in bytes."),
-  BLOCK_PADDING("orc.block.padding", "hive.exec.orc.default.block.padding",
-      true,
-      "Define whether stripes should be padded to the HDFS block boundaries."),
-  COMPRESS("orc.compress", "hive.exec.orc.default.compress", "ZLIB",
-      "Define the default compression codec for ORC file"),
-  WRITE_FORMAT("orc.write.format", "hive.exec.orc.write.format", "0.12",
-      "Define the version of the file to write. Possible values are 0.11 and\n"+
-          " 0.12. If this parameter is not defined, ORC will use the run\n" +
-          " length encoding (RLE) introduced in Hive 0.12."),
-  ENCODING_STRATEGY("orc.encoding.strategy", "hive.exec.orc.encoding.strategy",
-      "SPEED",
-      "Define the encoding strategy to use while writing data. Changing this\n"+
-          "will only affect the light weight encoding for integers. This\n" +
-          "flag will not change the compression level of higher level\n" +
-          "compression codec (like ZLIB)."),
-  COMPRESSION_STRATEGY("orc.compression.strategy",
-      "hive.exec.orc.compression.strategy", "SPEED",
-      "Define the compression strategy to use while writing data.\n" +
-          "This changes the compression level of higher level compression\n" +
-          "codec (like ZLIB)."),
-  BLOCK_PADDING_TOLERANCE("orc.block.padding.tolerance",
-      "hive.exec.orc.block.padding.tolerance", 0.05,
-      "Define the tolerance for block padding as a decimal fraction of\n" +
-          "stripe size (for example, the default value 0.05 is 5% of the\n" +
-          "stripe size). For the defaults of 64Mb ORC stripe and 256Mb HDFS\n" +
-          "blocks, the default block padding tolerance of 5% will\n" +
-          "reserve a maximum of 3.2Mb for padding within the 256Mb block.\n" +
-          "In that case, if the available size within the block is more than\n"+
-          "3.2Mb, a new smaller stripe will be inserted to fit within that\n" +
-          "space. This will make sure that no stripe written will cross block\n" +
-          " boundaries and cause remote reads within a node local task."),
-  BLOOM_FILTER_FPP("orc.bloom.filter.fpp", "orc.default.bloom.fpp", 0.05,
-      "Define the default false positive probability for bloom filters."),
-  USE_ZEROCOPY("orc.use.zerocopy", "hive.exec.orc.zerocopy", false,
-      "Use zerocopy reads with ORC. (This requires Hadoop 2.3 or later.)"),
-  SKIP_CORRUPT_DATA("orc.skip.corrupt.data", "hive.exec.orc.skip.corrupt.data",
-      false,
-      "If ORC reader encounters corrupt data, this value will be used to\n" +
-          "determine whether to skip the corrupt data or throw exception.\n" +
-          "The default behavior is to throw exception."),
-  MEMORY_POOL("orc.memory.pool", "hive.exec.orc.memory.pool", 0.5,
-      "Maximum fraction of heap that can be used by ORC file writers"),
-  DICTIONARY_KEY_SIZE_THRESHOLD("orc.dictionary.key.threshold",
-      "hive.exec.orc.dictionary.key.size.threshold",
-      0.8,
-      "If the number of distinct keys in a dictionary is greater than this\n" +
-          "fraction of the total number of non-null rows, turn off \n" +
-          "dictionary encoding.  Use 1 to always use dictionary encoding."),
-  ROW_INDEX_STRIDE_DICTIONARY_CHECK("orc.dictionary.early.check",
-      "hive.orc.row.index.stride.dictionary.check",
-      true,
-      "If enabled dictionary check will happen after first row index stride\n" +
-          "(default 10000 rows) else dictionary check will happen before\n" +
-          "writing first stripe. In both cases, the decision to use\n" +
-          "dictionary or not will be retained thereafter."),
-  BLOOM_FILTER_COLUMNS("orc.bloom.filter.columns", "orc.bloom.filter.columns",
-      "", "List of columns to create bloom filters for when writing.")
-  ;
-
-  private final String attribute;
-  private final String hiveConfName;
-  private final Object defaultValue;
-  private final String description;
-
-  OrcConf(String attribute,
-          String hiveConfName,
-          Object defaultValue,
-          String description) {
-    this.attribute = attribute;
-    this.hiveConfName = hiveConfName;
-    this.defaultValue = defaultValue;
-    this.description = description;
-  }
-
-  public String getAttribute() {
-    return attribute;
-  }
-
-  public String getHiveConfName() {
-    return hiveConfName;
-  }
-
-  public Object getDefaultValue() {
-    return defaultValue;
-  }
-
-  public String getDescription() {
-    return description;
-  }
-
-  private String lookupValue(Properties tbl, Configuration conf) {
-    String result = null;
-    if (tbl != null) {
-      result = tbl.getProperty(attribute);
-    }
-    if (result == null && conf != null) {
-      result = conf.get(attribute);
-      if (result == null) {
-        result = conf.get(hiveConfName);
-      }
-    }
-    return result;
-  }
-
-  public long getLong(Properties tbl, Configuration conf) {
-    String value = lookupValue(tbl, conf);
-    if (value != null) {
-      return Long.parseLong(value);
-    }
-    return ((Number) defaultValue).longValue();
-  }
-
-  public long getLong(Configuration conf) {
-    return getLong(null, conf);
-  }
-
-  public String getString(Properties tbl, Configuration conf) {
-    String value = lookupValue(tbl, conf);
-    return value == null ? (String) defaultValue : value;
-  }
-
-  public String getString(Configuration conf) {
-    return getString(null, conf);
-  }
-
-  public boolean getBoolean(Properties tbl, Configuration conf) {
-    String value = lookupValue(tbl, conf);
-    if (value != null) {
-      return Boolean.parseBoolean(value);
-    }
-    return (Boolean) defaultValue;
-  }
-
-  public boolean getBoolean(Configuration conf) {
-    return getBoolean(null, conf);
-  }
-
-  public double getDouble(Properties tbl, Configuration conf) {
-    String value = lookupValue(tbl, conf);
-    if (value != null) {
-      return Double.parseDouble(value);
-    }
-    return ((Number) defaultValue).doubleValue();
-  }
-
-  public double getDouble(Configuration conf) {
-    return getDouble(null, conf);
-  }
-}
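
A hedged usage sketch of the lookup order implemented by lookupValue() above: a table
property wins over the orc.* Configuration key, which wins over the legacy hive.* key,
which falls back to the built-in default. It assumes it compiles next to the OrcConf
enum shown here; the keys and values are only illustrative.

    import java.util.Properties;
    import org.apache.hadoop.conf.Configuration;

    public class OrcConfSketch {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.setLong("hive.exec.orc.default.stripe.size", 128L * 1024 * 1024);

        // No table property and no orc.stripe.size entry: the hive.* key is used.
        System.out.println(OrcConf.STRIPE_SIZE.getLong(null, conf));   // 134217728

        // A table property overrides both Configuration keys.
        Properties tbl = new Properties();
        tbl.setProperty("orc.stripe.size", String.valueOf(32L * 1024 * 1024));
        System.out.println(OrcConf.STRIPE_SIZE.getLong(tbl, conf));    // 33554432
      }
    }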

http://git-wip-us.apache.org/repos/asf/hive/blob/9c7a78ee/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java
index 56ac40b..975825a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java
@@ -19,8 +19,6 @@
 package org.apache.hadoop.hive.ql.io.orc;
 
 import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
 import java.util.Properties;
 
 import org.apache.hadoop.conf.Configuration;
@@ -28,132 +26,17 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+import org.apache.orc.FileMetadata;
+import org.apache.orc.impl.MemoryManager;
+import org.apache.orc.TypeDescription;
 
 /**
  * Contains factory methods to read or write ORC files.
  */
-public final class OrcFile {
-
-  public static final String MAGIC = "ORC";
-
-  /**
-   * Create a version number for the ORC file format, so that we can add
-   * non-forward compatible changes in the future. To make it easier for users
-   * to understand the version numbers, we use the Hive release number that
-   * first wrote that version of ORC files.
-   *
-   * Thus, if you add new encodings or other non-forward compatible changes
-   * to ORC files, which prevent the old reader from reading the new format,
-   * you should change these variable to reflect the next Hive release number.
-   * Non-forward compatible changes should never be added in patch releases.
-   *
-   * Do not make any changes that break backwards compatibility, which would
-   * prevent the new reader from reading ORC files generated by any released
-   * version of Hive.
-   */
-  public enum Version {
-    V_0_11("0.11", 0, 11),
-      V_0_12("0.12", 0, 12);
-
-    public static final Version CURRENT = V_0_12;
-
-    private final String name;
-    private final int major;
-    private final int minor;
-
-    Version(String name, int major, int minor) {
-      this.name = name;
-      this.major = major;
-      this.minor = minor;
-    }
-
-    public static Version byName(String name) {
-      for(Version version: values()) {
-        if (version.name.equals(name)) {
-          return version;
-        }
-      }
-      throw new IllegalArgumentException("Unknown ORC version " + name);
-    }
-
-    /**
-     * Get the human readable name for the version.
-     */
-    public String getName() {
-      return name;
-    }
-
-    /**
-     * Get the major version number.
-     */
-    public int getMajor() {
-      return major;
-    }
-
-    /**
-     * Get the minor version number.
-     */
-    public int getMinor() {
-      return minor;
-    }
-  }
-
-  /**
-   * Records the version of the writer in terms of which bugs have been fixed.
-   * For bugs in the writer, but the old readers already read the new data
-   * correctly, bump this version instead of the Version.
-   */
-  public enum WriterVersion {
-    ORIGINAL(0),
-      HIVE_8732(1), // corrupted stripe/file maximum column statistics
-      HIVE_4243(2), // use real column names from Hive tables
-// Don't use any magic numbers here except for the below:
-      FUTURE(Integer.MAX_VALUE); // a version from a future writer
-
-    private final int id;
-
-    public int getId() {
-      return id;
-    }
-
-    WriterVersion(int id) {
-      this.id = id;
-    }
-
-    private static final WriterVersion[] values;
-    static {
-      // Assumes few non-negative values close to zero.
-      int max = Integer.MIN_VALUE;
-      for (WriterVersion v : WriterVersion.values()) {
-        if (v.id < 0) throw new AssertionError();
-        if (v.id > max && FUTURE.id != v.id) {
-          max = v.id;
-        }
-      }
-      values = new WriterVersion[max + 1];
-      for (WriterVersion v : WriterVersion.values()) {
-        if (v.id < values.length) {
-          values[v.id] = v;
-        }
-      }
-    }
-
-    public static WriterVersion from(int val) {
-      if (val == FUTURE.id) return FUTURE; // Special handling for the magic value.
-      return values[val];
-    }
-  }
-
-  public enum EncodingStrategy {
-    SPEED, COMPRESSION
-  }
-
-  public enum CompressionStrategy {
-    SPEED, COMPRESSION
-  }
+public final class OrcFile extends org.apache.orc.OrcFile {
 
   // unused
-  private OrcFile() {}
+  protected OrcFile() {}
 
   /**
    * Create an ORC file reader.
@@ -162,62 +45,32 @@ public final class OrcFile {
    * @return a new ORC file reader.
    * @throws IOException
    */
-  public static Reader createReader(FileSystem fs, Path path
-  ) throws IOException {
+  public static Reader createReader(FileSystem fs,
+                                    Path path) throws IOException {
     ReaderOptions opts = new ReaderOptions(new Configuration());
     opts.filesystem(fs);
     return new ReaderImpl(path, opts);
   }
 
-  public static class ReaderOptions {
-    private final Configuration conf;
-    private FileSystem filesystem;
-    private FileMetaInfo fileMetaInfo; // TODO: this comes from some place.
-    private long maxLength = Long.MAX_VALUE;
-    private FileMetadata fullFileMetadata; // Propagate from LLAP cache.
-
+  public static class ReaderOptions extends org.apache.orc.OrcFile.ReaderOptions {
     public ReaderOptions(Configuration conf) {
-      this.conf = conf;
-    }
-    ReaderOptions fileMetaInfo(FileMetaInfo info) {
-      fileMetaInfo = info;
-      return this;
+      super(conf);
     }
 
     public ReaderOptions filesystem(FileSystem fs) {
-      this.filesystem = fs;
+      super.filesystem(fs);
       return this;
     }
 
     public ReaderOptions maxLength(long val) {
-      maxLength = val;
+      super.maxLength(val);
       return this;
     }
 
     public ReaderOptions fileMetadata(FileMetadata metadata) {
-      this.fullFileMetadata = metadata;
+      super.fileMetadata(metadata);
       return this;
     }
-
-    Configuration getConfiguration() {
-      return conf;
-    }
-
-    FileSystem getFilesystem() {
-      return filesystem;
-    }
-
-    FileMetaInfo getFileMetaInfo() {
-      return fileMetaInfo;
-    }
-
-    long getMaxLength() {
-      return maxLength;
-    }
-
-    FileMetadata getFileMetadata() {
-      return fullFileMetadata;
-    }
   }
 
   public static ReaderOptions readerOptions(Configuration conf) {
@@ -229,71 +82,39 @@ public final class OrcFile {
     return new ReaderImpl(path, options);
   }
 
-  public interface WriterContext {
-    Writer getWriter();
-  }
-
-  public interface WriterCallback {
-    void preStripeWrite(WriterContext context) throws IOException;
-    void preFooterWrite(WriterContext context) throws IOException;
-  }
-
   /**
    * Options for creating ORC file writers.
    */
-  public static class WriterOptions {
-    private final Configuration configuration;
-    private FileSystem fileSystemValue = null;
+  public static class WriterOptions extends org.apache.orc.OrcFile.WriterOptions {
     private boolean explicitSchema = false;
-    private TypeDescription schema = null;
     private ObjectInspector inspector = null;
-    private long stripeSizeValue;
-    private long blockSizeValue;
-    private int rowIndexStrideValue;
-    private int bufferSizeValue;
-    private boolean blockPaddingValue;
-    private CompressionKind compressValue;
-    private MemoryManager memoryManagerValue;
-    private Version versionValue;
-    private WriterCallback callback;
-    private EncodingStrategy encodingStrategy;
-    private CompressionStrategy compressionStrategy;
-    private double paddingTolerance;
-    private String bloomFilterColumns;
-    private double bloomFilterFpp;
 
     WriterOptions(Properties tableProperties, Configuration conf) {
-      configuration = conf;
-      memoryManagerValue = getMemoryManager(conf);
-      stripeSizeValue = OrcConf.STRIPE_SIZE.getLong(tableProperties, conf);
-      blockSizeValue = OrcConf.BLOCK_SIZE.getLong(tableProperties, conf);
-      rowIndexStrideValue =
-          (int) OrcConf.ROW_INDEX_STRIDE.getLong(tableProperties, conf);
-      bufferSizeValue = (int) OrcConf.BUFFER_SIZE.getLong(tableProperties,
-          conf);
-      blockPaddingValue =
-          OrcConf.BLOCK_PADDING.getBoolean(tableProperties, conf);
-      compressValue =
-          CompressionKind.valueOf(OrcConf.COMPRESS.getString(tableProperties,
-              conf));
-      String versionName = OrcConf.WRITE_FORMAT.getString(tableProperties,
-          conf);
-      versionValue = Version.byName(versionName);
-      String enString = OrcConf.ENCODING_STRATEGY.getString(tableProperties,
-          conf);
-      encodingStrategy = EncodingStrategy.valueOf(enString);
-
-      String compString =
-          OrcConf.COMPRESSION_STRATEGY.getString(tableProperties, conf);
-      compressionStrategy = CompressionStrategy.valueOf(compString);
-
-      paddingTolerance =
-          OrcConf.BLOCK_PADDING_TOLERANCE.getDouble(tableProperties, conf);
-
-      bloomFilterColumns = OrcConf.BLOOM_FILTER_COLUMNS.getString(tableProperties,
-          conf);
-      bloomFilterFpp = OrcConf.BLOOM_FILTER_FPP.getDouble(tableProperties,
-          conf);
+      super(tableProperties, conf);
+    }
+
+    /**
+     * A required option that sets the object inspector for the rows. If
+     * setSchema is not called, it also defines the schema.
+     */
+    public WriterOptions inspector(ObjectInspector value) {
+      this.inspector = value;
+      if (!explicitSchema) {
+        super.setSchema(OrcInputFormat.convertTypeInfo(
+            TypeInfoUtils.getTypeInfoFromObjectInspector(value)));
+      }
+      return this;
+    }
+
+    /**
+     * Set the schema for the file. This is a required parameter.
+     * @param schema the schema for the file.
+     * @return this
+     */
+    public WriterOptions setSchema(TypeDescription schema) {
+      this.explicitSchema = true;
+      super.setSchema(schema);
+      return this;
     }
 
     /**
@@ -301,7 +122,7 @@ public final class OrcFile {
      * If it is not provided, it will be found from the path.
      */
     public WriterOptions fileSystem(FileSystem value) {
-      fileSystemValue = value;
+      super.fileSystem(value);
       return this;
     }
 
@@ -311,7 +132,7 @@ public final class OrcFile {
      * is flushed to the HDFS file and the next stripe started.
      */
     public WriterOptions stripeSize(long value) {
-      stripeSizeValue = value;
+      super.stripeSize(value);
       return this;
     }
 
@@ -320,7 +141,7 @@ public final class OrcFile {
      * set the block size to be multiple factors of stripe size.
      */
     public WriterOptions blockSize(long value) {
-      blockSizeValue = value;
+      super.blockSize(value);
       return this;
     }
 
@@ -330,7 +151,7 @@ public final class OrcFile {
      * set to 0, no indexes will be included in the file.
      */
     public WriterOptions rowIndexStride(int value) {
-      rowIndexStrideValue = value;
+      super.rowIndexStride(value);
       return this;
     }
 
@@ -339,7 +160,7 @@ public final class OrcFile {
      * stripe in memory.
      */
     public WriterOptions bufferSize(int value) {
-      bufferSizeValue = value;
+      super.bufferSize(value);
       return this;
     }
 
@@ -349,7 +170,7 @@ public final class OrcFile {
      * reading, but costs space.
      */
     public WriterOptions blockPadding(boolean value) {
-      blockPaddingValue = value;
+      super.blockPadding(value);
       return this;
     }
 
@@ -357,7 +178,7 @@ public final class OrcFile {
      * Sets the encoding strategy that is used to encode the data.
      */
     public WriterOptions encodingStrategy(EncodingStrategy strategy) {
-      encodingStrategy = strategy;
+      super.encodingStrategy(strategy);
       return this;
     }
 
@@ -365,7 +186,7 @@ public final class OrcFile {
      * Sets the tolerance for block padding as a percentage of stripe size.
      */
     public WriterOptions paddingTolerance(double value) {
-      paddingTolerance = value;
+      super.paddingTolerance(value);
       return this;
     }
 
@@ -373,7 +194,7 @@ public final class OrcFile {
      * Comma separated values of column names for which bloom filter is to be created.
      */
     public WriterOptions bloomFilterColumns(String columns) {
-      bloomFilterColumns = columns;
+      super.bloomFilterColumns(columns);
       return this;
     }
 
@@ -383,7 +204,7 @@ public final class OrcFile {
      * @return this
      */
     public WriterOptions bloomFilterFpp(double fpp) {
-      bloomFilterFpp = fpp;
+      super.bloomFilterFpp(fpp);
       return this;
     }
 
@@ -391,31 +212,15 @@ public final class OrcFile {
      * Sets the generic compression that is used to compress the data.
      */
     public WriterOptions compress(CompressionKind value) {
-      compressValue = value;
+      super.compress(value.getUnderlying());
       return this;
     }
 
     /**
-     * A required option that sets the object inspector for the rows. If
-     * setSchema is not called, it also defines the schema.
-     */
-    public WriterOptions inspector(ObjectInspector value) {
-      this.inspector = value;
-      if (!explicitSchema) {
-        schema = OrcUtils.convertTypeInfo(
-            TypeInfoUtils.getTypeInfoFromObjectInspector(value));
-      }
-      return this;
-    }
-
-    /**
-     * Set the schema for the file. This is a required parameter.
-     * @param schema the schema for the file.
-     * @return this
+     * Sets the generic compression that is used to compress the data, given as the
+     * underlying org.apache.orc compression kind.
      */
-    public WriterOptions setSchema(TypeDescription schema) {
-      this.explicitSchema = true;
-      this.schema = schema;
+    public WriterOptions compress(org.apache.orc.CompressionKind value) {
+      super.compress(value);
       return this;
     }
 
@@ -423,7 +228,7 @@ public final class OrcFile {
      * Sets the version of the file that will be written.
      */
     public WriterOptions version(Version value) {
-      versionValue = value;
+      super.version(value);
       return this;
     }
 
@@ -433,18 +238,17 @@ public final class OrcFile {
      * @return this
      */
     public WriterOptions callback(WriterCallback callback) {
-      this.callback = callback;
+      super.callback(callback);
       return this;
     }
 
     /**
      * A package local option to set the memory manager.
      */
-    WriterOptions memory(MemoryManager value) {
-      memoryManagerValue = value;
+    protected WriterOptions memory(MemoryManager value) {
+      super.memory(value);
       return this;
     }
-
   }
 
   /**
@@ -479,18 +283,19 @@ public final class OrcFile {
   public static Writer createWriter(Path path,
                                     WriterOptions opts
                                     ) throws IOException {
-    FileSystem fs = opts.fileSystemValue == null ?
-      path.getFileSystem(opts.configuration) : opts.fileSystemValue;
-
-    return new WriterImpl(fs, path, opts.configuration, opts.inspector,
-                          opts.schema,
-                          opts.stripeSizeValue, opts.compressValue,
-                          opts.bufferSizeValue, opts.rowIndexStrideValue,
-                          opts.memoryManagerValue, opts.blockPaddingValue,
-                          opts.versionValue, opts.callback,
-                          opts.encodingStrategy, opts.compressionStrategy,
-                          opts.paddingTolerance, opts.blockSizeValue,
-                          opts.bloomFilterColumns, opts.bloomFilterFpp);
+    FileSystem fs = opts.getFileSystem() == null ?
+      path.getFileSystem(opts.getConfiguration()) : opts.getFileSystem();
+
+    return new WriterImpl(fs, path, opts.getConfiguration(), opts.inspector,
+                          opts.getSchema(),
+                          opts.getStripeSize(), opts.getCompress(),
+                          opts.getBufferSize(), opts.getRowIndexStride(),
+                          opts.getMemoryManager(), opts.getBlockPadding(),
+                          opts.getVersion(), opts.getCallback(),
+                          opts.getEncodingStrategy(),
+                          opts.getCompressionStrategy(),
+                          opts.getPaddingTolerance(), opts.getBlockSize(),
+                          opts.getBloomFilterColumns(), opts.getBloomFilterFpp());
   }
 
   /**
@@ -515,29 +320,12 @@ public final class OrcFile {
                                     CompressionKind compress,
                                     int bufferSize,
                                     int rowIndexStride) throws IOException {
-    return createWriter(path,
-                        writerOptions(conf)
-                        .fileSystem(fs)
-                        .inspector(inspector)
-                        .stripeSize(stripeSize)
-                        .compress(compress)
-                        .bufferSize(bufferSize)
-                        .rowIndexStride(rowIndexStride));
-  }
-
-  private static ThreadLocal<MemoryManager> memoryManager = null;
-
-  private static synchronized MemoryManager getMemoryManager(
-      final Configuration conf) {
-    if (memoryManager == null) {
-      memoryManager = new ThreadLocal<MemoryManager>() {
-        @Override
-        protected MemoryManager initialValue() {
-          return new MemoryManager(conf);
-        }
-      };
-    }
-    return memoryManager.get();
+    return createWriter(path, writerOptions(conf)
+        .inspector(inspector)
+        .fileSystem(fs)
+        .stripeSize(stripeSize)
+        .compress(compress)
+        .bufferSize(bufferSize)
+        .rowIndexStride(rowIndexStride));
   }
-
 }
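
For reference, a minimal usage sketch of the fluent writer API that this shim class preserves
(the row class MyRow, the output path, and the option values below are hypothetical and not
part of the patch):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hive.ql.io.orc.CompressionKind;
    import org.apache.hadoop.hive.ql.io.orc.OrcFile;
    import org.apache.hadoop.hive.ql.io.orc.Writer;
    import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
    import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;

    public class OrcWriterSketch {
      static class MyRow {                       // hypothetical row type
        int x;
        String y;
        MyRow(int x, String y) { this.x = x; this.y = y; }
      }

      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // inspector() derives the schema unless setSchema() is called explicitly.
        ObjectInspector inspector = ObjectInspectorFactory.getReflectionObjectInspector(
            MyRow.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
        Writer writer = OrcFile.createWriter(new Path("/tmp/example.orc"),
            OrcFile.writerOptions(conf)
                .inspector(inspector)
                .compress(CompressionKind.ZLIB)  // Hive kind, delegated via getUnderlying()
                .stripeSize(64L * 1024 * 1024)
                .bufferSize(256 * 1024)
                .rowIndexStride(10000));
        writer.addRow(new MyRow(1, "a"));
        writer.close();
      }
    }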

http://git-wip-us.apache.org/repos/asf/hive/blob/9c7a78ee/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFileKeyWrapper.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFileKeyWrapper.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFileKeyWrapper.java
index 8eda37f..40f1da0 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFileKeyWrapper.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFileKeyWrapper.java
@@ -25,6 +25,8 @@ import java.util.List;
 
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.WritableComparable;
+import org.apache.orc.CompressionKind;
+import org.apache.orc.OrcProto;
 
 /**
  * Key for OrcFileMergeMapper task. Contains orc file related information that

http://git-wip-us.apache.org/repos/asf/hive/blob/9c7a78ee/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFileStripeMergeRecordReader.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFileStripeMergeRecordReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFileStripeMergeRecordReader.java
index 41a97a3..f06195f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFileStripeMergeRecordReader.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFileStripeMergeRecordReader.java
@@ -27,6 +27,8 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.FileSplit;
 import org.apache.hadoop.mapred.RecordReader;
+import org.apache.orc.OrcProto;
+import org.apache.orc.StripeInformation;
 
 public class OrcFileStripeMergeRecordReader implements
     RecordReader<OrcFileKeyWrapper, OrcFileValueWrapper> {
@@ -98,7 +100,7 @@ public class OrcFileStripeMergeRecordReader implements
           valueWrapper.setUserMetadata(((ReaderImpl) reader).getOrcProtoUserMetadata());
         }
         keyWrapper.setInputPath(path);
-        keyWrapper.setCompression(reader.getCompression());
+        keyWrapper.setCompression(reader.getCompressionKind());
         keyWrapper.setCompressBufferSize(reader.getCompressionSize());
         keyWrapper.setVersion(reader.getFileVersion());
         keyWrapper.setRowIndexStride(reader.getRowIndexStride());
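
A sketch of reading the same file-level metadata outside the merge path; the input path is
hypothetical, and this assumes getCompressionKind() now returns the org.apache.orc enum, as
the import change in OrcFileKeyWrapper suggests:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hive.ql.io.orc.OrcFile;
    import org.apache.hadoop.hive.ql.io.orc.Reader;
    import org.apache.orc.CompressionKind;

    public class OrcMetadataSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Path path = new Path("/tmp/example.orc");           // hypothetical input file
        Reader reader = OrcFile.createReader(path, OrcFile.readerOptions(conf));
        CompressionKind kind = reader.getCompressionKind(); // was getCompression()
        System.out.println(kind
            + ", buffer=" + reader.getCompressionSize()
            + ", version=" + reader.getFileVersion()
            + ", stride=" + reader.getRowIndexStride());
      }
    }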

http://git-wip-us.apache.org/repos/asf/hive/blob/9c7a78ee/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFileValueWrapper.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFileValueWrapper.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFileValueWrapper.java
index 7389ef5..846c874 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFileValueWrapper.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFileValueWrapper.java
@@ -24,6 +24,8 @@ import java.io.IOException;
 import java.util.List;
 
 import org.apache.hadoop.io.WritableComparable;
+import org.apache.orc.OrcProto;
+import org.apache.orc.StripeInformation;
 
 /**
  * Value for OrcFileMergeMapper. Contains stripe related information for the

http://git-wip-us.apache.org/repos/asf/hive/blob/9c7a78ee/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
index 0567cbe..b3b48d5 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
@@ -39,6 +39,23 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.commons.codec.binary.Hex;
+import org.apache.hadoop.hive.ql.io.IOConstants;
+import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.hive.serde2.typeinfo.BaseCharTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.MapTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+import org.apache.hadoop.hive.serde2.typeinfo.UnionTypeInfo;
+import org.apache.orc.ColumnStatistics;
+import org.apache.orc.FileMetaInfo;
+import org.apache.orc.OrcUtils;
+import org.apache.orc.StripeInformation;
+import org.apache.orc.StripeStatistics;
+import org.apache.orc.TypeDescription;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -63,8 +80,6 @@ import org.apache.hadoop.hive.ql.io.LlapWrappableInputFormatInterface;
 import org.apache.hadoop.hive.ql.io.RecordIdentifier;
 import org.apache.hadoop.hive.ql.io.SelfDescribingInputFormatInterface;
 import org.apache.hadoop.hive.ql.io.StatsProvidingRecordReader;
-import org.apache.hadoop.hive.ql.io.orc.OrcFile.WriterVersion;
-import org.apache.hadoop.hive.ql.io.orc.OrcProto.Type;
 import org.apache.hadoop.hive.ql.io.sarg.ConvertAstToSearchArg;
 import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
 import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
@@ -86,6 +101,7 @@ import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.util.StringUtils;
+import org.apache.orc.OrcProto;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.cache.Cache;
@@ -234,7 +250,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
     /**
      * Do we have schema on read in the configuration variables?
      */
-    TypeDescription schema = OrcUtils.getDesiredRowTypeDescr(conf);
+    TypeDescription schema = getDesiredRowTypeDescr(conf);
 
     Reader.Options options = new Reader.Options().range(offset, length);
     options.schema(schema);
@@ -242,7 +258,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
     List<OrcProto.Type> types = file.getTypes();
     options.include(genIncludedColumns(types, conf, isOriginal));
     setSearchArgument(options, types, conf, isOriginal);
-    return file.rowsOptions(options);
+    return (RecordReader) file.rowsOptions(options);
   }
 
   public static boolean isOriginal(Reader file) {
@@ -1500,7 +1516,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
     /**
      * Do we have schema on read in the configuration variables?
      */
-    TypeDescription schema = OrcUtils.getDesiredRowTypeDescr(conf);
+    TypeDescription schema = getDesiredRowTypeDescr(conf);
 
     final Reader reader;
     final int bucket;
@@ -1508,7 +1524,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
     readOptions.range(split.getStart(), split.getLength());
 
     // TODO: Convert genIncludedColumns and setSearchArgument to use TypeDescription.
-    final List<Type> schemaTypes = OrcUtils.getOrcTypes(schema);
+    final List<OrcProto.Type> schemaTypes = OrcUtils.getOrcTypes(schema);
     readOptions.include(genIncludedColumns(schemaTypes, conf, SCHEMA_TYPES_IS_ORIGINAL));
     setSearchArgument(readOptions, schemaTypes, conf, SCHEMA_TYPES_IS_ORIGINAL);
 
@@ -1594,7 +1610,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
   }
 
   public static boolean[] pickStripesViaTranslatedSarg(SearchArgument sarg,
-      WriterVersion writerVersion, List<OrcProto.Type> types,
+      OrcFile.WriterVersion writerVersion, List<OrcProto.Type> types,
       List<StripeStatistics> stripeStats, int stripeCount) {
     LOG.info("Translated ORC pushdown predicate: " + sarg);
     assert sarg != null;
@@ -1608,7 +1624,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
   }
 
   private static boolean[] pickStripes(SearchArgument sarg, String[] sargColNames,
-      WriterVersion writerVersion, boolean isOriginal, List<StripeStatistics> stripeStats,
+      OrcFile.WriterVersion writerVersion, boolean isOriginal, List<StripeStatistics> stripeStats,
       int stripeCount, Path filePath) {
     if (sarg == null || stripeStats == null || writerVersion == OrcFile.WriterVersion.ORIGINAL) {
       return null; // only do split pruning if HIVE-8732 has been fixed in the writer
@@ -1927,4 +1943,165 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
       return true;
     }
   }
+  /**
+   * Convert a Hive type property string, containing a list of type names, into a list of
+   * TypeDescription objects.
+   * @return the list of TypeDescription objects.
+   */
+  public static ArrayList<TypeDescription> typeDescriptionsFromHiveTypeProperty(
+      String hiveTypeProperty) {
+
+    // CONSIDER: We need a type name parser for TypeDescription.
+
+    ArrayList<TypeInfo> typeInfoList = TypeInfoUtils.getTypeInfosFromTypeString(hiveTypeProperty);
+    ArrayList<TypeDescription> typeDescrList = new ArrayList<TypeDescription>(typeInfoList.size());
+    for (TypeInfo typeInfo : typeInfoList) {
+      typeDescrList.add(convertTypeInfo(typeInfo));
+    }
+    return typeDescrList;
+  }
+
+  public static TypeDescription convertTypeInfo(TypeInfo info) {
+    switch (info.getCategory()) {
+      case PRIMITIVE: {
+        PrimitiveTypeInfo pinfo = (PrimitiveTypeInfo) info;
+        switch (pinfo.getPrimitiveCategory()) {
+          case BOOLEAN:
+            return TypeDescription.createBoolean();
+          case BYTE:
+            return TypeDescription.createByte();
+          case SHORT:
+            return TypeDescription.createShort();
+          case INT:
+            return TypeDescription.createInt();
+          case LONG:
+            return TypeDescription.createLong();
+          case FLOAT:
+            return TypeDescription.createFloat();
+          case DOUBLE:
+            return TypeDescription.createDouble();
+          case STRING:
+            return TypeDescription.createString();
+          case DATE:
+            return TypeDescription.createDate();
+          case TIMESTAMP:
+            return TypeDescription.createTimestamp();
+          case BINARY:
+            return TypeDescription.createBinary();
+          case DECIMAL: {
+            DecimalTypeInfo dinfo = (DecimalTypeInfo) pinfo;
+            return TypeDescription.createDecimal()
+                .withScale(dinfo.getScale())
+                .withPrecision(dinfo.getPrecision());
+          }
+          case VARCHAR: {
+            BaseCharTypeInfo cinfo = (BaseCharTypeInfo) pinfo;
+            return TypeDescription.createVarchar()
+                .withMaxLength(cinfo.getLength());
+          }
+          case CHAR: {
+            BaseCharTypeInfo cinfo = (BaseCharTypeInfo) pinfo;
+            return TypeDescription.createChar()
+                .withMaxLength(cinfo.getLength());
+          }
+          default:
+            throw new IllegalArgumentException("ORC doesn't handle primitive" +
+                " category " + pinfo.getPrimitiveCategory());
+        }
+      }
+      case LIST: {
+        ListTypeInfo linfo = (ListTypeInfo) info;
+        return TypeDescription.createList(
+            convertTypeInfo(linfo.getListElementTypeInfo()));
+      }
+      case MAP: {
+        MapTypeInfo minfo = (MapTypeInfo) info;
+        return TypeDescription.createMap(
+            convertTypeInfo(minfo.getMapKeyTypeInfo()),
+            convertTypeInfo(minfo.getMapValueTypeInfo()));
+      }
+      case UNION: {
+        UnionTypeInfo minfo = (UnionTypeInfo) info;
+        TypeDescription result = TypeDescription.createUnion();
+        for (TypeInfo child: minfo.getAllUnionObjectTypeInfos()) {
+          result.addUnionChild(convertTypeInfo(child));
+        }
+        return result;
+      }
+      case STRUCT: {
+        StructTypeInfo sinfo = (StructTypeInfo) info;
+        TypeDescription result = TypeDescription.createStruct();
+        for(String fieldName: sinfo.getAllStructFieldNames()) {
+          result.addField(fieldName,
+              convertTypeInfo(sinfo.getStructFieldTypeInfo(fieldName)));
+        }
+        return result;
+      }
+      default:
+        throw new IllegalArgumentException("ORC doesn't handle " +
+            info.getCategory());
+    }
+  }
+
+
+  public static TypeDescription getDesiredRowTypeDescr(Configuration conf) {
+
+    String columnNameProperty = null;
+    String columnTypeProperty = null;
+
+    ArrayList<String> schemaEvolutionColumnNames = null;
+    ArrayList<TypeDescription> schemaEvolutionTypeDescrs = null;
+
+    boolean haveSchemaEvolutionProperties = false;
+    if (HiveConf.getBoolVar(conf, ConfVars.HIVE_SCHEMA_EVOLUTION)) {
+
+      columnNameProperty = conf.get(IOConstants.SCHEMA_EVOLUTION_COLUMNS);
+      columnTypeProperty = conf.get(IOConstants.SCHEMA_EVOLUTION_COLUMNS_TYPES);
+
+      haveSchemaEvolutionProperties =
+          (columnNameProperty != null && columnTypeProperty != null);
+
+      if (haveSchemaEvolutionProperties) {
+        schemaEvolutionColumnNames = Lists.newArrayList(columnNameProperty.split(","));
+        if (schemaEvolutionColumnNames.size() == 0) {
+          haveSchemaEvolutionProperties = false;
+        } else {
+          schemaEvolutionTypeDescrs =
+              typeDescriptionsFromHiveTypeProperty(columnTypeProperty);
+          if (schemaEvolutionTypeDescrs.size() != schemaEvolutionColumnNames.size()) {
+            haveSchemaEvolutionProperties = false;
+          }
+        }
+      }
+    }
+
+    if (!haveSchemaEvolutionProperties) {
+
+      // Try the regular properties.
+      columnNameProperty = conf.get(serdeConstants.LIST_COLUMNS);
+      columnTypeProperty = conf.get(serdeConstants.LIST_COLUMN_TYPES);
+      if (columnTypeProperty == null || columnNameProperty == null) {
+        return null;
+      }
+
+      schemaEvolutionColumnNames = Lists.newArrayList(columnNameProperty.split(","));
+      if (schemaEvolutionColumnNames.size() == 0) {
+        return null;
+      }
+      schemaEvolutionTypeDescrs =
+          typeDescriptionsFromHiveTypeProperty(columnTypeProperty);
+      if (schemaEvolutionTypeDescrs.size() != schemaEvolutionColumnNames.size()) {
+        return null;
+      }
+    }
+
+    // Desired schema does not include virtual columns or partition columns.
+    TypeDescription result = TypeDescription.createStruct();
+    for (int i = 0; i < schemaEvolutionColumnNames.size(); i++) {
+      result.addField(schemaEvolutionColumnNames.get(i), schemaEvolutionTypeDescrs.get(i));
+    }
+
+    return result;
+  }
+
 }
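
A small sketch of the conversion helpers that moved onto OrcInputFormat above; the type string
below is a made-up example:

    import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
    import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
    import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
    import org.apache.orc.TypeDescription;

    public class TypeConversionSketch {
      public static void main(String[] args) {
        // Parse a Hive type string and convert it field by field into the ORC type tree.
        TypeInfo info = TypeInfoUtils.getTypeInfoFromTypeString(
            "struct<id:int,name:string,amount:decimal(10,2)>");
        TypeDescription schema = OrcInputFormat.convertTypeInfo(info);
        System.out.println(schema);  // struct<id:int,name:string,amount:decimal(10,2)>
      }
    }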

http://git-wip-us.apache.org/repos/asf/hive/blob/9c7a78ee/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcNewInputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcNewInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcNewInputFormat.java
index c15b35f..2782d7e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcNewInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcNewInputFormat.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.orc.OrcProto;
 
 /** An InputFormat for ORC files. Keys are meaningless,
  * value is the OrcStruct object */

http://git-wip-us.apache.org/repos/asf/hive/blob/9c7a78ee/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcNewSplit.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcNewSplit.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcNewSplit.java
index edc7490..2e63aba 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcNewSplit.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcNewSplit.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.hive.ql.io.AcidInputFormat;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.WritableUtils;
 import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.orc.FileMetaInfo;
 
 /**
  * OrcFileSplit. Holds file meta info

http://git-wip-us.apache.org/repos/asf/hive/blob/9c7a78ee/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java
index 2d0eaaf..3fb6a86 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java
@@ -24,6 +24,8 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.Properties;
 
+import org.apache.orc.CompressionKind;
+import org.apache.orc.TypeDescription;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.fs.FileSystem;
@@ -33,7 +35,6 @@ import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.io.IOConstants;
 import org.apache.hadoop.hive.ql.io.RecordUpdater;
 import org.apache.hadoop.hive.ql.io.StatsProvidingRecordWriter;
-import org.apache.hadoop.hive.ql.io.orc.OrcFile.EncodingStrategy;
 import org.apache.hadoop.hive.ql.io.orc.OrcSerde.OrcSerdeRow;
 import org.apache.hadoop.hive.serde2.SerDeStats;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
@@ -160,7 +161,7 @@ public class OrcOutputFormat extends FileOutputFormat<NullWritable, OrcSerdeRow>
         TypeDescription schema = TypeDescription.createStruct();
         for (int i = 0; i < columnNames.size(); ++i) {
           schema.addField(columnNames.get(i),
-              OrcUtils.convertTypeInfo(columnTypes.get(i)));
+              OrcInputFormat.convertTypeInfo(columnTypes.get(i)));
         }
         if (LOG.isDebugEnabled()) {
           LOG.debug("ORC schema = " + schema);

http://git-wip-us.apache.org/repos/asf/hive/blob/9c7a78ee/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
index 090d63b..e5f9786 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
@@ -26,6 +26,11 @@ import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
 
+import org.apache.orc.OrcUtils;
+import org.apache.orc.StripeInformation;
+import org.apache.orc.TypeDescription;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
@@ -442,7 +447,7 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
     this.length = options.getLength();
     this.validTxnList = validTxnList;
 
-    TypeDescription typeDescr = OrcUtils.getDesiredRowTypeDescr(conf);
+    TypeDescription typeDescr = OrcInputFormat.getDesiredRowTypeDescr(conf);
     if (typeDescr == null) {
       throw new IOException(ErrorMsg.SCHEMA_REQUIRED_TO_READ_ACID_TABLES.getErrorCodedMsg());
     }

http://git-wip-us.apache.org/repos/asf/hive/blob/9c7a78ee/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSerde.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSerde.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSerde.java
index c0e9b1a..59876e2 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSerde.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSerde.java
@@ -23,6 +23,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Properties;
 
+import org.apache.orc.OrcConf;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;

http://git-wip-us.apache.org/repos/asf/hive/blob/9c7a78ee/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java
index 81afb48..61cde41 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java
@@ -25,12 +25,12 @@ import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.orc.FileMetaInfo;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.ql.io.ColumnarSplit;
 import org.apache.hadoop.hive.ql.io.AcidInputFormat;
-import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.WritableUtils;
 import org.apache.hadoop.mapred.FileSplit;

http://git-wip-us.apache.org/repos/asf/hive/blob/9c7a78ee/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcStruct.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcStruct.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcStruct.java
index 7a17b92..d48cadd 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcStruct.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcStruct.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
 import org.apache.hadoop.hive.serde2.typeinfo.UnionTypeInfo;
 import org.apache.hadoop.io.Writable;
+import org.apache.orc.OrcProto;
 
 import java.io.DataInput;
 import java.io.DataOutput;