Posted to commits@tajo.apache.org by hy...@apache.org on 2014/12/12 06:59:38 UTC

[04/32] tajo git commit: TAJO-1233: Merge hbase_storage branch to the master branch. (Hyoungjun Kim via hyunsik)

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/ByteBufLineReader.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/ByteBufLineReader.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/ByteBufLineReader.java
new file mode 100644
index 0000000..2f742c6
--- /dev/null
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/ByteBufLineReader.java
@@ -0,0 +1,182 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.text;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.util.CharsetUtil;
+import org.apache.tajo.storage.BufferPool;
+import org.apache.tajo.storage.ByteBufInputChannel;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class ByteBufLineReader implements Closeable {
+  private static final int DEFAULT_BUFFER = 64 * 1024;
+
+  private int bufferSize;
+  private long readBytes;
+  private int startIndex;
+  private boolean eof = false;
+  private ByteBuf buffer;
+  private final ByteBufInputChannel channel;
+  private final AtomicInteger lineReadBytes = new AtomicInteger();
+  private final LineSplitProcessor processor = new LineSplitProcessor();
+
+  public ByteBufLineReader(ByteBufInputChannel channel) {
+    this(channel, BufferPool.directBuffer(DEFAULT_BUFFER));
+  }
+
+  public ByteBufLineReader(ByteBufInputChannel channel, ByteBuf buf) {
+    this.readBytes = 0;
+    this.channel = channel;
+    this.buffer = buf;
+    this.bufferSize = buf.capacity();
+  }
+
+  public long readBytes() {
+    return readBytes - buffer.readableBytes();
+  }
+
+  @Override
+  public void close() throws IOException {
+    if (this.buffer.refCnt() > 0) {
+      this.buffer.release();
+    }
+    this.channel.close();
+  }
+
+  public String readLine() throws IOException {
+    ByteBuf buf = readLineBuf(lineReadBytes);
+    if (buf != null) {
+      return buf.toString(CharsetUtil.UTF_8);
+    }
+    return null;
+  }
+
+  private void fillBuffer() throws IOException {
+
+    int tailBytes = 0;
+    if (this.readBytes > 0) {
+      //startIndex = 0, readIndex = tailBytes length, writable = (buffer capacity - tailBytes)
+      this.buffer.markReaderIndex();
+      this.buffer.discardReadBytes();  // compact the buffer
+      tailBytes = this.buffer.writerIndex();
+      if (!this.buffer.isWritable()) {
+        // the line is larger than the buffer, so grow the buffer
+        BufferPool.ensureWritable(buffer, bufferSize * 2);
+        this.bufferSize = buffer.capacity();
+      }
+      this.startIndex = 0;
+    }
+
+    boolean release = true;
+    try {
+      int readBytes = tailBytes;
+      for (; ; ) {
+        int localReadBytes = buffer.writeBytes(channel, this.bufferSize - readBytes);
+        if (localReadBytes < 0) {
+          if (buffer.isWritable()) {
+            //if fewer bytes were read than the buffer capacity, there are no more bytes in the channel
+            eof = true;
+          }
+          break;
+        }
+        readBytes += localReadBytes;
+        if (readBytes == bufferSize) {
+          break;
+        }
+      }
+      this.readBytes += (readBytes - tailBytes);
+      release = false;
+
+      this.buffer.readerIndex(this.buffer.readerIndex() + tailBytes); //skip past buffer (tail)
+    } finally {
+      if (release) {
+        buffer.release();
+      }
+    }
+  }
+
+  /**
+   * Read a line terminated by one of CR, LF, or CRLF.
+   */
+  public ByteBuf readLineBuf(AtomicInteger reads) throws IOException {
+    int readBytes = 0; // newline + text line bytes
+    int newlineLength = 0; //length of terminating newline
+    int readable;
+
+    this.startIndex = buffer.readerIndex();
+
+    loop:
+    while (true) {
+      readable = buffer.readableBytes();
+      if (readable <= 0) {
+        buffer.readerIndex(this.startIndex);
+        fillBuffer(); //compact and fill buffer
+
+        //if buffer.writerIndex() is zero, there are no bytes in the buffer
+        if (!buffer.isReadable() && buffer.writerIndex() == 0) {
+          reads.set(0);
+          return null;
+        } else {
+          //skip the LF that completes a CRLF split across buffer fills
+          if (processor.isPrevCharCR() && buffer.getByte(buffer.readerIndex()) == LineSplitProcessor.LF) {
+            buffer.skipBytes(1);
+            if(eof && !buffer.isReadable()) {
+              reads.set(1);
+              return null;
+            }
+
+            newlineLength++;
+            readBytes++;
+            startIndex = buffer.readerIndex();
+          }
+        }
+        readable = buffer.readableBytes();
+      }
+
+      int endIndex = buffer.forEachByte(buffer.readerIndex(), readable, processor);
+      if (endIndex < 0) {
+        //no terminating newline appeared in the readable bytes
+        buffer.readerIndex(buffer.writerIndex()); // set to end buffer
+        if(eof){
+          readBytes += (buffer.readerIndex() - startIndex);
+          break loop;
+        }
+      } else {
+        buffer.readerIndex(endIndex + 1);
+        readBytes += (buffer.readerIndex() - startIndex); //past newline + text line
+
+        //the terminator was CRLF; consume the trailing LF
+        if (processor.isPrevCharCR() && buffer.isReadable()
+            && buffer.getByte(buffer.readerIndex()) == LineSplitProcessor.LF) {
+          buffer.skipBytes(1);
+          readBytes++;
+          newlineLength += 2;
+        } else {
+          newlineLength += 1;
+        }
+        break loop;
+      }
+    }
+    reads.set(readBytes);
+    return buffer.slice(startIndex, readBytes - newlineLength);
+  }
+}
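
For orientation, a minimal usage sketch of the reader above (the inputStream source and the consume() call are placeholders, not part of this patch; the real call site is DelimitedLineReader further below):

    ByteBufInputChannel channel = new ByteBufInputChannel(inputStream);
    ByteBufLineReader lineReader = new ByteBufLineReader(channel);
    try {
      String line;
      while ((line = lineReader.readLine()) != null) {
        consume(line);              // placeholder for the caller's per-line work
      }
    } finally {
      lineReader.close();           // releases the pooled buffer and closes the channel
    }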

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/CSVLineDeserializer.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/CSVLineDeserializer.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/CSVLineDeserializer.java
new file mode 100644
index 0000000..1599f62
--- /dev/null
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/CSVLineDeserializer.java
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.text;
+
+import io.netty.buffer.ByteBuf;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.storage.FieldSerializerDeserializer;
+import org.apache.tajo.storage.Tuple;
+
+import java.io.IOException;
+
+public class CSVLineDeserializer extends TextLineDeserializer {
+  private FieldSplitProcessor processor;
+  private FieldSerializerDeserializer fieldSerDer;
+  private ByteBuf nullChars;
+
+  public CSVLineDeserializer(Schema schema, TableMeta meta, int[] targetColumnIndexes) {
+    super(schema, meta, targetColumnIndexes);
+  }
+
+  @Override
+  public void init() {
+    this.processor = new FieldSplitProcessor(CSVLineSerDe.getFieldDelimiter(meta));
+
+    if (nullChars != null) {
+      nullChars.release();
+    }
+    nullChars = TextLineSerDe.getNullChars(meta);
+
+    fieldSerDer = new TextFieldSerializerDeserializer(meta);
+  }
+
+  public void deserialize(final ByteBuf lineBuf, Tuple output) throws IOException, TextLineParsingError {
+    int[] projection = targetColumnIndexes;
+    if (lineBuf == null || targetColumnIndexes == null || targetColumnIndexes.length == 0) {
+      return;
+    }
+
+    final int rowLength = lineBuf.readableBytes();
+    int start = 0, fieldLength = 0, end = 0;
+
+    //Projection
+    int currentTarget = 0;
+    int currentIndex = 0;
+
+    while (end != -1) {
+      end = lineBuf.forEachByte(start, rowLength - start, processor);
+
+      if (end < 0) {
+        fieldLength = rowLength - start;
+      } else {
+        fieldLength = end - start;
+      }
+
+      if (projection.length > currentTarget && currentIndex == projection[currentTarget]) {
+        lineBuf.setIndex(start, start + fieldLength);
+        Datum datum = fieldSerDer.deserialize(lineBuf, schema.getColumn(currentIndex), currentIndex, nullChars);
+        output.put(currentIndex, datum);
+        currentTarget++;
+      }
+
+      if (projection.length == currentTarget) {
+        break;
+      }
+
+      start = end + 1;
+      currentIndex++;
+    }
+  }
+
+  @Override
+  public void release() {
+    if (nullChars != null) {
+      nullChars.release();
+      nullChars = null;
+    }
+  }
+}
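
A rough usage sketch of the deserializer above (schema, meta and lineBuf are assumed to exist elsewhere; the projection indexes are only illustrative):

    int[] projection = new int[] { 0, 2 };                    // materialize only the 1st and 3rd columns
    CSVLineDeserializer deserializer = new CSVLineDeserializer(schema, meta, projection);
    deserializer.init();
    Tuple tuple = new VTuple(schema.size());
    deserializer.deserialize(lineBuf, tuple);                  // lineBuf holds one line without its newline
    deserializer.release();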

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/CSVLineSerDe.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/CSVLineSerDe.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/CSVLineSerDe.java
new file mode 100644
index 0000000..2fe7f23
--- /dev/null
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/CSVLineSerDe.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.text;
+
+import org.apache.commons.lang.StringEscapeUtils;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.storage.StorageConstants;
+
+public class CSVLineSerDe extends TextLineSerDe {
+  @Override
+  public TextLineDeserializer createDeserializer(Schema schema, TableMeta meta, int[] targetColumnIndexes) {
+    return new CSVLineDeserializer(schema, meta, targetColumnIndexes);
+  }
+
+  @Override
+  public TextLineSerializer createSerializer(Schema schema, TableMeta meta) {
+    return new CSVLineSerializer(schema, meta);
+  }
+
+  public static char getFieldDelimiter(TableMeta meta) {
+    return StringEscapeUtils.unescapeJava(meta.getOption(StorageConstants.TEXT_DELIMITER,
+        StorageConstants.DEFAULT_FIELD_DELIMITER)).charAt(0);
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/CSVLineSerializer.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/CSVLineSerializer.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/CSVLineSerializer.java
new file mode 100644
index 0000000..53a0ef3
--- /dev/null
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/CSVLineSerializer.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.text;
+
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.storage.FieldSerializerDeserializer;
+import org.apache.tajo.storage.Tuple;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+public class CSVLineSerializer extends TextLineSerializer {
+  private FieldSerializerDeserializer serde;
+
+  private byte [] nullChars;
+  private char delimiter;
+  private int columnNum;
+
+  public CSVLineSerializer(Schema schema, TableMeta meta) {
+    super(schema, meta);
+  }
+
+  @Override
+  public void init() {
+    nullChars = TextLineSerDe.getNullCharsAsBytes(meta);
+    delimiter = CSVLineSerDe.getFieldDelimiter(meta);
+    columnNum = schema.size();
+
+    serde = new TextFieldSerializerDeserializer(meta);
+  }
+
+  @Override
+  public int serialize(OutputStream out, Tuple input) throws IOException {
+    int writtenBytes = 0;
+
+    for (int i = 0; i < columnNum; i++) {
+      Datum datum = input.get(i);
+      writtenBytes += serde.serialize(out, datum, schema.getColumn(i), i, nullChars);
+
+      if (columnNum - 1 > i) {
+        out.write((byte) delimiter);
+        writtenBytes += 1;
+      }
+    }
+
+    return writtenBytes;
+  }
+
+  @Override
+  public void release() {
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedLineReader.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedLineReader.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedLineReader.java
new file mode 100644
index 0000000..1b433b5
--- /dev/null
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedLineReader.java
@@ -0,0 +1,156 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.text;
+
+import io.netty.buffer.ByteBuf;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionCodecFactory;
+import org.apache.hadoop.io.compress.Decompressor;
+import org.apache.hadoop.io.compress.SplittableCompressionCodec;
+import org.apache.tajo.common.exception.NotImplementedException;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.storage.BufferPool;
+import org.apache.tajo.storage.ByteBufInputChannel;
+import org.apache.tajo.storage.FileScanner;
+import org.apache.tajo.storage.compress.CodecPool;
+import org.apache.tajo.storage.fragment.FileFragment;
+
+import java.io.Closeable;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class DelimitedLineReader implements Closeable {
+  private static final Log LOG = LogFactory.getLog(DelimitedLineReader.class);
+  private final static int DEFAULT_PAGE_SIZE = 128 * 1024;
+
+  private FileSystem fs;
+  private FSDataInputStream fis;
+  private InputStream is; //decompressed stream
+  private CompressionCodecFactory factory;
+  private CompressionCodec codec;
+  private Decompressor decompressor;
+
+  private long startOffset, end, pos;
+  private boolean eof = true;
+  private ByteBufLineReader lineReader;
+  private AtomicInteger lineReadBytes = new AtomicInteger();
+  private FileFragment fragment;
+  private Configuration conf;
+
+  public DelimitedLineReader(Configuration conf, final FileFragment fragment) throws IOException {
+    this.fragment = fragment;
+    this.conf = conf;
+    this.factory = new CompressionCodecFactory(conf);
+    this.codec = factory.getCodec(fragment.getPath());
+    if (this.codec instanceof SplittableCompressionCodec) {
+      throw new NotImplementedException(); // splittable codecs such as bzip2 are not supported by this multi-threaded model
+    }
+  }
+
+  public void init() throws IOException {
+    if (fs == null) {
+      fs = FileScanner.getFileSystem((TajoConf) conf, fragment.getPath());
+    }
+    if (fis == null) fis = fs.open(fragment.getPath());
+    pos = startOffset = fragment.getStartKey();
+    end = startOffset + fragment.getLength();
+
+    if (codec != null) {
+      decompressor = CodecPool.getDecompressor(codec);
+      is = new DataInputStream(codec.createInputStream(fis, decompressor));
+      ByteBufInputChannel channel = new ByteBufInputChannel(is);
+      lineReader = new ByteBufLineReader(channel, BufferPool.directBuffer(DEFAULT_PAGE_SIZE));
+    } else {
+      fis.seek(startOffset);
+      is = fis;
+
+      ByteBufInputChannel channel = new ByteBufInputChannel(is);
+      lineReader = new ByteBufLineReader(channel,
+          BufferPool.directBuffer((int) Math.min(DEFAULT_PAGE_SIZE, end)));
+    }
+    eof = false;
+  }
+
+  public long getCompressedPosition() throws IOException {
+    long retVal;
+    if (isCompressed()) {
+      retVal = fis.getPos();
+    } else {
+      retVal = pos;
+    }
+    return retVal;
+  }
+
+  public long getUnCompressedPosition() throws IOException {
+    return pos;
+  }
+
+  public long getReadBytes() {
+    return pos - startOffset;
+  }
+
+  public boolean isReadable() {
+    return !eof;
+  }
+
+  public ByteBuf readLine() throws IOException {
+    if (eof) {
+      return null;
+    }
+
+    ByteBuf buf = lineReader.readLineBuf(lineReadBytes);
+    pos += lineReadBytes.get();
+    if (buf == null) {
+      eof = true;
+    }
+
+    if (!isCompressed() && getCompressedPosition() > end) {
+      eof = true;
+    }
+    return buf;
+  }
+
+  public boolean isCompressed() {
+    return codec != null;
+  }
+
+  @Override
+  public void close() throws IOException {
+    try {
+      IOUtils.cleanup(LOG, lineReader, is, fis);
+      fs = null;
+      is = null;
+      fis = null;
+      lineReader = null;
+    } finally {
+      if (decompressor != null) {
+        CodecPool.returnDecompressor(decompressor);
+        decompressor = null;
+      }
+    }
+  }
+}
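
As a sketch, the scanner in DelimitedTextFile drives this reader roughly as follows (conf and fragment are assumed to be prepared elsewhere):

    DelimitedLineReader reader = new DelimitedLineReader(conf, fragment);
    reader.init();                                  // opens the file and, for compressed input, the decompressor
    while (reader.isReadable()) {
      ByteBuf line = reader.readLine();             // returns null once the split or stream is exhausted
      if (line == null) break;
      // hand the raw line bytes to a TextLineDeserializer
    }
    reader.close();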

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java
new file mode 100644
index 0000000..8824e3e
--- /dev/null
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java
@@ -0,0 +1,481 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.text;
+
+import io.netty.buffer.ByteBuf;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionCodecFactory;
+import org.apache.hadoop.io.compress.CompressionOutputStream;
+import org.apache.hadoop.io.compress.Compressor;
+import org.apache.tajo.QueryUnitAttemptId;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.catalog.statistics.TableStats;
+import org.apache.tajo.storage.*;
+import org.apache.tajo.storage.compress.CodecPool;
+import org.apache.tajo.storage.exception.AlreadyExistsStorageException;
+import org.apache.tajo.storage.fragment.FileFragment;
+import org.apache.tajo.storage.fragment.Fragment;
+import org.apache.tajo.storage.rcfile.NonSyncByteArrayOutputStream;
+import org.apache.tajo.util.ReflectionUtil;
+
+import java.io.BufferedOutputStream;
+import java.io.DataOutputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import static org.apache.tajo.storage.StorageConstants.DEFAULT_TEXT_ERROR_TOLERANCE_MAXNUM;
+import static org.apache.tajo.storage.StorageConstants.TEXT_ERROR_TOLERANCE_MAXNUM;
+
+public class DelimitedTextFile {
+
+  public static final byte LF = '\n';
+
+  private static final Log LOG = LogFactory.getLog(DelimitedTextFile.class);
+
+  /** it caches line serde classes. */
+  private static final Map<String, Class<? extends TextLineSerDe>> serdeClassCache =
+      new ConcurrentHashMap<String, Class<? extends TextLineSerDe>>();
+
+  /**
+   * By default, DelimitedTextFileScanner uses CSVLineSerDe. If the table property 'text.serde.class' is given,
+   * it will use the specified serde class instead.
+   *
+   * @return TextLineSerDe
+   */
+  public static TextLineSerDe getLineSerde(TableMeta meta) {
+    TextLineSerDe lineSerde;
+
+    String serDeClassName;
+
+    // if no serde class is given, the CSV line serde is used.
+    serDeClassName = meta.getOption(StorageConstants.TEXT_SERDE_CLASS, StorageConstants.DEFAULT_TEXT_SERDE_CLASS);
+
+    try {
+      Class<? extends TextLineSerDe> serdeClass;
+
+      if (serdeClassCache.containsKey(serDeClassName)) {
+        serdeClass = serdeClassCache.get(serDeClassName);
+      } else {
+        serdeClass = (Class<? extends TextLineSerDe>) Class.forName(serDeClassName);
+        serdeClassCache.put(serDeClassName, serdeClass);
+      }
+      lineSerde = (TextLineSerDe) ReflectionUtil.newInstance(serdeClass);
+    } catch (Throwable e) {
+      throw new RuntimeException("TextLineSerde class cannot be initialized.", e);
+    }
+
+    return lineSerde;
+  }
+
+  public static class DelimitedTextFileAppender extends FileAppender {
+    private final TableMeta meta;
+    private final Schema schema;
+    private final FileSystem fs;
+    private FSDataOutputStream fos;
+    private DataOutputStream outputStream;
+    private CompressionOutputStream deflateFilter;
+    private TableStatistics stats = null;
+    private Compressor compressor;
+    private CompressionCodecFactory codecFactory;
+    private CompressionCodec codec;
+    private Path compressedPath;
+    private byte[] nullChars;
+    private int BUFFER_SIZE = 128 * 1024;
+    private int bufferedBytes = 0;
+    private long pos = 0;
+
+    private NonSyncByteArrayOutputStream os;
+    private TextLineSerializer serializer;
+
+    public DelimitedTextFileAppender(Configuration conf, QueryUnitAttemptId taskAttemptId,
+                                     final Schema schema, final TableMeta meta, final Path path)
+        throws IOException {
+      super(conf, taskAttemptId, schema, meta, path);
+      this.fs = path.getFileSystem(conf);
+      this.meta = meta;
+      this.schema = schema;
+    }
+
+    public TextLineSerDe getLineSerde() {
+      return DelimitedTextFile.getLineSerde(meta);
+    }
+
+    @Override
+    public void init() throws IOException {
+      if (!fs.exists(path.getParent())) {
+        throw new FileNotFoundException(path.toString());
+      }
+
+      if (this.meta.containsOption(StorageConstants.COMPRESSION_CODEC)) {
+        String codecName = this.meta.getOption(StorageConstants.COMPRESSION_CODEC);
+        codecFactory = new CompressionCodecFactory(conf);
+        codec = codecFactory.getCodecByClassName(codecName);
+        compressor = CodecPool.getCompressor(codec);
+        if (compressor != null) compressor.reset();  // the built-in gzip codec may return a null compressor
+
+        String extension = codec.getDefaultExtension();
+        compressedPath = path.suffix(extension);
+
+        if (fs.exists(compressedPath)) {
+          throw new AlreadyExistsStorageException(compressedPath);
+        }
+
+        fos = fs.create(compressedPath);
+        deflateFilter = codec.createOutputStream(fos, compressor);
+        outputStream = new DataOutputStream(deflateFilter);
+
+      } else {
+        if (fs.exists(path)) {
+          throw new AlreadyExistsStorageException(path);
+        }
+        fos = fs.create(path);
+        outputStream = new DataOutputStream(new BufferedOutputStream(fos));
+      }
+
+      if (enabledStats) {
+        this.stats = new TableStatistics(this.schema);
+      }
+
+      serializer = getLineSerde().createSerializer(schema, meta);
+      serializer.init();
+
+      if (os == null) {
+        os = new NonSyncByteArrayOutputStream(BUFFER_SIZE);
+      }
+
+      os.reset();
+      pos = fos.getPos();
+      bufferedBytes = 0;
+      super.init();
+    }
+
+    @Override
+    public void addTuple(Tuple tuple) throws IOException {
+      // write
+      int rowBytes = serializer.serialize(os, tuple);
+
+      // new line
+      os.write(LF);
+      rowBytes += 1;
+
+      // update positions
+      pos += rowBytes;
+      bufferedBytes += rowBytes;
+
+      // flush the buffer once it is full
+      if (bufferedBytes > BUFFER_SIZE) {
+        flushBuffer();
+      }
+      // Statistical section
+      if (enabledStats) {
+        stats.incrementRow();
+      }
+    }
+
+    private void flushBuffer() throws IOException {
+      if (os.getLength() > 0) {
+        os.writeTo(outputStream);
+        os.reset();
+        bufferedBytes = 0;
+      }
+    }
+
+    @Override
+    public long getOffset() throws IOException {
+      return pos;
+    }
+
+    @Override
+    public void flush() throws IOException {
+      flushBuffer();
+      outputStream.flush();
+    }
+
+    @Override
+    public void close() throws IOException {
+
+      try {
+        serializer.release();
+
+        if(outputStream != null){
+          flush();
+        }
+
+        // Statistical section
+        if (enabledStats) {
+          stats.setNumBytes(getOffset());
+        }
+
+        if (deflateFilter != null) {
+          deflateFilter.finish();
+          deflateFilter.resetState();
+          deflateFilter = null;
+        }
+
+        os.close();
+      } finally {
+        IOUtils.cleanup(LOG, fos);
+        if (compressor != null) {
+          CodecPool.returnCompressor(compressor);
+          compressor = null;
+        }
+      }
+    }
+
+    @Override
+    public TableStats getStats() {
+      if (enabledStats) {
+        return stats.getTableStat();
+      } else {
+        return null;
+      }
+    }
+
+    public boolean isCompress() {
+      return compressor != null;
+    }
+
+    public String getExtension() {
+      return codec != null ? codec.getDefaultExtension() : "";
+    }
+  }
+
+  public static class DelimitedTextFileScanner extends FileScanner {
+    private boolean splittable = false;
+    private final long startOffset;
+
+    private final long endOffset;
+    /** The number of records actually read */
+    private int recordCount = 0;
+    private int[] targetColumnIndexes;
+
+    private DelimitedLineReader reader;
+    private TextLineDeserializer deserializer;
+
+    private int errorPrintOutMaxNum = 5;
+    /** Maximum number of permissible errors */
+    private int errorToleranceMaxNum;
+    /** The number of errors encountered so far */
+    private int errorNum;
+
+    public DelimitedTextFileScanner(Configuration conf, final Schema schema, final TableMeta meta,
+                                    final Fragment fragment)
+        throws IOException {
+      super(conf, schema, meta, fragment);
+      reader = new DelimitedLineReader(conf, this.fragment);
+      if (!reader.isCompressed()) {
+        splittable = true;
+      }
+
+      startOffset = this.fragment.getStartKey();
+      endOffset = startOffset + fragment.getLength();
+
+      errorToleranceMaxNum =
+          Integer.parseInt(meta.getOption(TEXT_ERROR_TOLERANCE_MAXNUM, DEFAULT_TEXT_ERROR_TOLERANCE_MAXNUM));
+    }
+
+
+    @Override
+    public void init() throws IOException {
+      if (reader != null) {
+        reader.close();
+      }
+
+      reader = new DelimitedLineReader(conf, fragment);
+      reader.init();
+      recordCount = 0;
+
+      if (targets == null) {
+        targets = schema.toArray();
+      }
+
+      targetColumnIndexes = new int[targets.length];
+      for (int i = 0; i < targets.length; i++) {
+        targetColumnIndexes[i] = schema.getColumnId(targets[i].getQualifiedName());
+      }
+
+      super.init();
+      Arrays.sort(targetColumnIndexes);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("DelimitedTextFileScanner open:" + fragment.getPath() + "," + startOffset + "," + endOffset);
+      }
+
+      if (startOffset > 0) {
+        reader.readLine();  // skip the first line; it belongs to the previous split
+      }
+
+      deserializer = getLineSerde().createDeserializer(schema, meta, targetColumnIndexes);
+      deserializer.init();
+    }
+
+    public TextLineSerDe getLineSerde() {
+      return DelimitedTextFile.getLineSerde(meta);
+    }
+
+    @Override
+    public float getProgress() {
+      try {
+        if (!reader.isReadable()) {
+          return 1.0f;
+        }
+        long filePos = reader.getCompressedPosition();
+        if (startOffset == filePos) {
+          return 0.0f;
+        } else {
+          long readBytes = filePos - startOffset;
+          long remainingBytes = Math.max(endOffset - filePos, 0);
+          return Math.min(1.0f, (float) (readBytes) / (float) (readBytes + remainingBytes));
+        }
+      } catch (IOException e) {
+        LOG.error(e.getMessage(), e);
+        return 0.0f;
+      }
+    }
+
+    @Override
+    public Tuple next() throws IOException {
+      VTuple tuple;
+
+      if (!reader.isReadable()) {
+        return null;
+      }
+
+      try {
+
+        // this loop continues until one tuple is built or EOS (end of stream) is reached.
+        do {
+
+          ByteBuf buf = reader.readLine();
+
+          // if there is no more line, return null to signal the end of tuples
+          if (buf == null) {
+            return null;
+          }
+
+          // If no column is required, we just read each line
+          // and return an empty tuple without parsing it.
+          if (targets.length == 0) {
+            recordCount++;
+            return EmptyTuple.get();
+          }
+
+          tuple = new VTuple(schema.size());
+
+          try {
+            deserializer.deserialize(buf, tuple);
+            // if a line is parsed normally, exit this loop.
+            break;
+
+          } catch (TextLineParsingError tae) {
+
+            errorNum++;
+
+            // suppress excessive log output, which could degrade performance
+            if (errorNum < errorPrintOutMaxNum) {
+              LOG.warn("Ignore Text Parse Error (" + errorNum + "): ", tae);
+            }
+
+            // Only when a maximum error tolerance is set (i.e., errorToleranceMaxNum >= 0)
+            // do we check whether the number of parsing errors exceeds the limit.
+            // Otherwise, all parsing errors are ignored.
+            if (errorToleranceMaxNum >= 0 && errorNum > errorToleranceMaxNum) {
+              throw tae;
+            }
+            continue;
+          }
+
+        } while (reader.isReadable()); // continue until EOS
+
+        // recordCount is the number of records actually read; increment it here.
+        recordCount++;
+
+        return tuple;
+
+      } catch (Throwable t) {
+        LOG.error(t);
+        throw new IOException(t);
+      }
+    }
+
+    @Override
+    public void reset() throws IOException {
+      init();
+    }
+
+    @Override
+    public void close() throws IOException {
+      try {
+        if (deserializer != null) {
+          deserializer.release();
+        }
+
+        if (tableStats != null && reader != null) {
+          tableStats.setReadBytes(reader.getReadBytes());  //Actual Processed Bytes. (decompressed bytes + overhead)
+          tableStats.setNumRows(recordCount);
+        }
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("DelimitedTextFileScanner processed record:" + recordCount);
+        }
+      } finally {
+        IOUtils.cleanup(LOG, reader);
+        reader = null;
+      }
+    }
+
+    @Override
+    public boolean isProjectable() {
+      return true;
+    }
+
+    @Override
+    public boolean isSelectable() {
+      return false;
+    }
+
+    @Override
+    public void setSearchCondition(Object expr) {
+    }
+
+    @Override
+    public boolean isSplittable() {
+      return splittable;
+    }
+
+    @Override
+    public TableStats getInputStats() {
+      if (tableStats != null && reader != null) {
+        tableStats.setReadBytes(reader.getReadBytes());  //Actual Processed Bytes. (decompressed bytes + overhead)
+        tableStats.setNumRows(recordCount);
+        tableStats.setNumBytes(fragment.getLength());
+      }
+      return tableStats;
+    }
+  }
+}
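
To make the pluggable-serde path above concrete, a hedged write-side sketch (meta, schema, out and tuple are assumed; by default getLineSerde() resolves to CSVLineSerDe unless the 'text.serde.class' table property names another implementation):

    TextLineSerDe serde = DelimitedTextFile.getLineSerde(meta);
    TextLineSerializer serializer = serde.createSerializer(schema, meta);
    serializer.init();
    int written = serializer.serialize(out, tuple);  // bytes written for one row, excluding the trailing LF
    serializer.release();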

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/FieldSplitProcessor.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/FieldSplitProcessor.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/FieldSplitProcessor.java
new file mode 100644
index 0000000..a5ac142
--- /dev/null
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/FieldSplitProcessor.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.text;
+
+import io.netty.buffer.ByteBufProcessor;
+
+public class FieldSplitProcessor implements ByteBufProcessor {
+  private char delimiter; //the ASCII field separator character
+
+  public FieldSplitProcessor(char fieldDelimiter) {
+    this.delimiter = fieldDelimiter;
+  }
+
+  @Override
+  public boolean process(byte value) throws Exception {
+    return delimiter != value;
+  }
+
+  public char getDelimiter() {
+    return delimiter;
+  }
+}
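
A small sketch of how CSVLineDeserializer uses this processor to find field boundaries (lineBuf, start and rowLength are assumed; '|' is only an example delimiter, the real one comes from CSVLineSerDe.getFieldDelimiter(meta)):

    FieldSplitProcessor processor = new FieldSplitProcessor('|');
    int end = lineBuf.forEachByte(start, rowLength - start, processor);
    // end is the index of the next delimiter, or -1 when the last field of the line has been reached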

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/LineSplitProcessor.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/LineSplitProcessor.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/LineSplitProcessor.java
new file mode 100644
index 0000000..a130527
--- /dev/null
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/LineSplitProcessor.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.text;
+
+import io.netty.buffer.ByteBufProcessor;
+
+public class LineSplitProcessor implements ByteBufProcessor {
+  public static final byte CR = '\r';
+  public static final byte LF = '\n';
+  private boolean prevCharCR = false; //true if the previous char was CR
+
+  @Override
+  public boolean process(byte value) throws Exception {
+    switch (value) {
+      case LF:
+        return false;
+      case CR:
+        prevCharCR = true;
+        return false;
+      default:
+        prevCharCR = false;
+        return true;
+    }
+  }
+
+  public boolean isPrevCharCR() {
+    return prevCharCR;
+  }
+}
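
A small sketch of how ByteBufLineReader drives this processor (buf is an assumed ByteBuf holding text):

    LineSplitProcessor processor = new LineSplitProcessor();
    int stopIndex = buf.forEachByte(buf.readerIndex(), buf.readableBytes(), processor);
    // stopIndex is the index of the CR or LF that ended the scan, or -1 if no terminator was found;
    // isPrevCharCR() lets the caller swallow the LF of a CRLF pair even when the CR and LF
    // arrive in different buffer fills.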

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/TextFieldSerializerDeserializer.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/TextFieldSerializerDeserializer.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/TextFieldSerializerDeserializer.java
new file mode 100644
index 0000000..ae7565d
--- /dev/null
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/TextFieldSerializerDeserializer.java
@@ -0,0 +1,253 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.text;
+
+import com.google.protobuf.Message;
+import io.netty.buffer.ByteBuf;
+import io.netty.util.CharsetUtil;
+import org.apache.commons.codec.binary.Base64;
+import org.apache.tajo.TajoConstants;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.common.TajoDataTypes;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.datum.*;
+import org.apache.tajo.datum.protobuf.ProtobufJsonFormat;
+import org.apache.tajo.storage.FieldSerializerDeserializer;
+import org.apache.tajo.storage.StorageConstants;
+import org.apache.tajo.util.NumberUtil;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.charset.CharsetDecoder;
+import java.util.TimeZone;
+
+public class TextFieldSerializerDeserializer implements FieldSerializerDeserializer {
+  public static final byte[] trueBytes = "true".getBytes();
+  public static final byte[] falseBytes = "false".getBytes();
+  private static ProtobufJsonFormat protobufJsonFormat = ProtobufJsonFormat.getInstance();
+  private final CharsetDecoder decoder = CharsetUtil.getDecoder(CharsetUtil.UTF_8);
+
+  private final boolean hasTimezone;
+  private final TimeZone timezone;
+
+  public TextFieldSerializerDeserializer(TableMeta meta) {
+    hasTimezone = meta.containsOption(StorageConstants.TIMEZONE);
+    timezone = TimeZone.getTimeZone(meta.getOption(StorageConstants.TIMEZONE, TajoConstants.DEFAULT_SYSTEM_TIMEZONE));
+  }
+
+  private static boolean isNull(ByteBuf val, ByteBuf nullBytes) {
+    return !val.isReadable() || nullBytes.equals(val);
+  }
+
+  private static boolean isNullText(ByteBuf val, ByteBuf nullBytes) {
+    return val.readableBytes() > 0 && nullBytes.equals(val);
+  }
+
+  @Override
+  public int serialize(OutputStream out, Datum datum, Column col, int columnIndex, byte[] nullChars)
+      throws IOException {
+    byte[] bytes;
+    int length = 0;
+    TajoDataTypes.DataType dataType = col.getDataType();
+
+    if (datum == null || datum instanceof NullDatum) {
+      switch (dataType.getType()) {
+        case CHAR:
+        case TEXT:
+          length = nullChars.length;
+          out.write(nullChars);
+          break;
+        default:
+          break;
+      }
+      return length;
+    }
+
+    switch (dataType.getType()) {
+      case BOOLEAN:
+        bytes = datum.asBool() ? trueBytes : falseBytes;
+        out.write(bytes);
+        length = bytes.length;
+        break;
+      case CHAR:
+        byte[] pad = new byte[dataType.getLength() - datum.size()];
+        bytes = datum.asTextBytes();
+        out.write(bytes);
+        out.write(pad);
+        length = bytes.length + pad.length;
+        break;
+      case TEXT:
+      case BIT:
+      case INT2:
+      case INT4:
+      case INT8:
+      case FLOAT4:
+      case FLOAT8:
+      case INET4:
+      case DATE:
+      case INTERVAL:
+        bytes = datum.asTextBytes();
+        length = bytes.length;
+        out.write(bytes);
+        break;
+      case TIME:
+        if (hasTimezone) {
+          bytes = ((TimeDatum) datum).asChars(timezone, true).getBytes();
+        } else {
+          bytes = datum.asTextBytes();
+        }
+        length = bytes.length;
+        out.write(bytes);
+        break;
+      case TIMESTAMP:
+        if (hasTimezone) {
+          bytes = ((TimestampDatum) datum).asChars(timezone, true).getBytes();
+        } else {
+          bytes = datum.asTextBytes();
+        }
+        length = bytes.length;
+        out.write(bytes);
+        break;
+      case INET6:
+      case BLOB:
+        bytes = Base64.encodeBase64(datum.asByteArray(), false);
+        length = bytes.length;
+        out.write(bytes, 0, length);
+        break;
+      case PROTOBUF:
+        ProtobufDatum protobuf = (ProtobufDatum) datum;
+        byte[] protoBytes = protobufJsonFormat.printToString(protobuf.get()).getBytes();
+        length = protoBytes.length;
+        out.write(protoBytes, 0, protoBytes.length);
+        break;
+      case NULL_TYPE:
+      default:
+        break;
+    }
+    return length;
+  }
+
+  @Override
+  public Datum deserialize(ByteBuf buf, Column col, int columnIndex, ByteBuf nullChars) throws IOException {
+    Datum datum;
+    TajoDataTypes.Type type = col.getDataType().getType();
+    boolean nullField;
+    if (type == TajoDataTypes.Type.TEXT || type == TajoDataTypes.Type.CHAR) {
+      nullField = isNullText(buf, nullChars);
+    } else {
+      nullField = isNull(buf, nullChars);
+    }
+
+    if (nullField) {
+      datum = NullDatum.get();
+    } else {
+      switch (type) {
+        case BOOLEAN:
+          byte bool = buf.readByte();
+          datum = DatumFactory.createBool(bool == 't' || bool == 'T');
+          break;
+        case BIT:
+          datum = DatumFactory.createBit(Byte.parseByte(
+              decoder.decode(buf.nioBuffer(buf.readerIndex(), buf.readableBytes())).toString()));
+          break;
+        case CHAR:
+          datum = DatumFactory.createChar(
+              decoder.decode(buf.nioBuffer(buf.readerIndex(), buf.readableBytes())).toString().trim());
+          break;
+        case INT1:
+        case INT2:
+          datum = DatumFactory.createInt2((short) NumberUtil.parseInt(buf));
+          break;
+        case INT4:
+          datum = DatumFactory.createInt4(NumberUtil.parseInt(buf));
+          break;
+        case INT8:
+          datum = DatumFactory.createInt8(NumberUtil.parseLong(buf));
+          break;
+        case FLOAT4:
+          datum = DatumFactory.createFloat4(
+              decoder.decode(buf.nioBuffer(buf.readerIndex(), buf.readableBytes())).toString());
+          break;
+        case FLOAT8:
+          datum = DatumFactory.createFloat8(NumberUtil.parseDouble(buf));
+          break;
+        case TEXT: {
+          byte[] bytes = new byte[buf.readableBytes()];
+          buf.readBytes(bytes);
+          datum = DatumFactory.createText(bytes);
+          break;
+        }
+        case DATE:
+          datum = DatumFactory.createDate(
+              decoder.decode(buf.nioBuffer(buf.readerIndex(), buf.readableBytes())).toString());
+          break;
+        case TIME:
+          if (hasTimezone) {
+            datum = DatumFactory.createTime(
+                decoder.decode(buf.nioBuffer(buf.readerIndex(), buf.readableBytes())).toString(), timezone);
+          } else {
+            datum = DatumFactory.createTime(
+                decoder.decode(buf.nioBuffer(buf.readerIndex(), buf.readableBytes())).toString());
+          }
+          break;
+        case TIMESTAMP:
+          if (hasTimezone) {
+            datum = DatumFactory.createTimestamp(
+                decoder.decode(buf.nioBuffer(buf.readerIndex(), buf.readableBytes())).toString(), timezone);
+          } else {
+            datum = DatumFactory.createTimestamp(
+                decoder.decode(buf.nioBuffer(buf.readerIndex(), buf.readableBytes())).toString());
+          }
+          break;
+        case INTERVAL:
+          datum = DatumFactory.createInterval(
+              decoder.decode(buf.nioBuffer(buf.readerIndex(), buf.readableBytes())).toString());
+          break;
+        case PROTOBUF: {
+          ProtobufDatumFactory factory = ProtobufDatumFactory.get(col.getDataType());
+          Message.Builder builder = factory.newBuilder();
+          try {
+            byte[] bytes = new byte[buf.readableBytes()];
+            buf.readBytes(bytes);
+            protobufJsonFormat.merge(bytes, builder);
+            datum = factory.createDatum(builder.build());
+          } catch (IOException e) {
+            e.printStackTrace();
+            throw new RuntimeException(e);
+          }
+          break;
+        }
+        case INET4:
+          datum = DatumFactory.createInet4(
+              decoder.decode(buf.nioBuffer(buf.readerIndex(), buf.readableBytes())).toString());
+          break;
+        case BLOB: {
+          byte[] bytes = new byte[buf.readableBytes()];
+          buf.readBytes(bytes);
+          datum = DatumFactory.createBlob(Base64.decodeBase64(bytes));
+          break;
+        }
+        default:
+          datum = NullDatum.get();
+          break;
+      }
+    }
+    return datum;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/TextLineDeserializer.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/TextLineDeserializer.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/TextLineDeserializer.java
new file mode 100644
index 0000000..7ebfa79
--- /dev/null
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/TextLineDeserializer.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.text;
+
+import io.netty.buffer.ByteBuf;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.storage.Tuple;
+
+import java.io.IOException;
+
+/**
+ * Reads a text line and fills a Tuple with values
+ */
+public abstract class TextLineDeserializer {
+  protected Schema schema;
+  protected TableMeta meta;
+  protected int [] targetColumnIndexes;
+
+  public TextLineDeserializer(Schema schema, TableMeta meta, int [] targetColumnIndexes) {
+    this.schema = schema;
+    this.meta = meta;
+    this.targetColumnIndexes = targetColumnIndexes;
+  }
+
+  /**
+   * Initialize SerDe
+   */
+  public abstract void init();
+
+  /**
+   * Fills a tuple with the fields read from a given line.
+   *
+   * @param buf Read line
+   * @param output Tuple to be filled with read fields
+   * @throws java.io.IOException
+   */
+  public abstract void deserialize(final ByteBuf buf, Tuple output) throws IOException, TextLineParsingError;
+
+  /**
+   * Release external resources
+   */
+  public abstract void release();
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/TextLineParsingError.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/TextLineParsingError.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/TextLineParsingError.java
new file mode 100644
index 0000000..f0bae5e
--- /dev/null
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/TextLineParsingError.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.text;
+
+public class TextLineParsingError extends Exception {
+
+  public TextLineParsingError(Throwable t) {
+    super(t);
+  }
+
+  public TextLineParsingError(String message, Throwable t) {
+    super(t.getMessage() + ", Error line: " + message);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/TextLineSerDe.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/TextLineSerDe.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/TextLineSerDe.java
new file mode 100644
index 0000000..e81e289
--- /dev/null
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/TextLineSerDe.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.text;
+
+import io.netty.buffer.ByteBuf;
+import org.apache.commons.lang.StringEscapeUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.datum.NullDatum;
+import org.apache.tajo.storage.BufferPool;
+import org.apache.tajo.storage.StorageConstants;
+
+/**
+ * Pluggable Text Line SerDe class
+ */
+public abstract class TextLineSerDe {
+
+  public TextLineSerDe() {
+  }
+
+  public abstract TextLineDeserializer createDeserializer(Schema schema, TableMeta meta, int [] targetColumnIndexes);
+
+  public abstract TextLineSerializer createSerializer(Schema schema, TableMeta meta);
+
+  public static ByteBuf getNullChars(TableMeta meta) {
+    byte[] nullCharByteArray = getNullCharsAsBytes(meta);
+
+    ByteBuf nullChars = BufferPool.directBuffer(nullCharByteArray.length, nullCharByteArray.length);
+    nullChars.writeBytes(nullCharByteArray);
+
+    return nullChars;
+  }
+
+  public static byte [] getNullCharsAsBytes(TableMeta meta) {
+    byte [] nullChars;
+
+    String nullCharacters = StringEscapeUtils.unescapeJava(meta.getOption(StorageConstants.TEXT_NULL,
+        NullDatum.DEFAULT_TEXT));
+    if (StringUtils.isEmpty(nullCharacters)) {
+      nullChars = NullDatum.get().asTextBytes();
+    } else {
+      nullChars = nullCharacters.getBytes();
+    }
+
+    return nullChars;
+  }
+
+}

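A hedged sketch of how a scanner or appender might use a concrete SerDe. The CSV-style subclass name and the schema/meta/targetColumnIndexes variables are placeholders for illustration, not names guaranteed by this patch.

  TextLineSerDe serDe = new CSVLineSerDe();                 // hypothetical concrete subclass
  TextLineSerializer serializer = serDe.createSerializer(schema, meta);
  TextLineDeserializer deserializer =
      serDe.createDeserializer(schema, meta, targetColumnIndexes);

  // Null marker resolution: the TEXT_NULL table option is unescaped and used
  // as-is; NullDatum's textual form is the fallback when the option is empty.
  ByteBuf nullChars = TextLineSerDe.getNullChars(meta);
  try {
    // ... drive the serializer/deserializer over the data ...
  } finally {
    nullChars.release();                                    // BufferPool direct buffers must be released
  }
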
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/TextLineSerializer.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/TextLineSerializer.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/TextLineSerializer.java
new file mode 100644
index 0000000..0c2761f
--- /dev/null
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/TextLineSerializer.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.text;
+
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.storage.Tuple;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+/**
+ * Writes a Tuple as a single text-formatted line.
+ */
+public abstract class TextLineSerializer {
+  protected Schema schema;
+  protected TableMeta meta;
+
+  public TextLineSerializer(Schema schema, TableMeta meta) {
+    this.schema = schema;
+    this.meta = meta;
+  }
+
+  public abstract void init();
+
+  public abstract int serialize(OutputStream out, Tuple input) throws IOException;
+
+  public abstract void release();
+}

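A sketch of the intended lifecycle, assuming a serializer obtained from a TextLineSerDe and code running inside a method that declares IOException; out, tuple and the explicit newline are assumptions for illustration.

  TextLineSerializer serializer = serDe.createSerializer(schema, meta);
  serializer.init();
  try {
    int written = serializer.serialize(out, tuple);         // one Tuple -> one text line, returns bytes written
    out.write('\n');                                        // line delimiter written by the caller (assumption)
  } finally {
    serializer.release();                                   // free any buffers held by the serializer
  }
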
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/CodecFactory.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/CodecFactory.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/CodecFactory.java
new file mode 100644
index 0000000..f76593e
--- /dev/null
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/CodecFactory.java
@@ -0,0 +1,190 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.thirdparty.parquet;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.compress.*;
+import org.apache.hadoop.util.ReflectionUtils;
+import parquet.bytes.BytesInput;
+import parquet.hadoop.BadConfigurationException;
+import parquet.hadoop.metadata.CompressionCodecName;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.HashMap;
+import java.util.Map;
+
+class CodecFactory {
+
+  public class BytesDecompressor {
+
+    private final CompressionCodec codec;
+    private final Decompressor decompressor;
+
+    public BytesDecompressor(CompressionCodec codec) {
+      this.codec = codec;
+      if (codec != null) {
+        decompressor = CodecPool.getDecompressor(codec);
+      } else {
+        decompressor = null;
+      }
+    }
+
+    public BytesInput decompress(BytesInput bytes, int uncompressedSize) throws IOException {
+      final BytesInput decompressed;
+      if (codec != null) {
+        decompressor.reset();
+        InputStream is = codec.createInputStream(new ByteArrayInputStream(bytes.toByteArray()), decompressor);
+        decompressed = BytesInput.from(is, uncompressedSize);
+      } else {
+        decompressed = bytes;
+      }
+      return decompressed;
+    }
+
+    private void release() {
+      if (decompressor != null) {
+        CodecPool.returnDecompressor(decompressor);
+      }
+    }
+  }
+
+  /**
+   * Encapsulates the logic around Hadoop compression codecs
+   *
+   * @author Julien Le Dem
+   *
+   */
+  public static class BytesCompressor {
+
+    private final CompressionCodec codec;
+    private final Compressor compressor;
+    private final ByteArrayOutputStream compressedOutBuffer;
+    private final CompressionCodecName codecName;
+
+    public BytesCompressor(CompressionCodecName codecName, CompressionCodec codec, int pageSize) {
+      this.codecName = codecName;
+      this.codec = codec;
+      if (codec != null) {
+        this.compressor = CodecPool.getCompressor(codec);
+        this.compressedOutBuffer = new ByteArrayOutputStream(pageSize);
+      } else {
+        this.compressor = null;
+        this.compressedOutBuffer = null;
+      }
+    }
+
+    public BytesInput compress(BytesInput bytes) throws IOException {
+      final BytesInput compressedBytes;
+      if (codec == null) {
+        compressedBytes = bytes;
+      } else {
+        compressedOutBuffer.reset();
+        if (compressor != null) {
+          // null compressor for non-native gzip
+          compressor.reset();
+        }
+        CompressionOutputStream cos = codec.createOutputStream(compressedOutBuffer, compressor);
+        bytes.writeAllTo(cos);
+        cos.finish();
+        cos.close();
+        compressedBytes = BytesInput.from(compressedOutBuffer);
+      }
+      return compressedBytes;
+    }
+
+    private void release() {
+      if (compressor != null) {
+        CodecPool.returnCompressor(compressor);
+      }
+    }
+
+    public CompressionCodecName getCodecName() {
+      return codecName;
+    }
+
+  }
+
+  private final Map<CompressionCodecName, BytesCompressor> compressors = new HashMap<CompressionCodecName, BytesCompressor>();
+  private final Map<CompressionCodecName, BytesDecompressor> decompressors = new HashMap<CompressionCodecName, BytesDecompressor>();
+  private final Map<String, CompressionCodec> codecByName = new HashMap<String, CompressionCodec>();
+  private final Configuration configuration;
+
+  public CodecFactory(Configuration configuration) {
+    this.configuration = configuration;
+  }
+
+  /**
+   *
+   * @param codecName the requested codec
+   * @return the corresponding Hadoop codec, or null if UNCOMPRESSED
+   */
+  private CompressionCodec getCodec(CompressionCodecName codecName) {
+    String codecClassName = codecName.getHadoopCompressionCodecClassName();
+    if (codecClassName == null) {
+      return null;
+    }
+    CompressionCodec codec = codecByName.get(codecClassName);
+    if (codec != null) {
+      return codec;
+    }
+
+    try {
+      Class<?> codecClass = Class.forName(codecClassName);
+      codec = (CompressionCodec)ReflectionUtils.newInstance(codecClass, configuration);
+      codecByName.put(codecClassName, codec);
+      return codec;
+    } catch (ClassNotFoundException e) {
+      throw new BadConfigurationException("Class " + codecClassName + " was not found", e);
+    }
+  }
+
+  public BytesCompressor getCompressor(CompressionCodecName codecName, int pageSize) {
+    BytesCompressor comp = compressors.get(codecName);
+    if (comp == null) {
+      CompressionCodec codec = getCodec(codecName);
+      comp = new BytesCompressor(codecName, codec, pageSize);
+      compressors.put(codecName, comp);
+    }
+    return comp;
+  }
+
+  public BytesDecompressor getDecompressor(CompressionCodecName codecName) {
+    BytesDecompressor decomp = decompressors.get(codecName);
+    if (decomp == null) {
+      CompressionCodec codec = getCodec(codecName);
+      decomp = new BytesDecompressor(codec);
+      decompressors.put(codecName, decomp);
+    }
+    return decomp;
+  }
+
+  public void release() {
+    for (BytesCompressor compressor : compressors.values()) {
+      compressor.release();
+    }
+    compressors.clear();
+    for (BytesDecompressor decompressor : decompressors.values()) {
+      decompressor.release();
+    }
+    decompressors.clear();
+  }
+}
\ No newline at end of file

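A hedged round-trip sketch, usable only from within this package since CodecFactory is package-private; the GZIP choice and the one-megabyte page size are arbitrary assumptions.

  static void roundTrip(Configuration conf) throws IOException {
    CodecFactory codecFactory = new CodecFactory(conf);
    CodecFactory.BytesCompressor compressor =
        codecFactory.getCompressor(CompressionCodecName.GZIP, 1024 * 1024);
    CodecFactory.BytesDecompressor decompressor =
        codecFactory.getDecompressor(CompressionCodecName.GZIP);

    BytesInput raw = BytesInput.from("hello parquet".getBytes());
    BytesInput compressed = compressor.compress(raw);
    // decompress() is told the expected uncompressed size up front
    BytesInput restored = decompressor.decompress(compressed, (int) raw.size());

    codecFactory.release();                                 // returns pooled (de)compressors to Hadoop's CodecPool
  }
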
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ColumnChunkPageWriteStore.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ColumnChunkPageWriteStore.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ColumnChunkPageWriteStore.java
new file mode 100644
index 0000000..0dedd9b
--- /dev/null
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ColumnChunkPageWriteStore.java
@@ -0,0 +1,206 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.thirdparty.parquet;
+
+import parquet.Log;
+import parquet.bytes.BytesInput;
+import parquet.bytes.CapacityByteArrayOutputStream;
+import parquet.column.ColumnDescriptor;
+import parquet.column.Encoding;
+import parquet.column.page.DictionaryPage;
+import parquet.column.page.PageWriteStore;
+import parquet.column.page.PageWriter;
+import parquet.column.statistics.BooleanStatistics;
+import parquet.column.statistics.Statistics;
+import parquet.format.converter.ParquetMetadataConverter;
+import parquet.io.ParquetEncodingException;
+import parquet.schema.MessageType;
+
+import java.io.IOException;
+import java.util.*;
+
+import static org.apache.tajo.storage.thirdparty.parquet.CodecFactory.BytesCompressor;
+import static parquet.Log.INFO;
+
+class ColumnChunkPageWriteStore implements PageWriteStore {
+  private static final Log LOG = Log.getLog(ColumnChunkPageWriteStore.class);
+
+  private static ParquetMetadataConverter parquetMetadataConverter = new ParquetMetadataConverter();
+
+  private static final class ColumnChunkPageWriter implements PageWriter {
+
+    private final ColumnDescriptor path;
+    private final BytesCompressor compressor;
+
+    private final CapacityByteArrayOutputStream buf;
+    private DictionaryPage dictionaryPage;
+
+    private long uncompressedLength;
+    private long compressedLength;
+    private long totalValueCount;
+    private int pageCount;
+
+    private Set<Encoding> encodings = new HashSet<Encoding>();
+
+    private Statistics totalStatistics;
+
+    private ColumnChunkPageWriter(ColumnDescriptor path, BytesCompressor compressor, int initialSize) {
+      this.path = path;
+      this.compressor = compressor;
+      this.buf = new CapacityByteArrayOutputStream(initialSize);
+      this.totalStatistics = Statistics.getStatsBasedOnType(this.path.getType());
+    }
+
+    @Deprecated
+    @Override
+    public void writePage(BytesInput bytes,
+                          int valueCount,
+                          Encoding rlEncoding,
+                          Encoding dlEncoding,
+                          Encoding valuesEncoding) throws IOException {
+      long uncompressedSize = bytes.size();
+      BytesInput compressedBytes = compressor.compress(bytes);
+      long compressedSize = compressedBytes.size();
+      BooleanStatistics statistics = new BooleanStatistics(); // dummy stats object
+      parquetMetadataConverter.writeDataPageHeader(
+          (int)uncompressedSize,
+          (int)compressedSize,
+          valueCount,
+          statistics,
+          rlEncoding,
+          dlEncoding,
+          valuesEncoding,
+          buf);
+      this.uncompressedLength += uncompressedSize;
+      this.compressedLength += compressedSize;
+      this.totalValueCount += valueCount;
+      this.pageCount += 1;
+      compressedBytes.writeAllTo(buf);
+      encodings.add(rlEncoding);
+      encodings.add(dlEncoding);
+      encodings.add(valuesEncoding);
+    }
+
+    @Override
+    public void writePage(BytesInput bytes,
+                          int valueCount,
+                          Statistics statistics,
+                          Encoding rlEncoding,
+                          Encoding dlEncoding,
+                          Encoding valuesEncoding) throws IOException {
+      long uncompressedSize = bytes.size();
+      BytesInput compressedBytes = compressor.compress(bytes);
+      long compressedSize = compressedBytes.size();
+      parquetMetadataConverter.writeDataPageHeader(
+          (int)uncompressedSize,
+          (int)compressedSize,
+          valueCount,
+          statistics,
+          rlEncoding,
+          dlEncoding,
+          valuesEncoding,
+          buf);
+      this.uncompressedLength += uncompressedSize;
+      this.compressedLength += compressedSize;
+      this.totalValueCount += valueCount;
+      this.pageCount += 1;
+      this.totalStatistics.mergeStatistics(statistics);
+      compressedBytes.writeAllTo(buf);
+      encodings.add(rlEncoding);
+      encodings.add(dlEncoding);
+      encodings.add(valuesEncoding);
+    }
+
+    @Override
+    public long getMemSize() {
+      return buf.size();
+    }
+
+    public void writeToFileWriter(ParquetFileWriter writer) throws IOException {
+      writer.startColumn(path, totalValueCount, compressor.getCodecName());
+      if (dictionaryPage != null) {
+        writer.writeDictionaryPage(dictionaryPage);
+        encodings.add(dictionaryPage.getEncoding());
+      }
+      writer.writeDataPages(BytesInput.from(buf), uncompressedLength, compressedLength, totalStatistics, new ArrayList<Encoding>(encodings));
+      writer.endColumn();
+      if (INFO) {
+        LOG.info(
+            String.format(
+                "written %,dB for %s: %,d values, %,dB raw, %,dB comp, %d pages, encodings: %s",
+                buf.size(), path, totalValueCount, uncompressedLength, compressedLength, pageCount, encodings)
+                + (dictionaryPage != null ? String.format(
+                ", dic { %,d entries, %,dB raw, %,dB comp}",
+                dictionaryPage.getDictionarySize(), dictionaryPage.getUncompressedSize(), dictionaryPage.getBytes().size())
+                : ""));
+      }
+      encodings.clear();
+      pageCount = 0;
+    }
+
+    @Override
+    public long allocatedSize() {
+      return buf.getCapacity();
+    }
+
+    @Override
+    public void writeDictionaryPage(DictionaryPage dictionaryPage) throws IOException {
+      if (this.dictionaryPage != null) {
+        throw new ParquetEncodingException("Only one dictionary page is allowed");
+      }
+      BytesInput dictionaryBytes = dictionaryPage.getBytes();
+      int uncompressedSize = (int)dictionaryBytes.size();
+      BytesInput compressedBytes = compressor.compress(dictionaryBytes);
+      this.dictionaryPage = new DictionaryPage(BytesInput.copy(compressedBytes), uncompressedSize, dictionaryPage.getDictionarySize(), dictionaryPage.getEncoding());
+    }
+
+    @Override
+    public String memUsageString(String prefix) {
+      return buf.memUsageString(prefix + " ColumnChunkPageWriter");
+    }
+  }
+
+  private final Map<ColumnDescriptor, ColumnChunkPageWriter> writers = new HashMap<ColumnDescriptor, ColumnChunkPageWriter>();
+  private final MessageType schema;
+  private final BytesCompressor compressor;
+  private final int initialSize;
+
+  public ColumnChunkPageWriteStore(BytesCompressor compressor, MessageType schema, int initialSize) {
+    this.compressor = compressor;
+    this.schema = schema;
+    this.initialSize = initialSize;
+  }
+
+  @Override
+  public PageWriter getPageWriter(ColumnDescriptor path) {
+    if (!writers.containsKey(path)) {
+      writers.put(path, new ColumnChunkPageWriter(path, compressor, initialSize));
+    }
+    return writers.get(path);
+  }
+
+  public void flushToFileWriter(ParquetFileWriter writer) throws IOException {
+    List<ColumnDescriptor> columns = schema.getColumns();
+    for (ColumnDescriptor columnDescriptor : columns) {
+      ColumnChunkPageWriter pageWriter = writers.get(columnDescriptor);
+      pageWriter.writeToFileWriter(writer);
+    }
+  }
+
+}
\ No newline at end of file

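A sketch of where the store sits in the write path; fileWriter, messageType, conf and pageSize are assumptions, and the actual encoded page bytes are elided.

  CodecFactory.BytesCompressor compressor =
      new CodecFactory(conf).getCompressor(CompressionCodecName.UNCOMPRESSED, pageSize);
  ColumnChunkPageWriteStore store =
      new ColumnChunkPageWriteStore(compressor, messageType, pageSize);

  for (ColumnDescriptor column : messageType.getColumns()) {
    PageWriter pageWriter = store.getPageWriter(column);
    // ... column writers hand fully encoded pages to pageWriter.writePage(...),
    // which compresses them and buffers them per column chunk ...
  }

  // once the row group is complete, the buffered chunks are flushed in schema
  // order into the Parquet file writer
  store.flushToFileWriter(fileWriter);
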
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/InternalParquetRecordReader.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/InternalParquetRecordReader.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/InternalParquetRecordReader.java
new file mode 100644
index 0000000..6bbd7b5
--- /dev/null
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/InternalParquetRecordReader.java
@@ -0,0 +1,188 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.thirdparty.parquet;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import parquet.Log;
+import parquet.column.ColumnDescriptor;
+import parquet.column.page.PageReadStore;
+import parquet.filter.UnboundRecordFilter;
+import parquet.hadoop.ParquetFileReader;
+import parquet.hadoop.api.ReadSupport;
+import parquet.hadoop.metadata.BlockMetaData;
+import parquet.hadoop.util.counters.BenchmarkCounter;
+import parquet.io.ColumnIOFactory;
+import parquet.io.MessageColumnIO;
+import parquet.io.ParquetDecodingException;
+import parquet.io.api.RecordMaterializer;
+import parquet.schema.GroupType;
+import parquet.schema.MessageType;
+import parquet.schema.Type;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+import static java.lang.String.format;
+import static parquet.Log.DEBUG;
+
+class InternalParquetRecordReader<T> {
+  private static final Log LOG = Log.getLog(InternalParquetRecordReader.class);
+
+  private final ColumnIOFactory columnIOFactory = new ColumnIOFactory();
+
+  private MessageType requestedSchema;
+  private MessageType fileSchema;
+  private int columnCount;
+  private final ReadSupport<T> readSupport;
+
+  private RecordMaterializer<T> recordConverter;
+
+  private T currentValue;
+  private long total;
+  private int current = 0;
+  private int currentBlock = -1;
+  private ParquetFileReader reader;
+  private parquet.io.RecordReader<T> recordReader;
+  private UnboundRecordFilter recordFilter;
+
+  private long totalTimeSpentReadingBytes;
+  private long totalTimeSpentProcessingRecords;
+  private long startedAssemblingCurrentBlockAt;
+
+  private long totalCountLoadedSoFar = 0;
+
+  private Path file;
+
+  /**
+   * @param readSupport Object which helps read files of the given type, e.g. Thrift, Avro.
+   */
+  public InternalParquetRecordReader(ReadSupport<T> readSupport) {
+    this(readSupport, null);
+  }
+
+  /**
+   * @param readSupport Object which helps read files of the given type, e.g. Thrift, Avro.
+   * @param filter Optional filter for only returning matching records.
+   */
+  public InternalParquetRecordReader(ReadSupport<T> readSupport, UnboundRecordFilter
+      filter) {
+    this.readSupport = readSupport;
+    this.recordFilter = filter;
+  }
+
+  private void checkRead() throws IOException {
+    if (current == totalCountLoadedSoFar) {
+      if (current != 0) {
+        long timeAssembling = System.currentTimeMillis() - startedAssemblingCurrentBlockAt;
+        totalTimeSpentProcessingRecords += timeAssembling;
+        LOG.info("Assembled and processed " + totalCountLoadedSoFar + " records from " + columnCount + " columns in " + totalTimeSpentProcessingRecords + " ms: "+((float)totalCountLoadedSoFar / totalTimeSpentProcessingRecords) + " rec/ms, " + ((float)totalCountLoadedSoFar * columnCount / totalTimeSpentProcessingRecords) + " cell/ms");
+        long totalTime = totalTimeSpentProcessingRecords + totalTimeSpentReadingBytes;
+        long percentReading = 100 * totalTimeSpentReadingBytes / totalTime;
+        long percentProcessing = 100 * totalTimeSpentProcessingRecords / totalTime;
+        LOG.info("time spent so far " + percentReading + "% reading ("+totalTimeSpentReadingBytes+" ms) and " + percentProcessing + "% processing ("+totalTimeSpentProcessingRecords+" ms)");
+      }
+
+      LOG.info("at row " + current + ". reading next block");
+      long t0 = System.currentTimeMillis();
+      PageReadStore pages = reader.readNextRowGroup();
+      if (pages == null) {
+        throw new IOException("expecting more rows but reached last block. Read " + current + " out of " + total);
+      }
+      long timeSpentReading = System.currentTimeMillis() - t0;
+      totalTimeSpentReadingBytes += timeSpentReading;
+      BenchmarkCounter.incrementTime(timeSpentReading);
+      LOG.info("block read in memory in " + timeSpentReading + " ms. row count = " + pages.getRowCount());
+      if (Log.DEBUG) LOG.debug("initializing Record assembly with requested schema " + requestedSchema);
+      MessageColumnIO columnIO = columnIOFactory.getColumnIO(requestedSchema, fileSchema);
+      recordReader = columnIO.getRecordReader(pages, recordConverter, recordFilter);
+      startedAssemblingCurrentBlockAt = System.currentTimeMillis();
+      totalCountLoadedSoFar += pages.getRowCount();
+      ++currentBlock;
+    }
+  }
+
+  public void close() throws IOException {
+    reader.close();
+  }
+
+  public Void getCurrentKey() throws IOException, InterruptedException {
+    return null;
+  }
+
+  public T getCurrentValue() throws IOException,
+      InterruptedException {
+    return currentValue;
+  }
+
+  public float getProgress() throws IOException, InterruptedException {
+    return (float) current / total;
+  }
+
+  public void initialize(MessageType requestedSchema, MessageType fileSchema,
+                         Map<String, String> extraMetadata, Map<String, String> readSupportMetadata,
+                         Path file, List<BlockMetaData> blocks, Configuration configuration)
+      throws IOException {
+    this.requestedSchema = requestedSchema;
+    this.fileSchema = fileSchema;
+    this.file = file;
+    this.columnCount = this.requestedSchema.getPaths().size();
+    this.recordConverter = readSupport.prepareForRead(
+        configuration, extraMetadata, fileSchema,
+        new ReadSupport.ReadContext(requestedSchema, readSupportMetadata));
+
+    List<ColumnDescriptor> columns = requestedSchema.getColumns();
+    reader = new ParquetFileReader(configuration, file, blocks, columns);
+    for (BlockMetaData block : blocks) {
+      total += block.getRowCount();
+    }
+    LOG.info("RecordReader initialized; will read a total of " + total + " records.");
+  }
+
+  private boolean contains(GroupType group, String[] path, int index) {
+    if (index == path.length) {
+      return false;
+    }
+    if (group.containsField(path[index])) {
+      Type type = group.getType(path[index]);
+      if (type.isPrimitive()) {
+        return index + 1 == path.length;
+      } else {
+        return contains(type.asGroupType(), path, index + 1);
+      }
+    }
+    return false;
+  }
+
+  public boolean nextKeyValue() throws IOException, InterruptedException {
+    if (current < total) {
+      try {
+        checkRead();
+        currentValue = recordReader.read();
+        if (DEBUG) LOG.debug("read value: " + currentValue);
+        current++;
+      } catch (RuntimeException e) {
+        throw new ParquetDecodingException(format("Can not read value at %d in block %d in file %s", current, currentBlock, file), e);
+      }
+      return true;
+    }
+    return false;
+  }
+}
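
A sketch of the driving read loop, assuming the footer metadata (schemas, blocks, key/value metadata) has already been read from the target file and the code runs inside a method declaring the checked exceptions; GroupReadSupport and Group from parquet's example module stand in for whatever ReadSupport and record type the caller actually uses.

  InternalParquetRecordReader<Group> reader =
      new InternalParquetRecordReader<Group>(new GroupReadSupport());
  reader.initialize(requestedSchema, fileSchema, extraMetadata,
      readSupportMetadata, file, blocks, conf);
  try {
    while (reader.nextKeyValue()) {
      Group record = reader.getCurrentValue();
      // ... convert the materialized record, e.g. into a Tajo Tuple ...
    }
  } finally {
    reader.close();
  }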