You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by ji...@apache.org on 2016/03/23 02:41:50 UTC

[5/7] tajo git commit: TAJO-2102: Migrate to Apache Orc from Presto's one.

http://git-wip-us.apache.org/repos/asf/tajo/blob/68263585/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/DecimalColumnStatistics.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/DecimalColumnStatistics.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/DecimalColumnStatistics.java
deleted file mode 100644
index 27cdac2..0000000
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/DecimalColumnStatistics.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.tajo.storage.thirdparty.orc;
-
-import org.apache.hadoop.hive.common.type.HiveDecimal;
-
-/**
- * Column statistics specialised for decimal columns. The minimum, maximum
- * and sum are all reported as {@link HiveDecimal} values.
- */
-public interface DecimalColumnStatistics extends ColumnStatistics {
-
-  /**
-   * Returns the minimum value for the column.
-   *
-   * @return the minimum value
-   */
-  HiveDecimal getMinimum();
-
-  /**
-   * Returns the maximum value for the column.
-   *
-   * @return the maximum value
-   */
-  HiveDecimal getMaximum();
-
-  /**
-   * Returns the sum of the values of the column.
-   *
-   * @return the sum
-   */
-  HiveDecimal getSum();
-
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/68263585/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/DirectDecompressionCodec.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/DirectDecompressionCodec.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/DirectDecompressionCodec.java
deleted file mode 100644
index 5333052..0000000
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/DirectDecompressionCodec.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.tajo.storage.thirdparty.orc;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-
-/**
- * A {@link CompressionCodec} that additionally offers a buffer-to-buffer
- * decompression entry point.
- */
-public interface DirectDecompressionCodec extends CompressionCodec {
-
-  /**
-   * Reports whether this codec can be used.
-   * NOTE(review): the exact meaning of "available" (e.g. presence of a
-   * native implementation) is decided by the implementing class - confirm.
-   *
-   * @return {@code true} if the codec is available
-   */
-  boolean isAvailable();
-
-  /**
-   * Decompresses directly from one buffer into another.
-   * NOTE(review): how position and limit of either buffer are treated is
-   * defined by the implementation and cannot be told from this interface.
-   *
-   * @param in the source buffer
-   * @param out the destination buffer
-   * @throws IOException if the implementation fails to decompress
-   */
-  void directDecompress(ByteBuffer in, ByteBuffer out) throws IOException;
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/68263585/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/DoubleColumnStatistics.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/DoubleColumnStatistics.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/DoubleColumnStatistics.java
deleted file mode 100644
index ddce8f7..0000000
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/DoubleColumnStatistics.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.tajo.storage.thirdparty.orc;
-
-/**
- * Column statistics shared by float and double columns.
- */
-public interface DoubleColumnStatistics extends ColumnStatistics {
-
-  /**
-   * Returns the smallest value in the column. The result is only defined
-   * if getNumberOfValues is non-zero.
-   *
-   * @return the minimum
-   */
-  double getMinimum();
-
-  /**
-   * Returns the largest value in the column. The result is only defined
-   * if getNumberOfValues is non-zero.
-   *
-   * @return the maximum
-   */
-  double getMaximum();
-
-  /**
-   * Returns the sum of the values in the column.
-   *
-   * @return the sum
-   */
-  double getSum();
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/68263585/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/DynamicByteArray.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/DynamicByteArray.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/DynamicByteArray.java
deleted file mode 100644
index 1d44f77..0000000
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/DynamicByteArray.java
+++ /dev/null
@@ -1,303 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.tajo.storage.thirdparty.orc;
-
-import org.apache.hadoop.io.Text;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.nio.ByteBuffer;
-
-/**
- * A growable array of bytes. Storage is split into fixed-size chunks that
- * are allocated on demand, so growing the array never copies bytes that
- * were already written.
- */
-final class DynamicByteArray {
-  static final int DEFAULT_CHUNKSIZE = 32 * 1024;
-  static final int DEFAULT_NUM_CHUNKS = 128;
-
-  private final int chunkSize;        // our allocation sizes
-  private byte[][] data;              // the real data
-  private int length;                 // max set element index +1
-  private int initializedChunks = 0;  // the number of chunks created
-
-  /**
-   * Creates an array with {@link #DEFAULT_NUM_CHUNKS} chunk slots of
-   * {@link #DEFAULT_CHUNKSIZE} bytes each.
-   */
-  public DynamicByteArray() {
-    this(DEFAULT_NUM_CHUNKS, DEFAULT_CHUNKSIZE);
-  }
-
-  /**
-   * Creates an array with an explicit geometry. No chunk is allocated until
-   * data is written.
-   *
-   * @param numChunks initial number of slots in the chunk table
-   * @param chunkSize number of bytes per chunk; must be positive
-   * @throws IllegalArgumentException if {@code chunkSize} is not positive
-   */
-  public DynamicByteArray(int numChunks, int chunkSize) {
-    // Zero would divide by zero in the index arithmetic and a negative size
-    // would only fail later, when the first chunk is allocated.
-    if (chunkSize <= 0) {
-      throw new IllegalArgumentException("bad chunksize");
-    }
-    this.chunkSize = chunkSize;
-    data = new byte[numChunks][];
-  }
-
-  /**
-   * Ensure that every chunk up to and including the given one is allocated,
-   * enlarging the chunk table first when it is too small.
-   *
-   * @param chunkIndex index of the last chunk that must be usable
-   */
-  private void grow(int chunkIndex) {
-    if (chunkIndex >= initializedChunks) {
-      if (chunkIndex >= data.length) {
-        // at least double the table so repeated growth stays cheap
-        int newSize = Math.max(chunkIndex + 1, 2 * data.length);
-        byte[][] newChunk = new byte[newSize][];
-        System.arraycopy(data, 0, newChunk, 0, data.length);
-        data = newChunk;
-      }
-      for(int i=initializedChunks; i <= chunkIndex; ++i) {
-        data[i] = new byte[chunkSize];
-      }
-      initializedChunks = chunkIndex + 1;
-    }
-  }
-
-  /**
-   * Read a single byte.
-   *
-   * @param index the position to read
-   * @return the byte stored at that position
-   * @throws IndexOutOfBoundsException if index is negative or not smaller
-   *         than {@link #size()}
-   */
-  public byte get(int index) {
-    if (index < 0 || index >= length) {
-      throw new IndexOutOfBoundsException("Index " + index +
-                                            " is outside of 0.." +
-                                            (length - 1));
-    }
-    int i = index / chunkSize;
-    int j = index % chunkSize;
-    return data[i][j];
-  }
-
-  /**
-   * Store a byte at the given position, allocating chunks as needed. If the
-   * position is at or beyond the current size, the size becomes index + 1.
-   *
-   * @param index the position to write
-   * @param value the byte to store
-   */
-  public void set(int index, byte value) {
-    int i = index / chunkSize;
-    int j = index % chunkSize;
-    grow(i);
-    if (index >= length) {
-      length = index + 1;
-    }
-    data[i][j] = value;
-  }
-
-  /**
-   * Append a single byte.
-   *
-   * @param value the byte to append
-   * @return the offset at which the byte was stored
-   */
-  public int add(byte value) {
-    int i = length / chunkSize;
-    int j = length % chunkSize;
-    grow(i);
-    data[i][j] = value;
-    int result = length;
-    length += 1;
-    return result;
-  }
-
-  /**
-   * Copy a slice of a byte array into our buffer.
-   * @param value the array to copy from
-   * @param valueOffset the first location to copy from value
-   * @param valueLength the number of bytes to copy from value
-   * @return the offset of the start of the value
-   */
-  public int add(byte[] value, int valueOffset, int valueLength) {
-    int i = length / chunkSize;
-    int j = length % chunkSize;
-    grow((length + valueLength) / chunkSize);
-    int remaining = valueLength;
-    while (remaining > 0) {
-      // copy as much as fits into the current chunk, then move to the next
-      int size = Math.min(remaining, chunkSize - j);
-      System.arraycopy(value, valueOffset, data[i], j, size);
-      remaining -= size;
-      valueOffset += size;
-      i += 1;
-      j = 0;
-    }
-    int result = length;
-    length += valueLength;
-    return result;
-  }
-
-  /**
-   * Read the entire stream into this array, appending after the current
-   * contents. Reading stops at the first read that returns no bytes.
-   * @param in the stream to read from
-   * @throws IOException if reading from the stream fails
-   */
-  public void readAll(InputStream in) throws IOException {
-    int currentChunk = length / chunkSize;
-    int currentOffset = length % chunkSize;
-    grow(currentChunk);
-    int currentLength = in.read(data[currentChunk], currentOffset,
-      chunkSize - currentOffset);
-    while (currentLength > 0) {
-      length += currentLength;
-      currentOffset = length % chunkSize;
-      if (currentOffset == 0) {
-        // the current chunk is full; continue in a fresh one
-        currentChunk = length / chunkSize;
-        grow(currentChunk);
-      }
-      currentLength = in.read(data[currentChunk], currentOffset,
-        chunkSize - currentOffset);
-    }
-  }
-
-  /**
-   * Byte compare a set of bytes against the bytes in this dynamic array.
-   * Bytes are compared as unsigned values; when one range is a prefix of
-   * the other, the difference of the lengths decides.
-   * @param other source of the other bytes
-   * @param otherOffset start offset in the other array
-   * @param otherLength number of bytes in the other array
-   * @param ourOffset the offset in our array
-   * @param ourLength the number of bytes in our array
-   * @return negative for less, 0 for equal, positive for greater
-   */
-  public int compare(byte[] other, int otherOffset, int otherLength,
-                     int ourOffset, int ourLength) {
-    int currentChunk = ourOffset / chunkSize;
-    int currentOffset = ourOffset % chunkSize;
-    int maxLength = Math.min(otherLength, ourLength);
-    while (maxLength > 0 &&
-      other[otherOffset] == data[currentChunk][currentOffset]) {
-      otherOffset += 1;
-      currentOffset += 1;
-      if (currentOffset == chunkSize) {
-        currentChunk += 1;
-        currentOffset = 0;
-      }
-      maxLength -= 1;
-    }
-    if (maxLength == 0) {
-      return otherLength - ourLength;
-    }
-    int otherByte = 0xff & other[otherOffset];
-    int ourByte = 0xff & data[currentChunk][currentOffset];
-    return otherByte > ourByte ? 1 : -1;
-  }
-
-  /**
-   * Get the size of the array.
-   * @return the number of bytes in the array
-   */
-  public int size() {
-    return length;
-  }
-
-  /**
-   * Clear the array to its original pristine state. All chunks are released;
-   * the chunk table keeps its current number of slots.
-   */
-  public void clear() {
-    length = 0;
-    for(int i=0; i < data.length; ++i) {
-      data[i] = null;
-    }
-    initializedChunks = 0;
-  }
-
-  /**
-   * Set a text value from the bytes in this dynamic array.
-   * @param result the value to set
-   * @param offset the start of the bytes to copy
-   * @param length the number of bytes to copy
-   */
-  public void setText(Text result, int offset, int length) {
-    result.clear();
-    int currentChunk = offset / chunkSize;
-    int currentOffset = offset % chunkSize;
-    int currentLength = Math.min(length, chunkSize - currentOffset);
-    while (length > 0) {
-      result.append(data[currentChunk], currentOffset, currentLength);
-      length -= currentLength;
-      currentChunk += 1;
-      currentOffset = 0;
-      currentLength = Math.min(length, chunkSize - currentOffset);
-    }
-  }
-
-  /**
-   * Write out a range of this dynamic array to an output stream.
-   * @param out the stream to write to
-   * @param offset the first offset to write
-   * @param length the number of bytes to write
-   * @throws IOException if writing to the stream fails
-   */
-  public void write(OutputStream out, int offset,
-                    int length) throws IOException {
-    int currentChunk = offset / chunkSize;
-    int currentOffset = offset % chunkSize;
-    while (length > 0) {
-      int currentLength = Math.min(length, chunkSize - currentOffset);
-      out.write(data[currentChunk], currentOffset, currentLength);
-      length -= currentLength;
-      currentChunk += 1;
-      currentOffset = 0;
-    }
-  }
-
-  /**
-   * Render the contents as a comma separated list of hexadecimal byte
-   * values enclosed in braces. An empty array is rendered as "{}".
-   */
-  @Override
-  public String toString() {
-    StringBuilder sb = new StringBuilder(length * 3 + 2);
-
-    sb.append('{');
-    for (int i = 0; i < length; i++) {
-      if (i > 0) {
-        sb.append(',');
-      }
-      // mask so that negative bytes print as two hex digits, not ffffffxx
-      sb.append(Integer.toHexString(get(i) & 0xff));
-    }
-    sb.append('}');
-
-    return sb.toString();
-  }
-
-  /**
-   * Copy a range of this array into a byte buffer. The buffer is cleared
-   * first and is left positioned after the copied bytes (it is not flipped).
-   * @param result the buffer to fill
-   * @param offset the start of the bytes to copy
-   * @param length the number of bytes to copy
-   */
-  public void setByteBuffer(ByteBuffer result, int offset, int length) {
-    result.clear();
-    int currentChunk = offset / chunkSize;
-    int currentOffset = offset % chunkSize;
-    int currentLength = Math.min(length, chunkSize - currentOffset);
-    while (length > 0) {
-      result.put(data[currentChunk], currentOffset, currentLength);
-      length -= currentLength;
-      currentChunk += 1;
-      currentOffset = 0;
-      currentLength = Math.min(length, chunkSize - currentOffset);
-    }
-  }
-
-  /**
-   * Gets all the bytes of the array.
-   *
-   * @return a newly allocated copy of the bytes, or null if the array is
-   *         empty
-   */
-  public byte[] get() {
-    byte[] result = null;
-    if (length > 0) {
-      int currentChunk = 0;
-      int currentOffset = 0;
-      int currentLength = Math.min(length, chunkSize);
-      int destOffset = 0;
-      result = new byte[length];
-      int totalLength = length;
-      while (totalLength > 0) {
-        System.arraycopy(data[currentChunk], currentOffset, result, destOffset, currentLength);
-        destOffset += currentLength;
-        totalLength -= currentLength;
-        currentChunk += 1;
-        currentOffset = 0;
-        currentLength = Math.min(totalLength, chunkSize - currentOffset);
-      }
-    }
-    return result;
-  }
-
-  /**
-   * Get the size of the buffers.
-   * @return the number of bytes held by all allocated chunks
-   */
-  public long getSizeInBytes() {
-    // widen before multiplying; the int product overflows past 2 GiB
-    return (long) initializedChunks * chunkSize;
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/68263585/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/DynamicIntArray.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/DynamicIntArray.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/DynamicIntArray.java
deleted file mode 100644
index a347706..0000000
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/DynamicIntArray.java
+++ /dev/null
@@ -1,142 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.tajo.storage.thirdparty.orc;
-
-/**
- * Dynamic int array that uses primitive types and chunks to avoid copying
- * large number of integers when it resizes.
- *
- * The motivation for this class is memory optimization, i.e. space efficient
- * storage of potentially huge arrays without good a-priori size guesses.
- *
- * The API of this class is between a primitive array and a AbstractList. It's
- * not a Collection implementation because it handles primitive types, but the
- * API could be extended to support iterators and the like.
- *
- * NOTE: Like standard Collection implementations/arrays, this class is not
- * synchronized.
- */
-final class DynamicIntArray {
-  static final int DEFAULT_CHUNKSIZE = 8 * 1024;
-  static final int INIT_CHUNKS = 128;
-
-  private final int chunkSize;       // our allocation size
-  private int[][] data;              // the real data
-  private int length;                // max set element index +1
-  private int initializedChunks = 0; // the number of created chunks
-
-  /**
-   * Creates an array whose chunks hold {@link #DEFAULT_CHUNKSIZE} ints.
-   */
-  public DynamicIntArray() {
-    this(DEFAULT_CHUNKSIZE);
-  }
-
-  /**
-   * Creates an array with the given chunk size. No chunk is allocated until
-   * a value is written.
-   *
-   * @param chunkSize number of ints per chunk; must be positive
-   * @throws IllegalArgumentException if {@code chunkSize} is not positive
-   */
-  public DynamicIntArray(int chunkSize) {
-    // Zero would divide by zero in the index arithmetic and a negative size
-    // would only fail later, when the first chunk is allocated.
-    if (chunkSize <= 0) {
-      throw new IllegalArgumentException("bad chunksize");
-    }
-    this.chunkSize = chunkSize;
-
-    data = new int[INIT_CHUNKS][];
-  }
-
-  /**
-   * Ensure that every chunk up to and including the given one is allocated,
-   * enlarging the chunk table first when it is too small.
-   *
-   * @param chunkIndex index of the last chunk that must be usable
-   */
-  private void grow(int chunkIndex) {
-    if (chunkIndex >= initializedChunks) {
-      if (chunkIndex >= data.length) {
-        // at least double the table so repeated growth stays cheap
-        int newSize = Math.max(chunkIndex + 1, 2 * data.length);
-        int[][] newChunk = new int[newSize][];
-        System.arraycopy(data, 0, newChunk, 0, data.length);
-        data = newChunk;
-      }
-      for (int i=initializedChunks; i <= chunkIndex; ++i) {
-        data[i] = new int[chunkSize];
-      }
-      initializedChunks = chunkIndex + 1;
-    }
-  }
-
-  /**
-   * Read a single element.
-   *
-   * @param index the position to read
-   * @return the value stored at that position
-   * @throws IndexOutOfBoundsException if index is negative or not smaller
-   *         than {@link #size()}
-   */
-  public int get(int index) {
-    if (index < 0 || index >= length) {
-      throw new IndexOutOfBoundsException("Index " + index +
-                                            " is outside of 0.." +
-                                            (length - 1));
-    }
-    int i = index / chunkSize;
-    int j = index % chunkSize;
-    return data[i][j];
-  }
-
-  /**
-   * Store a value at the given position, allocating chunks as needed. If the
-   * position is at or beyond the current size, the size becomes index + 1.
-   *
-   * @param index the position to write
-   * @param value the value to store
-   */
-  public void set(int index, int value) {
-    int i = index / chunkSize;
-    int j = index % chunkSize;
-    grow(i);
-    if (index >= length) {
-      length = index + 1;
-    }
-    data[i][j] = value;
-  }
-
-  /**
-   * Add a delta to the element at the given position, allocating chunks and
-   * extending the size exactly like {@link #set(int, int)} does.
-   *
-   * @param index the position to update
-   * @param value the amount to add
-   */
-  public void increment(int index, int value) {
-    int i = index / chunkSize;
-    int j = index % chunkSize;
-    grow(i);
-    if (index >= length) {
-      length = index + 1;
-    }
-    data[i][j] += value;
-  }
-
-  /**
-   * Append a value after the current last element.
-   *
-   * @param value the value to append
-   */
-  public void add(int value) {
-    int i = length / chunkSize;
-    int j = length % chunkSize;
-    grow(i);
-    data[i][j] = value;
-    length += 1;
-  }
-
-  /**
-   * Get the size of the array.
-   * @return the number of elements in the array
-   */
-  public int size() {
-    return length;
-  }
-
-  /**
-   * Remove all elements and release all chunks; the chunk table keeps its
-   * current number of slots.
-   */
-  public void clear() {
-    length = 0;
-    for(int i=0; i < data.length; ++i) {
-      data[i] = null;
-    }
-    initializedChunks = 0;
-  }
-
-  /**
-   * Render the contents as a comma separated list of decimal values
-   * enclosed in braces. An empty array is rendered as "{}".
-   */
-  @Override
-  public String toString() {
-    StringBuilder sb = new StringBuilder(length * 4 + 2);
-
-    sb.append('{');
-    for (int i = 0; i < length; i++) {
-      if (i > 0) {
-        sb.append(',');
-      }
-      sb.append(get(i));
-    }
-    sb.append('}');
-
-    return sb.toString();
-  }
-
-  /**
-   * Get the size of the buffers.
-   * @return the number of bytes held by all allocated chunks, capped at
-   *         {@link Integer#MAX_VALUE}
-   */
-  public int getSizeInBytes() {
-    // compute in long and saturate; the plain int product can wrap negative
-    return (int) Math.min(Integer.MAX_VALUE, 4L * initializedChunks * chunkSize);
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/tajo/blob/68263585/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/HdfsOrcDataSource.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/HdfsOrcDataSource.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/HdfsOrcDataSource.java
deleted file mode 100644
index 5357f51..0000000
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/HdfsOrcDataSource.java
+++ /dev/null
@@ -1,133 +0,0 @@
-
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.tajo.storage.thirdparty.orc;
-
-import com.facebook.presto.orc.DiskRange;
-import com.facebook.presto.orc.OrcDataSource;
-import com.google.common.collect.ImmutableMap;
-import io.airlift.slice.BasicSliceInput;
-import io.airlift.slice.FixedLengthSliceInput;
-import io.airlift.units.DataSize;
-import org.apache.hadoop.fs.FSDataInputStream;
-
-import java.io.IOException;
-import java.util.LinkedHashMap;
-import java.util.Map;
-import java.util.Map.Entry;
-
-import static com.facebook.presto.orc.OrcDataSourceUtils.getDiskRangeSlice;
-import static com.facebook.presto.orc.OrcDataSourceUtils.mergeAdjacentDiskRanges;
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
-
-/**
- * HDFS File data source class for Orc Reader
- *
- * Most of code is from Presto
- *
- * All reads go through positioned reads on the wrapped
- * {@link FSDataInputStream}; the time spent in those reads is accumulated
- * and exposed through {@link #getReadTimeNanos()}.
- */
-public class HdfsOrcDataSource
-  implements OrcDataSource
-{
-  private final FSDataInputStream inputStream;
-  private final String path;
-  private final long size;
-  private final DataSize maxMergeDistance;
-  private final DataSize maxReadSize;
-  private long readTimeNanos;
-
-  /**
-   * @param path name reported by {@link #toString()}
-   * @param inputStream the stream all reads are served from; closed by {@link #close()}
-   * @param size value reported by {@link #getSize()}; must not be negative
-   * @param maxMergeDistance passed to mergeAdjacentDiskRanges when batching reads
-   * @param maxReadSize passed to mergeAdjacentDiskRanges when batching reads
-   * @throws NullPointerException if any reference argument is null
-   * @throws IllegalArgumentException if size is negative
-   */
-  public HdfsOrcDataSource(String path, FSDataInputStream inputStream, long size,
-                           DataSize maxMergeDistance, DataSize maxReadSize)
-  {
-    this.path = checkNotNull(path, "path is null");
-    this.inputStream = checkNotNull(inputStream, "inputStream is null");
-    checkArgument(size >= 0, "size is negative");
-    this.size = size;
-
-    this.maxMergeDistance = checkNotNull(maxMergeDistance, "maxMergeDistance is null");
-    this.maxReadSize = checkNotNull(maxReadSize, "maxReadSize is null");
-  }
-
-  /**
-   * Closes the underlying input stream.
-   */
-  @Override
-  public void close()
-    throws IOException
-  {
-    inputStream.close();
-  }
-
-  /**
-   * @return the total nanoseconds spent in reads that completed successfully
-   */
-  @Override
-  public long getReadTimeNanos()
-  {
-    return readTimeNanos;
-  }
-
-  /**
-   * @return the size given at construction time
-   */
-  @Override
-  public long getSize()
-  {
-    return size;
-  }
-
-  /**
-   * Fills the whole buffer with the bytes starting at the given position.
-   */
-  @Override
-  public void readFully(long position, byte[] buffer)
-    throws IOException
-  {
-    readFully(position, buffer, 0, buffer.length);
-  }
-
-  /**
-   * Reads bufferLength bytes starting at the given stream position into
-   * buffer at bufferOffset, and adds the elapsed time to the read timer.
-   * The timer is not updated when the read throws.
-   */
-  @Override
-  public void readFully(long position, byte[] buffer, int bufferOffset, int bufferLength)
-    throws IOException
-  {
-    long start = System.nanoTime();
-
-    inputStream.readFully(position, buffer, bufferOffset, bufferLength);
-    readTimeNanos += System.nanoTime() - start;
-  }
-
-  /**
-   * Reads all requested disk ranges. The ranges are first merged with
-   * mergeAdjacentDiskRanges, each merged range is read in a single request,
-   * and every requested range is then returned as a slice input under its
-   * original key.
-   *
-   * @param diskRanges the ranges to read, by caller-chosen key
-   * @return one input per requested key; empty if no range was requested
-   * @throws IOException if any read fails
-   */
-  @Override
-  public <K> Map<K, FixedLengthSliceInput> readFully(Map<K, DiskRange> diskRanges)
-    throws IOException
-  {
-    checkNotNull(diskRanges, "diskRanges is null");
-
-    if (diskRanges.isEmpty()) {
-      return ImmutableMap.of();
-    }
-
-    Iterable<DiskRange> mergedRanges = mergeAdjacentDiskRanges(diskRanges.values(), maxMergeDistance, maxReadSize);
-
-    // read ranges
-    Map<DiskRange, byte[]> buffers = new LinkedHashMap<>();
-    for (DiskRange mergedRange : mergedRanges) {
-      // read full range in one request
-      byte[] buffer = new byte[mergedRange.getLength()];
-      readFully(mergedRange.getOffset(), buffer);
-      buffers.put(mergedRange, buffer);
-    }
-
-    // carve each originally requested range out of the merged buffers
-    ImmutableMap.Builder<K, FixedLengthSliceInput> slices = ImmutableMap.builder();
-    diskRanges.forEach((K key, DiskRange range) ->
-        slices.put(key, new BasicSliceInput(getDiskRangeSlice(range, buffers))));
-
-    return slices.build();
-  }
-
-  /**
-   * @return the path given at construction time
-   */
-  @Override
-  public String toString()
-  {
-    return path;
-  }
-}
-
-

http://git-wip-us.apache.org/repos/asf/tajo/blob/68263585/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/IntegerColumnStatistics.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/IntegerColumnStatistics.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/IntegerColumnStatistics.java
deleted file mode 100644
index 208454f..0000000
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/IntegerColumnStatistics.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.tajo.storage.thirdparty.orc;
-
-/**
- * Column statistics shared by all integer column types: byte, short, int,
- * and long.
- */
-public interface IntegerColumnStatistics extends ColumnStatistics {
-  /**
-   * Returns the smallest value in the column. The result is only defined
-   * if getNumberOfValues is non-zero.
-   *
-   * @return the minimum
-   */
-  long getMinimum();
-
-  /**
-   * Returns the largest value in the column. The result is only defined
-   * if getNumberOfValues is non-zero.
-   *
-   * @return the maximum
-   */
-  long getMaximum();
-
-  /**
-   * Tells whether a sum is available. This is false if the sum overflowed
-   * the counter.
-   *
-   * @return is the sum available
-   */
-  boolean isSumDefined();
-
-  /**
-   * Returns the sum of the column. The result is only valid if
-   * {@link #isSumDefined()} returns true.
-   *
-   * @return the sum of the column
-   */
-  long getSum();
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/68263585/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/IntegerWriter.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/IntegerWriter.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/IntegerWriter.java
deleted file mode 100644
index 6872882..0000000
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/IntegerWriter.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage.thirdparty.orc;
-
-import java.io.IOException;
-
-/**
- * Contract for components that write integer values.
- */
-interface IntegerWriter {
-
-  /**
-   * Gets the position from the stream and hands it to the given recorder.
-   *
-   * @param recorder the recorder that receives the position
-   * @throws IOException if the implementation fails to obtain the position
-   */
-  void getPosition(PositionRecorder recorder) throws IOException;
-
-  /**
-   * Writes one integer value.
-   *
-   * @param value the value to write
-   * @throws IOException if the implementation fails to write the value
-   */
-  void write(long value) throws IOException;
-
-  /**
-   * Flushes the buffer.
-   *
-   * @throws IOException if the implementation fails to flush
-   */
-  void flush() throws IOException;
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/68263585/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/MemoryManager.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/MemoryManager.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/MemoryManager.java
deleted file mode 100644
index 79af80f..0000000
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/MemoryManager.java
+++ /dev/null
@@ -1,212 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage.thirdparty.orc;
-
-import com.google.common.base.Preconditions;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.conf.HiveConf;
-
-import java.io.IOException;
-import java.lang.management.ManagementFactory;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.locks.ReentrantLock;
-
-/**
- * Implements a memory manager that keeps a global context of how many ORC
- * writers there are and manages the memory between them. For use cases with
- * dynamic partitions, it is easy to end up with many writers in the same task.
- * By managing the size of each allocation, we try to cut down the size of each
- * allocation and keep the task from running out of memory.
- *
- * This class is not thread safe, but is re-entrant - ensure creation and all
- * invocations are triggered from the same thread.
- */
-class MemoryManager {
-
-  private static final Log LOG = LogFactory.getLog(MemoryManager.class);
-
-  /**
-   * How often should we check the memory sizes? Measured in rows added
-   * to all of the writers.
-   */
-  private static final int ROWS_BETWEEN_CHECKS = 5000;
-  private final long totalMemoryPool;
-  private final Map<Path, WriterInfo> writerList =
-          new HashMap<>();
-  private long totalAllocation = 0;
-  private double currentScale = 1;
-  private int rowsAddedSinceCheck = 0;
-  private final OwnedLock ownerLock = new OwnedLock();
-
-  @SuppressWarnings("serial")
-  private static class OwnedLock extends ReentrantLock {
-    public Thread getOwner() {
-      return super.getOwner();
-    }
-  }
-
-  private static class WriterInfo {
-    long allocation;
-    Callback callback;
-    WriterInfo(long allocation, Callback callback) {
-      this.allocation = allocation;
-      this.callback = callback;
-    }
-  }
-
-  public interface Callback {
-    /**
-     * The writer needs to check its memory usage
-     * @param newScale the current scale factor for memory allocations
-     * @return true if the writer was over the limit
-     * @throws IOException
-     */
-    boolean checkMemory(double newScale) throws IOException;
-  }
-
-  /**
-   * Create the memory manager.
-   * @param conf use the configuration to find the maximum size of the memory
-   *             pool.
-   */
-  MemoryManager(Configuration conf) {
-    HiveConf.ConfVars poolVar = HiveConf.ConfVars.HIVE_ORC_FILE_MEMORY_POOL;
-    double maxLoad = conf.getFloat(poolVar.varname, poolVar.defaultFloatVal);
-    totalMemoryPool = Math.round(ManagementFactory.getMemoryMXBean().
-        getHeapMemoryUsage().getMax() * maxLoad);
-    ownerLock.lock();
-  }
-
-  /**
-   * Light weight thread-safety check for multi-threaded access patterns
-   */
-  private void checkOwner() {
-    Preconditions.checkArgument(ownerLock.isHeldByCurrentThread(),
-      "Owner thread expected %s, got %s",
-      ownerLock.getOwner(),
-      Thread.currentThread());
-  }
-
-  /**
-   * Add a new writer's memory allocation to the pool. We use the path
-   * as a unique key to ensure that we don't get duplicates.
-   * @param path the file that is being written
-   * @param requestedAllocation the requested buffer size
-   */
-  void addWriter(Path path, long requestedAllocation,
-                              Callback callback) throws IOException {
-    checkOwner();
-    WriterInfo oldVal = writerList.get(path);
-    // this should always be null, but we handle the case where the memory
-    // manager wasn't told that a writer wasn't still in use and the task
-    // starts writing to the same path.
-    if (oldVal == null) {
-      oldVal = new WriterInfo(requestedAllocation, callback);
-      writerList.put(path, oldVal);
-      totalAllocation += requestedAllocation;
-    } else {
-      // handle a new writer that is writing to the same path
-      totalAllocation += requestedAllocation - oldVal.allocation;
-      oldVal.allocation = requestedAllocation;
-      oldVal.callback = callback;
-    }
-    updateScale(true);
-  }
-
-  /**
-   * Remove the given writer from the pool.
-   * @param path the file that has been closed
-   */
-  void removeWriter(Path path) throws IOException {
-    checkOwner();
-    WriterInfo val = writerList.get(path);
-    if (val != null) {
-      writerList.remove(path);
-      totalAllocation -= val.allocation;
-      if (writerList.isEmpty()) {
-        rowsAddedSinceCheck = 0;
-      }
-      updateScale(false);
-    }
-    if(writerList.isEmpty()) {
-      rowsAddedSinceCheck = 0;
-    }
-  }
-
-  /**
-   * Get the total pool size that is available for ORC writers.
-   * @return the number of bytes in the pool
-   */
-  long getTotalMemoryPool() {
-    return totalMemoryPool;
-  }
-
-  /**
-   * The scaling factor for each allocation to ensure that the pool isn't
-   * oversubscribed.
-   * @return a fraction between 0.0 and 1.0 of the requested size that is
-   * available for each writer.
-   */
-  double getAllocationScale() {
-    return currentScale;
-  }
-
-  /**
-   * Give the memory manager an opportunity for doing a memory check.
-   * @throws IOException
-   */
-  void addedRow() throws IOException {
-    if (++rowsAddedSinceCheck >= ROWS_BETWEEN_CHECKS) {
-      notifyWriters();
-    }
-  }
-
-  /**
-   * Notify all of the writers that they should check their memory usage.
-   * @throws IOException
-   */
-  void notifyWriters() throws IOException {
-    checkOwner();
-    LOG.debug("Notifying writers after " + rowsAddedSinceCheck);
-    for(WriterInfo writer: writerList.values()) {
-      boolean flushed = writer.callback.checkMemory(currentScale);
-      if (LOG.isDebugEnabled() && flushed) {
-        LOG.debug("flushed " + writer.toString());
-      }
-    }
-    rowsAddedSinceCheck = 0;
-  }
-
-  /**
-   * Update the currentScale based on the current allocation and pool size.
-   * This also updates the notificationTrigger.
-   * @param isAllocate is this an allocation?
-   */
-  private void updateScale(boolean isAllocate) throws IOException {
-    if (totalAllocation <= totalMemoryPool) {
-      currentScale = 1;
-    } else {
-      currentScale = (double) totalMemoryPool / totalAllocation;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/68263585/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/Metadata.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/Metadata.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/Metadata.java
deleted file mode 100644
index dfa4c36..0000000
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/Metadata.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage.thirdparty.orc;
-
-import com.google.common.collect.Lists;
-
-import java.util.List;
-
-public class Metadata {
-
-  private final OrcProto.Metadata metadata;
-
-  Metadata(OrcProto.Metadata m) {
-    this.metadata = m;
-  }
-
-  /**
-   * Return list of stripe level column statistics
-   *
-   * @return list of stripe statistics
-   */
-  public List<StripeStatistics> getStripeStatistics() {
-    List<StripeStatistics> result = Lists.newArrayList();
-    for (OrcProto.StripeStatistics ss : metadata.getStripeStatsList()) {
-      result.add(new StripeStatistics(ss.getColStatsList()));
-    }
-    return result;
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/68263585/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/OrcFile.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/OrcFile.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/OrcFile.java
index a291953..8f26d21 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/OrcFile.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/OrcFile.java
@@ -21,11 +21,15 @@ package org.apache.tajo.storage.thirdparty.orc;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
-
-import static org.apache.tajo.storage.thirdparty.orc.OrcConf.ConfVars.*;
+import org.apache.orc.CompressionKind;
+import org.apache.orc.FileMetaInfo;
+import org.apache.orc.FileMetadata;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.impl.MemoryManager;
+import org.apache.tajo.storage.orc.ORCAppender;
 
 import java.io.IOException;
+import java.util.Properties;
 import java.util.TimeZone;
 
 /**
@@ -50,9 +54,9 @@ public final class OrcFile {
    * prevent the new reader from reading ORC files generated by any released
    * version of Hive.
    */
-  public static enum Version {
+  public enum Version {
     V_0_11("0.11", 0, 11),
-      V_0_12("0.12", 0, 12);
+    V_0_12("0.12", 0, 12);
 
     public static final Version CURRENT = V_0_12;
 
@@ -102,9 +106,14 @@ public final class OrcFile {
    * For bugs in the writer, but the old readers already read the new data
    * correctly, bump this version instead of the Version.
    */
-  public static enum WriterVersion {
+  public enum WriterVersion {
     ORIGINAL(0),
-      HIVE_8732(1); // corrupted stripe/file maximum column statistics
+    HIVE_8732(1), // corrupted stripe/file maximum column statistics
+    HIVE_4243(2), // use real column names from Hive tables
+    HIVE_12055(3), // vectorized writer
+
+    // Don't use any magic numbers here except for the below:
+    FUTURE(Integer.MAX_VALUE); // a version from a future writer
 
     private final int id;
 
@@ -112,67 +121,111 @@ public final class OrcFile {
       return id;
     }
 
-    private WriterVersion(int id) {
+    WriterVersion(int id) {
       this.id = id;
     }
+
+    private static final WriterVersion[] values;
+    static {
+      // Assumes few non-negative values close to zero.
+      int max = Integer.MIN_VALUE;
+      for (WriterVersion v : WriterVersion.values()) {
+        if (v.id < 0) throw new AssertionError();
+        if (v.id > max && FUTURE.id != v.id) {
+          max = v.id;
+        }
+      }
+      values = new WriterVersion[max + 1];
+      for (WriterVersion v : WriterVersion.values()) {
+        if (v.id < values.length) {
+          values[v.id] = v;
+        }
+      }
+    }
+
+    public static WriterVersion from(int val) {
+      if (val == FUTURE.id) return FUTURE; // Special handling for the magic value.
+      return values[val];
+    }
   }
+  public static final WriterVersion CURRENT_WRITER = WriterVersion.HIVE_12055;
 
-  public static enum EncodingStrategy {
+  public enum EncodingStrategy {
     SPEED, COMPRESSION;
   }
 
-  public static enum CompressionStrategy {
+  public enum CompressionStrategy {
     SPEED, COMPRESSION;
   }
 
-  // Note : these string definitions for table properties are deprecated,
-  // and retained only for backward compatibility, please do not add to
-  // them, add to OrcTableProperties below instead
-  @Deprecated public static final String COMPRESSION = "orc.compress";
-  @Deprecated public static final String COMPRESSION_BLOCK_SIZE = "orc.compress.size";
-  @Deprecated public static final String STRIPE_SIZE = "orc.stripe.size";
-  @Deprecated public static final String ROW_INDEX_STRIDE = "orc.row.index.stride";
-  @Deprecated public static final String ENABLE_INDEXES = "orc.create.index";
-  @Deprecated public static final String BLOCK_PADDING = "orc.block.padding";
+  // unused
+  private OrcFile() {}
 
-  /**
-   * Enum container for all orc table properties.
-   * If introducing a new orc-specific table property,
-   * add it here.
-   */
-  public static enum OrcTableProperties {
-    COMPRESSION("orc.compress"),
-    COMPRESSION_BLOCK_SIZE("orc.compress.size"),
-    STRIPE_SIZE("orc.stripe.size"),
-    BLOCK_SIZE("orc.block.size"),
-    ROW_INDEX_STRIDE("orc.row.index.stride"),
-    ENABLE_INDEXES("orc.create.index"),
-    BLOCK_PADDING("orc.block.padding"),
-    ENCODING_STRATEGY("orc.encoding.strategy"),
-    BLOOM_FILTER_COLUMNS("orc.bloom.filter.columns"),
-    BLOOM_FILTER_FPP("orc.bloom.filter.fpp");
+  public static class ReaderOptions {
+    private final Configuration conf;
+    private FileSystem filesystem;
+    private FileMetaInfo fileMetaInfo; // TODO: this comes from some place.
+    private long maxLength = Long.MAX_VALUE;
+    private FileMetadata fullFileMetadata; // Propagate from LLAP cache.
+
+    public ReaderOptions(Configuration conf) {
+      this.conf = conf;
+    }
+
+    public ReaderOptions fileMetaInfo(FileMetaInfo info) {
+      fileMetaInfo = info;
+      return this;
+    }
+
+    public ReaderOptions filesystem(FileSystem fs) {
+      this.filesystem = fs;
+      return this;
+    }
+
+    public ReaderOptions maxLength(long val) {
+      maxLength = val;
+      return this;
+    }
 
-    private final String propName;
+    public ReaderOptions fileMetadata(FileMetadata metadata) {
+      this.fullFileMetadata = metadata;
+      return this;
+    }
+
+    public Configuration getConfiguration() {
+      return conf;
+    }
 
-    OrcTableProperties(String propName) {
-      this.propName = propName;
+    public FileSystem getFilesystem() {
+      return filesystem;
     }
 
-    public String getPropName(){
-      return this.propName;
+    public FileMetaInfo getFileMetaInfo() {
+      return fileMetaInfo;
+    }
+
+    public long getMaxLength() {
+      return maxLength;
+    }
+
+    public FileMetadata getFileMetadata() {
+      return fullFileMetadata;
     }
   }
 
-  // unused
-  private OrcFile() {}
+  public static ReaderOptions readerOptions(Configuration conf) {
+    return new ReaderOptions(conf);
+  }
+
+
 
-  public static interface WriterContext {
+  public interface WriterContext {
     Writer getWriter();
   }
 
-  public static interface WriterCallback {
-    public void preStripeWrite(WriterContext context) throws IOException;
-    public void preFooterWrite(WriterContext context) throws IOException;
+  public interface WriterCallback {
+    void preStripeWrite(WriterContext context) throws IOException;
+    void preFooterWrite(WriterContext context) throws IOException;
   }
 
   /**
@@ -181,7 +234,7 @@ public final class OrcFile {
   public static class WriterOptions {
     private final Configuration configuration;
     private FileSystem fileSystemValue = null;
-    private ObjectInspector inspectorValue = null;
+    private TypeDescription schema = null;
     private long stripeSizeValue;
     private long blockSizeValue;
     private int rowIndexStrideValue;
@@ -193,45 +246,42 @@ public final class OrcFile {
     private WriterCallback callback;
     private EncodingStrategy encodingStrategy;
     private CompressionStrategy compressionStrategy;
-    private float paddingTolerance;
+    private double paddingTolerance;
     private String bloomFilterColumns;
     private double bloomFilterFpp;
-    private TimeZone timezone;
 
-    WriterOptions(Configuration conf) {
+    protected WriterOptions(Properties tableProperties, Configuration conf) {
       configuration = conf;
-      memoryManagerValue = getMemoryManager(conf);
-      stripeSizeValue = OrcConf.getLongVar(conf, HIVE_ORC_DEFAULT_STRIPE_SIZE);
-      blockSizeValue = OrcConf.getLongVar(conf, HIVE_ORC_DEFAULT_BLOCK_SIZE);
-      rowIndexStrideValue = OrcConf.getIntVar(conf, HIVE_ORC_DEFAULT_ROW_INDEX_STRIDE);
-      bufferSizeValue = OrcConf.getIntVar(conf, HIVE_ORC_DEFAULT_BUFFER_SIZE);
-      blockPaddingValue = OrcConf.getBoolVar(conf, HIVE_ORC_DEFAULT_BLOCK_PADDING);
-      compressValue = CompressionKind.valueOf(OrcConf.getVar(conf, HIVE_ORC_DEFAULT_COMPRESS));
-      String versionName = OrcConf.getVar(conf, HIVE_ORC_WRITE_FORMAT);
-      if (versionName == null) {
-        versionValue = Version.CURRENT;
-      } else {
-        versionValue = Version.byName(versionName);
-      }
-      String enString =
-          conf.get(OrcConf.ConfVars.HIVE_ORC_ENCODING_STRATEGY.varname);
-      if (enString == null) {
-        encodingStrategy = EncodingStrategy.SPEED;
-      } else {
-        encodingStrategy = EncodingStrategy.valueOf(enString);
-      }
-
-      String compString = conf
-          .get(OrcConf.ConfVars.HIVE_ORC_COMPRESSION_STRATEGY.varname);
-      if (compString == null) {
-        compressionStrategy = CompressionStrategy.SPEED;
-      } else {
-        compressionStrategy = CompressionStrategy.valueOf(compString);
-      }
-
-      paddingTolerance = conf.getFloat(OrcConf.ConfVars.HIVE_ORC_BLOCK_PADDING_TOLERANCE.varname,
-          OrcConf.ConfVars.HIVE_ORC_BLOCK_PADDING_TOLERANCE.defaultFloatVal);
-      bloomFilterFpp = BloomFilterIO.DEFAULT_FPP;
+      memoryManagerValue = getStaticMemoryManager(conf);
+      stripeSizeValue = org.apache.orc.OrcConf.STRIPE_SIZE.getLong(tableProperties, conf);
+      blockSizeValue = org.apache.orc.OrcConf.BLOCK_SIZE.getLong(tableProperties, conf);
+      rowIndexStrideValue =
+          (int) org.apache.orc.OrcConf.ROW_INDEX_STRIDE.getLong(tableProperties, conf);
+      bufferSizeValue = (int) org.apache.orc.OrcConf.BUFFER_SIZE.getLong(tableProperties,
+          conf);
+      blockPaddingValue =
+          org.apache.orc.OrcConf.BLOCK_PADDING.getBoolean(tableProperties, conf);
+      compressValue =
+          CompressionKind.valueOf(org.apache.orc.OrcConf.COMPRESS.getString(tableProperties,
+              conf));
+      String versionName = org.apache.orc.OrcConf.WRITE_FORMAT.getString(tableProperties,
+          conf);
+      versionValue = Version.byName(versionName);
+      String enString = org.apache.orc.OrcConf.ENCODING_STRATEGY.getString(tableProperties,
+          conf);
+      encodingStrategy = EncodingStrategy.valueOf(enString);
+
+      String compString =
+          org.apache.orc.OrcConf.COMPRESSION_STRATEGY.getString(tableProperties, conf);
+      compressionStrategy = CompressionStrategy.valueOf(compString);
+
+      paddingTolerance =
+          org.apache.orc.OrcConf.BLOCK_PADDING_TOLERANCE.getDouble(tableProperties, conf);
+
+      bloomFilterColumns = org.apache.orc.OrcConf.BLOOM_FILTER_COLUMNS.getString(tableProperties,
+          conf);
+      bloomFilterFpp = org.apache.orc.OrcConf.BLOOM_FILTER_FPP.getDouble(tableProperties,
+          conf);
     }
 
     /**
@@ -302,7 +352,7 @@ public final class OrcFile {
     /**
      * Sets the tolerance for block padding as a percentage of stripe size.
      */
-    public WriterOptions paddingTolerance(float value) {
+    public WriterOptions paddingTolerance(double value) {
       paddingTolerance = value;
       return this;
     }
@@ -318,7 +368,7 @@ public final class OrcFile {
     /**
      * Specify the false positive probability for bloom filter.
      * @param fpp - false positive probability
-     * @return
+     * @return this
      */
     public WriterOptions bloomFilterFpp(double fpp) {
       bloomFilterFpp = fpp;
@@ -334,11 +384,12 @@ public final class OrcFile {
     }
 
     /**
-     * A required option that sets the object inspector for the rows. Used
-     * to determine the schema for the file.
+     * Set the schema for the file. This is a required parameter.
+     * @param schema the schema for the file.
+     * @return this
      */
-    public WriterOptions inspector(ObjectInspector value) {
-      inspectorValue = value;
+    public WriterOptions setSchema(TypeDescription schema) {
+      this.schema = schema;
       return this;
     }
 
@@ -353,7 +404,7 @@ public final class OrcFile {
     /**
      * Add a listener for when the stripe and file are about to be closed.
      * @param callback the object to be called when the stripe is closed
-     * @return
+     * @return this
      */
     public WriterOptions callback(WriterCallback callback) {
       this.callback = callback;
@@ -363,25 +414,112 @@ public final class OrcFile {
     /**
      * A package local option to set the memory manager.
      */
-    WriterOptions memory(MemoryManager value) {
+    protected WriterOptions memory(MemoryManager value) {
       memoryManagerValue = value;
       return this;
     }
 
-    /**
-     * Tajo-specific
-     */
-    WriterOptions timezone(TimeZone value) {
-      timezone = value;
-      return this;
+    public boolean getBlockPadding() {
+      return blockPaddingValue;
+    }
+
+    public long getBlockSize() {
+      return blockSizeValue;
+    }
+
+    public String getBloomFilterColumns() {
+      return bloomFilterColumns;
     }
+
+    public FileSystem getFileSystem() {
+      return fileSystemValue;
+    }
+
+    public Configuration getConfiguration() {
+      return configuration;
+    }
+
+    public TypeDescription getSchema() {
+      return schema;
+    }
+
+    public long getStripeSize() {
+      return stripeSizeValue;
+    }
+
+    public CompressionKind getCompress() {
+      return compressValue;
+    }
+
+    public WriterCallback getCallback() {
+      return callback;
+    }
+
+    public Version getVersion() {
+      return versionValue;
+    }
+
+    public MemoryManager getMemoryManager() {
+      return memoryManagerValue;
+    }
+
+    public int getBufferSize() {
+      return bufferSizeValue;
+    }
+
+    public int getRowIndexStride() {
+      return rowIndexStrideValue;
+    }
+
+    public CompressionStrategy getCompressionStrategy() {
+      return compressionStrategy;
+    }
+
+    public EncodingStrategy getEncodingStrategy() {
+      return encodingStrategy;
+    }
+
+    public double getPaddingTolerance() {
+      return paddingTolerance;
+    }
+
+    public double getBloomFilterFpp() {
+      return bloomFilterFpp;
+    }
+  }
+
+  /**
+   * Create a set of writer options based on a configuration.
+   * @param conf the configuration to use for values
+   * @return A WriterOptions object that can be modified
+   */
+  public static ORCAppender.WriterOptions writerOptions(Configuration conf) {
+    return new ORCAppender.WriterOptions(null, conf);
   }
 
   /**
-   * Create a default set of write options that can be modified.
+   * Create a set of write options based on a set of table properties and
+   * configuration.
+   * @param tableProperties the properties of the table
+   * @param conf the configuration of the query
+   * @return a WriterOptions object that can be modified
    */
-  public static WriterOptions writerOptions(Configuration conf) {
-    return new WriterOptions(conf);
+  public static WriterOptions writerOptions(Properties tableProperties,
+                                            Configuration conf) {
+    return new WriterOptions(tableProperties, conf);
+  }
+
+  private static synchronized MemoryManager getStaticMemoryManager(
+      final Configuration conf) {
+    if (memoryManager == null) {
+      memoryManager = new ThreadLocal<MemoryManager>() {
+        @Override
+        protected MemoryManager initialValue() {
+          return new MemoryManager(conf);
+        }
+      };
+    }
+    return memoryManager.get();
   }
 
   /**
@@ -393,54 +531,13 @@ public final class OrcFile {
    * @throws IOException
    */
   public static Writer createWriter(Path path,
-                                    WriterOptions opts
-                                    ) throws IOException {
-    FileSystem fs = opts.fileSystemValue == null ?
-      path.getFileSystem(opts.configuration) : opts.fileSystemValue;
-
-    return new WriterImpl(fs, path, opts.configuration, opts.inspectorValue,
-                          opts.stripeSizeValue, opts.compressValue,
-                          opts.bufferSizeValue, opts.rowIndexStrideValue,
-                          opts.memoryManagerValue, opts.blockPaddingValue,
-                          opts.versionValue, opts.callback,
-                          opts.encodingStrategy, opts.compressionStrategy,
-                          opts.paddingTolerance, opts.blockSizeValue,
-                          opts.bloomFilterColumns, opts.bloomFilterFpp,
-                          opts.timezone);
-  }
+                                    WriterOptions opts,
+                                    TimeZone timeZone
+  ) throws IOException {
+    FileSystem fs = opts.getFileSystem() == null ?
+        path.getFileSystem(opts.getConfiguration()) : opts.getFileSystem();
 
-  /**
-   * Create an ORC file writer. This method is provided for API backward
-   * compatability with Hive 0.11.
-   * @param fs file system
-   * @param path filename to write to
-   * @param inspector the ObjectInspector that inspects the rows
-   * @param stripeSize the number of bytes in a stripe
-   * @param compress how to compress the file
-   * @param bufferSize the number of bytes to compress at once
-   * @param rowIndexStride the number of rows between row index entries or
-   *                       0 to suppress all indexes
-   * @return a new ORC file writer
-   * @throws IOException
-   */
-  public static Writer createWriter(FileSystem fs,
-                                    Path path,
-                                    Configuration conf,
-                                    ObjectInspector inspector,
-                                    long stripeSize,
-                                    CompressionKind compress,
-                                    int bufferSize,
-                                    int rowIndexStride,
-                                    TimeZone timeZone) throws IOException {
-    return createWriter(path,
-                        writerOptions(conf)
-                        .fileSystem(fs)
-                        .inspector(inspector)
-                        .stripeSize(stripeSize)
-                        .compress(compress)
-                        .bufferSize(bufferSize)
-                        .rowIndexStride(rowIndexStride)
-                        .timezone(timeZone));
+    return new WriterImpl(fs, path, opts, timeZone);
   }
 
   private static ThreadLocal<MemoryManager> memoryManager = null;

http://git-wip-us.apache.org/repos/asf/tajo/blob/68263585/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/OrcRecordReader.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/OrcRecordReader.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/OrcRecordReader.java
new file mode 100644
index 0000000..7194bf4
--- /dev/null
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/OrcRecordReader.java
@@ -0,0 +1,454 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.thirdparty.orc;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.io.DiskRange;
+import org.apache.hadoop.hive.common.io.DiskRangeList;
+import org.apache.orc.*;
+import org.apache.orc.OrcProto;
+import org.apache.orc.impl.*;
+import org.apache.orc.impl.StreamName;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.storage.VTuple;
+import org.apache.tajo.storage.fragment.FileFragment;
+import org.apache.tajo.storage.thirdparty.orc.TreeReaderFactory.DatumTreeReader;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.*;
+
+public class OrcRecordReader implements Closeable {
+
+  private final Log LOG = LogFactory.getLog(OrcRecordReader.class);
+
+  private final Path path;
+  private final long firstRow;
+  private final List<StripeInformation> stripes = new ArrayList<>();
+  private OrcProto.StripeFooter stripeFooter;
+  private final long totalRowCount;
+  private final CompressionCodec codec;
+  private final List<OrcProto.Type> types;
+  private final int bufferSize;
+  private final boolean[] included;
+  private final long rowIndexStride;
+  private long rowInStripe = 0;
+  private int currentStripe = -1;
+  private long rowBaseInStripe = 0;
+  private long rowCountInStripe = 0;
+  private final Map<org.apache.orc.impl.StreamName, InStream> streams = new HashMap<>();
+  DiskRangeList bufferChunks = null;
+  private final TreeReaderFactory.DatumTreeReader[] reader;
+  private final OrcProto.RowIndex[] indexes;
+  private final OrcProto.BloomFilterIndex[] bloomFilterIndices;
+  private final Configuration conf;
+  private final MetadataReader metadata;
+  private final DataReader dataReader;
+  private final Tuple result;
+
+  public OrcRecordReader(List<StripeInformation> stripes,
+                         FileSystem fileSystem,
+                         Schema schema,
+                         Column[] target,
+                         FileFragment fragment,
+                         List<OrcProto.Type> types,
+                         CompressionCodec codec,
+                         int bufferSize,
+                         long strideRate,
+                         Reader.Options options,
+                         Configuration conf,
+                         TimeZone timeZone) throws IOException {
+
+    result = new VTuple(target.length);
+
+    this.conf = conf;
+    this.path = fragment.getPath();
+    this.codec = codec;
+    this.types = types;
+    this.bufferSize = bufferSize;
+    this.included = new boolean[schema.size() + 1];
+    included[0] = target.length > 0; // always include root column except when target schema size is 0
+    Schema targetSchema = new Schema(target);
+    for (int i = 1; i < included.length; i++) {
+      included[i] = targetSchema.contains(schema.getColumn(i - 1));
+    }
+    this.rowIndexStride = strideRate;
+    this.metadata = new MetadataReaderImpl(fileSystem, path, codec, bufferSize, types.size());
+
+    long rows = 0;
+    long skippedRows = 0;
+    long offset = fragment.getStartKey();
+    long maxOffset = fragment.getStartKey() + fragment.getLength();
+    for(StripeInformation stripe: stripes) {
+      long stripeStart = stripe.getOffset();
+      if (offset > stripeStart) {
+        skippedRows += stripe.getNumberOfRows();
+      } else if (stripeStart < maxOffset) {
+        this.stripes.add(stripe);
+        rows += stripe.getNumberOfRows();
+      }
+    }
+
+    // TODO: we could change the ctor to pass this externally
+    this.dataReader = RecordReaderUtils.createDefaultDataReader(fileSystem, path, options.getUseZeroCopy(), codec);
+    this.dataReader.open();
+
+    firstRow = skippedRows;
+    totalRowCount = rows;
+
+    reader = new DatumTreeReader[target.length];
+    for (int i = 0; i < reader.length; i++) {
+      reader[i] = TreeReaderFactory.createTreeReader(timeZone, schema.getColumnId(target[i].getQualifiedName()), target[i],
+          options.getSkipCorruptRecords());
+    }
+
+    indexes = new OrcProto.RowIndex[types.size()];
+    bloomFilterIndices = new OrcProto.BloomFilterIndex[types.size()];
+    advanceToNextRow(reader, 0L, true);
+  }
+
+  /**
+   * Plan the ranges of the file that we need to read given the list of
+   * columns and row groups.
+   *
+   * @param streamList        the list of streams available
+   * @param includedColumns   which columns are needed
+   * @param doMergeBuffers    forwarded to RecordReaderUtils.addEntireStreamToRanges for each selected stream
+   * @return the list of disk ranges that will be loaded
+   */
+  static DiskRangeList planReadPartialDataStreams
+  (List<OrcProto.Stream> streamList,
+   boolean[] includedColumns,
+   boolean doMergeBuffers) {
+    long offset = 0;
+    // add a disk range for every DATA-area stream that belongs to an included column
+    DiskRangeList.CreateHelper list = new DiskRangeList.CreateHelper();
+    for (OrcProto.Stream stream : streamList) {
+      long length = stream.getLength();
+      int column = stream.getColumn();
+      OrcProto.Stream.Kind streamKind = stream.getKind();
+      // since stream kind is optional, first check if it exists
+      if (stream.hasKind() &&
+          (org.apache.orc.impl.StreamName.getArea(streamKind) == org.apache.orc.impl.StreamName.Area.DATA) &&
+          includedColumns[column]) {
+        RecordReaderUtils.addEntireStreamToRanges(offset, length, list, doMergeBuffers);
+      }
+      offset += length;
+    }
+    return list.extract();
+  }
+
+  void createStreams(List<OrcProto.Stream> streamDescriptions,
+                     DiskRangeList ranges,
+                     boolean[] includeColumn,
+                     CompressionCodec codec,
+                     int bufferSize,
+                     Map<org.apache.orc.impl.StreamName, InStream> streams) throws IOException {
+    long streamOffset = 0;
+    for (OrcProto.Stream streamDesc : streamDescriptions) {
+      int column = streamDesc.getColumn();
+      if ((includeColumn != null && !includeColumn[column]) ||
+          streamDesc.hasKind() &&
+              (org.apache.orc.impl.StreamName.getArea(streamDesc.getKind()) != org.apache.orc.impl.StreamName.Area.DATA)) {
+        streamOffset += streamDesc.getLength();
+        continue;
+      }
+      List<DiskRange> buffers = RecordReaderUtils.getStreamBuffers(
+          ranges, streamOffset, streamDesc.getLength());
+      org.apache.orc.impl.StreamName name = new StreamName(column, streamDesc.getKind());
+      streams.put(name, InStream.create(name.toString(), buffers,
+          streamDesc.getLength(), codec, bufferSize));
+      streamOffset += streamDesc.getLength();
+    }
+  }
+
+  private void readPartialDataStreams(StripeInformation stripe) throws IOException {
+    List<OrcProto.Stream> streamList = stripeFooter.getStreamsList();
+    DiskRangeList toRead = planReadPartialDataStreams(streamList, included, true);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("chunks = " + RecordReaderUtils.stringifyDiskRanges(toRead));
+    }
+    bufferChunks = dataReader.readFileData(toRead, stripe.getOffset(), false);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("merge = " + RecordReaderUtils.stringifyDiskRanges(bufferChunks));
+    }
+
+    createStreams(streamList, bufferChunks, included, codec, bufferSize, streams);
+  }
+
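+  /**
+   * Move the readers to the given row, skipping the rows in between; when the row lies
+   * past the end of the current stripe, advance to the next stripe instead (if allowed).
+   *
+   * @param nextRow the row we want to go to, relative to the first stripe kept by this reader
+   * @throws IOException
+   */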
+  private boolean advanceToNextRow(
+      TreeReaderFactory.TreeReader[] reader, long nextRow, boolean canAdvanceStripe)
+      throws IOException {
+    long nextRowInStripe = nextRow - rowBaseInStripe;
+
+    if (nextRowInStripe >= rowCountInStripe) {
+      if (canAdvanceStripe) {
+        advanceStripe();
+      }
+      return canAdvanceStripe;
+    }
+    if (nextRowInStripe != rowInStripe) {
+      if (rowIndexStride != 0) {
+        int rowGroup = (int) (nextRowInStripe / rowIndexStride);
+        seekToRowEntry(reader, rowGroup);
+        for (TreeReaderFactory.TreeReader eachReader : reader) {
+          eachReader.skipRows(nextRowInStripe - rowGroup * rowIndexStride);
+        }
+      } else {
+        for (TreeReaderFactory.TreeReader eachReader : reader) {
+          eachReader.skipRows(nextRowInStripe - rowInStripe);
+        }
+      }
+      rowInStripe = nextRowInStripe;
+    }
+    return true;
+  }
+
+  public boolean hasNext() throws IOException {
+    return rowInStripe < rowCountInStripe;
+  }
+
+  public Tuple next() throws IOException {
+    if (hasNext()) {
+      try {
+        for (int i = 0; i < reader.length; i++) {
+          result.put(i, reader[i].next());
+        }
+        // find the next row
+        rowInStripe += 1;
+        advanceToNextRow(reader, rowInStripe + rowBaseInStripe, true);
+        return result;
+      } catch (IOException e) {
+        // Rethrow with the file path included in the exception message
+        throw new IOException("Error reading file: " + path, e);
+      }
+    } else {
+      return null;
+    }
+  }
+
+  /**
+   * Read stripes one after another until one has a row to read or no stripes remain.
+   *
+   * @throws IOException
+   */
+  private void advanceStripe() throws IOException {
+    rowInStripe = rowCountInStripe;
+    while (rowInStripe >= rowCountInStripe &&
+        currentStripe < stripes.size() - 1) {
+      currentStripe += 1;
+      readStripe();
+    }
+  }
+
+  /**
+   * Read the current stripe into memory.
+   *
+   * @throws IOException
+   */
+  private void readStripe() throws IOException {
+    StripeInformation stripe = beginReadStripe();
+
+    // if we haven't skipped the whole stripe, read the data
+    if (rowInStripe < rowCountInStripe) {
+      // if we aren't projecting columns or filtering rows, just read it all
+      if (included == null) {
+        readAllDataStreams(stripe);
+      } else {
+        readPartialDataStreams(stripe);
+      }
+
+      for (TreeReaderFactory.TreeReader eachReader : reader) {
+        eachReader.startStripe(streams, stripeFooter);
+      }
+      // if we skipped the first row group, move the pointers forward
+      if (rowInStripe != 0) {
+        seekToRowEntry(reader, (int) (rowInStripe / rowIndexStride));
+      }
+    }
+  }
+
+  private void clearStreams() throws IOException {
+    // explicit close of all streams to de-ref ByteBuffers
+    for (InStream is : streams.values()) {
+      is.close();
+    }
+    if (bufferChunks != null) {
+      if (dataReader.isTrackingDiskRanges()) {
+        for (DiskRangeList range = bufferChunks; range != null; range = range.next) {
+          if (!(range instanceof BufferChunk)) {
+            continue;
+          }
+          dataReader.releaseBuffer(((BufferChunk) range).getChunk());
+        }
+      }
+    }
+    bufferChunks = null;
+    streams.clear();
+  }
+
+  OrcProto.StripeFooter readStripeFooter(StripeInformation stripe) throws IOException {
+    return metadata.readStripeFooter(stripe);
+  }
+
+  private StripeInformation beginReadStripe() throws IOException {
+    StripeInformation stripe = stripes.get(currentStripe);
+    stripeFooter = readStripeFooter(stripe);
+    clearStreams();
+    // set up the position in the stripe
+    rowCountInStripe = stripe.getNumberOfRows();
+    rowInStripe = 0;
+    rowBaseInStripe = 0;
+    for (int i = 0; i < currentStripe; ++i) {
+      rowBaseInStripe += stripes.get(i).getNumberOfRows();
+    }
+    // reset all of the indexes
+    for (int i = 0; i < indexes.length; ++i) {
+      indexes[i] = null;
+    }
+    return stripe;
+  }
+
+  private void readAllDataStreams(StripeInformation stripe) throws IOException {
+    long start = stripe.getIndexLength();
+    long end = start + stripe.getDataLength();
+    // explicitly trigger 1 big read
+    DiskRangeList toRead = new DiskRangeList(start, end);
+    bufferChunks = dataReader.readFileData(toRead, stripe.getOffset(), false);
+    List<OrcProto.Stream> streamDescriptions = stripeFooter.getStreamsList();
+    createStreams(streamDescriptions, bufferChunks, included, codec, bufferSize, streams);
+  }
+
+  public long getRowNumber() {
+    return rowInStripe + rowBaseInStripe + firstRow;
+  }
+
+  public float getProgress() {
+    return ((float) rowBaseInStripe + rowInStripe) / totalRowCount;
+  }
+
+  private int findStripe(long rowNumber) {
+    for (int i = 0; i < stripes.size(); i++) {
+      StripeInformation stripe = stripes.get(i);
+      if (stripe.getNumberOfRows() > rowNumber) {
+        return i;
+      }
+      rowNumber -= stripe.getNumberOfRows();
+    }
+    throw new IllegalArgumentException("Seek after the end of reader range");
+  }
+
+  OrcIndex readRowIndex(
+      int stripeIndex, boolean[] included) throws IOException {
+    return readRowIndex(stripeIndex, included, null, null);
+  }
+
+  OrcIndex readRowIndex(int stripeIndex, boolean[] included, OrcProto.RowIndex[] indexes,
+                        OrcProto.BloomFilterIndex[] bloomFilterIndex) throws IOException {
+    StripeInformation stripe = stripes.get(stripeIndex);
+    OrcProto.StripeFooter stripeFooter = null;
+    // if this is the current stripe, use the cached objects.
+    if (stripeIndex == currentStripe) {
+      stripeFooter = this.stripeFooter;
+      indexes = indexes == null ? this.indexes : indexes;
+      bloomFilterIndex = bloomFilterIndex == null ? this.bloomFilterIndices : bloomFilterIndex;
+    }
+    return metadata.readRowIndex(stripe, stripeFooter, included, indexes, null,
+        bloomFilterIndex);
+  }
+
+  private void seekToRowEntry(TreeReaderFactory.TreeReader []reader, int rowEntry)
+      throws IOException {
+    PositionProvider[] index = new PositionProvider[indexes.length];
+    for (int i = 0; i < indexes.length; ++i) {
+      if (indexes[i] != null) {
+        index[i] = new PositionProviderImpl(indexes[i].getEntry(rowEntry));
+      }
+    }
+    for (TreeReaderFactory.TreeReader eachReader : reader) {
+      eachReader.seek(index);
+    }
+  }
+
+  public void seekToRow(long rowNumber) throws IOException {
+    if (rowNumber < 0) {
+      throw new IllegalArgumentException("Seek to a negative row number " +
+          rowNumber);
+    } else if (rowNumber < firstRow) {
+      throw new IllegalArgumentException("Seek before reader range " +
+          rowNumber);
+    }
+    // convert to our internal form (rows from the beginning of slice)
+    rowNumber -= firstRow;
+
+    // move to the right stripe
+    int rightStripe = findStripe(rowNumber);
+    if (rightStripe != currentStripe) {
+      currentStripe = rightStripe;
+      readStripe();
+    }
+    readRowIndex(currentStripe, included);
+
+    // if we aren't to the right row yet, advance in the stripe.
+    advanceToNextRow(reader, rowNumber, true);
+  }
+
+  public long getNumBytes() {
+    return ((RecordReaderUtils.DefaultDataReader)dataReader).getReadBytes();
+  }
+
+  @Override
+  public void close() throws IOException {
+    clearStreams();
+    dataReader.close();
+  }
+
+  public static final class PositionProviderImpl implements PositionProvider {
+    private final OrcProto.RowIndexEntry entry;
+    private int index;
+
+    public PositionProviderImpl(OrcProto.RowIndexEntry entry) {
+      this(entry, 0);
+    }
+
+    public PositionProviderImpl(OrcProto.RowIndexEntry entry, int startPos) {
+      this.entry = entry;
+      this.index = startPos;
+    }
+
+    @Override
+    public long getNext() {
+      return entry.getPositions(index++);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/68263585/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/OrcUtils.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/OrcUtils.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/OrcUtils.java
index 3a474dd..b8d3f52 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/OrcUtils.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/OrcUtils.java
@@ -17,185 +17,101 @@
  */
 package org.apache.tajo.storage.thirdparty.orc;
 
-import com.google.common.collect.Lists;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hive.serde2.objectinspector.*;
-
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import org.apache.orc.CompressionCodec;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.impl.SnappyCodec;
+import org.apache.orc.impl.ZlibCodec;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TypeDesc;
+import org.apache.tajo.exception.TajoRuntimeException;
+import org.apache.tajo.exception.UnsupportedDataTypeException;
 
 public class OrcUtils {
   private static final Log LOG = LogFactory.getLog(OrcUtils.class);
 
-  /**
-   * Returns selected columns as a boolean array with true value set for specified column names.
-   * The result will contain number of elements equal to flattened number of columns.
-   * For example:
-   * selectedColumns - a,b,c
-   * allColumns - a,b,c,d
-   * If column c is a complex type, say list<string> and other types are primitives then result will
-   * be [false, true, true, true, true, true, false]
-   * Index 0 is the root element of the struct which is set to false by default, index 1,2
-   * corresponds to columns a and b. Index 3,4 correspond to column c which is list<string> and
-   * index 5 correspond to column d. After flattening list<string> gets 2 columns.
-   *
-   * @param selectedColumns - comma separated list of selected column names
-   * @param allColumns      - comma separated list of all column names
-   * @param inspector       - object inspector
-   * @return - boolean array with true value set for the specified column names
-   */
-  public static boolean[] includeColumns(String selectedColumns, String allColumns,
-      ObjectInspector inspector) {
-    int numFlattenedCols = getFlattenedColumnsCount(inspector);
-    boolean[] results = new boolean[numFlattenedCols];
-    if ("*".equals(selectedColumns)) {
-      Arrays.fill(results, true);
-      return results;
-    }
-    if (selectedColumns != null && !selectedColumns.isEmpty()) {
-      includeColumnsImpl(results, selectedColumns.toLowerCase(), allColumns, inspector);
-    }
-    return results;
-  }
-
-  private static void includeColumnsImpl(boolean[] includeColumns, String selectedColumns,
-      String allColumns,
-      ObjectInspector inspector) {
-      Map<String, List<Integer>> columnSpanMap = getColumnSpan(allColumns, inspector);
-      LOG.info("columnSpanMap: " + columnSpanMap);
-
-      String[] selCols = selectedColumns.split(",");
-      for (String sc : selCols) {
-        if (columnSpanMap.containsKey(sc)) {
-          List<Integer> colSpan = columnSpanMap.get(sc);
-          int start = colSpan.get(0);
-          int end = colSpan.get(1);
-          for (int i = start; i <= end; i++) {
-            includeColumns[i] = true;
+  public static org.apache.orc.CompressionCodec createCodec(org.apache.orc.CompressionKind kind) {
+    switch (kind) {
+      case NONE:
+        return null;
+      case ZLIB:
+        return new ZlibCodec();
+      case SNAPPY:
+        return new SnappyCodec();
+      case LZO:
+        try {
+          ClassLoader loader = Thread.currentThread().getContextClassLoader();
+          if (loader == null) {
+            throw new RuntimeException("error while getting a class loader");
           }
+          @SuppressWarnings("unchecked")
+          Class<? extends org.apache.orc.CompressionCodec> lzo =
+              (Class<? extends CompressionCodec>)
+                  loader.loadClass("org.apache.hadoop.hive.ql.io.orc.LzoCodec");
+          return lzo.newInstance();
+        } catch (ClassNotFoundException e) {
+          throw new IllegalArgumentException("LZO is not available.", e);
+        } catch (InstantiationException e) {
+          throw new IllegalArgumentException("Problem initializing LZO", e);
+        } catch (IllegalAccessException e) {
+          throw new IllegalArgumentException("Insufficient access to LZO", e);
         }
-      }
-
-      LOG.info("includeColumns: " + Arrays.toString(includeColumns));
+      default:
+        throw new IllegalArgumentException("Unknown compression codec: " +
+            kind);
     }
+  }
 
-  private static Map<String, List<Integer>> getColumnSpan(String allColumns,
-      ObjectInspector inspector) {
-    // map that contains the column span for each column. Column span is the number of columns
-    // required after flattening. For a given object inspector this map contains the start column
-    // id and end column id (both inclusive) after flattening.
-    // EXAMPLE:
-    // schema: struct<a:int, b:float, c:map<string,int>>
-    // column span map for the above struct will be
-    // a => [1,1], b => [2,2], c => [3,5]
-    Map<String, List<Integer>> columnSpanMap = new HashMap<>();
-    if (allColumns != null) {
-      String[] columns = allColumns.split(",");
-      int startIdx = 0;
-      int endIdx = 0;
-      if (inspector instanceof StructObjectInspector) {
-        StructObjectInspector soi = (StructObjectInspector) inspector;
-        List<? extends StructField> fields = soi.getAllStructFieldRefs();
-        for (int i = 0; i < fields.size(); i++) {
-          StructField sf = fields.get(i);
-
-          // we get the type (category) from object inspector but column name from the argument.
-          // The reason for this is hive (FileSinkOperator) does not pass the actual column names,
-          // instead it passes the internal column names (_col1,_col2).
-          ObjectInspector sfOI = sf.getFieldObjectInspector();
-          String colName = columns[i];
+  public static TypeDescription convertSchema(Schema schema) {
+    TypeDescription description = TypeDescription.createStruct();
 
-          startIdx = endIdx + 1;
-          switch (sfOI.getCategory()) {
-            case PRIMITIVE:
-              endIdx += 1;
-              break;
-            case STRUCT:
-              endIdx += 1;
-              StructObjectInspector structInsp = (StructObjectInspector) sfOI;
-              List<? extends StructField> structFields = structInsp.getAllStructFieldRefs();
-              for (StructField structField : structFields) {
-                endIdx += getFlattenedColumnsCount(structField.getFieldObjectInspector());
-              }
-              break;
-            case MAP:
-              endIdx += 1;
-              MapObjectInspector mapInsp = (MapObjectInspector) sfOI;
-              endIdx += getFlattenedColumnsCount(mapInsp.getMapKeyObjectInspector());
-              endIdx += getFlattenedColumnsCount(mapInsp.getMapValueObjectInspector());
-              break;
-            case LIST:
-              endIdx += 1;
-              ListObjectInspector listInsp = (ListObjectInspector) sfOI;
-              endIdx += getFlattenedColumnsCount(listInsp.getListElementObjectInspector());
-              break;
-            case UNION:
-              endIdx += 1;
-              UnionObjectInspector unionInsp = (UnionObjectInspector) sfOI;
-              List<ObjectInspector> choices = unionInsp.getObjectInspectors();
-              for (ObjectInspector choice : choices) {
-                endIdx += getFlattenedColumnsCount(choice);
-              }
-              break;
-            default:
-              throw new IllegalArgumentException("Bad category: " +
-                  inspector.getCategory());
-          }
-
-          columnSpanMap.put(colName, Lists.newArrayList(startIdx, endIdx));
-        }
-      }
+    for (Column eachColumn : schema.getRootColumns()) {
+      description.addField(eachColumn.getQualifiedName(),
+          convertTypeInfo(eachColumn.getTypeDesc()));
     }
-    return columnSpanMap;
+    return description;
   }
 
-  /**
-   * Returns the number of columns after flatting complex types.
-   *
-   * @param inspector - object inspector
-   * @return
-   */
-  public static int getFlattenedColumnsCount(ObjectInspector inspector) {
-    int numWriters = 0;
-    switch (inspector.getCategory()) {
-      case PRIMITIVE:
-        numWriters += 1;
-        break;
-      case STRUCT:
-        numWriters += 1;
-        StructObjectInspector structInsp = (StructObjectInspector) inspector;
-        List<? extends StructField> fields = structInsp.getAllStructFieldRefs();
-        for (StructField field : fields) {
-          numWriters += getFlattenedColumnsCount(field.getFieldObjectInspector());
-        }
-        break;
-      case MAP:
-        numWriters += 1;
-        MapObjectInspector mapInsp = (MapObjectInspector) inspector;
-        numWriters += getFlattenedColumnsCount(mapInsp.getMapKeyObjectInspector());
-        numWriters += getFlattenedColumnsCount(mapInsp.getMapValueObjectInspector());
-        break;
-      case LIST:
-        numWriters += 1;
-        ListObjectInspector listInsp = (ListObjectInspector) inspector;
-        numWriters += getFlattenedColumnsCount(listInsp.getListElementObjectInspector());
-        break;
-      case UNION:
-        numWriters += 1;
-        UnionObjectInspector unionInsp = (UnionObjectInspector) inspector;
-        List<ObjectInspector> choices = unionInsp.getObjectInspectors();
-        for (ObjectInspector choice : choices) {
-          numWriters += getFlattenedColumnsCount(choice);
+  public static TypeDescription convertTypeInfo(TypeDesc desc) {
+    switch (desc.getDataType().getType()) {
+      case BOOLEAN:
+        return TypeDescription.createBoolean();
+      case BIT:
+        return TypeDescription.createByte();
+      case INT2:
+        return TypeDescription.createShort();
+      case INT4:
+      case INET4:
+        return TypeDescription.createInt();
+      case INT8:
+        return TypeDescription.createLong();
+      case FLOAT4:
+        return TypeDescription.createFloat();
+      case FLOAT8:
+        return TypeDescription.createDouble();
+      case TEXT:
+        return TypeDescription.createString();
+      case DATE:
+        return TypeDescription.createDate();
+      case TIMESTAMP:
+        return TypeDescription.createTimestamp();
+      case BLOB:
+        return TypeDescription.createBinary();
+      case CHAR:
+        return TypeDescription.createChar()
+            .withMaxLength(desc.getDataType().getLength());
+      case RECORD: {
+        TypeDescription result = TypeDescription.createStruct();
+        for (Column eachColumn : desc.getNestedSchema().getRootColumns()) {
+          result.addField(eachColumn.getQualifiedName(),
+              convertTypeInfo(eachColumn.getTypeDesc()));
         }
-        break;
+        return result;
+      }
       default:
-        throw new IllegalArgumentException("Bad category: " +
-            inspector.getCategory());
+        throw new TajoRuntimeException(new UnsupportedDataTypeException(desc.getDataType().getType().name()));
     }
-    return numWriters;
   }
-
 }