Posted to commits@tajo.apache.org by hj...@apache.org on 2014/12/03 06:30:38 UTC

[23/30] tajo git commit: TAJO-1122: Refactor the tajo-storage project structure.

http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/main/java/org/apache/tajo/storage/index/bst/BSTIndex.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/index/bst/BSTIndex.java b/tajo-storage/src/main/java/org/apache/tajo/storage/index/bst/BSTIndex.java
deleted file mode 100644
index f093f9d..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/index/bst/BSTIndex.java
+++ /dev/null
@@ -1,623 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage.index.bst;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.catalog.proto.CatalogProtos.SchemaProto;
-import org.apache.tajo.storage.*;
-import org.apache.tajo.storage.RowStoreUtil.RowStoreDecoder;
-import org.apache.tajo.storage.RowStoreUtil.RowStoreEncoder;
-import org.apache.tajo.storage.index.IndexMethod;
-import org.apache.tajo.storage.index.IndexWriter;
-import org.apache.tajo.storage.index.OrderIndexReader;
-
-import java.io.Closeable;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.util.LinkedList;
-import java.util.Set;
-import java.util.TreeMap;
-
-import static org.apache.tajo.index.IndexProtos.TupleComparatorProto;
-
-/**
- * This is a two-level binary search tree index. It is a value-list style
- * index structure, so it is inefficient when many of the indexed values are
- * identical. The BST index performs well when the selectivity of the rows to
- * be retrieved is less than 5%.
- * BSTIndexWriter is not thread-safe, whereas BSTIndexReader is thread-safe.
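- *
- * <p>A minimal usage sketch (illustrative only; exception handling is omitted, and
- * the {@code conf}, {@code indexPath}, {@code keySchema}, {@code comparator},
- * {@code keyTuple}, and {@code offset} values are assumed to be prepared by the
- * caller):
- * <pre>
- *   BSTIndex bst = new BSTIndex(conf);
- *   BSTIndex.BSTIndexWriter writer =
- *       bst.getIndexWriter(indexPath, BSTIndex.TWO_LEVEL_INDEX, keySchema, comparator);
- *   writer.open();
- *   writer.write(keyTuple, offset);   // called once per key/offset pair
- *   writer.close();
- *
- *   BSTIndex.BSTIndexReader reader = bst.getIndexReader(indexPath, keySchema, comparator);
- *   reader.open();
- *   long found = reader.find(keyTuple);  // -1 if the key is not present
- *   reader.close();
- * </pre>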
- */
-public class BSTIndex implements IndexMethod {
-  private static final Log LOG = LogFactory.getLog(BSTIndex.class);
-
-  public static final int ONE_LEVEL_INDEX = 1;
-  public static final int TWO_LEVEL_INDEX = 2;
-
-  private final Configuration conf;
-
-  public BSTIndex(final Configuration conf) {
-    this.conf = conf;
-  }
-  
-  @Override
-  public BSTIndexWriter getIndexWriter(final Path fileName, int level, Schema keySchema,
-      TupleComparator comparator) throws IOException {
-    return new BSTIndexWriter(fileName, level, keySchema, comparator);
-  }
-
-  @Override
-  public BSTIndexReader getIndexReader(Path fileName, Schema keySchema, TupleComparator comparator) throws IOException {
-    return new BSTIndexReader(fileName, keySchema, comparator);
-  }
-
-  public BSTIndexReader getIndexReader(Path fileName) throws IOException {
-    return new BSTIndexReader(fileName);
-  }
-
-  public class BSTIndexWriter extends IndexWriter implements Closeable {
-    private FSDataOutputStream out;
-    private FileSystem fs;
-    private int level;
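-    // number of leaf entries covered by each root entry in a two-level index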
-    private int loadNum = 4096;
-    private Path fileName;
-
-    private final Schema keySchema;
-    private final TupleComparator comparator;
-    private final KeyOffsetCollector collector;
-    private KeyOffsetCollector rootCollector;
-
-    private Tuple firstKey;
-    private Tuple lastKey;
-
-    private RowStoreEncoder rowStoreEncoder;
-
-    // private Tuple lastestKey = null;
-
-    /**
-     * constructor
-     *
-     * @param level
-     *          : BSTIndex.ONE_LEVEL_INDEX or BSTIndex.TWO_LEVEL_INDEX
-     * @throws IOException
-     */
-    public BSTIndexWriter(final Path fileName, int level, Schema keySchema,
-        TupleComparator comparator) throws IOException {
-      this.fileName = fileName;
-      this.level = level;
-      this.keySchema = keySchema;
-      this.comparator = comparator;
-      this.collector = new KeyOffsetCollector(comparator);
-      this.rowStoreEncoder = RowStoreUtil.createEncoder(keySchema);
-    }
-
-    public void setLoadNum(int loadNum) {
-      this.loadNum = loadNum;
-    }
-
-    public void open() throws IOException {
-      fs = fileName.getFileSystem(conf);
-      if (fs.exists(fileName)) {
-        throw new IOException("ERROR: index file (" + fileName + " already exists");
-      }
-      out = fs.create(fileName);
-    }
-
-    @Override
-    public void write(Tuple key, long offset) throws IOException {
-      if (firstKey == null || comparator.compare(key, firstKey) < 0) {
-        firstKey = key;
-      }
-      if (lastKey == null || comparator.compare(lastKey, key) < 0) {
-        lastKey = key;
-      }
-
-      collector.put(key, offset);
-    }
-
-    public TupleComparator getComparator() {
-      return this.comparator;
-    }
-
-    public void flush() throws IOException {
-      out.flush();
-    }
-
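-    /*
-     * Index file header layout (as written below):
-     *   schema length (int), schema proto bytes,
-     *   comparator length (int), comparator proto bytes,
-     *   index level (int), entry count (int),
-     *   and, if the entry count is positive, the encoded first and last keys,
-     *   each preceded by its length (int).
-     */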
-    public void writeHeader(int entryNum) throws IOException {
-      // schema
-      byte [] schemaBytes = keySchema.getProto().toByteArray();
-      out.writeInt(schemaBytes.length);
-      out.write(schemaBytes);
-
-      // comparator
-      byte [] comparatorBytes = comparator.getProto().toByteArray();
-      out.writeInt(comparatorBytes.length);
-      out.write(comparatorBytes);
-
-      // level
-      out.writeInt(this.level);
-      // entry
-      out.writeInt(entryNum);
-      if (entryNum > 0) {
-        byte [] minBytes = rowStoreEncoder.toBytes(firstKey);
-        out.writeInt(minBytes.length);
-        out.write(minBytes);
-        byte [] maxBytes = rowStoreEncoder.toBytes(lastKey);
-        out.writeInt(maxBytes.length);
-        out.write(maxBytes);
-      }
-      out.flush();
-    }
-
-    public void close() throws IOException {
-      /* two level initialize */
-      if (this.level == TWO_LEVEL_INDEX) {
-        rootCollector = new KeyOffsetCollector(this.comparator);
-      }
-
-      /* data writing phase */
-      TreeMap<Tuple, LinkedList<Long>> keyOffsetMap = collector.getMap();
-      Set<Tuple> keySet = keyOffsetMap.keySet();
-
-      int entryNum = keySet.size();
-      writeHeader(entryNum);
-      
-      int loadCount = this.loadNum - 1;
-      for (Tuple key : keySet) {
-
-        if (this.level == TWO_LEVEL_INDEX) {
-          loadCount++;
-          if (loadCount == this.loadNum) {
-            rootCollector.put(key, out.getPos());
-            loadCount = 0;
-          }
-        }
-        /* key writing */
-        byte[] buf = rowStoreEncoder.toBytes(key);
-        out.writeInt(buf.length);
-        out.write(buf);
-        
-        /**/
-        LinkedList<Long> offsetList = keyOffsetMap.get(key);
-        /* offset num writing */
-        int offsetSize = offsetList.size();
-        out.writeInt(offsetSize);
-        /* offset writing */
-        for (Long offset : offsetList) {
-          out.writeLong(offset);
-        }
-      }
-
-      out.flush();
-      out.close();
-      keySet.clear();
-      collector.clear();
-
-      FSDataOutputStream rootOut = null;
-      /* root index creating phase */
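-      /*
-       * The root file ("<index>.root") stores loadNum (int), the root entry count
-       * (int), and then one entry per leaf block: encoded key length (int), key
-       * bytes, and the offset (long) of that block in the leaf index file.
-       */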
-      if (this.level == TWO_LEVEL_INDEX) {
-        TreeMap<Tuple, LinkedList<Long>> rootMap = rootCollector.getMap();
-        keySet = rootMap.keySet();
-
-        rootOut = fs.create(new Path(fileName + ".root"));
-        rootOut.writeInt(this.loadNum);
-        rootOut.writeInt(keySet.size());
-
-        /* root key writing */
-        for (Tuple key : keySet) {
-          byte[] buf = rowStoreEncoder.toBytes(key);
-          rootOut.writeInt(buf.length);
-          rootOut.write(buf);
-
-          LinkedList<Long> offsetList = rootMap.get(key);
-          if (offsetList.size() != 1) {
-            throw new IOException("A root index entry must have exactly one offset, but found " + offsetList.size());
-          }
-          rootOut.writeLong(offsetList.getFirst());
-
-        }
-        rootOut.flush();
-        rootOut.close();
-
-        keySet.clear();
-        rootCollector.clear();
-      }
-    }
-
-    private class KeyOffsetCollector {
-      private TreeMap<Tuple, LinkedList<Long>> map;
-
-      public KeyOffsetCollector(TupleComparator comparator) {
-        map = new TreeMap<Tuple, LinkedList<Long>>(comparator);
-      }
-
-      public void put(Tuple key, long offset) {
-        if (map.containsKey(key)) {
-          map.get(key).add(offset);
-        } else {
-          LinkedList<Long> list = new LinkedList<Long>();
-          list.add(offset);
-          map.put(key, list);
-        }
-      }
-
-      public TreeMap<Tuple, LinkedList<Long>> getMap() {
-        return this.map;
-      }
-
-      public void clear() {
-        this.map.clear();
-      }
-    }
-  }
-
-  /**
-   * BSTIndexReader is thread-safe.
-   */
-  public class BSTIndexReader implements OrderIndexReader, Closeable {
-    private Path fileName;
-    private Schema keySchema;
-    private TupleComparator comparator;
-
-    private FileSystem fs;
-    private FSDataInputStream indexIn;
-    private FSDataInputStream subIn;
-
-    private int level;
-    private int entryNum;
-    private int loadNum = -1;
-    private Tuple firstKey;
-    private Tuple lastKey;
-
-    // the cursors of BST
-    private int rootCursor;
-    private int keyCursor;
-    private int offsetCursor;
-
-    // mutex
-    private final Object mutex = new Object();
-
-    private RowStoreDecoder rowStoreDecoder;
-
-    /**
-     * Creates a reader with an explicit key schema and comparator.
-     *
-     * @param fileName the path of the index file to read
-     * @param keySchema the schema of the index keys
-     * @param comparator the comparator that orders the index keys
-     * @throws IOException
-     */
-    public BSTIndexReader(final Path fileName, Schema keySchema, TupleComparator comparator) throws IOException {
-      this.fileName = fileName;
-      this.keySchema = keySchema;
-      this.comparator = comparator;
-      this.rowStoreDecoder = RowStoreUtil.createDecoder(keySchema);
-    }
-
-    public BSTIndexReader(final Path fileName) throws IOException {
-      this.fileName = fileName;
-    }
-
-    public Schema getKeySchema() {
-      return this.keySchema;
-    }
-
-    public TupleComparator getComparator() {
-      return this.comparator;
-    }
-
-    private void readHeader() throws IOException {
-      // schema
-      int schemaByteSize = indexIn.readInt();
-      byte [] schemaBytes = new byte[schemaByteSize];
-      StorageUtil.readFully(indexIn, schemaBytes, 0, schemaByteSize);
-
-      SchemaProto.Builder builder = SchemaProto.newBuilder();
-      builder.mergeFrom(schemaBytes);
-      SchemaProto proto = builder.build();
-      this.keySchema = new Schema(proto);
-      this.rowStoreDecoder = RowStoreUtil.createDecoder(keySchema);
-
-      // comparator
-      int compByteSize = indexIn.readInt();
-      byte [] compBytes = new byte[compByteSize];
-      StorageUtil.readFully(indexIn, compBytes, 0, compByteSize);
-
-      TupleComparatorProto.Builder compProto = TupleComparatorProto.newBuilder();
-      compProto.mergeFrom(compBytes);
-      this.comparator = new BaseTupleComparator(compProto.build());
-
-      // level
-      this.level = indexIn.readInt();
-      // entry
-      this.entryNum = indexIn.readInt();
-      if (entryNum > 0) { // if there is no entry, do not read the firstKey/lastKey values
-        byte [] minBytes = new byte[indexIn.readInt()];
-        StorageUtil.readFully(indexIn, minBytes, 0, minBytes.length);
-        this.firstKey = rowStoreDecoder.toTuple(minBytes);
-
-        byte [] maxBytes = new byte[indexIn.readInt()];
-        StorageUtil.readFully(indexIn, maxBytes, 0, maxBytes.length);
-        this.lastKey = rowStoreDecoder.toTuple(maxBytes);
-      }
-    }
-
-    public void open()
-        throws IOException {
-      /* init the index file */
-      fs = fileName.getFileSystem(conf);
-      if (!fs.exists(fileName)) {
-        throw new FileNotFoundException("ERROR: does not exist " + fileName.toString());
-      }
-
-      indexIn = fs.open(this.fileName);
-      readHeader();
-      fillData();
-    }
-
-    private void fillData() throws IOException {
-      /* load on memory */
-      if (this.level == TWO_LEVEL_INDEX) {
-
-        Path rootPath = new Path(this.fileName + ".root");
-        if (!fs.exists(rootPath)) {
-          throw new FileNotFoundException("root index did not created");
-        }
-
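-        // keep the leaf index stream as subIn and read the root index through indexIn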
-        subIn = indexIn;
-        indexIn = fs.open(rootPath);
-        /* root index header reading : loadNum => entry count */
-        this.loadNum = indexIn.readInt();
-        this.entryNum = indexIn.readInt();
-        /**/
-        fillRootIndex(entryNum, indexIn);
-
-      } else {
-        fillLeafIndex(entryNum, indexIn, -1);
-      }
-    }
-
-    /**
-     * Finds the offset of the entry whose key exactly matches the given key.
-     *
-     * @param key the search key
-     * @return the file offset of the matching entry, or -1 if there is no exact match
-     * @throws IOException
-     */
-    public long find(Tuple key) throws IOException {
-      return find(key, false);
-    }
-
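-    /**
-     * Finds the offset associated with the given key.
-     *
-     * @param key the search key
-     * @param nextKey if true, return the offset of the entry that follows the
-     *                position located for {@code key} rather than the entry itself
-     * @return the file offset of the located entry, or -1 if no entry qualifies
-     * @throws IOException
-     */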
-    @Override
-    public long find(Tuple key, boolean nextKey) throws IOException {
-      synchronized (mutex) {
-        int pos = -1;
-        if (this.level == ONE_LEVEL_INDEX) {
-            pos = oneLevBS(key);
-        } else if (this.level == TWO_LEVEL_INDEX) {
-            pos = twoLevBS(key, this.loadNum + 1);
-        } else {
-          throw new IOException("More than TWL_LEVEL_INDEX is not supported.");
-        }
-
-        if (nextKey) {
-          if (pos + 1 >= this.offsetSubIndex.length) {
-            return -1;
-          }
-          keyCursor = pos + 1;
-          offsetCursor = 0;
-        } else {
-          if (correctable) {
-            keyCursor = pos;
-            offsetCursor = 0;
-          } else {
-            return -1;
-          }
-        }
-
-        return this.offsetSubIndex[keyCursor][offsetCursor];
-      }
-    }
-
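-    /**
-     * Returns the next offset in key order, loading the following leaf block from
-     * the root index when the current one is exhausted.
-     *
-     * @return the next file offset, or -1 when no more entries remain
-     * @throws IOException
-     */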
-    public long next() throws IOException {
-      synchronized (mutex) {
-        if (offsetSubIndex[keyCursor].length - 1 > offsetCursor) {
-          offsetCursor++;
-        } else {
-          if (offsetSubIndex.length - 1 > keyCursor) {
-            keyCursor++;
-            offsetCursor = 0;
-          } else {
-            if (offsetIndex.length -1 > rootCursor) {
-              rootCursor++;
-              fillLeafIndex(loadNum + 1, subIn, this.offsetIndex[rootCursor]);
-              keyCursor = 1;
-              offsetCursor = 0;
-            } else {
-              return -1;
-            }
-          }
-        }
-
-        return this.offsetSubIndex[keyCursor][offsetCursor];
-      }
-    }
-    
-    public boolean isCurInMemory() {
-      return (offsetSubIndex[keyCursor].length - 1 >= offsetCursor);
-    }
-
-    private void fillLeafIndex(int entryNum, FSDataInputStream in, long pos)
-        throws IOException {
-      int counter = 0;
-      try {
-        if (pos != -1) {
-          in.seek(pos);
-        }
-        this.dataSubIndex = new Tuple[entryNum];
-        this.offsetSubIndex = new long[entryNum][];
-
-        byte[] buf;
-        for (int i = 0; i < entryNum; i++) {
-          counter++;
-          buf = new byte[in.readInt()];
-          StorageUtil.readFully(in, buf, 0, buf.length);
-          dataSubIndex[i] = rowStoreDecoder.toTuple(buf);
-
-          int offsetNum = in.readInt();
-          this.offsetSubIndex[i] = new long[offsetNum];
-          for (int j = 0; j < offsetNum; j++) {
-            this.offsetSubIndex[i][j] = in.readLong();
-          }
-
-        }
-
-      } catch (IOException e) {
-        counter--;
-        if (pos != -1) {
-          in.seek(pos);
-        }
-        this.dataSubIndex = new Tuple[counter];
-        this.offsetSubIndex = new long[counter][];
-
-        byte[] buf;
-        for (int i = 0; i < counter; i++) {
-          buf = new byte[in.readInt()];
-          StorageUtil.readFully(in, buf, 0, buf.length);
-          dataSubIndex[i] = rowStoreDecoder.toTuple(buf);
-
-          int offsetNum = in.readInt();
-          this.offsetSubIndex[i] = new long[offsetNum];
-          for (int j = 0; j < offsetNum; j++) {
-            this.offsetSubIndex[i][j] = in.readLong();
-          }
-
-        }
-      }
-    }
-
-    public Tuple getFirstKey() {
-      return this.firstKey;
-    }
-
-    public Tuple getLastKey() {
-      return this.lastKey;
-    }
-
-    private void fillRootIndex(int entryNum, FSDataInputStream in)
-        throws IOException {
-      this.dataIndex = new Tuple[entryNum];
-      this.offsetIndex = new long[entryNum];
-      Tuple keyTuple;
-      byte[] buf;
-      for (int i = 0; i < entryNum; i++) {
-        buf = new byte[in.readInt()];
-        StorageUtil.readFully(in, buf, 0, buf.length);
-        keyTuple = rowStoreDecoder.toTuple(buf);
-        dataIndex[i] = keyTuple;
-        this.offsetIndex[i] = in.readLong();
-      }
-    }
-
-    /* memory index, only one is used. */
-    private Tuple[] dataIndex = null;
-    private Tuple[] dataSubIndex = null;
-
-    /* offset index */
-    private long[] offsetIndex = null;
-    private long[][] offsetSubIndex = null;
-
-    private boolean correctable = true;
-
-    private int oneLevBS(Tuple key) throws IOException {
-      correctable = true;
-      int pos = binarySearch(this.dataSubIndex, key, 0, this.dataSubIndex.length);
-      return pos;
-    }
-
-    private int twoLevBS(Tuple key, int loadNum) throws IOException {
-      int pos = binarySearch(this.dataIndex, key, 0, this.dataIndex.length);
-      if (pos > 0) {
-        rootCursor = pos;
-      } else {
-        rootCursor = 0;
-      }
-      fillLeafIndex(loadNum, subIn, this.offsetIndex[rootCursor]);
-      pos = binarySearch(this.dataSubIndex, key, 0, this.dataSubIndex.length);
-       
-      return pos;
-    }
-
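-    /*
-     * Searches arr for the given key. On an exact match, sets correctable to true
-     * and returns the matching index; otherwise sets correctable to false and
-     * returns the index of the greatest key smaller than the search key (-1 if the
-     * key is smaller than every entry).
-     */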
-    private int binarySearch(Tuple[] arr, Tuple key, int startPos, int endPos) {
-      int offset = -1;
-      int start = startPos;
-      int end = endPos;
-
-      //http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=6412541
-      int centerPos = (start + end) >>> 1;
-      while (true) {
-        if (comparator.compare(arr[centerPos], key) > 0) {
-          if (centerPos == 0) {
-            correctable = false;
-            break;
-          } else if (comparator.compare(arr[centerPos - 1], key) < 0) {
-            correctable = false;
-            offset = centerPos - 1;
-            break;
-          } else {
-            end = centerPos;
-            centerPos = (start + end) / 2;
-          }
-        } else if (comparator.compare(arr[centerPos], key) < 0) {
-          if (centerPos == arr.length - 1) {
-            correctable = false;
-            offset = centerPos;
-            break;
-          } else if (comparator.compare(arr[centerPos + 1], key) > 0) {
-            correctable = false;
-            offset = centerPos;
-            break;
-          } else {
-            start = centerPos + 1;
-            centerPos = (start + end) / 2;
-          }
-        } else {
-          correctable = true;
-          offset = centerPos;
-          break;
-        }
-      }
-      return offset;
-    }
-
-    @Override
-    public void close() throws IOException {
-      this.indexIn.close();
-      this.subIn.close();
-    }
-
-    @Override
-    public String toString() {
-      return "BSTIndex (" + firstKey + ", " + lastKey + ") " + fileName;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/ParquetAppender.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/ParquetAppender.java b/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/ParquetAppender.java
deleted file mode 100644
index b10d423..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/ParquetAppender.java
+++ /dev/null
@@ -1,151 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage.parquet;
-
-import org.apache.tajo.QueryUnitAttemptId;
-import org.apache.tajo.storage.StorageConstants;
-import parquet.hadoop.ParquetOutputFormat;
-import parquet.hadoop.metadata.CompressionCodecName;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.catalog.TableMeta;
-import org.apache.tajo.catalog.statistics.TableStats;
-import org.apache.tajo.storage.FileAppender;
-import org.apache.tajo.storage.TableStatistics;
-import org.apache.tajo.storage.Tuple;
-
-import java.io.IOException;
-
-/**
- * FileAppender for writing to Parquet files.
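- *
- * <p>The writer is configured from the table metadata: block size, page size,
- * compression codec, dictionary encoding, and validation are read from the
- * corresponding Parquet table options (see the constructor).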
- */
-public class ParquetAppender extends FileAppender {
-  private TajoParquetWriter writer;
-  private int blockSize;
-  private int pageSize;
-  private CompressionCodecName compressionCodecName;
-  private boolean enableDictionary;
-  private boolean validating;
-  private TableStatistics stats;
-
-  /**
-   * Creates a new ParquetAppender.
-   *
-   * @param conf Configuration properties.
-   * @param taskAttemptId The id of the task attempt that produces this file.
-   * @param schema The table schema.
-   * @param meta The table metadata.
-   * @param workDir The path of the Parquet file to write to.
-   */
-  public ParquetAppender(Configuration conf, QueryUnitAttemptId taskAttemptId, Schema schema, TableMeta meta,
-                         Path workDir) throws IOException {
-    super(conf, taskAttemptId, schema, meta, workDir);
-    this.blockSize = Integer.parseInt(
-        meta.getOption(ParquetOutputFormat.BLOCK_SIZE, StorageConstants.PARQUET_DEFAULT_BLOCK_SIZE));
-    this.pageSize = Integer.parseInt(
-        meta.getOption(ParquetOutputFormat.PAGE_SIZE, StorageConstants.PARQUET_DEFAULT_PAGE_SIZE));
-    this.compressionCodecName = CompressionCodecName.fromConf(
-        meta.getOption(ParquetOutputFormat.COMPRESSION, StorageConstants.PARQUET_DEFAULT_COMPRESSION_CODEC_NAME));
-    this.enableDictionary = Boolean.parseBoolean(
-        meta.getOption(ParquetOutputFormat.ENABLE_DICTIONARY, StorageConstants.PARQUET_DEFAULT_IS_DICTIONARY_ENABLED));
-    this.validating = Boolean.parseBoolean(
-        meta.getOption(ParquetOutputFormat.VALIDATION, StorageConstants.PARQUET_DEFAULT_IS_VALIDATION_ENABLED));
-  }
-
-  /**
-   * Initializes the Appender. This method creates a new TajoParquetWriter
-   * and initializes the table statistics if enabled.
-   */
-  public void init() throws IOException {
-    writer = new TajoParquetWriter(path,
-                                   schema,
-                                   compressionCodecName,
-                                   blockSize,
-                                   pageSize,
-                                   enableDictionary,
-                                   validating);
-    if (enabledStats) {
-      this.stats = new TableStatistics(schema);
-    }
-    super.init();
-  }
-
-  /**
-   * Gets the current offset. Tracking offsets is currently not implemented, so
-   * this method always returns 0.
-   *
-   * @return 0
-   */
-  @Override
-  public long getOffset() throws IOException {
-    return 0;
-  }
-
-  /**
-   * Write a Tuple to the Parquet file.
-   *
-   * @param tuple The Tuple to write.
-   */
-  @Override
-  public void addTuple(Tuple tuple) throws IOException {
-    if (enabledStats) {
-      for (int i = 0; i < schema.size(); ++i) {
-        stats.analyzeField(i, tuple.get(i));
-      }
-    }
-    writer.write(tuple);
-    if (enabledStats) {
-      stats.incrementRow();
-    }
-  }
-
-  /**
-   * The ParquetWriter does not need to be flushed, so this is a no-op.
-   */
-  @Override
-  public void flush() throws IOException {
-  }
-
-  /**
-   * Closes the Appender.
-   */
-  @Override
-  public void close() throws IOException {
-    writer.close();
-  }
-
-  public long getEstimatedOutputSize() throws IOException {
-    return writer.getEstimatedWrittenSize();
-  }
-
-  /**
-   * If table statistics is enabled, retrieve the table statistics.
-   *
-   * @return Table statistics if enabled or null otherwise.
-   */
-  @Override
-  public TableStats getStats() {
-    if (enabledStats) {
-      return stats.getTableStat();
-    } else {
-      return null;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/ParquetScanner.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/ParquetScanner.java b/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/ParquetScanner.java
deleted file mode 100644
index 2f8efcf..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/ParquetScanner.java
+++ /dev/null
@@ -1,119 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage.parquet;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.catalog.TableMeta;
-import org.apache.tajo.storage.FileScanner;
-import org.apache.tajo.storage.Tuple;
-import org.apache.tajo.storage.fragment.Fragment;
-
-import java.io.IOException;
-
-/**
- * FileScanner for reading Parquet files
- */
-public class ParquetScanner extends FileScanner {
-  private TajoParquetReader reader;
-
-  /**
-   * Creates a new ParquetScanner.
-   *
-   * @param conf Configuration properties.
-   * @param schema The table schema.
-   * @param meta The table metadata.
-   * @param fragment The file fragment to scan.
-   */
-  public ParquetScanner(Configuration conf, final Schema schema,
-                        final TableMeta meta, final Fragment fragment) {
-    super(conf, schema, meta, fragment);
-  }
-
-  /**
-   * Initializes the ParquetScanner. This method initializes the
-   * TajoParquetReader.
-   */
-  @Override
-  public void init() throws IOException {
-    if (targets == null) {
-      targets = schema.toArray();
-    }
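-    // read with the full table schema but request only the projected target columns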
-    reader = new TajoParquetReader(fragment.getPath(), schema, new Schema(targets));
-    super.init();
-  }
-
-  /**
-   * Reads the next Tuple from the Parquet file.
-   *
-   * @return The next Tuple from the Parquet file or null if end of file is
-   *         reached.
-   */
-  @Override
-  public Tuple next() throws IOException {
-    return reader.read();
-  }
-
-  /**
-   * Resets the scanner. This is currently a no-op.
-   */
-  @Override
-  public void reset() throws IOException {
-  }
-
-  /**
-   * Closes the scanner.
-   */
-  @Override
-  public void close() throws IOException {
-    if (reader != null) {
-      reader.close();
-    }
-  }
-
-  /**
-   * Returns whether this scanner is projectable.
-   *
-   * @return true
-   */
-  @Override
-  public boolean isProjectable() {
-    return true;
-  }
-
-  /**
-   * Returns whether this scanner is selectable.
-   *
-   * @return false
-   */
-  @Override
-  public boolean isSelectable() {
-    return false;
-  }
-
-  /**
-   * Returns whether this scanner is splittable.
-   *
-   * @return false
-   */
-  @Override
-  public boolean isSplittable() {
-    return false;
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/TajoParquetReader.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/TajoParquetReader.java b/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/TajoParquetReader.java
deleted file mode 100644
index a765f48..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/TajoParquetReader.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage.parquet;
-
-import org.apache.hadoop.fs.Path;
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.storage.Tuple;
-import org.apache.tajo.storage.thirdparty.parquet.ParquetReader;
-import parquet.filter.UnboundRecordFilter;
-
-import java.io.IOException;
-
-/**
- * Tajo implementation of {@link ParquetReader} to read Tajo records from a
- * Parquet file. Users should use {@link ParquetScanner} and not this class
- * directly.
- */
-public class TajoParquetReader extends ParquetReader<Tuple> {
-  /**
-   * Creates a new TajoParquetReader.
-   *
-   * @param file The file to read from.
-   * @param readSchema Tajo schema of the table.
-   */
-  public TajoParquetReader(Path file, Schema readSchema) throws IOException {
-    super(file, new TajoReadSupport(readSchema));
-  }
-
-  /**
-   * Creates a new TajoParquetReader.
-   *
-   * @param file The file to read from.
-   * @param readSchema Tajo schema of the table.
-   * @param requestedSchema Tajo schema of the projection.
-   */
-  public TajoParquetReader(Path file, Schema readSchema,
-                           Schema requestedSchema) throws IOException {
-    super(file, new TajoReadSupport(readSchema, requestedSchema));
-  }
-
-  /**
-   * Creates a new TajoParquetReader.
-   *
-   * @param file The file to read from.
-   * @param readSchema Tajo schema of the table.
-   * @param recordFilter Record filter.
-   */
-  public TajoParquetReader(Path file, Schema readSchema,
-                           UnboundRecordFilter recordFilter)
-      throws IOException {
-    super(file, new TajoReadSupport(readSchema), recordFilter);
-  }
-
-  /**
-   * Creates a new TajoParquetReader.
-   *
-   * @param file The file to read from.
-   * @param readSchema Tajo schema of the table.
-   * @param requestedSchema Tajo schema of the projection.
-   * @param recordFilter Record filter.
-   */
-  public TajoParquetReader(Path file, Schema readSchema,
-                           Schema requestedSchema,
-                           UnboundRecordFilter recordFilter)
-      throws IOException {
-    super(file, new TajoReadSupport(readSchema, requestedSchema),
-          recordFilter);
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/TajoParquetWriter.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/TajoParquetWriter.java b/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/TajoParquetWriter.java
deleted file mode 100644
index 69b76c4..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/TajoParquetWriter.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage.parquet;
-
-import org.apache.hadoop.fs.Path;
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.storage.Tuple;
-import org.apache.tajo.storage.thirdparty.parquet.ParquetWriter;
-import parquet.hadoop.metadata.CompressionCodecName;
-
-import java.io.IOException;
-
-/**
- * Tajo implementation of {@link ParquetWriter} to write Tajo records to a
- * Parquet file. Users should use {@link ParquetAppender} and not this class
- * directly.
- */
-public class TajoParquetWriter extends ParquetWriter<Tuple> {
-  /**
-   * Creates a new TajoParquetWriter.
-   *
-   * @param file The file name to write to.
-   * @param schema The Tajo schema of the table.
-   * @param compressionCodecName Compression codec to use, or
-   *                             CompressionCodecName.UNCOMPRESSED.
-   * @param blockSize The block size threshold.
-   * @param pageSize The page size. See the Parquet documentation; blocks are
-   *                 subdivided into pages for alignment.
-   * @throws IOException
-   */
-  public TajoParquetWriter(Path file,
-                           Schema schema,
-                           CompressionCodecName compressionCodecName,
-                           int blockSize,
-                           int pageSize) throws IOException {
-    super(file,
-          new TajoWriteSupport(schema),
-          compressionCodecName,
-          blockSize,
-          pageSize);
-  }
-
-  /**
-   * Create a new TajoParquetWriter.
-   *
-   * @param file The file name to write to.
-   * @param schema The Tajo schema of the table.
-   * @param compressionCodecName Compression codec to use, or
-   *                             CompressionCodecName.UNCOMPRESSED.
-   * @param blockSize The block size threshold.
-   * @param pageSize The page size. See the Parquet documentation; blocks are
-   *                 subdivided into pages for alignment.
-   * @param enableDictionary Whether to use a dictionary to compress columns.
-   * @param validating Whether to turn on validation.
-   * @throws IOException
-   */
-  public TajoParquetWriter(Path file,
-                           Schema schema,
-                           CompressionCodecName compressionCodecName,
-                           int blockSize,
-                           int pageSize,
-                           boolean enableDictionary,
-                           boolean validating) throws IOException {
-    super(file,
-          new TajoWriteSupport(schema),
-          compressionCodecName,
-          blockSize,
-          pageSize,
-          enableDictionary,
-          validating);
-  }
-
-  /**
-   * Creates a new TajoParquetWriter. The default block size is 128 MB.
-   * The default page size is 1 MB. Default compression is no compression.
-   *
-   * @param file The Path of the file to write to.
-   * @param schema The Tajo schema of the table.
-   * @throws IOException
-   */
-  public TajoParquetWriter(Path file, Schema schema) throws IOException {
-    this(file,
-         schema,
-         CompressionCodecName.UNCOMPRESSED,
-         DEFAULT_BLOCK_SIZE,
-         DEFAULT_PAGE_SIZE);
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/TajoReadSupport.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/TajoReadSupport.java b/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/TajoReadSupport.java
deleted file mode 100644
index 269f782..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/TajoReadSupport.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage.parquet;
-
-import java.util.Map;
-
-import parquet.Log;
-import parquet.hadoop.api.InitContext;
-import parquet.hadoop.api.ReadSupport;
-import parquet.io.api.RecordMaterializer;
-import parquet.schema.MessageType;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.catalog.json.CatalogGsonHelper;
-import org.apache.tajo.storage.Tuple;
-
-/**
- * Tajo implementation of {@link ReadSupport} for {@link Tuple}s.
- * Users should use {@link ParquetScanner} and not this class directly.
- */
-public class TajoReadSupport extends ReadSupport<Tuple> {
-  private static final Log LOG = Log.getLog(TajoReadSupport.class);
-
-  private Schema readSchema;
-  private Schema requestedSchema;
-
-  /**
-   * Creates a new TajoReadSupport.
-   *
-   * @param readSchema The Tajo schema of the table.
-   * @param requestedSchema The Tajo schema of the requested projection passed
-   *        down by ParquetScanner.
-   */
-  public TajoReadSupport(Schema readSchema, Schema requestedSchema) {
-    super();
-    this.readSchema = readSchema;
-    this.requestedSchema = requestedSchema;
-  }
-
-  /**
-   * Creates a new TajoReadSupport.
-   *
-   * @param readSchema The schema of the table.
-   */
-  public TajoReadSupport(Schema readSchema) {
-    super();
-    this.readSchema = readSchema;
-    this.requestedSchema = readSchema;
-  }
-
-  /**
-   * Initializes the ReadSupport.
-   *
-   * @param context The InitContext.
-   * @return A ReadContext that defines how to read the file.
-   */
-  @Override
-  public ReadContext init(InitContext context) {
-    if (requestedSchema == null) {
-      throw new RuntimeException("requestedSchema is null.");
-    }
-    MessageType requestedParquetSchema =
-      new TajoSchemaConverter().convert(requestedSchema);
-    LOG.debug("Reading data with projection:\n" + requestedParquetSchema);
-    return new ReadContext(requestedParquetSchema);
-  }
-
-  /**
-   * Prepares for read.
-   *
-   * @param configuration The job configuration.
-   * @param keyValueMetaData App-specific metadata from the file.
-   * @param fileSchema The schema of the Parquet file.
-   * @param readContext Returned by the init method.
-   */
-  @Override
-  public RecordMaterializer<Tuple> prepareForRead(
-      Configuration configuration,
-      Map<String, String> keyValueMetaData,
-      MessageType fileSchema,
-      ReadContext readContext) {
-    MessageType parquetRequestedSchema = readContext.getRequestedSchema();
-    return new TajoRecordMaterializer(parquetRequestedSchema, requestedSchema, readSchema);
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/TajoRecordConverter.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/TajoRecordConverter.java b/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/TajoRecordConverter.java
deleted file mode 100644
index 7c3d79d..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/TajoRecordConverter.java
+++ /dev/null
@@ -1,386 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage.parquet;
-
-import com.google.protobuf.Message;
-import com.google.protobuf.InvalidProtocolBufferException;
-
-import java.nio.ByteBuffer;
-
-import parquet.io.api.GroupConverter;
-import parquet.io.api.Converter;
-import parquet.io.api.PrimitiveConverter;
-import parquet.io.api.Binary;
-import parquet.schema.Type;
-import parquet.schema.GroupType;
-
-import org.apache.tajo.common.TajoDataTypes;
-import org.apache.tajo.common.TajoDataTypes.DataType;
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.catalog.Column;
-import org.apache.tajo.storage.Tuple;
-import org.apache.tajo.storage.VTuple;
-import org.apache.tajo.datum.DatumFactory;
-import org.apache.tajo.datum.Datum;
-import org.apache.tajo.datum.BlobDatum;
-import org.apache.tajo.datum.NullDatum;
-import org.apache.tajo.datum.ProtobufDatumFactory;
-
-/**
- * Converter to convert a Parquet record into a Tajo Tuple.
- */
-public class TajoRecordConverter extends GroupConverter {
-  private final GroupType parquetSchema;
-  private final Schema tajoReadSchema;
-  private final int[] projectionMap;
-  private final int tupleSize;
-
-  private final Converter[] converters;
-
-  private Tuple currentTuple;
-
-  /**
-   * Creates a new TajoRecordConverter.
-   *
-   * @param parquetSchema The Parquet schema of the projection.
-   * @param tajoReadSchema The Tajo schema of the table.
-   * @param projectionMap An array mapping the projection column to the column
-   *                      index in the table.
-   */
-  public TajoRecordConverter(GroupType parquetSchema, Schema tajoReadSchema,
-                             int[] projectionMap) {
-    this.parquetSchema = parquetSchema;
-    this.tajoReadSchema = tajoReadSchema;
-    this.projectionMap = projectionMap;
-    this.tupleSize = tajoReadSchema.size();
-
-    // The projectionMap.length does not match parquetSchema.getFieldCount()
-    // when the projection contains NULL_TYPE columns. We will skip over the
-    // NULL_TYPE columns when we construct the converters and populate the
-    // NULL_TYPE columns with NullDatums in start().
-    int index = 0;
-    this.converters = new Converter[parquetSchema.getFieldCount()];
-    for (int i = 0; i < projectionMap.length; ++i) {
-      final int projectionIndex = projectionMap[i];
-      Column column = tajoReadSchema.getColumn(projectionIndex);
-      if (column.getDataType().getType() == TajoDataTypes.Type.NULL_TYPE) {
-        continue;
-      }
-      Type type = parquetSchema.getType(index);
-      converters[index] = newConverter(column, type, new ParentValueContainer() {
-        @Override
-        void add(Object value) {
-          TajoRecordConverter.this.set(projectionIndex, value);
-        }
-      });
-      ++index;
-    }
-  }
-
-  private void set(int index, Object value) {
-    currentTuple.put(index, (Datum)value);
-  }
-
-  private Converter newConverter(Column column, Type type,
-                                 ParentValueContainer parent) {
-    DataType dataType = column.getDataType();
-    switch (dataType.getType()) {
-      case BOOLEAN:
-        return new FieldBooleanConverter(parent);
-      case BIT:
-        return new FieldBitConverter(parent);
-      case CHAR:
-        return new FieldCharConverter(parent);
-      case INT2:
-        return new FieldInt2Converter(parent);
-      case INT4:
-        return new FieldInt4Converter(parent);
-      case INT8:
-        return new FieldInt8Converter(parent);
-      case FLOAT4:
-        return new FieldFloat4Converter(parent);
-      case FLOAT8:
-        return new FieldFloat8Converter(parent);
-      case INET4:
-        return new FieldInet4Converter(parent);
-      case INET6:
-        throw new RuntimeException("No converter for INET6");
-      case TEXT:
-        return new FieldTextConverter(parent);
-      case PROTOBUF:
-        return new FieldProtobufConverter(parent, dataType);
-      case BLOB:
-        return new FieldBlobConverter(parent);
-      case NULL_TYPE:
-        throw new RuntimeException("No converter for NULL_TYPE.");
-      default:
-        throw new RuntimeException("Unsupported data type");
-    }
-  }
-
-  /**
-   * Gets the converter for a specific field.
-   *
-   * @param fieldIndex Index of the field in the projection.
-   * @return The converter for the field.
-   */
-  @Override
-  public Converter getConverter(int fieldIndex) {
-    return converters[fieldIndex];
-  }
-
-  /**
-   * Called before processing fields. This method creates a new, empty Tuple to
-   * hold the values of the record that is about to be converted.
-   */
-  @Override
-  public void start() {
-    currentTuple = new VTuple(tupleSize);
-  }
-
-  /**
-   * Called after all fields have been processed. Any projected field that has a
-   * NULL value or whose type is NULL_TYPE is filled with a NullDatum here.
-   */
-  @Override
-  public void end() {
-    for (int i = 0; i < projectionMap.length; ++i) {
-      final int projectionIndex = projectionMap[i];
-      Column column = tajoReadSchema.getColumn(projectionIndex);
-      if (column.getDataType().getType() == TajoDataTypes.Type.NULL_TYPE
-          || currentTuple.get(projectionIndex) == null) {
-        set(projectionIndex, NullDatum.get());
-      }
-    }
-  }
-
-  /**
-   * Returns the current record converted by this converter.
-   *
-   * @return The current record.
-   */
-  public Tuple getCurrentRecord() {
-    return currentTuple;
-  }
-
-  static abstract class ParentValueContainer {
-    /**
-     * Adds the value to the parent.
-     *
-     * @param value The value to add.
-     */
-    abstract void add(Object value);
-  }
-
-  static final class FieldBooleanConverter extends PrimitiveConverter {
-    private final ParentValueContainer parent;
-
-    public FieldBooleanConverter(ParentValueContainer parent) {
-      this.parent = parent;
-    }
-
-    @Override
-    final public void addBoolean(boolean value) {
-      parent.add(DatumFactory.createBool(value));
-    }
-  }
-
-  static final class FieldBitConverter extends PrimitiveConverter {
-    private final ParentValueContainer parent;
-
-    public FieldBitConverter(ParentValueContainer parent) {
-      this.parent = parent;
-    }
-
-    @Override
-    final public void addInt(int value) {
-      parent.add(DatumFactory.createBit((byte)(value & 0xff)));
-    }
-  }
-
-  static final class FieldCharConverter extends PrimitiveConverter {
-    private final ParentValueContainer parent;
-
-    public FieldCharConverter(ParentValueContainer parent) {
-      this.parent = parent;
-    }
-
-    @Override
-    final public void addBinary(Binary value) {
-      parent.add(DatumFactory.createChar(value.toStringUsingUTF8()));
-    }
-  }
-
-  static final class FieldInt2Converter extends PrimitiveConverter {
-    private final ParentValueContainer parent;
-
-    public FieldInt2Converter(ParentValueContainer parent) {
-      this.parent = parent;
-    }
-
-    @Override
-    final public void addInt(int value) {
-      parent.add(DatumFactory.createInt2((short)value));
-    }
-  }
-
-  static final class FieldInt4Converter extends PrimitiveConverter {
-    private final ParentValueContainer parent;
-
-    public FieldInt4Converter(ParentValueContainer parent) {
-      this.parent = parent;
-    }
-
-    @Override
-    final public void addInt(int value) {
-      parent.add(DatumFactory.createInt4(value));
-    }
-  }
-
-  static final class FieldInt8Converter extends PrimitiveConverter {
-    private final ParentValueContainer parent;
-
-    public FieldInt8Converter(ParentValueContainer parent) {
-      this.parent = parent;
-    }
-
-    @Override
-    final public void addLong(long value) {
-      parent.add(DatumFactory.createInt8(value));
-    }
-
-    @Override
-    final public void addInt(int value) {
-      parent.add(DatumFactory.createInt8(Long.valueOf(value)));
-    }
-  }
-
-  static final class FieldFloat4Converter extends PrimitiveConverter {
-    private final ParentValueContainer parent;
-
-    public FieldFloat4Converter(ParentValueContainer parent) {
-      this.parent = parent;
-    }
-
-    @Override
-    final public void addInt(int value) {
-      parent.add(DatumFactory.createFloat4(Float.valueOf(value)));
-    }
-
-    @Override
-    final public void addLong(long value) {
-      parent.add(DatumFactory.createFloat4(Float.valueOf(value)));
-    }
-
-    @Override
-    final public void addFloat(float value) {
-      parent.add(DatumFactory.createFloat4(value));
-    }
-  }
-
-  static final class FieldFloat8Converter extends PrimitiveConverter {
-    private final ParentValueContainer parent;
-
-    public FieldFloat8Converter(ParentValueContainer parent) {
-      this.parent = parent;
-    }
-
-    @Override
-    final public void addInt(int value) {
-      parent.add(DatumFactory.createFloat8(Double.valueOf(value)));
-    }
-
-    @Override
-    final public void addLong(long value) {
-      parent.add(DatumFactory.createFloat8(Double.valueOf(value)));
-    }
-
-    @Override
-    final public void addFloat(float value) {
-      parent.add(DatumFactory.createFloat8(Double.valueOf(value)));
-    }
-
-    @Override
-    final public void addDouble(double value) {
-      parent.add(DatumFactory.createFloat8(value));
-    }
-  }
-
-  static final class FieldInet4Converter extends PrimitiveConverter {
-    private final ParentValueContainer parent;
-
-    public FieldInet4Converter(ParentValueContainer parent) {
-      this.parent = parent;
-    }
-
-    @Override
-    final public void addBinary(Binary value) {
-      parent.add(DatumFactory.createInet4(value.getBytes()));
-    }
-  }
-
-  static final class FieldTextConverter extends PrimitiveConverter {
-    private final ParentValueContainer parent;
-
-    public FieldTextConverter(ParentValueContainer parent) {
-      this.parent = parent;
-    }
-
-    @Override
-    final public void addBinary(Binary value) {
-      parent.add(DatumFactory.createText(value.toStringUsingUTF8()));
-    }
-  }
-
-  static final class FieldBlobConverter extends PrimitiveConverter {
-    private final ParentValueContainer parent;
-
-    public FieldBlobConverter(ParentValueContainer parent) {
-      this.parent = parent;
-    }
-
-    @Override
-    final public void addBinary(Binary value) {
-      parent.add(new BlobDatum(ByteBuffer.wrap(value.getBytes())));
-    }
-  }
-
-  static final class FieldProtobufConverter extends PrimitiveConverter {
-    private final ParentValueContainer parent;
-    private final DataType dataType;
-
-    public FieldProtobufConverter(ParentValueContainer parent,
-                                  DataType dataType) {
-      this.parent = parent;
-      this.dataType = dataType;
-    }
-
-    @Override
-    final public void addBinary(Binary value) {
-      try {
-        ProtobufDatumFactory factory =
-            ProtobufDatumFactory.get(dataType.getCode());
-        Message.Builder builder = factory.newBuilder();
-        builder.mergeFrom(value.getBytes());
-        parent.add(factory.createDatum(builder));
-      } catch (InvalidProtocolBufferException e) {
-        throw new RuntimeException(e);
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/TajoRecordMaterializer.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/TajoRecordMaterializer.java b/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/TajoRecordMaterializer.java
deleted file mode 100644
index e31828c..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/TajoRecordMaterializer.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage.parquet;
-
-import parquet.io.api.GroupConverter;
-import parquet.io.api.RecordMaterializer;
-import parquet.schema.MessageType;
-
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.catalog.Column;
-import org.apache.tajo.storage.Tuple;
-
-/**
- * Materializes a Tajo Tuple from a stream of Parquet data.
- */
-class TajoRecordMaterializer extends RecordMaterializer<Tuple> {
-  private final TajoRecordConverter root;
-
-  /**
-   * Creates a new TajoRecordMaterializer.
-   *
-   * @param parquetSchema The Parquet schema of the projection.
-   * @param tajoSchema The Tajo schema of the projection.
-   * @param tajoReadSchema The Tajo schema of the table.
-   */
-  public TajoRecordMaterializer(MessageType parquetSchema, Schema tajoSchema,
-                                Schema tajoReadSchema) {
-    int[] projectionMap = getProjectionMap(tajoReadSchema, tajoSchema);
-    this.root = new TajoRecordConverter(parquetSchema, tajoReadSchema,
-                                        projectionMap);
-  }
-
-  private int[] getProjectionMap(Schema schema, Schema projection) {
-    Column[] targets = projection.toArray();
-    int[] projectionMap = new int[targets.length];
-    for (int i = 0; i < targets.length; ++i) {
-      int tid = schema.getColumnId(targets[i].getQualifiedName());
-      projectionMap[i] = tid;
-    }
-    return projectionMap;
-  }
-
-  /**
-   * Returns the current record being materialized.
-   *
-   * @return The record being materialized.
-   */
-  @Override
-  public Tuple getCurrentRecord() {
-    return root.getCurrentRecord();
-  }
-
-  /**
-   * Returns the root converter.
-   *
-   * @return The root converter
-   */
-  @Override
-  public GroupConverter getRootConverter() {
-    return root;
-  }
-}
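For context, a minimal, self-contained sketch (not part of this commit) of how the projection map built above behaves for a concrete pair of schemas. It uses only the Schema and Column APIs already imported by the deleted file; the column names and types are illustrative.

import org.apache.tajo.catalog.Column;
import org.apache.tajo.catalog.Schema;
import org.apache.tajo.common.TajoDataTypes;

public class ProjectionMapSketch {
  public static void main(String[] args) {
    // Full table schema: (id INT4, name TEXT, score FLOAT8)
    Schema tableSchema = new Schema(new Column[] {
        new Column("id", TajoDataTypes.Type.INT4),
        new Column("name", TajoDataTypes.Type.TEXT),
        new Column("score", TajoDataTypes.Type.FLOAT8)
    });
    // Projection over two of the three columns, in a different order.
    Schema projection = new Schema(new Column[] {
        new Column("score", TajoDataTypes.Type.FLOAT8),
        new Column("id", TajoDataTypes.Type.INT4)
    });

    // Same logic as getProjectionMap() above: map each projected column
    // to its position in the table schema.
    Column[] targets = projection.toArray();
    int[] projectionMap = new int[targets.length];
    for (int i = 0; i < targets.length; ++i) {
      projectionMap[i] = tableSchema.getColumnId(targets[i].getQualifiedName());
    }

    // Expected output for this example: 2 then 0.
    for (int columnId : projectionMap) {
      System.out.println(columnId);
    }
  }
}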

http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/TajoSchemaConverter.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/TajoSchemaConverter.java b/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/TajoSchemaConverter.java
deleted file mode 100644
index 2592231..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/TajoSchemaConverter.java
+++ /dev/null
@@ -1,206 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage.parquet;
-
-import org.apache.tajo.catalog.Column;
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.common.TajoDataTypes;
-import parquet.schema.MessageType;
-import parquet.schema.OriginalType;
-import parquet.schema.PrimitiveType;
-import parquet.schema.PrimitiveType.PrimitiveTypeName;
-import parquet.schema.Type;
-
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * Converts between Parquet and Tajo schemas. See package documentation for
- * details on the mapping.
- */
-public class TajoSchemaConverter {
-  private static final String TABLE_SCHEMA = "table_schema";
-
-  /**
-   * Creates a new TajoSchemaConverter.
-   */
-  public TajoSchemaConverter() {
-  }
-
-  /**
-   * Converts a Parquet schema to a Tajo schema.
-   *
-   * @param parquetSchema The Parquet schema to convert.
-   * @return The resulting Tajo schema.
-   */
-  public Schema convert(MessageType parquetSchema) {
-    return convertFields(parquetSchema.getFields());
-  }
-
-  private Schema convertFields(List<Type> parquetFields) {
-    List<Column> columns = new ArrayList<Column>();
-    for (int i = 0; i < parquetFields.size(); ++i) {
-      Type fieldType = parquetFields.get(i);
-      if (fieldType.isRepetition(Type.Repetition.REPEATED)) {
-        throw new RuntimeException("REPEATED not supported outside LIST or" +
-            " MAP. Type: " + fieldType);
-      }
-      columns.add(convertField(fieldType));
-    }
-    Column[] columnsArray = new Column[columns.size()];
-    columnsArray = columns.toArray(columnsArray);
-    return new Schema(columnsArray);
-  }
-
-  private Column convertField(final Type fieldType) {
-    if (fieldType.isPrimitive()) {
-      return convertPrimitiveField(fieldType);
-    } else {
-      return convertComplexField(fieldType);
-    }
-  }
-
-  private Column convertPrimitiveField(final Type fieldType) {
-    final String fieldName = fieldType.getName();
-    final PrimitiveTypeName parquetPrimitiveTypeName =
-        fieldType.asPrimitiveType().getPrimitiveTypeName();
-    final OriginalType originalType = fieldType.getOriginalType();
-    return parquetPrimitiveTypeName.convert(
-        new PrimitiveType.PrimitiveTypeNameConverter<Column, RuntimeException>() {
-      @Override
-      public Column convertBOOLEAN(PrimitiveTypeName primitiveTypeName) {
-        return new Column(fieldName, TajoDataTypes.Type.BOOLEAN);
-      }
-
-      @Override
-      public Column convertINT32(PrimitiveTypeName primitiveTypeName) {
-        return new Column(fieldName, TajoDataTypes.Type.INT4);
-      }
-
-      @Override
-      public Column convertINT64(PrimitiveTypeName primitiveTypeName) {
-        return new Column(fieldName, TajoDataTypes.Type.INT8);
-      }
-
-      @Override
-      public Column convertFLOAT(PrimitiveTypeName primitiveTypeName) {
-        return new Column(fieldName, TajoDataTypes.Type.FLOAT4);
-      }
-
-      @Override
-      public Column convertDOUBLE(PrimitiveTypeName primitiveTypeName) {
-        return new Column(fieldName, TajoDataTypes.Type.FLOAT8);
-      }
-
-      @Override
-      public Column convertFIXED_LEN_BYTE_ARRAY(
-          PrimitiveTypeName primitiveTypeName) {
-        return new Column(fieldName, TajoDataTypes.Type.BLOB);
-      }
-
-      @Override
-      public Column convertBINARY(PrimitiveTypeName primitiveTypeName) {
-        if (originalType == OriginalType.UTF8) {
-          return new Column(fieldName, TajoDataTypes.Type.TEXT);
-        } else {
-          return new Column(fieldName, TajoDataTypes.Type.BLOB);
-        }
-      }
-
-      @Override
-      public Column convertINT96(PrimitiveTypeName primitiveTypeName) {
-        throw new RuntimeException("Converting from INT96 not supported.");
-      }
-    });
-  }
-
-  private Column convertComplexField(final Type fieldType) {
-    throw new RuntimeException("Complex types not supported.");
-  }
-
-  /**
-   * Converts a Tajo schema to a Parquet schema.
-   *
-   * @param tajoSchema The Tajo schema to convert.
-   * @return The resulting Parquet schema.
-   */
-  public MessageType convert(Schema tajoSchema) {
-    List<Type> types = new ArrayList<Type>();
-    for (int i = 0; i < tajoSchema.size(); ++i) {
-      Column column = tajoSchema.getColumn(i);
-      if (column.getDataType().getType() == TajoDataTypes.Type.NULL_TYPE) {
-        continue;
-      }
-      types.add(convertColumn(column));
-    }
-    return new MessageType(TABLE_SCHEMA, types);
-  }
-
-  private Type convertColumn(Column column) {
-    TajoDataTypes.Type type = column.getDataType().getType();
-    switch (type) {
-      case BOOLEAN:
-        return primitive(column.getSimpleName(),
-                         PrimitiveType.PrimitiveTypeName.BOOLEAN);
-      case BIT:
-      case INT2:
-      case INT4:
-        return primitive(column.getSimpleName(),
-                         PrimitiveType.PrimitiveTypeName.INT32);
-      case INT8:
-        return primitive(column.getSimpleName(),
-                         PrimitiveType.PrimitiveTypeName.INT64);
-      case FLOAT4:
-        return primitive(column.getSimpleName(),
-                         PrimitiveType.PrimitiveTypeName.FLOAT);
-      case FLOAT8:
-        return primitive(column.getSimpleName(),
-                         PrimitiveType.PrimitiveTypeName.DOUBLE);
-      case CHAR:
-      case TEXT:
-        return primitive(column.getSimpleName(),
-                         PrimitiveType.PrimitiveTypeName.BINARY,
-                         OriginalType.UTF8);
-      case PROTOBUF:
-        return primitive(column.getSimpleName(),
-                         PrimitiveType.PrimitiveTypeName.BINARY);
-      case BLOB:
-        return primitive(column.getSimpleName(),
-                         PrimitiveType.PrimitiveTypeName.BINARY);
-      case INET4:
-      case INET6:
-        return primitive(column.getSimpleName(),
-                         PrimitiveType.PrimitiveTypeName.BINARY);
-      default:
-        throw new RuntimeException("Cannot convert Tajo type: " + type);
-    }
-  }
-
-  private PrimitiveType primitive(String name,
-                                  PrimitiveType.PrimitiveTypeName primitive) {
-    return new PrimitiveType(Type.Repetition.OPTIONAL, primitive, name, null);
-  }
-
-  private PrimitiveType primitive(String name,
-                                  PrimitiveType.PrimitiveTypeName primitive,
-                                  OriginalType originalType) {
-    return new PrimitiveType(Type.Repetition.OPTIONAL, primitive, name,
-                             originalType);
-  }
-}
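For context, a minimal usage sketch (not part of this commit) of the converter above, built with the same Schema and Column constructors the converter itself uses; the schema contents are illustrative.

import org.apache.tajo.catalog.Column;
import org.apache.tajo.catalog.Schema;
import org.apache.tajo.common.TajoDataTypes;
import org.apache.tajo.storage.parquet.TajoSchemaConverter;
import parquet.schema.MessageType;

public class SchemaConversionSketch {
  public static void main(String[] args) {
    Schema tajoSchema = new Schema(new Column[] {
        new Column("id", TajoDataTypes.Type.INT8),     // -> INT64
        new Column("name", TajoDataTypes.Type.TEXT),   // -> BINARY (UTF8)
        new Column("payload", TajoDataTypes.Type.BLOB) // -> BINARY
    });

    TajoSchemaConverter converter = new TajoSchemaConverter();

    // Tajo -> Parquet: every field becomes OPTIONAL in the message type.
    MessageType parquetSchema = converter.convert(tajoSchema);
    System.out.println(parquetSchema);

    // Parquet -> Tajo: recovers a Tajo schema from the Parquet types.
    Schema recovered = converter.convert(parquetSchema);
    System.out.println(recovered);
  }
}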

http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/TajoWriteSupport.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/TajoWriteSupport.java b/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/TajoWriteSupport.java
deleted file mode 100644
index 35165de..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/TajoWriteSupport.java
+++ /dev/null
@@ -1,149 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage.parquet;
-
-import java.util.Map;
-import java.util.HashMap;
-import java.util.List;
-
-import parquet.hadoop.api.WriteSupport;
-import parquet.io.api.Binary;
-import parquet.io.api.RecordConsumer;
-import parquet.schema.GroupType;
-import parquet.schema.MessageType;
-import parquet.schema.Type;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.catalog.Column;
-import org.apache.tajo.common.TajoDataTypes;
-import org.apache.tajo.storage.Tuple;
-import org.apache.tajo.datum.Datum;
-
-/**
- * Tajo implementation of {@link WriteSupport} for {@link Tuple}s.
- * Users should use {@link ParquetAppender} and not this class directly.
- */
-public class TajoWriteSupport extends WriteSupport<Tuple> {
-  private RecordConsumer recordConsumer;
-  private MessageType rootSchema;
-  private Schema rootTajoSchema;
-
-  /**
-   * Creates a new TajoWriteSupport.
-   *
-   * @param tajoSchema The Tajo schema for the table.
-   */
-  public TajoWriteSupport(Schema tajoSchema) {
-    this.rootSchema = new TajoSchemaConverter().convert(tajoSchema);
-    this.rootTajoSchema = tajoSchema;
-  }
-
-  /**
-   * Initializes the WriteSupport.
-   *
-   * @param configuration The job's configuration.
-   * @return A WriteContext that describes how to write the file.
-   */
-  @Override
-  public WriteContext init(Configuration configuration) {
-    Map<String, String> extraMetaData = new HashMap<String, String>();
-    return new WriteContext(rootSchema, extraMetaData);
-  }
-
-  /**
-   * Called once per row group.
-   *
-   * @param recordConsumer The {@link RecordConsumer} to write to.
-   */
-  @Override
-  public void prepareForWrite(RecordConsumer recordConsumer) {
-    this.recordConsumer = recordConsumer;
-  }
-
-  /**
-   * Writes a Tuple to the file.
-   *
-   * @param tuple The Tuple to write to the file.
-   */
-  @Override
-  public void write(Tuple tuple) {
-    recordConsumer.startMessage();
-    writeRecordFields(rootSchema, rootTajoSchema, tuple);
-    recordConsumer.endMessage();
-  }
-
-  private void writeRecordFields(GroupType schema, Schema tajoSchema,
-                                 Tuple tuple) {
-    List<Type> fields = schema.getFields();
-    // Parquet ignores Tajo NULL_TYPE columns, so the index may differ.
-    int index = 0;
-    for (int tajoIndex = 0; tajoIndex < tajoSchema.size(); ++tajoIndex) {
-      Column column = tajoSchema.getColumn(tajoIndex);
-      if (column.getDataType().getType() == TajoDataTypes.Type.NULL_TYPE) {
-        continue;
-      }
-      Datum datum = tuple.get(tajoIndex);
-      Type fieldType = fields.get(index);
-      if (!tuple.isNull(tajoIndex)) {
-        recordConsumer.startField(fieldType.getName(), index);
-        writeValue(fieldType, column, datum);
-        recordConsumer.endField(fieldType.getName(), index);
-      } else if (fieldType.isRepetition(Type.Repetition.REQUIRED)) {
-        throw new RuntimeException("Null-value for required field: " +
-            column.getSimpleName());
-      }
-      ++index;
-    }
-  }
-
-  private void writeValue(Type fieldType, Column column, Datum datum) {
-    switch (column.getDataType().getType()) {
-      case BOOLEAN:
-        recordConsumer.addBoolean(datum.asBool());
-        break;
-      case BIT:
-      case INT2:
-      case INT4:
-        recordConsumer.addInteger(datum.asInt4());
-        break;
-      case INT8:
-        recordConsumer.addLong(datum.asInt8());
-        break;
-      case FLOAT4:
-        recordConsumer.addFloat(datum.asFloat4());
-        break;
-      case FLOAT8:
-        recordConsumer.addDouble(datum.asFloat8());
-        break;
-      case CHAR:
-      case TEXT:
-        recordConsumer.addBinary(Binary.fromString(datum.asChars()));
-        break;
-      case PROTOBUF:
-      case BLOB:
-      case INET4:
-      case INET6:
-        recordConsumer.addBinary(Binary.fromByteArray(datum.asByteArray()));
-        break;
-      default:
-        break;
-    }
-  }
-}
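For context, a rough sketch (not part of this commit) of wiring TajoWriteSupport into a plain parquet ParquetWriter; in practice ParquetAppender is the intended entry point. VTuple and DatumFactory are assumed to be the usual Tajo tuple and datum-factory classes (they are not shown in this diff), and the output path is illustrative.

import org.apache.hadoop.fs.Path;
import org.apache.tajo.catalog.Column;
import org.apache.tajo.catalog.Schema;
import org.apache.tajo.common.TajoDataTypes;
import org.apache.tajo.datum.DatumFactory;
import org.apache.tajo.storage.Tuple;
import org.apache.tajo.storage.VTuple;
import org.apache.tajo.storage.parquet.TajoWriteSupport;
import parquet.hadoop.ParquetWriter;

public class WriteSupportSketch {
  public static void main(String[] args) throws Exception {
    Schema schema = new Schema(new Column[] {
        new Column("id", TajoDataTypes.Type.INT4),
        new Column("name", TajoDataTypes.Type.TEXT)
    });

    // ParquetWriter calls init(), prepareForWrite() and write() on the
    // WriteSupport; the two-argument constructor uses default settings.
    ParquetWriter<Tuple> writer = new ParquetWriter<Tuple>(
        new Path("/tmp/sample.parquet"), new TajoWriteSupport(schema));
    try {
      Tuple tuple = new VTuple(2);                   // assumed Tuple implementation
      tuple.put(0, DatumFactory.createInt4(1));      // assumed datum factory methods
      tuple.put(1, DatumFactory.createText("tajo"));
      writer.write(tuple);
    } finally {
      writer.close();
    }
  }
}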

http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/package-info.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/package-info.java b/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/package-info.java
deleted file mode 100644
index d7d16b7..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/package-info.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-/**
- * <p>
- * Provides read and write support for Parquet files. Tajo schemas are
- * converted to Parquet schemas according to the following mapping of Tajo
- * and Parquet types:
- * </p>
- *
- * <table>
- *   <tr>
- *     <th>Tajo type</th>
- *     <th>Parquet type</th>
- *   </tr>
- *   <tr>
- *     <td>NULL_TYPE</td>
- *     <td>No type. The field is not encoded in Parquet.</td>
- *   </tr>
- *   <tr>
- *     <td>BOOLEAN</td>
- *     <td>BOOLEAN</td>
- *   </tr>
- *   <tr>
- *     <td>BIT</td>
- *     <td>INT32</td>
- *   </tr>
- *   <tr>
- *     <td>INT2</td>
- *     <td>INT32</td>
- *   </tr>
- *   <tr>
- *     <td>INT4</td>
- *     <td>INT32</td>
- *   </tr>
- *   <tr>
- *     <td>INT8</td>
- *     <td>INT64</td>
- *   </tr>
- *   <tr>
- *     <td>FLOAT4</td>
- *     <td>FLOAT</td>
- *   </tr>
- *   <tr>
- *     <td>FLOAT8</td>
- *     <td>DOUBLE</td>
- *   </tr>
- *   <tr>
- *     <td>CHAR</td>
- *     <td>BINARY (with OriginalType UTF8)</td>
- *   </tr>
- *   <tr>
- *     <td>TEXT</td>
- *     <td>BINARY (with OriginalType UTF8)</td>
- *   </tr>
- *   <tr>
- *     <td>PROTOBUF</td>
- *     <td>BINARY</td>
- *   </tr>
- *   <tr>
- *     <td>BLOB</td>
- *     <td>BINARY</td>
- *   </tr>
- *   <tr>
- *     <td>INET4</td>
- *     <td>BINARY</td>
- *   </tr>
- *   <tr>
- *     <td>INET6</td>
- *     <td>BINARY</td>
- *   </tr>
- * </table>
- *
- * <p>
- * Because Tajo fields can be NULL, all Parquet fields are marked as optional.
- * </p>
- *
- * <p>
- * The conversion from Tajo to Parquet is lossy without the original Tajo
- * schema. As a result, Parquet files are read using the Tajo schema saved in
- * the Tajo catalog for the table the Parquet files belong to, which was
- * defined when the table was created.
- * </p>
- */
-
-package org.apache.tajo.storage.parquet;
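To make the mapping and its lossiness concrete, here is a small sketch (not part of this commit) that round-trips a schema through the TajoSchemaConverter in this package; the CHAR column is created without a length for brevity, and the column names are illustrative.

import org.apache.tajo.catalog.Column;
import org.apache.tajo.catalog.Schema;
import org.apache.tajo.common.TajoDataTypes;
import org.apache.tajo.storage.parquet.TajoSchemaConverter;
import parquet.schema.MessageType;

public class LossyMappingSketch {
  public static void main(String[] args) {
    Schema original = new Schema(new Column[] {
        new Column("code", TajoDataTypes.Type.CHAR),        // CHAR -> BINARY (UTF8)
        new Column("unused", TajoDataTypes.Type.NULL_TYPE), // not encoded at all
        new Column("flag", TajoDataTypes.Type.BOOLEAN)      // BOOLEAN -> BOOLEAN
    });

    TajoSchemaConverter converter = new TajoSchemaConverter();
    MessageType parquetSchema = converter.convert(original);

    // Converting back yields only two columns, and "code" comes back as TEXT,
    // which is why readers rely on the schema stored in the Tajo catalog.
    Schema recovered = converter.convert(parquetSchema);
    System.out.println(parquetSchema);
    System.out.println(recovered);
  }
}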

http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/main/java/org/apache/tajo/storage/rcfile/BytesRefArrayWritable.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/rcfile/BytesRefArrayWritable.java b/tajo-storage/src/main/java/org/apache/tajo/storage/rcfile/BytesRefArrayWritable.java
deleted file mode 100644
index 5e200a0..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/rcfile/BytesRefArrayWritable.java
+++ /dev/null
@@ -1,261 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage.rcfile;
-
-import com.google.common.base.Objects;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableFactories;
-import org.apache.hadoop.io.WritableFactory;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.Arrays;
-
-/**
- * <tt>BytesRefArrayWritable</tt> holds an array of references to
- * <tt>BytesRefWritable</tt> and can be resized without recreating the backing
- * array when possible.
- * <p>
- *
- * Each <tt>BytesRefArrayWritable</tt> instance has a <i>valid</i> field, which
- * is the desired number of valid <tt>BytesRefWritable</tt> elements it holds.
- * <tt>resetValid</tt> can reset this count, but it does not touch the
- * underlying BytesRefWritable elements.
- */
-
-public class BytesRefArrayWritable implements Writable,
-    Comparable<BytesRefArrayWritable> {
-
-  private BytesRefWritable[] bytesRefWritables = null;
-
-  private int valid = 0;
-
-  /**
-   * Constructs an empty array with the specified capacity.
-   *
-   * @param capacity
-   *          initial capacity
-   * @exception IllegalArgumentException
-   *              if the specified initial capacity is negative
-   */
-  public BytesRefArrayWritable(int capacity) {
-    if (capacity < 0) {
-      throw new IllegalArgumentException("Capacity can not be negative.");
-    }
-    bytesRefWritables = new BytesRefWritable[0];
-    ensureCapacity(capacity);
-  }
-
-  /**
-   * Constructs an empty array with a capacity of ten.
-   */
-  public BytesRefArrayWritable() {
-    this(10);
-  }
-
-  /**
-   * Returns the number of valid elements.
-   *
-   * @return the number of valid elements
-   */
-  public int size() {
-    return valid;
-  }
-
-  /**
-   * Gets the BytesRefWritable at the specified position. Make sure the
-   * position is valid by first calling resetValid.
-   *
-   * @param index
-   *          the position index, starting from zero
-   * @throws IndexOutOfBoundsException
-   */
-  public BytesRefWritable get(int index) {
-    if (index >= valid) {
-      throw new IndexOutOfBoundsException(
-          "This BytesRefArrayWritable only has " + valid + " valid values.");
-    }
-    return bytesRefWritables[index];
-  }
-
-  /**
-   * Gets the BytesRefWritable at the specified position without checking.
-   *
-   * @param index
-   *          the position index, starting from zero
-   * @throws IndexOutOfBoundsException
-   */
-  public BytesRefWritable unCheckedGet(int index) {
-    return bytesRefWritables[index];
-  }
-
-  /**
-   * Set the BytesRefWritable at the specified position with the specified
-   * BytesRefWritable.
-   *
-   * @param index
-   *          index position
-   * @param bytesRefWritable
-   *          the new element
-   * @throws IllegalArgumentException
-   *           if the specified new element is null
-   */
-  public void set(int index, BytesRefWritable bytesRefWritable) {
-    if (bytesRefWritable == null) {
-      throw new IllegalArgumentException("Can not assign null.");
-    }
-    ensureCapacity(index + 1);
-    bytesRefWritables[index] = bytesRefWritable;
-    if (valid <= index) {
-      valid = index + 1;
-    }
-  }
-
-  /**
-   * {@inheritDoc}
-   */
-  @Override
-  public int compareTo(BytesRefArrayWritable other) {
-    if (other == null) {
-      throw new IllegalArgumentException("Argument can not be null.");
-    }
-    if (this == other) {
-      return 0;
-    }
-    int sizeDiff = valid - other.valid;
-    if (sizeDiff != 0) {
-      return sizeDiff;
-    }
-    for (int i = 0; i < valid; i++) {
-      if (other.contains(bytesRefWritables[i])) {
-        continue;
-      } else {
-        return 1;
-      }
-    }
-    return 0;
-  }
-
-  @Override
-  public int hashCode(){
-    return Objects.hashCode(bytesRefWritables);
-  }
-  /**
-   * Returns <tt>true</tt> if this instance contains one or more elements equal
-   * to the specified BytesRefWritable.
-   *
-   * @param bytesRefWritable
-   *          BytesRefWritable element to be tested
-   * @return <tt>true</tt> if contains the specified element
-   * @throws IllegalArgumentException
-   *           if the specified element is null
-   */
-  public boolean contains(BytesRefWritable bytesRefWritable) {
-    if (bytesRefWritable == null) {
-      throw new IllegalArgumentException("Argument can not be null.");
-    }
-    for (int i = 0; i < valid; i++) {
-      if (bytesRefWritables[i].equals(bytesRefWritable)) {
-        return true;
-      }
-    }
-    return false;
-  }
-
-  /**
-   * {@inheritDoc}
-   */
-  @Override
-  public boolean equals(Object o) {
-    if (o == null || !(o instanceof BytesRefArrayWritable)) {
-      return false;
-    }
-    return compareTo((BytesRefArrayWritable) o) == 0;
-  }
-
-  /**
-   * Removes all elements.
-   */
-  public void clear() {
-    valid = 0;
-  }
-
-  /**
-   * Enlarges the capacity if necessary to ensure that the array can hold the
-   * number of elements specified by the newValidCapacity argument, and narrows
-   * the valid count when needed. Note: it only enlarges or narrows the valid
-   * count; it does not touch the already stored but invalid BytesRefWritable
-   * elements.
-   *
-   * @param newValidCapacity
-   *          the desired capacity
-   */
-  public void resetValid(int newValidCapacity) {
-    ensureCapacity(newValidCapacity);
-    valid = newValidCapacity;
-  }
-
-  protected void ensureCapacity(int newCapacity) {
-    int size = bytesRefWritables.length;
-    if (size < newCapacity) {
-      bytesRefWritables = Arrays.copyOf(bytesRefWritables, newCapacity);
-      while (size < newCapacity) {
-        bytesRefWritables[size] = new BytesRefWritable();
-        size++;
-      }
-    }
-  }
-
-  /**
-   * {@inheritDoc}
-   */
-  @Override
-  public void readFields(DataInput in) throws IOException {
-    int count = in.readInt();
-    ensureCapacity(count);
-    for (int i = 0; i < count; i++) {
-      bytesRefWritables[i].readFields(in);
-    }
-    valid = count;
-  }
-
-  /**
-   * {@inheritDoc}
-   */
-  @Override
-  public void write(DataOutput out) throws IOException {
-    out.writeInt(valid);
-
-    for (int i = 0; i < valid; i++) {
-      BytesRefWritable cu = bytesRefWritables[i];
-      cu.write(out);
-    }
-  }
-
-  static {
-    WritableFactories.setFactory(BytesRefArrayWritable.class,
-        new WritableFactory() {
-
-          @Override
-          public Writable newInstance() {
-            return new BytesRefArrayWritable();
-          }
-
-        });
-  }
-}
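For context, a minimal sketch (not part of this commit) showing a BytesRefArrayWritable being populated and round-tripped through its Writable serialization; the sample strings are illustrative.

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;

import org.apache.tajo.storage.rcfile.BytesRefArrayWritable;
import org.apache.tajo.storage.rcfile.BytesRefWritable;

public class BytesRefArraySketch {
  public static void main(String[] args) throws Exception {
    BytesRefArrayWritable columns = new BytesRefArrayWritable(2);
    columns.set(0, new BytesRefWritable("hello".getBytes("UTF-8")));
    columns.set(1, new BytesRefWritable("world".getBytes("UTF-8")));

    // Serialize with write() and read it back with readFields().
    ByteArrayOutputStream bos = new ByteArrayOutputStream();
    columns.write(new DataOutputStream(bos));

    BytesRefArrayWritable copy = new BytesRefArrayWritable();
    copy.readFields(new DataInputStream(new ByteArrayInputStream(bos.toByteArray())));

    System.out.println(copy.size());                                  // 2
    System.out.println(new String(copy.get(0).getBytesCopy(), "UTF-8")); // hello
  }
}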

http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/main/java/org/apache/tajo/storage/rcfile/BytesRefWritable.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/rcfile/BytesRefWritable.java b/tajo-storage/src/main/java/org/apache/tajo/storage/rcfile/BytesRefWritable.java
deleted file mode 100644
index c83b505..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/rcfile/BytesRefWritable.java
+++ /dev/null
@@ -1,248 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage.rcfile;
-
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparator;
-import org.apache.hadoop.io.WritableFactories;
-import org.apache.hadoop.io.WritableFactory;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-/**
- * <tt>BytesRefWritable</tt> references a section of a byte array. It can be
- * used to avoid unnecessary byte copies.
- */
-public class BytesRefWritable implements Writable, Comparable<BytesRefWritable> {
-
-  private static final byte[] EMPTY_BYTES = new byte[0];
-  public static final BytesRefWritable ZeroBytesRefWritable = new BytesRefWritable();
-
-  int start = 0;
-  int length = 0;
-  byte[] bytes = null;
-
-  LazyDecompressionCallback lazyDecompressObj;
-
-  /**
-   * Create a zero-length byte reference.
-   */
-  public BytesRefWritable() {
-    this(EMPTY_BYTES);
-  }
-
-  /**
-   * Create a BytesRefWritable with <tt>length</tt> bytes.
-   */
-  public BytesRefWritable(int length) {
-    assert length > 0;
-    this.length = length;
-    bytes = new byte[this.length];
-    start = 0;
-  }
-
-  /**
-   * Create a BytesRefWritable that references the given bytes.
-   */
-  public BytesRefWritable(byte[] bytes) {
-    this.bytes = bytes;
-    length = bytes.length;
-    start = 0;
-  }
-
-  /**
-   * Create a BytesRefWritable that references one section of the given bytes.
-   * The section is determined by the arguments <tt>offset</tt> and <tt>len</tt>.
-   */
-  public BytesRefWritable(byte[] data, int offset, int len) {
-    bytes = data;
-    start = offset;
-    length = len;
-  }
-
-  /**
-   * Create a BytesRefWritable that references one section of the given bytes.
-   * The argument <tt>lazyDecompressData</tt> refers to a LazyDecompressionCallback
-   * object. The arguments <tt>offset</tt> and <tt>len</tt> refer to positions in
-   * the uncompressed bytes of <tt>lazyDecompressData</tt>, so they apply only
-   * after the data has been decompressed.
-   */
-  public BytesRefWritable(LazyDecompressionCallback lazyDecompressData,
-                          int offset, int len) {
-    lazyDecompressObj = lazyDecompressData;
-    start = offset;
-    length = len;
-  }
-
-  private void lazyDecompress() throws IOException {
-    if (bytes == null && lazyDecompressObj != null) {
-      bytes = lazyDecompressObj.decompress();
-    }
-  }
-
-  /**
-   * Returns a copy of the underlying bytes referenced by this instance.
-   *
-   * @return a new copied byte array
-   * @throws IOException
-   */
-  public byte[] getBytesCopy() throws IOException {
-    lazyDecompress();
-    byte[] bb = new byte[length];
-    System.arraycopy(bytes, start, bb, 0, length);
-    return bb;
-  }
-
-  /**
-   * Returns the underlying bytes.
-   *
-   * @throws IOException
-   */
-  public byte[] getData() throws IOException {
-    lazyDecompress();
-    return bytes;
-  }
-
-  /**
-   * readFields() will corrupt the array. So use the set method whenever
-   * possible.
-   *
-   * @see #readFields(DataInput)
-   */
-  public void set(byte[] newData, int offset, int len) {
-    bytes = newData;
-    start = offset;
-    length = len;
-    lazyDecompressObj = null;
-  }
-
-  /**
-   * readFields() will corrupt the array. So use the set method whenever
-   * possible.
-   *
-   * @see #readFields(DataInput)
-   */
-  public void set(LazyDecompressionCallback newData, int offset, int len) {
-    bytes = null;
-    start = offset;
-    length = len;
-    lazyDecompressObj = newData;
-  }
-
-  public void writeDataTo(DataOutput out) throws IOException {
-    lazyDecompress();
-    out.write(bytes, start, length);
-  }
-
-  /**
-   * Reuses the bytes array if its length is equal to or greater than the
-   * current record; otherwise a new one is created. readFields will corrupt
-   * the array, so please use set() whenever possible.
-   *
-   * @see #set(byte[], int, int)
-   */
-  public void readFields(DataInput in) throws IOException {
-    int len = in.readInt();
-    if (len > bytes.length) {
-      bytes = new byte[len];
-    }
-    start = 0;
-    length = len;
-    in.readFully(bytes, start, length);
-  }
-
-  /** {@inheritDoc} */
-  public void write(DataOutput out) throws IOException {
-    lazyDecompress();
-    out.writeInt(length);
-    out.write(bytes, start, length);
-  }
-
-  /** {@inheritDoc} */
-  @Override
-  public int hashCode() {
-    return super.hashCode();
-  }
-
-  /** {@inheritDoc} */
-  @Override
-  public String toString() {
-    StringBuilder sb = new StringBuilder(3 * length);
-    for (int idx = start; idx < start + length; idx++) {
-      // if not the first, put a blank separator in
-      if (idx != start) {
-        sb.append(' ');
-      }
-      String num = Integer.toHexString(0xff & bytes[idx]);
-      // if it is only one digit, add a leading 0.
-      if (num.length() < 2) {
-        sb.append('0');
-      }
-      sb.append(num);
-    }
-    return sb.toString();
-  }
-
-  /** {@inheritDoc} */
-  @Override
-  public int compareTo(BytesRefWritable other) {
-    if (other == null) {
-      throw new IllegalArgumentException("Argument can not be null.");
-    }
-    if (this == other) {
-      return 0;
-    }
-    try {
-      return WritableComparator.compareBytes(getData(), start, getLength(),
-          other.getData(), other.start, other.getLength());
-    } catch (IOException e) {
-      throw new RuntimeException(e);
-    }
-  }
-
-  /** {@inheritDoc} */
-  @Override
-  public boolean equals(Object right_obj) {
-    if (right_obj == null || !(right_obj instanceof BytesRefWritable)) {
-      return false;
-    }
-    return compareTo((BytesRefWritable) right_obj) == 0;
-  }
-
-  static {
-    WritableFactories.setFactory(BytesRefWritable.class, new WritableFactory() {
-
-      @Override
-      public Writable newInstance() {
-        return new BytesRefWritable();
-      }
-
-    });
-  }
-
-  public int getLength() {
-    return length;
-  }
-
-  public int getStart() {
-    return start;
-  }
-}
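For context, a minimal sketch (not part of this commit) of the zero-copy usage described in the class comment above; the sample buffer and offsets are illustrative.

import org.apache.tajo.storage.rcfile.BytesRefWritable;

public class BytesRefSketch {
  public static void main(String[] args) throws Exception {
    byte[] buffer = "row1|row2|row3".getBytes("UTF-8");

    // Reference the middle section ("row2") without copying it.
    BytesRefWritable ref = new BytesRefWritable(buffer, 5, 4);
    System.out.println(ref.getStart());           // 5
    System.out.println(ref.getLength());          // 4
    System.out.println(ref.getData() == buffer);  // true: same underlying array

    // getBytesCopy() materializes just the referenced section.
    System.out.println(new String(ref.getBytesCopy(), "UTF-8"));  // row2
  }
}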