You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by jh...@apache.org on 2013/11/21 07:53:49 UTC

[2/3] TAJO-200: RCFile compatible to apache hive. (jinho)

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/effa7df6/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/rcfile/RCFile.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/rcfile/RCFile.java b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/rcfile/RCFile.java
index 245aba5..74e2192 100644
--- a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/rcfile/RCFile.java
+++ b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/rcfile/RCFile.java
@@ -18,6 +18,8 @@
 
 package org.apache.tajo.storage.rcfile;
 
+import org.apache.commons.lang.StringEscapeUtils;
+import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -26,9 +28,16 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.io.*;
 import org.apache.hadoop.io.SequenceFile.Metadata;
 import org.apache.hadoop.io.compress.*;
-import org.apache.hadoop.util.Progressable;
 import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.catalog.statistics.TableStats;
 import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.datum.NullDatum;
+import org.apache.tajo.storage.*;
+import org.apache.tajo.storage.fragment.FileFragment;
 
 import java.io.*;
 import java.rmi.server.UID;
@@ -39,21 +48,21 @@ import java.util.Arrays;
  * <code>RCFile</code>s, short of Record Columnar File, are flat files
  * consisting of binary key/value pairs, which shares much similarity with
  * <code>SequenceFile</code>.
- *
+ * <p/>
  * RCFile stores columns of a table in a record columnar way. It first
  * partitions rows horizontally into row splits. and then it vertically
  * partitions each row split in a columnar way. RCFile first stores the meta
  * data of a row split, as the key part of a record, and all the data of a row
- * split as the value part. When writing, RCFile.Appender first holds records'
+ * split as the value part. When writing, RCFile.Writer first holds records'
  * value bytes in memory, and determines a row split if the raw bytes size of
- * buffered records overflow a given parameter<tt>Appender.columnsBufferSize</tt>,
+ * buffered records overflow a given parameter <tt>Writer.columnsBufferSize</tt>,
  * which can be set like: <code>conf.setInt(COLUMNS_BUFFER_SIZE_CONF_STR,
- 4 * 1024 * 1024)</code> .
+ * 4 * 1024 * 1024)</code> .
  * <p>
  * <code>RCFile</code> provides {@link Writer}, {@link Reader} and classes for
  * writing, reading respectively.
  * </p>
- *
+ * <p/>
  * <p>
  * RCFile stores columns of a table in a record columnar way. It first
  * partitions rows horizontally into row splits. and then it vertically
@@ -61,21 +70,21 @@ import java.util.Arrays;
  * data of a row split, as the key part of a record, and all the data of a row
  * split as the value part.
  * </p>
- *
+ * <p/>
  * <p>
  * RCFile compresses values in a more fine-grained manner then record level
  * compression. However, It currently does not support compress the key part
  * yet. The actual compression algorithm used to compress key and/or values can
  * be specified by using the appropriate {@link CompressionCodec}.
  * </p>
- *
+ * <p/>
  * <p>
  * The {@link Reader} is used to read and explain the bytes of RCFile.
  * </p>
- *
+ * <p/>
  * <h4 id="Formats">RCFile Formats</h4>
- *
- *
+ * <p/>
+ * <p/>
  * <h5 id="Header">RC Header</h5>
  * <ul>
  * <li>version - 3 bytes of magic header <b>RCF</b>, followed by 1 byte of
@@ -87,7 +96,7 @@ import java.util.Arrays;
  * <li>metadata - {@link Metadata} for this file.</li>
  * <li>sync - A sync marker to denote end of the header.</li>
  * </ul>
- *
+ * <p/>
  * <h5>RCFile Format</h5>
  * <ul>
  * <li><a href="#Header">Header</a></li>
@@ -117,26 +126,208 @@ import java.util.Arrays;
  * </ul>
  * </li>
  * </ul>
+ * <p>
+ * <pre>
+ * {@code
+ * The following is a pseudo-BNF grammar for RCFile. Comments are prefixed
+ * with dashes:
+ *
+ * rcfile ::=
+ *   <file-header>
+ *   <rcfile-rowgroup>+
+ *
+ * file-header ::=
+ *   <file-version-header>
+ *   <file-key-class-name>              (only exists if version is seq6)
+ *   <file-value-class-name>            (only exists if version is seq6)
+ *   <file-is-compressed>
+ *   <file-is-block-compressed>         (only exists if version is seq6)
+ *   [<file-compression-codec-class>]
+ *   <file-header-metadata>
+ *   <file-sync-hash>
+ *
+ * -- The normative RCFile implementation included with Hive is actually
+ * -- based on a modified version of Hadoop's SequenceFile code. Some
+ * -- things which should have been modified were not, including the code
+ * -- that writes out the file version header. Consequently, RCFile and
+ * -- SequenceFile originally shared the same version header.  A newer
+ * -- release has created a unique version string.
+ *
+ * file-version-header ::= Byte[4] {'S', 'E', 'Q', 6}
+ *                     |   Byte[4] {'R', 'C', 'F', 1}
+ *
+ * -- The name of the Java class responsible for reading the key buffer
+ * -- component of the rowgroup.
+ *
+ * file-key-class-name ::=
+ *   Text {"org.apache.hadoop.hive.ql.io.RCFile$KeyBuffer"}
+ *
+ * -- The name of the Java class responsible for reading the value buffer
+ * -- component of the rowgroup.
+ *
+ * file-value-class-name ::=
+ *   Text {"org.apache.hadoop.hive.ql.io.RCFile$ValueBuffer"}
+ *
+ * -- Boolean variable indicating whether or not the file uses compression
+ * -- for the key and column buffer sections.
+ *
+ * file-is-compressed ::= Byte[1]
+ *
+ * -- A boolean field indicating whether or not the file is block compressed.
+ * -- This field is *always* false. According to comments in the original
+ * -- RCFile implementation this field was retained for backwards
+ * -- compatibility with the SequenceFile format.
+ *
+ * file-is-block-compressed ::= Byte[1] {false}
+ *
+ * -- The Java class name of the compression codec iff <file-is-compressed>
+ * -- is true. The named class must implement
+ * -- org.apache.hadoop.io.compress.CompressionCodec.
+ * -- The expected value is org.apache.hadoop.io.compress.GzipCodec.
+ *
+ * file-compression-codec-class ::= Text
+ *
+ * -- A collection of key-value pairs defining metadata values for the
+ * -- file. The Map is serialized using standard JDK serialization, i.e.
+ * -- an Int corresponding to the number of key-value pairs, followed by
+ * -- Text key and value pairs. The following metadata properties are
+ * -- mandatory for all RCFiles:
+ * --
+ * -- hive.io.rcfile.column.number: the number of columns in the RCFile
+ *
+ * file-header-metadata ::= Map<Text, Text>
+ *
+ * -- A 16 byte marker that is generated by the writer. This marker appears
+ * -- at regular intervals at the beginning of rowgroup-headers, and is
+ * -- intended to enable readers to skip over corrupted rowgroups.
+ *
+ * file-sync-hash ::= Byte[16]
+ *
+ * -- Each row group is split into three sections: a header, a set of
+ * -- key buffers, and a set of column buffers. The header section includes
+ * -- an optional sync hash, information about the size of the row group, and
+ * -- the total number of rows in the row group. Each key buffer
+ * -- consists of run-length encoding data which is used to decode
+ * -- the length and offsets of individual fields in the corresponding column
+ * -- buffer.
+ *
+ * rcfile-rowgroup ::=
+ *   <rowgroup-header>
+ *   <rowgroup-key-data>
+ *   <rowgroup-column-buffers>
+ *
+ * rowgroup-header ::=
+ *   [<rowgroup-sync-marker>, <rowgroup-sync-hash>]
+ *   <rowgroup-record-length>
+ *   <rowgroup-key-length>
+ *   <rowgroup-compressed-key-length>
+ *
+ * -- rowgroup-key-data is compressed if the column data is compressed.
+ * rowgroup-key-data ::=
+ *   <rowgroup-num-rows>
+ *   <rowgroup-key-buffers>
+ *
+ * -- An integer (always -1) signaling the beginning of a sync-hash
+ * -- field.
+ *
+ * rowgroup-sync-marker ::= Int
+ *
+ * -- A 16 byte sync field. This must match the <file-sync-hash> value read
+ * -- in the file header.
+ *
+ * rowgroup-sync-hash ::= Byte[16]
+ *
+ * -- The record-length is the sum of the number of bytes used to store
+ * -- the key and column parts, i.e. it is the total length of the current
+ * -- rowgroup.
+ *
+ * rowgroup-record-length ::= Int
+ *
+ * -- Total length in bytes of the rowgroup's key sections.
+ *
+ * rowgroup-key-length ::= Int
+ *
+ * -- Total compressed length in bytes of the rowgroup's key sections.
+ *
+ * rowgroup-compressed-key-length ::= Int
+ *
+ * -- Number of rows in the current rowgroup.
+ *
+ * rowgroup-num-rows ::= VInt
+ *
+ * -- One or more column key buffers corresponding to each column
+ * -- in the RCFile.
+ *
+ * rowgroup-key-buffers ::= <rowgroup-key-buffer>+
+ *
+ * -- Data in each column buffer is stored using a run-length
+ * -- encoding scheme that is intended to reduce the cost of
+ * -- repeated column field values. This mechanism is described
+ * -- in more detail in the following entries.
+ *
+ * rowgroup-key-buffer ::=
+ *   <column-buffer-length>
+ *   <column-buffer-uncompressed-length>
+ *   <column-key-buffer-length>
+ *   <column-key-buffer>
+ *
+ * -- The serialized length on disk of the corresponding column buffer.
+ *
+ * column-buffer-length ::= VInt
+ *
+ * -- The uncompressed length of the corresponding column buffer. This
+ * -- is equivalent to column-buffer-length if the RCFile is not compressed.
+ *
+ * column-buffer-uncompressed-length ::= VInt
+ *
+ * -- The length in bytes of the current column key buffer
+ *
+ * column-key-buffer-length ::= VInt
+ *
+ * -- The column-key-buffer contains a sequence of serialized VInt values
+ * -- corresponding to the byte lengths of the serialized column fields
+ * -- in the corresponding rowgroup-column-buffer. For example, consider
+ * -- an integer column that contains the consecutive values 1, 2, 3, 44.
+ * -- The RCFile format stores these values as strings in the column buffer,
+ * -- e.g. "12344". The length of each column field is recorded in
+ * -- the column-key-buffer as a sequence of VInts: 1,1,1,2. However,
+ * -- if the same length occurs repeatedly, then we replace repeated
+ * -- run lengths with the complement (i.e. negative) of the number of
+ * -- repetitions, so 1,1,1,2 becomes 1,~2,2.
+ *
+ * column-key-buffer ::= Byte[column-key-buffer-length]
+ *
+ * rowgroup-column-buffers ::= <rowgroup-column-buffer>+
+ *
+ * -- RCFile stores all column data as strings regardless of the
+ * -- underlying column type. The strings are neither length-prefixed nor
+ * -- null-terminated, and decoding them into individual fields requires
+ * -- the use of the run-length information contained in the corresponding
+ * -- column-key-buffer.
+ *
+ * rowgroup-column-buffer ::= Byte[column-buffer-length]
+ *
+ * Byte ::= An eight-bit byte
  *
+ * VInt ::= Variable length integer. The high-order bit of each byte
+ * indicates whether more bytes remain to be read. The low-order seven
+ * bits are appended as increasingly more significant bits in the
+ * resulting integer value.
+ *
+ * Int ::= A four-byte integer in big-endian format.
+ *
+ * Text ::= VInt, Chars (Length prefixed UTF-8 characters)
+ * }
+ * </pre>
+ * </p>
  */
 public class RCFile {
 
   private static final Log LOG = LogFactory.getLog(RCFile.class);
 
   public static final String RECORD_INTERVAL_CONF_STR = "hive.io.rcfile.record.interval";
-
   public static final String COLUMN_NUMBER_METADATA_STR = "hive.io.rcfile.column.number";
 
-  public static final String COLUMN_NUMBER_CONF_STR = "hive.io.rcfile.column.number.conf";
-
-  public static final String TOLERATE_CORRUPTIONS_CONF_STR =
-      "hive.io.rcfile.tolerate.corruptions";
-
-  // HACK: We actually need BlockMissingException, but that is not available
-  // in all hadoop versions.
-  public static final String BLOCK_MISSING_MESSAGE =
-      "Could not obtain block";
-
   // All of the versions should be place in this list.
   private static final int ORIGINAL_VERSION = 0;  // version with SEQ
   private static final int NEW_MAGIC_VERSION = 1; // version with RCF
@@ -144,31 +335,35 @@ public class RCFile {
   private static final int CURRENT_VERSION = NEW_MAGIC_VERSION;
 
   // The first version of RCFile used the sequence file header.
-  private static final byte[] ORIGINAL_MAGIC = new byte[] {
+  private static final byte[] ORIGINAL_MAGIC = new byte[]{
       (byte) 'S', (byte) 'E', (byte) 'Q'};
   // the version that was included with the original magic, which is mapped
   // into ORIGINAL_VERSION
   private static final byte ORIGINAL_MAGIC_VERSION_WITH_METADATA = 6;
 
-  private static final byte[] ORIGINAL_MAGIC_VERSION = new byte[] {
+  private static final byte[] ORIGINAL_MAGIC_VERSION = new byte[]{
       (byte) 'S', (byte) 'E', (byte) 'Q', ORIGINAL_MAGIC_VERSION_WITH_METADATA
   };
 
   // The 'magic' bytes at the beginning of the RCFile
-  private static final byte[] MAGIC = new byte[] {
+  private static final byte[] MAGIC = new byte[]{
       (byte) 'R', (byte) 'C', (byte) 'F'};
 
   private static final int SYNC_ESCAPE = -1; // "length" of sync entries
   private static final int SYNC_HASH_SIZE = 16; // number of bytes in hash
   private static final int SYNC_SIZE = 4 + SYNC_HASH_SIZE; // escape + hash
 
-  /** The number of bytes between sync points. */
+  /**
+   * The number of bytes between sync points.
+   */
   public static final int SYNC_INTERVAL = 100 * SYNC_SIZE;
+  public static final String NULL = "rcfile.null";
+  public static final String SERDE = "rcfile.serde";
 
   /**
    * KeyBuffer is the key of each record in RCFile. Its on-disk layout is as
    * below:
-   *
+   * <p/>
    * <ul>
    * <li>record length in bytes,it is the sum of bytes used to store the key
    * part and the value part.</li>
@@ -185,62 +380,28 @@ public class RCFile {
    * <li>{the end of the key part}</li>
    * </ul>
    */
-  public static class KeyBuffer implements WritableComparable {
+  public static class KeyBuffer {
     // each column's length in the value
     private int[] eachColumnValueLen = null;
     private int[] eachColumnUncompressedValueLen = null;
     // stores each cell's length of a column in one DataOutputBuffer element
-    private NonSyncDataOutputBuffer[] allCellValLenBuffer = null;
+    private NonSyncByteArrayOutputStream[] allCellValLenBuffer = null;
     // how many rows in this split
     private int numberRows = 0;
     // how many columns
     private int columnNumber = 0;
 
-    // return the number of columns recorded in this file's header
-    public int getColumnNumber() {
-      return columnNumber;
-    }
-
-    @SuppressWarnings("unused")
-    @Deprecated
-    public KeyBuffer(){
-    }
-
     KeyBuffer(int columnNum) {
       columnNumber = columnNum;
       eachColumnValueLen = new int[columnNumber];
       eachColumnUncompressedValueLen = new int[columnNumber];
-      allCellValLenBuffer = new NonSyncDataOutputBuffer[columnNumber];
-    }
-
-    @SuppressWarnings("unused")
-    @Deprecated
-    KeyBuffer(int numberRows, int columnNum) {
-      this(columnNum);
-      this.numberRows = numberRows;
+      allCellValLenBuffer = new NonSyncByteArrayOutputStream[columnNumber];
     }
 
-    /**
-     * add in a new column's meta data.
-     *
-     * @param columnValueLen
-     *          this total bytes number of this column's values in this split
-     * @param colValLenBuffer
-     *          each cell's length of this column's in this split
-     */
-    void setColumnLenInfo(int columnValueLen,
-                          NonSyncDataOutputBuffer colValLenBuffer,
-                          int columnUncompressedValueLen, int columnIndex) {
-      eachColumnValueLen[columnIndex] = columnValueLen;
-      eachColumnUncompressedValueLen[columnIndex] = columnUncompressedValueLen;
-      allCellValLenBuffer[columnIndex] = colValLenBuffer;
-    }
-
-    @Override
     public void readFields(DataInput in) throws IOException {
       eachColumnValueLen = new int[columnNumber];
       eachColumnUncompressedValueLen = new int[columnNumber];
-      allCellValLenBuffer = new NonSyncDataOutputBuffer[columnNumber];
+      allCellValLenBuffer = new NonSyncByteArrayOutputStream[columnNumber];
 
       numberRows = WritableUtils.readVInt(in);
       for (int i = 0; i < columnNumber; i++) {
@@ -248,7 +409,7 @@ public class RCFile {
         eachColumnUncompressedValueLen[i] = WritableUtils.readVInt(in);
         int bufLen = WritableUtils.readVInt(in);
         if (allCellValLenBuffer[i] == null) {
-          allCellValLenBuffer[i] = new NonSyncDataOutputBuffer();
+          allCellValLenBuffer[i] = new NonSyncByteArrayOutputStream();
         } else {
           allCellValLenBuffer[i].reset();
         }
@@ -256,43 +417,11 @@ public class RCFile {
       }
     }
 
-    @Override
-    public void write(DataOutput out) throws IOException {
-      // out.writeInt(numberRows);
-      WritableUtils.writeVLong(out, numberRows);
-      for (int i = 0; i < eachColumnValueLen.length; i++) {
-        WritableUtils.writeVLong(out, eachColumnValueLen[i]);
-        WritableUtils.writeVLong(out, eachColumnUncompressedValueLen[i]);
-        NonSyncDataOutputBuffer colRowsLenBuf = allCellValLenBuffer[i];
-        int bufLen = colRowsLenBuf.getLength();
-        WritableUtils.writeVLong(out, bufLen);
-        out.write(colRowsLenBuf.getData(), 0, bufLen);
-      }
-    }
-
     /**
-     * get number of bytes to store the keyBuffer.
-     *
-     * @return number of bytes used to store this KeyBuffer on disk
-     * @throws IOException
+     * @return the numberRows
      */
-    public int getSize() throws IOException {
-      int ret = 0;
-      ret += WritableUtils.getVIntSize(numberRows);
-      for (int i = 0; i < eachColumnValueLen.length; i++) {
-        ret += WritableUtils.getVIntSize(eachColumnValueLen[i]);
-        ret += WritableUtils.getVIntSize(eachColumnUncompressedValueLen[i]);
-        ret += WritableUtils.getVIntSize(allCellValLenBuffer[i].getLength());
-        ret += allCellValLenBuffer[i].getLength();
-      }
-
-      return ret;
-    }
-
-    @Override
-    public int compareTo(Object arg0) {
-      throw new RuntimeException("compareTo not supported in class "
-          + this.getClass().getName());
+    public int getNumberRows() {
+      return numberRows;
     }
   }
 
@@ -306,53 +435,10 @@ public class RCFile {
    * column_2_row_2_value,....]</li>
    * </ul>
    */
-  public static class ValueBuffer implements WritableComparable {
-
-    class LazyDecompressionCallbackImpl implements LazyDecompressionCallback {
-
-      int index = -1;
-      int colIndex = -1;
-
-      public LazyDecompressionCallbackImpl(int index, int colIndex) {
-        super();
-        this.index = index;
-        this.colIndex = colIndex;
-      }
-
-      @Override
-      public byte[] decompress() throws IOException {
-
-        if (decompressedFlag[index] || codec == null) {
-          return loadedColumnsValueBuffer[index].getData();
-        }
-
-        NonSyncDataOutputBuffer compressedData = compressedColumnsValueBuffer[index];
-        decompressBuffer.reset();
-        DataInputStream valueIn = new DataInputStream(deflatFilter);
-        deflatFilter.resetState();
-        if (deflatFilter instanceof SchemaAwareCompressionInputStream) {
-          ((SchemaAwareCompressionInputStream)deflatFilter).setColumnIndex(colIndex);
-        }
-        decompressBuffer.reset(compressedData.getData(),
-            keyBuffer.eachColumnValueLen[colIndex]);
-
-        NonSyncDataOutputBuffer decompressedColBuf = loadedColumnsValueBuffer[index];
-        decompressedColBuf.reset();
-        decompressedColBuf.write(valueIn,
-            keyBuffer.eachColumnUncompressedValueLen[colIndex]);
-        decompressedFlag[index] = true;
-        numCompressed--;
-        return decompressedColBuf.getData();
-      }
-    }
+  public static class ValueBuffer {
 
     // used to load columns' value into memory
-    private NonSyncDataOutputBuffer[] loadedColumnsValueBuffer = null;
-    private NonSyncDataOutputBuffer[] compressedColumnsValueBuffer = null;
-    private boolean[] decompressedFlag = null;
-    private int numCompressed;
-    private LazyDecompressionCallbackImpl[] lazyDecompressCallbackObjs = null;
-    private boolean lazyDecompress = true;
+    private NonSyncByteArrayOutputStream[] loadedColumnsValueBuffer = null;
 
     boolean inited = false;
 
@@ -364,104 +450,33 @@ public class RCFile {
     boolean[] skippedColIDs = null;
 
     CompressionCodec codec;
-
-    Decompressor valDecompressor = null;
+    Decompressor decompressor = null;
     NonSyncDataInputBuffer decompressBuffer = new NonSyncDataInputBuffer();
-    CompressionInputStream deflatFilter = null;
-
-    @SuppressWarnings("unused")
-    @Deprecated
-    public ValueBuffer() throws IOException {
-    }
-
-    @SuppressWarnings("unused")
-    @Deprecated
-    public ValueBuffer(KeyBuffer keyBuffer) throws IOException {
-      this(keyBuffer, keyBuffer.columnNumber, null, null, true);
-    }
 
-    @SuppressWarnings("unused")
-    @Deprecated
-    public ValueBuffer(KeyBuffer keyBuffer, boolean[] skippedColIDs)
-        throws IOException {
-      this(keyBuffer, keyBuffer.columnNumber, skippedColIDs, null, true);
-    }
 
-    @SuppressWarnings("unused")
-    @Deprecated
     public ValueBuffer(KeyBuffer currentKey, int columnNumber,
-                       boolean[] skippedCols, CompressionCodec codec) throws IOException {
-      this(currentKey, columnNumber, skippedCols, codec, true);
-    }
-
-    public ValueBuffer(KeyBuffer currentKey, int columnNumber,
-                       boolean[] skippedCols, CompressionCodec codec, boolean lazyDecompress)
+                       int[] targets, CompressionCodec codec, boolean[] skippedIDs)
         throws IOException {
-      this.lazyDecompress = lazyDecompress;
       keyBuffer = currentKey;
       this.columnNumber = columnNumber;
-
-      if (skippedCols != null && skippedCols.length > 0) {
-        skippedColIDs = skippedCols;
-      } else {
-        skippedColIDs = new boolean[columnNumber];
-        for (int i = 0; i < skippedColIDs.length; i++) {
-          skippedColIDs[i] = false;
-        }
-      }
-
-      int skipped = 0;
-      for (boolean currentSkip : skippedColIDs) {
-        if (currentSkip) {
-          skipped++;
-        }
-      }
-      loadedColumnsValueBuffer = new NonSyncDataOutputBuffer[columnNumber
-          - skipped];
-      decompressedFlag = new boolean[columnNumber - skipped];
-      lazyDecompressCallbackObjs = new LazyDecompressionCallbackImpl[columnNumber
-          - skipped];
-      compressedColumnsValueBuffer = new NonSyncDataOutputBuffer[columnNumber
-          - skipped];
+      this.skippedColIDs = skippedIDs;
       this.codec = codec;
+      loadedColumnsValueBuffer = new NonSyncByteArrayOutputStream[targets.length];
+
       if (codec != null) {
-        valDecompressor = org.apache.tajo.storage.compress.CodecPool.getDecompressor(codec);
-        deflatFilter = codec.createInputStream(decompressBuffer,
-            valDecompressor);
-      }
-      if (codec != null) {
-        numCompressed = decompressedFlag.length;
-      } else {
-        numCompressed = 0;
-      }
-      for (int k = 0, readIndex = 0; k < columnNumber; k++) {
-        if (skippedColIDs[k]) {
-          continue;
-        }
-        loadedColumnsValueBuffer[readIndex] = new NonSyncDataOutputBuffer();
-        if (codec != null) {
-          decompressedFlag[readIndex] = false;
-          lazyDecompressCallbackObjs[readIndex] = new LazyDecompressionCallbackImpl(
-              readIndex, k);
-          compressedColumnsValueBuffer[readIndex] = new NonSyncDataOutputBuffer();
-        } else {
-          decompressedFlag[readIndex] = true;
-        }
-        readIndex++;
+        decompressor = org.apache.tajo.storage.compress.CodecPool.getDecompressor(codec);
       }
-    }
 
-    @SuppressWarnings("unused")
-    @Deprecated
-    public void setColumnValueBuffer(NonSyncDataOutputBuffer valBuffer,
-                                     int addIndex) {
-      loadedColumnsValueBuffer[addIndex] = valBuffer;
+      for (int i = 0; i < targets.length; i++) {
+        loadedColumnsValueBuffer[i] = new NonSyncByteArrayOutputStream();
+      }
     }
 
-    @Override
     public void readFields(DataInput in) throws IOException {
       int addIndex = 0;
       int skipTotal = 0;
+
+
       for (int i = 0; i < columnNumber; i++) {
         int vaRowsLen = keyBuffer.eachColumnValueLen[i];
         // skip this column
@@ -475,65 +490,61 @@ public class RCFile {
           skipTotal = 0;
         }
 
-        NonSyncDataOutputBuffer valBuf;
-        if (codec != null){
+        NonSyncByteArrayOutputStream valBuf;
+        if (codec != null) {
           // load into compressed buf first
-          valBuf = compressedColumnsValueBuffer[addIndex];
+
+          byte[] compressedBytes = new byte[vaRowsLen];
+          in.readFully(compressedBytes, 0, vaRowsLen);
+
+          decompressBuffer.reset(compressedBytes, vaRowsLen);
+          if(decompressor != null) decompressor.reset();
+
+          DataInputStream is;
+          if (codec instanceof SplittableCompressionCodec) {
+            SplitCompressionInputStream deflatFilter = ((SplittableCompressionCodec) codec).createInputStream(
+                decompressBuffer, decompressor, 0, vaRowsLen, SplittableCompressionCodec.READ_MODE.BYBLOCK);
+            is = new DataInputStream(deflatFilter);
+          } else {
+            CompressionInputStream deflatFilter = codec.createInputStream(decompressBuffer, decompressor);
+            is = new DataInputStream(deflatFilter);
+          }
+
+          valBuf = loadedColumnsValueBuffer[addIndex];
+          valBuf.reset();
+          valBuf.write(is, keyBuffer.eachColumnUncompressedValueLen[i]);
+          is.close();
+          decompressBuffer.close();
         } else {
           valBuf = loadedColumnsValueBuffer[addIndex];
-        }
-        valBuf.reset();
-        valBuf.write(in, vaRowsLen);
-        if (codec != null) {
-          decompressedFlag[addIndex] = false;
-          if (!lazyDecompress) {
-            lazyDecompressCallbackObjs[addIndex].decompress();
-            decompressedFlag[addIndex] = true;
-          }
+          valBuf.reset();
+          valBuf.write(in, vaRowsLen);
         }
         addIndex++;
       }
-      if (codec != null) {
-        numCompressed = decompressedFlag.length;
-      }
 
       if (skipTotal != 0) {
         in.skipBytes(skipTotal);
       }
     }
 
-    @Override
-    public void write(DataOutput out) throws IOException {
-      if (codec != null) {
-        for (NonSyncDataOutputBuffer currentBuf : compressedColumnsValueBuffer) {
-          out.write(currentBuf.getData(), 0, currentBuf.getLength());
-        }
-      } else {
-        for (NonSyncDataOutputBuffer currentBuf : loadedColumnsValueBuffer) {
-          out.write(currentBuf.getData(), 0, currentBuf.getLength());
-        }
-      }
-    }
-
     public void clearColumnBuffer() throws IOException {
       decompressBuffer.reset();
     }
 
     public void close() {
-      for (NonSyncDataOutputBuffer element : loadedColumnsValueBuffer) {
+      for (NonSyncByteArrayOutputStream element : loadedColumnsValueBuffer) {
         IOUtils.closeStream(element);
       }
       if (codec != null) {
         IOUtils.closeStream(decompressBuffer);
-        org.apache.tajo.storage.compress.CodecPool.returnDecompressor(valDecompressor);
+        if (decompressor != null) {
+          // Make sure we only return decompressor once.
+          org.apache.tajo.storage.compress.CodecPool.returnDecompressor(decompressor);
+          decompressor = null;
+        }
       }
     }
-
-    @Override
-    public int compareTo(Object arg0) {
-      throw new RuntimeException("compareTo not supported in class "
-          + this.getClass().getName());
-    }
   }
 
   /**
@@ -543,12 +554,12 @@ public class RCFile {
   public static Metadata createMetadata(Text... values) {
     if (values.length % 2 != 0) {
       throw new IllegalArgumentException("Must have a matched set of " +
-          "key-value pairs. " + values.length+
+          "key-value pairs. " + values.length +
           " strings supplied.");
     }
     Metadata result = new Metadata();
-    for(int i=0; i < values.length; i += 2) {
-      result.set(values[i], values[i+1]);
+    for (int i = 0; i < values.length; i += 2) {
+      result.set(values[i], values[i + 1]);
     }
     return result;
   }
@@ -556,21 +567,37 @@ public class RCFile {
   /**
    * Write KeyBuffer/ValueBuffer pairs to a RCFile. RCFile's format is
    * compatible with SequenceFile's.
-   *
    */
-  public static class Writer {
-
+  public static class RCFileAppender extends FileAppender {
     Configuration conf;
     FSDataOutputStream out;
 
     CompressionCodec codec = null;
     Metadata metadata = null;
+    FileSystem fs = null;
+    TableStatistics stats = null;
+    int columnNumber = 0;
+
+    // how many records the writer buffers before it writes to disk
+    private int RECORD_INTERVAL = Integer.MAX_VALUE;
+    // the max size of memory for buffering records before writes them out
+    private int COLUMNS_BUFFER_SIZE = 16 * 1024 * 1024; // 16M
+    // the conf string for COLUMNS_BUFFER_SIZE
+    public static String COLUMNS_BUFFER_SIZE_CONF_STR = "hive.io.rcfile.record.buffer.size";
+
+    // how many records already buffered
+    private int bufferedRecords = 0;
+    private ColumnBuffer[] columnBuffers = null;
+    boolean useNewMagic = true;
+    private byte[] nullChars;
+    SerializeDeserialize serde;
 
     // Insert a globally unique 16-byte value every few entries, so that one
     // can seek into the middle of a file and then synchronize with record
     // starts and ends by scanning for this value.
     long lastSyncPos; // position of last sync
     byte[] sync; // 16 random bytes
+
     {
       try {
         MessageDigest digester = MessageDigest.getInstance("MD5");
@@ -582,36 +609,14 @@ public class RCFile {
       }
     }
 
-    // how many records the writer buffers before it writes to disk
-    private int RECORD_INTERVAL = Integer.MAX_VALUE;
-    // the max size of memory for buffering records before writes them out
-    private int columnsBufferSize = 4 * 1024 * 1024; // 4M
-    // the conf string for COLUMNS_BUFFER_SIZE
-    public static String COLUMNS_BUFFER_SIZE_CONF_STR = "hive.io.rcfile.record.buffer.size";
-
-    // how many records already buffered
-    private int bufferedRecords = 0;
-
-    private final ColumnBuffer[] columnBuffers;
-
-    private int columnNumber = 0;
-
-    private final int[] columnValuePlainLength;
-
-    KeyBuffer key = null;
-    private final int[] plainTotalColumnLength;
-    private final int[] comprTotalColumnLength;
-
-    boolean useNewMagic = true;
-
     /*
      * used for buffering appends before flush them out
      */
     class ColumnBuffer {
       // used for buffer a column's values
-      NonSyncDataOutputBuffer columnValBuffer;
+      NonSyncByteArrayOutputStream columnValBuffer;
       // used to store each value's length
-      NonSyncDataOutputBuffer valLenBuffer;
+      NonSyncByteArrayOutputStream valLenBuffer;
 
       /*
        * use a run-length encoding. We only record run length if a same
@@ -620,21 +625,25 @@ public class RCFile {
        * example, if the values' lengths are 1,1,1,2, we record 1, ~2,2. And for
        * value lengths 1,2,3 we record 1,2,3.
        */
+      int columnValueLength = 0;
+      int uncompressedColumnValueLength = 0;
+      int columnKeyLength = 0;
       int runLength = 0;
       int prevValueLength = -1;
 
       ColumnBuffer() throws IOException {
-        columnValBuffer = new NonSyncDataOutputBuffer();
-        valLenBuffer = new NonSyncDataOutputBuffer();
+        columnValBuffer = new NonSyncByteArrayOutputStream();
+        valLenBuffer = new NonSyncByteArrayOutputStream();
       }
 
-      public void append(BytesRefWritable data) throws IOException {
-        data.writeDataTo(columnValBuffer);
-        int currentLen = data.getLength();
+      public int append(Column column, Datum datum) throws IOException {
+        int currentLen = serde.serialize(column, datum, columnValBuffer, nullChars);
+        columnValueLength += currentLen;
+        uncompressedColumnValueLength += currentLen;
 
         if (prevValueLength < 0) {
           startNewGroup(currentLen);
-          return;
+          return currentLen;
         }
 
         if (currentLen != prevValueLength) {
@@ -643,6 +652,7 @@ public class RCFile {
         } else {
           runLength++;
         }
+        return currentLen;
       }
 
       private void startNewGroup(int currentLen) {
@@ -650,22 +660,39 @@ public class RCFile {
         runLength = 0;
       }
 
-      public void clear() throws IOException {
+      public void clear() {
         valLenBuffer.reset();
         columnValBuffer.reset();
         prevValueLength = -1;
         runLength = 0;
+        columnValueLength = 0;
+        columnKeyLength = 0;
+        uncompressedColumnValueLength = 0;
       }
 
-      public void flushGroup() throws IOException {
+      public int flushGroup() {
+        int len = 0;
         if (prevValueLength >= 0) {
-          WritableUtils.writeVLong(valLenBuffer, prevValueLength);
+          len += valLenBuffer.writeVLong(prevValueLength);
           if (runLength > 0) {
-            WritableUtils.writeVLong(valLenBuffer, ~runLength);
+            len += valLenBuffer.writeVLong(~runLength);
           }
+          columnKeyLength += len;
           runLength = -1;
           prevValueLength = -1;
         }
+        return len;
+      }
+
+      public int UnFlushedGroupSize() {
+        if (prevValueLength < 0) {
+          return 0; // no run-length group is currently open
+        }
+        int pending = WritableUtils.getVIntSize(prevValueLength);
+        if (runLength > 0) {
+          pending += WritableUtils.getVIntSize(~runLength);
+        }
+        return pending;
       }
     }
 
@@ -673,98 +700,76 @@ public class RCFile {
       return out.getPos();
     }
 
-    /** Constructs a RCFile Appender. */
-    public Writer(FileSystem fs, Configuration conf, Path name) throws IOException {
-      this(fs, conf, name, null, new Metadata(), null);
-    }
+    public RCFileAppender(Configuration conf, final Schema schema, final TableMeta meta, final Path path) throws IOException {
+      super(conf, schema, meta, path);
 
-    /**
-     * Constructs a RCFile Appender.
-     *
-     * @param fs
-     *          the file system used
-     * @param conf
-     *          the configuration file
-     * @param name
-     *          the file name
-     * @throws IOException
-     */
-    public Writer(FileSystem fs, Configuration conf, Path name,
-                  Progressable progress, CompressionCodec codec) throws IOException {
-      this(fs, conf, name, progress, new Metadata(), codec);
+      this.conf = conf;
+      this.fs = path.getFileSystem(conf);
+      RECORD_INTERVAL = conf.getInt(RECORD_INTERVAL_CONF_STR, RECORD_INTERVAL);
+      COLUMNS_BUFFER_SIZE = conf.getInt(COLUMNS_BUFFER_SIZE_CONF_STR, COLUMNS_BUFFER_SIZE);
+      columnNumber = schema.getColumnNum();
     }
 
-    /**
-     * Constructs a RCFile Appender.
-     *
-     * @param fs
-     *          the file system used
-     * @param conf
-     *          the configuration file
-     * @param name
-     *          the file name
-     * @param progress a progress meter to update as the file is written
-     * @param metadata a string to string map in the file header
-     * @throws IOException
-     */
-    public Writer(FileSystem fs, Configuration conf, Path name,
-                  Progressable progress, Metadata metadata, CompressionCodec codec) throws IOException {
-      this(fs, conf, name, fs.getConf().getInt("io.file.buffer.size", 4096),
-          fs.getDefaultReplication(), fs.getDefaultBlockSize(), progress,
-          metadata, codec);
-    }
+    public void init() throws IOException {
+      fs = path.getFileSystem(conf);
 
-    /**
-     *
-     * Constructs a RCFile Appender.
-     *
-     * @param fs
-     *          the file system used
-     * @param conf
-     *          the configuration file
-     * @param name
-     *          the file name
-     * @param bufferSize the size of the file buffer
-     * @param replication the number of replicas for the file
-     * @param blockSize the block size of the file
-     * @param progress the progress meter for writing the file
-     * @param metadata a string to string map in the file header
-     * @throws IOException
-     */
-    public Writer(FileSystem fs, Configuration conf, Path name, int bufferSize,
-                  short replication, long blockSize, Progressable progress,
-                  Metadata metadata, CompressionCodec codec) throws IOException {
-      RECORD_INTERVAL = conf.getInt(RECORD_INTERVAL_CONF_STR, RECORD_INTERVAL);
-      columnNumber = conf.getInt(COLUMN_NUMBER_CONF_STR, 0);
+      if (!fs.exists(path.getParent())) {
+        throw new FileNotFoundException(path.toString());
+      }
+
+      String codecClassname = this.meta.getOption(TableMeta.COMPRESSION_CODEC);
+      if (!StringUtils.isEmpty(codecClassname)) {
+        try {
+          Class<? extends CompressionCodec> codecClass = conf.getClassByName(
+              codecClassname).asSubclass(CompressionCodec.class);
+          codec = ReflectionUtils.newInstance(codecClass, conf);
+        } catch (ClassNotFoundException cnfe) {
+          throw new IllegalArgumentException(
+              "Unknown codec: " + codecClassname, cnfe);
+        }
+      }
+
+      String nullCharacters = StringEscapeUtils.unescapeJava(this.meta.getOption(NULL));
+      if (StringUtils.isEmpty(nullCharacters)) {
+        nullChars = NullDatum.get().asTextBytes();
+      } else {
+        nullChars = nullCharacters.getBytes();
+      }
 
       if (metadata == null) {
         metadata = new Metadata();
       }
-      metadata.set(new Text(COLUMN_NUMBER_METADATA_STR), new Text(""
-          + columnNumber));
 
-      columnsBufferSize = conf.getInt(COLUMNS_BUFFER_SIZE_CONF_STR,
-          4 * 1024 * 1024);
+      metadata.set(new Text(COLUMN_NUMBER_METADATA_STR), new Text("" + columnNumber));
 
-      columnValuePlainLength = new int[columnNumber];
+      String serdeClass = this.meta.getOption(SERDE, BinarySerializeDeserialize.class.getName());
+      try {
+        serde = (SerializeDeserialize) Class.forName(serdeClass).newInstance();
+      } catch (Exception e) {
+        LOG.error(e.getMessage(), e);
+        throw new IOException(e);
+      }
+      metadata.set(new Text(SERDE), new Text(serdeClass));
 
       columnBuffers = new ColumnBuffer[columnNumber];
       for (int i = 0; i < columnNumber; i++) {
         columnBuffers[i] = new ColumnBuffer();
       }
 
-      init(conf, fs.create(name, true, bufferSize, replication,
-          blockSize, progress), codec, metadata);
+      init(conf, fs.create(path, true, 4096, (short) 3, fs.getDefaultBlockSize(), null), codec, metadata);
       initializeFileHeader();
       writeFileHeader();
       finalizeFileHeader();
-      key = new KeyBuffer(columnNumber);
 
-      plainTotalColumnLength = new int[columnNumber];
-      comprTotalColumnLength = new int[columnNumber];
+      if (enabledStats) {
+        this.stats = new TableStatistics(this.schema);
+      }
+      super.init();
     }
 
-    /** Write the initial part of file header. */
+    /**
+     * Write the initial part of file header.
+     */
     void initializeFileHeader() throws IOException {
       if (useNewMagic) {
         out.write(MAGIC);
@@ -774,7 +779,9 @@ public class RCFile {
       }
     }
 
-    /** Write the final part of file header. */
+    /**
+     * Write the final part of file header.
+     */
     void finalizeFileHeader() throws IOException {
       out.write(sync); // write the sync bytes
       out.flush(); // flush header
@@ -784,13 +791,15 @@ public class RCFile {
       return codec != null;
     }
 
-    /** Write and flush the file header. */
+    /**
+     * Write and flush the file header.
+     */
     void writeFileHeader() throws IOException {
       if (useNewMagic) {
         out.writeBoolean(isCompressed());
       } else {
-        Text.writeString(out, KeyBuffer.class.getName());
-        Text.writeString(out, ValueBuffer.class.getName());
+        Text.writeString(out, "org.apache.hadoop.hive.ql.io.RCFile$KeyBuffer");
+        Text.writeString(out, "org.apache.hadoop.hive.ql.io.RCFile$ValueBuffer");
         out.writeBoolean(isCompressed());
         out.writeBoolean(false);
       }
@@ -807,18 +816,12 @@ public class RCFile {
       this.out = out;
       this.codec = codec;
       this.metadata = metadata;
-      this.useNewMagic =
-          conf.getBoolean(TajoConf.ConfVars.HIVEUSEEXPLICITRCFILEHEADER.varname, true);
-    }
-
-    /** Returns the compression codec of data in this file. */
-    @SuppressWarnings("unused")
-    @Deprecated
-    public CompressionCodec getCompressionCodec() {
-      return codec;
+      this.useNewMagic = conf.getBoolean(TajoConf.ConfVars.HIVEUSEEXPLICITRCFILEHEADER.varname, true);
     }
 
-    /** create a sync point. */
+    /**
+     * create a sync point.
+     */
     public void sync() throws IOException {
       if (sync != null && lastSyncPos != out.getPos()) {
         out.writeInt(SYNC_ESCAPE); // mark the start of the sync
@@ -827,13 +830,6 @@ public class RCFile {
       }
     }
 
-    /** Returns the configuration of this file. */
-    @SuppressWarnings("unused")
-    @Deprecated
-    Configuration getConf() {
-      return conf;
-    }
-
     private void checkAndWriteSync() throws IOException {
       if (sync != null && out.getPos() >= lastSyncPos + SYNC_INTERVAL) {
         sync();
@@ -842,115 +838,191 @@ public class RCFile {
 
     private int columnBufferSize = 0;
 
+    @Override
+    public long getOffset() throws IOException {
+      return out.getPos(); // position reported by the underlying output stream
+    }
+
+    @Override
+    public void flush() throws IOException {
+      if (bufferedRecords > 0) flushRecords(); // as in close(): never emit an empty row group
+      out.flush();
+    }
+
+    @Override
+    public void addTuple(Tuple t) throws IOException {
+      append(t);
+      if (!enabledStats) {
+        return; // statistics collection is switched off
+      }
+      // Statistical section: count the row that was just buffered
+      stats.incrementRow();
+    }
+
     /**
      * Append a row of values. Currently it only can accept <
-     * {@link BytesRefArrayWritable}. If its <code>size()</code> is less than the
+     * {@link Tuple}. If its <code>size()</code> is less than the
      * column number in the file, zero bytes are appended for the empty columns.
      * If its size() is greater then the column number in the file, the exceeded
      * columns' bytes are ignored.
      *
-     * @param val a BytesRefArrayWritable with the list of serialized columns
+     * @param tuple a Tuple with the list of serialized columns
      * @throws IOException
      */
-    public void append(Writable val) throws IOException {
+    public void append(Tuple tuple) throws IOException {
+      int size = schema.getColumnNum();
 
-      if (!(val instanceof BytesRefArrayWritable)) {
-        throw new UnsupportedOperationException(
-            "Currently the writer can only accept BytesRefArrayWritable");
-      }
-
-      BytesRefArrayWritable columns = (BytesRefArrayWritable) val;
-      int size = columns.size();
       for (int i = 0; i < size; i++) {
-        BytesRefWritable cu = columns.get(i);
-        int plainLen = cu.getLength();
-        columnBufferSize += plainLen;
-        columnValuePlainLength[i] += plainLen;
-        columnBuffers[i].append(cu);
+        Datum datum = tuple.get(i);
+        int length = columnBuffers[i].append(schema.getColumn(i), datum);
+        columnBufferSize += length;
+        if (enabledStats) {
+          stats.analyzeField(i, datum);
+        }
       }
 
       if (size < columnNumber) {
-        for (int i = columns.size(); i < columnNumber; i++) {
-          columnBuffers[i].append(BytesRefWritable.ZeroBytesRefWritable);
+        for (int i = size; i < columnNumber; i++) {
+          columnBuffers[i].append(schema.getColumn(i), NullDatum.get());
+          if (enabledStats) {
+            stats.analyzeField(i, NullDatum.get());
+          }
         }
       }
 
       bufferedRecords++;
-      if ((columnBufferSize > columnsBufferSize)
-          || (bufferedRecords >= RECORD_INTERVAL)) {
-        flushRecords();
+      if (this.isCompressed()) {
+        //TODO compression rate base flush
+        if ((columnBufferSize > COLUMNS_BUFFER_SIZE)
+            || (bufferedRecords >= RECORD_INTERVAL)) {
+          flushRecords();
+        }
+      } else {
+        //TODO block base flush
+        if ((columnBufferSize > COLUMNS_BUFFER_SIZE)
+            || (bufferedRecords >= RECORD_INTERVAL)) {
+          flushRecords();
+        }
       }
     }
 
-    private void flushRecords() throws IOException {
+    /**
+     * get number of bytes to store the keyBuffer.
+     *
+     * @return number of bytes used to store this KeyBuffer on disk
+     * @throws IOException
+     */
+    public int getKeyBufferSize() throws IOException {
+      int ret = 0;
+      ret += WritableUtils.getVIntSize(bufferedRecords);
+      for (int i = 0; i < columnBuffers.length; i++) {
+        ColumnBuffer currentBuf = columnBuffers[i];
+        ret += WritableUtils.getVIntSize(currentBuf.columnValueLength);
+        ret += WritableUtils.getVIntSize(currentBuf.uncompressedColumnValueLength);
+        ret += WritableUtils.getVIntSize(currentBuf.columnKeyLength);
+        ret += currentBuf.columnKeyLength;
+      }
 
-      key.numberRows = bufferedRecords;
+      return ret;
+    }
+
+    /**
+     * Computes the number of bytes the key part would occupy on disk for the
+     * rows buffered so far, including run-length groups not yet flushed.
+     *
+     * @return size of the key part in bytes
+     * @throws IOException declared in the signature; nothing in this body raises it
+     */
+    public int getKeyPartSize() throws IOException {
+      // three 4-byte ints lead the key part: record length, key length, compressed key length
+      int total = 12;
+      total += WritableUtils.getVIntSize(bufferedRecords);
+
+      for (ColumnBuffer column : columnBuffers) {
+        total += WritableUtils.getVIntSize(column.columnValueLength)
+            + WritableUtils.getVIntSize(column.uncompressedColumnValueLength)
+            + WritableUtils.getVIntSize(column.columnKeyLength)
+            + column.columnKeyLength
+            + column.UnFlushedGroupSize();
+      }
+      return total;
+    }
+
+    private void WriteKeyBuffer(DataOutputStream out) throws IOException {
+      WritableUtils.writeVLong(out, bufferedRecords); // the buffered row count comes first
+      for (ColumnBuffer column : columnBuffers) {
+        // per column: value length, uncompressed value length, key length, then the length runs
+        WritableUtils.writeVLong(out, column.columnValueLength);
+        WritableUtils.writeVLong(out, column.uncompressedColumnValueLength);
+        WritableUtils.writeVLong(out, column.columnKeyLength);
+        column.valLenBuffer.writeTo(out);
+      }
+    }
+
+    private void flushRecords() throws IOException {
 
       Compressor compressor = null;
-      NonSyncDataOutputBuffer valueBuffer = null;
+      NonSyncByteArrayOutputStream valueBuffer = null;
       CompressionOutputStream deflateFilter = null;
       DataOutputStream deflateOut = null;
       boolean isCompressed = isCompressed();
+
       int valueLength = 0;
       if (isCompressed) {
-        ReflectionUtils.setConf(codec, this.conf);
         compressor = org.apache.tajo.storage.compress.CodecPool.getCompressor(codec);
-        valueBuffer = new NonSyncDataOutputBuffer();
+        if (compressor != null) compressor.reset();  //builtin gzip is null
+
+        valueBuffer = new NonSyncByteArrayOutputStream();
         deflateFilter = codec.createOutputStream(valueBuffer, compressor);
-        deflateOut = new DataOutputStream(deflateFilter);
+        deflateOut = new DataOutputStream(new BufferedOutputStream(deflateFilter));
       }
 
       for (int columnIndex = 0; columnIndex < columnNumber; columnIndex++) {
         ColumnBuffer currentBuf = columnBuffers[columnIndex];
         currentBuf.flushGroup();
 
-        NonSyncDataOutputBuffer columnValue = currentBuf.columnValBuffer;
+        NonSyncByteArrayOutputStream columnValue = currentBuf.columnValBuffer;
         int colLen;
-        int plainLen = columnValuePlainLength[columnIndex];
-
+        int plainLen = columnValue.getLength();
         if (isCompressed) {
-          if (deflateFilter instanceof SchemaAwareCompressionOutputStream) {
-            ((SchemaAwareCompressionOutputStream)deflateFilter).
-                setColumnIndex(columnIndex);
-          }
           deflateFilter.resetState();
           deflateOut.write(columnValue.getData(), 0, columnValue.getLength());
           deflateOut.flush();
           deflateFilter.finish();
+          columnValue.close();
           // find how much compressed data was added for this column
           colLen = valueBuffer.getLength() - valueLength;
+          currentBuf.columnValueLength = colLen;
         } else {
-          colLen = columnValuePlainLength[columnIndex];
+          colLen = plainLen;
         }
         valueLength += colLen;
-        key.setColumnLenInfo(colLen, currentBuf.valLenBuffer, plainLen,
-            columnIndex);
-        plainTotalColumnLength[columnIndex] += plainLen;
-        comprTotalColumnLength[columnIndex] += colLen;
-        columnValuePlainLength[columnIndex] = 0;
       }
 
-      int keyLength = key.getSize();
+      int keyLength = getKeyBufferSize();
       if (keyLength < 0) {
-        throw new IOException("negative length keys not allowed: " + key);
+        throw new IOException("negative length keys not allowed: " + keyLength);
       }
       if (compressor != null) {
         org.apache.tajo.storage.compress.CodecPool.returnCompressor(compressor);
       }
 
       // Write the key out
-      writeKey(key, keyLength + valueLength, keyLength);
+      writeKey(keyLength + valueLength, keyLength);
       // write the value out
       if (isCompressed) {
         out.write(valueBuffer.getData(), 0, valueBuffer.getLength());
+        valueBuffer.close();
       } else {
-        for(int columnIndex=0; columnIndex < columnNumber; ++columnIndex) {
-          NonSyncDataOutputBuffer buf =
-              columnBuffers[columnIndex].columnValBuffer;
-          out.write(buf.getData(), 0, buf.getLength());
+        for (int columnIndex = 0; columnIndex < columnNumber; ++columnIndex) {
+          columnBuffers[columnIndex].columnValBuffer.writeTo(out);
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Column#" + columnIndex + " : Plain Total Column Value Length: "
+                + columnBuffers[columnIndex].uncompressedColumnValueLength
+                + ",  Compr Total Column Value Length: " + columnBuffers[columnIndex].columnValueLength);
+          }
         }
       }
-
       // clear the columnBuffers
       clearColumnBuffers();
 
@@ -958,41 +1030,34 @@ public class RCFile {
       columnBufferSize = 0;
     }
 
-    /**
-     * flush a block out without doing anything except compressing the key part.
-     */
-    public void flushBlock(KeyBuffer keyBuffer, ValueBuffer valueBuffer,
-                           int recordLen, int keyLength,
-                           @SuppressWarnings("unused") int compressedKeyLen) throws IOException {
-      writeKey(keyBuffer, recordLen, keyLength);
-      valueBuffer.write(out);
-    }
-
-    private void writeKey(KeyBuffer keyBuffer, int recordLen,
-                          int keyLength) throws IOException {
+    private void writeKey(int recordLen, int keyLength) throws IOException {
       checkAndWriteSync(); // sync
       out.writeInt(recordLen); // total record length
       out.writeInt(keyLength); // key portion length
 
-      if(this.isCompressed()) {
+      if (this.isCompressed()) {
         Compressor compressor = org.apache.tajo.storage.compress.CodecPool.getCompressor(codec);
-        NonSyncDataOutputBuffer compressionBuffer =
-            new NonSyncDataOutputBuffer();
-        CompressionOutputStream deflateFilter =
-            codec.createOutputStream(compressionBuffer, compressor);
-        DataOutputStream deflateOut = new DataOutputStream(deflateFilter);
+        if (compressor != null) compressor.reset();  //builtin gzip is null
+
+        NonSyncByteArrayOutputStream compressionBuffer = new NonSyncByteArrayOutputStream();
+        CompressionOutputStream deflateFilter = codec.createOutputStream(compressionBuffer, compressor);
+        DataOutputStream deflateOut = new DataOutputStream(new BufferedOutputStream(deflateFilter));
+
         //compress key and write key out
         compressionBuffer.reset();
         deflateFilter.resetState();
-        keyBuffer.write(deflateOut);
+        WriteKeyBuffer(deflateOut);
         deflateOut.flush();
         deflateFilter.finish();
         int compressedKeyLen = compressionBuffer.getLength();
         out.writeInt(compressedKeyLen);
-        out.write(compressionBuffer.getData(), 0, compressedKeyLen);
+        compressionBuffer.writeTo(out);
+        compressionBuffer.reset();
+        deflateOut.close();
+        org.apache.tajo.storage.compress.CodecPool.returnCompressor(compressor);
       } else {
         out.writeInt(keyLength);
-        keyBuffer.write(out);
+        WriteKeyBuffer(out);
       }
     }
 
@@ -1002,6 +1067,16 @@ public class RCFile {
       }
     }
 
+    /**
+     * Returns the table statistics held by this appender, or null when
+     * statistics collection is disabled.
+     */
+    @Override
+    public TableStats getStats() {
+      return enabledStats ? stats.getTableStat() : null;
+    }
+
+    @Override
     public synchronized void close() throws IOException {
       if (bufferedRecords > 0) {
         flushRecords();
@@ -1009,33 +1084,27 @@ public class RCFile {
       clearColumnBuffers();
 
       if (out != null) {
-
         // Close the underlying stream if we own it...
         out.flush();
         out.close();
         out = null;
       }
-      for (int i = 0; i < columnNumber; i++) {
-        LOG.info("Column#" + i + " : Plain Total Column Value Length: "
-            + plainTotalColumnLength[i]
-            + ",  Compr Total Column Value Length: " + comprTotalColumnLength[i]);
-      }
     }
   }
 
   /**
    * Read KeyBuffer/ValueBuffer pairs from a RCFile.
-   *
    */
-  public static class Reader {
+  public static class RCFileScanner extends FileScanner {
     private static class SelectedColumn {
       public int colIndex;
       public int rowReadIndex;
       public int runLength;
       public int prvLength;
+      public boolean isNulled;
     }
-    private final Path file;
-    private final FSDataInputStream in;
+
+    private FSDataInputStream in;
 
     private byte version;
 
@@ -1048,13 +1117,16 @@ public class RCFile {
     private long lastSeenSyncPos = 0;
 
     private long headerEnd;
-    private final long end;
+    private long start, end;
+    private long startOffset, endOffset;
+    private int[] targetColumnIndexes;
+
     private int currentKeyLength;
     private int currentRecordLength;
 
     private final Configuration conf;
 
-    private final ValueBuffer currentValue;
+    private ValueBuffer currentValue;
 
     private int readRowsIndexInBuffer = 0;
 
@@ -1062,52 +1134,76 @@ public class RCFile {
 
     private int columnNumber = 0;
 
-    private int loadColumnNum;
+    private boolean more = true;
 
     private int passedRowsNum = 0;
 
-    // Should we try to tolerate corruption? Default is No.
-    private boolean tolerateCorruptions = false;
-
     private boolean decompress = false;
 
     private Decompressor keyDecompressor;
-    NonSyncDataOutputBuffer keyDecompressedData = new NonSyncDataOutputBuffer();
+
 
     //Current state of each selected column - e.g. current run length, etc.
     // The size of the array is equal to the number of selected columns
-    private final SelectedColumn[] selectedColumns;
-
-    // map of original column id -> index among selected columns
-    private final int[] revPrjColIDs;
+    private SelectedColumn[] selectedColumns;
 
     // column value lengths for each of the selected columns
-    private final NonSyncDataInputBuffer[] colValLenBufferReadIn;
+    private NonSyncDataInputBuffer[] colValLenBufferReadIn;
 
-    /** Create a new RCFile reader. */
-    public Reader(FileSystem fs, Path file, Configuration conf) throws IOException {
-      this(fs, file, conf.getInt("io.file.buffer.size", 4096), conf, 0, fs
-          .getFileStatus(file).getLen());
-    }
+    private LongWritable rowId;
+    private byte[] nullChars;
+    private SerializeDeserialize serde;
+
+    public RCFileScanner(Configuration conf, final Schema schema, final TableMeta meta,
+                         final FileFragment fragment) throws IOException {
+      super(conf, schema, meta, fragment);
+
+      rowId = new LongWritable();
+      conf.setInt("io.file.buffer.size", 4096); //TODO remove
 
-    /** Create a new RCFile reader. */
-    public Reader(FileSystem fs, Path file, int bufferSize, Configuration conf,
-                  long start, long length) throws IOException {
-      tolerateCorruptions = conf.getBoolean(
-          TOLERATE_CORRUPTIONS_CONF_STR, false);
-      conf.setInt("io.file.buffer.size", bufferSize);
-      this.file = file;
-      in = openFile(fs, file, bufferSize, length);
+
+      startOffset = fragment.getStartKey();
+      endOffset = startOffset + fragment.getEndKey();
       this.conf = conf;
-      end = start + length;
+      more = startOffset < endOffset;
+      start = 0;
+    }
+
+    @Override
+    public void init() throws IOException {
+      String nullCharacters = StringEscapeUtils.unescapeJava(meta.getOption(NULL));
+      if (StringUtils.isEmpty(nullCharacters)) {
+        nullChars = NullDatum.get().asTextBytes();
+      } else {
+        nullChars = nullCharacters.getBytes();
+      }
+
+      // projection
+      if (targets == null) {
+        targets = schema.toArray();
+      }
+
+      targetColumnIndexes = new int[targets.length];
+      for (int i = 0; i < targets.length; i++) {
+        targetColumnIndexes[i] = schema.getColumnIdByName(targets[i].getColumnName());
+      }
+      Arrays.sort(targetColumnIndexes);
+
+      FileSystem fs = fragment.getPath().getFileSystem(conf);
+      end = fs.getFileStatus(fragment.getPath()).getLen();
+      in = openFile(fs, fragment.getPath(), 4096);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("RCFile open:" + fragment.getPath() + "," + start + "," + (endOffset - startOffset) +
+            "," + fs.getFileStatus(fragment.getPath()).getLen());
+      }
+      //init RCFILE Header
       boolean succeed = false;
       try {
         if (start > 0) {
           seek(0);
-          init();
-          seek(start);
+          initHeader();
         } else {
-          init();
+          initHeader();
         }
         succeed = true;
       } finally {
@@ -1115,7 +1211,7 @@ public class RCFile {
           if (in != null) {
             try {
               in.close();
-            } catch(IOException e) {
+            } catch (IOException e) {
               if (LOG != null && LOG.isDebugEnabled()) {
                 LOG.debug("Exception in closing " + in, e);
               }
@@ -1124,64 +1220,34 @@ public class RCFile {
         }
       }
 
-      columnNumber = Integer.parseInt(metadata.get(
-          new Text(COLUMN_NUMBER_METADATA_STR)).toString());
-
-      java.util.ArrayList<Integer> notSkipIDs = ColumnProjectionUtils
-          .getReadColumnIDs(conf);
+      columnNumber = Integer.parseInt(metadata.get(new Text(COLUMN_NUMBER_METADATA_STR)).toString());
+      selectedColumns = new SelectedColumn[targetColumnIndexes.length];
+      colValLenBufferReadIn = new NonSyncDataInputBuffer[targetColumnIndexes.length];
       boolean[] skippedColIDs = new boolean[columnNumber];
-      if (notSkipIDs.size() > 0) {
-        for (int i = 0; i < skippedColIDs.length; i++) {
-          skippedColIDs[i] = true;
-        }
-        for (int read : notSkipIDs) {
-          if (read < columnNumber) {
-            skippedColIDs[read] = false;
-          }
-        }
-      } else {
-        // TODO: if no column name is specified e.g, in select count(1) from tt;
-        // skip all columns, this should be distinguished from the case:
-        // select * from tt;
-        for (int i = 0; i < skippedColIDs.length; i++) {
-          skippedColIDs[i] = false;
-        }
-      }
-
-      loadColumnNum = columnNumber;
-      if (skippedColIDs.length > 0) {
-        for (boolean skippedColID : skippedColIDs) {
-          if (skippedColID) {
-            loadColumnNum -= 1;
-          }
-        }
-      }
+      Arrays.fill(skippedColIDs, true);
+      super.init();
 
+      for (int i = 0; i < targetColumnIndexes.length; i++) {
+        int tid = targetColumnIndexes[i];
+        if (tid < columnNumber) {
+          skippedColIDs[tid] = false;
 
-      revPrjColIDs = new int[columnNumber];
-      // get list of selected column IDs
-      selectedColumns = new SelectedColumn[loadColumnNum];
-      colValLenBufferReadIn = new NonSyncDataInputBuffer[loadColumnNum];
-      for (int i = 0, j = 0; i < columnNumber; ++i) {
-        if (!skippedColIDs[i]) {
           SelectedColumn col = new SelectedColumn();
-          col.colIndex = i;
+          col.colIndex = tid;
           col.runLength = 0;
           col.prvLength = -1;
           col.rowReadIndex = 0;
-          selectedColumns[j] = col;
-          colValLenBufferReadIn[j] = new NonSyncDataInputBuffer();
-          revPrjColIDs[i] = j;
-          j++;
-        } else {
-          revPrjColIDs[i] = -1;
+          selectedColumns[i] = col;
+          colValLenBufferReadIn[i] = new NonSyncDataInputBuffer();
         }
       }
 
       currentKey = createKeyBuffer();
-      boolean lazyDecompress = !tolerateCorruptions;
-      currentValue = new ValueBuffer(
-          null, columnNumber, skippedColIDs, codec, lazyDecompress);
+      currentValue = new ValueBuffer(null, columnNumber, targetColumnIndexes, codec, skippedColIDs);
+
+      if (startOffset > getPosition()) {    // TODO use sync cache
+        sync(startOffset); // sync to start
+      }
     }
 
     /**
@@ -1194,6 +1260,7 @@ public class RCFile {
 
     /**
      * Return the metadata value associated with the given key.
+     *
      * @param key the metadata key to retrieve
      */
     public Text getMetadataValueOf(Text key) {
@@ -1204,25 +1271,24 @@ public class RCFile {
      * Override this method to specialize the type of
      * {@link FSDataInputStream} returned.
      */
-    protected FSDataInputStream openFile(FileSystem fs, Path file,
-                                         int bufferSize, long length) throws IOException {
+    protected FSDataInputStream openFile(FileSystem fs, Path file, int bufferSize) throws IOException {
       return fs.open(file, bufferSize);
     }
 
-    private void init() throws IOException {
+    private void initHeader() throws IOException {
       byte[] magic = new byte[MAGIC.length];
       in.readFully(magic);
 
       if (Arrays.equals(magic, ORIGINAL_MAGIC)) {
         byte vers = in.readByte();
         if (vers != ORIGINAL_MAGIC_VERSION_WITH_METADATA) {
-          throw new IOException(file + " is a version " + vers +
+          throw new IOException(fragment.getPath() + " is a version " + vers +
               " SequenceFile instead of an RCFile.");
         }
         version = ORIGINAL_VERSION;
       } else {
         if (!Arrays.equals(magic, MAGIC)) {
-          throw new IOException(file + " not a RCFile and has magic of " +
+          throw new IOException(fragment.getPath() + " not a RCFile and has magic of " +
               new String(magic));
         }
 
@@ -1239,10 +1305,10 @@ public class RCFile {
           Class<?> valCls = conf.getClassByName(Text.readString(in));
           if (!keyCls.equals(KeyBuffer.class)
               || !valCls.equals(ValueBuffer.class)) {
-            throw new IOException(file + " not a RCFile");
+            throw new IOException(fragment.getPath() + " not a RCFile");
           }
         } catch (ClassNotFoundException e) {
-          throw new IOException(file + " not a RCFile", e);
+          throw new IOException(fragment.getPath() + " not a RCFile", e);
         }
       }
 
@@ -1252,7 +1318,7 @@ public class RCFile {
         // is block-compressed? it should be always false.
         boolean blkCompressed = in.readBoolean();
         if (blkCompressed) {
-          throw new IOException(file + " not a RCFile.");
+          throw new IOException(fragment.getPath() + " not a RCFile.");
         }
       }
 
@@ -1267,30 +1333,48 @@ public class RCFile {
           throw new IllegalArgumentException(
               "Unknown codec: " + codecClassname, cnfe);
         }
+
         keyDecompressor = org.apache.tajo.storage.compress.CodecPool.getDecompressor(codec);
       }
 
       metadata = new Metadata();
       metadata.readFields(in);
 
+      Text text = metadata.get(new Text(SERDE));
+
+      try {
+        String serdeClass;
+        if(text != null && !text.toString().isEmpty()){
+          serdeClass = text.toString();
+        } else{
+          serdeClass = this.meta.getOption(SERDE, BinarySerializeDeserialize.class.getName());
+        }
+        serde = (SerializeDeserialize) Class.forName(serdeClass).newInstance();
+      } catch (Exception e) {
+        LOG.error(e.getMessage(), e);
+        throw new IOException(e);
+      }
+
       in.readFully(sync); // read sync bytes
       headerEnd = in.getPos();
     }
 
-    /** Return the current byte position in the input file. */
+    /**
+     * Return the current byte position in the input file.
+     */
     public synchronized long getPosition() throws IOException {
       return in.getPos();
     }
 
     /**
      * Set the current byte position in the input file.
-     *
-     * <p>
+     * <p/>
+     * <p/>
      * The position passed must be a position returned by
-     * {@link RCFile.Writer#getLength()} when writing this file. To seek to an
-     * arbitrary position, use {@link RCFile.Reader#sync(long)}. In another
+     * {@link RCFile.RCFileAppender#getLength()} when writing this file. To seek to an
+     * arbitrary position, use {@link RCFile.RCFileScanner#sync(long)}. In other
      * words, the current seek can only seek to the end of the file. For other
-     * positions, use {@link RCFile.Reader#sync(long)}.
+     * positions, use {@link RCFile.RCFileScanner#sync(long)}.
      */
     public synchronized void seek(long position) throws IOException {
       in.seek(position);
@@ -1298,7 +1382,7 @@ public class RCFile {
 
     /**
      * Resets the values which determine if there are more rows in the buffer
-     *
+     * <p/>
      * This can be used after one calls seek or sync, if one called next before that.
      * Otherwise, the seek or sync will have no effect, it will continue to get rows from the
      * buffer built up from the call to next.
@@ -1308,7 +1392,9 @@ public class RCFile {
       recordsNumInValBuffer = 0;
     }
 
-    /** Seek to the next sync mark past a given position. */
+    /**
+     * Seek to the next sync mark past a given position.
+     */
     public synchronized void sync(long position) throws IOException {
       if (position + SYNC_SIZE >= end) {
         seek(end);
@@ -1326,21 +1412,31 @@ public class RCFile {
 
       try {
         seek(position + 4); // skip escape
-        in.readFully(syncCheck);
-        int syncLen = sync.length;
-        for (int i = 0; in.getPos() < end; i++) {
-          int j = 0;
-          for (; j < syncLen; j++) {
-            if (sync[j] != syncCheck[(i + j) % syncLen]) {
-              break;
+
+        int prefix = sync.length;
+        int n = conf.getInt("io.bytes.per.checksum", 512);
+        byte[] buffer = new byte[prefix + n];
+        n = (int) Math.min(n, end - in.getPos());
+        /* fill array with a pattern that will never match sync */
+        Arrays.fill(buffer, (byte) (~sync[0]));
+        while (n > 0 && (in.getPos() + n) <= end) {
+          position = in.getPos();
+          in.readFully(buffer, prefix, n);
+          /* the buffer now holds prefix + n bytes: the prefix area plus the n bytes just read */
+          for (int i = 0; i < n; i++) {
+            int j;
+            for (j = 0; j < sync.length && sync[j] == buffer[i + j]; j++) {
+              /* nothing */
+            }
+            if (j == sync.length) {
+              /* simplified from (position + (i - prefix) + sync.length) - SYNC_SIZE */
+              in.seek(position + i - SYNC_SIZE);
+              return;
             }
           }
-          if (j == syncLen) {
-            in.seek(in.getPos() - SYNC_SIZE); // position before
-            // sync
-            return;
-          }
-          syncCheck[i % syncLen] = in.readByte();
+          /* move the last prefix (sync.length) bytes of the buffer to the prefix area */
+          System.arraycopy(buffer, buffer.length - prefix, buffer, 0, prefix);
+          n = (int) Math.min(n, end - in.getPos());
         }
       } catch (ChecksumException e) { // checksum failure
         handleChecksumException(e);
@@ -1403,7 +1499,6 @@ public class RCFile {
     private int compressedKeyLen = 0;
     NonSyncDataInputBuffer keyDataIn = new NonSyncDataInputBuffer();
     NonSyncDataInputBuffer keyDecompressBuffer = new NonSyncDataInputBuffer();
-    NonSyncDataOutputBuffer keyTempBuffer = new NonSyncDataOutputBuffer();
 
     KeyBuffer currentKey = null;
     boolean keyInit = false;
@@ -1418,17 +1513,31 @@ public class RCFile {
       currentKeyLength = in.readInt();
       compressedKeyLen = in.readInt();
       if (decompress) {
-        keyTempBuffer.reset();
-        keyTempBuffer.write(in, compressedKeyLen);
-        keyDecompressBuffer.reset(keyTempBuffer.getData(), compressedKeyLen);
-        CompressionInputStream deflatFilter = codec.createInputStream(
-            keyDecompressBuffer, keyDecompressor);
-        DataInputStream compressedIn = new DataInputStream(deflatFilter);
-        deflatFilter.resetState();
-        keyDecompressedData.reset();
-        keyDecompressedData.write(compressedIn, currentKeyLength);
-        keyDataIn.reset(keyDecompressedData.getData(), currentKeyLength);
+
+        byte[] compressedBytes = new byte[compressedKeyLen];
+        in.readFully(compressedBytes, 0, compressedKeyLen);
+
+        if (keyDecompressor != null) keyDecompressor.reset();
+        keyDecompressBuffer.reset(compressedBytes, compressedKeyLen);
+
+        DataInputStream is;
+        if (codec instanceof SplittableCompressionCodec) {
+          SplitCompressionInputStream deflatFilter = ((SplittableCompressionCodec) codec).createInputStream(
+              keyDecompressBuffer, keyDecompressor, 0, compressedKeyLen, SplittableCompressionCodec.READ_MODE.BYBLOCK);
+
+          keyDecompressBuffer.seek(deflatFilter.getAdjustedStart());
+          is = new DataInputStream(deflatFilter);
+        } else {
+          CompressionInputStream deflatFilter = codec.createInputStream(keyDecompressBuffer, keyDecompressor);
+          is = new DataInputStream(deflatFilter);
+        }
+
+        byte[] deCompressedBytes = new byte[currentKeyLength];
+
+        is.readFully(deCompressedBytes, 0, currentKeyLength);
+        keyDataIn.reset(deCompressedBytes, currentKeyLength);
         currentKey.readFields(keyDataIn);
+        is.close();
       } else {
         currentKey.readFields(in);
       }
@@ -1441,12 +1550,20 @@ public class RCFile {
 
       for (int selIx = 0; selIx < selectedColumns.length; selIx++) {
         SelectedColumn col = selectedColumns[selIx];
+        if (col == null) {
+          col = new SelectedColumn();
+          col.isNulled = true;
+          selectedColumns[selIx] = col;
+          continue;
+        }
+
         int colIx = col.colIndex;
-        NonSyncDataOutputBuffer buf = currentKey.allCellValLenBuffer[colIx];
+        NonSyncByteArrayOutputStream buf = currentKey.allCellValLenBuffer[colIx];
         colValLenBufferReadIn[selIx].reset(buf.getData(), buf.getLength());
         col.rowReadIndex = 0;
         col.runLength = 0;
         col.prvLength = -1;
+        col.isNulled = buf.getLength() == 0;
       }
 
       return currentKeyLength;
@@ -1462,106 +1579,41 @@ public class RCFile {
       currentValue.inited = true;
     }
 
-    public boolean nextBlock() throws IOException {
-      int keyLength = nextKeyBuffer();
-      if(keyLength > 0) {
-        currentValueBuffer();
-        return true;
-      }
-      return false;
-    }
-
     private boolean rowFetched = false;
 
-    // use this buffer to hold column's cells value length for usages in
-    // getColumn(), instead of using colValLenBufferReadIn directly.
-    private final NonSyncDataInputBuffer fetchColumnTempBuf = new NonSyncDataInputBuffer();
-
-    /**
-     * Fetch all data in the buffer for a given column. This is useful for
-     * columnar operators, which perform operations on an array data of one
-     * column. It should be used together with {@link #nextColumnsBatch()}.
-     * Calling getColumn() with not change the result of
-     * {@link #next(LongWritable)} and
-     * {@link #getCurrentRow(BytesRefArrayWritable)}.
-     *
-     * @param columnID the number of the column to get 0 to N-1
-     * @throws IOException
-     */
-    public BytesRefArrayWritable getColumn(int columnID,
-                                           BytesRefArrayWritable rest) throws IOException {
-      int selColIdx = revPrjColIDs[columnID];
-      if (selColIdx == -1) {
+    @Override
+    public Tuple next() throws IOException {
+      if (!more) {
         return null;
       }
 
-      if (rest == null) {
-        rest = new BytesRefArrayWritable();
-      }
-
-      rest.resetValid(recordsNumInValBuffer);
-
-      if (!currentValue.inited) {
-        currentValueBuffer();
-      }
-
-      int columnNextRowStart = 0;
-      fetchColumnTempBuf.reset(currentKey.allCellValLenBuffer[columnID]
-          .getData(), currentKey.allCellValLenBuffer[columnID].getLength());
-      SelectedColumn selCol = selectedColumns[selColIdx];
-      byte[] uncompData = null;
-      ValueBuffer.LazyDecompressionCallbackImpl decompCallBack = null;
-      boolean decompressed = currentValue.decompressedFlag[selColIdx];
-      if (decompressed) {
-        uncompData =
-            currentValue.loadedColumnsValueBuffer[selColIdx].getData();
-      } else {
-        decompCallBack = currentValue.lazyDecompressCallbackObjs[selColIdx];
+      more = nextBuffer(rowId);
+      long lastSeenSyncPos = lastSeenSyncPos();
+      if (lastSeenSyncPos >= endOffset) {
+        more = false;
+        return null;
       }
-      for (int i = 0; i < recordsNumInValBuffer; i++) {
-        colAdvanceRow(selColIdx, selCol);
-        int length = selCol.prvLength;
-
-        BytesRefWritable currentCell = rest.get(i);
 
-        if (decompressed) {
-          currentCell.set(uncompData, columnNextRowStart, length);
-        } else {
-          currentCell.set(decompCallBack, columnNextRowStart, length);
-        }
-        columnNextRowStart = columnNextRowStart + length;
+      if (!more) {
+        return null;
       }
-      return rest;
-    }
 
-    /**
-     * Read in next key buffer and throw any data in current key buffer and
-     * current value buffer. It will influence the result of
-     * {@link #next(LongWritable)} and
-     * {@link #getCurrentRow(BytesRefArrayWritable)}
-     *
-     * @return whether there still has records or not
-     * @throws IOException
-     */
-    @SuppressWarnings("unused")
-    @Deprecated
-    public synchronized boolean nextColumnsBatch() throws IOException {
-      passedRowsNum += (recordsNumInValBuffer - readRowsIndexInBuffer);
-      return nextKeyBuffer() > 0;
+      Tuple tuple = new VTuple(schema.getColumnNum());
+      getCurrentRow(tuple);
+      return tuple;
     }
 
     /**
-     * Returns how many rows we fetched with next(). It only means how many rows
-     * are read by next(). The returned result may be smaller than actual number
-     * of rows passed by, because {@link #seek(long)},
-     * {@link #nextColumnsBatch()} can change the underlying key buffer and
+     * Returns how many rows we fetched with nextBuffer(). It only means how many rows
+     * are read by nextBuffer(). The returned result may be smaller than the actual number
+     * of rows passed by, because {@link #seek(long)} can change the underlying key buffer and
      * value buffer.
      *
      * @return next row number
      * @throws IOException
      */
-    public synchronized boolean next(LongWritable readRows) throws IOException {
-      if (hasRecordsInBuffer()) {
+    public synchronized boolean nextBuffer(LongWritable readRows) throws IOException {
+      if (readRowsIndexInBuffer < recordsNumInValBuffer) {
         readRows.set(passedRowsNum);
         readRowsIndexInBuffer++;
         passedRowsNum++;
@@ -1572,109 +1624,41 @@ public class RCFile {
       }
 
       int ret = -1;
-      if (tolerateCorruptions) {
-        ret = nextKeyValueTolerateCorruptions();
-      } else {
-        try {
-          ret = nextKeyBuffer();
-        } catch (EOFException eof) {
-          eof.printStackTrace();
-        }
-      }
-      return (ret > 0) && next(readRows);
-    }
-
-    private int nextKeyValueTolerateCorruptions() throws IOException {
-      long currentOffset = in.getPos();
-      int ret;
       try {
         ret = nextKeyBuffer();
-        this.currentValueBuffer();
-      } catch (IOException ioe) {
-        // A BlockMissingException indicates a temporary error,
-        // not a corruption. Re-throw this exception.
-        String msg = ioe.getMessage();
-        if (msg != null && msg.startsWith(BLOCK_MISSING_MESSAGE)) {
-          LOG.warn("Re-throwing block-missing exception" + ioe);
-          throw ioe;
-        }
-        // We have an IOException other than a BlockMissingException.
-        LOG.warn("Ignoring IOException in file " + file +
-            " after offset " + currentOffset, ioe);
-        ret = -1;
-      } catch (Throwable t) {
-        // We got an exception that is not IOException
-        // (typically OOM, IndexOutOfBounds, InternalError).
-        // This is most likely a corruption.
-        LOG.warn("Ignoring unknown error in " + file +
-            " after offset " + currentOffset, t);
-        ret = -1;
+      } catch (EOFException eof) {
+        eof.printStackTrace();
       }
-      return ret;
-    }
-
-    public boolean hasRecordsInBuffer() {
-      return readRowsIndexInBuffer < recordsNumInValBuffer;
+      return (ret > 0) && nextBuffer(readRows);
     }
 
     /**
-     * get the current row used,make sure called {@link #next(LongWritable)}
+     * Get the current row; make sure to call {@link #next()}
      * first.
      *
      * @throws IOException
      */
-    public synchronized void getCurrentRow(BytesRefArrayWritable ret) throws IOException {
-
+    public void getCurrentRow(Tuple tuple) throws IOException {
       if (!keyInit || rowFetched) {
         return;
       }
 
-      if (tolerateCorruptions) {
-        if (!currentValue.inited) {
-          currentValueBuffer();
-        }
-        ret.resetValid(columnNumber);
-      } else {
-        if (!currentValue.inited) {
-          currentValueBuffer();
-          // do this only when not initialized, but we may need to find a way to
-          // tell the caller how to initialize the valid size
-          ret.resetValid(columnNumber);
-        }
+      if (!currentValue.inited) {
+        currentValueBuffer();
       }
 
-      // we do not use BytesWritable here to avoid the byte-copy from
-      // DataOutputStream to BytesWritable
-      if (currentValue.numCompressed > 0) {
-        for (int j = 0; j < selectedColumns.length; ++j) {
-          SelectedColumn col = selectedColumns[j];
-          int i = col.colIndex;
-
-          BytesRefWritable ref = ret.unCheckedGet(i);
+      for (int j = 0; j < selectedColumns.length; ++j) {
+        SelectedColumn col = selectedColumns[j];
+        int i = col.colIndex;
 
+        if (col.isNulled) {
+          tuple.put(i, NullDatum.get());
+        } else {
           colAdvanceRow(j, col);
 
-          if (currentValue.decompressedFlag[j]) {
-            ref.set(currentValue.loadedColumnsValueBuffer[j].getData(),
-                col.rowReadIndex, col.prvLength);
-          } else {
-            ref.set(currentValue.lazyDecompressCallbackObjs[j],
-                col.rowReadIndex, col.prvLength);
-          }
-          col.rowReadIndex += col.prvLength;
-        }
-      } else {
-        // This version of the loop eliminates a condition check and branch
-        // and is measurably faster (20% or so)
-        for (int j = 0; j < selectedColumns.length; ++j) {
-          SelectedColumn col = selectedColumns[j];
-          int i = col.colIndex;
-
-          BytesRefWritable ref = ret.unCheckedGet(i);
-
-          colAdvanceRow(j, col);
-          ref.set(currentValue.loadedColumnsValueBuffer[j].getData(),
-              col.rowReadIndex, col.prvLength);
+          Datum datum = serde.deserialize(schema.getColumn(i),
+              currentValue.loadedColumnsValueBuffer[j].getData(), col.rowReadIndex, col.prvLength, nullChars);
+          tuple.put(i, datum);
           col.rowReadIndex += col.prvLength;
         }
       }
@@ -1683,9 +1667,10 @@ public class RCFile {
 
     /**
      * Advance column state to the next now: update offsets, run lengths etc
+     *
      * @param selCol - index among selectedColumns
-     * @param col - column object to update the state of.  prvLength will be
-     *        set to the new read position
+     * @param col    - column object to update the state of.  prvLength will be
+     *               set to the new read position
      * @throws IOException
      */
     private void colAdvanceRow(int selCol, SelectedColumn col) throws IOException {
@@ -1704,75 +1689,57 @@ public class RCFile {
       }
     }
 
-    /** Returns true iff the previous call to next passed a sync mark. */
-    @SuppressWarnings("unused")
+    /**
+     * Returns true if the previous call to next passed a sync mark.
+     */
     public boolean syncSeen() {
       return syncSeen;
     }
 
-    /** Returns the last seen sync position. */
+    /**
+     * Returns the last seen sync position.
+     */
     public long lastSeenSyncPos() {
       return lastSeenSyncPos;
     }
 
-    /** Returns the name of the file. */
-    @Override
-    public String toString() {
-      return file.toString();
-    }
-
-    @SuppressWarnings("unused")
-    public boolean isCompressedRCFile() {
-      return this.decompress;
-    }
-
-    /** Close the reader. */
-    public void close() {
-      IOUtils.closeStream(in);
-      currentValue.close();
-      if (decompress) {
-        IOUtils.closeStream(keyDecompressedData);
-        org.apache.tajo.storage.compress.CodecPool.returnDecompressor(keyDecompressor);
-      }
-    }
-
     /**
-     * return the KeyBuffer object used in the reader. Internally in each
-     * reader, there is only one KeyBuffer object, which gets reused for every
-     * block.
+     * Returns the name of the file.
      */
-    public KeyBuffer getCurrentKeyBufferObj() {
-      return this.currentKey;
+    @Override
+    public String toString() {
+      return fragment.getPath().toString();
     }
 
-    /**
-     * return the ValueBuffer object used in the reader. Internally in each
-     * reader, there is only one ValueBuffer object, which gets reused for every
-     * block.
-     */
-    public ValueBuffer getCurrentValueBufferObj() {
-      return this.currentValue;
+    @Override
+    public void reset() throws IOException {
+      seek(startOffset);
     }
 
-    //return the current block's length
-    public int getCurrentBlockLength() {
-      return this.currentRecordLength;
+    @Override
+    public boolean isProjectable() {
+      return true;
     }
 
-    //return the current block's key length
-    public int getCurrentKeyLength() {
-      return this.currentKeyLength;
+    @Override
+    public boolean isSelectable() {
+      return false;
     }
 
-    //return the current block's compressed key length
-    public int getCurrentCompressedKeyLen() {
-      return this.compressedKeyLen;
+    @Override
+    public boolean isSplittable() {
+      return true;
     }
 
-    //return the CompressionCodec used for this file
-    public CompressionCodec getCompressionCodec() {
-      return this.codec;
+    @Override
+    public void close() throws IOException {
+      IOUtils.closeStream(in);
+      currentValue.close();
+      if (keyDecompressor != null) {
+        // Make sure we only return decompressor once.
+        org.apache.tajo.storage.compress.CodecPool.returnDecompressor(keyDecompressor);
+        keyDecompressor = null;
+      }
     }
-
   }
 }