Posted to commits@tajo.apache.org by hy...@apache.org on 2014/12/12 06:59:59 UTC

[25/32] tajo git commit: TAJO-1233: Merge hbase_storage branch to the master branch. (Hyoungjun Kim via hyunsik)

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/storage/LineReader.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/LineReader.java b/tajo-storage/src/main/java/org/apache/tajo/storage/LineReader.java
deleted file mode 100644
index 66c610a..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/LineReader.java
+++ /dev/null
@@ -1,559 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.Text;
-import org.apache.tajo.storage.rcfile.NonSyncByteArrayOutputStream;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.ArrayList;
-
-/**
- * A class that provides a line reader from an input stream.
- * Depending on the constructor used, lines will either be terminated by:
- * <ul>
- * <li>one of the following: '\n' (LF) , '\r' (CR),
- * or '\r\n' (CR+LF).</li>
- * <li><em>or</em>, a custom byte sequence delimiter</li>
- * </ul>
- * In both cases, EOF also terminates an otherwise unterminated
- * line.
- */
-
-public class LineReader implements Closeable {
-  private static final int DEFAULT_BUFFER_SIZE = 64 * 1024;
-  private int bufferSize = DEFAULT_BUFFER_SIZE;
-  private InputStream in;
-  private byte[] buffer;
-  // the number of bytes of real data in the buffer
-  private int bufferLength = 0;
-  // the current position in the buffer
-  private int bufferPosn = 0;
-
-  private static final byte CR = '\r';
-  private static final byte LF = '\n';
-
-  // The line delimiter
-  private final byte[] recordDelimiterBytes;
-
-  /**
-   * Create a line reader that reads from the given stream using the
-   * default buffer-size (64k).
-   *
-   * @param in The input stream
-   * @throws IOException
-   */
-  public LineReader(InputStream in) {
-    this(in, DEFAULT_BUFFER_SIZE);
-  }
-
-  /**
-   * Create a line reader that reads from the given stream using the
-   * given buffer-size.
-   *
-   * @param in         The input stream
-   * @param bufferSize Size of the read buffer
-   * @throws IOException
-   */
-  public LineReader(InputStream in, int bufferSize) {
-    this.in = in;
-    this.bufferSize = bufferSize;
-    this.buffer = new byte[this.bufferSize];
-    this.recordDelimiterBytes = null;
-  }
-
-  /**
-   * Create a line reader that reads from the given stream using the
-   * <code>io.file.buffer.size</code> specified in the given
-   * <code>Configuration</code>.
-   *
-   * @param in   input stream
-   * @param conf configuration
-   * @throws IOException
-   */
-  public LineReader(InputStream in, Configuration conf) throws IOException {
-    this(in, conf.getInt("io.file.buffer.size", DEFAULT_BUFFER_SIZE));
-  }
-
-  /**
-   * Create a line reader that reads from the given stream using the
-   * default buffer-size, and a custom delimiter specified as an array of
-   * bytes.
-   *
-   * @param in                   The input stream
-   * @param recordDelimiterBytes The delimiter
-   */
-  public LineReader(InputStream in, byte[] recordDelimiterBytes) {
-    this.in = in;
-    this.bufferSize = DEFAULT_BUFFER_SIZE;
-    this.buffer = new byte[this.bufferSize];
-    this.recordDelimiterBytes = recordDelimiterBytes;
-  }
-
-  /**
-   * Create a line reader that reads from the given stream using the
-   * given buffer-size, and a custom delimiter specified as an array of
-   * bytes.
-   *
-   * @param in                   The input stream
-   * @param bufferSize           Size of the read buffer
-   * @param recordDelimiterBytes The delimiter
-   * @throws IOException
-   */
-  public LineReader(InputStream in, int bufferSize,
-                    byte[] recordDelimiterBytes) {
-    this.in = in;
-    this.bufferSize = bufferSize;
-    this.buffer = new byte[this.bufferSize];
-    this.recordDelimiterBytes = recordDelimiterBytes;
-  }
-
-  /**
-   * Create a line reader that reads from the given stream using the
-   * <code>io.file.buffer.size</code> specified in the given
-   * <code>Configuration</code>, and a custom delimiter specified as an array of
-   * bytes.
-   *
-   * @param in                   input stream
-   * @param conf                 configuration
-   * @param recordDelimiterBytes The delimiter
-   * @throws IOException
-   */
-  public LineReader(InputStream in, Configuration conf,
-                    byte[] recordDelimiterBytes) throws IOException {
-    this.in = in;
-    this.bufferSize = conf.getInt("io.file.buffer.size", DEFAULT_BUFFER_SIZE);
-    this.buffer = new byte[this.bufferSize];
-    this.recordDelimiterBytes = recordDelimiterBytes;
-  }
-
-
-  /**
-   * Close the underlying stream.
-   *
-   * @throws IOException
-   */
-  public void close() throws IOException {
-    in.close();
-  }
-
-  public void reset() {
-    bufferLength = 0;
-    bufferPosn = 0;
-
-  }
-
-  /**
-   * Read one line from the InputStream into the given Text.
-   *
-   * @param str               the object to store the given line (without newline)
-   * @param maxLineLength     the maximum number of bytes to store into str;
-   *                          the rest of the line is silently discarded.
-   * @param maxBytesToConsume the maximum number of bytes to consume
-   *                          in this call.  This is only a hint, because if the line crosses
-   *                          this threshold, we allow it to complete.  It can potentially
-   *                          overshoot by as much as one buffer length.
-   * @return the number of bytes read including the (longest) newline
-   *         found.
-   * @throws IOException if the underlying stream throws
-   */
-  public int readLine(Text str, int maxLineLength,
-                      int maxBytesToConsume) throws IOException {
-    if (this.recordDelimiterBytes != null) {
-      return readCustomLine(str, maxLineLength, maxBytesToConsume);
-    } else {
-      return readDefaultLine(str, maxLineLength, maxBytesToConsume);
-    }
-  }
-
-  protected int fillBuffer(InputStream in, byte[] buffer, boolean inDelimiter)
-      throws IOException {
-    return in.read(buffer);
-  }
-  /**
-   * Read a line terminated by one of CR, LF, or CRLF.
-   */
-  private int readDefaultLine(Text str, int maxLineLength, int maxBytesToConsume)
-      throws IOException {
-    /* We're reading data from in, but the head of the stream may be
-     * already buffered in buffer, so we have several cases:
-     * 1. No newline characters are in the buffer, so we need to copy
-     *    everything and read another buffer from the stream.
-     * 2. An unambiguously terminated line is in buffer, so we just
-     *    copy to str.
-     * 3. Ambiguously terminated line is in buffer, i.e. buffer ends
-     *    in CR.  In this case we copy everything up to CR to str, but
-     *    we also need to see what follows CR: if it's LF, then we
-     *    need to consume the LF as well, so the next call to readLine
-     *    will read from after that.
-     * We use a flag prevCharCR to signal if the previous character was CR
-     * and, if it happens to be at the end of the buffer, delay
-     * consuming it until we have a chance to look at the char that
-     * follows.
-     */
-    str.clear();
-    int txtLength = 0; //tracks str.getLength(), as an optimization
-    int newlineLength = 0; //length of terminating newline
-    boolean prevCharCR = false; //true if prev char was CR
-    long bytesConsumed = 0;
-    do {
-      int startPosn = bufferPosn; //starting from where we left off the last time
-      if (bufferPosn >= bufferLength) {
-        startPosn = bufferPosn = 0;
-        if (prevCharCR) {
-          ++bytesConsumed; //account for CR from previous read
-        }
-        bufferLength = fillBuffer(in, buffer, prevCharCR);
-        if (bufferLength <= 0) {
-          break; // EOF
-        }
-      }
-      for (; bufferPosn < bufferLength; ++bufferPosn) { //search for newline
-        if (buffer[bufferPosn] == LF) {
-          newlineLength = (prevCharCR) ? 2 : 1;
-          ++bufferPosn; // at next invocation proceed from following byte
-          break;
-        }
-        if (prevCharCR) { //CR + notLF, we are at notLF
-          newlineLength = 1;
-          break;
-        }
-        prevCharCR = (buffer[bufferPosn] == CR);
-      }
-      int readLength = bufferPosn - startPosn;
-      if (prevCharCR && newlineLength == 0) {
-        --readLength; //CR at the end of the buffer
-      }
-      bytesConsumed += readLength;
-      int appendLength = readLength - newlineLength;
-      if (appendLength > maxLineLength - txtLength) {
-        appendLength = maxLineLength - txtLength;
-      }
-      if (appendLength > 0) {
-        str.append(buffer, startPosn, appendLength);
-        txtLength += appendLength;
-      }
-    } while (newlineLength == 0 && bytesConsumed < maxBytesToConsume);
-
-    if (bytesConsumed > (long) Integer.MAX_VALUE) {
-      throw new IOException("Too many bytes before newline: " + bytesConsumed);
-    }
-    return (int) bytesConsumed;
-  }
-
-  /**
-   * Read a line terminated by one of CR, LF, or CRLF.
-   */
-  public int readDefaultLine(NonSyncByteArrayOutputStream str, ArrayList<Integer> offsets,
-                             int maxLineLength, int maxBytesToConsume)
-      throws IOException {
-    /* We're reading data from in, but the head of the stream may be
-     * already buffered in buffer, so we have several cases:
-     * 1. No newline characters are in the buffer, so we need to copy
-     *    everything and read another buffer from the stream.
-     * 2. An unambiguously terminated line is in buffer, so we just
-     *    copy to str.
-     * 3. Ambiguously terminated line is in buffer, i.e. buffer ends
-     *    in CR.  In this case we copy everything up to CR to str, but
-     *    we also need to see what follows CR: if it's LF, then we
-     *    need to consume the LF as well, so the next call to readLine
-     *    will read from after that.
-     * We use a flag prevCharCR to signal if the previous character was CR
-     * and, if it happens to be at the end of the buffer, delay
-     * consuming it until we have a chance to look at the char that
-     * follows.
-     */
-
-    int txtLength = 0; //tracks str.getLength(), as an optimization
-    int newlineLength = 0; //length of terminating newline
-    boolean prevCharCR = false; //true if prev char was CR
-    long bytesConsumed = 0;
-    do {
-      int startPosn = bufferPosn; //starting from where we left off the last time
-      if (bufferPosn >= bufferLength) {
-        startPosn = bufferPosn = 0;
-        if (prevCharCR) {
-          ++bytesConsumed; //account for CR from previous read
-        }
-        bufferLength = fillBuffer(in, buffer, prevCharCR);
-        if (bufferLength <= 0) {
-          break; // EOF
-        }
-      }
-      for (; bufferPosn < bufferLength; ++bufferPosn) { //search for newline
-        if (buffer[bufferPosn] == LF) {
-          newlineLength = (prevCharCR) ? 2 : 1;
-          ++bufferPosn; // at next invocation proceed from following byte
-          break;
-        }
-        if (prevCharCR) { //CR + notLF, we are at notLF
-          newlineLength = 1;
-          break;
-        }
-        prevCharCR = (buffer[bufferPosn] == CR);
-      }
-      int readLength = bufferPosn - startPosn;
-      if (prevCharCR && newlineLength == 0) {
-        --readLength; //CR at the end of the buffer
-      }
-      bytesConsumed += readLength;
-      int appendLength = readLength - newlineLength;
-      if (appendLength > maxLineLength - txtLength) {
-        appendLength = maxLineLength - txtLength;
-      }
-      if (appendLength > 0) {
-        str.write(buffer, startPosn, appendLength);
-        txtLength += appendLength;
-      }
-    } while (newlineLength == 0 && bytesConsumed < maxBytesToConsume);
-
-    if (bytesConsumed > (long) Integer.MAX_VALUE) {
-      throw new IOException("Too many bytes before newline: " + bytesConsumed);
-    }
-
-    if (bytesConsumed > 0) offsets.add(txtLength);
-    return (int) bytesConsumed;
-  }
-
-  /**
-   * Read a line terminated by one of CR, LF, or CRLF.
-   */
-
-/*  int validIdx = 0;
-  public int readDefaultLines(NonSyncByteArrayOutputStream str, ArrayList<Integer> offsets, ArrayList<Long> foffsets,
-                             long pos, int maxLineLength, int maxBytesToConsume)
-      throws IOException {
-    *//* We're reading data from in, but the head of the stream may be
-     * already buffered in buffer, so we have several cases:
-     * 1. No newline characters are in the buffer, so we need to copy
-     *    everything and read another buffer from the stream.
-     * 2. An unambiguously terminated line is in buffer, so we just
-     *    copy to str.
-     * 3. Ambiguously terminated line is in buffer, i.e. buffer ends
-     *    in CR.  In this case we copy everything up to CR to str, but
-     *    we also need to see what follows CR: if it's LF, then we
-     *    need to consume the LF as well, so the next call to readLine
-     *    will read from after that.
-     * We use a flag prevCharCR to signal if the previous character was CR
-     * and, if it happens to be at the end of the buffer, delay
-     * consuming it until we have a chance to look at the char that
-     * follows.
-     *//*
-    //str.clear();
-    str.reset();
-    offsets.clear();
-    foffsets.clear();
-
-    validIdx = 0;
-    long bufferBytesConsumed = 0;
-
-    int txtLength = 0; //tracks str.getLength(), as an optimization
-    int newlineLength = 0; //length of terminating newline
-    boolean prevCharCR = false; //true if prev char was CR
-    long bytesConsumed = 0;
-    do {
-
-      int startPosn = bufferPosn; //starting from where we left off the last time
-      if (bufferPosn >= bufferLength) {
-        startPosn = bufferPosn = 0;
-        if (prevCharCR) {
-          ++bytesConsumed; //account for CR from previous read
-        }
-        bufferLength = in.read(buffer);
-        if (bufferLength <= 0) {
-          break; // EOF
-        }
-      }
-      for (; bufferPosn < bufferLength; ++bufferPosn) { //search for newline
-        if (buffer[bufferPosn] == LF) {
-          newlineLength = (prevCharCR) ? 2 : 1;
-          ++bufferPosn; // at next invocation proceed from following byte
-          break;
-        }
-        if (prevCharCR) { //CR + notLF, we are at notLF
-          newlineLength = 1;
-          break;
-        }
-        prevCharCR = (buffer[bufferPosn] == CR);
-      }
-      int readLength = bufferPosn - startPosn;
-      if (prevCharCR && newlineLength == 0) {
-        --readLength; //CR at the end of the buffer
-      }
-      bytesConsumed += readLength;
-      int appendLength = readLength - newlineLength;
-      if (appendLength > maxLineLength - txtLength) {
-        appendLength = maxLineLength - txtLength;
-      }
-
-      if (appendLength > 0) {
-        str.write(buffer, startPosn, appendLength);
-        //System.out.println(startPosn + "," + appendLength);
-        //str.append(buffer, startPosn, appendLength);
-        txtLength += appendLength;
-      }
-
-      if(newlineLength > 0){
-        validIdx++;
-
-        if (bytesConsumed > (long)Integer.MAX_VALUE) {
-          throw new IOException("Too many bytes before newline: " + bytesConsumed);
-        }
-        offsets.add(txtLength);
-        foffsets.add(pos);
-        pos+= bytesConsumed;
-        bufferBytesConsumed += bytesConsumed;
-
-        txtLength = 0;
-        newlineLength = 0;
-        prevCharCR = false; //reset for the next record
-        bytesConsumed = 0;
-      } else {
-        bufferBytesConsumed += bytesConsumed;
-        bytesConsumed = 0;
-      }
-    } while ((bufferBytesConsumed < 256 * 1024));
-
-    return (int)bufferBytesConsumed;
-  }*/
-
-  /**
-   * Read a line terminated by a custom delimiter.
-   */
-  private int readCustomLine(Text str, int maxLineLength, int maxBytesToConsume)
-      throws IOException {
-   /* We're reading data from inputStream, but the head of the stream may be
-    *  already captured in the previous buffer, so we have several cases:
-    *
-    * 1. The buffer tail does not contain any character sequence that
-    *    matches the head of the delimiter. We count it as an
-    *    ambiguous byte count = 0.
-    *
-    * 2. The buffer tail contains X characters that form a sequence
-    *    matching the head of the delimiter. We count ambiguous
-    *    byte count = X.
-    *
-    *    // ***  eg: A segment of input file is as follows
-    *
-    *    " record 1792: I found this bug very interesting and
-    *     I have completely read about it. record 1793: This bug
-    *     can be solved easily record 1794: This ."
-    *
-    *    delimiter = "record";
-    *
-    *    Suppose the string at the end of the buffer is
-    *    "I found this bug very interesting and I have completely re"
-    *    and therefore the next buffer is "ad about it. record 179       ...."
-    *
-    *     The matching characters in the input
-    *     buffer tail and delimiter head = "re"
-    *     Therefore, ambiguous byte count = 2 ****   //
-    *
-    *     2.1 If the following bytes are the remaining characters of
-    *         the delimiter, then we have to capture only up to the starting
-    *         position of delimiter. That means, we need not include the
-    *         ambiguous characters in str.
-    *
-    *     2.2 If the following bytes are not the remaining characters of
-    *         the delimiter ( as mentioned in the example ),
-    *         then we have to include the ambiguous characters in str.
-    */
-    str.clear();
-    int txtLength = 0; // tracks str.getLength(), as an optimization
-    long bytesConsumed = 0;
-    int delPosn = 0;
-    int ambiguousByteCount = 0; // To capture the ambiguous characters count
-    do {
-      int startPosn = bufferPosn; // Start from previous end position
-      if (bufferPosn >= bufferLength) {
-        startPosn = bufferPosn = 0;
-        bufferLength = fillBuffer(in, buffer, ambiguousByteCount > 0);
-        if (bufferLength <= 0) {
-          str.append(recordDelimiterBytes, 0, ambiguousByteCount);
-          break; // EOF
-        }
-      }
-      for (; bufferPosn < bufferLength; ++bufferPosn) {
-        if (buffer[bufferPosn] == recordDelimiterBytes[delPosn]) {
-          delPosn++;
-          if (delPosn >= recordDelimiterBytes.length) {
-            bufferPosn++;
-            break;
-          }
-        } else if (delPosn != 0) {
-          bufferPosn--;
-          delPosn = 0;
-        }
-      }
-      int readLength = bufferPosn - startPosn;
-      bytesConsumed += readLength;
-      int appendLength = readLength - delPosn;
-      if (appendLength > maxLineLength - txtLength) {
-        appendLength = maxLineLength - txtLength;
-      }
-      if (appendLength > 0) {
-        if (ambiguousByteCount > 0) {
-          str.append(recordDelimiterBytes, 0, ambiguousByteCount);
-          //appending the ambiguous characters (see case 2.2)
-          bytesConsumed += ambiguousByteCount;
-          ambiguousByteCount = 0;
-        }
-        str.append(buffer, startPosn, appendLength);
-        txtLength += appendLength;
-      }
-      if (bufferPosn >= bufferLength) {
-        if (delPosn > 0 && delPosn < recordDelimiterBytes.length) {
-          ambiguousByteCount = delPosn;
-          bytesConsumed -= ambiguousByteCount; //to be consumed in the next call
-        }
-      }
-    } while (delPosn < recordDelimiterBytes.length
-        && bytesConsumed < maxBytesToConsume);
-    if (bytesConsumed > (long) Integer.MAX_VALUE) {
-      throw new IOException("Too many bytes before delimiter: " + bytesConsumed);
-    }
-    return (int) bytesConsumed;
-  }
-
-  /**
-   * Read from the InputStream into the given Text.
-   *
-   * @param str           the object to store the given line
-   * @param maxLineLength the maximum number of bytes to store into str.
-   * @return the number of bytes read including the newline
-   * @throws IOException if the underlying stream throws
-   */
-  public int readLine(Text str, int maxLineLength) throws IOException {
-    return readLine(str, maxLineLength, Integer.MAX_VALUE);
-  }
-
-  /**
-   * Read from the InputStream into the given Text.
-   *
-   * @param str the object to store the given line
-   * @return the number of bytes read including the newline
-   * @throws IOException if the underlying stream throws
-   */
-  public int readLine(Text str) throws IOException {
-    return readLine(str, Integer.MAX_VALUE, Integer.MAX_VALUE);
-  }
-}
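
For reference, a minimal usage sketch of the LineReader removed above (not part of this patch): it assumes a ByteArrayInputStream as the source and Hadoop's Text as the line holder; readLine() returns the number of bytes consumed, including the newline, and 0 once EOF is reached.

    import org.apache.hadoop.io.Text;
    import org.apache.tajo.storage.LineReader;

    import java.io.ByteArrayInputStream;
    import java.io.IOException;

    public class LineReaderSketch {
      public static void main(String[] args) throws IOException {
        byte[] data = "a,b\r\nc,d\nlast line without a newline".getBytes("UTF-8");
        // Default delimiters are LF, CR, or CRLF; a custom byte-sequence delimiter
        // could be passed instead, e.g. new LineReader(in, "record".getBytes("UTF-8")).
        LineReader reader = new LineReader(new ByteArrayInputStream(data), 64 * 1024);
        Text line = new Text();
        int bytesConsumed;
        while ((bytesConsumed = reader.readLine(line)) > 0) {
          System.out.println(line + " (" + bytesConsumed + " bytes consumed)");
        }
        reader.close();
      }
    }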

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/storage/MemoryUtil.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/MemoryUtil.java b/tajo-storage/src/main/java/org/apache/tajo/storage/MemoryUtil.java
deleted file mode 100644
index f19b61f..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/MemoryUtil.java
+++ /dev/null
@@ -1,163 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage;
-
-import org.apache.tajo.datum.*;
-import org.apache.tajo.util.ClassSize;
-
-public class MemoryUtil {
-
-  /** Overhead for a NullDatum */
-  public static final long NULL_DATUM;
-
-  /** Overhead for a BoolDatum */
-  public static final long BOOL_DATUM;
-
-  /** Overhead for a CharDatum */
-  public static final long CHAR_DATUM;
-
-  /** Overhead for a BitDatum */
-  public static final long BIT_DATUM;
-
-  /** Overhead for an Int2Datum */
-  public static final long INT2_DATUM;
-
-  /** Overhead for an Int4Datum */
-  public static final long INT4_DATUM;
-
-  /** Overhead for an Int8Datum */
-  public static final long INT8_DATUM;
-
-  /** Overhead for a Float4Datum */
-  public static final long FLOAT4_DATUM;
-
-  /** Overhead for a Float8Datum */
-  public static final long FLOAT8_DATUM;
-
-  /** Overhead for a TextDatum */
-  public static final long TEXT_DATUM;
-
-  /** Overhead for a BlobDatum */
-  public static final long BLOB_DATUM;
-
-  /** Overhead for a DateDatum */
-  public static final long DATE_DATUM;
-
-  /** Overhead for a TimeDatum */
-  public static final long TIME_DATUM;
-
-  /** Overhead for a TimestampDatum */
-  public static final long TIMESTAMP_DATUM;
-
-  static {
-    NULL_DATUM = ClassSize.estimateBase(NullDatum.class, false);
-
-    CHAR_DATUM = ClassSize.estimateBase(CharDatum.class, false);
-
-    BOOL_DATUM = ClassSize.estimateBase(BooleanDatum.class, false);
-
-    BIT_DATUM = ClassSize.estimateBase(BitDatum.class, false);
-
-    INT2_DATUM = ClassSize.estimateBase(Int2Datum.class, false);
-
-    INT4_DATUM = ClassSize.estimateBase(Int4Datum.class, false);
-
-    INT8_DATUM = ClassSize.estimateBase(Int8Datum.class, false);
-
-    FLOAT4_DATUM = ClassSize.estimateBase(Float4Datum.class, false);
-
-    FLOAT8_DATUM = ClassSize.estimateBase(Float8Datum.class, false);
-
-    TEXT_DATUM = ClassSize.estimateBase(TextDatum.class, false);
-
-    BLOB_DATUM = ClassSize.estimateBase(BlobDatum.class, false);
-
-    DATE_DATUM = ClassSize.estimateBase(DateDatum.class, false);
-
-    TIME_DATUM = ClassSize.estimateBase(TimeDatum.class, false);
-
-    TIMESTAMP_DATUM = ClassSize.estimateBase(TimestampDatum.class, false);
-  }
-
-  public static long calculateMemorySize(Tuple tuple) {
-    long total = ClassSize.OBJECT;
-    for (Datum datum : tuple.getValues()) {
-      switch (datum.type()) {
-
-      case NULL_TYPE:
-        total += NULL_DATUM;
-        break;
-
-      case BOOLEAN:
-        total += BOOL_DATUM;
-        break;
-
-      case BIT:
-        total += BIT_DATUM;
-        break;
-
-      case CHAR:
-        total += CHAR_DATUM + datum.size();
-        break;
-
-      case INT1:
-      case INT2:
-        total += INT2_DATUM;
-        break;
-
-      case INT4:
-        total += INT4_DATUM;
-        break;
-
-      case INT8:
-        total += INT8_DATUM;
-        break;
-
-      case FLOAT4:
-        total += FLOAT4_DATUM;
-        break;
-
-      case FLOAT8:
-        total += FLOAT8_DATUM;
-        break;
-
-      case TEXT:
-        total += TEXT_DATUM + datum.size();
-        break;
-
-      case DATE:
-        total += DATE_DATUM;
-        break;
-
-      case TIME:
-        total += TIME_DATUM;
-        break;
-
-      case TIMESTAMP:
-        total += TIMESTAMP_DATUM;
-        break;
-
-      default:
-        break;
-      }
-    }
-
-    return total;
-  }
-}
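
As a rough sketch, estimating a tuple's heap footprint with the MemoryUtil removed above could look as follows, assuming Tajo's VTuple and DatumFactory APIs as used elsewhere in this patch; the values are placeholders.

    import org.apache.tajo.datum.DatumFactory;
    import org.apache.tajo.storage.MemoryUtil;
    import org.apache.tajo.storage.Tuple;
    import org.apache.tajo.storage.VTuple;

    public class MemoryUtilSketch {
      public static void main(String[] args) {
        Tuple tuple = new VTuple(3);
        tuple.put(0, DatumFactory.createInt4(42));
        tuple.put(1, DatumFactory.createFloat8(3.14));
        tuple.put(2, DatumFactory.createText("hello tajo"));

        // Fixed-width types contribute only their estimated object overhead;
        // variable-length types such as TEXT and CHAR also add datum.size().
        long estimatedBytes = MemoryUtil.calculateMemorySize(tuple);
        System.out.println("estimated size: " + estimatedBytes + " bytes");
      }
    }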

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/storage/MergeScanner.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/MergeScanner.java b/tajo-storage/src/main/java/org/apache/tajo/storage/MergeScanner.java
deleted file mode 100644
index 4122c76..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/MergeScanner.java
+++ /dev/null
@@ -1,198 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage;
-
-import com.google.common.collect.ImmutableList;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.tajo.catalog.Column;
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.catalog.TableMeta;
-import org.apache.tajo.catalog.statistics.ColumnStats;
-import org.apache.tajo.catalog.statistics.TableStats;
-import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.storage.fragment.FileFragment;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-
-public class MergeScanner implements Scanner {
-  private Configuration conf;
-  private TableMeta meta;
-  private Schema schema;
-  private List<FileFragment> fragments;
-  private Iterator<FileFragment> iterator;
-  private FileFragment currentFragment;
-  private Scanner currentScanner;
-  private Tuple tuple;
-  private boolean projectable = false;
-  private boolean selectable = false;
-  private Schema target;
-  private float progress;
-  protected TableStats tableStats;
-
-  public MergeScanner(Configuration conf, Schema schema, TableMeta meta, List<FileFragment> rawFragmentList)
-      throws IOException {
-    this(conf, schema, meta, rawFragmentList, schema);
-  }
-
-  public MergeScanner(Configuration conf, Schema schema, TableMeta meta, List<FileFragment> rawFragmentList,
-                      Schema target)
-      throws IOException {
-    this.conf = conf;
-    this.schema = schema;
-    this.meta = meta;
-    this.target = target;
-
-    this.fragments = new ArrayList<FileFragment>();
-
-    long numBytes = 0;
-    for (FileFragment eachFileFragment: rawFragmentList) {
-      numBytes += eachFileFragment.getEndKey();
-      if (eachFileFragment.getEndKey() > 0) {
-        fragments.add(eachFileFragment);
-      }
-    }
-
-    // It should keep the input order. Otherwise, sort queries may return wrong results.
-    this.reset();
-
-    if (currentScanner != null) {
-      this.projectable = currentScanner.isProjectable();
-      this.selectable = currentScanner.isSelectable();
-    }
-
-    tableStats = new TableStats();
-
-    tableStats.setNumBytes(numBytes);
-    tableStats.setNumBlocks(fragments.size());
-
-    for(Column eachColumn: schema.getColumns()) {
-      ColumnStats columnStats = new ColumnStats(eachColumn);
-      tableStats.addColumnStat(columnStats);
-    }
-  }
-
-  @Override
-  public void init() throws IOException {
-    progress = 0.0f;
-  }
-
-  @Override
-  public Tuple next() throws IOException {
-    if (currentScanner != null)
-      tuple = currentScanner.next();
-
-    if (tuple != null) {
-      return tuple;
-    } else {
-      if (currentScanner != null) {
-        currentScanner.close();
-        TableStats scannerTableStats = currentScanner.getInputStats();
-        if (scannerTableStats != null) {
-          tableStats.setReadBytes(tableStats.getReadBytes() + scannerTableStats.getReadBytes());
-          tableStats.setNumRows(tableStats.getNumRows() + scannerTableStats.getNumRows());
-        }
-      }
-      currentScanner = getNextScanner();
-      if (currentScanner != null) {
-        tuple = currentScanner.next();
-      }
-    }
-    return tuple;
-  }
-
-  @Override
-  public void reset() throws IOException {
-    this.iterator = fragments.iterator();
-    this.currentScanner = getNextScanner();
-  }
-
-  private Scanner getNextScanner() throws IOException {
-    if (iterator.hasNext()) {
-      currentFragment = iterator.next();
-      currentScanner = StorageManager.getStorageManager((TajoConf)conf).getScanner(meta, schema,
-          currentFragment, target);
-      currentScanner.init();
-      return currentScanner;
-    } else {
-      return null;
-    }
-  }
-
-  @Override
-  public void close() throws IOException {
-    if(currentScanner != null) {
-      currentScanner.close();
-      currentScanner = null;
-    }
-    iterator = null;
-    progress = 1.0f;
-  }
-
-  @Override
-  public boolean isProjectable() {
-    return projectable;
-  }
-
-  @Override
-  public void setTarget(Column[] targets) {
-    this.target = new Schema(targets);
-  }
-
-  @Override
-  public boolean isSelectable() {
-    return selectable;
-  }
-
-  @Override
-  public void setSearchCondition(Object expr) {
-  }
-
-  @Override
-  public Schema getSchema() {
-    return schema;
-  }
-
-  @Override
-  public boolean isSplittable(){
-    return false;
-  }
-
-  @Override
-  public float getProgress() {
-    if (currentScanner != null && iterator != null && tableStats.getNumBytes() > 0) {
-      TableStats scannerTableStats = currentScanner.getInputStats();
-      long currentScannerReadBytes = 0;
-      if (scannerTableStats != null) {
-        currentScannerReadBytes = scannerTableStats.getReadBytes();
-      }
-
-      return (float)(tableStats.getReadBytes() + currentScannerReadBytes) / (float)tableStats.getNumBytes();
-    } else {
-      return progress;
-    }
-  }
-
-  @Override
-  public TableStats getInputStats() {
-    return tableStats;
-  }
-}
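
A hedged sketch of driving the MergeScanner removed above over several fragments; conf, schema, meta, and the fragment list are assumed to be supplied by the caller, and conf must actually be a TajoConf since it is cast internally when the next per-fragment scanner is opened.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.tajo.catalog.Schema;
    import org.apache.tajo.catalog.TableMeta;
    import org.apache.tajo.storage.MergeScanner;
    import org.apache.tajo.storage.Tuple;
    import org.apache.tajo.storage.fragment.FileFragment;

    import java.io.IOException;
    import java.util.List;

    public class MergeScannerSketch {
      // Scans the given fragments as one logical stream and counts the tuples.
      public static long countRows(Configuration conf, Schema schema, TableMeta meta,
                                   List<FileFragment> fragments) throws IOException {
        MergeScanner scanner = new MergeScanner(conf, schema, meta, fragments);
        scanner.init();
        long rows = 0;
        for (Tuple t = scanner.next(); t != null; t = scanner.next()) {
          rows++;  // getProgress() here is (bytes read so far) / (total bytes of all fragments)
        }
        scanner.close();
        return rows;
      }
    }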

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/storage/NullScanner.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/NullScanner.java b/tajo-storage/src/main/java/org/apache/tajo/storage/NullScanner.java
deleted file mode 100644
index 4cec67d..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/NullScanner.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.catalog.TableMeta;
-import org.apache.tajo.storage.fragment.FileFragment;
-
-import java.io.IOException;
-
-public class NullScanner extends FileScanner {
-  public NullScanner(Configuration conf, Schema schema, TableMeta meta, FileFragment fragment) {
-    super(conf, schema, meta, fragment);
-  }
-
-  @Override
-  public Tuple next() throws IOException {
-    progress = 1.0f;
-
-    return null;
-  }
-
-  @Override
-  public void reset() throws IOException {
-    progress = 0.0f;
-  }
-
-  @Override
-  public void close() throws IOException {
-    progress = 0.0f;
-  }
-
-  @Override
-  public boolean isProjectable() {
-    return false;
-  }
-
-  @Override
-  public boolean isSelectable() {
-    return true;
-  }
-
-  @Override
-  public boolean isSplittable() {
-    return true;
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/storage/NumericPathComparator.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/NumericPathComparator.java b/tajo-storage/src/main/java/org/apache/tajo/storage/NumericPathComparator.java
deleted file mode 100644
index 94d13ee..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/NumericPathComparator.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage;
-
-import org.apache.hadoop.fs.Path;
-
-import java.util.Comparator;
-
-public class NumericPathComparator implements Comparator<Path> {
-
-  @Override
-  public int compare(Path p1, Path p2) {
-    int num1 = Integer.parseInt(p1.getName());
-    int num2 = Integer.parseInt(p2.getName());
-
-    return Integer.compare(num1, num2);
-  }
-}
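
A small sketch of sorting numerically named output paths with the comparator removed above; the paths are placeholders.

    import org.apache.hadoop.fs.Path;
    import org.apache.tajo.storage.NumericPathComparator;

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;

    public class NumericPathSortSketch {
      public static void main(String[] args) {
        List<Path> paths = new ArrayList<Path>();
        paths.add(new Path("/out/10"));
        paths.add(new Path("/out/2"));
        paths.add(new Path("/out/1"));

        // Numeric order on the last path component: /out/1, /out/2, /out/10.
        // A plain lexicographic sort would give /out/1, /out/10, /out/2.
        Collections.sort(paths, new NumericPathComparator());
        System.out.println(paths);
      }
    }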

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/storage/RawFile.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/RawFile.java b/tajo-storage/src/main/java/org/apache/tajo/storage/RawFile.java
deleted file mode 100644
index 2fae243..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/RawFile.java
+++ /dev/null
@@ -1,772 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage;
-
-import com.google.protobuf.Message;
-import io.netty.buffer.ByteBuf;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.IOUtils;
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.catalog.TableMeta;
-import org.apache.tajo.catalog.statistics.TableStats;
-import org.apache.tajo.common.TajoDataTypes.DataType;
-import org.apache.tajo.datum.DatumFactory;
-import org.apache.tajo.datum.NullDatum;
-import org.apache.tajo.datum.ProtobufDatumFactory;
-import org.apache.tajo.storage.fragment.FileFragment;
-import org.apache.tajo.unit.StorageUnit;
-import org.apache.tajo.util.BitArray;
-
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.io.RandomAccessFile;
-import java.nio.ByteBuffer;
-import java.nio.channels.FileChannel;
-
-public class RawFile {
-  private static final Log LOG = LogFactory.getLog(RawFile.class);
-
-  public static class RawFileScanner extends FileScanner implements SeekableScanner {
-    private FileChannel channel;
-    private DataType[] columnTypes;
-
-    private ByteBuffer buffer;
-    private ByteBuf buf;
-    private Tuple tuple;
-
-    private int headerSize = 0; // Header size of a tuple
-    private BitArray nullFlags;
-    private static final int RECORD_SIZE = 4;
-    private boolean eos = false;
-    private long startOffset;
-    private long endOffset;
-    private FileInputStream fis;
-    private long recordCount;
-    private long totalReadBytes;
-    private long filePosition;
-    private boolean forceFillBuffer;
-
-    public RawFileScanner(Configuration conf, Schema schema, TableMeta meta, FileFragment fragment) throws IOException {
-      super(conf, schema, meta, fragment);
-    }
-
-    public void init() throws IOException {
-      File file;
-      try {
-        if (fragment.getPath().toUri().getScheme() != null) {
-          file = new File(fragment.getPath().toUri());
-        } else {
-          file = new File(fragment.getPath().toString());
-        }
-      } catch (IllegalArgumentException iae) {
-        throw new IOException(iae);
-      }
-
-      fis = new FileInputStream(file);
-      channel = fis.getChannel();
-      filePosition = startOffset = fragment.getStartKey();
-      endOffset = fragment.getStartKey() + fragment.getEndKey();
-
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("RawFileScanner open:" + fragment + "," + channel.position() + ", file size :" + channel.size()
-            + ", fragment length :" + fragment.getEndKey());
-      }
-
-      buf = BufferPool.directBuffer(64 * StorageUnit.KB);
-      buffer = buf.nioBuffer(0, buf.capacity());
-
-      columnTypes = new DataType[schema.size()];
-      for (int i = 0; i < schema.size(); i++) {
-        columnTypes[i] = schema.getColumn(i).getDataType();
-      }
-
-      tuple = new VTuple(columnTypes.length);
-      nullFlags = new BitArray(schema.size());
-      headerSize = RECORD_SIZE + 2 + nullFlags.bytesLength(); // The middle 2 bytes are for the null-flag size
-
-      // initial set position
-      if (fragment.getStartKey() > 0) {
-        channel.position(fragment.getStartKey());
-      }
-
-      forceFillBuffer = true;
-      super.init();
-    }
-
-    @Override
-    public long getNextOffset() throws IOException {
-      return filePosition - (forceFillBuffer ? 0 : buffer.remaining());
-    }
-
-    @Override
-    public void seek(long offset) throws IOException {
-      eos = false;
-      filePosition = channel.position();
-
-      // do not fill the buffer if the offset is already included in the buffer.
-      if(!forceFillBuffer && filePosition > offset && offset > filePosition - buffer.limit()){
-        buffer.position((int)(offset - (filePosition - buffer.limit())));
-      } else {
-        if(offset < startOffset || offset > startOffset + fragment.getEndKey()){
-          throw new IndexOutOfBoundsException(String.format("range(%d, %d), offset: %d",
-              startOffset, startOffset + fragment.getEndKey(), offset));
-        }
-        channel.position(offset);
-        filePosition = offset;
-        buffer.clear();
-        forceFillBuffer = true;
-        fillBuffer();
-      }
-    }
-
-    private boolean fillBuffer() throws IOException {
-      if(!forceFillBuffer) buffer.compact();
-
-      int bytesRead = channel.read(buffer);
-      forceFillBuffer = false;
-      if (bytesRead == -1) {
-        eos = true;
-        return false;
-      } else {
-        buffer.flip(); //The limit is set to the current position and then the position is set to zero
-        filePosition += bytesRead;
-        totalReadBytes += bytesRead;
-        return true;
-      }
-    }
-
-    /**
-     * Decode a ZigZag-encoded 32-bit value.  ZigZag encodes signed integers
-     * into values that can be efficiently encoded with varint.  (Otherwise,
-     * negative values must be sign-extended to 64 bits to be varint encoded,
-     * thus always taking 10 bytes on the wire.)
-     *
-     * @param n An unsigned 32-bit integer, stored in a signed int because
-     *          Java has no explicit unsigned support.
-     * @return A signed 32-bit integer.
-     */
-    public static int decodeZigZag32(final int n) {
-      return (n >>> 1) ^ -(n & 1);
-    }
-
-    /**
-     * Decode a ZigZag-encoded 64-bit value.  ZigZag encodes signed integers
-     * into values that can be efficiently encoded with varint.  (Otherwise,
-     * negative values must be sign-extended to 64 bits to be varint encoded,
-     * thus always taking 10 bytes on the wire.)
-     *
-     * @param n An unsigned 64-bit integer, stored in a signed long because
-     *          Java has no explicit unsigned support.
-     * @return A signed 64-bit integer.
-     */
-    public static long decodeZigZag64(final long n) {
-      return (n >>> 1) ^ -(n & 1);
-    }
-
-
-    /**
-     * Read a raw Varint from the stream.  If larger than 32 bits, discard the
-     * upper bits.
-     */
-    public int readRawVarint32() throws IOException {
-      byte tmp = buffer.get();
-      if (tmp >= 0) {
-        return tmp;
-      }
-      int result = tmp & 0x7f;
-      if ((tmp = buffer.get()) >= 0) {
-        result |= tmp << 7;
-      } else {
-        result |= (tmp & 0x7f) << 7;
-        if ((tmp = buffer.get()) >= 0) {
-          result |= tmp << 14;
-        } else {
-          result |= (tmp & 0x7f) << 14;
-          if ((tmp = buffer.get()) >= 0) {
-            result |= tmp << 21;
-          } else {
-            result |= (tmp & 0x7f) << 21;
-            result |= (tmp = buffer.get()) << 28;
-            if (tmp < 0) {
-              // Discard upper 32 bits.
-              for (int i = 0; i < 5; i++) {
-                if (buffer.get() >= 0) {
-                  return result;
-                }
-              }
-              throw new IOException("Invalid Variable int32");
-            }
-          }
-        }
-      }
-      return result;
-    }
-
-    /** Read a raw Varint from the stream. */
-    public long readRawVarint64() throws IOException {
-      int shift = 0;
-      long result = 0;
-      while (shift < 64) {
-        final byte b = buffer.get();
-        result |= (long)(b & 0x7F) << shift;
-        if ((b & 0x80) == 0) {
-          return result;
-        }
-        shift += 7;
-      }
-      throw new IOException("Invalid Variable int64");
-    }
-
-    @Override
-    public Tuple next() throws IOException {
-      if(eos) return null;
-
-      if (forceFillBuffer || buffer.remaining() < headerSize) {
-        if (!fillBuffer()) {
-          return null;
-        }
-      }
-
-      // backup the buffer state
-      int bufferLimit = buffer.limit();
-      int recordSize = buffer.getInt();
-      int nullFlagSize = buffer.getShort();
-
-      buffer.limit(buffer.position() + nullFlagSize);
-      nullFlags.fromByteBuffer(buffer);
-      // restore the start of record contents
-      buffer.limit(bufferLimit);
-      if (buffer.remaining() < (recordSize - headerSize)) {
-
-        //if the remaining buffer is smaller than the record, grow the buffer to fit the record size
-        reSizeBuffer(recordSize);
-
-        if (!fillBuffer()) {
-          return null;
-        }
-      }
-
-      for (int i = 0; i < columnTypes.length; i++) {
-        // check if the i'th column is null
-        if (nullFlags.get(i)) {
-          tuple.put(i, DatumFactory.createNullDatum());
-          continue;
-        }
-
-        switch (columnTypes[i].getType()) {
-          case BOOLEAN :
-            tuple.put(i, DatumFactory.createBool(buffer.get()));
-            break;
-
-          case BIT :
-            tuple.put(i, DatumFactory.createBit(buffer.get()));
-            break;
-
-          case CHAR :
-            int realLen = readRawVarint32();
-            byte[] buf = new byte[realLen];
-            buffer.get(buf);
-            tuple.put(i, DatumFactory.createChar(buf));
-            break;
-
-          case INT2 :
-            tuple.put(i, DatumFactory.createInt2(buffer.getShort()));
-            break;
-
-          case INT4 :
-            tuple.put(i, DatumFactory.createInt4(decodeZigZag32(readRawVarint32())));
-            break;
-
-          case INT8 :
-            tuple.put(i, DatumFactory.createInt8(decodeZigZag64(readRawVarint64())));
-            break;
-
-          case FLOAT4 :
-            tuple.put(i, DatumFactory.createFloat4(buffer.getFloat()));
-            break;
-
-          case FLOAT8 :
-            tuple.put(i, DatumFactory.createFloat8(buffer.getDouble()));
-            break;
-
-          case TEXT : {
-            int len = readRawVarint32();
-            byte [] strBytes = new byte[len];
-            buffer.get(strBytes);
-            tuple.put(i, DatumFactory.createText(strBytes));
-            break;
-          }
-
-          case BLOB : {
-            int len = readRawVarint32();
-            byte [] rawBytes = new byte[len];
-            buffer.get(rawBytes);
-            tuple.put(i, DatumFactory.createBlob(rawBytes));
-            break;
-          }
-
-          case PROTOBUF: {
-            int len = readRawVarint32();
-            byte [] rawBytes = new byte[len];
-            buffer.get(rawBytes);
-
-            ProtobufDatumFactory factory = ProtobufDatumFactory.get(columnTypes[i]);
-            Message.Builder builder = factory.newBuilder();
-            builder.mergeFrom(rawBytes);
-            tuple.put(i, factory.createDatum(builder.build()));
-            break;
-          }
-
-          case INET4 :
-            byte [] ipv4Bytes = new byte[4];
-            buffer.get(ipv4Bytes);
-            tuple.put(i, DatumFactory.createInet4(ipv4Bytes));
-            break;
-
-          case DATE: {
-            int val = buffer.getInt();
-            if (val < Integer.MIN_VALUE + 1) {
-              tuple.put(i, DatumFactory.createNullDatum());
-            } else {
-              tuple.put(i, DatumFactory.createFromInt4(columnTypes[i], val));
-            }
-            break;
-          }
-          case TIME:
-          case TIMESTAMP: {
-            long val = buffer.getLong();
-            if (val < Long.MIN_VALUE + 1) {
-              tuple.put(i, DatumFactory.createNullDatum());
-            } else {
-              tuple.put(i, DatumFactory.createFromInt8(columnTypes[i], val));
-            }
-            break;
-          }
-          case NULL_TYPE:
-            tuple.put(i, NullDatum.get());
-            break;
-
-          default:
-        }
-      }
-
-      recordCount++;
-
-      if(filePosition - buffer.remaining() >= endOffset){
-        eos = true;
-      }
-      return new VTuple(tuple);
-    }
-
-    private void reSizeBuffer(int writableBytes){
-      if (buffer.capacity() - buffer.remaining()  <  writableBytes) {
-        buf.setIndex(buffer.position(), buffer.limit());
-        buf.markReaderIndex();
-        buf.discardSomeReadBytes();
-        buf.ensureWritable(writableBytes);
-        buffer = buf.nioBuffer(0, buf.capacity());
-        buffer.limit(buf.writerIndex());
-      }
-    }
-
-    @Override
-    public void reset() throws IOException {
-      // reset the buffer
-      buffer.clear();
-      forceFillBuffer = true;
-      filePosition = fragment.getStartKey();
-      channel.position(filePosition);
-      eos = false;
-    }
-
-    @Override
-    public void close() throws IOException {
-      if(buf != null){
-        buffer.clear();
-        buffer = null;
-
-        buf.release();
-        buf = null;
-      }
-
-      IOUtils.cleanup(LOG, channel, fis);
-    }
-
-    @Override
-    public boolean isProjectable() {
-      return false;
-    }
-
-    @Override
-    public boolean isSelectable() {
-      return false;
-    }
-
-    @Override
-    public boolean isSplittable(){
-      return false;
-    }
-
-    @Override
-    public TableStats getInputStats() {
-      if(tableStats != null){
-        tableStats.setNumRows(recordCount);
-        tableStats.setReadBytes(totalReadBytes); // actual read bytes (scan + rescan * n)
-        tableStats.setNumBytes(fragment.getEndKey());
-      }
-      return tableStats;
-    }
-
-    @Override
-    public float getProgress() {
-      if(eos) {
-        return 1.0f;
-      }
-
-      if (filePosition - startOffset == 0) {
-        return 0.0f;
-      } else {
-        return Math.min(1.0f, ((float) filePosition / endOffset));
-      }
-    }
-  }
-
-  public static class RawFileAppender extends FileAppender {
-    private FileChannel channel;
-    private RandomAccessFile randomAccessFile;
-    private DataType[] columnTypes;
-
-    private ByteBuffer buffer;
-    private ByteBuf buf;
-    private BitArray nullFlags;
-    private int headerSize = 0;
-    private static final int RECORD_SIZE = 4;
-    private long pos;
-
-    private TableStatistics stats;
-
-    public RawFileAppender(Configuration conf, Schema schema, TableMeta meta, Path path) throws IOException {
-      super(conf, schema, meta, path);
-    }
-
-    public void init() throws IOException {
-      File file;
-      try {
-        if (path.toUri().getScheme() != null) {
-          file = new File(path.toUri());
-        } else {
-          file = new File(path.toString());
-        }
-      } catch (IllegalArgumentException iae) {
-        throw new IOException(iae);
-      }
-
-      randomAccessFile = new RandomAccessFile(file, "rw");
-      channel = randomAccessFile.getChannel();
-      pos = 0;
-
-      columnTypes = new DataType[schema.size()];
-      for (int i = 0; i < schema.size(); i++) {
-        columnTypes[i] = schema.getColumn(i).getDataType();
-      }
-
-      buf = BufferPool.directBuffer(64 * StorageUnit.KB);
-      buffer = buf.nioBuffer(0, buf.capacity());
-
-      // compute the number of bytes representing the null flags
-      nullFlags = new BitArray(schema.size());
-      headerSize = RECORD_SIZE + 2 + nullFlags.bytesLength();
-
-      if (enabledStats) {
-        this.stats = new TableStatistics(this.schema);
-      }
-
-      super.init();
-    }
-
-    @Override
-    public long getOffset() throws IOException {
-      return pos;
-    }
-
-    private void flushBuffer() throws IOException {
-      buffer.flip();
-      channel.write(buffer);
-      buffer.clear();
-    }
-
-    private boolean flushBufferAndReplace(int recordOffset, int sizeToBeWritten)
-        throws IOException {
-
-      // if the buffer reaches the limit,
-      // write the bytes from 0 to the previous record.
-      if (buffer.remaining() < sizeToBeWritten) {
-
-        int limit = buffer.position();
-        buffer.limit(recordOffset);
-        buffer.flip();
-        channel.write(buffer);
-        buffer.position(recordOffset);
-        buffer.limit(limit);
-        buffer.compact();
-
-        return true;
-      } else {
-        return false;
-      }
-    }
-
-    /**
-     * Encode a ZigZag-encoded 32-bit value.  ZigZag encodes signed integers
-     * into values that can be efficiently encoded with varint.  (Otherwise,
-     * negative values must be sign-extended to 64 bits to be varint encoded,
-     * thus always taking 10 bytes on the wire.)
-     *
-     * @param n A signed 32-bit integer.
-     * @return An unsigned 32-bit integer, stored in a signed int because
-     *         Java has no explicit unsigned support.
-     */
-    public static int encodeZigZag32(final int n) {
-      // Note:  the right-shift must be arithmetic
-      return (n << 1) ^ (n >> 31);
-    }
-
-    /**
-     * Encode a ZigZag-encoded 64-bit value.  ZigZag encodes signed integers
-     * into values that can be efficiently encoded with varint.  (Otherwise,
-     * negative values must be sign-extended to 64 bits to be varint encoded,
-     * thus always taking 10 bytes on the wire.)
-     *
-     * @param n A signed 64-bit integer.
-     * @return An unsigned 64-bit integer, stored in a signed long because
-     *         Java has no explicit unsigned support.
-     */
-    public static long encodeZigZag64(final long n) {
-      // Note:  the right-shift must be arithmetic
-      return (n << 1) ^ (n >> 63);
-    }
-
-    /**
-     * Encode and write a varint.  {@code value} is treated as
-     * unsigned, so it won't be sign-extended if negative.
-     */
-    public void writeRawVarint32(int value) throws IOException {
-      while (true) {
-        if ((value & ~0x7F) == 0) {
-          buffer.put((byte) value);
-          return;
-        } else {
-          buffer.put((byte) ((value & 0x7F) | 0x80));
-          value >>>= 7;
-        }
-      }
-    }
-
-    /**
-     * Compute the number of bytes that would be needed to encode a varint.
-     * {@code value} is treated as unsigned, so it won't be sign-extended if
-     * negative.
-     */
-    public static int computeRawVarint32Size(final int value) {
-      if ((value & (0xffffffff <<  7)) == 0) return 1;
-      if ((value & (0xffffffff << 14)) == 0) return 2;
-      if ((value & (0xffffffff << 21)) == 0) return 3;
-      if ((value & (0xffffffff << 28)) == 0) return 4;
-      return 5;
-    }
-
-    /** Encode and write a varint. */
-    public void writeRawVarint64(long value) throws IOException {
-      while (true) {
-        if ((value & ~0x7FL) == 0) {
-          buffer.put((byte) value);
-          return;
-        } else {
-          buffer.put((byte) ((value & 0x7F) | 0x80));
-          value >>>= 7;
-        }
-      }
-    }
-
-    @Override
-    public void addTuple(Tuple t) throws IOException {
-
-      if (buffer.remaining() < headerSize) {
-        flushBuffer();
-      }
-
-      // skip the row header
-      int recordOffset = buffer.position();
-      buffer.position(recordOffset + headerSize);
-      // reset the null flags
-      nullFlags.clear();
-      for (int i = 0; i < schema.size(); i++) {
-        if (enabledStats) {
-          stats.analyzeField(i, t.get(i));
-        }
-
-        if (t.isNull(i)) {
-          nullFlags.set(i);
-          continue;
-        }
-
-        // 8 bytes is the maximum size of any fixed-length type
-        if (flushBufferAndReplace(recordOffset, 8)) {
-          recordOffset = 0;
-        }
-
-        switch(columnTypes[i].getType()) {
-          case NULL_TYPE:
-            nullFlags.set(i);
-            continue;
-
-          case BOOLEAN:
-          case BIT:
-            buffer.put(t.getByte(i));
-            break;
-
-          case INT2 :
-            buffer.putShort(t.getInt2(i));
-            break;
-
-          case INT4 :
-            writeRawVarint32(encodeZigZag32(t.getInt4(i)));
-            break;
-
-          case INT8 :
-            writeRawVarint64(encodeZigZag64(t.getInt8(i)));
-            break;
-
-          case FLOAT4 :
-            buffer.putFloat(t.getFloat4(i));
-            break;
-
-          case FLOAT8 :
-            buffer.putDouble(t.getFloat8(i));
-            break;
-
-          case CHAR:
-          case TEXT: {
-            byte [] strBytes = t.getBytes(i);
-            if (flushBufferAndReplace(recordOffset, strBytes.length + computeRawVarint32Size(strBytes.length))) {
-              recordOffset = 0;
-            }
-            writeRawVarint32(strBytes.length);
-            buffer.put(strBytes);
-            break;
-          }
-
-          case DATE:
-            buffer.putInt(t.getInt4(i));
-            break;
-
-          case TIME:
-          case TIMESTAMP:
-            buffer.putLong(t.getInt8(i));
-            break;
-
-          case BLOB : {
-            byte [] rawBytes = t.getBytes(i);
-            if (flushBufferAndReplace(recordOffset, rawBytes.length + computeRawVarint32Size(rawBytes.length))) {
-              recordOffset = 0;
-            }
-            writeRawVarint32(rawBytes.length);
-            buffer.put(rawBytes);
-            break;
-          }
-
-          case PROTOBUF: {
-            byte [] rawBytes = t.getBytes(i);
-            if (flushBufferAndReplace(recordOffset, rawBytes.length + computeRawVarint32Size(rawBytes.length))) {
-              recordOffset = 0;
-            }
-            writeRawVarint32(rawBytes.length);
-            buffer.put(rawBytes);
-            break;
-          }
-
-          case INET4 :
-            buffer.put(t.getBytes(i));
-            break;
-
-          default:
-            throw new IOException("Cannot support data type: " + columnTypes[i].getType());
-        }
-      }
-
-      // write a record header
-      int bufferPos = buffer.position();
-      buffer.position(recordOffset);
-      buffer.putInt(bufferPos - recordOffset);
-      byte [] flags = nullFlags.toArray();
-      buffer.putShort((short) flags.length);
-      buffer.put(flags);
-
-      pos += bufferPos - recordOffset;
-      buffer.position(bufferPos);
-
-      if (enabledStats) {
-        stats.incrementRow();
-      }
-    }
-
-    @Override
-    public void flush() throws IOException {
-      if(buffer != null){
-        flushBuffer();
-      }
-    }
-
-    @Override
-    public void close() throws IOException {
-      flush();
-      if (enabledStats) {
-        stats.setNumBytes(getOffset());
-      }
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("RawFileAppender written: " + getOffset() + " bytes, path: " + path);
-      }
-
-      if(buf != null){
-        buffer.clear();
-        buffer = null;
-
-        buf.release();
-        buf = null;
-      }
-
-      IOUtils.cleanup(LOG, channel, randomAccessFile);
-    }
-
-    @Override
-    public TableStats getStats() {
-      if (enabledStats) {
-        stats.setNumBytes(pos);
-        return stats.getTableStat();
-      } else {
-        return null;
-      }
-    }
-  }
-}
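
The RawFileAppender above stores INT4 and INT8 fields as ZigZag-encoded varints, so values of small magnitude (positive or negative) occupy only one or two bytes. Below is a minimal, self-contained sketch of that round trip; the decode helpers, the class name, and the demo values are illustrative assumptions added for this note, not code from the deleted file.

import java.nio.ByteBuffer;

public class ZigZagVarintDemo {
  // ZigZag-encode a signed 32-bit value (mirrors encodeZigZag32 above).
  static int encodeZigZag32(int n) { return (n << 1) ^ (n >> 31); }

  // Inverse mapping; assumed here for the demo, not part of the deleted file.
  static int decodeZigZag32(int n) { return (n >>> 1) ^ -(n & 1); }

  // Write an unsigned varint: 7 payload bits per byte, high bit marks continuation.
  static void writeRawVarint32(ByteBuffer buf, int value) {
    while ((value & ~0x7F) != 0) {
      buf.put((byte) ((value & 0x7F) | 0x80));
      value >>>= 7;
    }
    buf.put((byte) value);
  }

  // Read the varint back; assumed decoder for the demo.
  static int readRawVarint32(ByteBuffer buf) {
    int result = 0, shift = 0;
    byte b;
    do {
      b = buf.get();
      result |= (b & 0x7F) << shift;
      shift += 7;
    } while ((b & 0x80) != 0);
    return result;
  }

  public static void main(String[] args) {
    ByteBuffer buf = ByteBuffer.allocate(8);
    int original = -123456;
    writeRawVarint32(buf, encodeZigZag32(original));  // modest negative value -> 3 bytes
    buf.flip();
    int restored = decodeZigZag32(readRawVarint32(buf));
    System.out.println(original + " -> " + buf.limit() + " bytes -> " + restored);
  }
}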

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/storage/RowFile.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/RowFile.java b/tajo-storage/src/main/java/org/apache/tajo/storage/RowFile.java
deleted file mode 100644
index db36771..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/RowFile.java
+++ /dev/null
@@ -1,496 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.tajo.catalog.Column;
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.catalog.TableMeta;
-import org.apache.tajo.catalog.statistics.TableStats;
-import org.apache.tajo.conf.TajoConf.ConfVars;
-import org.apache.tajo.datum.Datum;
-import org.apache.tajo.datum.DatumFactory;
-import org.apache.tajo.storage.exception.AlreadyExistsStorageException;
-import org.apache.tajo.storage.fragment.FileFragment;
-import org.apache.tajo.util.BitArray;
-
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.security.MessageDigest;
-import java.security.NoSuchAlgorithmException;
-import java.util.Arrays;
-
-public class RowFile {
-  public static final Log LOG = LogFactory.getLog(RowFile.class);
-
-  private static final int SYNC_ESCAPE = -1;
-  private static final int SYNC_HASH_SIZE = 16;
-  private static final int SYNC_SIZE = 4 + SYNC_HASH_SIZE;
-  private final static int DEFAULT_BUFFER_SIZE = 65535;
-  public static int SYNC_INTERVAL;
-
-  public static class RowFileScanner extends FileScanner {
-    private FileSystem fs;
-    private FSDataInputStream in;
-    private Tuple tuple;
-
-    private byte[] sync = new byte[SYNC_HASH_SIZE];
-    private byte[] checkSync = new byte[SYNC_HASH_SIZE];
-    private long start, end;
-
-    private ByteBuffer buffer;
-    private final int tupleHeaderSize;
-    private BitArray nullFlags;
-    private long bufferStartPos;
-
-    public RowFileScanner(Configuration conf, final Schema schema, final TableMeta meta, final FileFragment fragment)
-        throws IOException {
-      super(conf, schema, meta, fragment);
-
-      SYNC_INTERVAL = conf.getInt(ConfVars.ROWFILE_SYNC_INTERVAL.varname,
-          ConfVars.ROWFILE_SYNC_INTERVAL.defaultIntVal) * SYNC_SIZE;
-
-      nullFlags = new BitArray(schema.size());
-      tupleHeaderSize = nullFlags.bytesLength() + (2 * Short.SIZE / 8);
-      this.start = fragment.getStartKey();
-      this.end = this.start + fragment.getEndKey();
-    }
-
-    public void init() throws IOException {
-      // open the file and allocate the read buffer (DEFAULT_BUFFER_SIZE per column)
-      fs = fragment.getPath().getFileSystem(conf);
-      in = fs.open(fragment.getPath());
-      buffer = ByteBuffer.allocate(DEFAULT_BUFFER_SIZE * schema.size());
-      buffer.flip();
-
-      readHeader();
-
-      // find the correct position from the start
-      if (this.start > in.getPos()) {
-        long realStart = start > SYNC_SIZE ? (start-SYNC_SIZE) : 0;
-        in.seek(realStart);
-      }
-      bufferStartPos = in.getPos();
-      fillBuffer();
-
-      if (start != 0) {
-        // TODO: improve
-        boolean syncFound = false;
-        while (!syncFound) {
-          if (buffer.remaining() < SYNC_SIZE) {
-            fillBuffer();
-          }
-          buffer.mark();
-          syncFound = checkSync();
-          if (!syncFound) {
-            buffer.reset();
-            buffer.get(); // proceed one byte
-          }
-        }
-        bufferStartPos += buffer.position();
-        buffer.compact();
-        buffer.flip();
-      }
-
-      super.init();
-    }
-
-    private void readHeader() throws IOException {
-      SYNC_INTERVAL = in.readInt();
-      StorageUtil.readFully(in, this.sync, 0, SYNC_HASH_SIZE);
-    }
-
-    /**
-     * Check whether a sync marker is present at the current buffer position.
-     *
-     * @return true if the sync marker is found.
-     * @throws IOException
-     */
-    private boolean checkSync() throws IOException {
-      buffer.getInt();                           // escape
-      buffer.get(checkSync, 0, SYNC_HASH_SIZE);  // sync
-      return Arrays.equals(checkSync, sync);
-    }
-
-    private int fillBuffer() throws IOException {
-      bufferStartPos += buffer.position();
-      buffer.compact();
-      int remain = buffer.remaining();
-      int read = in.read(buffer);
-      if (read == -1) {
-        buffer.flip();
-        return read;
-      } else {
-        int totalRead = read;
-        if (remain > totalRead) {
-          read = in.read(buffer);
-          totalRead += read > 0 ? read : 0;
-        }
-        buffer.flip();
-        return totalRead;
-      }
-    }
-
-    @Override
-    public Tuple next() throws IOException {
-      while (buffer.remaining() < SYNC_SIZE) {
-        if (fillBuffer() < 0) {
-          return null;
-        }
-      }
-
-      buffer.mark();
-      if (!checkSync()) {
-        buffer.reset();
-      } else {
-        if (bufferStartPos + buffer.position() > end) {
-          return null;
-        }
-      }
-
-      while (buffer.remaining() < tupleHeaderSize) {
-        if (fillBuffer() < 0) {
-          return null;
-        }
-      }
-
-      int i;
-      tuple = new VTuple(schema.size());
-
-      int nullFlagSize = buffer.getShort();
-      byte[] nullFlagBytes = new byte[nullFlagSize];
-      buffer.get(nullFlagBytes, 0, nullFlagSize);
-      nullFlags = new BitArray(nullFlagBytes);
-      int tupleSize = buffer.getShort();
-
-      while (buffer.remaining() < (tupleSize)) {
-        if (fillBuffer() < 0) {
-          return null;
-        }
-      }
-
-      Datum datum;
-      Column col;
-      for (i = 0; i < schema.size(); i++) {
-        if (!nullFlags.get(i)) {
-          col = schema.getColumn(i);
-          switch (col.getDataType().getType()) {
-            case BOOLEAN :
-              datum = DatumFactory.createBool(buffer.get());
-              tuple.put(i, datum);
-              break;
-
-            case BIT:
-              datum = DatumFactory.createBit(buffer.get());
-              tuple.put(i, datum);
-              break;
-
-            case CHAR :
-              int realLen = buffer.getInt();
-              byte[] buf = new byte[col.getDataType().getLength()];
-              buffer.get(buf);
-              byte[] charBuf = Arrays.copyOf(buf, realLen);
-              tuple.put(i, DatumFactory.createChar(charBuf));
-              break;
-
-            case INT2 :
-              datum = DatumFactory.createInt2(buffer.getShort());
-              tuple.put(i, datum);
-              break;
-
-            case INT4 :
-              datum = DatumFactory.createInt4(buffer.getInt());
-              tuple.put(i, datum);
-              break;
-
-            case INT8 :
-              datum = DatumFactory.createInt8(buffer.getLong());
-              tuple.put(i, datum);
-              break;
-
-            case FLOAT4 :
-              datum = DatumFactory.createFloat4(buffer.getFloat());
-              tuple.put(i, datum);
-              break;
-
-            case FLOAT8 :
-              datum = DatumFactory.createFloat8(buffer.getDouble());
-              tuple.put(i, datum);
-              break;
-
-            case TEXT:
-              short bytelen = buffer.getShort();
-              byte[] strbytes = new byte[bytelen];
-              buffer.get(strbytes, 0, bytelen);
-              datum = DatumFactory.createText(strbytes);
-              tuple.put(i, datum);
-              break;
-
-            case BLOB:
-              short bytesLen = buffer.getShort();
-              byte [] bytesBuf = new byte[bytesLen];
-              buffer.get(bytesBuf);
-              datum = DatumFactory.createBlob(bytesBuf);
-              tuple.put(i, datum);
-              break;
-
-            case INET4 :
-              byte[] ipv4 = new byte[4];
-              buffer.get(ipv4, 0, 4);
-              datum = DatumFactory.createInet4(ipv4);
-              tuple.put(i, datum);
-              break;
-
-            default:
-              break;
-          }
-        } else {
-          tuple.put(i, DatumFactory.createNullDatum());
-        }
-      }
-      return tuple;
-    }
-
-    @Override
-    public void reset() throws IOException {
-      init();
-    }
-
-    @Override
-    public void close() throws IOException {
-      if (in != null) {
-        in.close();
-      }
-    }
-
-    @Override
-    public boolean isProjectable() {
-      return false;
-    }
-
-    @Override
-    public boolean isSelectable() {
-      return false;
-    }
-
-    @Override
-    public boolean isSplittable(){
-      return true;
-    }
-  }
-
-  public static class RowFileAppender extends FileAppender {
-    private FSDataOutputStream out;
-    private long lastSyncPos;
-    private FileSystem fs;
-    private byte[] sync;
-    private ByteBuffer buffer;
-
-    private BitArray nullFlags;
-    // statistics
-    private TableStatistics stats;
-
-    public RowFileAppender(Configuration conf, final Schema schema, final TableMeta meta, final Path path)
-        throws IOException {
-      super(conf, schema, meta, path);
-    }
-
-    public void init() throws IOException {
-      SYNC_INTERVAL = conf.getInt(ConfVars.ROWFILE_SYNC_INTERVAL.varname,
-          ConfVars.ROWFILE_SYNC_INTERVAL.defaultIntVal);
-      fs = path.getFileSystem(conf);
-
-      if (!fs.exists(path.getParent())) {
-        throw new FileNotFoundException(path.toString());
-      }
-
-      if (fs.exists(path)) {
-        throw new AlreadyExistsStorageException(path);
-      }
-
-      sync = new byte[SYNC_HASH_SIZE];
-      lastSyncPos = 0;
-
-      out = fs.create(path);
-
-      MessageDigest md;
-      try {
-        md = MessageDigest.getInstance("MD5");
-        md.update((path.toString()+System.currentTimeMillis()).getBytes());
-        sync = md.digest();
-      } catch (NoSuchAlgorithmException e) {
-        LOG.error(e);
-      }
-
-      writeHeader();
-
-      buffer = ByteBuffer.allocate(DEFAULT_BUFFER_SIZE);
-
-      nullFlags = new BitArray(schema.size());
-
-      if (enabledStats) {
-        this.stats = new TableStatistics(this.schema);
-      }
-    }
-
-    private void writeHeader() throws IOException {
-      out.writeInt(SYNC_INTERVAL);
-      out.write(sync);
-      out.flush();
-      lastSyncPos = out.getPos();
-    }
-
-    @Override
-    public void addTuple(Tuple t) throws IOException {
-      checkAndWriteSync();
-      Column col;
-
-      buffer.clear();
-      nullFlags.clear();
-
-      for (int i = 0; i < schema.size(); i++) {
-        if (enabledStats) {
-          stats.analyzeField(i, t.get(i));
-        }
-
-        if (t.isNull(i)) {
-          nullFlags.set(i);
-        } else {
-          col = schema.getColumn(i);
-          switch (col.getDataType().getType()) {
-            case BOOLEAN:
-              buffer.put(t.get(i).asByte());
-              break;
-            case BIT:
-              buffer.put(t.get(i).asByte());
-              break;
-            case CHAR:
-              byte[] src = t.get(i).asByteArray();
-              byte[] dst = Arrays.copyOf(src, col.getDataType().getLength());
-              buffer.putInt(src.length);
-              buffer.put(dst);
-              break;
-            case TEXT:
-              byte [] strbytes = t.get(i).asByteArray();
-              buffer.putShort((short)strbytes.length);
-              buffer.put(strbytes, 0, strbytes.length);
-              break;
-            case INT2:
-              buffer.putShort(t.get(i).asInt2());
-              break;
-            case INT4:
-              buffer.putInt(t.get(i).asInt4());
-              break;
-            case INT8:
-              buffer.putLong(t.get(i).asInt8());
-              break;
-            case FLOAT4:
-              buffer.putFloat(t.get(i).asFloat4());
-              break;
-            case FLOAT8:
-              buffer.putDouble(t.get(i).asFloat8());
-              break;
-            case BLOB:
-              byte [] bytes = t.get(i).asByteArray();
-              buffer.putShort((short)bytes.length);
-              buffer.put(bytes);
-              break;
-            case INET4:
-              buffer.put(t.get(i).asByteArray());
-              break;
-            case INET6:
-              buffer.put(t.get(i).asByteArray());
-              break;
-            case NULL_TYPE:
-              nullFlags.set(i);
-              break;
-            default:
-              break;
-          }
-        }
-      }
-
-      byte[] bytes = nullFlags.toArray();
-      out.writeShort(bytes.length);
-      out.write(bytes);
-
-      bytes = buffer.array();
-      int dataLen = buffer.position();
-      out.writeShort(dataLen);
-      out.write(bytes, 0, dataLen);
-
-      // update statistics
-      if (enabledStats) {
-        stats.incrementRow();
-      }
-    }
-
-    @Override
-    public long getOffset() throws IOException {
-      return out.getPos();
-    }
-
-    @Override
-    public void flush() throws IOException {
-      out.flush();
-    }
-
-    @Override
-    public void close() throws IOException {
-      if (out != null) {
-        if (enabledStats) {
-          stats.setNumBytes(out.getPos());
-        }
-        sync();
-        out.flush();
-        out.close();
-      }
-    }
-
-    private void sync() throws IOException {
-      if (lastSyncPos != out.getPos()) {
-        out.writeInt(SYNC_ESCAPE);
-        out.write(sync);
-        lastSyncPos = out.getPos();
-      }
-    }
-
-    private void checkAndWriteSync() throws IOException {
-      if (out.getPos() >= lastSyncPos + SYNC_INTERVAL) {
-        sync();
-      }
-    }
-
-    @Override
-    public TableStats getStats() {
-      if (enabledStats) {
-        return stats.getTableStat();
-      } else {
-        return null;
-      }
-    }
-  }
-}
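
RowFile, deleted above, keeps the format splittable by writing a 20-byte sync block (the SYNC_ESCAPE int followed by a 16-byte MD5 digest of the file path and creation time) roughly every SYNC_INTERVAL bytes; a scanner that starts mid-file slides forward one byte at a time until it finds that block, much as RowFileScanner.init() does. A minimal sketch of the idea follows, using an in-memory buffer in place of a real HDFS file; the buffer contents, offsets, and class name are illustrative assumptions.

import java.nio.ByteBuffer;
import java.security.MessageDigest;
import java.util.Arrays;

public class SyncMarkerDemo {
  static final int SYNC_ESCAPE = -1;      // same escape value as RowFile
  static final int SYNC_HASH_SIZE = 16;   // MD5 digest length

  public static void main(String[] args) throws Exception {
    // Per-file sync marker, derived the same way RowFileAppender.init() derives it.
    MessageDigest md = MessageDigest.getInstance("MD5");
    md.update(("/tmp/demo-row-file" + System.currentTimeMillis()).getBytes());
    byte[] sync = md.digest();

    // Fake "file": a few record bytes, then escape + sync marker, then more bytes.
    ByteBuffer file = ByteBuffer.allocate(64);
    file.put(new byte[]{1, 2, 3, 4, 5});
    file.putInt(SYNC_ESCAPE);
    file.put(sync);
    file.put(new byte[]{6, 7, 8});
    file.flip();

    // Slide forward one byte at a time until the marker matches. (The deleted
    // scanner compares only the 16 digest bytes; checking the escape too is an
    // extra precaution in this sketch.)
    byte[] candidate = new byte[SYNC_HASH_SIZE];
    for (int pos = 0; file.limit() - pos >= 4 + SYNC_HASH_SIZE; pos++) {
      file.position(pos);
      int escape = file.getInt();
      file.get(candidate);
      if (escape == SYNC_ESCAPE && Arrays.equals(candidate, sync)) {
        System.out.println("sync found at offset " + pos);  // prints offset 5
        return;
      }
    }
    System.out.println("no sync marker found");
  }
}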

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/storage/RowStoreUtil.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/RowStoreUtil.java b/tajo-storage/src/main/java/org/apache/tajo/storage/RowStoreUtil.java
deleted file mode 100644
index 24b6280..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/RowStoreUtil.java
+++ /dev/null
@@ -1,377 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage;
-
-import org.apache.tajo.catalog.Column;
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.common.TajoDataTypes;
-import org.apache.tajo.datum.DatumFactory;
-import org.apache.tajo.datum.IntervalDatum;
-import org.apache.tajo.datum.ProtobufDatum;
-import org.apache.tajo.exception.UnsupportedException;
-import org.apache.tajo.storage.exception.UnknownDataTypeException;
-import org.apache.tajo.tuple.offheap.RowWriter;
-import org.apache.tajo.util.BitArray;
-
-import java.nio.ByteBuffer;
-
-public class RowStoreUtil {
-  public static int[] getTargetIds(Schema inSchema, Schema outSchema) {
-    int[] targetIds = new int[outSchema.size()];
-    int i = 0;
-    for (Column target : outSchema.getColumns()) {
-      targetIds[i] = inSchema.getColumnId(target.getQualifiedName());
-      i++;
-    }
-
-    return targetIds;
-  }
-
-  public static Tuple project(Tuple in, Tuple out, int[] targetIds) {
-    out.clear();
-    for (int idx = 0; idx < targetIds.length; idx++) {
-      out.put(idx, in.get(targetIds[idx]));
-    }
-    return out;
-  }
-
-  public static RowStoreEncoder createEncoder(Schema schema) {
-    return new RowStoreEncoder(schema);
-  }
-
-  public static RowStoreDecoder createDecoder(Schema schema) {
-    return new RowStoreDecoder(schema);
-  }
-
-  public static class RowStoreDecoder {
-
-    private Schema schema;
-    private BitArray nullFlags;
-    private int headerSize;
-
-    private RowStoreDecoder(Schema schema) {
-      this.schema = schema;
-      nullFlags = new BitArray(schema.size());
-      headerSize = nullFlags.bytesLength();
-    }
-
-
-    public Tuple toTuple(byte [] bytes) {
-      nullFlags.clear();
-      ByteBuffer bb = ByteBuffer.wrap(bytes);
-      Tuple tuple = new VTuple(schema.size());
-      Column col;
-      TajoDataTypes.DataType type;
-
-      bb.limit(headerSize);
-      nullFlags.fromByteBuffer(bb);
-      bb.limit(bytes.length);
-
-      for (int i = 0; i < schema.size(); i++) {
-        if (nullFlags.get(i)) {
-          tuple.put(i, DatumFactory.createNullDatum());
-          continue;
-        }
-
-        col = schema.getColumn(i);
-        type = col.getDataType();
-        switch (type.getType()) {
-          case BOOLEAN: tuple.put(i, DatumFactory.createBool(bb.get())); break;
-          case BIT:
-            byte b = bb.get();
-            tuple.put(i, DatumFactory.createBit(b));
-            break;
-
-          case CHAR:
-            byte c = bb.get();
-            tuple.put(i, DatumFactory.createChar(c));
-            break;
-
-          case INT2:
-            short s = bb.getShort();
-            tuple.put(i, DatumFactory.createInt2(s));
-            break;
-
-          case INT4:
-          case DATE:
-            int i_ = bb.getInt();
-            tuple.put(i, DatumFactory.createFromInt4(type, i_));
-            break;
-
-          case INT8:
-          case TIME:
-          case TIMESTAMP:
-            long l = bb.getLong();
-            tuple.put(i, DatumFactory.createFromInt8(type, l));
-            break;
-
-          case INTERVAL:
-            int month = bb.getInt();
-            long milliseconds = bb.getLong();
-            tuple.put(i, new IntervalDatum(month, milliseconds));
-            break;
-
-          case FLOAT4:
-            float f = bb.getFloat();
-            tuple.put(i, DatumFactory.createFloat4(f));
-            break;
-
-          case FLOAT8:
-            double d = bb.getDouble();
-            tuple.put(i, DatumFactory.createFloat8(d));
-            break;
-
-          case TEXT:
-            byte [] _string = new byte[bb.getInt()];
-            bb.get(_string);
-            tuple.put(i, DatumFactory.createText(_string));
-            break;
-
-          case BLOB:
-            byte [] _bytes = new byte[bb.getInt()];
-            bb.get(_bytes);
-            tuple.put(i, DatumFactory.createBlob(_bytes));
-            break;
-
-          case INET4:
-            byte [] _ipv4 = new byte[4];
-            bb.get(_ipv4);
-            tuple.put(i, DatumFactory.createInet4(_ipv4));
-            break;
-          case INET6:
-            // TODO - to be implemented
-            throw new UnsupportedException(type.getType().name());
-          default:
-            throw new RuntimeException(new UnknownDataTypeException(type.getType().name()));
-        }
-      }
-      return tuple;
-    }
-
-    public Schema getSchema() {
-      return schema;
-    }
-  }
-
-  public static class RowStoreEncoder {
-    private Schema schema;
-    private BitArray nullFlags;
-    private int headerSize;
-
-    private RowStoreEncoder(Schema schema) {
-      this.schema = schema;
-      nullFlags = new BitArray(schema.size());
-      headerSize = nullFlags.bytesLength();
-    }
-
-    public byte[] toBytes(Tuple tuple) {
-      nullFlags.clear();
-      int size = estimateTupleDataSize(tuple);
-      ByteBuffer bb = ByteBuffer.allocate(size + headerSize);
-      bb.position(headerSize);
-      Column col;
-      for (int i = 0; i < schema.size(); i++) {
-        if (tuple.isNull(i)) {
-          nullFlags.set(i);
-          continue;
-        }
-
-        col = schema.getColumn(i);
-        switch (col.getDataType().getType()) {
-        case NULL_TYPE:
-          nullFlags.set(i);
-          break;
-        case BOOLEAN:
-          bb.put(tuple.get(i).asByte());
-          break;
-        case BIT:
-          bb.put(tuple.get(i).asByte());
-          break;
-        case CHAR:
-          bb.put(tuple.get(i).asByte());
-          break;
-        case INT2:
-          bb.putShort(tuple.get(i).asInt2());
-          break;
-        case INT4:
-          bb.putInt(tuple.get(i).asInt4());
-          break;
-        case INT8:
-          bb.putLong(tuple.get(i).asInt8());
-          break;
-        case FLOAT4:
-          bb.putFloat(tuple.get(i).asFloat4());
-          break;
-        case FLOAT8:
-          bb.putDouble(tuple.get(i).asFloat8());
-          break;
-        case TEXT:
-          byte[] _string = tuple.get(i).asByteArray();
-          bb.putInt(_string.length);
-          bb.put(_string);
-          break;
-        case DATE:
-          bb.putInt(tuple.get(i).asInt4());
-          break;
-        case TIME:
-        case TIMESTAMP:
-          bb.putLong(tuple.get(i).asInt8());
-          break;
-        case INTERVAL:
-          IntervalDatum interval = (IntervalDatum) tuple.get(i);
-          bb.putInt(interval.getMonths());
-          bb.putLong(interval.getMilliSeconds());
-          break;
-        case BLOB:
-          byte[] bytes = tuple.get(i).asByteArray();
-          bb.putInt(bytes.length);
-          bb.put(bytes);
-          break;
-        case INET4:
-          byte[] ipBytes = tuple.get(i).asByteArray();
-          bb.put(ipBytes);
-          break;
-        case INET6:
-          bb.put(tuple.get(i).asByteArray());
-          break;
-        default:
-          throw new RuntimeException(new UnknownDataTypeException(col.getDataType().getType().name()));
-        }
-      }
-
-      byte[] flags = nullFlags.toArray();
-      int finalPosition = bb.position();
-      bb.position(0);
-      bb.put(flags);
-
-      bb.position(finalPosition);
-      bb.flip();
-      byte[] buf = new byte[bb.limit()];
-      bb.get(buf);
-      return buf;
-    }
-
-    // Note that NULL values are recorded in the null-flag header, so they add no data bytes here
-    private int estimateTupleDataSize(Tuple tuple) {
-      int size = 0;
-      Column col;
-
-      for (int i = 0; i < schema.size(); i++) {
-        if (tuple.isNull(i)) {
-          continue;
-        }
-
-        col = schema.getColumn(i);
-        switch (col.getDataType().getType()) {
-        case BOOLEAN:
-        case BIT:
-        case CHAR:
-          size += 1;
-          break;
-        case INT2:
-          size += 2;
-          break;
-        case DATE:
-        case INT4:
-        case FLOAT4:
-          size += 4;
-          break;
-        case TIME:
-        case TIMESTAMP:
-        case INT8:
-        case FLOAT8:
-          size += 8;
-          break;
-        case INTERVAL:
-          size += 12;
-          break;
-        case TEXT:
-        case BLOB:
-          size += (4 + tuple.get(i).asByteArray().length);
-          break;
-        case INET4:
-        case INET6:
-          size += tuple.get(i).asByteArray().length;
-          break;
-        default:
-          throw new RuntimeException(new UnknownDataTypeException(col.getDataType().getType().name()));
-        }
-      }
-
-      size += 100; // optimistic reservation
-
-      return size;
-    }
-
-    public Schema getSchema() {
-      return schema;
-    }
-  }
-
-  public static void convert(Tuple tuple, RowWriter writer) {
-    writer.startRow();
-
-    for (int i = 0; i < writer.dataTypes().length; i++) {
-      if (tuple.isNull(i)) {
-        writer.skipField();
-        continue;
-      }
-      switch (writer.dataTypes()[i].getType()) {
-      case BOOLEAN:
-        writer.putBool(tuple.getBool(i));
-        break;
-      case INT1:
-      case INT2:
-        writer.putInt2(tuple.getInt2(i));
-        break;
-      case INT4:
-      case DATE:
-      case INET4:
-        writer.putInt4(tuple.getInt4(i));
-        break;
-      case INT8:
-      case TIMESTAMP:
-      case TIME:
-        writer.putInt8(tuple.getInt8(i));
-        break;
-      case FLOAT4:
-        writer.putFloat4(tuple.getFloat4(i));
-        break;
-      case FLOAT8:
-        writer.putFloat8(tuple.getFloat8(i));
-        break;
-      case TEXT:
-        writer.putText(tuple.getBytes(i));
-        break;
-      case INTERVAL:
-        writer.putInterval((IntervalDatum) tuple.getInterval(i));
-        break;
-      case PROTOBUF:
-        writer.putProtoDatum((ProtobufDatum) tuple.getProtobufDatum(i));
-        break;
-      case NULL_TYPE:
-        writer.skipField();
-        break;
-      default:
-        throw new UnsupportedException("Unknown data type: " + writer.dataTypes()[i]);
-      }
-    }
-    writer.endRow();
-  }
-}
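
RowStoreUtil, also removed here, serializes a tuple as a null-flag header followed by the encoded field bytes, and RowStoreEncoder/RowStoreDecoder are symmetric, so a tuple survives a round trip through toBytes() and toTuple(). A hedged usage sketch follows; the Schema construction (addColumn) and the Type import are assumptions about the catalog API of that era and do not appear in this diff.

import org.apache.tajo.catalog.Schema;
import org.apache.tajo.common.TajoDataTypes.Type;
import org.apache.tajo.datum.DatumFactory;
import org.apache.tajo.storage.RowStoreUtil;
import org.apache.tajo.storage.Tuple;
import org.apache.tajo.storage.VTuple;

public class RowStoreRoundTrip {
  public static void main(String[] args) {
    // Two-column schema; addColumn(name, type) is assumed, not shown in this diff.
    Schema schema = new Schema();
    schema.addColumn("id", Type.INT4);
    schema.addColumn("name", Type.TEXT);

    Tuple in = new VTuple(schema.size());
    in.put(0, DatumFactory.createInt4(42));
    in.put(1, DatumFactory.createText("tajo"));

    // Null-flag header + field bytes, as laid out by RowStoreEncoder.toBytes().
    byte[] bytes = RowStoreUtil.createEncoder(schema).toBytes(in);
    Tuple out = RowStoreUtil.createDecoder(schema).toTuple(bytes);

    System.out.println(out.get(0).asInt4() + ", " + new String(out.get(1).asByteArray()));
  }
}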