You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hy...@apache.org on 2014/12/12 06:59:55 UTC

[21/32] tajo git commit: TAJO-1233: Merge hbase_storage branch to the master branch. (Hyoungjun Kim via hyunsik)

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/storage/rcfile/RCFile.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/rcfile/RCFile.java b/tajo-storage/src/main/java/org/apache/tajo/storage/rcfile/RCFile.java
deleted file mode 100644
index e5507ad..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/rcfile/RCFile.java
+++ /dev/null
@@ -1,1805 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage.rcfile;
-
-import org.apache.commons.lang.StringEscapeUtils;
-import org.apache.commons.lang.StringUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.*;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.io.*;
-import org.apache.hadoop.io.SequenceFile.Metadata;
-import org.apache.hadoop.io.compress.*;
-import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.tajo.catalog.Column;
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.catalog.TableMeta;
-import org.apache.tajo.catalog.proto.CatalogProtos;
-import org.apache.tajo.catalog.statistics.TableStats;
-import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.datum.Datum;
-import org.apache.tajo.datum.NullDatum;
-import org.apache.tajo.storage.*;
-import org.apache.tajo.storage.fragment.FileFragment;
-
-import java.io.Closeable;
-import java.io.*;
-import java.rmi.server.UID;
-import java.security.MessageDigest;
-import java.util.Arrays;
-
-/**
- * <code>RCFile</code>s, short of Record Columnar File, are flat files
- * consisting of binary key/value pairs, which shares much similarity with
- * <code>SequenceFile</code>.
- * <p/>
- * RCFile stores columns of a table in a record columnar way. It first
- * partitions rows horizontally into row splits. and then it vertically
- * partitions each row split in a columnar way. RCFile first stores the meta
- * data of a row split, as the key part of a record, and all the data of a row
- * split as the value part. When writing, RCFile.Writer first holds records'
- * value bytes in memory, and determines a row split if the raw bytes size of
- * buffered records overflow a given parameter<tt>Writer.columnsBufferSize</tt>,
- * which can be set like: <code>conf.setInt(COLUMNS_BUFFER_SIZE_CONF_STR,
- * 4 * 1024 * 1024)</code> .
- * <p>
- * <code>RCFile</code> provides {@link Writer}, {@link Reader} and classes for
- * writing, reading respectively.
- * </p>
- * <p/>
- * <p>
- * RCFile stores columns of a table in a record columnar way. It first
- * partitions rows horizontally into row splits. and then it vertically
- * partitions each row split in a columnar way. RCFile first stores the meta
- * data of a row split, as the key part of a record, and all the data of a row
- * split as the value part.
- * </p>
- * <p/>
- * <p>
- * RCFile compresses values in a more fine-grained manner then record level
- * compression. However, It currently does not support compress the key part
- * yet. The actual compression algorithm used to compress key and/or values can
- * be specified by using the appropriate {@link CompressionCodec}.
- * </p>
- * <p/>
- * <p>
- * The {@link Reader} is used to read and explain the bytes of RCFile.
- * </p>
- * <p/>
- * <h4 id="Formats">RCFile Formats</h4>
- * <p/>
- * <p/>
- * <h5 id="Header">RC Header</h5>
- * <ul>
- * <li>version - 3 bytes of magic header <b>RCF</b>, followed by 1 byte of
- * actual version number (e.g. RCF1)</li>
- * <li>compression - A boolean which specifies if compression is turned on for
- * keys/values in this file.</li>
- * <li>compression codec - <code>CompressionCodec</code> class which is used
- * for compression of keys and/or values (if compression is enabled).</li>
- * <li>metadata - {@link Metadata} for this file.</li>
- * <li>sync - A sync marker to denote end of the header.</li>
- * </ul>
- * <p/>
- * <h5>RCFile Format</h5>
- * <ul>
- * <li><a href="#Header">Header</a></li>
- * <li>Record
- * <li>Key part
- * <ul>
- * <li>Record length in bytes</li>
- * <li>Key length in bytes</li>
- * <li>Number_of_rows_in_this_record(vint)</li>
- * <li>Column_1_ondisk_length(vint)</li>
- * <li>Column_1_row_1_value_plain_length</li>
- * <li>Column_1_row_2_value_plain_length</li>
- * <li>...</li>
- * <li>Column_2_ondisk_length(vint)</li>
- * <li>Column_2_row_1_value_plain_length</li>
- * <li>Column_2_row_2_value_plain_length</li>
- * <li>...</li>
- * </ul>
- * </li>
- * </li>
- * <li>Value part
- * <ul>
- * <li>Compressed or plain data of [column_1_row_1_value,
- * column_1_row_2_value,....]</li>
- * <li>Compressed or plain data of [column_2_row_1_value,
- * column_2_row_2_value,....]</li>
- * </ul>
- * </li>
- * </ul>
- * <p>
- * <pre>
- * {@code
- * The following is a pseudo-BNF grammar for RCFile. Comments are prefixed
- * with dashes:
- *
- * rcfile ::=
- *   <file-header>
- *   <rcfile-rowgroup>+
- *
- * file-header ::=
- *   <file-version-header>
- *   <file-key-class-name>              (only exists if version is seq6)
- *   <file-value-class-name>            (only exists if version is seq6)
- *   <file-is-compressed>
- *   <file-is-block-compressed>         (only exists if version is seq6)
- *   [<file-compression-codec-class>]
- *   <file-header-metadata>
- *   <file-sync-field>
- *
- * -- The normative RCFile implementation included with Hive is actually
- * -- based on a modified version of Hadoop's SequenceFile code. Some
- * -- things which should have been modified were not, including the code
- * -- that writes out the file version header. Consequently, RCFile and
- * -- SequenceFile originally shared the same version header.  A newer
- * -- release has created a unique version string.
- *
- * file-version-header ::= Byte[4] {'S', 'E', 'Q', 6}
- *                     |   Byte[4] {'R', 'C', 'F', 1}
- *
- * -- The name of the Java class responsible for reading the key buffer
- * -- component of the rowgroup.
- *
- * file-key-class-name ::=
- *   Text {"org.apache.hadoop.hive.ql.io.RCFile$KeyBuffer"}
- *
- * -- The name of the Java class responsible for reading the value buffer
- * -- component of the rowgroup.
- *
- * file-value-class-name ::=
- *   Text {"org.apache.hadoop.hive.ql.io.RCFile$ValueBuffer"}
- *
- * -- Boolean variable indicating whether or not the file uses compression
- * -- for the key and column buffer sections.
- *
- * file-is-compressed ::= Byte[1]
- *
- * -- A boolean field indicating whether or not the file is block compressed.
- * -- This field is *always* false. According to comments in the original
- * -- RCFile implementation this field was retained for backwards
- * -- compatability with the SequenceFile format.
- *
- * file-is-block-compressed ::= Byte[1] {false}
- *
- * -- The Java class name of the compression codec iff <file-is-compressed>
- * -- is true. The named class must implement
- * -- org.apache.hadoop.io.compress.CompressionCodec.
- * -- The expected value is org.apache.hadoop.io.compress.GzipCodec.
- *
- * file-compression-codec-class ::= Text
- *
- * -- A collection of key-value pairs defining metadata values for the
- * -- file. The Map is serialized using standard JDK serialization, i.e.
- * -- an Int corresponding to the number of key-value pairs, followed by
- * -- Text key and value pairs. The following metadata properties are
- * -- mandatory for all RCFiles:
- * --
- * -- hive.io.rcfile.column.number: the number of columns in the RCFile
- *
- * file-header-metadata ::= Map<Text, Text>
- *
- * -- A 16 byte marker that is generated by the writer. This marker appears
- * -- at regular intervals at the beginning of rowgroup-headers, and is
- * -- intended to enable readers to skip over corrupted rowgroups.
- *
- * file-sync-hash ::= Byte[16]
- *
- * -- Each row group is split into three sections: a header, a set of
- * -- key buffers, and a set of column buffers. The header section includes
- * -- an optional sync hash, information about the size of the row group, and
- * -- the total number of rows in the row group. Each key buffer
- * -- consists of run-length encoding data which is used to decode
- * -- the length and offsets of individual fields in the corresponding column
- * -- buffer.
- *
- * rcfile-rowgroup ::=
- *   <rowgroup-header>
- *   <rowgroup-key-data>
- *   <rowgroup-column-buffers>
- *
- * rowgroup-header ::=
- *   [<rowgroup-sync-marker>, <rowgroup-sync-hash>]
- *   <rowgroup-record-length>
- *   <rowgroup-key-length>
- *   <rowgroup-compressed-key-length>
- *
- * -- rowgroup-key-data is compressed if the column data is compressed.
- * rowgroup-key-data ::=
- *   <rowgroup-num-rows>
- *   <rowgroup-key-buffers>
- *
- * -- An integer (always -1) signaling the beginning of a sync-hash
- * -- field.
- *
- * rowgroup-sync-marker ::= Int
- *
- * -- A 16 byte sync field. This must match the <file-sync-hash> value read
- * -- in the file header.
- *
- * rowgroup-sync-hash ::= Byte[16]
- *
- * -- The record-length is the sum of the number of bytes used to store
- * -- the key and column parts, i.e. it is the total length of the current
- * -- rowgroup.
- *
- * rowgroup-record-length ::= Int
- *
- * -- Total length in bytes of the rowgroup's key sections.
- *
- * rowgroup-key-length ::= Int
- *
- * -- Total compressed length in bytes of the rowgroup's key sections.
- *
- * rowgroup-compressed-key-length ::= Int
- *
- * -- Number of rows in the current rowgroup.
- *
- * rowgroup-num-rows ::= VInt
- *
- * -- One or more column key buffers corresponding to each column
- * -- in the RCFile.
- *
- * rowgroup-key-buffers ::= <rowgroup-key-buffer>+
- *
- * -- Data in each column buffer is stored using a run-length
- * -- encoding scheme that is intended to reduce the cost of
- * -- repeated column field values. This mechanism is described
- * -- in more detail in the following entries.
- *
- * rowgroup-key-buffer ::=
- *   <column-buffer-length>
- *   <column-buffer-uncompressed-length>
- *   <column-key-buffer-length>
- *   <column-key-buffer>
- *
- * -- The serialized length on disk of the corresponding column buffer.
- *
- * column-buffer-length ::= VInt
- *
- * -- The uncompressed length of the corresponding column buffer. This
- * -- is equivalent to column-buffer-length if the RCFile is not compressed.
- *
- * column-buffer-uncompressed-length ::= VInt
- *
- * -- The length in bytes of the current column key buffer
- *
- * column-key-buffer-length ::= VInt
- *
- * -- The column-key-buffer contains a sequence of serialized VInt values
- * -- corresponding to the byte lengths of the serialized column fields
- * -- in the corresponding rowgroup-column-buffer. For example, consider
- * -- an integer column that contains the consecutive values 1, 2, 3, 44.
- * -- The RCFile format stores these values as strings in the column buffer,
- * -- e.g. "12344". The length of each column field is recorded in
- * -- the column-key-buffer as a sequence of VInts: 1,1,1,2. However,
- * -- if the same length occurs repeatedly, then we replace repeated
- * -- run lengths with the complement (i.e. negative) of the number of
- * -- repetitions, so 1,1,1,2 becomes 1,~2,2.
- *
- * column-key-buffer ::= Byte[column-key-buffer-length]
- *
- * rowgroup-column-buffers ::= <rowgroup-value-buffer>+
- *
- * -- RCFile stores all column data as strings regardless of the
- * -- underlying column type. The strings are neither length-prefixed or
- * -- null-terminated, and decoding them into individual fields requires
- * -- the use of the run-length information contained in the corresponding
- * -- column-key-buffer.
- *
- * rowgroup-column-buffer ::= Byte[column-buffer-length]
- *
- * Byte ::= An eight-bit byte
- *
- * VInt ::= Variable length integer. The high-order bit of each byte
- * indicates whether more bytes remain to be read. The low-order seven
- * bits are appended as increasingly more significant bits in the
- * resulting integer value.
- *
- * Int ::= A four-byte integer in big-endian format.
- *
- * Text ::= VInt, Chars (Length prefixed UTF-8 characters)
- * }
- * </pre>
- * </p>
- */
-public class RCFile {
-
-  private static final Log LOG = LogFactory.getLog(RCFile.class);
-
-  public static final String RECORD_INTERVAL_CONF_STR = "hive.io.rcfile.record.interval";
-  public static final String COLUMN_NUMBER_METADATA_STR = "hive.io.rcfile.column.number";
-
-  // All of the versions should be place in this list.
-  private static final int ORIGINAL_VERSION = 0;  // version with SEQ
-  private static final int NEW_MAGIC_VERSION = 1; // version with RCF
-
-  private static final int CURRENT_VERSION = NEW_MAGIC_VERSION;
-
-  // The first version of RCFile used the sequence file header.
-  private static final byte[] ORIGINAL_MAGIC = new byte[]{
-      (byte) 'S', (byte) 'E', (byte) 'Q'};
-  // the version that was included with the original magic, which is mapped
-  // into ORIGINAL_VERSION
-  private static final byte ORIGINAL_MAGIC_VERSION_WITH_METADATA = 6;
-
-  private static final byte[] ORIGINAL_MAGIC_VERSION = new byte[]{
-      (byte) 'S', (byte) 'E', (byte) 'Q', ORIGINAL_MAGIC_VERSION_WITH_METADATA
-  };
-
-  // The 'magic' bytes at the beginning of the RCFile
-  private static final byte[] MAGIC = new byte[]{
-      (byte) 'R', (byte) 'C', (byte) 'F'};
-
-  private static final int SYNC_ESCAPE = -1; // "length" of sync entries
-  private static final int SYNC_HASH_SIZE = 16; // number of bytes in hash
-  private static final int SYNC_SIZE = 4 + SYNC_HASH_SIZE; // escape + hash
-
-  /**
-   * The number of bytes between sync points.
-   */
-  public static final int SYNC_INTERVAL = 100 * SYNC_SIZE;
-  public static final String NULL = "rcfile.null";
-  public static final String SERDE = "rcfile.serde";
-
-  /**
-   * KeyBuffer is the key of each record in RCFile. Its on-disk layout is as
-   * below:
-   * <p/>
-   * <ul>
-   * <li>record length in bytes,it is the sum of bytes used to store the key
-   * part and the value part.</li>
-   * <li>Key length in bytes, it is how many bytes used by the key part.</li>
-   * <li>number_of_rows_in_this_record(vint),</li>
-   * <li>column_1_ondisk_length(vint),</li>
-   * <li>column_1_row_1_value_plain_length,</li>
-   * <li>column_1_row_2_value_plain_length,</li>
-   * <li>....</li>
-   * <li>column_2_ondisk_length(vint),</li>
-   * <li>column_2_row_1_value_plain_length,</li>
-   * <li>column_2_row_2_value_plain_length,</li>
-   * <li>.... .</li>
-   * <li>{the end of the key part}</li>
-   * </ul>
-   */
-  public static class KeyBuffer {
-    // each column's length in the value
-    private int[] eachColumnValueLen = null;
-    private int[] eachColumnUncompressedValueLen = null;
-    // stores each cell's length of a column in one DataOutputBuffer element
-    private NonSyncByteArrayOutputStream[] allCellValLenBuffer = null;
-    // how many rows in this split
-    private int numberRows = 0;
-    // how many columns
-    private int columnNumber = 0;
-
-    KeyBuffer(int columnNum) {
-      columnNumber = columnNum;
-      eachColumnValueLen = new int[columnNumber];
-      eachColumnUncompressedValueLen = new int[columnNumber];
-      allCellValLenBuffer = new NonSyncByteArrayOutputStream[columnNumber];
-    }
-
-    public void readFields(DataInput in) throws IOException {
-      eachColumnValueLen = new int[columnNumber];
-      eachColumnUncompressedValueLen = new int[columnNumber];
-      allCellValLenBuffer = new NonSyncByteArrayOutputStream[columnNumber];
-
-      numberRows = WritableUtils.readVInt(in);
-      for (int i = 0; i < columnNumber; i++) {
-        eachColumnValueLen[i] = WritableUtils.readVInt(in);
-        eachColumnUncompressedValueLen[i] = WritableUtils.readVInt(in);
-        int bufLen = WritableUtils.readVInt(in);
-        if (allCellValLenBuffer[i] == null) {
-          allCellValLenBuffer[i] = new NonSyncByteArrayOutputStream();
-        } else {
-          allCellValLenBuffer[i].reset();
-        }
-        allCellValLenBuffer[i].write(in, bufLen);
-      }
-    }
-
-    /**
-     * @return the numberRows
-     */
-    public int getNumberRows() {
-      return numberRows;
-    }
-  }
-
-  /**
-   * ValueBuffer is the value of each record in RCFile. Its on-disk layout is as
-   * below:
-   * <ul>
-   * <li>Compressed or plain data of [column_1_row_1_value,
-   * column_1_row_2_value,....]</li>
-   * <li>Compressed or plain data of [column_2_row_1_value,
-   * column_2_row_2_value,....]</li>
-   * </ul>
-   */
-  public static class ValueBuffer implements Closeable{
-
-    // used to load columns' value into memory
-    private NonSyncByteArrayOutputStream[] loadedColumnsValueBuffer = null;
-
-    boolean inited = false;
-
-    // used for readFields
-    KeyBuffer keyBuffer;
-    private int columnNumber = 0;
-
-    // set true for columns that needed to skip loading into memory.
-    boolean[] skippedColIDs = null;
-
-    CompressionCodec codec;
-    Decompressor decompressor = null;
-    NonSyncDataInputBuffer decompressBuffer = new NonSyncDataInputBuffer();
-    private long readBytes = 0;
-
-
-    public ValueBuffer(KeyBuffer currentKey, int columnNumber,
-                       int[] targets, CompressionCodec codec, boolean[] skippedIDs)
-        throws IOException {
-      keyBuffer = currentKey;
-      this.columnNumber = columnNumber;
-      this.skippedColIDs = skippedIDs;
-      this.codec = codec;
-      loadedColumnsValueBuffer = new NonSyncByteArrayOutputStream[targets.length];
-      if (codec != null) {
-        decompressor = org.apache.tajo.storage.compress.CodecPool.getDecompressor(codec);
-      }
-
-      for (int i = 0; i < targets.length; i++) {
-        loadedColumnsValueBuffer[i] = new NonSyncByteArrayOutputStream();
-      }
-    }
-
-    public void readFields(DataInput in) throws IOException {
-      int addIndex = 0;
-      int skipTotal = 0;
-
-
-      for (int i = 0; i < columnNumber; i++) {
-        int vaRowsLen = keyBuffer.eachColumnValueLen[i];
-        // skip this column
-        if (skippedColIDs[i]) {
-          skipTotal += vaRowsLen;
-          continue;
-        }
-
-        if (skipTotal != 0) {
-          StorageUtil.skipFully(in, skipTotal);
-          skipTotal = 0;
-        }
-
-        NonSyncByteArrayOutputStream valBuf;
-        if (codec != null) {
-          // load into compressed buf first
-
-          byte[] compressedBytes = new byte[vaRowsLen];
-          in.readFully(compressedBytes, 0, vaRowsLen);
-
-          decompressBuffer.reset(compressedBytes, vaRowsLen);
-          if(decompressor != null) decompressor.reset();
-
-          DataInputStream is;
-          if (codec instanceof SplittableCompressionCodec) {
-            SplitCompressionInputStream deflatFilter = ((SplittableCompressionCodec) codec).createInputStream(
-                decompressBuffer, decompressor, 0, vaRowsLen, SplittableCompressionCodec.READ_MODE.BYBLOCK);
-            is = new DataInputStream(deflatFilter);
-          } else {
-            CompressionInputStream deflatFilter = codec.createInputStream(decompressBuffer, decompressor);
-            is = new DataInputStream(deflatFilter);
-          }
-
-          valBuf = loadedColumnsValueBuffer[addIndex];
-          valBuf.reset();
-          valBuf.write(is, keyBuffer.eachColumnUncompressedValueLen[i]);
-          is.close();
-          decompressBuffer.close();
-        } else {
-          valBuf = loadedColumnsValueBuffer[addIndex];
-          valBuf.reset();
-          valBuf.write(in, vaRowsLen);
-        }
-        readBytes += keyBuffer.eachColumnUncompressedValueLen[i];
-        addIndex++;
-      }
-
-      if (skipTotal != 0) {
-        StorageUtil.skipFully(in, skipTotal);
-      }
-    }
-
-    public long getReadBytes() {
-      return readBytes;
-    }
-
-    public void clearColumnBuffer() throws IOException {
-      decompressBuffer.reset();
-      readBytes = 0;
-    }
-
-    @Override
-    public void close() {
-      for (NonSyncByteArrayOutputStream element : loadedColumnsValueBuffer) {
-        IOUtils.closeStream(element);
-      }
-      if (codec != null) {
-        IOUtils.closeStream(decompressBuffer);
-        if (decompressor != null) {
-          // Make sure we only return decompressor once.
-          org.apache.tajo.storage.compress.CodecPool.returnDecompressor(decompressor);
-          decompressor = null;
-        }
-      }
-    }
-  }
-
-  /**
-   * Create a metadata object with alternating key-value pairs.
-   * Eg. metadata(key1, value1, key2, value2)
-   */
-  public static Metadata createMetadata(Text... values) {
-    if (values.length % 2 != 0) {
-      throw new IllegalArgumentException("Must have a matched set of " +
-          "key-value pairs. " + values.length +
-          " strings supplied.");
-    }
-    Metadata result = new Metadata();
-    for (int i = 0; i < values.length; i += 2) {
-      result.set(values[i], values[i + 1]);
-    }
-    return result;
-  }
-
-  /**
-   * Write KeyBuffer/ValueBuffer pairs to a RCFile. RCFile's format is
-   * compatible with SequenceFile's.
-   */
-  public static class RCFileAppender extends FileAppender {
-    FSDataOutputStream out;
-
-    CompressionCodec codec = null;
-    Metadata metadata = null;
-    FileSystem fs = null;
-    TableStatistics stats = null;
-    int columnNumber = 0;
-
-    // how many records the writer buffers before it writes to disk
-    private int RECORD_INTERVAL = Integer.MAX_VALUE;
-    // the max size of memory for buffering records before writes them out
-    private int COLUMNS_BUFFER_SIZE = 16 * 1024 * 1024; // 16M
-    // the conf string for COLUMNS_BUFFER_SIZE
-    public static final String COLUMNS_BUFFER_SIZE_CONF_STR = "hive.io.rcfile.record.buffer.size";
-
-    // how many records already buffered
-    private int bufferedRecords = 0;
-    private ColumnBuffer[] columnBuffers = null;
-    boolean useNewMagic = true;
-    private byte[] nullChars;
-    private SerializerDeserializer serde;
-    private boolean isShuffle;
-
-    // Insert a globally unique 16-byte value every few entries, so that one
-    // can seek into the middle of a file and then synchronize with record
-    // starts and ends by scanning for this value.
-    long lastSyncPos; // position of last sync
-    byte[] sync; // 16 random bytes
-
-    {
-      try {
-        MessageDigest digester = MessageDigest.getInstance("MD5");
-        long time = System.currentTimeMillis();
-        digester.update((new UID() + "@" + time).getBytes());
-        sync = digester.digest();
-      } catch (Exception e) {
-        throw new RuntimeException(e);
-      }
-    }
-
-    /*
-     * used for buffering appends before flush them out
-     */
-    class ColumnBuffer {
-      // used for buffer a column's values
-      NonSyncByteArrayOutputStream columnValBuffer;
-      // used to store each value's length
-      NonSyncByteArrayOutputStream valLenBuffer;
-
-      /*
-       * use a run-length encoding. We only record run length if a same
-       * 'prevValueLen' occurs more than one time. And we negative the run
-       * length to distinguish a runLength and a normal value length. For
-       * example, if the values' lengths are 1,1,1,2, we record 1, ~2,2. And for
-       * value lengths 1,2,3 we record 1,2,3.
-       */
-      int columnValueLength = 0;
-      int uncompressedColumnValueLength = 0;
-      int columnKeyLength = 0;
-      int runLength = 0;
-      int prevValueLength = -1;
-
-      ColumnBuffer() throws IOException {
-        columnValBuffer = new NonSyncByteArrayOutputStream();
-        valLenBuffer = new NonSyncByteArrayOutputStream();
-      }
-
-      public int append(Column column, Datum datum) throws IOException {
-        int currentLen = serde.serialize(column, datum, columnValBuffer, nullChars);
-        columnValueLength += currentLen;
-        uncompressedColumnValueLength += currentLen;
-
-        if (prevValueLength < 0) {
-          startNewGroup(currentLen);
-          return currentLen;
-        }
-
-        if (currentLen != prevValueLength) {
-          flushGroup();
-          startNewGroup(currentLen);
-        } else {
-          runLength++;
-        }
-        return currentLen;
-      }
-
-      private void startNewGroup(int currentLen) {
-        prevValueLength = currentLen;
-        runLength = 0;
-      }
-
-      public void clear() {
-        valLenBuffer.reset();
-        columnValBuffer.reset();
-        prevValueLength = -1;
-        runLength = 0;
-        columnValueLength = 0;
-        columnKeyLength = 0;
-        uncompressedColumnValueLength = 0;
-      }
-
-      public int flushGroup() {
-        int len = 0;
-        if (prevValueLength >= 0) {
-          len += valLenBuffer.writeVLong(prevValueLength);
-          if (runLength > 0) {
-            len += valLenBuffer.writeVLong(~runLength);
-          }
-          columnKeyLength += len;
-          runLength = -1;
-          prevValueLength = -1;
-        }
-        return len;
-      }
-
-      public int UnFlushedGroupSize() {
-        int len = 0;
-        if (prevValueLength >= 0) {
-          len += WritableUtils.getVIntSize(prevValueLength);
-          if (runLength > 0) {
-            len += WritableUtils.getVIntSize(~runLength);
-          }
-        }
-        return len;
-      }
-    }
-
-    public long getLength() throws IOException {
-      return out.getPos();
-    }
-
-    public RCFileAppender(Configuration conf, final Schema schema, final TableMeta meta, final Path path) throws IOException {
-      super(conf, schema, meta, path);
-
-      RECORD_INTERVAL = conf.getInt(RECORD_INTERVAL_CONF_STR, RECORD_INTERVAL);
-      COLUMNS_BUFFER_SIZE = conf.getInt(COLUMNS_BUFFER_SIZE_CONF_STR, COLUMNS_BUFFER_SIZE);
-      columnNumber = schema.size();
-    }
-
-    public void init() throws IOException {
-      fs = path.getFileSystem(conf);
-
-      if (!fs.exists(path.getParent())) {
-        throw new FileNotFoundException(path.toString());
-      }
-
-      //determine the intermediate file type
-      String store = conf.get(TajoConf.ConfVars.SHUFFLE_FILE_FORMAT.varname,
-          TajoConf.ConfVars.SHUFFLE_FILE_FORMAT.defaultVal);
-      if (enabledStats && CatalogProtos.StoreType.RCFILE == CatalogProtos.StoreType.valueOf(store.toUpperCase())) {
-        isShuffle = true;
-      } else {
-        isShuffle = false;
-      }
-
-      if (this.meta.containsOption(StorageConstants.COMPRESSION_CODEC)) {
-        String codecClassname = this.meta.getOption(StorageConstants.COMPRESSION_CODEC);
-        try {
-          Class<? extends CompressionCodec> codecClass = conf.getClassByName(
-              codecClassname).asSubclass(CompressionCodec.class);
-          codec = ReflectionUtils.newInstance(codecClass, conf);
-        } catch (ClassNotFoundException cnfe) {
-          throw new IllegalArgumentException(
-              "Unknown codec: " + codecClassname, cnfe);
-        }
-      }
-
-      String nullCharacters = StringEscapeUtils.unescapeJava(this.meta.getOption(StorageConstants.RCFILE_NULL,
-          NullDatum.DEFAULT_TEXT));
-      if (StringUtils.isEmpty(nullCharacters)) {
-        nullChars = NullDatum.get().asTextBytes();
-      } else {
-        nullChars = nullCharacters.getBytes();
-      }
-
-      if (metadata == null) {
-        metadata = new Metadata();
-      }
-
-      metadata.set(new Text(COLUMN_NUMBER_METADATA_STR), new Text("" + columnNumber));
-
-      String serdeClass = this.meta.getOption(StorageConstants.RCFILE_SERDE,
-          BinarySerializerDeserializer.class.getName());
-      try {
-        serde = (SerializerDeserializer) Class.forName(serdeClass).newInstance();
-      } catch (Exception e) {
-        LOG.error(e.getMessage(), e);
-        throw new IOException(e);
-      }
-      metadata.set(new Text(StorageConstants.RCFILE_SERDE), new Text(serdeClass));
-
-      columnBuffers = new ColumnBuffer[columnNumber];
-      for (int i = 0; i < columnNumber; i++) {
-        columnBuffers[i] = new ColumnBuffer();
-      }
-
-      init(conf, fs.create(path, true, 4096, (short) 3, fs.getDefaultBlockSize(), null), codec, metadata);
-      initializeFileHeader();
-      writeFileHeader();
-      finalizeFileHeader();
-
-      if (enabledStats) {
-        this.stats = new TableStatistics(this.schema);
-      }
-      super.init();
-    }
-
-    /**
-     * Write the initial part of file header.
-     */
-    void initializeFileHeader() throws IOException {
-      if (useNewMagic) {
-        out.write(MAGIC);
-        out.write(CURRENT_VERSION);
-      } else {
-        out.write(ORIGINAL_MAGIC_VERSION);
-      }
-    }
-
-    /**
-     * Write the final part of file header.
-     */
-    void finalizeFileHeader() throws IOException {
-      out.write(sync); // write the sync bytes
-      out.flush(); // flush header
-    }
-
-    boolean isCompressed() {
-      return codec != null;
-    }
-
-    /**
-     * Write and flush the file header.
-     */
-    void writeFileHeader() throws IOException {
-      if (useNewMagic) {
-        out.writeBoolean(isCompressed());
-      } else {
-        Text.writeString(out, "org.apache.hadoop.hive.ql.io.RCFile$KeyBuffer");
-        Text.writeString(out, "org.apache.hadoop.hive.ql.io.RCFile$ValueBuffer");
-        out.writeBoolean(isCompressed());
-        out.writeBoolean(false);
-      }
-
-      if (isCompressed()) {
-        Text.writeString(out, (codec.getClass()).getName());
-      }
-      metadata.write(out);
-    }
-
-    void init(Configuration conf, FSDataOutputStream out,
-              CompressionCodec codec, Metadata metadata) throws IOException {
-      this.out = out;
-      this.codec = codec;
-      this.metadata = metadata;
-      this.useNewMagic = conf.getBoolean(TajoConf.ConfVars.HIVEUSEEXPLICITRCFILEHEADER.varname, true);
-    }
-
-    /**
-     * create a sync point.
-     */
-    public void sync() throws IOException {
-      if (sync != null && lastSyncPos != out.getPos()) {
-        out.writeInt(SYNC_ESCAPE); // mark the start of the sync
-        out.write(sync); // write sync
-        lastSyncPos = out.getPos(); // update lastSyncPos
-      }
-    }
-
-    private void checkAndWriteSync() throws IOException {
-      if (sync != null && out.getPos() >= lastSyncPos + SYNC_INTERVAL) {
-        sync();
-      }
-    }
-
-    private int columnBufferSize = 0;
-
-    @Override
-    public long getOffset() throws IOException {
-      return out.getPos();
-    }
-
-    @Override
-    public void flush() throws IOException {
-      flushRecords();
-      out.flush();
-    }
-
-    @Override
-    public void addTuple(Tuple t) throws IOException {
-      append(t);
-      // Statistical section
-
-      if (enabledStats) {
-        stats.incrementRow();
-      }
-    }
-
-    /**
-     * Append a row of values. Currently it only can accept <
-     * {@link Tuple}. If its <code>size()</code> is less than the
-     * column number in the file, zero bytes are appended for the empty columns.
-     * If its size() is greater then the column number in the file, the exceeded
-     * columns' bytes are ignored.
-     *
-     * @param tuple a Tuple with the list of serialized columns
-     * @throws IOException
-     */
-    public void append(Tuple tuple) throws IOException {
-      int size = schema.size();
-
-      for (int i = 0; i < size; i++) {
-        Datum datum = tuple.get(i);
-        int length = columnBuffers[i].append(schema.getColumn(i), datum);
-        columnBufferSize += length;
-        if (isShuffle) {
-          // it is to calculate min/max values, and it is only used for the intermediate file.
-          stats.analyzeField(i, datum);
-        }
-      }
-
-      if (size < columnNumber) {
-        for (int i = size; i < columnNumber; i++) {
-          columnBuffers[i].append(schema.getColumn(i), NullDatum.get());
-          if (isShuffle) {
-            stats.analyzeField(i, NullDatum.get());
-          }
-        }
-      }
-
-      bufferedRecords++;
-      //TODO compression rate base flush
-      if ((columnBufferSize > COLUMNS_BUFFER_SIZE)
-          || (bufferedRecords >= RECORD_INTERVAL)) {
-        flushRecords();
-      }
-    }
-
-    /**
-     * get number of bytes to store the keyBuffer.
-     *
-     * @return number of bytes used to store this KeyBuffer on disk
-     * @throws IOException
-     */
-    public int getKeyBufferSize() throws IOException {
-      int ret = 0;
-      ret += WritableUtils.getVIntSize(bufferedRecords);
-      for (int i = 0; i < columnBuffers.length; i++) {
-        ColumnBuffer currentBuf = columnBuffers[i];
-        ret += WritableUtils.getVIntSize(currentBuf.columnValueLength);
-        ret += WritableUtils.getVIntSize(currentBuf.uncompressedColumnValueLength);
-        ret += WritableUtils.getVIntSize(currentBuf.columnKeyLength);
-        ret += currentBuf.columnKeyLength;
-      }
-
-      return ret;
-    }
-
-    /**
-     * get number of bytes to store the key part.
-     *
-     * @return number of bytes used to store this Key part on disk
-     * @throws IOException
-     */
-    public int getKeyPartSize() throws IOException {
-      int ret = 12; //12 bytes |record count, key length, compressed key length|
-
-      ret += WritableUtils.getVIntSize(bufferedRecords);
-      for (int i = 0; i < columnBuffers.length; i++) {
-        ColumnBuffer currentBuf = columnBuffers[i];
-        ret += WritableUtils.getVIntSize(currentBuf.columnValueLength);
-        ret += WritableUtils.getVIntSize(currentBuf.uncompressedColumnValueLength);
-        ret += WritableUtils.getVIntSize(currentBuf.columnKeyLength);
-        ret += currentBuf.columnKeyLength;
-        ret += currentBuf.UnFlushedGroupSize();
-      }
-
-      return ret;
-    }
-
-    private void WriteKeyBuffer(DataOutputStream out) throws IOException {
-      WritableUtils.writeVLong(out, bufferedRecords);
-      for (int i = 0; i < columnBuffers.length; i++) {
-        ColumnBuffer currentBuf = columnBuffers[i];
-        WritableUtils.writeVLong(out, currentBuf.columnValueLength);
-        WritableUtils.writeVLong(out, currentBuf.uncompressedColumnValueLength);
-        WritableUtils.writeVLong(out, currentBuf.columnKeyLength);
-        currentBuf.valLenBuffer.writeTo(out);
-      }
-    }
-
-    private void flushRecords() throws IOException {
-
-      Compressor compressor = null;
-      NonSyncByteArrayOutputStream valueBuffer = null;
-      CompressionOutputStream deflateFilter = null;
-      DataOutputStream deflateOut = null;
-      boolean isCompressed = isCompressed();
-
-      int valueLength = 0;
-      if (isCompressed) {
-        compressor = org.apache.tajo.storage.compress.CodecPool.getCompressor(codec);
-        if (compressor != null) compressor.reset();  //builtin gzip is null
-
-        valueBuffer = new NonSyncByteArrayOutputStream();
-        deflateFilter = codec.createOutputStream(valueBuffer, compressor);
-        deflateOut = new DataOutputStream(deflateFilter);
-      }
-
-      try {
-        for (int columnIndex = 0; columnIndex < columnNumber; columnIndex++) {
-          ColumnBuffer currentBuf = columnBuffers[columnIndex];
-          currentBuf.flushGroup();
-
-          NonSyncByteArrayOutputStream columnValue = currentBuf.columnValBuffer;
-          int colLen;
-          int plainLen = columnValue.getLength();
-          if (isCompressed) {
-            deflateFilter.resetState();
-            deflateOut.write(columnValue.getData(), 0, columnValue.getLength());
-            deflateOut.flush();
-            deflateFilter.finish();
-            columnValue.close();
-            // find how much compressed data was added for this column
-            colLen = valueBuffer.getLength() - valueLength;
-            currentBuf.columnValueLength = colLen;
-          } else {
-            colLen = plainLen;
-          }
-          valueLength += colLen;
-        }
-      } catch (IOException e) {
-        IOUtils.cleanup(LOG, deflateOut, out);
-        throw e;
-      }
-
-      if (compressor != null) {
-        org.apache.tajo.storage.compress.CodecPool.returnCompressor(compressor);
-      }
-
-      int keyLength = getKeyBufferSize();
-      if (keyLength < 0) {
-        throw new IOException("negative length keys not allowed: " + keyLength);
-      }
-      // Write the key out
-      writeKey(keyLength + valueLength, keyLength);
-      // write the value out
-      if (isCompressed) {
-        try {
-          out.write(valueBuffer.getData(), 0, valueBuffer.getLength());
-        } finally {
-          IOUtils.cleanup(LOG, valueBuffer);
-        }
-      } else {
-        for (int columnIndex = 0; columnIndex < columnNumber; ++columnIndex) {
-          columnBuffers[columnIndex].columnValBuffer.writeTo(out);
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("Column#" + columnIndex + " : Plain Total Column Value Length: "
-                + columnBuffers[columnIndex].uncompressedColumnValueLength
-                + ",  Compr Total Column Value Length: " + columnBuffers[columnIndex].columnValueLength);
-          }
-        }
-      }
-      // clear the columnBuffers
-      clearColumnBuffers();
-
-      bufferedRecords = 0;
-      columnBufferSize = 0;
-    }
-
-    private void writeKey(int recordLen, int keyLength) throws IOException {
-      checkAndWriteSync(); // sync
-      out.writeInt(recordLen); // total record length
-      out.writeInt(keyLength); // key portion length
-
-      if (this.isCompressed()) {
-        Compressor compressor = org.apache.tajo.storage.compress.CodecPool.getCompressor(codec);
-        if (compressor != null) compressor.reset();  //builtin gzip is null
-
-        NonSyncByteArrayOutputStream compressionBuffer = new NonSyncByteArrayOutputStream();
-        CompressionOutputStream deflateFilter = codec.createOutputStream(compressionBuffer, compressor);
-        DataOutputStream deflateOut = new DataOutputStream(deflateFilter);
-
-        //compress key and write key out
-        compressionBuffer.reset();
-        deflateFilter.resetState();
-        WriteKeyBuffer(deflateOut);
-        deflateOut.flush();
-        deflateFilter.finish();
-        int compressedKeyLen = compressionBuffer.getLength();
-        out.writeInt(compressedKeyLen);
-        compressionBuffer.writeTo(out);
-        compressionBuffer.reset();
-        deflateOut.close();
-        org.apache.tajo.storage.compress.CodecPool.returnCompressor(compressor);
-      } else {
-        out.writeInt(keyLength);
-        WriteKeyBuffer(out);
-      }
-    }
-
-    private void clearColumnBuffers() throws IOException {
-      for (int i = 0; i < columnNumber; i++) {
-        columnBuffers[i].clear();
-      }
-    }
-
-    @Override
-    public TableStats getStats() {
-      if (enabledStats) {
-        return stats.getTableStat();
-      } else {
-        return null;
-      }
-    }
-
-    @Override
-    public void close() throws IOException {
-      if (bufferedRecords > 0) {
-        flushRecords();
-      }
-      clearColumnBuffers();
-
-      if (out != null) {
-        // Statistical section
-        if (enabledStats) {
-          stats.setNumBytes(getOffset());
-        }
-        // Close the underlying stream if we own it...
-        out.flush();
-        IOUtils.cleanup(LOG, out);
-        out = null;
-      }
-    }
-  }
-
-  /**
-   * Read KeyBuffer/ValueBuffer pairs from a RCFile.
-   */
-  public static class RCFileScanner extends FileScanner {
-    private static class SelectedColumn {
-      public int colIndex;
-      public int rowReadIndex;
-      public int runLength;
-      public int prvLength;
-      public boolean isNulled;
-    }
-
-    private FSDataInputStream in;
-
-    private byte version;
-
-    private CompressionCodec codec = null;
-    private Metadata metadata = null;
-
-    private byte[] sync;
-    private byte[] syncCheck;
-    private boolean syncSeen;
-    private long lastSeenSyncPos = 0;
-
-    private long headerEnd;
-    private long start, end;
-    private final long startOffset, endOffset;
-    private int[] targetColumnIndexes;
-
-    private int currentKeyLength;
-    private int currentRecordLength;
-
-    private ValueBuffer currentValue;
-
-    private int readRowsIndexInBuffer = 0;
-
-    private int recordsNumInValBuffer = 0;
-
-    private int columnNumber = 0;
-
-    private boolean more = true;
-
-    private int passedRowsNum = 0;
-
-    private boolean decompress = false;
-
-    private Decompressor keyDecompressor;
-
-    private long readBytes = 0;
-
-    //Current state of each selected column - e.g. current run length, etc.
-    // The size of the array is equal to the number of selected columns
-    private SelectedColumn[] selectedColumns;
-
-    // column value lengths for each of the selected columns
-    private NonSyncDataInputBuffer[] colValLenBufferReadIn;
-
-    private LongWritable rowId;
-    private byte[] nullChars;
-    private SerializerDeserializer serde;
-
-    public RCFileScanner(Configuration conf, final Schema schema, final TableMeta meta,
-                         final FileFragment fragment) throws IOException {
-      super(conf, schema, meta, fragment);
-      conf.setInt("io.file.buffer.size", 4096); //TODO remove
-
-      startOffset = fragment.getStartKey();
-      endOffset = startOffset + fragment.getEndKey();
-      start = 0;
-    }
-
-    @Override
-    public void init() throws IOException {
-      sync = new byte[SYNC_HASH_SIZE];
-      syncCheck = new byte[SYNC_HASH_SIZE];
-
-      more = startOffset < endOffset;
-      rowId = new LongWritable();
-      readBytes = 0;
-
-      String nullCharacters = StringEscapeUtils.unescapeJava(meta.getOption(StorageConstants.RCFILE_NULL,
-          NullDatum.DEFAULT_TEXT));
-      if (StringUtils.isEmpty(nullCharacters)) {
-        nullChars = NullDatum.get().asTextBytes();
-      } else {
-        nullChars = nullCharacters.getBytes();
-      }
-
-      // projection
-      if (targets == null) {
-        targets = schema.toArray();
-      }
-
-      targetColumnIndexes = new int[targets.length];
-      for (int i = 0; i < targets.length; i++) {
-        targetColumnIndexes[i] = schema.getColumnId(targets[i].getQualifiedName());
-      }
-      Arrays.sort(targetColumnIndexes);
-
-      FileSystem fs = fragment.getPath().getFileSystem(conf);
-      end = fs.getFileStatus(fragment.getPath()).getLen();
-      in = openFile(fs, fragment.getPath(), 4096);
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("RCFile open:" + fragment.getPath() + "," + start + "," + (endOffset - startOffset) +
-            "," + fs.getFileStatus(fragment.getPath()).getLen());
-      }
-      //init RCFILE Header
-      boolean succeed = false;
-      try {
-        if (start > 0) {
-          seek(0);
-          initHeader();
-        } else {
-          initHeader();
-        }
-        succeed = true;
-      } finally {
-        if (!succeed) {
-          if (in != null) {
-            try {
-              in.close();
-            } catch (IOException e) {
-              if (LOG != null && LOG.isDebugEnabled()) {
-                LOG.debug("Exception in closing " + in, e);
-              }
-            }
-          }
-        }
-      }
-
-      columnNumber = Integer.parseInt(metadata.get(new Text(COLUMN_NUMBER_METADATA_STR)).toString());
-      selectedColumns = new SelectedColumn[targetColumnIndexes.length];
-      colValLenBufferReadIn = new NonSyncDataInputBuffer[targetColumnIndexes.length];
-      boolean[] skippedColIDs = new boolean[columnNumber];
-      Arrays.fill(skippedColIDs, true);
-      super.init();
-
-      for (int i = 0; i < targetColumnIndexes.length; i++) {
-        int tid = targetColumnIndexes[i];
-        if (tid < columnNumber) {
-          skippedColIDs[tid] = false;
-
-          SelectedColumn col = new SelectedColumn();
-          col.colIndex = tid;
-          col.runLength = 0;
-          col.prvLength = -1;
-          col.rowReadIndex = 0;
-          selectedColumns[i] = col;
-          colValLenBufferReadIn[i] = new NonSyncDataInputBuffer();
-        }
-      }
-
-      currentKey = createKeyBuffer();
-      currentValue = new ValueBuffer(null, columnNumber, targetColumnIndexes, codec, skippedColIDs);
-
-      if (startOffset > getPosition()) {    // TODO use sync cache
-        sync(startOffset); // sync to start
-      }
-    }
-
-    /**
-     * Return the metadata (Text to Text map) that was written into the
-     * file.
-     */
-    public Metadata getMetadata() {
-      return metadata;
-    }
-
-    /**
-     * Return the metadata value associated with the given key.
-     *
-     * @param key the metadata key to retrieve
-     */
-    public Text getMetadataValueOf(Text key) {
-      return metadata.get(key);
-    }
-
-    /**
-     * Override this method to specialize the type of
-     * {@link FSDataInputStream} returned.
-     */
-    protected FSDataInputStream openFile(FileSystem fs, Path file, int bufferSize) throws IOException {
-      return fs.open(file, bufferSize);
-    }
-
-    private void initHeader() throws IOException {
-      byte[] magic = new byte[MAGIC.length];
-      in.readFully(magic);
-
-      if (Arrays.equals(magic, ORIGINAL_MAGIC)) {
-        byte vers = in.readByte();
-        if (vers != ORIGINAL_MAGIC_VERSION_WITH_METADATA) {
-          throw new IOException(fragment.getPath() + " is a version " + vers +
-              " SequenceFile instead of an RCFile.");
-        }
-        version = ORIGINAL_VERSION;
-      } else {
-        if (!Arrays.equals(magic, MAGIC)) {
-          throw new IOException(fragment.getPath() + " not a RCFile and has magic of " +
-              new String(magic));
-        }
-
-        // Set 'version'
-        version = in.readByte();
-        if (version > CURRENT_VERSION) {
-          throw new VersionMismatchException((byte) CURRENT_VERSION, version);
-        }
-      }
-
-      if (version == ORIGINAL_VERSION) {
-        try {
-          Class<?> keyCls = conf.getClassByName(Text.readString(in));
-          Class<?> valCls = conf.getClassByName(Text.readString(in));
-          if (!keyCls.equals(KeyBuffer.class)
-              || !valCls.equals(ValueBuffer.class)) {
-            throw new IOException(fragment.getPath() + " not a RCFile");
-          }
-        } catch (ClassNotFoundException e) {
-          throw new IOException(fragment.getPath() + " not a RCFile", e);
-        }
-      }
-
-      decompress = in.readBoolean(); // is compressed?
-
-      if (version == ORIGINAL_VERSION) {
-        // is block-compressed? it should be always false.
-        boolean blkCompressed = in.readBoolean();
-        if (blkCompressed) {
-          throw new IOException(fragment.getPath() + " not a RCFile.");
-        }
-      }
-
-      // setup the compression codec
-      if (decompress) {
-        String codecClassname = Text.readString(in);
-        try {
-          Class<? extends CompressionCodec> codecClass = conf.getClassByName(
-              codecClassname).asSubclass(CompressionCodec.class);
-          codec = ReflectionUtils.newInstance(codecClass, conf);
-        } catch (ClassNotFoundException cnfe) {
-          throw new IllegalArgumentException(
-              "Unknown codec: " + codecClassname, cnfe);
-        }
-
-        keyDecompressor = org.apache.tajo.storage.compress.CodecPool.getDecompressor(codec);
-      }
-
-      metadata = new Metadata();
-      metadata.readFields(in);
-
-      Text text = metadata.get(new Text(StorageConstants.RCFILE_SERDE));
-
-      try {
-        String serdeClass;
-        if(text != null && !text.toString().isEmpty()){
-          serdeClass = text.toString();
-        } else{
-          serdeClass = this.meta.getOption(StorageConstants.RCFILE_SERDE, BinarySerializerDeserializer.class.getName());
-        }
-        serde = (SerializerDeserializer) Class.forName(serdeClass).newInstance();
-      } catch (Exception e) {
-        LOG.error(e.getMessage(), e);
-        throw new IOException(e);
-      }
-
-      in.readFully(sync); // read sync bytes
-      headerEnd = in.getPos();
-      lastSeenSyncPos = headerEnd; //initial sync position
-      readBytes += headerEnd;
-    }
-
-    /**
-     * Return the current byte position in the input file.
-     */
-    public long getPosition() throws IOException {
-      return in.getPos();
-    }
-
-    /**
-     * Set the current byte position in the input file.
-     * <p/>
-     * <p/>
-     * The position passed must be a position returned by
-     * {@link RCFile.RCFileAppender#getLength()} when writing this file. To seek to an
-     * arbitrary position, use {@link RCFile.RCFileScanner#sync(long)}. In another
-     * words, the current seek can only seek to the end of the file. For other
-     * positions, use {@link RCFile.RCFileScanner#sync(long)}.
-     */
-    public void seek(long position) throws IOException {
-      in.seek(position);
-    }
-
-    /**
-     * Resets the values which determine if there are more rows in the buffer
-     * <p/>
-     * This can be used after one calls seek or sync, if one called next before that.
-     * Otherwise, the seek or sync will have no effect, it will continue to get rows from the
-     * buffer built up from the call to next.
-     */
-    public void resetBuffer() {
-      readRowsIndexInBuffer = 0;
-      recordsNumInValBuffer = 0;
-    }
-
-    /**
-     * Seek to the next sync mark past a given position.
-     */
-    public void sync(long position) throws IOException {
-      if (position + SYNC_SIZE >= end) {
-        seek(end);
-        return;
-      }
-
-      //this is to handle syn(pos) where pos < headerEnd.
-      if (position < headerEnd) {
-        // seek directly to first record
-        in.seek(headerEnd);
-        // note the sync marker "seen" in the header
-        syncSeen = true;
-        return;
-      }
-
-      try {
-        seek(position + 4); // skip escape
-
-        int prefix = sync.length;
-        int n = conf.getInt("io.bytes.per.checksum", 512);
-        byte[] buffer = new byte[prefix + n];
-        n = (int) Math.min(n, end - in.getPos());
-        /* fill array with a pattern that will never match sync */
-        Arrays.fill(buffer, (byte) (~sync[0]));
-        while (n > 0 && (in.getPos() + n) <= end) {
-          position = in.getPos();
-          in.readFully(buffer, prefix, n);
-          readBytes += n;
-          /* the buffer has n+sync bytes */
-          for (int i = 0; i < n; i++) {
-            int j;
-            for (j = 0; j < sync.length && sync[j] == buffer[i + j]; j++) {
-              /* nothing */
-            }
-            if (j == sync.length) {
-              /* simplified from (position + (i - prefix) + sync.length) - SYNC_SIZE */
-              in.seek(position + i - SYNC_SIZE);
-              return;
-            }
-          }
-          /* move the last 16 bytes to the prefix area */
-          System.arraycopy(buffer, buffer.length - prefix, buffer, 0, prefix);
-          n = (int) Math.min(n, end - in.getPos());
-        }
-      } catch (ChecksumException e) { // checksum failure
-        handleChecksumException(e);
-      }
-    }
-
-    private void handleChecksumException(ChecksumException e) throws IOException {
-      if (conf.getBoolean("io.skip.checksum.errors", false)) {
-        LOG.warn("Bad checksum at " + getPosition() + ". Skipping entries.");
-        sync(getPosition() + conf.getInt("io.bytes.per.checksum", 512));
-      } else {
-        throw e;
-      }
-    }
-
-    private KeyBuffer createKeyBuffer() {
-      return new KeyBuffer(columnNumber);
-    }
-
-    /**
-     * Read and return the next record length, potentially skipping over a sync
-     * block.
-     *
-     * @return the length of the next record or -1 if there is no next record
-     * @throws IOException
-     */
-    private int readRecordLength() throws IOException {
-      if (in.getPos() >= end) {
-        return -1;
-      }
-      int length = in.readInt();
-      readBytes += 4;
-      if (sync != null && length == SYNC_ESCAPE) { // process
-        // a
-        // sync entry
-        lastSeenSyncPos = in.getPos() - 4; // minus SYNC_ESCAPE's length
-        in.readFully(syncCheck); // read syncCheck
-        readBytes += SYNC_HASH_SIZE;
-        if (!Arrays.equals(sync, syncCheck)) {
-          throw new IOException("File is corrupt!");
-        }
-        syncSeen = true;
-        if (in.getPos() >= end) {
-          return -1;
-        }
-        length = in.readInt(); // re-read length
-        readBytes += 4;
-      } else {
-        syncSeen = false;
-      }
-      return length;
-    }
-
-    private void seekToNextKeyBuffer() throws IOException {
-      if (!keyInit) {
-        return;
-      }
-      if (!currentValue.inited) {
-        IOUtils.skipFully(in, currentRecordLength - currentKeyLength);
-      }
-    }
-
-    private int compressedKeyLen = 0;
-    NonSyncDataInputBuffer keyDataIn = new NonSyncDataInputBuffer();
-    NonSyncDataInputBuffer keyDecompressBuffer = new NonSyncDataInputBuffer();
-
-    KeyBuffer currentKey = null;
-    boolean keyInit = false;
-
-    protected int nextKeyBuffer() throws IOException {
-      seekToNextKeyBuffer();
-      currentRecordLength = readRecordLength();
-      if (currentRecordLength == -1) {
-        keyInit = false;
-        return -1;
-      }
-      currentKeyLength = in.readInt();
-      compressedKeyLen = in.readInt();
-      readBytes += 8;
-      if (decompress) {
-
-        byte[] compressedBytes = new byte[compressedKeyLen];
-        in.readFully(compressedBytes, 0, compressedKeyLen);
-
-        if (keyDecompressor != null) keyDecompressor.reset();
-        keyDecompressBuffer.reset(compressedBytes, compressedKeyLen);
-
-        DataInputStream is;
-        if (codec instanceof SplittableCompressionCodec) {
-          SplitCompressionInputStream deflatFilter = ((SplittableCompressionCodec) codec).createInputStream(
-              keyDecompressBuffer, keyDecompressor, 0, compressedKeyLen, SplittableCompressionCodec.READ_MODE.BYBLOCK);
-
-          keyDecompressBuffer.seek(deflatFilter.getAdjustedStart());
-          is = new DataInputStream(deflatFilter);
-        } else {
-          CompressionInputStream deflatFilter = codec.createInputStream(keyDecompressBuffer, keyDecompressor);
-          is = new DataInputStream(deflatFilter);
-        }
-
-        byte[] deCompressedBytes = new byte[currentKeyLength];
-
-        is.readFully(deCompressedBytes, 0, currentKeyLength);
-        keyDataIn.reset(deCompressedBytes, currentKeyLength);
-        currentKey.readFields(keyDataIn);
-        is.close();
-      } else {
-        currentKey.readFields(in);
-      }
-      readBytes += currentKeyLength;
-      keyInit = true;
-      currentValue.inited = false;
-
-      readRowsIndexInBuffer = 0;
-      recordsNumInValBuffer = currentKey.numberRows;
-
-      for (int selIx = 0; selIx < selectedColumns.length; selIx++) {
-        SelectedColumn col = selectedColumns[selIx];
-        if (col == null) {
-          col = new SelectedColumn();
-          col.isNulled = true;
-          selectedColumns[selIx] = col;
-          continue;
-        }
-
-        int colIx = col.colIndex;
-        NonSyncByteArrayOutputStream buf = currentKey.allCellValLenBuffer[colIx];
-        colValLenBufferReadIn[selIx].reset(buf.getData(), buf.getLength());
-        col.rowReadIndex = 0;
-        col.runLength = 0;
-        col.prvLength = -1;
-        col.isNulled = buf.getLength() == 0;
-      }
-
-      return currentKeyLength;
-    }
-
-    protected void currentValueBuffer() throws IOException {
-      if (!keyInit) {
-        nextKeyBuffer();
-      }
-      currentValue.keyBuffer = currentKey;
-      currentValue.clearColumnBuffer();
-      currentValue.readFields(in);
-      currentValue.inited = true;
-      readBytes += currentValue.getReadBytes();
-
-      if (tableStats != null) {
-        tableStats.setReadBytes(readBytes);
-        tableStats.setNumRows(passedRowsNum);
-      }
-    }
-
-    private boolean rowFetched = false;
-
-    @Override
-    public Tuple next() throws IOException {
-      if (!more) {
-        return null;
-      }
-
-      more = nextBuffer(rowId);
-      long lastSeenSyncPos = lastSeenSyncPos();
-      if (lastSeenSyncPos >= endOffset) {
-        more = false;
-        return null;
-      }
-
-      if (!more) {
-        return null;
-      }
-
-      Tuple tuple = new VTuple(schema.size());
-      getCurrentRow(tuple);
-      return tuple;
-    }
-
-    @Override
-    public float getProgress() {
-      try {
-        if(!more) {
-          return 1.0f;
-        }
-        long filePos = getPosition();
-        if (startOffset == filePos) {
-          return 0.0f;
-        } else {
-          //if scanner read the header, filePos moved to zero
-          return Math.min(1.0f, (float)(Math.max(filePos - startOffset, 0)) / (float)(fragment.getEndKey()));
-        }
-      } catch (IOException e) {
-        LOG.error(e.getMessage(), e);
-        return 0.0f;
-      }
-    }
-
-    /**
-     * Returns how many rows we fetched with nextBuffer(). It only means how many rows
-     * are read by nextBuffer(). The returned result may be smaller than actual number
-     * of rows passed by, because {@link #seek(long)} can change the underlying key buffer and
-     * value buffer.
-     *
-     * @return next row number
-     * @throws IOException
-     */
-    public boolean nextBuffer(LongWritable readRows) throws IOException {
-      if (readRowsIndexInBuffer < recordsNumInValBuffer) {
-        readRows.set(passedRowsNum);
-        readRowsIndexInBuffer++;
-        passedRowsNum++;
-        rowFetched = false;
-        return true;
-      } else {
-        keyInit = false;
-      }
-
-      int ret = -1;
-      try {
-        ret = nextKeyBuffer();
-      } catch (EOFException eof) {
-        eof.printStackTrace();
-      }
-      return (ret > 0) && nextBuffer(readRows);
-    }
-
-    /**
-     * get the current row used,make sure called {@link #next()}
-     * first.
-     *
-     * @throws IOException
-     */
-    public void getCurrentRow(Tuple tuple) throws IOException {
-      if (!keyInit || rowFetched) {
-        return;
-      }
-
-      if (!currentValue.inited) {
-        currentValueBuffer();
-      }
-
-      for (int j = 0; j < selectedColumns.length; ++j) {
-        SelectedColumn col = selectedColumns[j];
-        int i = col.colIndex;
-
-        if (col.isNulled) {
-          tuple.put(i, NullDatum.get());
-        } else {
-          colAdvanceRow(j, col);
-
-          Datum datum = serde.deserialize(schema.getColumn(i),
-              currentValue.loadedColumnsValueBuffer[j].getData(), col.rowReadIndex, col.prvLength, nullChars);
-          tuple.put(i, datum);
-          col.rowReadIndex += col.prvLength;
-        }
-      }
-      rowFetched = true;
-    }
-
-    /**
-     * Advance column state to the next now: update offsets, run lengths etc
-     *
-     * @param selCol - index among selectedColumns
-     * @param col    - column object to update the state of.  prvLength will be
-     *               set to the new read position
-     * @throws IOException
-     */
-    private void colAdvanceRow(int selCol, SelectedColumn col) throws IOException {
-      if (col.runLength > 0) {
-        --col.runLength;
-      } else {
-        int length = (int) WritableUtils.readVLong(colValLenBufferReadIn[selCol]);
-        if (length < 0) {
-          // we reach a runlength here, use the previous length and reset
-          // runlength
-          col.runLength = (~length) - 1;
-        } else {
-          col.prvLength = length;
-          col.runLength = 0;
-        }
-      }
-    }
-
-    /**
-     * Returns true if the previous call to next passed a sync mark.
-     */
-    public boolean syncSeen() {
-      return syncSeen;
-    }
-
-    /**
-     * Returns the last seen sync position.
-     */
-    public long lastSeenSyncPos() {
-      return lastSeenSyncPos;
-    }
-
-    /**
-     * Returns the name of the file.
-     */
-    @Override
-    public String toString() {
-      return fragment.getPath().toString();
-    }
-
-    @Override
-    public void reset() throws IOException {
-      seek(startOffset);
-    }
-
-    @Override
-    public boolean isProjectable() {
-      return true;
-    }
-
-    @Override
-    public boolean isSelectable() {
-      return false;
-    }
-
-    @Override
-    public boolean isSplittable() {
-      return true;
-    }
-
-    @Override
-    public void close() throws IOException {
-      if (tableStats != null) {
-        tableStats.setReadBytes(readBytes);  //Actual Processed Bytes. (decompressed bytes + header - seek)
-        tableStats.setNumRows(passedRowsNum);
-      }
-
-      IOUtils.cleanup(LOG, in, currentValue);
-      if (keyDecompressor != null) {
-        // Make sure we only return decompressor once.
-        org.apache.tajo.storage.compress.CodecPool.returnDecompressor(keyDecompressor);
-        keyDecompressor = null;
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/storage/rcfile/SchemaAwareCompressionInputStream.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/rcfile/SchemaAwareCompressionInputStream.java b/tajo-storage/src/main/java/org/apache/tajo/storage/rcfile/SchemaAwareCompressionInputStream.java
deleted file mode 100644
index 60f1b06..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/rcfile/SchemaAwareCompressionInputStream.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage.rcfile;
-
-import org.apache.hadoop.io.compress.CompressionInputStream;
-
-import java.io.InputStream;
-
-/**
- *
- * SchemaAwareCompressionInputStream adds the ability to inform the compression
- * stream what column is being read.
- *
- */
-public abstract class SchemaAwareCompressionInputStream extends CompressionInputStream {
-
-  protected SchemaAwareCompressionInputStream(InputStream in) throws java.io.IOException {
-    super(in);
-  }
-
-  /**
-   * The column being read
-   *
-   * @param columnIndex the index of the column. Use -1 for non-column data
-   */
-  public abstract void setColumnIndex(int columnIndex);
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/storage/rcfile/SchemaAwareCompressionOutputStream.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/rcfile/SchemaAwareCompressionOutputStream.java b/tajo-storage/src/main/java/org/apache/tajo/storage/rcfile/SchemaAwareCompressionOutputStream.java
deleted file mode 100644
index c0ce8b3..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/rcfile/SchemaAwareCompressionOutputStream.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage.rcfile;
-
-import org.apache.hadoop.io.compress.CompressionOutputStream;
-
-import java.io.OutputStream;
-
-/**
- *
- * SchemaAwareCompressionOutputStream adds the ability to inform the comression stream
- * the current column being compressed.
- *
- */
-public abstract class SchemaAwareCompressionOutputStream extends CompressionOutputStream {
-
-  protected SchemaAwareCompressionOutputStream(OutputStream out) {
-    super(out);
-  }
-
-  /**
-   *
-   * The column being output
-   *
-   * @param columnIndex the index of the column. Use -1 for non-column data
-   */
-  public abstract void setColumnIndex(int columnIndex);
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/storage/sequencefile/SequenceFileAppender.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/sequencefile/SequenceFileAppender.java b/tajo-storage/src/main/java/org/apache/tajo/storage/sequencefile/SequenceFileAppender.java
deleted file mode 100644
index f5cef62..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/sequencefile/SequenceFileAppender.java
+++ /dev/null
@@ -1,272 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage.sequencefile;
-
-import org.apache.commons.lang.StringEscapeUtils;
-import org.apache.commons.lang.StringUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.*;
-import org.apache.hadoop.io.SequenceFile.CompressionType;
-import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.hadoop.io.compress.CompressionCodecFactory;
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.catalog.TableMeta;
-import org.apache.tajo.catalog.proto.CatalogProtos;
-import org.apache.tajo.catalog.statistics.TableStats;
-import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.datum.Datum;
-import org.apache.tajo.datum.NullDatum;
-import org.apache.tajo.datum.ProtobufDatum;
-import org.apache.tajo.storage.*;
-import org.apache.tajo.storage.exception.AlreadyExistsStorageException;
-import org.apache.tajo.storage.rcfile.NonSyncByteArrayOutputStream;
-import org.apache.tajo.util.BytesUtils;
-
-import java.io.FileNotFoundException;
-import java.io.IOException;
-
-public class SequenceFileAppender extends FileAppender {
-  private static final Log LOG = LogFactory.getLog(SequenceFileScanner.class);
-
-  private SequenceFile.Writer writer;
-
-  private TableMeta meta;
-  private Schema schema;
-  private TableStatistics stats = null;
-
-  private int columnNum;
-  private FileSystem fs;
-  private char delimiter;
-  private byte[] nullChars;
-
-  private final static int BUFFER_SIZE = 128 * 1024;
-  private long pos = 0;
-
-  private CompressionCodecFactory codecFactory;
-  private CompressionCodec codec;
-
-  private NonSyncByteArrayOutputStream os;
-  private SerializerDeserializer serde;
-
-  long rowCount;
-  private boolean isShuffle;
-
-  private Writable EMPTY_KEY;
-
-  public SequenceFileAppender(Configuration conf, Schema schema, TableMeta meta, Path path) throws IOException {
-    super(conf, schema, meta, path);
-    this.meta = meta;
-    this.schema = schema;
-  }
-
-  @Override
-  public void init() throws IOException {
-    os = new NonSyncByteArrayOutputStream(BUFFER_SIZE);
-
-    this.fs = path.getFileSystem(conf);
-
-    //determine the intermediate file type
-    String store = conf.get(TajoConf.ConfVars.SHUFFLE_FILE_FORMAT.varname,
-        TajoConf.ConfVars.SHUFFLE_FILE_FORMAT.defaultVal);
-    if (enabledStats && CatalogProtos.StoreType.SEQUENCEFILE == CatalogProtos.StoreType.valueOf(store.toUpperCase())) {
-      isShuffle = true;
-    } else {
-      isShuffle = false;
-    }
-
-    this.delimiter = StringEscapeUtils.unescapeJava(this.meta.getOption(StorageConstants.SEQUENCEFILE_DELIMITER,
-        StorageConstants.DEFAULT_FIELD_DELIMITER)).charAt(0);
-    this.columnNum = schema.size();
-    String nullCharacters = StringEscapeUtils.unescapeJava(this.meta.getOption(StorageConstants.SEQUENCEFILE_NULL,
-        NullDatum.DEFAULT_TEXT));
-    if (StringUtils.isEmpty(nullCharacters)) {
-      nullChars = NullDatum.get().asTextBytes();
-    } else {
-      nullChars = nullCharacters.getBytes();
-    }
-
-    if (!fs.exists(path.getParent())) {
-      throw new FileNotFoundException(path.toString());
-    }
-
-    if(this.meta.containsOption(StorageConstants.COMPRESSION_CODEC)) {
-      String codecName = this.meta.getOption(StorageConstants.COMPRESSION_CODEC);
-      codecFactory = new CompressionCodecFactory(conf);
-      codec = codecFactory.getCodecByClassName(codecName);
-    } else {
-      if (fs.exists(path)) {
-        throw new AlreadyExistsStorageException(path);
-      }
-    }
-
-    try {
-      String serdeClass = this.meta.getOption(StorageConstants.SEQUENCEFILE_SERDE,
-          TextSerializerDeserializer.class.getName());
-      serde = (SerializerDeserializer) Class.forName(serdeClass).newInstance();
-    } catch (Exception e) {
-      LOG.error(e.getMessage(), e);
-      throw new IOException(e);
-    }
-
-    Class<? extends Writable>  keyClass, valueClass;
-    if (serde instanceof BinarySerializerDeserializer) {
-      keyClass = BytesWritable.class;
-      EMPTY_KEY = new BytesWritable();
-      valueClass = BytesWritable.class;
-    } else {
-      keyClass = LongWritable.class;
-      EMPTY_KEY = new LongWritable();
-      valueClass = Text.class;
-    }
-
-    String type = this.meta.getOption(StorageConstants.COMPRESSION_TYPE, CompressionType.NONE.name());
-    if (type.equals(CompressionType.BLOCK.name())) {
-      writer = SequenceFile.createWriter(fs, conf, path, keyClass, valueClass, CompressionType.BLOCK, codec);
-    } else if (type.equals(CompressionType.RECORD.name())) {
-      writer = SequenceFile.createWriter(fs, conf, path, keyClass, valueClass, CompressionType.RECORD, codec);
-    } else {
-      writer = SequenceFile.createWriter(fs, conf, path, keyClass, valueClass, CompressionType.NONE, codec);
-    }
-
-    if (enabledStats) {
-      this.stats = new TableStatistics(this.schema);
-    }
-
-    super.init();
-  }
-
-  @Override
-  public void addTuple(Tuple tuple) throws IOException {
-    Datum datum;
-
-    if (serde instanceof BinarySerializerDeserializer) {
-      byte nullByte = 0;
-      int lasti = 0;
-      for (int i = 0; i < columnNum; i++) {
-        datum = tuple.get(i);
-
-        // set bit to 1 if a field is not null
-        if (null != datum) {
-          nullByte |= 1 << (i % 8);
-        }
-
-        // write the null byte every eight elements or
-        // if this is the last element and serialize the
-        // corresponding 8 struct fields at the same time
-        if (7 == i % 8 || i == columnNum - 1) {
-          os.write(nullByte);
-
-          for (int j = lasti; j <= i; j++) {
-            datum = tuple.get(j);
-
-            switch (schema.getColumn(j).getDataType().getType()) {
-              case TEXT:
-                BytesUtils.writeVLong(os, datum.asTextBytes().length);
-                break;
-              case PROTOBUF:
-                ProtobufDatum protobufDatum = (ProtobufDatum) datum;
-                BytesUtils.writeVLong(os, protobufDatum.asByteArray().length);
-                break;
-              case CHAR:
-              case INET4:
-              case BLOB:
-                BytesUtils.writeVLong(os, datum.asByteArray().length);
-                break;
-              default:
-            }
-
-            serde.serialize(schema.getColumn(j), datum, os, nullChars);
-
-            if (isShuffle) {
-              // it is to calculate min/max values, and it is only used for the intermediate file.
-              stats.analyzeField(j, datum);
-            }
-          }
-          lasti = i + 1;
-          nullByte = 0;
-        }
-      }
-
-      BytesWritable b = new BytesWritable();
-      b.set(os.getData(), 0, os.getLength());
-      writer.append(EMPTY_KEY, b);
-
-    } else {
-      for (int i = 0; i < columnNum; i++) {
-        datum = tuple.get(i);
-        serde.serialize(schema.getColumn(i), datum, os, nullChars);
-
-        if (columnNum -1 > i) {
-          os.write((byte) delimiter);
-        }
-
-        if (isShuffle) {
-          // it is to calculate min/max values, and it is only used for the intermediate file.
-          stats.analyzeField(i, datum);
-        }
-
-      }
-      writer.append(EMPTY_KEY, new Text(os.toByteArray()));
-    }
-
-    os.reset();
-    pos += writer.getLength();
-    rowCount++;
-
-    if (enabledStats) {
-      stats.incrementRow();
-    }
-  }
-
-  @Override
-  public long getOffset() throws IOException {
-    return pos;
-  }
-
-  @Override
-  public void flush() throws IOException {
-    os.flush();
-    writer.close();
-  }
-
-  @Override
-  public void close() throws IOException {
-    // Statistical section
-    if (enabledStats) {
-      stats.setNumBytes(getOffset());
-    }
-
-    os.close();
-    writer.close();
-  }
-
-  @Override
-  public TableStats getStats() {
-    if (enabledStats) {
-      return stats.getTableStat();
-    } else {
-      return null;
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/storage/sequencefile/SequenceFileScanner.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/sequencefile/SequenceFileScanner.java b/tajo-storage/src/main/java/org/apache/tajo/storage/sequencefile/SequenceFileScanner.java
deleted file mode 100644
index b0ef67d..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/sequencefile/SequenceFileScanner.java
+++ /dev/null
@@ -1,336 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage.sequencefile;
-
-import org.apache.commons.lang.StringEscapeUtils;
-import org.apache.commons.lang.StringUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.io.*;
-import org.apache.tajo.catalog.Column;
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.catalog.TableMeta;
-import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.datum.Datum;
-import org.apache.tajo.datum.NullDatum;
-import org.apache.tajo.storage.*;
-import org.apache.tajo.storage.fragment.FileFragment;
-import org.apache.tajo.util.BytesUtils;
-
-import java.io.IOException;
-
-public class SequenceFileScanner extends FileScanner {
-  private static final Log LOG = LogFactory.getLog(SequenceFileScanner.class);
-
-  private FileSystem fs;
-  private SequenceFile.Reader reader;
-  private SerializerDeserializer serde;
-  private byte[] nullChars;
-  private char delimiter;
-
-  private int currentIdx = 0;
-  private int[] projectionMap;
-
-  private boolean hasBinarySerDe = false;
-  private long totalBytes = 0L;
-
-  private long start, end;
-  private boolean  more = true;
-
-  /**
-   * Whether a field is null or not. Because length is 0 does not means the
-   * field is null. In particular, a 0-length string is not null.
-   */
-  private boolean[] fieldIsNull;
-
-  /**
-   * The start positions and lengths of fields. Only valid when the data is parsed.
-   */
-  private int[] fieldStart;
-  private int[] fieldLength;
-
-  private int elementOffset, elementSize;
-
-  private Writable EMPTY_KEY;
-
-  public SequenceFileScanner(Configuration conf, Schema schema, TableMeta meta, FileFragment fragment) throws IOException {
-    super(conf, schema, meta, fragment);
-  }
-
-  @Override
-  public void init() throws IOException {
-    // FileFragment information
-    if(fs == null) {
-      fs = FileScanner.getFileSystem((TajoConf)conf, fragment.getPath());
-    }
-
-    reader = new SequenceFile.Reader(fs, fragment.getPath(), conf);
-
-    String nullCharacters = StringEscapeUtils.unescapeJava(this.meta.getOption(StorageConstants.SEQUENCEFILE_NULL,
-        NullDatum.DEFAULT_TEXT));
-    if (StringUtils.isEmpty(nullCharacters)) {
-      nullChars = NullDatum.get().asTextBytes();
-    } else {
-      nullChars = nullCharacters.getBytes();
-    }
-
-    String delim  = meta.getOption(StorageConstants.SEQUENCEFILE_DELIMITER, StorageConstants.DEFAULT_FIELD_DELIMITER);
-    this.delimiter = StringEscapeUtils.unescapeJava(delim).charAt(0);
-
-    this.start = fragment.getStartKey();
-    this.end = start + fragment.getEndKey();
-
-    if (fragment.getStartKey() > reader.getPosition())
-      reader.sync(this.start);
-
-    more = start < end;
-
-    if (targets == null) {
-      targets = schema.toArray();
-    }
-
-
-    fieldIsNull = new boolean[schema.getColumns().size()];
-    fieldStart = new int[schema.getColumns().size()];
-    fieldLength = new int[schema.getColumns().size()];
-
-    prepareProjection(targets);
-
-    try {
-      String serdeClass = this.meta.getOption(StorageConstants.SEQUENCEFILE_SERDE, TextSerializerDeserializer.class.getName());
-      serde = (SerializerDeserializer) Class.forName(serdeClass).newInstance();
-
-      if (serde instanceof BinarySerializerDeserializer) {
-        hasBinarySerDe = true;
-      }
-
-      Class<? extends Writable> keyClass = (Class<? extends Writable>)Class.forName(reader.getKeyClassName());
-      EMPTY_KEY = keyClass.newInstance();
-
-    } catch (Exception e) {
-      LOG.error(e.getMessage(), e);
-      throw new IOException(e);
-    }
-    super.init();
-  }
-
-  public Writable getKey() {
-    return EMPTY_KEY;
-  }
-
-  private void prepareProjection(Column [] targets) {
-    projectionMap = new int[targets.length];
-
-    int tid;
-    for (int i = 0; i < targets.length; i++) {
-      tid = schema.getColumnId(targets[i].getQualifiedName());
-      projectionMap[i] = tid;
-    }
-  }
-
-  @Override
-  public Tuple next() throws IOException {
-    if (!more) return null;
-
-    long pos = reader.getPosition();
-    boolean remaining = reader.next(EMPTY_KEY);
-
-    if (pos >= end && reader.syncSeen()) {
-      more = false;
-    } else {
-      more = remaining;
-    }
-
-    if (more) {
-      Tuple tuple = null;
-      byte[][] cells;
-
-      if (hasBinarySerDe) {
-        BytesWritable bytesWritable = new BytesWritable();
-        reader.getCurrentValue(bytesWritable);
-        tuple = makeTuple(bytesWritable);
-        totalBytes += (long)bytesWritable.getBytes().length;
-      } else {
-        Text text = new Text();
-        reader.getCurrentValue(text);
-        cells = BytesUtils.splitPreserveAllTokens(text.getBytes(), delimiter, projectionMap);
-        totalBytes += (long)text.getBytes().length;
-        tuple = new LazyTuple(schema, cells, 0, nullChars, serde);
-      }
-      currentIdx++;
-      return tuple;
-    } else {
-      return null;
-    }
-  }
-
-  /**
-   * In hive, LazyBinarySerDe is serialized as follows: start A B A B A B end bytes[] ->
-   * |-----|---------|--- ... ---|-----|---------|
-   *
-   * Section A is one null-byte, corresponding to eight struct fields in Section
-   * B. Each bit indicates whether the corresponding field is null (0) or not null
-   * (1). Each field is a LazyBinaryObject.
-   *
-   * Following B, there is another section A and B. This pattern repeats until the
-   * all struct fields are serialized.
-   *
-   * So, tajo must make a tuple after parsing hive style BinarySerDe.
-   */
-  private Tuple makeTuple(BytesWritable value) throws IOException{
-    Tuple tuple = new VTuple(schema.getColumns().size());
-
-    int start = 0;
-    int length = value.getLength();
-
-    /**
-     * Please note that one null byte is followed by eight fields, then more
-     * null byte and fields.
-     */
-    int structByteEnd = start + length;
-    byte[] bytes = value.getBytes();
-
-    byte nullByte = bytes[start];
-    int lastFieldByteEnd = start + 1;
-
-    // Go through all bytes in the byte[]
-    for (int i = 0; i < schema.getColumns().size(); i++) {
-      fieldIsNull[i] = true;
-      if ((nullByte & (1 << (i % 8))) != 0) {
-        fieldIsNull[i] = false;
-        parse(schema.getColumn(i), bytes, lastFieldByteEnd);
-
-        fieldStart[i] = lastFieldByteEnd + elementOffset;
-        fieldLength[i] = elementSize;
-        lastFieldByteEnd = fieldStart[i] + fieldLength[i];
-
-        for (int j = 0; j < projectionMap.length; j++) {
-          if (projectionMap[j] == i) {
-            Datum datum = serde.deserialize(schema.getColumn(i), bytes, fieldStart[i], fieldLength[i], nullChars);
-            tuple.put(i, datum);
-          }
-        }
-      }
-
-      // next byte is a null byte if there are more bytes to go
-      if (7 == (i % 8)) {
-        if (lastFieldByteEnd < structByteEnd) {
-          nullByte = bytes[lastFieldByteEnd];
-          lastFieldByteEnd++;
-        } else {
-          // otherwise all null afterwards
-          nullByte = 0;
-          lastFieldByteEnd++;
-        }
-      }
-    }
-
-    return tuple;
-  }
-
-  /**
-   * Check a particular field and set its size and offset in bytes based on the
-   * field type and the bytes arrays.
-   *
-   * For void, boolean, byte, short, int, long, float and double, there is no
-   * offset and the size is fixed. For string, the first four bytes are used to store the size.
-   * So the offset is 4 and the size is computed by concating the first four bytes together.
-   * The first four bytes are defined with respect to the offset in the bytes arrays.
-   *
-   * @param col
-   *          catalog column information
-   * @param bytes
-   *          bytes arrays store the table row
-   * @param offset
-   *          offset of this field
-   */
-  private void parse(Column col, byte[] bytes, int offset) throws
-      IOException {
-    switch (col.getDataType().getType()) {
-      case BOOLEAN:
-      case BIT:
-        elementOffset = 0;
-        elementSize = 1;
-        break;
-      case INT2:
-        elementOffset = 0;
-        elementSize = 2;
-        break;
-      case INT4:
-      case INT8:
-        elementOffset = 0;
-        elementSize = WritableUtils.decodeVIntSize(bytes[offset]);
-        break;
-      case FLOAT4:
-        elementOffset = 0;
-        elementSize = 4;
-        break;
-      case FLOAT8:
-        elementOffset = 0;
-        elementSize = 8;
-        break;
-      case BLOB:
-      case PROTOBUF:
-      case INET4:
-      case CHAR:
-      case TEXT:
-        elementOffset = 1;
-        elementSize = bytes[offset];
-        break;
-      default:
-        elementOffset = 0;
-        elementSize = 0;
-    }
-  }
-
-  @Override
-  public void reset() throws IOException {
-    if (reader != null) {
-      reader.sync(0);
-    }
-  }
-
-  @Override
-  public void close() throws IOException {
-    if (reader != null)
-      reader.close();
-
-    if (tableStats != null) {
-      tableStats.setReadBytes(totalBytes);
-      tableStats.setNumRows(currentIdx);
-    }
-  }
-
-  @Override
-  public boolean isProjectable() {
-    return true;
-  }
-
-  @Override
-  public boolean isSelectable() {
-    return true;
-  }
-
-  @Override
-  public boolean isSplittable(){
-    return true;
-  }
-}