Posted to commits@tajo.apache.org by ji...@apache.org on 2014/12/12 09:22:41 UTC

[27/45] tajo git commit: TAJO-1233: Merge hbase_storage branch to the master branch. (Hyoungjun Kim via hyunsik)

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/storage/text/ByteBufLineReader.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/text/ByteBufLineReader.java b/tajo-storage/src/main/java/org/apache/tajo/storage/text/ByteBufLineReader.java
deleted file mode 100644
index 2f742c6..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/text/ByteBufLineReader.java
+++ /dev/null
@@ -1,182 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage.text;
-
-import io.netty.buffer.ByteBuf;
-import io.netty.util.CharsetUtil;
-import org.apache.tajo.storage.BufferPool;
-import org.apache.tajo.storage.ByteBufInputChannel;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.util.concurrent.atomic.AtomicInteger;
-
-public class ByteBufLineReader implements Closeable {
-  private static int DEFAULT_BUFFER = 64 * 1024;
-
-  private int bufferSize;
-  private long readBytes;
-  private int startIndex;
-  private boolean eof = false;
-  private ByteBuf buffer;
-  private final ByteBufInputChannel channel;
-  private final AtomicInteger lineReadBytes = new AtomicInteger();
-  private final LineSplitProcessor processor = new LineSplitProcessor();
-
-  public ByteBufLineReader(ByteBufInputChannel channel) {
-    this(channel, BufferPool.directBuffer(DEFAULT_BUFFER));
-  }
-
-  public ByteBufLineReader(ByteBufInputChannel channel, ByteBuf buf) {
-    this.readBytes = 0;
-    this.channel = channel;
-    this.buffer = buf;
-    this.bufferSize = buf.capacity();
-  }
-
-  public long readBytes() {
-    return readBytes - buffer.readableBytes();
-  }
-
-  @Override
-  public void close() throws IOException {
-    if (this.buffer.refCnt() > 0) {
-      this.buffer.release();
-    }
-    this.channel.close();
-  }
-
-  public String readLine() throws IOException {
-    ByteBuf buf = readLineBuf(lineReadBytes);
-    if (buf != null) {
-      return buf.toString(CharsetUtil.UTF_8);
-    }
-    return null;
-  }
-
-  private void fillBuffer() throws IOException {
-
-    int tailBytes = 0;
-    if (this.readBytes > 0) {
-      //startIndex = 0, readIndex = tailBytes length, writable = (buffer capacity - tailBytes)
-      this.buffer.markReaderIndex();
-      this.buffer.discardReadBytes();  // compact the buffer
-      tailBytes = this.buffer.writerIndex();
-      if (!this.buffer.isWritable()) {
-        // a single line is larger than the buffer
-        BufferPool.ensureWritable(buffer, bufferSize * 2);
-        this.bufferSize = buffer.capacity();
-      }
-      this.startIndex = 0;
-    }
-
-    boolean release = true;
-    try {
-      int readBytes = tailBytes;
-      for (; ; ) {
-        int localReadBytes = buffer.writeBytes(channel, this.bufferSize - readBytes);
-        if (localReadBytes < 0) {
-          if (buffer.isWritable()) {
-            //if fewer bytes were read than the buffer capacity, there are no more bytes in the channel
-            eof = true;
-          }
-          break;
-        }
-        readBytes += localReadBytes;
-        if (readBytes == bufferSize) {
-          break;
-        }
-      }
-      this.readBytes += (readBytes - tailBytes);
-      release = false;
-
-      this.buffer.readerIndex(this.buffer.readerIndex() + tailBytes); //skip past buffer (tail)
-    } finally {
-      if (release) {
-        buffer.release();
-      }
-    }
-  }
-
-  /**
-   * Read a line terminated by one of CR, LF, or CRLF.
-   */
-  public ByteBuf readLineBuf(AtomicInteger reads) throws IOException {
-    int readBytes = 0; // newline + text line bytes
-    int newlineLength = 0; //length of terminating newline
-    int readable;
-
-    this.startIndex = buffer.readerIndex();
-
-    loop:
-    while (true) {
-      readable = buffer.readableBytes();
-      if (readable <= 0) {
-        buffer.readerIndex(this.startIndex);
-        fillBuffer(); //compact and fill buffer
-
-        //if buffer.writerIndex() is zero, there are no bytes in the buffer
-        if (!buffer.isReadable() && buffer.writerIndex() == 0) {
-          reads.set(0);
-          return null;
-        } else {
-          //skip first newLine
-          if (processor.isPrevCharCR() && buffer.getByte(buffer.readerIndex()) == LineSplitProcessor.LF) {
-            buffer.skipBytes(1);
-            if(eof && !buffer.isReadable()) {
-              reads.set(1);
-              return null;
-            }
-
-            newlineLength++;
-            readBytes++;
-            startIndex = buffer.readerIndex();
-          }
-        }
-        readable = buffer.readableBytes();
-      }
-
-      int endIndex = buffer.forEachByte(buffer.readerIndex(), readable, processor);
-      if (endIndex < 0) {
-        //no terminating newline appeared
-        buffer.readerIndex(buffer.writerIndex()); // set to end buffer
-        if(eof){
-          readBytes += (buffer.readerIndex() - startIndex);
-          break loop;
-        }
-      } else {
-        buffer.readerIndex(endIndex + 1);
-        readBytes += (buffer.readerIndex() - startIndex); //past newline + text line
-
-        //a terminating CRLF appeared
-        if (processor.isPrevCharCR() && buffer.isReadable()
-            && buffer.getByte(buffer.readerIndex()) == LineSplitProcessor.LF) {
-          buffer.skipBytes(1);
-          readBytes++;
-          newlineLength += 2;
-        } else {
-          newlineLength += 1;
-        }
-        break loop;
-      }
-    }
-    reads.set(readBytes);
-    return buffer.slice(startIndex, readBytes - newlineLength);
-  }
-}

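For reference, a minimal usage sketch of the ByteBufLineReader above, assuming only the constructor and readLine() signatures shown in the deleted file (ByteBufInputChannel wraps a plain InputStream, as DelimitedLineReader below demonstrates); the class name is illustrative:

import org.apache.tajo.storage.ByteBufInputChannel;
import org.apache.tajo.storage.text.ByteBufLineReader;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

public class ByteBufLineReaderExample {
  public static void main(String[] args) throws IOException {
    byte[] data = "first\r\nsecond\nthird".getBytes(StandardCharsets.UTF_8);
    ByteBufInputChannel channel = new ByteBufInputChannel(new ByteArrayInputStream(data));
    ByteBufLineReader reader = new ByteBufLineReader(channel); // default 64 KB direct buffer
    try {
      String line;
      while ((line = reader.readLine()) != null) {
        // each line comes back without its terminating CR, LF, or CRLF
        System.out.println(line); // prints "first", "second", "third"
      }
    } finally {
      reader.close(); // releases the internal direct buffer and closes the channel
    }
  }
}
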
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/storage/text/CSVLineDeserializer.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/text/CSVLineDeserializer.java b/tajo-storage/src/main/java/org/apache/tajo/storage/text/CSVLineDeserializer.java
deleted file mode 100644
index 1599f62..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/text/CSVLineDeserializer.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage.text;
-
-import io.netty.buffer.ByteBuf;
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.catalog.TableMeta;
-import org.apache.tajo.datum.Datum;
-import org.apache.tajo.storage.FieldSerializerDeserializer;
-import org.apache.tajo.storage.Tuple;
-
-import java.io.IOException;
-
-public class CSVLineDeserializer extends TextLineDeserializer {
-  private FieldSplitProcessor processor;
-  private FieldSerializerDeserializer fieldSerDer;
-  private ByteBuf nullChars;
-
-  public CSVLineDeserializer(Schema schema, TableMeta meta, int[] targetColumnIndexes) {
-    super(schema, meta, targetColumnIndexes);
-  }
-
-  @Override
-  public void init() {
-    this.processor = new FieldSplitProcessor(CSVLineSerDe.getFieldDelimiter(meta));
-
-    if (nullChars != null) {
-      nullChars.release();
-    }
-    nullChars = TextLineSerDe.getNullChars(meta);
-
-    fieldSerDer = new TextFieldSerializerDeserializer(meta);
-  }
-
-  public void deserialize(final ByteBuf lineBuf, Tuple output) throws IOException, TextLineParsingError {
-    int[] projection = targetColumnIndexes;
-    if (lineBuf == null || targetColumnIndexes == null || targetColumnIndexes.length == 0) {
-      return;
-    }
-
-    final int rowLength = lineBuf.readableBytes();
-    int start = 0, fieldLength = 0, end = 0;
-
-    //Projection
-    int currentTarget = 0;
-    int currentIndex = 0;
-
-    while (end != -1) {
-      end = lineBuf.forEachByte(start, rowLength - start, processor);
-
-      if (end < 0) {
-        fieldLength = rowLength - start;
-      } else {
-        fieldLength = end - start;
-      }
-
-      if (projection.length > currentTarget && currentIndex == projection[currentTarget]) {
-        lineBuf.setIndex(start, start + fieldLength);
-        Datum datum = fieldSerDer.deserialize(lineBuf, schema.getColumn(currentIndex), currentIndex, nullChars);
-        output.put(currentIndex, datum);
-        currentTarget++;
-      }
-
-      if (projection.length == currentTarget) {
-        break;
-      }
-
-      start = end + 1;
-      currentIndex++;
-    }
-  }
-
-  @Override
-  public void release() {
-    if (nullChars != null) {
-      nullChars.release();
-      nullChars = null;
-    }
-  }
-}

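The deserialize() loop above visits each delimited field once and parses only those whose index appears in targetColumnIndexes (the scanner sorts that array before handing it over; see DelimitedTextFile below). A simplified sketch of the same projection idea over a plain byte array, with the Tajo and Netty types stripped out; all names here are illustrative:

import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class ProjectionSplitSketch {
  // Returns only the fields whose indexes appear in the sorted 'projection' array.
  static String[] project(byte[] line, byte delimiter, int[] projection) {
    String[] result = new String[projection.length];
    int start = 0, currentTarget = 0, currentIndex = 0;
    while (currentTarget < projection.length && start <= line.length) {
      int end = start;
      while (end < line.length && line[end] != delimiter) {
        end++; // scan to the next delimiter, like forEachByte with FieldSplitProcessor
      }
      if (currentIndex == projection[currentTarget]) {
        result[currentTarget++] = new String(line, start, end - start, StandardCharsets.UTF_8);
      }
      start = end + 1; // skip past the delimiter
      currentIndex++;
    }
    return result;
  }

  public static void main(String[] args) {
    byte[] line = "a|b|c|d".getBytes(StandardCharsets.UTF_8);
    // project fields 1 and 3 -> prints [b, d]
    System.out.println(Arrays.toString(project(line, (byte) '|', new int[]{1, 3})));
  }
}
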
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/storage/text/CSVLineSerDe.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/text/CSVLineSerDe.java b/tajo-storage/src/main/java/org/apache/tajo/storage/text/CSVLineSerDe.java
deleted file mode 100644
index 2fe7f23..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/text/CSVLineSerDe.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage.text;
-
-import org.apache.commons.lang.StringEscapeUtils;
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.catalog.TableMeta;
-import org.apache.tajo.storage.StorageConstants;
-
-public class CSVLineSerDe extends TextLineSerDe {
-  @Override
-  public TextLineDeserializer createDeserializer(Schema schema, TableMeta meta, int[] targetColumnIndexes) {
-    return new CSVLineDeserializer(schema, meta, targetColumnIndexes);
-  }
-
-  @Override
-  public TextLineSerializer createSerializer(Schema schema, TableMeta meta) {
-    return new CSVLineSerializer(schema, meta);
-  }
-
-  public static char getFieldDelimiter(TableMeta meta) {
-    return StringEscapeUtils.unescapeJava(meta.getOption(StorageConstants.TEXT_DELIMITER,
-        StorageConstants.DEFAULT_FIELD_DELIMITER)).charAt(0);
-  }
-}

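getFieldDelimiter() unescapes the table option before taking its first character, so a property value written as the two characters backslash-t yields a real tab. A small sketch of that behavior using commons-lang directly; the class name is illustrative:

import org.apache.commons.lang.StringEscapeUtils;

public class DelimiterUnescapeSketch {
  public static void main(String[] args) {
    // The two-character string "\\t" (backslash, t) unescapes to a single tab character.
    char delimiter = StringEscapeUtils.unescapeJava("\\t").charAt(0);
    System.out.println((int) delimiter); // prints 9
  }
}
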
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/storage/text/CSVLineSerializer.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/text/CSVLineSerializer.java b/tajo-storage/src/main/java/org/apache/tajo/storage/text/CSVLineSerializer.java
deleted file mode 100644
index c0fc18f..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/text/CSVLineSerializer.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage.text;
-
-import org.apache.tajo.catalog.Column;
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.catalog.TableMeta;
-import org.apache.tajo.datum.Datum;
-import org.apache.tajo.storage.FieldSerializerDeserializer;
-import org.apache.tajo.storage.Tuple;
-
-import java.io.IOException;
-import java.io.OutputStream;
-
-public class CSVLineSerializer extends TextLineSerializer {
-  private FieldSerializerDeserializer serde;
-
-  private byte [] nullChars;
-  private char delimiter;
-  private int columnNum;
-
-  public CSVLineSerializer(Schema schema, TableMeta meta) {
-    super(schema, meta);
-  }
-
-  @Override
-  public void init() {
-    nullChars = TextLineSerDe.getNullCharsAsBytes(meta);
-    delimiter = CSVLineSerDe.getFieldDelimiter(meta);
-    columnNum = schema.size();
-
-    serde = new TextFieldSerializerDeserializer(meta);
-  }
-
-  @Override
-  public int serialize(OutputStream out, Tuple input) throws IOException {
-    int writtenBytes = 0;
-
-    for (int i = 0; i < columnNum; i++) {
-      Datum datum = input.get(i);
-      writtenBytes += serde.serialize(out, datum, schema.getColumn(i), i, nullChars);
-
-      if (columnNum - 1 > i) {
-        out.write((byte) delimiter);
-        writtenBytes += 1;
-      }
-    }
-
-    return writtenBytes;
-  }
-
-  @Override
-  public void release() {
-  }
-}

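serialize() above writes the delimiter only between fields, never after the last one; the appender adds the trailing LF itself. A minimal sketch of that joining pattern with plain strings, under the assumption that fields are already rendered as text; names are illustrative:

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;

public class DelimiterJoinSketch {
  static int writeRow(OutputStream out, String[] fields, char delimiter) throws IOException {
    int writtenBytes = 0;
    for (int i = 0; i < fields.length; i++) {
      byte[] bytes = fields[i].getBytes(StandardCharsets.UTF_8);
      out.write(bytes);
      writtenBytes += bytes.length;
      if (fields.length - 1 > i) { // delimiter between fields only
        out.write((byte) delimiter);
        writtenBytes += 1;
      }
    }
    return writtenBytes;
  }

  public static void main(String[] args) throws IOException {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    int n = writeRow(out, new String[]{"a", "b", "c"}, '|');
    System.out.println(out.toString("UTF-8") + " (" + n + " bytes)"); // a|b|c (5 bytes)
  }
}
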
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/storage/text/DelimitedLineReader.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/text/DelimitedLineReader.java b/tajo-storage/src/main/java/org/apache/tajo/storage/text/DelimitedLineReader.java
deleted file mode 100644
index 0efe030..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/text/DelimitedLineReader.java
+++ /dev/null
@@ -1,156 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage.text;
-
-import io.netty.buffer.ByteBuf;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.hadoop.io.compress.CompressionCodecFactory;
-import org.apache.hadoop.io.compress.Decompressor;
-import org.apache.hadoop.io.compress.SplittableCompressionCodec;
-import org.apache.tajo.common.exception.NotImplementedException;
-import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.storage.ByteBufInputChannel;
-import org.apache.tajo.storage.FileScanner;
-import org.apache.tajo.storage.BufferPool;
-import org.apache.tajo.storage.compress.CodecPool;
-import org.apache.tajo.storage.fragment.FileFragment;
-
-import java.io.Closeable;
-import java.io.DataInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.concurrent.atomic.AtomicInteger;
-
-public class DelimitedLineReader implements Closeable {
-  private static final Log LOG = LogFactory.getLog(DelimitedLineReader.class);
-  private final static int DEFAULT_PAGE_SIZE = 128 * 1024;
-
-  private FileSystem fs;
-  private FSDataInputStream fis;
-  private InputStream is; //decompressed stream
-  private CompressionCodecFactory factory;
-  private CompressionCodec codec;
-  private Decompressor decompressor;
-
-  private long startOffset, end, pos;
-  private boolean eof = true;
-  private ByteBufLineReader lineReader;
-  private AtomicInteger lineReadBytes = new AtomicInteger();
-  private FileFragment fragment;
-  private Configuration conf;
-
-  public DelimitedLineReader(Configuration conf, final FileFragment fragment) throws IOException {
-    this.fragment = fragment;
-    this.conf = conf;
-    this.factory = new CompressionCodecFactory(conf);
-    this.codec = factory.getCodec(fragment.getPath());
-    if (this.codec instanceof SplittableCompressionCodec) {
-      throw new NotImplementedException(); // bzip2 does not support multi-thread model
-    }
-  }
-
-  public void init() throws IOException {
-    if (fs == null) {
-      fs = FileScanner.getFileSystem((TajoConf) conf, fragment.getPath());
-    }
-    if (fis == null) fis = fs.open(fragment.getPath());
-    pos = startOffset = fragment.getStartKey();
-    end = startOffset + fragment.getEndKey();
-
-    if (codec != null) {
-      decompressor = CodecPool.getDecompressor(codec);
-      is = new DataInputStream(codec.createInputStream(fis, decompressor));
-      ByteBufInputChannel channel = new ByteBufInputChannel(is);
-      lineReader = new ByteBufLineReader(channel, BufferPool.directBuffer(DEFAULT_PAGE_SIZE));
-    } else {
-      fis.seek(startOffset);
-      is = fis;
-
-      ByteBufInputChannel channel = new ByteBufInputChannel(is);
-      lineReader = new ByteBufLineReader(channel,
-          BufferPool.directBuffer((int) Math.min(DEFAULT_PAGE_SIZE, end)));
-    }
-    eof = false;
-  }
-
-  public long getCompressedPosition() throws IOException {
-    long retVal;
-    if (isCompressed()) {
-      retVal = fis.getPos();
-    } else {
-      retVal = pos;
-    }
-    return retVal;
-  }
-
-  public long getUnCompressedPosition() throws IOException {
-    return pos;
-  }
-
-  public long getReadBytes() {
-    return pos - startOffset;
-  }
-
-  public boolean isReadable() {
-    return !eof;
-  }
-
-  public ByteBuf readLine() throws IOException {
-    if (eof) {
-      return null;
-    }
-
-    ByteBuf buf = lineReader.readLineBuf(lineReadBytes);
-    pos += lineReadBytes.get();
-    if (buf == null) {
-      eof = true;
-    }
-
-    if (!isCompressed() && getCompressedPosition() > end) {
-      eof = true;
-    }
-    return buf;
-  }
-
-  public boolean isCompressed() {
-    return codec != null;
-  }
-
-  @Override
-  public void close() throws IOException {
-    try {
-      IOUtils.cleanup(LOG, lineReader, is, fis);
-      fs = null;
-      is = null;
-      fis = null;
-      lineReader = null;
-    } finally {
-      if (decompressor != null) {
-        CodecPool.returnDecompressor(decompressor);
-        decompressor = null;
-      }
-    }
-  }
-}

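A hedged usage sketch of the reader above, mirroring how DelimitedTextFileScanner below drives it; the Configuration and FileFragment are assumed to come from the surrounding scanner context, and the wrapper class is illustrative:

import io.netty.buffer.ByteBuf;
import org.apache.hadoop.conf.Configuration;
import org.apache.tajo.storage.fragment.FileFragment;
import org.apache.tajo.storage.text.DelimitedLineReader;

import java.io.IOException;

public class DelimitedLineReaderSketch {
  // conf and fragment are assumed to be prepared by the caller.
  static long countLines(Configuration conf, FileFragment fragment) throws IOException {
    DelimitedLineReader reader = new DelimitedLineReader(conf, fragment);
    reader.init();
    long count = 0;
    try {
      while (reader.isReadable()) {
        ByteBuf buf = reader.readLine(); // null at EOF
        if (buf == null) break;
        count++;                         // buf is the line without its newline
      }
    } finally {
      reader.close();                    // returns any decompressor to the CodecPool
    }
    return count;
  }
}
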
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java b/tajo-storage/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java
deleted file mode 100644
index ab8a0b5..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java
+++ /dev/null
@@ -1,478 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage.text;
-
-import io.netty.buffer.ByteBuf;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.hadoop.io.compress.CompressionCodecFactory;
-import org.apache.hadoop.io.compress.CompressionOutputStream;
-import org.apache.hadoop.io.compress.Compressor;
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.catalog.TableMeta;
-import org.apache.tajo.catalog.statistics.TableStats;
-import org.apache.tajo.storage.*;
-import org.apache.tajo.storage.compress.CodecPool;
-import org.apache.tajo.storage.exception.AlreadyExistsStorageException;
-import org.apache.tajo.storage.fragment.FileFragment;
-import org.apache.tajo.storage.rcfile.NonSyncByteArrayOutputStream;
-import org.apache.tajo.util.ReflectionUtil;
-
-import java.io.BufferedOutputStream;
-import java.io.DataOutputStream;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
-import static org.apache.tajo.storage.StorageConstants.DEFAULT_TEXT_ERROR_TOLERANCE_MAXNUM;
-import static org.apache.tajo.storage.StorageConstants.TEXT_ERROR_TOLERANCE_MAXNUM;
-
-public class DelimitedTextFile {
-
-  public static final byte LF = '\n';
-  public static int EOF = -1;
-
-  private static final Log LOG = LogFactory.getLog(DelimitedTextFile.class);
-
-  /** it caches line serde classes. */
-  private static final Map<String, Class<? extends TextLineSerDe>> serdeClassCache =
-      new ConcurrentHashMap<String, Class<? extends TextLineSerDe>>();
-
-  /**
-   * By default, DelimitedTextFileScanner uses CSVLineSerDe. If the table property 'text.serde.class' is given,
-   * it will use the specified serde class.
-   *
-   * @return TextLineSerDe
-   */
-  public static TextLineSerDe getLineSerde(TableMeta meta) {
-    TextLineSerDe lineSerder;
-
-    String serDeClassName;
-
-    // if no serde class is given, it will use the CSV line serde.
-    serDeClassName = meta.getOption(StorageConstants.TEXT_SERDE_CLASS, StorageConstants.DEFAULT_TEXT_SERDE_CLASS);
-
-    try {
-      Class<? extends TextLineSerDe> serdeClass;
-
-      if (serdeClassCache.containsKey(serDeClassName)) {
-        serdeClass = serdeClassCache.get(serDeClassName);
-      } else {
-        serdeClass = (Class<? extends TextLineSerDe>) Class.forName(serDeClassName);
-        serdeClassCache.put(serDeClassName, serdeClass);
-      }
-      lineSerder = (TextLineSerDe) ReflectionUtil.newInstance(serdeClass);
-    } catch (Throwable e) {
-      throw new RuntimeException("TextLineSerde class cannot be initialized.", e);
-    }
-
-    return lineSerder;
-  }
-
-  public static class DelimitedTextFileAppender extends FileAppender {
-    private final TableMeta meta;
-    private final Schema schema;
-    private final FileSystem fs;
-    private FSDataOutputStream fos;
-    private DataOutputStream outputStream;
-    private CompressionOutputStream deflateFilter;
-    private TableStatistics stats = null;
-    private Compressor compressor;
-    private CompressionCodecFactory codecFactory;
-    private CompressionCodec codec;
-    private Path compressedPath;
-    private byte[] nullChars;
-    private int BUFFER_SIZE = 128 * 1024;
-    private int bufferedBytes = 0;
-    private long pos = 0;
-
-    private NonSyncByteArrayOutputStream os;
-    private TextLineSerializer serializer;
-
-    public DelimitedTextFileAppender(Configuration conf, final Schema schema, final TableMeta meta, final Path path)
-        throws IOException {
-      super(conf, schema, meta, path);
-      this.fs = path.getFileSystem(conf);
-      this.meta = meta;
-      this.schema = schema;
-    }
-
-    public TextLineSerDe getLineSerde() {
-      return DelimitedTextFile.getLineSerde(meta);
-    }
-
-    @Override
-    public void init() throws IOException {
-      if (!fs.exists(path.getParent())) {
-        throw new FileNotFoundException(path.toString());
-      }
-
-      if (this.meta.containsOption(StorageConstants.COMPRESSION_CODEC)) {
-        String codecName = this.meta.getOption(StorageConstants.COMPRESSION_CODEC);
-        codecFactory = new CompressionCodecFactory(conf);
-        codec = codecFactory.getCodecByClassName(codecName);
-        compressor = CodecPool.getCompressor(codec);
-        if (compressor != null) compressor.reset();  //the builtin gzip codec returns a null compressor
-
-        String extension = codec.getDefaultExtension();
-        compressedPath = path.suffix(extension);
-
-        if (fs.exists(compressedPath)) {
-          throw new AlreadyExistsStorageException(compressedPath);
-        }
-
-        fos = fs.create(compressedPath);
-        deflateFilter = codec.createOutputStream(fos, compressor);
-        outputStream = new DataOutputStream(deflateFilter);
-
-      } else {
-        if (fs.exists(path)) {
-          throw new AlreadyExistsStorageException(path);
-        }
-        fos = fs.create(path);
-        outputStream = new DataOutputStream(new BufferedOutputStream(fos));
-      }
-
-      if (enabledStats) {
-        this.stats = new TableStatistics(this.schema);
-      }
-
-      serializer = getLineSerde().createSerializer(schema, meta);
-      serializer.init();
-
-      if (os == null) {
-        os = new NonSyncByteArrayOutputStream(BUFFER_SIZE);
-      }
-
-      os.reset();
-      pos = fos.getPos();
-      bufferedBytes = 0;
-      super.init();
-    }
-
-    @Override
-    public void addTuple(Tuple tuple) throws IOException {
-      // write
-      int rowBytes = serializer.serialize(os, tuple);
-
-      // new line
-      os.write(LF);
-      rowBytes += 1;
-
-      // update positions
-      pos += rowBytes;
-      bufferedBytes += rowBytes;
-
-      // flush buffer if necessary
-      if (bufferedBytes > BUFFER_SIZE) {
-        flushBuffer();
-      }
-      // Statistical section
-      if (enabledStats) {
-        stats.incrementRow();
-      }
-    }
-
-    private void flushBuffer() throws IOException {
-      if (os.getLength() > 0) {
-        os.writeTo(outputStream);
-        os.reset();
-        bufferedBytes = 0;
-      }
-    }
-
-    @Override
-    public long getOffset() throws IOException {
-      return pos;
-    }
-
-    @Override
-    public void flush() throws IOException {
-      flushBuffer();
-      outputStream.flush();
-    }
-
-    @Override
-    public void close() throws IOException {
-
-      try {
-        serializer.release();
-
-        if(outputStream != null){
-          flush();
-        }
-
-        // Statistical section
-        if (enabledStats) {
-          stats.setNumBytes(getOffset());
-        }
-
-        if (deflateFilter != null) {
-          deflateFilter.finish();
-          deflateFilter.resetState();
-          deflateFilter = null;
-        }
-
-        os.close();
-      } finally {
-        IOUtils.cleanup(LOG, fos);
-        if (compressor != null) {
-          CodecPool.returnCompressor(compressor);
-          compressor = null;
-        }
-      }
-    }
-
-    @Override
-    public TableStats getStats() {
-      if (enabledStats) {
-        return stats.getTableStat();
-      } else {
-        return null;
-      }
-    }
-
-    public boolean isCompress() {
-      return compressor != null;
-    }
-
-    public String getExtension() {
-      return codec != null ? codec.getDefaultExtension() : "";
-    }
-  }
-
-  public static class DelimitedTextFileScanner extends FileScanner {
-    private boolean splittable = false;
-    private final long startOffset;
-
-    private final long endOffset;
-    /** The number of actual read records */
-    private int recordCount = 0;
-    private int[] targetColumnIndexes;
-
-    private DelimitedLineReader reader;
-    private TextLineDeserializer deserializer;
-
-    private int errorPrintOutMaxNum = 5;
-    /** Maximum number of permissible errors */
-    private int errorTorrenceMaxNum;
-    /** How many errors have occurred? */
-    private int errorNum;
-
-    public DelimitedTextFileScanner(Configuration conf, final Schema schema, final TableMeta meta,
-                                    final FileFragment fragment)
-        throws IOException {
-      super(conf, schema, meta, fragment);
-      reader = new DelimitedLineReader(conf, fragment);
-      if (!reader.isCompressed()) {
-        splittable = true;
-      }
-
-      startOffset = fragment.getStartKey();
-      endOffset = startOffset + fragment.getEndKey();
-
-      errorTorrenceMaxNum =
-          Integer.parseInt(meta.getOption(TEXT_ERROR_TOLERANCE_MAXNUM, DEFAULT_TEXT_ERROR_TOLERANCE_MAXNUM));
-    }
-
-    @Override
-    public void init() throws IOException {
-      if (reader != null) {
-        reader.close();
-      }
-
-      reader = new DelimitedLineReader(conf, fragment);
-      reader.init();
-      recordCount = 0;
-
-      if (targets == null) {
-        targets = schema.toArray();
-      }
-
-      targetColumnIndexes = new int[targets.length];
-      for (int i = 0; i < targets.length; i++) {
-        targetColumnIndexes[i] = schema.getColumnId(targets[i].getQualifiedName());
-      }
-
-      super.init();
-      Arrays.sort(targetColumnIndexes);
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("DelimitedTextFileScanner open:" + fragment.getPath() + "," + startOffset + "," + endOffset);
-      }
-
-      if (startOffset > 0) {
-        reader.readLine();  // skip the first (partial) line
-      }
-
-      deserializer = getLineSerde().createDeserializer(schema, meta, targetColumnIndexes);
-      deserializer.init();
-    }
-
-    public TextLineSerDe getLineSerde() {
-      return DelimitedTextFile.getLineSerde(meta);
-    }
-
-    @Override
-    public float getProgress() {
-      try {
-        if (!reader.isReadable()) {
-          return 1.0f;
-        }
-        long filePos = reader.getCompressedPosition();
-        if (startOffset == filePos) {
-          return 0.0f;
-        } else {
-          long readBytes = filePos - startOffset;
-          long remainingBytes = Math.max(endOffset - filePos, 0);
-          return Math.min(1.0f, (float) (readBytes) / (float) (readBytes + remainingBytes));
-        }
-      } catch (IOException e) {
-        LOG.error(e.getMessage(), e);
-        return 0.0f;
-      }
-    }
-
-    @Override
-    public Tuple next() throws IOException {
-      VTuple tuple;
-
-      if (!reader.isReadable()) {
-        return null;
-      }
-
-      try {
-
-        // this loop will continue until one tuple is built or EOS (end of stream).
-        do {
-
-          ByteBuf buf = reader.readLine();
-
-          // if there is no more line, then return EOT (end of tuple)
-          if (buf == null) {
-            return null;
-          }
-
-          // If there is no required column, we just read each line
-          // and then return an empty tuple without parsing line.
-          if (targets.length == 0) {
-            recordCount++;
-            return EmptyTuple.get();
-          }
-
-          tuple = new VTuple(schema.size());
-
-          try {
-            deserializer.deserialize(buf, tuple);
-            // if a line is read normally, it exits this loop.
-            break;
-
-          } catch (TextLineParsingError tae) {
-
-            errorNum++;
-
-            // suppress too many log prints, which probably cause performance degradation
-            if (errorNum < errorPrintOutMaxNum) {
-              LOG.warn("Ignore Text Line Parse Error (" + errorNum + "): ", tae);
-            }
-
-            // Only when the maximum error tolerance limit is set (i.e., errorTorrenceMaxNum >= 0)
-            // does it check whether the number of parsing errors exceeds the max limit.
-            // Otherwise, it will ignore all parsing errors.
-            if (errorTorrenceMaxNum >= 0 && errorNum > errorTorrenceMaxNum) {
-              throw tae;
-            }
-            continue;
-          }
-
-        } while (reader.isReadable()); // continue until EOS
-
-        // recordCount means the number of actual read records. We increment the count here.
-        recordCount++;
-
-        return tuple;
-
-      } catch (Throwable t) {
-        LOG.error(t);
-        throw new IOException(t);
-      }
-    }
-
-    @Override
-    public void reset() throws IOException {
-      init();
-    }
-
-    @Override
-    public void close() throws IOException {
-      try {
-        if (deserializer != null) {
-          deserializer.release();
-        }
-
-        if (tableStats != null && reader != null) {
-          tableStats.setReadBytes(reader.getReadBytes());  //Actual Processed Bytes. (decompressed bytes + overhead)
-          tableStats.setNumRows(recordCount);
-        }
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("DelimitedTextFileScanner processed record:" + recordCount);
-        }
-      } finally {
-        IOUtils.cleanup(LOG, reader);
-        reader = null;
-      }
-    }
-
-    @Override
-    public boolean isProjectable() {
-      return true;
-    }
-
-    @Override
-    public boolean isSelectable() {
-      return false;
-    }
-
-    @Override
-    public void setSearchCondition(Object expr) {
-    }
-
-    @Override
-    public boolean isSplittable() {
-      return splittable;
-    }
-
-    @Override
-    public TableStats getInputStats() {
-      if (tableStats != null && reader != null) {
-        tableStats.setReadBytes(reader.getReadBytes());  //Actual Processed Bytes. (decompressed bytes + overhead)
-        tableStats.setNumRows(recordCount);
-        tableStats.setNumBytes(fragment.getEndKey());
-      }
-      return tableStats;
-    }
-  }
-}

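For reference, a hedged sketch of driving the scanner above; all arguments are assumed to be prepared by the storage manager, and the wrapper class is illustrative:

import org.apache.hadoop.conf.Configuration;
import org.apache.tajo.catalog.Schema;
import org.apache.tajo.catalog.TableMeta;
import org.apache.tajo.storage.Tuple;
import org.apache.tajo.storage.fragment.FileFragment;
import org.apache.tajo.storage.text.DelimitedTextFile.DelimitedTextFileScanner;

import java.io.IOException;

public class DelimitedTextScanSketch {
  static void scan(Configuration conf, Schema schema, TableMeta meta, FileFragment fragment)
      throws IOException {
    DelimitedTextFileScanner scanner = new DelimitedTextFileScanner(conf, schema, meta, fragment);
    scanner.init(); // skips the first partial line when startOffset > 0
    try {
      Tuple tuple;
      while ((tuple = scanner.next()) != null) {
        // consume tuple; parse errors under the tolerance limit are skipped internally
      }
    } finally {
      scanner.close(); // flushes read statistics into the table stats
    }
  }
}
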
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/storage/text/FieldSplitProcessor.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/text/FieldSplitProcessor.java b/tajo-storage/src/main/java/org/apache/tajo/storage/text/FieldSplitProcessor.java
deleted file mode 100644
index a5ac142..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/text/FieldSplitProcessor.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage.text;
-
-import io.netty.buffer.ByteBufProcessor;
-
-public class FieldSplitProcessor implements ByteBufProcessor {
-  private char delimiter; //the ASCII separator character
-
-  public FieldSplitProcessor(char recordDelimiterByte) {
-    this.delimiter = recordDelimiterByte;
-  }
-
-  @Override
-  public boolean process(byte value) throws Exception {
-    return delimiter != value;
-  }
-
-  public char getDelimiter() {
-    return delimiter;
-  }
-}

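A small demo of how the deserializer uses this processor: ByteBuf.forEachByte returns the index of the first byte for which process() returns false (the delimiter position), or -1 if none is found. The class name is illustrative:

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import org.apache.tajo.storage.text.FieldSplitProcessor;

import java.nio.charset.StandardCharsets;

public class FieldSplitProcessorDemo {
  public static void main(String[] args) {
    ByteBuf buf = Unpooled.copiedBuffer("ab|cd", StandardCharsets.UTF_8);
    FieldSplitProcessor processor = new FieldSplitProcessor('|');
    // Stops at the first '|' and returns its index.
    int index = buf.forEachByte(0, buf.readableBytes(), processor);
    System.out.println(index); // prints 2
    buf.release();
  }
}
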
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/storage/text/LineSplitProcessor.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/text/LineSplitProcessor.java b/tajo-storage/src/main/java/org/apache/tajo/storage/text/LineSplitProcessor.java
deleted file mode 100644
index a130527..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/text/LineSplitProcessor.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage.text;
-
-import io.netty.buffer.ByteBufProcessor;
-
-public class LineSplitProcessor implements ByteBufProcessor {
-  public static final byte CR = '\r';
-  public static final byte LF = '\n';
-  private boolean prevCharCR = false; //true if prev char was CR
-
-  @Override
-  public boolean process(byte value) throws Exception {
-    switch (value) {
-      case LF:
-        return false;
-      case CR:
-        prevCharCR = true;
-        return false;
-      default:
-        prevCharCR = false;
-        return true;
-    }
-  }
-
-  public boolean isPrevCharCR() {
-    return prevCharCR;
-  }
-}

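The processor is stateful on purpose: scanning stops at a CR and records that fact, so the caller (ByteBufLineReader above) can skip the LF of a CRLF pair even when it arrives in the next buffer fill. A small demo; the class name is illustrative:

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import org.apache.tajo.storage.text.LineSplitProcessor;

import java.nio.charset.StandardCharsets;

public class LineSplitProcessorDemo {
  public static void main(String[] args) {
    ByteBuf buf = Unpooled.copiedBuffer("one\r\ntwo", StandardCharsets.UTF_8);
    LineSplitProcessor processor = new LineSplitProcessor();
    // Scanning stops at the CR (index 3) and remembers that a CR was seen.
    int end = buf.forEachByte(0, buf.readableBytes(), processor);
    System.out.println(end);                      // prints 3
    System.out.println(processor.isPrevCharCR()); // prints true
    buf.release();
  }
}
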
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/storage/text/TextFieldSerializerDeserializer.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/text/TextFieldSerializerDeserializer.java b/tajo-storage/src/main/java/org/apache/tajo/storage/text/TextFieldSerializerDeserializer.java
deleted file mode 100644
index ae7565d..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/text/TextFieldSerializerDeserializer.java
+++ /dev/null
@@ -1,253 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage.text;
-
-import com.google.protobuf.Message;
-import io.netty.buffer.ByteBuf;
-import io.netty.util.CharsetUtil;
-import org.apache.commons.codec.binary.Base64;
-import org.apache.tajo.TajoConstants;
-import org.apache.tajo.catalog.Column;
-import org.apache.tajo.catalog.TableMeta;
-import org.apache.tajo.common.TajoDataTypes;
-import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.datum.*;
-import org.apache.tajo.datum.protobuf.ProtobufJsonFormat;
-import org.apache.tajo.storage.FieldSerializerDeserializer;
-import org.apache.tajo.storage.StorageConstants;
-import org.apache.tajo.util.NumberUtil;
-
-import java.io.IOException;
-import java.io.OutputStream;
-import java.nio.charset.CharsetDecoder;
-import java.util.TimeZone;
-
-public class TextFieldSerializerDeserializer implements FieldSerializerDeserializer {
-  public static final byte[] trueBytes = "true".getBytes();
-  public static final byte[] falseBytes = "false".getBytes();
-  private static ProtobufJsonFormat protobufJsonFormat = ProtobufJsonFormat.getInstance();
-  private final CharsetDecoder decoder = CharsetUtil.getDecoder(CharsetUtil.UTF_8);
-
-  private final boolean hasTimezone;
-  private final TimeZone timezone;
-
-  public TextFieldSerializerDeserializer(TableMeta meta) {
-    hasTimezone = meta.containsOption(StorageConstants.TIMEZONE);
-    timezone = TimeZone.getTimeZone(meta.getOption(StorageConstants.TIMEZONE, TajoConstants.DEFAULT_SYSTEM_TIMEZONE));
-  }
-
-  private static boolean isNull(ByteBuf val, ByteBuf nullBytes) {
-    return !val.isReadable() || nullBytes.equals(val);
-  }
-
-  private static boolean isNullText(ByteBuf val, ByteBuf nullBytes) {
-    return val.readableBytes() > 0 && nullBytes.equals(val);
-  }
-
-  @Override
-  public int serialize(OutputStream out, Datum datum, Column col, int columnIndex, byte[] nullChars)
-      throws IOException {
-    byte[] bytes;
-    int length = 0;
-    TajoDataTypes.DataType dataType = col.getDataType();
-
-    if (datum == null || datum instanceof NullDatum) {
-      switch (dataType.getType()) {
-        case CHAR:
-        case TEXT:
-          length = nullChars.length;
-          out.write(nullChars);
-          break;
-        default:
-          break;
-      }
-      return length;
-    }
-
-    switch (dataType.getType()) {
-      case BOOLEAN:
-        out.write(datum.asBool() ? trueBytes : falseBytes);
-        length = trueBytes.length;
-        break;
-      case CHAR:
-        byte[] pad = new byte[dataType.getLength() - datum.size()];
-        bytes = datum.asTextBytes();
-        out.write(bytes);
-        out.write(pad);
-        length = bytes.length + pad.length;
-        break;
-      case TEXT:
-      case BIT:
-      case INT2:
-      case INT4:
-      case INT8:
-      case FLOAT4:
-      case FLOAT8:
-      case INET4:
-      case DATE:
-      case INTERVAL:
-        bytes = datum.asTextBytes();
-        length = bytes.length;
-        out.write(bytes);
-        break;
-      case TIME:
-        if (hasTimezone) {
-          bytes = ((TimeDatum) datum).asChars(timezone, true).getBytes();
-        } else {
-          bytes = datum.asTextBytes();
-        }
-        length = bytes.length;
-        out.write(bytes);
-        break;
-      case TIMESTAMP:
-        if (hasTimezone) {
-          bytes = ((TimestampDatum) datum).asChars(timezone, true).getBytes();
-        } else {
-          bytes = datum.asTextBytes();
-        }
-        length = bytes.length;
-        out.write(bytes);
-        break;
-      case INET6:
-      case BLOB:
-        bytes = Base64.encodeBase64(datum.asByteArray(), false);
-        length = bytes.length;
-        out.write(bytes, 0, length);
-        break;
-      case PROTOBUF:
-        ProtobufDatum protobuf = (ProtobufDatum) datum;
-        byte[] protoBytes = protobufJsonFormat.printToString(protobuf.get()).getBytes();
-        length = protoBytes.length;
-        out.write(protoBytes, 0, protoBytes.length);
-        break;
-      case NULL_TYPE:
-      default:
-        break;
-    }
-    return length;
-  }
-
-  @Override
-  public Datum deserialize(ByteBuf buf, Column col, int columnIndex, ByteBuf nullChars) throws IOException {
-    Datum datum;
-    TajoDataTypes.Type type = col.getDataType().getType();
-    boolean nullField;
-    if (type == TajoDataTypes.Type.TEXT || type == TajoDataTypes.Type.CHAR) {
-      nullField = isNullText(buf, nullChars);
-    } else {
-      nullField = isNull(buf, nullChars);
-    }
-
-    if (nullField) {
-      datum = NullDatum.get();
-    } else {
-      switch (type) {
-        case BOOLEAN:
-          byte bool = buf.readByte();
-          datum = DatumFactory.createBool(bool == 't' || bool == 'T');
-          break;
-        case BIT:
-          datum = DatumFactory.createBit(Byte.parseByte(
-              decoder.decode(buf.nioBuffer(buf.readerIndex(), buf.readableBytes())).toString()));
-          break;
-        case CHAR:
-          datum = DatumFactory.createChar(
-              decoder.decode(buf.nioBuffer(buf.readerIndex(), buf.readableBytes())).toString().trim());
-          break;
-        case INT1:
-        case INT2:
-          datum = DatumFactory.createInt2((short) NumberUtil.parseInt(buf));
-          break;
-        case INT4:
-          datum = DatumFactory.createInt4(NumberUtil.parseInt(buf));
-          break;
-        case INT8:
-          datum = DatumFactory.createInt8(NumberUtil.parseLong(buf));
-          break;
-        case FLOAT4:
-          datum = DatumFactory.createFloat4(
-              decoder.decode(buf.nioBuffer(buf.readerIndex(), buf.readableBytes())).toString());
-          break;
-        case FLOAT8:
-          datum = DatumFactory.createFloat8(NumberUtil.parseDouble(buf));
-          break;
-        case TEXT: {
-          byte[] bytes = new byte[buf.readableBytes()];
-          buf.readBytes(bytes);
-          datum = DatumFactory.createText(bytes);
-          break;
-        }
-        case DATE:
-          datum = DatumFactory.createDate(
-              decoder.decode(buf.nioBuffer(buf.readerIndex(), buf.readableBytes())).toString());
-          break;
-        case TIME:
-          if (hasTimezone) {
-            datum = DatumFactory.createTime(
-                decoder.decode(buf.nioBuffer(buf.readerIndex(), buf.readableBytes())).toString(), timezone);
-          } else {
-            datum = DatumFactory.createTime(
-                decoder.decode(buf.nioBuffer(buf.readerIndex(), buf.readableBytes())).toString());
-          }
-          break;
-        case TIMESTAMP:
-          if (hasTimezone) {
-            datum = DatumFactory.createTimestamp(
-                decoder.decode(buf.nioBuffer(buf.readerIndex(), buf.readableBytes())).toString(), timezone);
-          } else {
-            datum = DatumFactory.createTimestamp(
-                decoder.decode(buf.nioBuffer(buf.readerIndex(), buf.readableBytes())).toString());
-          }
-          break;
-        case INTERVAL:
-          datum = DatumFactory.createInterval(
-              decoder.decode(buf.nioBuffer(buf.readerIndex(), buf.readableBytes())).toString());
-          break;
-        case PROTOBUF: {
-          ProtobufDatumFactory factory = ProtobufDatumFactory.get(col.getDataType());
-          Message.Builder builder = factory.newBuilder();
-          try {
-            byte[] bytes = new byte[buf.readableBytes()];
-            buf.readBytes(bytes);
-            protobufJsonFormat.merge(bytes, builder);
-            datum = factory.createDatum(builder.build());
-          } catch (IOException e) {
-            e.printStackTrace();
-            throw new RuntimeException(e);
-          }
-          break;
-        }
-        case INET4:
-          datum = DatumFactory.createInet4(
-              decoder.decode(buf.nioBuffer(buf.readerIndex(), buf.readableBytes())).toString());
-          break;
-        case BLOB: {
-          byte[] bytes = new byte[buf.readableBytes()];
-          buf.readBytes(bytes);
-          datum = DatumFactory.createBlob(Base64.decodeBase64(bytes));
-          break;
-        }
-        default:
-          datum = NullDatum.get();
-          break;
-      }
-    }
-    return datum;
-  }
-}

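Most branches of deserialize() above decode field bytes through a shared UTF-8 CharsetDecoder over an NIO view of the buffer, which avoids copying the bytes into an intermediate array and leaves the reader index untouched. A standalone sketch of that pattern; the class name is illustrative:

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.util.CharsetUtil;

import java.nio.charset.CharacterCodingException;
import java.nio.charset.CharsetDecoder;
import java.nio.charset.StandardCharsets;

public class DecodeFieldSketch {
  public static void main(String[] args) throws CharacterCodingException {
    CharsetDecoder decoder = CharsetUtil.getDecoder(CharsetUtil.UTF_8);
    ByteBuf buf = Unpooled.copiedBuffer("12345", StandardCharsets.UTF_8);
    // Decode the readable region without advancing the reader index.
    String text = decoder.decode(buf.nioBuffer(buf.readerIndex(), buf.readableBytes())).toString();
    System.out.println(Integer.parseInt(text)); // prints 12345
    buf.release();
  }
}
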
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/storage/text/TextLineDeserializer.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/text/TextLineDeserializer.java b/tajo-storage/src/main/java/org/apache/tajo/storage/text/TextLineDeserializer.java
deleted file mode 100644
index 7ebfa79..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/text/TextLineDeserializer.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage.text;
-
-import io.netty.buffer.ByteBuf;
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.catalog.TableMeta;
-import org.apache.tajo.storage.Tuple;
-
-import java.io.IOException;
-
-/**
- * Reads a text line and fills a Tuple with values
- */
-public abstract class TextLineDeserializer {
-  protected Schema schema;
-  protected TableMeta meta;
-  protected int [] targetColumnIndexes;
-
-  public TextLineDeserializer(Schema schema, TableMeta meta, int [] targetColumnIndexes) {
-    this.schema = schema;
-    this.meta = meta;
-    this.targetColumnIndexes = targetColumnIndexes;
-  }
-
-  /**
-   * Initialize SerDe
-   */
-  public abstract void init();
-
-  /**
-   * Fills a tuple with the fields read from a given line.
-   *
-   * @param buf Read line
-   * @param output Tuple to be filled with read fields
-   * @throws java.io.IOException
-   */
-  public abstract void deserialize(final ByteBuf buf, Tuple output) throws IOException, TextLineParsingError;
-
-  /**
-   * Release external resources
-   */
-  public abstract void release();
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/storage/text/TextLineParsingError.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/text/TextLineParsingError.java b/tajo-storage/src/main/java/org/apache/tajo/storage/text/TextLineParsingError.java
deleted file mode 100644
index f0bae5e..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/text/TextLineParsingError.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage.text;
-
-public class TextLineParsingError extends Exception {
-
-  public TextLineParsingError(Throwable t) {
-    super(t);
-  }
-
-  public TextLineParsingError(String message, Throwable t) {
-    super(t.getMessage() + ", Error line: " + message);
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/storage/text/TextLineSerDe.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/text/TextLineSerDe.java b/tajo-storage/src/main/java/org/apache/tajo/storage/text/TextLineSerDe.java
deleted file mode 100644
index e81e289..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/text/TextLineSerDe.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage.text;
-
-import io.netty.buffer.ByteBuf;
-import org.apache.commons.lang.StringEscapeUtils;
-import org.apache.commons.lang.StringUtils;
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.catalog.TableMeta;
-import org.apache.tajo.datum.NullDatum;
-import org.apache.tajo.storage.BufferPool;
-import org.apache.tajo.storage.StorageConstants;
-
-/**
- * Pluggable Text Line SerDe class
- */
-public abstract class TextLineSerDe {
-
-  public TextLineSerDe() {
-  }
-
-  public abstract TextLineDeserializer createDeserializer(Schema schema, TableMeta meta, int [] targetColumnIndexes);
-
-  public abstract TextLineSerializer createSerializer(Schema schema, TableMeta meta);
-
-  public static ByteBuf getNullChars(TableMeta meta) {
-    byte[] nullCharByteArray = getNullCharsAsBytes(meta);
-
-    ByteBuf nullChars = BufferPool.directBuffer(nullCharByteArray.length, nullCharByteArray.length);
-    nullChars.writeBytes(nullCharByteArray);
-
-    return nullChars;
-  }
-
-  public static byte [] getNullCharsAsBytes(TableMeta meta) {
-    byte [] nullChars;
-
-    String nullCharacters = StringEscapeUtils.unescapeJava(meta.getOption(StorageConstants.TEXT_NULL,
-        NullDatum.DEFAULT_TEXT));
-    if (StringUtils.isEmpty(nullCharacters)) {
-      nullChars = NullDatum.get().asTextBytes();
-    } else {
-      nullChars = nullCharacters.getBytes();
-    }
-
-    return nullChars;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/storage/text/TextLineSerializer.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/text/TextLineSerializer.java b/tajo-storage/src/main/java/org/apache/tajo/storage/text/TextLineSerializer.java
deleted file mode 100644
index 0c2761f..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/text/TextLineSerializer.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage.text;
-
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.catalog.TableMeta;
-import org.apache.tajo.storage.Tuple;
-
-import java.io.IOException;
-import java.io.OutputStream;
-
-/**
- * Writes a Tuple into a single text-formatted line
- */
-public abstract class TextLineSerializer {
-  protected Schema schema;
-  protected TableMeta meta;
-
-  public TextLineSerializer(Schema schema, TableMeta meta) {
-    this.schema = schema;
-    this.meta = meta;
-  }
-
-  public abstract void init();
-
-  public abstract int serialize(OutputStream out, Tuple input) throws IOException;
-
-  public abstract void release();
-}
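
The contract implied above: init() once, serialize() per tuple, release() exactly
once when done. A sketch of a driver loop under those assumptions (the wrapper class
is hypothetical, and the newline write assumes the caller appends the record
delimiter; the real appender may handle delimiters itself):

import java.io.IOException;
import java.io.OutputStream;
import org.apache.tajo.storage.Tuple;

final class SerializerLoopSketch {
  static long writeAll(TextLineSerializer serializer, OutputStream out,
                       Iterable<Tuple> tuples) throws IOException {
    serializer.init();
    long written = 0;
    try {
      for (Tuple tuple : tuples) {
        written += serializer.serialize(out, tuple); // bytes written for this line
        out.write('\n'); // assumption: delimiter appended by the caller
      }
    } finally {
      serializer.release();
    }
    return written;
  }
}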

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/storage/thirdparty/parquet/CodecFactory.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/thirdparty/parquet/CodecFactory.java b/tajo-storage/src/main/java/org/apache/tajo/storage/thirdparty/parquet/CodecFactory.java
deleted file mode 100644
index 543336f..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/thirdparty/parquet/CodecFactory.java
+++ /dev/null
@@ -1,196 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage.thirdparty.parquet;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.HashMap;
-import java.util.Map;
-
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.compress.CodecPool;
-import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.hadoop.io.compress.CompressionOutputStream;
-import org.apache.hadoop.io.compress.Compressor;
-import org.apache.hadoop.io.compress.Decompressor;
-import org.apache.hadoop.util.ReflectionUtils;
-
-import parquet.bytes.BytesInput;
-import parquet.hadoop.BadConfigurationException;
-import parquet.hadoop.metadata.CompressionCodecName;
-
-class CodecFactory {
-
-  public class BytesDecompressor {
-
-    private final CompressionCodec codec;
-    private final Decompressor decompressor;
-
-    public BytesDecompressor(CompressionCodec codec) {
-      this.codec = codec;
-      if (codec != null) {
-        decompressor = CodecPool.getDecompressor(codec);
-      } else {
-        decompressor = null;
-      }
-    }
-
-    public BytesInput decompress(BytesInput bytes, int uncompressedSize) throws IOException {
-      final BytesInput decompressed;
-      if (codec != null) {
-        decompressor.reset();
-        InputStream is = codec.createInputStream(new ByteArrayInputStream(bytes.toByteArray()), decompressor);
-        decompressed = BytesInput.from(is, uncompressedSize);
-      } else {
-        decompressed = bytes;
-      }
-      return decompressed;
-    }
-
-    private void release() {
-      if (decompressor != null) {
-        CodecPool.returnDecompressor(decompressor);
-      }
-    }
-  }
-
-  /**
-   * Encapsulates the logic around Hadoop compression
-   *
-   * @author Julien Le Dem
-   *
-   */
-  public static class BytesCompressor {
-
-    private final CompressionCodec codec;
-    private final Compressor compressor;
-    private final ByteArrayOutputStream compressedOutBuffer;
-    private final CompressionCodecName codecName;
-
-    public BytesCompressor(CompressionCodecName codecName, CompressionCodec codec, int pageSize) {
-      this.codecName = codecName;
-      this.codec = codec;
-      if (codec != null) {
-        this.compressor = CodecPool.getCompressor(codec);
-        this.compressedOutBuffer = new ByteArrayOutputStream(pageSize);
-      } else {
-        this.compressor = null;
-        this.compressedOutBuffer = null;
-      }
-    }
-
-    public BytesInput compress(BytesInput bytes) throws IOException {
-      final BytesInput compressedBytes;
-      if (codec == null) {
-        compressedBytes = bytes;
-      } else {
-        compressedOutBuffer.reset();
-        if (compressor != null) {
-          // the pooled compressor can be null (e.g. non-native gzip), so reset only when present
-          compressor.reset();
-        }
-        CompressionOutputStream cos = codec.createOutputStream(compressedOutBuffer, compressor);
-        bytes.writeAllTo(cos);
-        cos.finish();
-        cos.close();
-        compressedBytes = BytesInput.from(compressedOutBuffer);
-      }
-      return compressedBytes;
-    }
-
-    private void release() {
-      if (compressor != null) {
-        CodecPool.returnCompressor(compressor);
-      }
-    }
-
-    public CompressionCodecName getCodecName() {
-      return codecName;
-    }
-
-  }
-
-  private final Map<CompressionCodecName, BytesCompressor> compressors = new HashMap<CompressionCodecName, BytesCompressor>();
-  private final Map<CompressionCodecName, BytesDecompressor> decompressors = new HashMap<CompressionCodecName, BytesDecompressor>();
-  private final Map<String, CompressionCodec> codecByName = new HashMap<String, CompressionCodec>();
-  private final Configuration configuration;
-
-  public CodecFactory(Configuration configuration) {
-    this.configuration = configuration;
-  }
-
-  /**
-   *
-   * @param codecName the requested codec
-   * @return the corresponding Hadoop codec, or null if UNCOMPRESSED
-   */
-  private CompressionCodec getCodec(CompressionCodecName codecName) {
-    String codecClassName = codecName.getHadoopCompressionCodecClassName();
-    if (codecClassName == null) {
-      return null;
-    }
-    CompressionCodec codec = codecByName.get(codecClassName);
-    if (codec != null) {
-      return codec;
-    }
-
-    try {
-      Class<?> codecClass = Class.forName(codecClassName);
-      codec = (CompressionCodec)ReflectionUtils.newInstance(codecClass, configuration);
-      codecByName.put(codecClassName, codec);
-      return codec;
-    } catch (ClassNotFoundException e) {
-      throw new BadConfigurationException("Class " + codecClassName + " was not found", e);
-    }
-  }
-
-  public BytesCompressor getCompressor(CompressionCodecName codecName, int pageSize) {
-    BytesCompressor comp = compressors.get(codecName);
-    if (comp == null) {
-      CompressionCodec codec = getCodec(codecName);
-      comp = new BytesCompressor(codecName, codec, pageSize);
-      compressors.put(codecName, comp);
-    }
-    return comp;
-  }
-
-  public BytesDecompressor getDecompressor(CompressionCodecName codecName) {
-    BytesDecompressor decomp = decompressors.get(codecName);
-    if (decomp == null) {
-      CompressionCodec codec = getCodec(codecName);
-      decomp = new BytesDecompressor(codec);
-      decompressors.put(codecName, decomp);
-    }
-    return decomp;
-  }
-
-  public void release() {
-    for (BytesCompressor compressor : compressors.values()) {
-      compressor.release();
-    }
-    compressors.clear();
-    for (BytesDecompressor decompressor : decompressors.values()) {
-      decompressor.release();
-    }
-    decompressors.clear();
-  }
-}
\ No newline at end of file
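
A round-trip sketch of the factory (same-package access assumed, since the class is
package-private; the wrapper class is hypothetical). GZIP is used because it works
without native libraries, and the compressed bytes are copied because compress()
reuses an internal buffer across calls:

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import parquet.bytes.BytesInput;
import parquet.hadoop.metadata.CompressionCodecName;

final class CodecRoundTripSketch {
  static byte[] roundTrip(byte[] raw) throws IOException {
    CodecFactory factory = new CodecFactory(new Configuration());
    try {
      CodecFactory.BytesCompressor compressor =
          factory.getCompressor(CompressionCodecName.GZIP, 64 * 1024);
      BytesInput compressed = BytesInput.copy(compressor.compress(BytesInput.from(raw)));
      CodecFactory.BytesDecompressor decompressor =
          factory.getDecompressor(CompressionCodecName.GZIP);
      return decompressor.decompress(compressed, raw.length).toByteArray();
    } finally {
      factory.release(); // returns pooled (de)compressors to Hadoop's CodecPool
    }
  }
}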

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ColumnChunkPageWriteStore.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ColumnChunkPageWriteStore.java b/tajo-storage/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ColumnChunkPageWriteStore.java
deleted file mode 100644
index 5f89ead..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ColumnChunkPageWriteStore.java
+++ /dev/null
@@ -1,211 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage.thirdparty.parquet;
-
-import static org.apache.tajo.storage.thirdparty.parquet.CodecFactory.BytesCompressor;
-import static parquet.Log.INFO;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import parquet.Log;
-import parquet.bytes.BytesInput;
-import parquet.bytes.CapacityByteArrayOutputStream;
-import parquet.column.ColumnDescriptor;
-import parquet.column.Encoding;
-import parquet.column.page.DictionaryPage;
-import parquet.column.page.PageWriteStore;
-import parquet.column.page.PageWriter;
-import parquet.column.statistics.Statistics;
-import parquet.column.statistics.BooleanStatistics;
-import parquet.format.converter.ParquetMetadataConverter;
-import parquet.io.ParquetEncodingException;
-import parquet.schema.MessageType;
-
-class ColumnChunkPageWriteStore implements PageWriteStore {
-  private static final Log LOG = Log.getLog(ColumnChunkPageWriteStore.class);
-
-  private static ParquetMetadataConverter parquetMetadataConverter = new ParquetMetadataConverter();
-
-  private static final class ColumnChunkPageWriter implements PageWriter {
-
-    private final ColumnDescriptor path;
-    private final BytesCompressor compressor;
-
-    private final CapacityByteArrayOutputStream buf;
-    private DictionaryPage dictionaryPage;
-
-    private long uncompressedLength;
-    private long compressedLength;
-    private long totalValueCount;
-    private int pageCount;
-
-    private Set<Encoding> encodings = new HashSet<Encoding>();
-
-    private Statistics totalStatistics;
-
-    private ColumnChunkPageWriter(ColumnDescriptor path, BytesCompressor compressor, int initialSize) {
-      this.path = path;
-      this.compressor = compressor;
-      this.buf = new CapacityByteArrayOutputStream(initialSize);
-      this.totalStatistics = Statistics.getStatsBasedOnType(this.path.getType());
-    }
-
-    @Deprecated
-    @Override
-    public void writePage(BytesInput bytes,
-                          int valueCount,
-                          Encoding rlEncoding,
-                          Encoding dlEncoding,
-                          Encoding valuesEncoding) throws IOException {
-      long uncompressedSize = bytes.size();
-      BytesInput compressedBytes = compressor.compress(bytes);
-      long compressedSize = compressedBytes.size();
-      BooleanStatistics statistics = new BooleanStatistics(); // dummy stats object
-      parquetMetadataConverter.writeDataPageHeader(
-          (int)uncompressedSize,
-          (int)compressedSize,
-          valueCount,
-          statistics,
-          rlEncoding,
-          dlEncoding,
-          valuesEncoding,
-          buf);
-      this.uncompressedLength += uncompressedSize;
-      this.compressedLength += compressedSize;
-      this.totalValueCount += valueCount;
-      this.pageCount += 1;
-      compressedBytes.writeAllTo(buf);
-      encodings.add(rlEncoding);
-      encodings.add(dlEncoding);
-      encodings.add(valuesEncoding);
-    }
-
-    @Override
-    public void writePage(BytesInput bytes,
-                          int valueCount,
-                          Statistics statistics,
-                          Encoding rlEncoding,
-                          Encoding dlEncoding,
-                          Encoding valuesEncoding) throws IOException {
-      long uncompressedSize = bytes.size();
-      BytesInput compressedBytes = compressor.compress(bytes);
-      long compressedSize = compressedBytes.size();
-      parquetMetadataConverter.writeDataPageHeader(
-          (int)uncompressedSize,
-          (int)compressedSize,
-          valueCount,
-          statistics,
-          rlEncoding,
-          dlEncoding,
-          valuesEncoding,
-          buf);
-      this.uncompressedLength += uncompressedSize;
-      this.compressedLength += compressedSize;
-      this.totalValueCount += valueCount;
-      this.pageCount += 1;
-      this.totalStatistics.mergeStatistics(statistics);
-      compressedBytes.writeAllTo(buf);
-      encodings.add(rlEncoding);
-      encodings.add(dlEncoding);
-      encodings.add(valuesEncoding);
-    }
-
-    @Override
-    public long getMemSize() {
-      return buf.size();
-    }
-
-    public void writeToFileWriter(ParquetFileWriter writer) throws IOException {
-      writer.startColumn(path, totalValueCount, compressor.getCodecName());
-      if (dictionaryPage != null) {
-        writer.writeDictionaryPage(dictionaryPage);
-        encodings.add(dictionaryPage.getEncoding());
-      }
-      writer.writeDataPages(BytesInput.from(buf), uncompressedLength, compressedLength, totalStatistics, new ArrayList<Encoding>(encodings));
-      writer.endColumn();
-      if (INFO) {
-        LOG.info(
-            String.format(
-                "written %,dB for %s: %,d values, %,dB raw, %,dB comp, %d pages, encodings: %s",
-                buf.size(), path, totalValueCount, uncompressedLength, compressedLength, pageCount, encodings)
-                + (dictionaryPage != null ? String.format(
-                ", dic { %,d entries, %,dB raw, %,dB comp}",
-                dictionaryPage.getDictionarySize(), dictionaryPage.getUncompressedSize(), dictionaryPage.getBytes().size())
-                : ""));
-      }
-      encodings.clear();
-      pageCount = 0;
-    }
-
-    @Override
-    public long allocatedSize() {
-      return buf.getCapacity();
-    }
-
-    @Override
-    public void writeDictionaryPage(DictionaryPage dictionaryPage) throws IOException {
-      if (this.dictionaryPage != null) {
-        throw new ParquetEncodingException("Only one dictionary page is allowed");
-      }
-      BytesInput dictionaryBytes = dictionaryPage.getBytes();
-      int uncompressedSize = (int)dictionaryBytes.size();
-      BytesInput compressedBytes = compressor.compress(dictionaryBytes);
-      this.dictionaryPage = new DictionaryPage(BytesInput.copy(compressedBytes), uncompressedSize, dictionaryPage.getDictionarySize(), dictionaryPage.getEncoding());
-    }
-
-    @Override
-    public String memUsageString(String prefix) {
-      return buf.memUsageString(prefix + " ColumnChunkPageWriter");
-    }
-  }
-
-  private final Map<ColumnDescriptor, ColumnChunkPageWriter> writers = new HashMap<ColumnDescriptor, ColumnChunkPageWriter>();
-  private final MessageType schema;
-  private final BytesCompressor compressor;
-  private final int initialSize;
-
-  public ColumnChunkPageWriteStore(BytesCompressor compressor, MessageType schema, int initialSize) {
-    this.compressor = compressor;
-    this.schema = schema;
-    this.initialSize = initialSize;
-  }
-
-  @Override
-  public PageWriter getPageWriter(ColumnDescriptor path) {
-    if (!writers.containsKey(path)) {
-      writers.put(path, new ColumnChunkPageWriter(path, compressor, initialSize));
-    }
-    return writers.get(path);
-  }
-
-  public void flushToFileWriter(ParquetFileWriter writer) throws IOException {
-    List<ColumnDescriptor> columns = schema.getColumns();
-    for (ColumnDescriptor columnDescriptor : columns) {
-      ColumnChunkPageWriter pageWriter = writers.get(columnDescriptor);
-      pageWriter.writeToFileWriter(writer);
-    }
-  }
-
-}
\ No newline at end of file
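
How the store sits in the write path (the InternalParquetRecordWriter below follows
the same shape): each column gets one PageWriter that buffers compressed pages in
memory, and flushToFileWriter() then emits one column chunk per column in schema
order. A sketch with a hypothetical wrapper class, assuming same-package access:

import java.io.IOException;
import parquet.column.ColumnDescriptor;
import parquet.column.page.PageWriter;
import parquet.schema.MessageType;

final class PageStoreSketch {
  static void bufferAndFlush(CodecFactory.BytesCompressor compressor, MessageType schema,
                             ParquetFileWriter fileWriter) throws IOException {
    ColumnChunkPageWriteStore pageStore =
        new ColumnChunkPageWriteStore(compressor, schema, 64 * 1024);
    for (ColumnDescriptor column : schema.getColumns()) {
      PageWriter pageWriter = pageStore.getPageWriter(column);
      // a ColumnWriteStoreImpl would call pageWriter.writePage(...) here
    }
    pageStore.flushToFileWriter(fileWriter); // one startColumn/endColumn pair per column
  }
}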

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/storage/thirdparty/parquet/InternalParquetRecordReader.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/thirdparty/parquet/InternalParquetRecordReader.java b/tajo-storage/src/main/java/org/apache/tajo/storage/thirdparty/parquet/InternalParquetRecordReader.java
deleted file mode 100644
index 61567e5..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/thirdparty/parquet/InternalParquetRecordReader.java
+++ /dev/null
@@ -1,187 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage.thirdparty.parquet;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import parquet.Log;
-import parquet.column.ColumnDescriptor;
-import parquet.column.page.PageReadStore;
-import parquet.filter.UnboundRecordFilter;
-import parquet.hadoop.ParquetFileReader;
-import parquet.hadoop.api.ReadSupport;
-import parquet.hadoop.metadata.BlockMetaData;
-import parquet.hadoop.util.counters.BenchmarkCounter;
-import parquet.io.ColumnIOFactory;
-import parquet.io.MessageColumnIO;
-import parquet.io.ParquetDecodingException;
-import parquet.io.api.RecordMaterializer;
-import parquet.schema.GroupType;
-import parquet.schema.MessageType;
-import parquet.schema.Type;
-
-import static java.lang.String.format;
-import static parquet.Log.DEBUG;
-
-class InternalParquetRecordReader<T> {
-  private static final Log LOG = Log.getLog(InternalParquetRecordReader.class);
-
-  private final ColumnIOFactory columnIOFactory = new ColumnIOFactory();
-
-  private MessageType requestedSchema;
-  private MessageType fileSchema;
-  private int columnCount;
-  private final ReadSupport<T> readSupport;
-
-  private RecordMaterializer<T> recordConverter;
-
-  private T currentValue;
-  private long total;
-  private int current = 0;
-  private int currentBlock = -1;
-  private ParquetFileReader reader;
-  private parquet.io.RecordReader<T> recordReader;
-  private UnboundRecordFilter recordFilter;
-
-  private long totalTimeSpentReadingBytes;
-  private long totalTimeSpentProcessingRecords;
-  private long startedAssemblingCurrentBlockAt;
-
-  private long totalCountLoadedSoFar = 0;
-
-  private Path file;
-
-  /**
-   * @param readSupport Object which helps read files of the given type, e.g. Thrift, Avro.
-   */
-  public InternalParquetRecordReader(ReadSupport<T> readSupport) {
-    this(readSupport, null);
-  }
-
-  /**
-   * @param readSupport Object which helps read files of the given type, e.g. Thrift, Avro.
-   * @param filter Optional filter for only returning matching records.
-   */
-  public InternalParquetRecordReader(ReadSupport<T> readSupport, UnboundRecordFilter
-      filter) {
-    this.readSupport = readSupport;
-    this.recordFilter = filter;
-  }
-
-  private void checkRead() throws IOException {
-    if (current == totalCountLoadedSoFar) {
-      if (current != 0) {
-        long timeAssembling = System.currentTimeMillis() - startedAssemblingCurrentBlockAt;
-        totalTimeSpentProcessingRecords += timeAssembling;
-        LOG.info("Assembled and processed " + totalCountLoadedSoFar + " records from " + columnCount + " columns in " + totalTimeSpentProcessingRecords + " ms: "+((float)totalCountLoadedSoFar / totalTimeSpentProcessingRecords) + " rec/ms, " + ((float)totalCountLoadedSoFar * columnCount / totalTimeSpentProcessingRecords) + " cell/ms");
-        long totalTime = totalTimeSpentProcessingRecords + totalTimeSpentReadingBytes;
-        long percentReading = 100 * totalTimeSpentReadingBytes / totalTime;
-        long percentProcessing = 100 * totalTimeSpentProcessingRecords / totalTime;
-        LOG.info("time spent so far " + percentReading + "% reading ("+totalTimeSpentReadingBytes+" ms) and " + percentProcessing + "% processing ("+totalTimeSpentProcessingRecords+" ms)");
-      }
-
-      LOG.info("at row " + current + ". reading next block");
-      long t0 = System.currentTimeMillis();
-      PageReadStore pages = reader.readNextRowGroup();
-      if (pages == null) {
-        throw new IOException("expecting more rows but reached last block. Read " + current + " out of " + total);
-      }
-      long timeSpentReading = System.currentTimeMillis() - t0;
-      totalTimeSpentReadingBytes += timeSpentReading;
-      BenchmarkCounter.incrementTime(timeSpentReading);
-      LOG.info("block read in memory in " + timeSpentReading + " ms. row count = " + pages.getRowCount());
-      if (Log.DEBUG) LOG.debug("initializing Record assembly with requested schema " + requestedSchema);
-      MessageColumnIO columnIO = columnIOFactory.getColumnIO(requestedSchema, fileSchema);
-      recordReader = columnIO.getRecordReader(pages, recordConverter, recordFilter);
-      startedAssemblingCurrentBlockAt = System.currentTimeMillis();
-      totalCountLoadedSoFar += pages.getRowCount();
-      ++ currentBlock;
-    }
-  }
-
-  public void close() throws IOException {
-    reader.close();
-  }
-
-  public Void getCurrentKey() throws IOException, InterruptedException {
-    return null;
-  }
-
-  public T getCurrentValue() throws IOException,
-      InterruptedException {
-    return currentValue;
-  }
-
-  public float getProgress() throws IOException, InterruptedException {
-    return (float) current / total;
-  }
-
-  public void initialize(MessageType requestedSchema, MessageType fileSchema,
-                         Map<String, String> extraMetadata, Map<String, String> readSupportMetadata,
-                         Path file, List<BlockMetaData> blocks, Configuration configuration)
-      throws IOException {
-    this.requestedSchema = requestedSchema;
-    this.fileSchema = fileSchema;
-    this.file = file;
-    this.columnCount = this.requestedSchema.getPaths().size();
-    this.recordConverter = readSupport.prepareForRead(
-        configuration, extraMetadata, fileSchema,
-        new ReadSupport.ReadContext(requestedSchema, readSupportMetadata));
-
-    List<ColumnDescriptor> columns = requestedSchema.getColumns();
-    reader = new ParquetFileReader(configuration, file, blocks, columns);
-    for (BlockMetaData block : blocks) {
-      total += block.getRowCount();
-    }
-    LOG.info("RecordReader initialized will read a total of " + total + " records.");
-  }
-
-  private boolean contains(GroupType group, String[] path, int index) {
-    if (index == path.length) {
-      return false;
-    }
-    if (group.containsField(path[index])) {
-      Type type = group.getType(path[index]);
-      if (type.isPrimitive()) {
-        return index + 1 == path.length;
-      } else {
-        return contains(type.asGroupType(), path, index + 1);
-      }
-    }
-    return false;
-  }
-
-  public boolean nextKeyValue() throws IOException, InterruptedException {
-    if (current < total) {
-      try {
-        checkRead();
-        currentValue = recordReader.read();
-        if (DEBUG) LOG.debug("read value: " + currentValue);
-        current ++;
-      } catch (RuntimeException e) {
-        throw new ParquetDecodingException(format("Can not read value at %d in block %d in file %s", current, currentBlock, file), e);
-      }
-      return true;
-    }
-    return false;
-  }
-}
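
The driving loop implied by the API above: after initialize(), nextKeyValue()
assembles one record at a time, and checkRead() pulls in the next row group only
when the current one is exhausted, so at most one assembled row group is resident.
A sketch (hypothetical wrapper class, same-package access assumed):

import java.io.IOException;

final class ReadLoopSketch {
  static <T> long readAll(InternalParquetRecordReader<T> reader)
      throws IOException, InterruptedException {
    long count = 0;
    try {
      while (reader.nextKeyValue()) {
        T record = reader.getCurrentValue(); // valid until the next nextKeyValue() call
        count++;
      }
    } finally {
      reader.close();
    }
    return count;
  }
}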

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/storage/thirdparty/parquet/InternalParquetRecordWriter.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/thirdparty/parquet/InternalParquetRecordWriter.java b/tajo-storage/src/main/java/org/apache/tajo/storage/thirdparty/parquet/InternalParquetRecordWriter.java
deleted file mode 100644
index 7410d2b..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/thirdparty/parquet/InternalParquetRecordWriter.java
+++ /dev/null
@@ -1,160 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage.thirdparty.parquet;
-
-import static java.lang.Math.max;
-import static java.lang.Math.min;
-import static java.lang.String.format;
-import static org.apache.tajo.storage.thirdparty.parquet.CodecFactory.BytesCompressor;
-import static parquet.Log.DEBUG;
-import static parquet.Preconditions.checkNotNull;
-
-import java.io.IOException;
-import java.util.Map;
-
-import parquet.Log;
-import parquet.column.ParquetProperties.WriterVersion;
-import parquet.column.impl.ColumnWriteStoreImpl;
-import parquet.hadoop.api.WriteSupport;
-import parquet.io.ColumnIOFactory;
-import parquet.io.MessageColumnIO;
-import parquet.schema.MessageType;
-
-class InternalParquetRecordWriter<T> {
-  private static final Log LOG = Log.getLog(InternalParquetRecordWriter.class);
-
-  private static final int MINIMUM_BUFFER_SIZE = 64 * 1024;
-  private static final int MINIMUM_RECORD_COUNT_FOR_CHECK = 100;
-  private static final int MAXIMUM_RECORD_COUNT_FOR_CHECK = 10000;
-
-  private final ParquetFileWriter w;
-  private final WriteSupport<T> writeSupport;
-  private final MessageType schema;
-  private final Map<String, String> extraMetaData;
-  private final int blockSize;
-  private final int pageSize;
-  private final BytesCompressor compressor;
-  private final int dictionaryPageSize;
-  private final boolean enableDictionary;
-  private final boolean validating;
-  private final WriterVersion writerVersion;
-
-  private long recordCount = 0;
-  private long recordCountForNextMemCheck = MINIMUM_RECORD_COUNT_FOR_CHECK;
-
-  private ColumnWriteStoreImpl store;
-  private ColumnChunkPageWriteStore pageStore;
-
-  /**
-   * @param w the file to write to
-   * @param writeSupport the class to convert incoming records
-   * @param schema the schema of the records
-   * @param extraMetaData extra meta data to write in the footer of the file
-   * @param blockSize the size of a block in the file (this will be approximate)
-   * @param codec the codec used to compress
-   */
-  public InternalParquetRecordWriter(
-      ParquetFileWriter w,
-      WriteSupport<T> writeSupport,
-      MessageType schema,
-      Map<String, String> extraMetaData,
-      int blockSize,
-      int pageSize,
-      BytesCompressor compressor,
-      int dictionaryPageSize,
-      boolean enableDictionary,
-      boolean validating,
-      WriterVersion writerVersion) {
-    this.w = w;
-    this.writeSupport = checkNotNull(writeSupport, "writeSupport");
-    this.schema = schema;
-    this.extraMetaData = extraMetaData;
-    this.blockSize = blockSize;
-    this.pageSize = pageSize;
-    this.compressor = compressor;
-    this.dictionaryPageSize = dictionaryPageSize;
-    this.enableDictionary = enableDictionary;
-    this.validating = validating;
-    this.writerVersion = writerVersion;
-    initStore();
-  }
-
-  private void initStore() {
-    // we don't want this number to be too small
-    // ideally we divide the block equally across the columns
-    // it is unlikely all columns are going to be the same size.
-    int initialBlockBufferSize = max(MINIMUM_BUFFER_SIZE, blockSize / schema.getColumns().size() / 5);
-    pageStore = new ColumnChunkPageWriteStore(compressor, schema, initialBlockBufferSize);
-    // we don't want this number to be too small either
-    // ideally, slightly bigger than the page size, but not bigger than the block buffer
-    int initialPageBufferSize = max(MINIMUM_BUFFER_SIZE, min(pageSize + pageSize / 10, initialBlockBufferSize));
-    store = new ColumnWriteStoreImpl(pageStore, pageSize, initialPageBufferSize, dictionaryPageSize, enableDictionary, writerVersion);
-    MessageColumnIO columnIO = new ColumnIOFactory(validating).getColumnIO(schema);
-    writeSupport.prepareForWrite(columnIO.getRecordWriter(store));
-  }
-
-  public void close() throws IOException, InterruptedException {
-    flushStore();
-    w.end(extraMetaData);
-  }
-
-  public void write(T value) throws IOException, InterruptedException {
-    writeSupport.write(value);
-    ++ recordCount;
-    checkBlockSizeReached();
-  }
-
-  private void checkBlockSizeReached() throws IOException {
-    if (recordCount >= recordCountForNextMemCheck) { // checking the memory size is relatively expensive, so let's not do it for every record.
-      long memSize = store.memSize();
-      if (memSize > blockSize) {
-        LOG.info(format("mem size %,d > %,d: flushing %,d records to disk.", memSize, blockSize, recordCount));
-        flushStore();
-        initStore();
-        recordCountForNextMemCheck = min(max(MINIMUM_RECORD_COUNT_FOR_CHECK, recordCount / 2), MAXIMUM_RECORD_COUNT_FOR_CHECK);
-      } else {
-        float recordSize = (float) memSize / recordCount;
-        recordCountForNextMemCheck = min(
-            max(MINIMUM_RECORD_COUNT_FOR_CHECK, (recordCount + (long)(blockSize / recordSize)) / 2), // will check halfway
-            recordCount + MAXIMUM_RECORD_COUNT_FOR_CHECK // will not look more than max records ahead
-        );
-        if (DEBUG) LOG.debug(format("Checked mem at %,d will check again at: %,d ", recordCount, recordCountForNextMemCheck));
-      }
-    }
-  }
-
-  public long getEstimatedWrittenSize() throws IOException {
-    return w.getPos() + store.memSize();
-  }
-
-  private void flushStore()
-      throws IOException {
-    LOG.info(format("Flushing mem store to file. allocated memory: %,d", store.allocatedSize()));
-    if (store.allocatedSize() > 3 * blockSize) {
-      LOG.warn("Too much memory used: " + store.memUsageString());
-    }
-    w.startBlock(recordCount);
-    store.flush();
-    pageStore.flushToFileWriter(w);
-    recordCount = 0;
-    w.endBlock();
-    store = null;
-    pageStore = null;
-  }
-}
\ No newline at end of file
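
The scheduling arithmetic in checkBlockSizeReached() deserves a worked example.
With assumed numbers, recordCount = 100 and memSize = 1 MiB against a 128 MiB
block: the average record is about 10 KiB, the block projects full near 12,800
records, and the next check lands halfway there. A sketch of the same formula,
extracted into a hypothetical helper with the class constants inlined:

final class MemCheckSketch {
  static long nextMemCheck(long recordCount, long memSize, long blockSize) {
    float recordSize = (float) memSize / recordCount;
    return Math.min(
        Math.max(100L, (recordCount + (long) (blockSize / recordSize)) / 2), // halfway to the projected boundary
        recordCount + 10000L);                                               // never look more than 10k records ahead
  }
  // nextMemCheck(100, 1 << 20, 128 << 20) evaluates to 6450
}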