You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by jh...@apache.org on 2014/01/28 13:35:45 UTC

[08/18] TAJO-520: Move tajo-core-storage to tajo-storage. (jinho)

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bbf9b7bf/tajo-storage/src/main/java/org/apache/tajo/storage/LineReader.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/LineReader.java b/tajo-storage/src/main/java/org/apache/tajo/storage/LineReader.java
new file mode 100644
index 0000000..66c610a
--- /dev/null
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/LineReader.java
@@ -0,0 +1,559 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.tajo.storage.rcfile.NonSyncByteArrayOutputStream;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+
+/**
+ * A class that provides a line reader from an input stream.
+ * Depending on the constructor used, lines will either be terminated by:
+ * <ul>
+ * <li>one of the following: '\n' (LF) , '\r' (CR),
+ * or '\r\n' (CR+LF).</li>
+ * <li><em>or</em>, a custom byte sequence delimiter</li>
+ * </ul>
+ * In both cases, EOF also terminates an otherwise unterminated
+ * line.
+ */
+
+public class LineReader implements Closeable {
+  private static final int DEFAULT_BUFFER_SIZE = 64 * 1024;
+  private int bufferSize = DEFAULT_BUFFER_SIZE;
+  private InputStream in;
+  private byte[] buffer;
+  // the number of bytes of real data in the buffer
+  private int bufferLength = 0;
+  // the current position in the buffer
+  private int bufferPosn = 0;
+
+  private static final byte CR = '\r';
+  private static final byte LF = '\n';
+
+  // The line delimiter
+  private final byte[] recordDelimiterBytes;
+
+  /**
+   * Create a line reader that reads from the given stream using the
+   * default buffer-size (64k).
+   *
+   * @param in The input stream
+   * @throws IOException
+   */
+  public LineReader(InputStream in) {
+    this(in, DEFAULT_BUFFER_SIZE);
+  }
+
+  /**
+   * Create a line reader that reads from the given stream using the
+   * given buffer-size.
+   *
+   * @param in         The input stream
+   * @param bufferSize Size of the read buffer
+   * @throws IOException
+   */
+  public LineReader(InputStream in, int bufferSize) {
+    this.in = in;
+    this.bufferSize = bufferSize;
+    this.buffer = new byte[this.bufferSize];
+    this.recordDelimiterBytes = null;
+  }
+
+  /**
+   * Create a line reader that reads from the given stream using the
+   * <code>io.file.buffer.size</code> specified in the given
+   * <code>Configuration</code>.
+   *
+   * @param in   input stream
+   * @param conf configuration
+   * @throws IOException
+   */
+  public LineReader(InputStream in, Configuration conf) throws IOException {
+    this(in, conf.getInt("io.file.buffer.size", DEFAULT_BUFFER_SIZE));
+  }
+
+  /**
+   * Create a line reader that reads from the given stream using the
+   * default buffer-size, and using a custom delimiter of array of
+   * bytes.
+   *
+   * @param in                   The input stream
+   * @param recordDelimiterBytes The delimiter
+   */
+  public LineReader(InputStream in, byte[] recordDelimiterBytes) {
+    this.in = in;
+    this.bufferSize = DEFAULT_BUFFER_SIZE;
+    this.buffer = new byte[this.bufferSize];
+    this.recordDelimiterBytes = recordDelimiterBytes;
+  }
+
+  /**
+   * Create a line reader that reads from the given stream using the
+   * given buffer-size, and using a custom delimiter of array of
+   * bytes.
+   *
+   * @param in                   The input stream
+   * @param bufferSize           Size of the read buffer
+   * @param recordDelimiterBytes The delimiter
+   * @throws IOException
+   */
+  public LineReader(InputStream in, int bufferSize,
+                    byte[] recordDelimiterBytes) {
+    this.in = in;
+    this.bufferSize = bufferSize;
+    this.buffer = new byte[this.bufferSize];
+    this.recordDelimiterBytes = recordDelimiterBytes;
+  }
+
+  /**
+   * Create a line reader that reads from the given stream using the
+   * <code>io.file.buffer.size</code> specified in the given
+   * <code>Configuration</code>, and using a custom delimiter of array of
+   * bytes.
+   *
+   * @param in                   input stream
+   * @param conf                 configuration
+   * @param recordDelimiterBytes The delimiter
+   * @throws IOException
+   */
+  public LineReader(InputStream in, Configuration conf,
+                    byte[] recordDelimiterBytes) throws IOException {
+    this.in = in;
+    this.bufferSize = conf.getInt("io.file.buffer.size", DEFAULT_BUFFER_SIZE);
+    this.buffer = new byte[this.bufferSize];
+    this.recordDelimiterBytes = recordDelimiterBytes;
+  }
+
+
+  /**
+   * Close the underlying stream.
+   *
+   * @throws IOException
+   */
+  public void close() throws IOException {
+    in.close();
+  }
+
+  public void reset() {
+    bufferLength = 0;
+    bufferPosn = 0;
+
+  }
+
+  /**
+   * Read one line from the InputStream into the given Text.
+   *
+   * @param str               the object to store the given line (without newline)
+   * @param maxLineLength     the maximum number of bytes to store into str;
+   *                          the rest of the line is silently discarded.
+   * @param maxBytesToConsume the maximum number of bytes to consume
+   *                          in this call.  This is only a hint, because if the line cross
+   *                          this threshold, we allow it to happen.  It can overshoot
+   *                          potentially by as much as one buffer length.
+   * @return the number of bytes read including the (longest) newline
+   *         found.
+   * @throws IOException if the underlying stream throws
+   */
+  public int readLine(Text str, int maxLineLength,
+                      int maxBytesToConsume) throws IOException {
+    if (this.recordDelimiterBytes != null) {
+      return readCustomLine(str, maxLineLength, maxBytesToConsume);
+    } else {
+      return readDefaultLine(str, maxLineLength, maxBytesToConsume);
+    }
+  }
+
+  protected int fillBuffer(InputStream in, byte[] buffer, boolean inDelimiter)
+      throws IOException {
+    return in.read(buffer);
+  }
+  /**
+   * Read a line terminated by one of CR, LF, or CRLF.
+   */
+  private int readDefaultLine(Text str, int maxLineLength, int maxBytesToConsume)
+      throws IOException {
+    /* We're reading data from in, but the head of the stream may be
+     * already buffered in buffer, so we have several cases:
+     * 1. No newline characters are in the buffer, so we need to copy
+     *    everything and read another buffer from the stream.
+     * 2. An unambiguously terminated line is in buffer, so we just
+     *    copy to str.
+     * 3. Ambiguously terminated line is in buffer, i.e. buffer ends
+     *    in CR.  In this case we copy everything up to CR to str, but
+     *    we also need to see what follows CR: if it's LF, then we
+     *    need consume LF as well, so next call to readLine will read
+     *    from after that.
+     * We use a flag prevCharCR to signal if previous character was CR
+     * and, if it happens to be at the end of the buffer, delay
+     * consuming it until we have a chance to look at the char that
+     * follows.
+     */
+    str.clear();
+    int txtLength = 0; //tracks str.getLength(), as an optimization
+    int newlineLength = 0; //length of terminating newline
+    boolean prevCharCR = false; //true of prev char was CR
+    long bytesConsumed = 0;
+    do {
+      int startPosn = bufferPosn; //starting from where we left off the last time
+      if (bufferPosn >= bufferLength) {
+        startPosn = bufferPosn = 0;
+        if (prevCharCR) {
+          ++bytesConsumed; //account for CR from previous read
+        }
+        bufferLength = fillBuffer(in, buffer, prevCharCR);
+        if (bufferLength <= 0) {
+          break; // EOF
+        }
+      }
+      for (; bufferPosn < bufferLength; ++bufferPosn) { //search for newline
+        if (buffer[bufferPosn] == LF) {
+          newlineLength = (prevCharCR) ? 2 : 1;
+          ++bufferPosn; // at next invocation proceed from following byte
+          break;
+        }
+        if (prevCharCR) { //CR + notLF, we are at notLF
+          newlineLength = 1;
+          break;
+        }
+        prevCharCR = (buffer[bufferPosn] == CR);
+      }
+      int readLength = bufferPosn - startPosn;
+      if (prevCharCR && newlineLength == 0) {
+        --readLength; //CR at the end of the buffer
+      }
+      bytesConsumed += readLength;
+      int appendLength = readLength - newlineLength;
+      if (appendLength > maxLineLength - txtLength) {
+        appendLength = maxLineLength - txtLength;
+      }
+      if (appendLength > 0) {
+        str.append(buffer, startPosn, appendLength);
+        txtLength += appendLength;
+      }
+    } while (newlineLength == 0 && bytesConsumed < maxBytesToConsume);
+
+    if (bytesConsumed > (long) Integer.MAX_VALUE) {
+      throw new IOException("Too many bytes before newline: " + bytesConsumed);
+    }
+    return (int) bytesConsumed;
+  }
+
+  /**
+   * Read a line terminated by one of CR, LF, or CRLF.
+   */
+  public int readDefaultLine(NonSyncByteArrayOutputStream str, ArrayList<Integer> offsets, int maxLineLength
+      , int maxBytesToConsume)
+      throws IOException {
+    /* We're reading data from in, but the head of the stream may be
+     * already buffered in buffer, so we have several cases:
+     * 1. No newline characters are in the buffer, so we need to copy
+     *    everything and read another buffer from the stream.
+     * 2. An unambiguously terminated line is in buffer, so we just
+     *    copy to str.
+     * 3. Ambiguously terminated line is in buffer, i.e. buffer ends
+     *    in CR.  In this case we copy everything up to CR to str, but
+     *    we also need to see what follows CR: if it's LF, then we
+     *    need consume LF as well, so next call to readLine will read
+     *    from after that.
+     * We use a flag prevCharCR to signal if previous character was CR
+     * and, if it happens to be at the end of the buffer, delay
+     * consuming it until we have a chance to look at the char that
+     * follows.
+     */
+
+    int txtLength = 0; //tracks str.getLength(), as an optimization
+    int newlineLength = 0; //length of terminating newline
+    boolean prevCharCR = false; //true of prev char was CR
+    long bytesConsumed = 0;
+    do {
+      int startPosn = bufferPosn; //starting from where we left off the last time
+      if (bufferPosn >= bufferLength) {
+        startPosn = bufferPosn = 0;
+        if (prevCharCR) {
+          ++bytesConsumed; //account for CR from previous read
+        }
+        bufferLength = fillBuffer(in, buffer, prevCharCR);
+        if (bufferLength <= 0) {
+          break; // EOF
+        }
+      }
+      for (; bufferPosn < bufferLength; ++bufferPosn) { //search for newline
+        if (buffer[bufferPosn] == LF) {
+          newlineLength = (prevCharCR) ? 2 : 1;
+          ++bufferPosn; // at next invocation proceed from following byte
+          break;
+        }
+        if (prevCharCR) { //CR + notLF, we are at notLF
+          newlineLength = 1;
+          break;
+        }
+        prevCharCR = (buffer[bufferPosn] == CR);
+      }
+      int readLength = bufferPosn - startPosn;
+      if (prevCharCR && newlineLength == 0) {
+        --readLength; //CR at the end of the buffer
+      }
+      bytesConsumed += readLength;
+      int appendLength = readLength - newlineLength;
+      if (appendLength > maxLineLength - txtLength) {
+        appendLength = maxLineLength - txtLength;
+      }
+      if (appendLength > 0) {
+        str.write(buffer, startPosn, appendLength);
+        txtLength += appendLength;
+      }
+    } while (newlineLength == 0 && bytesConsumed < maxBytesToConsume);
+
+    if (bytesConsumed > (long) Integer.MAX_VALUE) {
+      throw new IOException("Too many bytes before newline: " + bytesConsumed);
+    }
+
+    if (bytesConsumed > 0) offsets.add(txtLength);
+    return (int) bytesConsumed;
+  }
+
+  /**
+   * Read a line terminated by one of CR, LF, or CRLF.
+   */
+
+/*  int validIdx = 0;
+  public int readDefaultLines(NonSyncByteArrayOutputStream str, ArrayList<Integer> offsets, ArrayList<Long> foffsets,
+                             long pos, int maxLineLength, int maxBytesToConsume)
+      throws IOException {
+    *//* We're reading data from in, but the head of the stream may be
+     * already buffered in buffer, so we have several cases:
+     * 1. No newline characters are in the buffer, so we need to copy
+     *    everything and read another buffer from the stream.
+     * 2. An unambiguously terminated line is in buffer, so we just
+     *    copy to str.
+     * 3. Ambiguously terminated line is in buffer, i.e. buffer ends
+     *    in CR.  In this case we copy everything up to CR to str, but
+     *    we also need to see what follows CR: if it's LF, then we
+     *    need consume LF as well, so next call to readLine will read
+     *    from after that.
+     * We use a flag prevCharCR to signal if previous character was CR
+     * and, if it happens to be at the end of the buffer, delay
+     * consuming it until we have a chance to look at the char that
+     * follows.
+     *//*
+    //str.clear();
+    str.reset();
+    offsets.clear();
+    foffsets.clear();
+
+    validIdx = 0;
+    long bufferBytesConsumed = 0;
+
+    int txtLength = 0; //tracks str.getLength(), as an optimization
+    int newlineLength = 0; //length of terminating newline
+    boolean prevCharCR = false; //true of prev char was CR
+    long bytesConsumed = 0;
+    do {
+
+      int startPosn = bufferPosn; //starting from where we left off the last time
+      if (bufferPosn >= bufferLength) {
+        startPosn = bufferPosn = 0;
+        if (prevCharCR) {
+          ++bytesConsumed; //account for CR from previous read
+        }
+        bufferLength = in.read(buffer);
+        if (bufferLength <= 0) {
+          break; // EOF
+        }
+      }
+      for (; bufferPosn < bufferLength; ++bufferPosn) { //search for newline
+        if (buffer[bufferPosn] == LF) {
+          newlineLength = (prevCharCR) ? 2 : 1;
+          ++bufferPosn; // at next invocation proceed from following byte
+          break;
+        }
+        if (prevCharCR) { //CR + notLF, we are at notLF
+          newlineLength = 1;
+          break;
+        }
+        prevCharCR = (buffer[bufferPosn] == CR);
+      }
+      int readLength = bufferPosn - startPosn;
+      if (prevCharCR && newlineLength == 0) {
+        --readLength; //CR at the end of the buffer
+      }
+      bytesConsumed += readLength;
+      int appendLength = readLength - newlineLength;
+      if (appendLength > maxLineLength - txtLength) {
+        appendLength = maxLineLength - txtLength;
+      }
+
+      if (appendLength > 0) {
+        str.write(buffer, startPosn, appendLength);
+        //System.out.println(startPosn + "," + appendLength);
+        //str.append(buffer, startPosn, appendLength);
+        txtLength += appendLength;
+      }
+
+      if(newlineLength > 0){
+        validIdx++;
+
+        if (bytesConsumed > (long)Integer.MAX_VALUE) {
+          throw new IOException("Too many bytes before newline: " + bytesConsumed);
+        }
+        offsets.add(txtLength);
+        foffsets.add(pos);
+        pos+= bytesConsumed;
+        bufferBytesConsumed += bytesConsumed;
+
+        txtLength = 0;
+        newlineLength = 0;
+        prevCharCR = false; //true of prev char was CR
+        bytesConsumed = 0;
+      } else {
+        bufferBytesConsumed += bytesConsumed;
+        bytesConsumed = 0;
+      }
+    } while ((bufferBytesConsumed < 256 * 1024));
+
+    return (int)bufferBytesConsumed;
+  }*/
+
+  /**
+   * Read a line terminated by a custom delimiter.
+   */
+  private int readCustomLine(Text str, int maxLineLength, int maxBytesToConsume)
+      throws IOException {
+   /* We're reading data from inputStream, but the head of the stream may be
+    *  already captured in the previous buffer, so we have several cases:
+    *
+    * 1. The buffer tail does not contain any character sequence which
+    *    matches with the head of delimiter. We count it as a
+    *    ambiguous byte count = 0
+    *
+    * 2. The buffer tail contains a X number of characters,
+    *    that forms a sequence, which matches with the
+    *    head of delimiter. We count ambiguous byte count = X
+    *
+    *    // ***  eg: A segment of input file is as follows
+    *
+    *    " record 1792: I found this bug very interesting and
+    *     I have completely read about it. record 1793: This bug
+    *     can be solved easily record 1794: This ."
+    *
+    *    delimiter = "record";
+    *
+    *    supposing:- String at the end of buffer =
+    *    "I found this bug very interesting and I have completely re"
+    *    There for next buffer = "ad about it. record 179       ...."
+    *
+    *     The matching characters in the input
+    *     buffer tail and delimiter head = "re"
+    *     Therefore, ambiguous byte count = 2 ****   //
+    *
+    *     2.1 If the following bytes are the remaining characters of
+    *         the delimiter, then we have to capture only up to the starting
+    *         position of delimiter. That means, we need not include the
+    *         ambiguous characters in str.
+    *
+    *     2.2 If the following bytes are not the remaining characters of
+    *         the delimiter ( as mentioned in the example ),
+    *         then we have to include the ambiguous characters in str.
+    */
+    str.clear();
+    int txtLength = 0; // tracks str.getLength(), as an optimization
+    long bytesConsumed = 0;
+    int delPosn = 0;
+    int ambiguousByteCount = 0; // To capture the ambiguous characters count
+    do {
+      int startPosn = bufferPosn; // Start from previous end position
+      if (bufferPosn >= bufferLength) {
+        startPosn = bufferPosn = 0;
+        bufferLength = fillBuffer(in, buffer, ambiguousByteCount > 0);
+        if (bufferLength <= 0) {
+          str.append(recordDelimiterBytes, 0, ambiguousByteCount);
+          break; // EOF
+        }
+      }
+      for (; bufferPosn < bufferLength; ++bufferPosn) {
+        if (buffer[bufferPosn] == recordDelimiterBytes[delPosn]) {
+          delPosn++;
+          if (delPosn >= recordDelimiterBytes.length) {
+            bufferPosn++;
+            break;
+          }
+        } else if (delPosn != 0) {
+          bufferPosn--;
+          delPosn = 0;
+        }
+      }
+      int readLength = bufferPosn - startPosn;
+      bytesConsumed += readLength;
+      int appendLength = readLength - delPosn;
+      if (appendLength > maxLineLength - txtLength) {
+        appendLength = maxLineLength - txtLength;
+      }
+      if (appendLength > 0) {
+        if (ambiguousByteCount > 0) {
+          str.append(recordDelimiterBytes, 0, ambiguousByteCount);
+          //appending the ambiguous characters (refer case 2.2)
+          bytesConsumed += ambiguousByteCount;
+          ambiguousByteCount = 0;
+        }
+        str.append(buffer, startPosn, appendLength);
+        txtLength += appendLength;
+      }
+      if (bufferPosn >= bufferLength) {
+        if (delPosn > 0 && delPosn < recordDelimiterBytes.length) {
+          ambiguousByteCount = delPosn;
+          bytesConsumed -= ambiguousByteCount; //to be consumed in next
+        }
+      }
+    } while (delPosn < recordDelimiterBytes.length
+        && bytesConsumed < maxBytesToConsume);
+    if (bytesConsumed > (long) Integer.MAX_VALUE) {
+      throw new IOException("Too many bytes before delimiter: " + bytesConsumed);
+    }
+    return (int) bytesConsumed;
+  }
+
+  /**
+   * Read from the InputStream into the given Text.
+   *
+   * @param str           the object to store the given line
+   * @param maxLineLength the maximum number of bytes to store into str.
+   * @return the number of bytes read including the newline
+   * @throws IOException if the underlying stream throws
+   */
+  public int readLine(Text str, int maxLineLength) throws IOException {
+    return readLine(str, maxLineLength, Integer.MAX_VALUE);
+  }
+
+  /**
+   * Read from the InputStream into the given Text.
+   *
+   * @param str the object to store the given line
+   * @return the number of bytes read including the newline
+   * @throws IOException if the underlying stream throws
+   */
+  public int readLine(Text str) throws IOException {
+    return readLine(str, Integer.MAX_VALUE, Integer.MAX_VALUE);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bbf9b7bf/tajo-storage/src/main/java/org/apache/tajo/storage/MergeScanner.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/MergeScanner.java b/tajo-storage/src/main/java/org/apache/tajo/storage/MergeScanner.java
new file mode 100644
index 0000000..e4439f3
--- /dev/null
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/MergeScanner.java
@@ -0,0 +1,154 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage;
+
+import com.google.common.collect.Lists;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.storage.fragment.FileFragment;
+import org.apache.tajo.storage.fragment.Fragment;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+
+public class MergeScanner implements Scanner {
+  private Configuration conf;
+  private TableMeta meta;
+  private Schema schema;
+  private List<FileFragment> fragments;
+  private Iterator<FileFragment> iterator;
+  private FileFragment currentFragment;
+  private Scanner currentScanner;
+  private Tuple tuple;
+  private boolean projectable = false;
+  private boolean selectable = false;
+  private Schema target;
+
+  public MergeScanner(Configuration conf, Schema schema, TableMeta meta, Collection<FileFragment> rawFragmentList)
+      throws IOException {
+    this(conf, schema, meta, rawFragmentList, schema);
+  }
+
+  public MergeScanner(Configuration conf, Schema schema, TableMeta meta, Collection<FileFragment> rawFragmentList,
+                      Schema target)
+      throws IOException {
+    this.conf = conf;
+    this.schema = schema;
+    this.meta = meta;
+    this.fragments = Lists.newArrayList();
+    for (Fragment f : rawFragmentList) {
+      fragments.add((FileFragment) f);
+    }
+    Collections.sort(fragments);
+
+    this.target = target;
+    this.reset();
+    if (currentScanner != null) {
+      this.projectable = currentScanner.isProjectable();
+      this.selectable = currentScanner.isSelectable();
+    }
+  }
+
+  @Override
+  public void init() throws IOException {
+  }
+
+  @Override
+  public Tuple next() throws IOException {
+    if (currentScanner != null)
+      tuple = currentScanner.next();
+
+    if (tuple != null) {
+      return tuple;
+    } else {
+      if (currentScanner != null) {
+        currentScanner.close();
+      }
+      currentScanner = getNextScanner();
+      if (currentScanner != null) {
+        tuple = currentScanner.next();
+      }
+    }
+    return tuple;
+  }
+
+  @Override
+  public void reset() throws IOException {
+    this.iterator = fragments.iterator();
+    this.currentScanner = getNextScanner();
+  }
+
+  private Scanner getNextScanner() throws IOException {
+    if (iterator.hasNext()) {
+      currentFragment = iterator.next();
+      currentScanner = StorageManagerFactory.getStorageManager((TajoConf)conf).getScanner(meta, schema,
+          currentFragment, target);
+      currentScanner.init();
+      return currentScanner;
+    } else {
+      return null;
+    }
+  }
+
+  @Override
+  public void close() throws IOException {
+    if(currentScanner != null) {
+      currentScanner.close();
+    }
+    iterator = null;
+    if(fragments != null) {
+      fragments.clear();
+    }
+  }
+
+  @Override
+  public boolean isProjectable() {
+    return projectable;
+  }
+
+  @Override
+  public void setTarget(Column[] targets) {
+    this.target = new Schema(targets);
+  }
+
+  @Override
+  public boolean isSelectable() {
+    return selectable;
+  }
+
+  @Override
+  public void setSearchCondition(Object expr) {
+  }
+
+  @Override
+  public Schema getSchema() {
+    return schema;
+  }
+
+  @Override
+  public boolean isSplittable(){
+    return false;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bbf9b7bf/tajo-storage/src/main/java/org/apache/tajo/storage/NumericPathComparator.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/NumericPathComparator.java b/tajo-storage/src/main/java/org/apache/tajo/storage/NumericPathComparator.java
new file mode 100644
index 0000000..94d13ee
--- /dev/null
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/NumericPathComparator.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage;
+
+import org.apache.hadoop.fs.Path;
+
+import java.util.Comparator;
+
+public class NumericPathComparator implements Comparator<Path> {
+
+  @Override
+  public int compare(Path p1, Path p2) {
+    int num1 = Integer.parseInt(p1.getName());
+    int num2 = Integer.parseInt(p2.getName());
+
+    return num1 - num2;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bbf9b7bf/tajo-storage/src/main/java/org/apache/tajo/storage/RawFile.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/RawFile.java b/tajo-storage/src/main/java/org/apache/tajo/storage/RawFile.java
new file mode 100644
index 0000000..db511dc
--- /dev/null
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/RawFile.java
@@ -0,0 +1,532 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage;
+
+import com.google.protobuf.Message;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.catalog.statistics.TableStats;
+import org.apache.tajo.common.TajoDataTypes.DataType;
+import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.datum.NullDatum;
+import org.apache.tajo.datum.ProtobufDatumFactory;
+import org.apache.tajo.datum.TimestampDatum;
+import org.apache.tajo.storage.fragment.FileFragment;
+import org.apache.tajo.util.BitArray;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.net.URI;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.util.Arrays;
+
+public class RawFile {
+  private static final Log LOG = LogFactory.getLog(RawFile.class);
+
+  public static class RawFileScanner extends FileScanner implements SeekableScanner {
+    private FileChannel channel;
+    private DataType[] columnTypes;
+    private Path path;
+
+    private ByteBuffer buffer;
+    private Tuple tuple;
+
+    private int headerSize = 0;
+    private BitArray nullFlags;
+    private static final int RECORD_SIZE = 4;
+    private boolean eof = false;
+    private long fileSize;
+    private FileInputStream fis;
+
+    public RawFileScanner(Configuration conf, Schema schema, TableMeta meta, Path path) throws IOException {
+      super(conf, schema, meta, null);
+      this.path = path;
+      init();
+    }
+
+    @SuppressWarnings("unused")
+    public RawFileScanner(Configuration conf, Schema schema, TableMeta meta, FileFragment fragment) throws IOException {
+      this(conf, schema, meta, fragment.getPath());
+    }
+
+    public void init() throws IOException {
+      //Preconditions.checkArgument(FileUtil.isLocalPath(path));
+      // TODO - to make it unified one.
+      URI uri = path.toUri();
+      fis = new FileInputStream(new File(uri));
+      channel = fis.getChannel();
+      fileSize = channel.size();
+
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("RawFileScanner open:" + path + "," + channel.position() + ", size :" + channel.size());
+      }
+
+      buffer = ByteBuffer.allocateDirect(128 * 1024);
+
+      columnTypes = new DataType[schema.getColumnNum()];
+      for (int i = 0; i < schema.getColumnNum(); i++) {
+        columnTypes[i] = schema.getColumn(i).getDataType();
+      }
+
+      tuple = new VTuple(columnTypes.length);
+
+      // initial read
+      channel.read(buffer);
+      buffer.flip();
+
+      nullFlags = new BitArray(schema.getColumnNum());
+      headerSize = RECORD_SIZE + 2 + nullFlags.bytesLength();
+
+      super.init();
+    }
+
+    @Override
+    public long getNextOffset() throws IOException {
+      return channel.position() - buffer.remaining();
+    }
+
+    @Override
+    public void seek(long offset) throws IOException {
+      long currentPos = channel.position();
+      if(currentPos < offset &&  offset < currentPos + buffer.limit()){
+        buffer.position((int)(offset - currentPos));
+      } else {
+        buffer.clear();
+        channel.position(offset);
+        channel.read(buffer);
+        buffer.flip();
+        eof = false;
+      }
+    }
+
+    private boolean fillBuffer() throws IOException {
+      buffer.compact();
+      if (channel.read(buffer) == -1) {
+        eof = true;
+        return false;
+      } else {
+        buffer.flip();
+        return true;
+      }
+    }
+
+    @Override
+    public Tuple next() throws IOException {
+      if(eof) return null;
+
+      if (buffer.remaining() < headerSize) {
+        if (!fillBuffer()) {
+          return null;
+        }
+      }
+
+      // backup the buffer state
+      int bufferLimit = buffer.limit();
+      int recordSize = buffer.getInt();
+      int nullFlagSize = buffer.getShort();
+
+      buffer.limit(buffer.position() + nullFlagSize);
+      nullFlags.fromByteBuffer(buffer);
+      // restore the start of record contents
+      buffer.limit(bufferLimit);
+      //buffer.position(recordOffset + headerSize);
+      if (buffer.remaining() < (recordSize - headerSize)) {
+        if (!fillBuffer()) {
+          return null;
+        }
+      }
+
+      for (int i = 0; i < columnTypes.length; i++) {
+        // check if the i'th column is null
+        if (nullFlags.get(i)) {
+          tuple.put(i, DatumFactory.createNullDatum());
+          continue;
+        }
+
+        switch (columnTypes[i].getType()) {
+          case BOOLEAN :
+            tuple.put(i, DatumFactory.createBool(buffer.get()));
+            break;
+
+          case BIT :
+            tuple.put(i, DatumFactory.createBit(buffer.get()));
+            break;
+
+          case CHAR :
+            int realLen = buffer.getInt();
+            byte[] buf = new byte[columnTypes[i].getLength()];
+            buffer.get(buf);
+            byte[] charBuf = Arrays.copyOf(buf, realLen);
+            tuple.put(i, DatumFactory.createChar(charBuf));
+            break;
+
+          case INT2 :
+            tuple.put(i, DatumFactory.createInt2(buffer.getShort()));
+            break;
+
+          case INT4 :
+            tuple.put(i, DatumFactory.createInt4(buffer.getInt()));
+            break;
+
+          case INT8 :
+            tuple.put(i, DatumFactory.createInt8(buffer.getLong()));
+            break;
+
+          case FLOAT4 :
+            tuple.put(i, DatumFactory.createFloat4(buffer.getFloat()));
+            break;
+
+          case FLOAT8 :
+            tuple.put(i, DatumFactory.createFloat8(buffer.getDouble()));
+            break;
+
+          case TEXT :
+            // TODO - shoud use CharsetEncoder / CharsetDecoder
+            //byte [] rawBytes = getColumnBytes();
+            int strSize2 = buffer.getInt();
+            byte [] strBytes2 = new byte[strSize2];
+            buffer.get(strBytes2);
+            tuple.put(i, DatumFactory.createText(new String(strBytes2)));
+            break;
+
+          case TIMESTAMP:
+            tuple.put(i, DatumFactory.createTimeStampFromMillis(buffer.getLong()));
+            break;
+
+          case BLOB : {
+            //byte [] rawBytes = getColumnBytes();
+            int byteSize = buffer.getInt();
+            byte [] rawBytes = new byte[byteSize];
+            buffer.get(rawBytes);
+            tuple.put(i, DatumFactory.createBlob(rawBytes));
+            break;
+          }
+
+          case PROTOBUF: {
+            //byte [] rawBytes = getColumnBytes();
+            int byteSize = buffer.getInt();
+            byte [] rawBytes = new byte[byteSize];
+            buffer.get(rawBytes);
+
+            ProtobufDatumFactory factory = ProtobufDatumFactory.get(columnTypes[i]);
+            Message.Builder builder = factory.newBuilder();
+            builder.mergeFrom(rawBytes);
+            tuple.put(i, factory.createDatum(builder.build()));
+            break;
+          }
+
+          case INET4 :
+            byte [] ipv4Bytes = new byte[4];
+            buffer.get(ipv4Bytes);
+            tuple.put(i, DatumFactory.createInet4(ipv4Bytes));
+            break;
+
+          case NULL_TYPE:
+            tuple.put(i, NullDatum.get());
+            break;
+
+          default:
+        }
+      }
+
+      if(!buffer.hasRemaining() && channel.position() == fileSize){
+        eof = true;
+      }
+      return tuple;
+    }
+
+    @Override
+    public void reset() throws IOException {
+      // clear the buffer
+      buffer.clear();
+      // reload initial buffer
+      channel.position(0);
+      channel.read(buffer);
+      buffer.flip();
+      eof = false;
+    }
+
+    @Override
+    public void close() throws IOException {
+      buffer.clear();
+      channel.close();
+      fis.close();
+    }
+
+    @Override
+    public boolean isProjectable() {
+      return false;
+    }
+
+    @Override
+    public boolean isSelectable() {
+      return false;
+    }
+
+    @Override
+    public boolean isSplittable(){
+      return false;
+    }
+  }
+
+  public static class RawFileAppender extends FileAppender {
+    private FileChannel channel;
+    private RandomAccessFile randomAccessFile;
+    private DataType[] columnTypes;
+
+    private ByteBuffer buffer;
+    private BitArray nullFlags;
+    private int headerSize = 0;
+    private static final int RECORD_SIZE = 4;
+    private long pos;
+
+    private TableStatistics stats;
+
+    public RawFileAppender(Configuration conf, Schema schema, TableMeta meta, Path path) throws IOException {
+      super(conf, schema, meta, path);
+    }
+
+    public void init() throws IOException {
+      // TODO - RawFile only works on Local File System.
+      //Preconditions.checkArgument(FileUtil.isLocalPath(path));
+      File file = new File(path.toUri());
+      randomAccessFile = new RandomAccessFile(file, "rw");
+      channel = randomAccessFile.getChannel();
+      pos = 0;
+
+      columnTypes = new DataType[schema.getColumnNum()];
+      for (int i = 0; i < schema.getColumnNum(); i++) {
+        columnTypes[i] = schema.getColumn(i).getDataType();
+      }
+
+      buffer = ByteBuffer.allocateDirect(64 * 1024);
+
+      // comput the number of bytes, representing the null flags
+
+      nullFlags = new BitArray(schema.getColumnNum());
+      headerSize = RECORD_SIZE + 2 + nullFlags.bytesLength();
+
+      if (enabledStats) {
+        this.stats = new TableStatistics(this.schema);
+      }
+
+      super.init();
+    }
+
+    @Override
+    public long getOffset() throws IOException {
+      return pos;
+    }
+
+    private void flushBuffer() throws IOException {
+      buffer.limit(buffer.position());
+      buffer.flip();
+      channel.write(buffer);
+      buffer.clear();
+    }
+
+    private boolean flushBufferAndReplace(int recordOffset, int sizeToBeWritten)
+        throws IOException {
+
+      // if the buffer reaches the limit,
+      // write the bytes from 0 to the previous record.
+      if (buffer.remaining() < sizeToBeWritten) {
+
+        int limit = buffer.position();
+        buffer.limit(recordOffset);
+        buffer.flip();
+        channel.write(buffer);
+        buffer.position(recordOffset);
+        buffer.limit(limit);
+        buffer.compact();
+
+        return true;
+      } else {
+        return false;
+      }
+    }
+
+    @Override
+    public void addTuple(Tuple t) throws IOException {
+
+      if (buffer.remaining() < headerSize) {
+        flushBuffer();
+      }
+
+      // skip the row header
+      int recordOffset = buffer.position();
+      buffer.position(recordOffset + headerSize);
+      // reset the null flags
+      nullFlags.clear();
+      for (int i = 0; i < schema.getColumnNum(); i++) {
+        if (enabledStats) {
+          stats.analyzeField(i, t.get(i));
+        }
+
+        if (t.isNull(i)) {
+          nullFlags.set(i);
+          continue;
+        }
+
+        // 8 is the maximum bytes size of all types
+        if (flushBufferAndReplace(recordOffset, 8)) {
+          recordOffset = 0;
+        }
+
+        switch(columnTypes[i].getType()) {
+          case NULL_TYPE:
+            nullFlags.set(i);
+            continue;
+
+          case BOOLEAN:
+          case BIT:
+            buffer.put(t.get(i).asByte());
+            break;
+
+          case CHAR :
+            byte[] src = t.getChar(i).asByteArray();
+            byte[] dst = Arrays.copyOf(src, columnTypes[i].getLength());
+            buffer.putInt(src.length);
+            buffer.put(dst);
+            break;
+
+          case INT2 :
+            buffer.putShort(t.get(i).asInt2());
+            break;
+
+          case INT4 :
+            buffer.putInt(t.get(i).asInt4());
+            break;
+
+          case INT8 :
+            buffer.putLong(t.get(i).asInt8());
+            break;
+
+          case FLOAT4 :
+            buffer.putFloat(t.get(i).asFloat4());
+            break;
+
+          case FLOAT8 :
+            buffer.putDouble(t.get(i).asFloat8());
+            break;
+
+          case TEXT:
+            byte [] strBytes2 = t.get(i).asByteArray();
+            if (flushBufferAndReplace(recordOffset, strBytes2.length + 4)) {
+              recordOffset = 0;
+            }
+            buffer.putInt(strBytes2.length);
+            buffer.put(strBytes2);
+            break;
+
+          case TIMESTAMP:
+            buffer.putLong(((TimestampDatum)t.get(i)).getMillis());
+            break;
+
+          case BLOB : {
+            byte [] rawBytes = t.get(i).asByteArray();
+            if (flushBufferAndReplace(recordOffset, rawBytes.length + 4)) {
+              recordOffset = 0;
+            }
+            buffer.putInt(rawBytes.length);
+            buffer.put(rawBytes);
+            break;
+          }
+
+          case PROTOBUF: {
+            // TODO - to be fixed
+//            byte [] lengthByte = new byte[4];
+//            byte [] byteArray = t.get(i).asByteArray();
+//            CodedOutputStream outputStream = CodedOutputStream.newInstance(lengthByte);
+//            outputStream.writeUInt32NoTag(byteArray.length);
+//            outputStream.flush();
+//            int legnthByteLength = CodedOutputStream.computeInt32SizeNoTag(byteArray.length);
+//            if (flushBufferAndReplace(recordOffset, byteArray.length + legnthByteLength)) {
+//              recordOffset = 0;
+//            }
+//            buffer.put(lengthByte, 0, legnthByteLength);
+            byte [] rawBytes = t.get(i).asByteArray();
+            if (flushBufferAndReplace(recordOffset, rawBytes.length + 4)) {
+              recordOffset = 0;
+            }
+            buffer.putInt(rawBytes.length);
+            buffer.put(rawBytes);
+            break;
+          }
+
+          case INET4 :
+            buffer.put(t.get(i).asByteArray());
+            break;
+
+          default:
+            throw new IOException("Cannot support data type: " + columnTypes[i].getType());
+        }
+      }
+
+      // write a record header
+      int bufferPos = buffer.position();
+      buffer.position(recordOffset);
+      buffer.putInt(bufferPos - recordOffset);
+      byte [] flags = nullFlags.toArray();
+      buffer.putShort((short) flags.length);
+      buffer.put(flags);
+
+      pos += bufferPos - recordOffset;
+      buffer.position(bufferPos);
+
+      if (enabledStats) {
+        stats.incrementRow();
+      }
+    }
+
+    @Override
+    public void flush() throws IOException {
+      flushBuffer();
+    }
+
+    @Override
+    public void close() throws IOException {
+      flush();
+      if (enabledStats) {
+        stats.setNumBytes(getOffset());
+      }
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("RawFileAppender written: " + getOffset() + " bytes, path: " + path);
+      }
+      channel.close();
+      randomAccessFile.close();
+    }
+
+    @Override
+    public TableStats getStats() {
+      if (enabledStats) {
+        return stats.getTableStat();
+      } else {
+        return null;
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bbf9b7bf/tajo-storage/src/main/java/org/apache/tajo/storage/RowFile.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/RowFile.java b/tajo-storage/src/main/java/org/apache/tajo/storage/RowFile.java
new file mode 100644
index 0000000..1e89f31
--- /dev/null
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/RowFile.java
@@ -0,0 +1,506 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.catalog.statistics.TableStats;
+import org.apache.tajo.conf.TajoConf.ConfVars;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.storage.exception.AlreadyExistsStorageException;
+import org.apache.tajo.storage.fragment.FileFragment;
+import org.apache.tajo.util.BitArray;
+import org.apache.tajo.util.Bytes;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.Arrays;
+
+public class RowFile {
+  public static final Log LOG = LogFactory.getLog(RowFile.class);
+
+  private static final int SYNC_ESCAPE = -1;
+  private static final int SYNC_HASH_SIZE = 16;
+  private static final int SYNC_SIZE = 4 + SYNC_HASH_SIZE;
+  private final static int DEFAULT_BUFFER_SIZE = 65535;
+  public static int SYNC_INTERVAL;
+
+  public static class RowFileScanner extends FileScanner {
+    private FileSystem fs;
+    private FSDataInputStream in;
+    private Tuple tuple;
+
+    private byte[] sync = new byte[SYNC_HASH_SIZE];
+    private byte[] checkSync = new byte[SYNC_HASH_SIZE];
+    private long start, end;
+
+    private ByteBuffer buffer;
+    private final int tupleHeaderSize;
+    private BitArray nullFlags;
+    private long bufferStartPos;
+
+    public RowFileScanner(Configuration conf, final Schema schema, final TableMeta meta, final FileFragment fragment)
+        throws IOException {
+      super(conf, schema, meta, fragment);
+
+      SYNC_INTERVAL =
+          conf.getInt(ConfVars.RAWFILE_SYNC_INTERVAL.varname,
+              SYNC_SIZE * 100);
+
+      nullFlags = new BitArray(schema.getColumnNum());
+      tupleHeaderSize = nullFlags.bytesLength() + (2 * Short.SIZE / 8);
+      this.start = fragment.getStartKey();
+      this.end = this.start + fragment.getEndKey();
+    }
+
+    public void init() throws IOException {
+      // set default page size.
+      fs = fragment.getPath().getFileSystem(conf);
+      in = fs.open(fragment.getPath());
+      buffer = ByteBuffer.allocate(DEFAULT_BUFFER_SIZE * schema.getColumnNum());
+      buffer.flip();
+
+      readHeader();
+
+      // find the correct position from the start
+      if (this.start > in.getPos()) {
+        long realStart = start > SYNC_SIZE ? (start-SYNC_SIZE) : 0;
+        in.seek(realStart);
+      }
+      bufferStartPos = in.getPos();
+      fillBuffer();
+
+      if (start != 0) {
+        // TODO: improve
+        boolean syncFound = false;
+        while (!syncFound) {
+          if (buffer.remaining() < SYNC_SIZE) {
+            fillBuffer();
+          }
+          buffer.mark();
+          syncFound = checkSync();
+          if (!syncFound) {
+            buffer.reset();
+            buffer.get(); // proceed one byte
+          }
+        }
+        bufferStartPos += buffer.position();
+        buffer.compact();
+        buffer.flip();
+      }
+
+      super.init();
+    }
+
+    private void readHeader() throws IOException {
+      SYNC_INTERVAL = in.readInt();
+      Bytes.readFully(in, this.sync, 0, SYNC_HASH_SIZE);
+    }
+
+    /**
+     * Find the sync from the front of the buffer
+     *
+     * @return return true if it succeeds to find the sync.
+     * @throws IOException
+     */
+    private boolean checkSync() throws IOException {
+      buffer.getInt();                           // escape
+      buffer.get(checkSync, 0, SYNC_HASH_SIZE);  // sync
+      return Arrays.equals(checkSync, sync);
+    }
+
+    private int fillBuffer() throws IOException {
+      bufferStartPos += buffer.position();
+      buffer.compact();
+      int remain = buffer.remaining();
+      int read = in.read(buffer);
+      if (read == -1) {
+        buffer.flip();
+        return read;
+      } else {
+        int totalRead = read;
+        if (remain > totalRead) {
+          read = in.read(buffer);
+          totalRead += read > 0 ? read : 0;
+        }
+        buffer.flip();
+        return totalRead;
+      }
+    }
+
+    @Override
+    public Tuple next() throws IOException {
+      while (buffer.remaining() < SYNC_SIZE) {
+        if (fillBuffer() < 0) {
+          return null;
+        }
+      }
+
+      buffer.mark();
+      if (!checkSync()) {
+        buffer.reset();
+      } else {
+        if (bufferStartPos + buffer.position() > end) {
+          return null;
+        }
+      }
+
+      while (buffer.remaining() < tupleHeaderSize) {
+        if (fillBuffer() < 0) {
+          return null;
+        }
+      }
+
+      int i;
+      tuple = new VTuple(schema.getColumnNum());
+
+      int nullFlagSize = buffer.getShort();
+      byte[] nullFlagBytes = new byte[nullFlagSize];
+      buffer.get(nullFlagBytes, 0, nullFlagSize);
+      nullFlags = new BitArray(nullFlagBytes);
+      int tupleSize = buffer.getShort();
+
+      while (buffer.remaining() < (tupleSize)) {
+        if (fillBuffer() < 0) {
+          return null;
+        }
+      }
+
+      Datum datum;
+      Column col;
+      for (i = 0; i < schema.getColumnNum(); i++) {
+        if (!nullFlags.get(i)) {
+          col = schema.getColumn(i);
+          switch (col.getDataType().getType()) {
+            case BOOLEAN :
+              datum = DatumFactory.createBool(buffer.get());
+              tuple.put(i, datum);
+              break;
+
+            case BIT:
+              datum = DatumFactory.createBit(buffer.get());
+              tuple.put(i, datum );
+              break;
+
+            case CHAR :
+              int realLen = buffer.getInt();
+              byte[] buf = new byte[col.getDataType().getLength()];
+              buffer.get(buf);
+              byte[] charBuf = Arrays.copyOf(buf, realLen);
+              tuple.put(i, DatumFactory.createChar(charBuf));
+              break;
+
+            case INT2 :
+              datum = DatumFactory.createInt2(buffer.getShort());
+              tuple.put(i, datum );
+              break;
+
+            case INT4 :
+              datum = DatumFactory.createInt4(buffer.getInt());
+              tuple.put(i, datum );
+              break;
+
+            case INT8 :
+              datum = DatumFactory.createInt8(buffer.getLong());
+              tuple.put(i, datum );
+              break;
+
+            case FLOAT4 :
+              datum = DatumFactory.createFloat4(buffer.getFloat());
+              tuple.put(i, datum);
+              break;
+
+            case FLOAT8 :
+              datum = DatumFactory.createFloat8(buffer.getDouble());
+              tuple.put(i, datum);
+              break;
+
+//            case TEXT :
+//              short len = buffer.getShort();
+//              byte[] buf = new byte[len];
+//              buffer.get(buf, 0, len);
+//              datum = DatumFactory.createText(buf);
+//              tuple.put(i, datum);
+//              break;
+
+            case TEXT:
+              short bytelen = buffer.getShort();
+              byte[] strbytes = new byte[bytelen];
+              buffer.get(strbytes, 0, bytelen);
+              datum = DatumFactory.createText(strbytes);
+              tuple.put(i, datum);
+              break;
+
+            case BLOB:
+              short bytesLen = buffer.getShort();
+              byte [] bytesBuf = new byte[bytesLen];
+              buffer.get(bytesBuf);
+              datum = DatumFactory.createBlob(bytesBuf);
+              tuple.put(i, datum);
+              break;
+
+            case INET4 :
+              byte[] ipv4 = new byte[4];
+              buffer.get(ipv4, 0, 4);
+              datum = DatumFactory.createInet4(ipv4);
+              tuple.put(i, datum);
+              break;
+
+            default:
+              break;
+          }
+        } else {
+          tuple.put(i, DatumFactory.createNullDatum());
+        }
+      }
+      return tuple;
+    }
+
+    @Override
+    public void reset() throws IOException {
+      init();
+    }
+
+    @Override
+    public void close() throws IOException {
+      if (in != null) {
+        in.close();
+      }
+    }
+
+    @Override
+    public boolean isProjectable() {
+      return false;
+    }
+
+    @Override
+    public boolean isSelectable() {
+      return false;
+    }
+
+    @Override
+    public boolean isSplittable(){
+      return true;
+    }
+  }
+
+  public static class RowFileAppender extends FileAppender {
+    private FSDataOutputStream out;
+    private long lastSyncPos;
+    private FileSystem fs;
+    private byte[] sync;
+    private ByteBuffer buffer;
+
+    private BitArray nullFlags;
+    // statistics
+    private TableStatistics stats;
+
+    public RowFileAppender(Configuration conf, final Schema schema, final TableMeta meta, final Path path)
+        throws IOException {
+      super(conf, schema, meta, path);
+    }
+
+    public void init() throws IOException {
+      SYNC_INTERVAL = conf.getInt(ConfVars.RAWFILE_SYNC_INTERVAL.varname, 100);
+
+      fs = path.getFileSystem(conf);
+
+      if (!fs.exists(path.getParent())) {
+        throw new FileNotFoundException(path.toString());
+      }
+
+      if (fs.exists(path)) {
+        throw new AlreadyExistsStorageException(path);
+      }
+
+      sync = new byte[SYNC_HASH_SIZE];
+      lastSyncPos = 0;
+
+      out = fs.create(path);
+
+      MessageDigest md;
+      try {
+        md = MessageDigest.getInstance("MD5");
+        md.update((path.toString()+System.currentTimeMillis()).getBytes());
+        sync = md.digest();
+      } catch (NoSuchAlgorithmException e) {
+        LOG.error(e);
+      }
+
+      writeHeader();
+
+      buffer = ByteBuffer.allocate(DEFAULT_BUFFER_SIZE);
+
+      nullFlags = new BitArray(schema.getColumnNum());
+
+      if (enabledStats) {
+        this.stats = new TableStatistics(this.schema);
+      }
+    }
+
+    private void writeHeader() throws IOException {
+      out.writeInt(SYNC_INTERVAL);
+      out.write(sync);
+      out.flush();
+      lastSyncPos = out.getPos();
+    }
+
+    @Override
+    public void addTuple(Tuple t) throws IOException {
+      checkAndWriteSync();
+      Column col;
+
+      buffer.clear();
+      nullFlags.clear();
+
+      for (int i = 0; i < schema.getColumnNum(); i++) {
+        if (enabledStats) {
+          stats.analyzeField(i, t.get(i));
+        }
+
+        if (t.isNull(i)) {
+          nullFlags.set(i);
+        } else {
+          col = schema.getColumn(i);
+          switch (col.getDataType().getType()) {
+            case BOOLEAN:
+              buffer.put(t.getBoolean(i).asByte());
+              break;
+            case BIT:
+              buffer.put(t.getByte(i).asByte());
+              break;
+            case CHAR:
+              byte[] src = t.getChar(i).asByteArray();
+              byte[] dst = Arrays.copyOf(src, col.getDataType().getLength());
+              buffer.putInt(src.length);
+              buffer.put(dst);
+              break;
+            case TEXT:
+              byte [] strbytes = t.getText(i).asByteArray();
+              buffer.putShort((short)strbytes.length);
+              buffer.put(strbytes, 0, strbytes.length);
+              break;
+            case INT2:
+              buffer.putShort(t.getShort(i).asInt2());
+              break;
+            case INT4:
+              buffer.putInt(t.getInt(i).asInt4());
+              break;
+            case INT8:
+              buffer.putLong(t.getLong(i).asInt8());
+              break;
+            case FLOAT4:
+              buffer.putFloat(t.getFloat(i).asFloat4());
+              break;
+            case FLOAT8:
+              buffer.putDouble(t.getDouble(i).asFloat8());
+              break;
+            case BLOB:
+              byte [] bytes = t.getBytes(i).asByteArray();
+              buffer.putShort((short)bytes.length);
+              buffer.put(bytes);
+              break;
+            case INET4:
+              buffer.put(t.getIPv4Bytes(i));
+              break;
+            case INET6:
+              buffer.put(t.getIPv6Bytes(i));
+              break;
+            case NULL_TYPE:
+              nullFlags.set(i);
+              break;
+            default:
+              break;
+          }
+        }
+      }
+
+      byte[] bytes = nullFlags.toArray();
+      out.writeShort(bytes.length);
+      out.write(bytes);
+
+      bytes = buffer.array();
+      int dataLen = buffer.position();
+      out.writeShort(dataLen);
+      out.write(bytes, 0, dataLen);
+
+      // Statistical section
+      if (enabledStats) {
+        stats.incrementRow();
+      }
+    }
+
+    @Override
+    public long getOffset() throws IOException {
+      return out.getPos();
+    }
+
+    @Override
+    public void flush() throws IOException {
+      out.flush();
+    }
+
+    @Override
+    public void close() throws IOException {
+      if (out != null) {
+        if (enabledStats) {
+          stats.setNumBytes(out.getPos());
+        }
+        sync();
+        out.flush();
+        out.close();
+      }
+    }
+
+    private void sync() throws IOException {
+      if (lastSyncPos != out.getPos()) {
+        out.writeInt(SYNC_ESCAPE);
+        out.write(sync);
+        lastSyncPos = out.getPos();
+      }
+    }
+
+    private void checkAndWriteSync() throws IOException {
+      if (out.getPos() >= lastSyncPos + SYNC_INTERVAL) {
+        sync();
+      }
+    }
+
+    @Override
+    public TableStats getStats() {
+      if (enabledStats) {
+        return stats.getTableStat();
+      } else {
+        return null;
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bbf9b7bf/tajo-storage/src/main/java/org/apache/tajo/storage/RowStoreUtil.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/RowStoreUtil.java b/tajo-storage/src/main/java/org/apache/tajo/storage/RowStoreUtil.java
new file mode 100644
index 0000000..9f32028
--- /dev/null
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/RowStoreUtil.java
@@ -0,0 +1,206 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage;
+
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.common.TajoDataTypes;
+import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.datum.TimestampDatum;
+import org.apache.tajo.util.Bytes;
+
+import java.nio.ByteBuffer;
+
+public class RowStoreUtil {
+  public static int[] getTargetIds(Schema inSchema, Schema outSchema) {
+    int[] targetIds = new int[outSchema.getColumnNum()];
+    int i = 0;
+    for (Column target : outSchema.getColumns()) {
+      targetIds[i] = inSchema.getColumnId(target.getQualifiedName());
+      i++;
+    }
+
+    return targetIds;
+  }
+
+  public static Tuple project(Tuple in, Tuple out, int[] targetIds) {
+    out.clear();
+    for (int idx = 0; idx < targetIds.length; idx++) {
+      out.put(idx, in.get(targetIds[idx]));
+    }
+    return out;
+  }
+
+  public static class RowStoreDecoder {
+
+    public static Tuple toTuple(Schema schema, byte [] bytes) {
+      ByteBuffer bb = ByteBuffer.wrap(bytes);
+      Tuple tuple = new VTuple(schema.getColumnNum());
+      Column col;
+      TajoDataTypes.DataType type;
+      for (int i =0; i < schema.getColumnNum(); i++) {
+        col = schema.getColumn(i);
+        type = col.getDataType();
+        switch (type.getType()) {
+          case BOOLEAN: tuple.put(i, DatumFactory.createBool(bb.get())); break;
+          case BIT:
+            byte b = bb.get();
+            if(b == 0) {
+              tuple.put(i, DatumFactory.createNullDatum());
+            } else {
+              tuple.put(i, DatumFactory.createBit(b));
+            }
+            break;
+
+          case CHAR:
+            byte c = bb.get();
+            if(c == 0) {
+              tuple.put(i, DatumFactory.createNullDatum());
+            } else {
+              tuple.put(i, DatumFactory.createChar(c));
+            }
+            break;
+
+          case INT2:
+            short s = bb.getShort();
+            if(s < Short.MIN_VALUE + 1) {
+              tuple.put(i, DatumFactory.createNullDatum());
+            }else {
+              tuple.put(i, DatumFactory.createInt2(s));
+            }
+            break;
+
+          case INT4:
+          case DATE:
+            int i_ = bb.getInt();
+            if ( i_ < Integer.MIN_VALUE + 1) {
+              tuple.put(i, DatumFactory.createNullDatum());
+            } else {
+              tuple.put(i, DatumFactory.createFromInt4(type, i_));
+            }
+            break;
+
+          case INT8:
+          case TIME:
+          case TIMESTAMP:
+            long l = bb.getLong();
+            if ( l < Long.MIN_VALUE + 1) {
+              tuple.put(i, DatumFactory.createNullDatum());
+            }else {
+              tuple.put(i, DatumFactory.createFromInt8(type, l));
+            }
+            break;
+
+          case FLOAT4:
+            float f = bb.getFloat();
+            if (Float.isNaN(f)) {
+              tuple.put(i, DatumFactory.createNullDatum());
+            }else {
+              tuple.put(i, DatumFactory.createFloat4(f));
+            }
+            break;
+
+          case FLOAT8:
+            double d = bb.getDouble();
+            if(Double.isNaN(d)) {
+              tuple.put(i, DatumFactory.createNullDatum());
+            }else {
+              tuple.put(i, DatumFactory.createFloat8(d));
+            }
+            break;
+
+          case TEXT:
+            byte [] _string = new byte[bb.getInt()];
+            bb.get(_string);
+            String str = new String(_string);
+            if(str.compareTo("NULL") == 0) {
+              tuple.put(i, DatumFactory.createNullDatum());
+            }else {
+            tuple.put(i, DatumFactory.createText(str));
+            }
+            break;
+
+          case BLOB:
+            byte [] _bytes = new byte[bb.getInt()];
+            bb.get(_bytes);
+            if(Bytes.compareTo(bytes, Bytes.toBytes("NULL")) == 0) {
+              tuple.put(i, DatumFactory.createNullDatum());
+            } else {
+              tuple.put(i, DatumFactory.createBlob(_bytes));
+            }
+            break;
+
+          case INET4:
+            byte [] _ipv4 = new byte[4];
+            bb.get(_ipv4);
+            tuple.put(i, DatumFactory.createInet4(_ipv4));
+            break;
+          case INET6:
+            // TODO - to be implemented
+        }
+      }
+      return tuple;
+    }
+  }
+
+  public static class RowStoreEncoder {
+
+    public static byte [] toBytes(Schema schema, Tuple tuple) {
+      int size = StorageUtil.getRowByteSize(schema);
+      ByteBuffer bb = ByteBuffer.allocate(size);
+      Column col;
+      for (int i = 0; i < schema.getColumnNum(); i++) {
+        col = schema.getColumn(i);
+        switch (col.getDataType().getType()) {
+          case BOOLEAN: bb.put(tuple.get(i).asByte()); break;
+          case BIT: bb.put(tuple.get(i).asByte()); break;
+          case CHAR: bb.put(tuple.get(i).asByte()); break;
+          case INT2: bb.putShort(tuple.get(i).asInt2()); break;
+          case INT4: bb.putInt(tuple.get(i).asInt4()); break;
+          case INT8: bb.putLong(tuple.get(i).asInt8()); break;
+          case FLOAT4: bb.putFloat(tuple.get(i).asFloat4()); break;
+          case FLOAT8: bb.putDouble(tuple.get(i).asFloat8()); break;
+          case TEXT:
+            byte [] _string = tuple.get(i).asByteArray();
+            bb.putInt(_string.length);
+            bb.put(_string);
+            break;
+          case DATE: bb.putInt(tuple.get(i).asInt4()); break;
+          case TIMESTAMP: bb.putLong(((TimestampDatum)tuple.get(i)).getMillis()); break;
+          case BLOB:
+            byte [] bytes = tuple.get(i).asByteArray();
+            bb.putInt(bytes.length);
+            bb.put(bytes);
+            break;
+          case INET4:
+            byte [] ipBytes = tuple.getIPv4Bytes(i);
+            bb.put(ipBytes);
+            break;
+          case INET6: bb.put(tuple.getIPv6Bytes(i)); break;
+          default:
+        }
+      }
+
+      bb.flip();
+      byte [] buf = new byte [bb.limit()];
+      bb.get(buf);
+      return buf;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bbf9b7bf/tajo-storage/src/main/java/org/apache/tajo/storage/Scanner.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/Scanner.java b/tajo-storage/src/main/java/org/apache/tajo/storage/Scanner.java
new file mode 100644
index 0000000..6dca3f2
--- /dev/null
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/Scanner.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage;
+
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.SchemaObject;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+/**
+ * Scanner Interface
+ */
+public interface Scanner extends SchemaObject, Closeable {
+
+  void init() throws IOException;
+
+  /**
+   * It returns one tuple at each call. 
+   * 
+   * @return retrieve null if the scanner has no more tuples. 
+   * Otherwise it returns one tuple.
+   * 
+   * @throws IOException if internal I/O error occurs during next method
+   */
+  Tuple next() throws IOException;
+  
+  /**
+   * Reset the cursor. After executed, the scanner 
+   * will retrieve the first tuple.
+   * 
+   * @throws IOException if internal I/O error occurs during reset method
+   */
+  void reset() throws IOException;
+  
+  /**
+   * Close scanner
+   * 
+   * @throws IOException if internal I/O error occurs during close method
+   */
+  void close() throws IOException;
+
+
+  /**
+   * It returns if the projection is executed in the underlying scanner layer.
+   *
+   * @return true if this scanner can project the given columns.
+   */
+  boolean isProjectable();
+
+  /**
+   * Set target columns
+   * @param targets columns to be projected
+   */
+  void setTarget(Column [] targets);
+
+  /**
+   * It returns if the selection is executed in the underlying scanner layer.
+   *
+   * @return true if this scanner can filter tuples against a given condition.
+   */
+  boolean isSelectable();
+
+  /**
+   * Set a search condition
+   * @param expr to be searched
+   *
+   * TODO - to be changed Object type
+   */
+  void setSearchCondition(Object expr);
+
+  /**
+   * It returns if the file is splittable.
+   *
+   * @return true if this scanner can split the a file.
+   */
+  boolean isSplittable();
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bbf9b7bf/tajo-storage/src/main/java/org/apache/tajo/storage/SeekableScanner.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/SeekableScanner.java b/tajo-storage/src/main/java/org/apache/tajo/storage/SeekableScanner.java
new file mode 100644
index 0000000..894e7ee
--- /dev/null
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/SeekableScanner.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage;
+
+import java.io.IOException;
+
+public interface SeekableScanner extends Scanner {
+
+  public abstract long getNextOffset() throws IOException;
+
+  public abstract void seek(long offset) throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bbf9b7bf/tajo-storage/src/main/java/org/apache/tajo/storage/SerializerDeserializer.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/SerializerDeserializer.java b/tajo-storage/src/main/java/org/apache/tajo/storage/SerializerDeserializer.java
new file mode 100644
index 0000000..333f205
--- /dev/null
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/SerializerDeserializer.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage;
+
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.datum.Datum;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+
+public interface SerializerDeserializer {
+
+  public int serialize(Column col, Datum datum, OutputStream out, byte[] nullCharacters) throws IOException;
+
+  public Datum deserialize(Column col, byte[] bytes, int offset, int length, byte[] nullCharacters) throws IOException;
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bbf9b7bf/tajo-storage/src/main/java/org/apache/tajo/storage/SplitLineReader.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/SplitLineReader.java b/tajo-storage/src/main/java/org/apache/tajo/storage/SplitLineReader.java
new file mode 100644
index 0000000..3579674
--- /dev/null
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/SplitLineReader.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage;
+
+import org.apache.hadoop.conf.Configuration;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+public class SplitLineReader extends LineReader {
+  public SplitLineReader(InputStream in, byte[] recordDelimiterBytes) {
+    super(in, recordDelimiterBytes);
+  }
+
+  public SplitLineReader(InputStream in, Configuration conf,
+                         byte[] recordDelimiterBytes) throws IOException {
+    super(in, conf, recordDelimiterBytes);
+  }
+
+  public boolean needAdditionalRecordAfterSplit() {
+    return false;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bbf9b7bf/tajo-storage/src/main/java/org/apache/tajo/storage/Storage.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/Storage.java b/tajo-storage/src/main/java/org/apache/tajo/storage/Storage.java
new file mode 100644
index 0000000..cc85c1d
--- /dev/null
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/Storage.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.storage.fragment.FileFragment;
+
+import java.io.IOException;
+
+public abstract class Storage {
+  protected final Configuration conf;
+  
+  public Storage(final Configuration conf) {
+    this.conf = conf;
+  }
+  
+  public Configuration getConf() {
+    return this.conf;
+  }
+  
+  public abstract Appender getAppender(TableMeta meta, Path path)
+    throws IOException;
+
+  public abstract Scanner openScanner(Schema schema, FileFragment[] tablets)
+    throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bbf9b7bf/tajo-storage/src/main/java/org/apache/tajo/storage/StorageManager.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/StorageManager.java b/tajo-storage/src/main/java/org/apache/tajo/storage/StorageManager.java
new file mode 100644
index 0000000..1b852d4
--- /dev/null
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/StorageManager.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage;
+
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.catalog.proto.CatalogProtos;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.storage.fragment.Fragment;
+
+import java.io.IOException;
+
+/**
+ * StorageManager
+ */
+public class StorageManager extends AbstractStorageManager {
+
+  protected StorageManager(TajoConf conf) throws IOException {
+    super(conf);
+  }
+
+  @Override
+  public Class<? extends Scanner> getScannerClass(CatalogProtos.StoreType storeType) throws IOException {
+    String handlerName = storeType.name().toLowerCase();
+    Class<? extends Scanner> scannerClass = SCANNER_HANDLER_CACHE.get(handlerName);
+    if (scannerClass == null) {
+      scannerClass = conf.getClass(
+          String.format("tajo.storage.scanner-handler.%s.class",storeType.name().toLowerCase()), null, Scanner.class);
+      SCANNER_HANDLER_CACHE.put(handlerName, scannerClass);
+    }
+
+    if (scannerClass == null) {
+      throw new IOException("Unknown Storage Type: " + storeType.name());
+    }
+
+    return scannerClass;
+  }
+
+  @Override
+  public Scanner getScanner(TableMeta meta, Schema schema, Fragment fragment, Schema target) throws IOException {
+    Scanner scanner;
+
+    Class<? extends Scanner> scannerClass = getScannerClass(meta.getStoreType());
+    scanner = newScannerInstance(scannerClass, conf, schema, meta, fragment);
+    if (scanner.isProjectable()) {
+      scanner.setTarget(target.toArray());
+    }
+
+    return scanner;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bbf9b7bf/tajo-storage/src/main/java/org/apache/tajo/storage/StorageManagerFactory.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/StorageManagerFactory.java b/tajo-storage/src/main/java/org/apache/tajo/storage/StorageManagerFactory.java
new file mode 100644
index 0000000..85bb861
--- /dev/null
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/StorageManagerFactory.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage;
+
+import com.google.common.collect.Maps;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.storage.fragment.FileFragment;
+import org.apache.tajo.storage.v2.StorageManagerV2;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.Map;
+
+import static org.apache.tajo.conf.TajoConf.ConfVars;
+
+public class StorageManagerFactory {
+  private static final Map<String, AbstractStorageManager> storageManagers = Maps.newHashMap();
+
+  public static AbstractStorageManager getStorageManager(TajoConf conf) throws IOException {
+    return getStorageManager(conf, null);
+  }
+
+  public static synchronized AbstractStorageManager getStorageManager (
+      TajoConf conf, Path warehouseDir) throws IOException {
+    return getStorageManager(conf, warehouseDir, conf.getBoolVar(ConfVars.STORAGE_MANAGER_VERSION_2));
+  }
+
+  private static synchronized AbstractStorageManager getStorageManager (
+      TajoConf conf, Path warehouseDir, boolean v2) throws IOException {
+
+    URI uri;
+    TajoConf localConf = new TajoConf(conf);
+    if (warehouseDir != null) {
+      localConf.setVar(ConfVars.WAREHOUSE_DIR, warehouseDir.toUri().toString());
+    }
+
+    uri = TajoConf.getWarehouseDir(localConf).toUri();
+
+    String key = "file".equals(uri.getScheme()) ? "file" : uri.toString();
+
+    if(v2) {
+      key += "_v2";
+    }
+
+    if(storageManagers.containsKey(key)) {
+      AbstractStorageManager sm = storageManagers.get(key);
+      return sm;
+    } else {
+      AbstractStorageManager storageManager;
+
+      if(v2) {
+        storageManager = new StorageManagerV2(localConf);
+      } else {
+        storageManager = new StorageManager(localConf);
+      }
+
+      storageManagers.put(key, storageManager);
+
+      return storageManager;
+    }
+  }
+
+  public static synchronized SeekableScanner getSeekableScanner(
+      TajoConf conf, TableMeta meta, Schema schema, FileFragment fragment, Schema target) throws IOException {
+    return (SeekableScanner)getStorageManager(conf, null, false).getScanner(meta, schema, fragment, target);
+  }
+
+  public static synchronized SeekableScanner getSeekableScanner(
+      TajoConf conf, TableMeta meta, Schema schema, Path path) throws IOException {
+
+    FileSystem fs = path.getFileSystem(conf);
+    FileStatus status = fs.getFileStatus(path);
+    FileFragment fragment = new FileFragment(path.getName(), path, 0, status.getLen());
+
+    return getSeekableScanner(conf, meta, schema, fragment, schema);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bbf9b7bf/tajo-storage/src/main/java/org/apache/tajo/storage/StorageUtil.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/StorageUtil.java b/tajo-storage/src/main/java/org/apache/tajo/storage/StorageUtil.java
new file mode 100644
index 0000000..9627a5d
--- /dev/null
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/StorageUtil.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.util.FileUtil;
+
+import java.io.IOException;
+
+public class StorageUtil {
+  public static int getRowByteSize(Schema schema) {
+    int sum = 0;
+    for(Column col : schema.getColumns()) {
+      sum += StorageUtil.getColByteSize(col);
+    }
+
+    return sum;
+  }
+
+  public static int getColByteSize(Column col) {
+    switch(col.getDataType().getType()) {
+    case BOOLEAN: return 1;
+    case CHAR: return 1;
+    case BIT: return 1;
+    case INT2: return 2;
+    case INT4: return 4;
+    case INT8: return 8;
+    case FLOAT4: return 4;
+    case FLOAT8: return 8;
+    case INET4: return 4;
+    case INET6: return 32;
+    case TEXT: return 256;
+    case BLOB: return 256;
+    default: return 0;
+    }
+  }
+
+  public static void writeTableMeta(Configuration conf, Path tableroot, TableMeta meta) throws IOException {
+    FileSystem fs = tableroot.getFileSystem(conf);
+    FSDataOutputStream out = fs.create(new Path(tableroot, ".meta"));
+    FileUtil.writeProto(out, meta.getProto());
+    out.flush();
+    out.close();
+  }
+  
+  public static Path concatPath(String parent, String...childs) {
+    return concatPath(new Path(parent), childs);
+  }
+  
+  public static Path concatPath(Path parent, String...childs) {
+    StringBuilder sb = new StringBuilder();
+    
+    for(int i=0; i < childs.length; i++) {      
+      sb.append(childs[i]);
+      if(i < childs.length - 1)
+        sb.append("/");
+    }
+    
+    return new Path(parent, sb.toString());
+  }
+}