You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by ji...@apache.org on 2014/12/12 09:22:40 UTC

[26/45] tajo git commit: TAJO-1233: Merge hbase_storage branch to the master branch. (Hyoungjun Kim via hyunsik)

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ParquetFileWriter.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ParquetFileWriter.java b/tajo-storage/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ParquetFileWriter.java
deleted file mode 100644
index 73ce7c2..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ParquetFileWriter.java
+++ /dev/null
@@ -1,504 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage.thirdparty.parquet;
-
-import static parquet.Log.DEBUG;
-import static parquet.format.Util.writeFileMetaData;
-
-import java.io.IOException;
-import java.nio.charset.Charset;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-
-import parquet.Log;
-import parquet.Version;
-import parquet.bytes.BytesInput;
-import parquet.bytes.BytesUtils;
-import parquet.column.ColumnDescriptor;
-import parquet.column.page.DictionaryPage;
-import parquet.column.statistics.Statistics;
-import parquet.format.converter.ParquetMetadataConverter;
-import parquet.hadoop.Footer;
-import parquet.hadoop.metadata.BlockMetaData;
-import parquet.hadoop.metadata.ColumnChunkMetaData;
-import parquet.hadoop.metadata.ColumnPath;
-import parquet.hadoop.metadata.CompressionCodecName;
-import parquet.hadoop.metadata.FileMetaData;
-import parquet.hadoop.metadata.GlobalMetaData;
-import parquet.hadoop.metadata.ParquetMetadata;
-import parquet.io.ParquetEncodingException;
-import parquet.schema.MessageType;
-import parquet.schema.PrimitiveType.PrimitiveTypeName;
-
-/**
- * Internal implementation of the Parquet file writer as a block container
- *
- * @author Julien Le Dem
- *
- */
-public class ParquetFileWriter {
-  private static final Log LOG = Log.getLog(ParquetFileWriter.class);
-
-  public static final String PARQUET_METADATA_FILE = "_metadata";
-  public static final byte[] MAGIC = "PAR1".getBytes(Charset.forName("ASCII"));
-  public static final int CURRENT_VERSION = 1;
-
-  private static final ParquetMetadataConverter metadataConverter = new ParquetMetadataConverter();
-
-  private final MessageType schema;
-  private final FSDataOutputStream out;
-  private BlockMetaData currentBlock;
-  private ColumnChunkMetaData currentColumn;
-  private long currentRecordCount;
-  private List<BlockMetaData> blocks = new ArrayList<BlockMetaData>();
-  private long uncompressedLength;
-  private long compressedLength;
-  private Set<parquet.column.Encoding> currentEncodings;
-
-  private CompressionCodecName currentChunkCodec;
-  private ColumnPath currentChunkPath;
-  private PrimitiveTypeName currentChunkType;
-  private long currentChunkFirstDataPage;
-  private long currentChunkDictionaryPageOffset;
-  private long currentChunkValueCount;
-
-  private Statistics currentStatistics;
-
-  /**
-   * Captures the order in which methods should be called
-   *
-   * @author Julien Le Dem
-   *
-   */
-  private enum STATE {
-    NOT_STARTED {
-      STATE start() {
-        return STARTED;
-      }
-    },
-    STARTED {
-      STATE startBlock() {
-        return BLOCK;
-      }
-      STATE end() {
-        return ENDED;
-      }
-    },
-    BLOCK  {
-      STATE startColumn() {
-        return COLUMN;
-      }
-      STATE endBlock() {
-        return STARTED;
-      }
-    },
-    COLUMN {
-      STATE endColumn() {
-        return BLOCK;
-      };
-      STATE write() {
-        return this;
-      }
-    },
-    ENDED;
-
-    STATE start() throws IOException { return error(); }
-    STATE startBlock() throws IOException { return error(); }
-    STATE startColumn() throws IOException { return error(); }
-    STATE write() throws IOException { return error(); }
-    STATE endColumn() throws IOException { return error(); }
-    STATE endBlock() throws IOException { return error(); }
-    STATE end() throws IOException { return error(); }
-
-    private final STATE error() throws IOException {
-      throw new IOException("The file being written is in an invalid state. Probably caused by an error thrown previously. Current state: " + this.name());
-    }
-  }
-
-  private STATE state = STATE.NOT_STARTED;
-
-  /**
-   *
-   * @param configuration the configuration used to obtain the FileSystem of the file
-   * @param schema the schema of the data
-   * @param file the file to write to (created without overwriting an existing file)
-   * @throws IOException if the file can not be created
-   */
-  public ParquetFileWriter(Configuration configuration, MessageType schema, Path file) throws IOException {
-    super();
-    this.schema = schema;
-    FileSystem fs = file.getFileSystem(configuration);
-    this.out = fs.create(file, false);
-  }
-
-  /**
-   * start the file
-   * @throws IOException
-   */
-  public void start() throws IOException {
-    state = state.start();
-    if (DEBUG) LOG.debug(out.getPos() + ": start");
-    out.write(MAGIC);
-  }
-
-  /**
-   * start a block
-   * @param recordCount the record count in this block
-   * @throws IOException
-   */
-  public void startBlock(long recordCount) throws IOException {
-    state = state.startBlock();
-    if (DEBUG) LOG.debug(out.getPos() + ": start block");
-//    out.write(MAGIC); // TODO: add a magic delimiter
-    currentBlock = new BlockMetaData();
-    currentRecordCount = recordCount;
-  }
-
-  /**
-   * start a column inside a block
-   * @param descriptor the column descriptor
-   * @param valueCount the value count in this column
-   * @param compressionCodecName the compression codec recorded in the metadata
-   *                             of this column chunk
-   * @throws IOException
-   */
-  public void startColumn(ColumnDescriptor descriptor,
-                          long valueCount,
-                          CompressionCodecName compressionCodecName) throws IOException {
-    state = state.startColumn();
-    if (DEBUG) LOG.debug(out.getPos() + ": start column: " + descriptor + " count=" + valueCount);
-    currentEncodings = new HashSet<parquet.column.Encoding>();
-    currentChunkPath = ColumnPath.get(descriptor.getPath());
-    currentChunkType = descriptor.getType();
-    currentChunkCodec = compressionCodecName;
-    currentChunkValueCount = valueCount;
-    currentChunkFirstDataPage = out.getPos();
-    compressedLength = 0;
-    uncompressedLength = 0;
-    // need to know what type of stats to initialize to
-    // better way to do this?
-    currentStatistics = Statistics.getStatsBasedOnType(currentChunkType);
-  }
-
-  /**
-   * writes a dictionary page
-   * @param dictionaryPage the dictionary page
-   */
-  public void writeDictionaryPage(DictionaryPage dictionaryPage) throws IOException {
-    state = state.write();
-    if (DEBUG) LOG.debug(out.getPos() + ": write dictionary page: " + dictionaryPage.getDictionarySize() + " values");
-    currentChunkDictionaryPageOffset = out.getPos();
-    int uncompressedSize = dictionaryPage.getUncompressedSize();
-    int compressedPageSize = (int)dictionaryPage.getBytes().size(); // TODO: fix casts
-    metadataConverter.writeDictionaryPageHeader(
-        uncompressedSize,
-        compressedPageSize,
-        dictionaryPage.getDictionarySize(),
-        dictionaryPage.getEncoding(),
-        out);
-    long headerSize = out.getPos() - currentChunkDictionaryPageOffset;
-    this.uncompressedLength += uncompressedSize + headerSize;
-    this.compressedLength += compressedPageSize + headerSize;
-    if (DEBUG) LOG.debug(out.getPos() + ": write dictionary page content " + compressedPageSize);
-    dictionaryPage.getBytes().writeAllTo(out);
-    currentEncodings.add(dictionaryPage.getEncoding());
-  }
-
-
-  /**
-   * writes a single page
-   * @param valueCount count of values
-   * @param uncompressedPageSize the size of the data once uncompressed
-   * @param bytes the compressed data for the page without header
-   * @param rlEncoding encoding of the repetition level
-   * @param dlEncoding encoding of the definition level
-   * @param valuesEncoding encoding of values
-   */
-  @Deprecated
-  public void writeDataPage(
-      int valueCount, int uncompressedPageSize,
-      BytesInput bytes,
-      parquet.column.Encoding rlEncoding,
-      parquet.column.Encoding dlEncoding,
-      parquet.column.Encoding valuesEncoding) throws IOException {
-    state = state.write();
-    long beforeHeader = out.getPos();
-    if (DEBUG) LOG.debug(beforeHeader + ": write data page: " + valueCount + " values");
-    int compressedPageSize = (int)bytes.size();
-    metadataConverter.writeDataPageHeader(
-        uncompressedPageSize, compressedPageSize,
-        valueCount,
-        rlEncoding,
-        dlEncoding,
-        valuesEncoding,
-        out);
-    long headerSize = out.getPos() - beforeHeader;
-    this.uncompressedLength += uncompressedPageSize + headerSize;
-    this.compressedLength += compressedPageSize + headerSize;
-    if (DEBUG) LOG.debug(out.getPos() + ": write data page content " + compressedPageSize);
-    bytes.writeAllTo(out);
-    currentEncodings.add(rlEncoding);
-    currentEncodings.add(dlEncoding);
-    currentEncodings.add(valuesEncoding);
-  }
-
-  /**
-   * writes a single page and merges the given page statistics into the current column statistics
-   * @param valueCount count of values
-   * @param uncompressedPageSize the size of the data once uncompressed
-   * @param bytes the compressed data for the page without header
-   * @param rlEncoding encoding of the repetition level
-   * @param dlEncoding encoding of the definition level
-   * @param valuesEncoding encoding of values
-   */
-  public void writeDataPage(
-      int valueCount, int uncompressedPageSize,
-      BytesInput bytes,
-      Statistics statistics,
-      parquet.column.Encoding rlEncoding,
-      parquet.column.Encoding dlEncoding,
-      parquet.column.Encoding valuesEncoding) throws IOException {
-    state = state.write();
-    long beforeHeader = out.getPos();
-    if (DEBUG) LOG.debug(beforeHeader + ": write data page: " + valueCount + " values");
-    int compressedPageSize = (int)bytes.size();
-    metadataConverter.writeDataPageHeader(
-        uncompressedPageSize, compressedPageSize,
-        valueCount,
-        statistics,
-        rlEncoding,
-        dlEncoding,
-        valuesEncoding,
-        out);
-    long headerSize = out.getPos() - beforeHeader;
-    this.uncompressedLength += uncompressedPageSize + headerSize;
-    this.compressedLength += compressedPageSize + headerSize;
-    if (DEBUG) LOG.debug(out.getPos() + ": write data page content " + compressedPageSize);
-    bytes.writeAllTo(out);
-    currentStatistics.mergeStatistics(statistics);
-    currentEncodings.add(rlEncoding);
-    currentEncodings.add(dlEncoding);
-    currentEncodings.add(valuesEncoding);
-  }
-
-  /**
-   * writes a number of pages at once
-   * @param bytes bytes to be written including page headers
-   * @param uncompressedTotalPageSize total uncompressed size (without page headers)
-   * @param compressedTotalPageSize total compressed size (without page headers)
-   * @throws IOException
-   */
-  void writeDataPages(BytesInput bytes,
-                      long uncompressedTotalPageSize,
-                      long compressedTotalPageSize,
-                      Statistics totalStats,
-                      List<parquet.column.Encoding> encodings) throws IOException {
-    state = state.write();
-    if (DEBUG) LOG.debug(out.getPos() + ": write data pages");
-    long headersSize = bytes.size() - compressedTotalPageSize;
-    this.uncompressedLength += uncompressedTotalPageSize + headersSize;
-    this.compressedLength += compressedTotalPageSize + headersSize;
-    if (DEBUG) LOG.debug(out.getPos() + ": write data pages content");
-    bytes.writeAllTo(out);
-    currentEncodings.addAll(encodings);
-    currentStatistics = totalStats;
-  }
-
-  /**
-   * end a column (once all rep, def and data have been written)
-   * @throws IOException
-   */
-  public void endColumn() throws IOException {
-    state = state.endColumn();
-    if (DEBUG) LOG.debug(out.getPos() + ": end column");
-    currentBlock.addColumn(ColumnChunkMetaData.get(
-        currentChunkPath,
-        currentChunkType,
-        currentChunkCodec,
-        currentEncodings,
-        currentStatistics,
-        currentChunkFirstDataPage,
-        currentChunkDictionaryPageOffset,
-        currentChunkValueCount,
-        compressedLength,
-        uncompressedLength));
-    if (DEBUG) LOG.info("ended Column chumk: " + currentColumn);
-    currentColumn = null;
-    this.currentBlock.setTotalByteSize(currentBlock.getTotalByteSize() + uncompressedLength);
-    this.uncompressedLength = 0;
-    this.compressedLength = 0;
-  }
-
-  /**
-   * ends a block once all column chunks have been written
-   * @throws IOException
-   */
-  public void endBlock() throws IOException {
-    state = state.endBlock();
-    if (DEBUG) LOG.debug(out.getPos() + ": end block");
-    currentBlock.setRowCount(currentRecordCount);
-    blocks.add(currentBlock);
-    currentBlock = null;
-  }
-
-  /**
-   * ends a file once all blocks have been written.
-   * closes the file.
-   * @param extraMetaData the extra meta data to write in the footer
-   * @throws IOException
-   */
-  public void end(Map<String, String> extraMetaData) throws IOException {
-    state = state.end();
-    if (DEBUG) LOG.debug(out.getPos() + ": end");
-    ParquetMetadata footer = new ParquetMetadata(new FileMetaData(schema, extraMetaData, Version.FULL_VERSION), blocks);
-    serializeFooter(footer, out);
-    out.close();
-  }
-
-  private static void serializeFooter(ParquetMetadata footer, FSDataOutputStream out) throws IOException {
-    long footerIndex = out.getPos();
-    parquet.format.FileMetaData parquetMetadata = new ParquetMetadataConverter().toParquetMetadata(CURRENT_VERSION, footer);
-    writeFileMetaData(parquetMetadata, out);
-    if (DEBUG) LOG.debug(out.getPos() + ": footer length = " + (out.getPos() - footerIndex));
-    BytesUtils.writeIntLittleEndian(out, (int)(out.getPos() - footerIndex));
-    out.write(MAGIC);
-  }
-
-  /**
-   * writes a _metadata file
-   * @param configuration the configuration to use to get the FileSystem
-   * @param outputPath the directory to write the _metadata file to
-   * @param footers the list of footers to merge
-   * @throws IOException
-   */
-  public static void writeMetadataFile(Configuration configuration, Path outputPath, List<Footer> footers) throws IOException {
-    Path metaDataPath = new Path(outputPath, PARQUET_METADATA_FILE);
-    FileSystem fs = outputPath.getFileSystem(configuration);
-    outputPath = outputPath.makeQualified(fs);
-    FSDataOutputStream metadata = fs.create(metaDataPath);
-    metadata.write(MAGIC);
-    ParquetMetadata metadataFooter = mergeFooters(outputPath, footers);
-    serializeFooter(metadataFooter, metadata);
-    metadata.close();
-  }
-
-  private static ParquetMetadata mergeFooters(Path root, List<Footer> footers) {
-    String rootPath = root.toString();
-    GlobalMetaData fileMetaData = null;
-    List<BlockMetaData> blocks = new ArrayList<BlockMetaData>();
-    for (Footer footer : footers) {
-      String path = footer.getFile().toString();
-      if (!path.startsWith(rootPath)) {
-        throw new ParquetEncodingException(path + " invalid: all the files must be contained in the root " + root);
-      }
-      path = path.substring(rootPath.length());
-      while (path.startsWith("/")) {
-        path = path.substring(1);
-      }
-      fileMetaData = mergeInto(footer.getParquetMetadata().getFileMetaData(), fileMetaData);
-      for (BlockMetaData block : footer.getParquetMetadata().getBlocks()) {
-        block.setPath(path);
-        blocks.add(block);
-      }
-    }
-    return new ParquetMetadata(fileMetaData.merge(), blocks);
-  }
-
-  /**
-   * @return the current position in the underlying file
-   * @throws IOException
-   */
-  public long getPos() throws IOException {
-    return out.getPos();
-  }
-
-  /**
-   * Will merge the metadata of all the footers together
-   * @param footers the list of file footers to merge
-   * @return the global meta data for all the footers
-   */
-  static GlobalMetaData getGlobalMetaData(List<Footer> footers) {
-    GlobalMetaData fileMetaData = null;
-    for (Footer footer : footers) {
-      ParquetMetadata currentMetadata = footer.getParquetMetadata();
-      fileMetaData = mergeInto(currentMetadata.getFileMetaData(), fileMetaData);
-    }
-    return fileMetaData;
-  }
-
-  /**
-   * Will return the result of merging toMerge into mergedMetadata
-   * @param toMerge the metadata toMerge
-   * @param mergedMetadata the reference metadata to merge into
-   * @return the result of the merge
-   */
-  static GlobalMetaData mergeInto(
-      FileMetaData toMerge,
-      GlobalMetaData mergedMetadata) {
-    MessageType schema = null;
-    Map<String, Set<String>> newKeyValues = new HashMap<String, Set<String>>();
-    Set<String> createdBy = new HashSet<String>();
-    if (mergedMetadata != null) {
-      schema = mergedMetadata.getSchema();
-      newKeyValues.putAll(mergedMetadata.getKeyValueMetaData());
-      createdBy.addAll(mergedMetadata.getCreatedBy());
-    }
-    if ((schema == null && toMerge.getSchema() != null)
-        || (schema != null && !schema.equals(toMerge.getSchema()))) {
-      schema = mergeInto(toMerge.getSchema(), schema);
-    }
-    for (Entry<String, String> entry : toMerge.getKeyValueMetaData().entrySet()) {
-      Set<String> values = newKeyValues.get(entry.getKey());
-      if (values == null) {
-        values = new HashSet<String>();
-        newKeyValues.put(entry.getKey(), values);
-      }
-      values.add(entry.getValue());
-    }
-    createdBy.add(toMerge.getCreatedBy());
-    return new GlobalMetaData(
-        schema,
-        newKeyValues,
-        createdBy);
-  }
-
-  /**
-   * will return the result of merging toMerge into mergedSchema
-   * @param toMerge the schema to merge into mergedSchema
-   * @param mergedSchema the schema to append the fields to
-   * @return the resulting schema
-   */
-  static MessageType mergeInto(MessageType toMerge, MessageType mergedSchema) {
-    if (mergedSchema == null) {
-      return toMerge;
-    }
-    return mergedSchema.union(toMerge);
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ParquetReader.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ParquetReader.java b/tajo-storage/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ParquetReader.java
deleted file mode 100644
index 0fb2c3a..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ParquetReader.java
+++ /dev/null
@@ -1,151 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage.thirdparty.parquet;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-
-import parquet.filter.UnboundRecordFilter;
-import parquet.hadoop.*;
-import parquet.hadoop.api.InitContext;
-import parquet.hadoop.api.ReadSupport;
-import parquet.hadoop.api.ReadSupport.ReadContext;
-import parquet.hadoop.metadata.BlockMetaData;
-import parquet.hadoop.metadata.GlobalMetaData;
-import parquet.schema.MessageType;
-
-/**
- * Read records from a Parquet file.
- */
-public class ParquetReader<T> implements Closeable {
-
-  private ReadSupport<T> readSupport;
-  private UnboundRecordFilter filter;
-  private Configuration conf;
-  private ReadContext readContext;
-  private Iterator<Footer> footersIterator;
-  private InternalParquetRecordReader<T> reader;
-  private GlobalMetaData globalMetaData;
-
-  /**
-   * @param file the file to read
-   * @param readSupport to materialize records
-   * @throws IOException
-   */
-  public ParquetReader(Path file, ReadSupport<T> readSupport) throws IOException {
-    this(file, readSupport, null);
-  }
-
-  /**
-   * @param conf the configuration
-   * @param file the file to read
-   * @param readSupport to materialize records
-   * @throws IOException
-   */
-  public ParquetReader(Configuration conf, Path file, ReadSupport<T> readSupport) throws IOException {
-    this(conf, file, readSupport, null);
-  }
-
-  /**
-   * @param file the file to read
-   * @param readSupport to materialize records
-   * @param filter the filter to use to filter records
-   * @throws IOException
-   */
-  public ParquetReader(Path file, ReadSupport<T> readSupport, UnboundRecordFilter filter) throws IOException {
-    this(new Configuration(), file, readSupport, filter);
-  }
-
-  /**
-   * @param conf the configuration
-   * @param file the file to read
-   * @param readSupport to materialize records
-   * @param filter the filter to use to filter records
-   * @throws IOException
-   */
-  public ParquetReader(Configuration conf, Path file, ReadSupport<T> readSupport, UnboundRecordFilter filter) throws IOException {
-    this.readSupport = readSupport;
-    this.filter = filter;
-    this.conf = conf;
-
-    FileSystem fs = file.getFileSystem(conf);
-    List<FileStatus> statuses = Arrays.asList(fs.listStatus(file));
-    List<Footer> footers = ParquetFileReader.readAllFootersInParallelUsingSummaryFiles(conf, statuses);
-    this.footersIterator = footers.iterator();
-    globalMetaData = ParquetFileWriter.getGlobalMetaData(footers);
-
-    List<BlockMetaData> blocks = new ArrayList<BlockMetaData>();
-    for (Footer footer : footers) {
-      blocks.addAll(footer.getParquetMetadata().getBlocks());
-    }
-
-    MessageType schema = globalMetaData.getSchema();
-    Map<String, Set<String>> extraMetadata = globalMetaData.getKeyValueMetaData();
-    readContext = readSupport.init(new InitContext(conf, extraMetadata, schema));
-  }
-
-  /**
-   * @return the next record or null if finished
-   * @throws IOException
-   */
-  public T read() throws IOException {
-    try {
-      if (reader != null && reader.nextKeyValue()) {
-        return reader.getCurrentValue();
-      } else {
-        initReader();
-        return reader == null ? null : read();
-      }
-    } catch (InterruptedException e) {
-      throw new IOException(e);
-    }
-  }
-
-  private void initReader() throws IOException {
-    if (reader != null) {
-      reader.close();
-      reader = null;
-    }
-    if (footersIterator.hasNext()) {
-      Footer footer = footersIterator.next();
-      reader = new InternalParquetRecordReader<T>(readSupport, filter);
-      reader.initialize(
-          readContext.getRequestedSchema(), globalMetaData.getSchema(), footer.getParquetMetadata().getFileMetaData().getKeyValueMetaData(),
-          readContext.getReadSupportMetadata(), footer.getFile(), footer.getParquetMetadata().getBlocks(), conf);
-    }
-  }
-
-  @Override
-  public void close() throws IOException {
-    if (reader != null) {
-      reader.close();
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ParquetWriter.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ParquetWriter.java b/tajo-storage/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ParquetWriter.java
deleted file mode 100644
index 0447a47..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ParquetWriter.java
+++ /dev/null
@@ -1,224 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage.thirdparty.parquet;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import parquet.column.ParquetProperties;
-import parquet.hadoop.api.WriteSupport;
-import parquet.hadoop.metadata.CompressionCodecName;
-import parquet.schema.MessageType;
-
-import java.io.Closeable;
-import java.io.IOException;
-
-public class ParquetWriter<T> implements Closeable {
-
-  public static final int DEFAULT_BLOCK_SIZE = 128 * 1024 * 1024;
-  public static final int DEFAULT_PAGE_SIZE = 1 * 1024 * 1024;
-  public static final CompressionCodecName DEFAULT_COMPRESSION_CODEC_NAME =
-      CompressionCodecName.UNCOMPRESSED;
-  public static final boolean DEFAULT_IS_DICTIONARY_ENABLED = true;
-  public static final boolean DEFAULT_IS_VALIDATING_ENABLED = false;
-  public static final ParquetProperties.WriterVersion DEFAULT_WRITER_VERSION =
-      ParquetProperties.WriterVersion.PARQUET_1_0;
-
-  private final InternalParquetRecordWriter<T> writer;
-
-  /**
-   * Create a new ParquetWriter.
-   * (with dictionary encoding enabled and validation off)
-   *
-   * @param file the file to create
-   * @param writeSupport the implementation to write a record to a RecordConsumer
-   * @param compressionCodecName the compression codec to use
-   * @param blockSize the block size threshold
-   * @param pageSize the page size threshold
-   * @throws java.io.IOException
-   * @see #ParquetWriter(org.apache.hadoop.fs.Path, parquet.hadoop.api.WriteSupport, CompressionCodecName, int, int, boolean, boolean)
-   */
-  public ParquetWriter(Path file, WriteSupport<T> writeSupport, CompressionCodecName compressionCodecName, int blockSize, int pageSize) throws IOException {
-    this(file, writeSupport, compressionCodecName, blockSize, pageSize,
-        DEFAULT_IS_DICTIONARY_ENABLED, DEFAULT_IS_VALIDATING_ENABLED);
-  }
-
-  /**
-   * Create a new ParquetWriter.
-   *
-   * @param file the file to create
-   * @param writeSupport the implementation to write a record to a RecordConsumer
-   * @param compressionCodecName the compression codec to use
-   * @param blockSize the block size threshold
-   * @param pageSize the page size threshold (both data and dictionary)
-   * @param enableDictionary to turn dictionary encoding on
-   * @param validating to turn on validation using the schema
-   * @throws IOException
-   * @see #ParquetWriter(Path, WriteSupport, CompressionCodecName, int, int, int, boolean, boolean)
-   */
-  public ParquetWriter(
-      Path file,
-      WriteSupport<T> writeSupport,
-      CompressionCodecName compressionCodecName,
-      int blockSize,
-      int pageSize,
-      boolean enableDictionary,
-      boolean validating) throws IOException {
-    this(file, writeSupport, compressionCodecName, blockSize, pageSize, pageSize, enableDictionary, validating);
-  }
-
-  /**
-   * Create a new ParquetWriter.
-   *
-   * @param file the file to create
-   * @param writeSupport the implementation to write a record to a RecordConsumer
-   * @param compressionCodecName the compression codec to use
-   * @param blockSize the block size threshold
-   * @param pageSize the page size threshold
-   * @param dictionaryPageSize the page size threshold for the dictionary pages
-   * @param enableDictionary to turn dictionary encoding on
-   * @param validating to turn on validation using the schema
-   * @throws IOException
-   * @see #ParquetWriter(Path, WriteSupport, CompressionCodecName, int, int, int, boolean, boolean, parquet.column.ParquetProperties.WriterVersion)
-   */
-  public ParquetWriter(
-      Path file,
-      WriteSupport<T> writeSupport,
-      CompressionCodecName compressionCodecName,
-      int blockSize,
-      int pageSize,
-      int dictionaryPageSize,
-      boolean enableDictionary,
-      boolean validating) throws IOException {
-    this(file, writeSupport, compressionCodecName, blockSize, pageSize,
-        dictionaryPageSize, enableDictionary, validating,
-        DEFAULT_WRITER_VERSION);
-  }
-
-  /**
-   * Create a new ParquetWriter.
-   *
-   * Directly instantiates a Hadoop {@link org.apache.hadoop.conf.Configuration} which reads
-   * configuration from the classpath.
-   *
-   * @param file the file to create
-   * @param writeSupport the implementation to write a record to a RecordConsumer
-   * @param compressionCodecName the compression codec to use
-   * @param blockSize the block size threshold
-   * @param pageSize the page size threshold
-   * @param dictionaryPageSize the page size threshold for the dictionary pages
-   * @param enableDictionary to turn dictionary encoding on
-   * @param validating to turn on validation using the schema
-   * @param writerVersion version of parquetWriter from {@link ParquetProperties.WriterVersion}
-   * @throws IOException
-   * @see #ParquetWriter(Path, WriteSupport, CompressionCodecName, int, int, int, boolean, boolean, parquet.column.ParquetProperties.WriterVersion, org.apache.hadoop.conf.Configuration)
-   */
-  public ParquetWriter(
-      Path file,
-      WriteSupport<T> writeSupport,
-      CompressionCodecName compressionCodecName,
-      int blockSize,
-      int pageSize,
-      int dictionaryPageSize,
-      boolean enableDictionary,
-      boolean validating,
-      ParquetProperties.WriterVersion writerVersion) throws IOException {
-    this(file, writeSupport, compressionCodecName, blockSize, pageSize, dictionaryPageSize, enableDictionary, validating, writerVersion, new Configuration());
-  }
-
-  /**
-   * Create a new ParquetWriter.
-   *
-   * @param file the file to create
-   * @param writeSupport the implementation to write a record to a RecordConsumer
-   * @param compressionCodecName the compression codec to use
-   * @param blockSize the block size threshold
-   * @param pageSize the page size threshold
-   * @param dictionaryPageSize the page size threshold for the dictionary pages
-   * @param enableDictionary to turn dictionary encoding on
-   * @param validating to turn on validation using the schema
-   * @param writerVersion version of parquetWriter from {@link ParquetProperties.WriterVersion}
-   * @param conf Hadoop configuration to use while accessing the filesystem
-   * @throws IOException
-   */
-  public ParquetWriter(
-      Path file,
-      WriteSupport<T> writeSupport,
-      CompressionCodecName compressionCodecName,
-      int blockSize,
-      int pageSize,
-      int dictionaryPageSize,
-      boolean enableDictionary,
-      boolean validating,
-      ParquetProperties.WriterVersion writerVersion,
-      Configuration conf) throws IOException {
-
-    WriteSupport.WriteContext writeContext = writeSupport.init(conf);
-    MessageType schema = writeContext.getSchema();
-
-    ParquetFileWriter fileWriter = new ParquetFileWriter(conf, schema, file);
-    fileWriter.start();
-
-    CodecFactory codecFactory = new CodecFactory(conf);
-    CodecFactory.BytesCompressor compressor =	codecFactory.getCompressor(compressionCodecName, 0);
-    this.writer = new InternalParquetRecordWriter<T>(
-        fileWriter,
-        writeSupport,
-        schema,
-        writeContext.getExtraMetaData(),
-        blockSize,
-        pageSize,
-        compressor,
-        dictionaryPageSize,
-        enableDictionary,
-        validating,
-        writerVersion);
-  }
-
-  /**
-   * Create a new ParquetWriter.  The default block size is 128 MB. The default
-   * page size is 1 MB.  Default compression is no compression. Dictionary encoding is enabled.
-   *
-   * @param file the file to create
-   * @param writeSupport the implementation to write a record to a RecordConsumer
-   * @throws IOException
-   */
-  public ParquetWriter(Path file, WriteSupport<T> writeSupport) throws IOException {
-    this(file, writeSupport, DEFAULT_COMPRESSION_CODEC_NAME, DEFAULT_BLOCK_SIZE, DEFAULT_PAGE_SIZE);
-  }
-
-  public void write(T object) throws IOException {
-    try {
-      writer.write(object);
-    } catch (InterruptedException e) {
-      throw new IOException(e);
-    }
-  }
-
-  public long getEstimatedWrittenSize() throws IOException {
-    return this.writer.getEstimatedWrittenSize();
-  }
-
-  @Override
-  public void close() throws IOException {
-    try {
-      writer.close();
-    } catch (InterruptedException e) {
-      throw new IOException(e);
-    }
-  }}

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/tuple/BaseTupleBuilder.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/tuple/BaseTupleBuilder.java b/tajo-storage/src/main/java/org/apache/tajo/tuple/BaseTupleBuilder.java
deleted file mode 100644
index c1835df..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/tuple/BaseTupleBuilder.java
+++ /dev/null
@@ -1,112 +0,0 @@
-/***
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.tuple;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.catalog.SchemaUtil;
-import org.apache.tajo.storage.Tuple;
-import org.apache.tajo.tuple.offheap.HeapTuple;
-import org.apache.tajo.tuple.offheap.OffHeapRowWriter;
-import org.apache.tajo.tuple.offheap.ZeroCopyTuple;
-import org.apache.tajo.unit.StorageUnit;
-import org.apache.tajo.util.Deallocatable;
-import org.apache.tajo.util.FileUtil;
-import org.apache.tajo.util.UnsafeUtil;
-import sun.misc.Unsafe;
-import sun.nio.ch.DirectBuffer;
-
-import java.nio.ByteBuffer;
-import java.nio.ByteOrder;
-
-/**
- * A {@link TupleBuilder} that serializes one row at a time into a private direct
- * buffer and then hands the row out either as an on-heap copy ({@link HeapTuple})
- * or as a view over the direct buffer ({@link ZeroCopyTuple}).
- *
- * The direct buffer must be returned with {@link #release()} when the builder is no
- * longer needed.
- */
-public class BaseTupleBuilder extends OffHeapRowWriter implements TupleBuilder, Deallocatable {
-  private static final Log LOG = LogFactory.getLog(BaseTupleBuilder.class);
-
-  private static final Unsafe UNSAFE = UnsafeUtil.unsafe;
-
-  // Direct buffer holding the row being written, and its native base address.
-  private ByteBuffer buffer;
-  private long address;
-
-  /**
-   * Creates a builder for rows of the given schema, backed by a 64 KB direct buffer
-   * in native byte order.
-   *
-   * @param schema schema whose column data types describe the rows to build
-   */
-  public BaseTupleBuilder(Schema schema) {
-    super(SchemaUtil.toDataTypes(schema));
-    buffer = ByteBuffer.allocateDirect(64 * StorageUnit.KB).order(ByteOrder.nativeOrder());
-    address = UnsafeUtil.getAddress(buffer);
-  }
-
-  /**
-   * @return the native address of the first byte of the backing buffer
-   */
-  @Override
-  public long address() {
-    return address;
-  }
-
-  /**
-   * Grows the backing buffer (doubling its capacity) when fewer than {@code size}
-   * bytes remain, copying the bytes up to the current limit into the new buffer.
-   *
-   * NOTE(review): the check uses {@code buffer.remaining()}, which does not take the
-   * writer's current {@code offset()} into account, and the buffer is doubled only
-   * once per call - confirm this is sufficient for all callers.
-   *
-   * @param size number of bytes about to be written
-   */
-  public void ensureSize(int size) {
-    if (buffer.remaining() - size < 0) { // check the remain size
-      // enlarge new buffer and copy writing data
-      int newBlockSize = UnsafeUtil.alignedSize(buffer.capacity() * 2);
-      // Keep the byte order and address lookup consistent with the constructor.
-      ByteBuffer newByteBuf = ByteBuffer.allocateDirect(newBlockSize).order(ByteOrder.nativeOrder());
-      long newAddress = UnsafeUtil.getAddress(newByteBuf);
-      UNSAFE.copyMemory(this.address, newAddress, buffer.limit());
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Increase the buffer size to " + FileUtil.humanReadableByteCount(newBlockSize, false));
-      }
-
-      // release existing buffer and replace variables
-      UnsafeUtil.free(buffer);
-      buffer = newByteBuf;
-      address = newAddress;
-    }
-  }
-
-  /**
-   * @return always 0; this builder reports a fixed position
-   */
-  @Override
-  public int position() {
-    return 0;
-  }
-
-  /**
-   * Does nothing; this builder does not advance a position.
-   */
-  @Override
-  public void forward(int length) {
-  }
-
-  /**
-   * Finishes the current row and narrows the buffer to [0, offset()) so that the
-   * buffer limit equals the byte length of the finished row.
-   */
-  @Override
-  public void endRow() {
-    super.endRow();
-    buffer.position(0).limit(offset());
-  }
-
-  /**
-   * @return the finished row as an on-heap tuple (see {@link #buildToHeapTuple()})
-   */
-  @Override
-  public Tuple build() {
-    return buildToHeapTuple();
-  }
-
-  /**
-   * Copies the finished row (the bytes up to the buffer limit) into a new byte array.
-   *
-   * @return a heap tuple that owns its own copy of the row bytes
-   */
-  public HeapTuple buildToHeapTuple() {
-    int length = buffer.limit();
-    byte [] bytes = new byte[length];
-    // The destination is a byte[], so the byte-array base offset is the correct one.
-    UNSAFE.copyMemory(null, address, bytes, UnsafeUtil.ARRAY_BYTE_BASE_OFFSET, length);
-    return new HeapTuple(bytes, dataTypes());
-  }
-
-  /**
-   * Wraps the finished row without copying. The returned tuple refers to this
-   * builder's buffer, so it is only valid until the buffer is rewritten, regrown
-   * or released.
-   *
-   * @return a tuple viewing the bytes [0, limit) of the backing buffer
-   */
-  public ZeroCopyTuple buildToZeroCopyTuple() {
-    ZeroCopyTuple zcTuple = new ZeroCopyTuple();
-    zcTuple.set(buffer, 0, buffer.limit(), dataTypes());
-    return zcTuple;
-  }
-
-  /**
-   * Frees the backing direct buffer. Calling it again after the buffer has been
-   * released is a no-op.
-   */
-  @Override
-  public void release() {
-    if (buffer != null) {
-      UnsafeUtil.free(buffer);
-      buffer = null;
-      address = 0;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/tuple/RowBlockReader.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/tuple/RowBlockReader.java b/tajo-storage/src/main/java/org/apache/tajo/tuple/RowBlockReader.java
deleted file mode 100644
index be734e1..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/tuple/RowBlockReader.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.tuple;
-
-import org.apache.tajo.storage.Tuple;
-
-/**
- * A resettable cursor that hands out the rows of a row block one at a time by
- * loading each row into a caller-supplied tuple.
- *
- * @param <T> the tuple type that rows are loaded into
- */
-public interface RowBlockReader<T extends Tuple> {
-
-  /**
-   * Loads the next row, if there is one, into the given tuple.
-   *
-   * @param tuple the tuple to be filled with the next row
-   * @return True if the tuple was filled with a row. Otherwise, it returns false.
-   */
-  boolean next(T tuple);
-
-  /**
-   * Rewinds the reader so that rows are read again from the beginning.
-   */
-  void reset();
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/tuple/TupleBuilder.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/tuple/TupleBuilder.java b/tajo-storage/src/main/java/org/apache/tajo/tuple/TupleBuilder.java
deleted file mode 100644
index c43c018..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/tuple/TupleBuilder.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/***
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.tuple;
-
-import org.apache.tajo.storage.Tuple;
-import org.apache.tajo.tuple.offheap.RowWriter;
-
-/**
- * A {@link RowWriter} that can additionally turn the row it has written into a
- * {@link Tuple}.
- */
-public interface TupleBuilder extends RowWriter {
-  /**
-   * @return a tuple for the row written through this writer
-   */
-  Tuple build();
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/DirectBufTuple.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/DirectBufTuple.java b/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/DirectBufTuple.java
deleted file mode 100644
index 9662d5a..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/DirectBufTuple.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.tuple.offheap;
-
-import org.apache.tajo.util.Deallocatable;
-import org.apache.tajo.util.UnsafeUtil;
-
-import java.nio.ByteBuffer;
-import java.nio.ByteOrder;
-
-import static org.apache.tajo.common.TajoDataTypes.DataType;
-
-public class DirectBufTuple extends UnSafeTuple implements Deallocatable {
-  private ByteBuffer bb;
-
-  public DirectBufTuple(int length, DataType[] types) {
-    bb = ByteBuffer.allocateDirect(length).order(ByteOrder.nativeOrder());
-    set(bb, 0, length, types);
-  }
-
-  @Override
-  public void release() {
-    UnsafeUtil.free(bb);
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/FixedSizeLimitSpec.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/FixedSizeLimitSpec.java b/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/FixedSizeLimitSpec.java
deleted file mode 100644
index a327123..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/FixedSizeLimitSpec.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.tuple.offheap;
-
-/**
- * Fixed size limit specification.
- *
- * Both constructors pass the same {@code size} as the first two arguments of the
- * {@link ResizableLimitSpec} constructor.
- * NOTE(review): those two arguments are presumably the initial size and the limit,
- * which would make the spec non-growable - confirm against ResizableLimitSpec.
- */
-public class FixedSizeLimitSpec extends ResizableLimitSpec {
-  /**
-   * @param size the size passed as both of the parent's size arguments
-   */
-  public FixedSizeLimitSpec(long size) {
-    super(size, size);
-  }
-
-  /**
-   * @param size the size passed as both of the parent's size arguments
-   * @param allowedOverflowRatio forwarded unchanged to the parent constructor
-   */
-  public FixedSizeLimitSpec(long size, float allowedOverflowRatio) {
-    super(size, size, allowedOverflowRatio);
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/HeapTuple.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/HeapTuple.java b/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/HeapTuple.java
deleted file mode 100644
index 33f9f1c..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/HeapTuple.java
+++ /dev/null
@@ -1,272 +0,0 @@
-/***
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.tuple.offheap;
-
-import com.google.protobuf.InvalidProtocolBufferException;
-import com.google.protobuf.Message;
-
-import org.apache.tajo.datum.*;
-import org.apache.tajo.exception.UnsupportedException;
-import org.apache.tajo.storage.Tuple;
-import org.apache.tajo.storage.VTuple;
-import org.apache.tajo.util.SizeOf;
-import org.apache.tajo.util.StringUtils;
-import org.apache.tajo.util.UnsafeUtil;
-
-import sun.misc.Unsafe;
-
-import java.nio.ByteBuffer;
-import java.nio.charset.Charset;
-
-import static org.apache.tajo.common.TajoDataTypes.DataType;
-
-/**
- * A read-only {@link Tuple} over a serialized row held in an on-heap byte array.
- *
- * Layout as read by this class: a leading 4-byte int (presumably the record
- * length - confirm against the row writer), followed by one 4-byte offset per
- * field. Each offset is relative to the start of the array and points at the
- * field's value; {@link OffHeapRowBlock#NULL_FIELD_OFFSET} marks a null field.
- * Variable-length values are stored as a 4-byte length followed by the bytes.
- *
- * All {@code put} methods throw {@link UnsupportedException}.
- */
-public class HeapTuple implements Tuple {
-  private static final Unsafe UNSAFE = UnsafeUtil.unsafe;
-  private static final long BASE_OFFSET = UnsafeUtil.ARRAY_BYTE_BASE_OFFSET;
-  // Text is decoded as UTF-8 everywhere so the result never depends on the platform default.
-  private static final Charset TEXT_CHARSET = Charset.forName("UTF-8");
-
-  private final byte [] data;
-  private final DataType [] types;
-
-  /**
-   * @param bytes the serialized row; the array is used as-is, not copied
-   * @param types data types of the fields, indexed by field id
-   */
-  public HeapTuple(final byte [] bytes, final DataType [] types) {
-    this.data = bytes;
-    this.types = types;
-  }
-
-  /**
-   * NOTE(review): this returns the byte length of the serialized row, not the
-   * number of fields - confirm which one callers of {@code Tuple.size()} expect.
-   */
-  @Override
-  public int size() {
-    return data.length;
-  }
-
-  /**
-   * @return a heap ByteBuffer wrapping (not copying) the serialized row
-   */
-  public ByteBuffer nioBuffer() {
-    return ByteBuffer.wrap(data);
-  }
-
-  // Reads the field's entry from the offset table that follows the leading int.
-  private int getFieldOffset(int fieldId) {
-    return UNSAFE.getInt(data, BASE_OFFSET + SizeOf.SIZE_OF_INT + (fieldId * SizeOf.SIZE_OF_INT));
-  }
-
-  // Like getFieldOffset, but rejects access to a null field.
-  private int checkNullAndGetOffset(int fieldId) {
-    int offset = getFieldOffset(fieldId);
-    if (offset == OffHeapRowBlock.NULL_FIELD_OFFSET) {
-      throw new RuntimeException("Invalid Field Access: " + fieldId);
-    }
-    return offset;
-  }
-
-  @Override
-  public boolean contains(int fieldid) {
-    return getFieldOffset(fieldid) > OffHeapRowBlock.NULL_FIELD_OFFSET;
-  }
-
-  @Override
-  public boolean isNull(int fieldid) {
-    return getFieldOffset(fieldid) == OffHeapRowBlock.NULL_FIELD_OFFSET;
-  }
-
-  @Override
-  public boolean isNotNull(int fieldid) {
-    return getFieldOffset(fieldid) > OffHeapRowBlock.NULL_FIELD_OFFSET;
-  }
-
-  @Override
-  public void clear() {
-    // nothing to do
-  }
-
-  @Override
-  public void put(int fieldId, Datum value) {
-    throw new UnsupportedException("HeapTuple does not support put(int, Datum).");
-  }
-
-  @Override
-  public void put(int fieldId, Datum[] values) {
-    throw new UnsupportedException("HeapTuple does not support put(int, Datum []).");
-  }
-
-  @Override
-  public void put(int fieldId, Tuple tuple) {
-    throw new UnsupportedException("HeapTuple does not support put(int, Tuple).");
-  }
-
-  @Override
-  public void put(Datum[] values) {
-    throw new UnsupportedException("HeapTuple does not support put(Datum []).");
-  }
-
-  /**
-   * Materializes the field as a {@link Datum} according to its declared type.
-   *
-   * @param fieldId index of the field
-   * @return the field value, or {@link NullDatum} when the field is null
-   * @throws UnsupportedException if the field's type is not handled here
-   */
-  @Override
-  public Datum get(int fieldId) {
-    if (isNull(fieldId)) {
-      return NullDatum.get();
-    }
-
-    switch (types[fieldId].getType()) {
-    case BOOLEAN:
-      return DatumFactory.createBool(getBool(fieldId));
-    case INT1:
-    case INT2:
-      return DatumFactory.createInt2(getInt2(fieldId));
-    case INT4:
-      return DatumFactory.createInt4(getInt4(fieldId));
-    case INT8:
-      // Must read all 8 bytes; reading with getInt4 would truncate the value.
-      return DatumFactory.createInt8(getInt8(fieldId));
-    case FLOAT4:
-      return DatumFactory.createFloat4(getFloat4(fieldId));
-    case FLOAT8:
-      return DatumFactory.createFloat8(getFloat8(fieldId));
-    case TEXT:
-      return DatumFactory.createText(getText(fieldId));
-    case TIMESTAMP:
-      return DatumFactory.createTimestamp(getInt8(fieldId));
-    case DATE:
-      return DatumFactory.createDate(getInt4(fieldId));
-    case TIME:
-      return DatumFactory.createTime(getInt8(fieldId));
-    case INTERVAL:
-      return getInterval(fieldId);
-    case INET4:
-      return DatumFactory.createInet4(getInt4(fieldId));
-    case PROTOBUF:
-      return getProtobufDatum(fieldId);
-    default:
-      throw new UnsupportedException("Unknown type: " + types[fieldId]);
-    }
-  }
-
-  @Override
-  public void setOffset(long offset) {
-  }
-
-  @Override
-  public long getOffset() {
-    return 0;
-  }
-
-  @Override
-  public boolean getBool(int fieldId) {
-    return UNSAFE.getByte(data, BASE_OFFSET + checkNullAndGetOffset(fieldId)) == 0x01;
-  }
-
-  @Override
-  public byte getByte(int fieldId) {
-    return UNSAFE.getByte(data, BASE_OFFSET + checkNullAndGetOffset(fieldId));
-  }
-
-  @Override
-  public char getChar(int fieldId) {
-    return UNSAFE.getChar(data, BASE_OFFSET + checkNullAndGetOffset(fieldId));
-  }
-
-  /**
-   * Copies a length-prefixed value out of the row.
-   *
-   * @param fieldId index of the field
-   * @return a new array holding the value's bytes (without the length prefix)
-   * @throws RuntimeException if the field is null
-   */
-  @Override
-  public byte[] getBytes(int fieldId) {
-    long pos = checkNullAndGetOffset(fieldId);
-    int len = UNSAFE.getInt(data, BASE_OFFSET + pos);
-    pos += SizeOf.SIZE_OF_INT;
-
-    byte [] bytes = new byte[len];
-    UNSAFE.copyMemory(data, BASE_OFFSET + pos, bytes, UnsafeUtil.ARRAY_BYTE_BASE_OFFSET, len);
-    return bytes;
-  }
-
-  @Override
-  public short getInt2(int fieldId) {
-    return UNSAFE.getShort(data, BASE_OFFSET + checkNullAndGetOffset(fieldId));
-  }
-
-  @Override
-  public int getInt4(int fieldId) {
-    return UNSAFE.getInt(data, BASE_OFFSET + checkNullAndGetOffset(fieldId));
-  }
-
-  @Override
-  public long getInt8(int fieldId) {
-    return UNSAFE.getLong(data, BASE_OFFSET + checkNullAndGetOffset(fieldId));
-  }
-
-  @Override
-  public float getFloat4(int fieldId) {
-    return UNSAFE.getFloat(data, BASE_OFFSET + checkNullAndGetOffset(fieldId));
-  }
-
-  @Override
-  public double getFloat8(int fieldId) {
-    return UNSAFE.getDouble(data, BASE_OFFSET + checkNullAndGetOffset(fieldId));
-  }
-
-  /**
-   * @return the field's bytes decoded as UTF-8, the same charset used by
-   *         {@link #getUnicodeChars(int)}
-   */
-  @Override
-  public String getText(int fieldId) {
-    return new String(getBytes(fieldId), TEXT_CHARSET);
-  }
-
-  /**
-   * Reads an interval stored as a 4-byte month count followed by an 8-byte
-   * millisecond count.
-   */
-  public IntervalDatum getInterval(int fieldId) {
-    long pos = checkNullAndGetOffset(fieldId);
-    int months = UNSAFE.getInt(data, BASE_OFFSET + pos);
-    pos += SizeOf.SIZE_OF_INT;
-    long millisecs = UNSAFE.getLong(data, BASE_OFFSET + pos);
-    return new IntervalDatum(months, millisecs);
-  }
-
-  /**
-   * Parses the field's bytes with the protobuf builder registered for the field's
-   * type code.
-   *
-   * @return the parsed datum, or {@link NullDatum} when the bytes cannot be parsed
-   */
-  @Override
-  public Datum getProtobufDatum(int fieldId) {
-    byte [] bytes = getBytes(fieldId);
-
-    ProtobufDatumFactory factory = ProtobufDatumFactory.get(types[fieldId].getCode());
-    Message.Builder builder = factory.newBuilder();
-    try {
-      builder.mergeFrom(bytes);
-    } catch (InvalidProtocolBufferException e) {
-      // Existing best-effort behavior: an unparsable message is reported as null.
-      return NullDatum.get();
-    }
-
-    return new ProtobufDatum(builder.build());
-  }
-
-  @Override
-  public char[] getUnicodeChars(int fieldId) {
-    return StringUtils.convertBytesToChars(getBytes(fieldId), TEXT_CHARSET);
-  }
-
-  /**
-   * @return this instance; no copy is made
-   */
-  @Override
-  public Tuple clone() throws CloneNotSupportedException {
-    return this;
-  }
-
-  /**
-   * @return one datum per declared field, with {@link NullDatum} for null fields
-   */
-  @Override
-  public Datum[] getValues() {
-    // Iterate over the declared fields; size() is the byte length of the row.
-    int fieldCount = types.length;
-    Datum [] datums = new Datum[fieldCount];
-    for (int i = 0; i < fieldCount; i++) {
-      if (contains(i)) {
-        datums[i] = get(i);
-      } else {
-        datums[i] = NullDatum.get();
-      }
-    }
-    return datums;
-  }
-
-  @Override
-  public String toString() {
-    return VTuple.toDisplayString(getValues());
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapMemory.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapMemory.java b/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapMemory.java
deleted file mode 100644
index 2f8e349..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapMemory.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.tuple.offheap;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.tajo.util.Deallocatable;
-import org.apache.tajo.util.FileUtil;
-import org.apache.tajo.util.UnsafeUtil;
-import sun.misc.Unsafe;
-import sun.nio.ch.DirectBuffer;
-
-import java.nio.ByteBuffer;
-import java.nio.ByteOrder;
-
-/**
- * A resizable region of direct (off-heap) memory, addressed both through a
- * {@link ByteBuffer} and through its raw native address. Growth is bounded by a
- * {@link ResizableLimitSpec}. The memory must be returned with {@link #release()}.
- */
-public class OffHeapMemory implements Deallocatable {
-  private static final Log LOG = LogFactory.getLog(OffHeapMemory.class);
-
-  protected static final Unsafe UNSAFE = UnsafeUtil.unsafe;
-
-  protected ByteBuffer buffer;
-  protected int memorySize;
-  protected ResizableLimitSpec limitSpec;
-  protected long address;
-
-  /**
-   * Wraps an existing buffer. The buffer is cast to {@code DirectBuffer}, so it
-   * must be a direct buffer; its current limit becomes the memory size.
-   */
-  @VisibleForTesting
-  protected OffHeapMemory(ByteBuffer buffer, ResizableLimitSpec limitSpec) {
-    this.buffer = buffer;
-    this.address = ((DirectBuffer) buffer).address();
-    this.memorySize = buffer.limit();
-    this.limitSpec = limitSpec;
-  }
-
-  /**
-   * Allocates {@code limitSpec.initialSize()} bytes of direct memory in native
-   * byte order.
-   */
-  public OffHeapMemory(ResizableLimitSpec limitSpec) {
-    this(ByteBuffer.allocateDirect((int) limitSpec.initialSize()).order(ByteOrder.nativeOrder()), limitSpec);
-  }
-
-  /**
-   * @return the native address of the first byte of this memory
-   */
-  public long address() {
-    return address;
-  }
-
-  /**
-   * @return the current size of this memory in bytes
-   */
-  public long size() {
-    return memorySize;
-  }
-
-  /**
-   * Grows this memory to {@code newSize} bytes, copying the existing contents into
-   * a newly allocated direct buffer and freeing the old one. A request to shrink is
-   * logged and ignored.
-   *
-   * @param newSize the requested size in bytes; must be positive
-   * @throws IllegalArgumentException if {@code newSize} is not positive
-   * @throws RuntimeException if {@code newSize} exceeds the limit of the limit spec
-   */
-  public void resize(int newSize) {
-    Preconditions.checkArgument(newSize > 0, "Size must be greater than 0 bytes");
-
-    if (newSize > limitSpec.limit()) {
-      throw new RuntimeException("Resize cannot exceed the size limit");
-    }
-
-    if (newSize < memorySize) {
-      // Really ignore the request: copying memorySize bytes into a smaller
-      // allocation would write past the end of the new buffer.
-      LOG.warn("The size reduction is ignored.");
-      return;
-    }
-
-    int newBlockSize = UnsafeUtil.alignedSize(newSize);
-    // Keep the same byte order as the buffer allocated by the constructor.
-    ByteBuffer newByteBuf = ByteBuffer.allocateDirect(newBlockSize).order(ByteOrder.nativeOrder());
-    long newAddress = ((DirectBuffer)newByteBuf).address();
-
-    UNSAFE.copyMemory(this.address, newAddress, memorySize);
-
-    UnsafeUtil.free(buffer);
-    this.memorySize = newSize;
-    this.buffer = newByteBuf;
-    this.address = newAddress;
-  }
-
-  /**
-   * @return the backing buffer with its position set to 0 and its limit set to the
-   *         current memory size
-   */
-  public java.nio.Buffer nioBuffer() {
-    return (ByteBuffer) buffer.position(0).limit(memorySize);
-  }
-
-  /**
-   * Frees the direct memory. Calling it again after the memory has been released
-   * is a no-op.
-   */
-  @Override
-  public void release() {
-    if (this.buffer != null) {
-      UnsafeUtil.free(this.buffer);
-      this.buffer = null;
-      this.address = 0;
-      this.memorySize = 0;
-    }
-  }
-
-  @Override
-  public String toString() {
-    return "memory=" + FileUtil.humanReadableByteCount(memorySize, false) + "," + limitSpec;
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlock.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlock.java b/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlock.java
deleted file mode 100644
index 689efb7..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlock.java
+++ /dev/null
@@ -1,176 +0,0 @@
-/***
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.tuple.offheap;
-
-import com.google.common.annotations.VisibleForTesting;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.catalog.SchemaUtil;
-import org.apache.tajo.catalog.statistics.TableStats;
-import org.apache.tajo.util.Deallocatable;
-import org.apache.tajo.util.FileUtil;
-import org.apache.tajo.util.SizeOf;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.channels.FileChannel;
-
-import static org.apache.tajo.common.TajoDataTypes.DataType;
-
-/**
- * A block of serialized rows stored back to back in off-heap memory. Each row
- * starts with a 4-byte int holding the row's total length, which is how
- * {@link #copyFromChannel(FileChannel, TableStats)} and the reader step from one
- * row to the next.
- */
-public class OffHeapRowBlock extends OffHeapMemory implements Deallocatable {
-  private static final Log LOG = LogFactory.getLog(OffHeapRowBlock.class);
-
-  public static final int NULL_FIELD_OFFSET = -1;
-
-  DataType [] dataTypes;
-
-  // Basic States
-  private int maxRowNum = Integer.MAX_VALUE; // optional
-  private int rowNum;
-  protected int position = 0;
-
-  private OffHeapRowBlockWriter builder;
-
-  private OffHeapRowBlock(ByteBuffer buffer, Schema schema, ResizableLimitSpec limitSpec) {
-    super(buffer, limitSpec);
-    initialize(schema);
-  }
-
-  /**
-   * Allocates a new row block for rows of the given schema.
-   *
-   * @param schema    schema whose column data types describe the rows
-   * @param limitSpec initial size and growth limits of the block
-   */
-  public OffHeapRowBlock(Schema schema, ResizableLimitSpec limitSpec) {
-    super(limitSpec);
-    initialize(schema);
-  }
-
-  // Derives the field types from the schema and creates the writer bound to this block.
-  private void initialize(Schema schema) {
-    dataTypes = SchemaUtil.toDataTypes(schema);
-
-    this.builder = new OffHeapRowBlockWriter(this);
-  }
-
-  @VisibleForTesting
-  public OffHeapRowBlock(Schema schema, int bytes) {
-    this(schema, new ResizableLimitSpec(bytes));
-  }
-
-  @VisibleForTesting
-  public OffHeapRowBlock(Schema schema, ByteBuffer buffer) {
-    this(buffer, schema, ResizableLimitSpec.DEFAULT_LIMIT);
-  }
-
-  public void position(int pos) {
-    this.position = pos;
-  }
-
-  /**
-   * Resets the block to empty: position and row count go back to 0 and the writer
-   * is cleared. The memory itself is kept.
-   */
-  public void clear() {
-    this.position = 0;
-    this.rowNum = 0;
-
-    builder.clear();
-  }
-
-  /**
-   * @return the backing buffer narrowed to the used range [0, position)
-   */
-  @Override
-  public ByteBuffer nioBuffer() {
-    return (ByteBuffer) buffer.position(0).limit(position);
-  }
-
-  public int position() {
-    return position;
-  }
-
-  public long usedMem() {
-    return position;
-  }
-
-  /**
-   * Ensure that this buffer has enough remaining space to add the size.
-   * Creates and copies to a new buffer if necessary
-   *
-   * @param size Size to add
-   * @throws RuntimeException if the limit spec does not allow the block to grow
-   */
-  public void ensureSize(int size) {
-    if (remain() - size < 0) {
-      if (!limitSpec.canIncrease(memorySize)) {
-        throw new RuntimeException("Cannot increase RowBlock anymore.");
-      }
-
-      int newBlockSize = limitSpec.increasedSize(memorySize);
-      resize(newBlockSize);
-      LOG.info("Increase DirectRowBlock to " + FileUtil.humanReadableByteCount(newBlockSize, false));
-    }
-  }
-
-  /**
-   * @return the number of bytes left after the current position and the writer's
-   *         current offset
-   */
-  public long remain() {
-    return memorySize - position - builder.offset();
-  }
-
-  public int maxRowNum() {
-    return maxRowNum;
-  }
-  public int rows() {
-    return rowNum;
-  }
-
-  public void setRows(int rowNum) {
-    this.rowNum = rowNum;
-  }
-
-  /**
-   * Refills this block from the channel and counts the complete rows that were
-   * read. A trailing row that was only partially read is pushed back: the channel
-   * is rewound to the start of that row and the block is truncated before it.
-   *
-   * @param channel the channel to read from, at its current position
-   * @param stats   not used by this method
-   * @return false if the channel was already at its end; true otherwise
-   * @throws IOException if reading fails or a row declares a non-positive length
-   */
-  public boolean copyFromChannel(FileChannel channel, TableStats stats) throws IOException {
-    if (channel.position() >= channel.size()) {
-      return false;
-    }
-
-    clear();
-
-    buffer.clear();
-    channel.read(buffer);
-    memorySize = buffer.position();
-
-    while (position < memorySize) {
-      long recordPtr = address + position;
-
-      // Not even the 4-byte length header of the next row was read completely.
-      if (remain() < SizeOf.SIZE_OF_INT) {
-        pushBackIncompleteRecord(channel);
-        return true;
-      }
-
-      int recordSize = UNSAFE.getInt(recordPtr);
-
-      // A zero length would never advance the position (endless loop) and a
-      // negative one would walk backwards out of the block.
-      if (recordSize <= 0) {
-        throw new IOException("Invalid record size " + recordSize + " at block position " + position);
-      }
-
-      if (remain() < recordSize) {
-        pushBackIncompleteRecord(channel);
-        return true;
-      }
-
-      position += recordSize;
-      rowNum++;
-    }
-
-    return true;
-  }
-
-  // Rewinds the channel over the trailing partial row and drops it from this block.
-  private void pushBackIncompleteRecord(FileChannel channel) throws IOException {
-    long incompleteBytes = remain();
-    channel.position(channel.position() - incompleteBytes);
-    memorySize = (int) (memorySize - incompleteBytes);
-  }
-
-  public RowWriter getWriter() {
-    return builder;
-  }
-
-  /**
-   * @return a new reader over this block
-   */
-  public OffHeapRowBlockReader getReader() {
-    return new OffHeapRowBlockReader(this);
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlockReader.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlockReader.java b/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlockReader.java
deleted file mode 100644
index 4a9313f..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlockReader.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/***
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.tuple.offheap;
-
-import org.apache.tajo.tuple.RowBlockReader;
-import org.apache.tajo.util.UnsafeUtil;
-import sun.misc.Unsafe;
-
-/**
- * Sequential reader over the rows of an {@link OffHeapRowBlock}. Rows are not
- * copied: each call to {@link #next(ZeroCopyTuple)} points the given tuple at the
- * row's bytes inside the block's buffer.
- */
-public class OffHeapRowBlockReader implements RowBlockReader<ZeroCopyTuple> {
-  private static final Unsafe UNSAFE = UnsafeUtil.unsafe;
-  OffHeapRowBlock rowBlock;
-
-  // Read States
-  private int curRowIdxForRead;
-  private int curPosForRead;
-
-  /**
-   * @param rowBlock the block to read from
-   */
-  public OffHeapRowBlockReader(OffHeapRowBlock rowBlock) {
-    this.rowBlock = rowBlock;
-  }
-
-  /**
-   * @return the block's memory size minus the current read position
-   */
-  public long remainForRead() {
-    final long blockSize = rowBlock.memorySize;
-    return blockSize - curPosForRead;
-  }
-
-  /**
-   * Points the tuple at the next row and advances past it, using the row length
-   * stored in the row's first 4 bytes.
-   *
-   * @param tuple the tuple to point at the next row
-   * @return true if a row was available; false once all rows have been read
-   */
-  @Override
-  public boolean next(ZeroCopyTuple tuple) {
-    if (curRowIdxForRead >= rowBlock.rows()) {
-      return false;
-    }
-
-    final int rowStart = curPosForRead;
-    final int rowLength = UNSAFE.getInt(rowBlock.address() + rowStart);
-    tuple.set(rowBlock.buffer, rowStart, rowLength, rowBlock.dataTypes);
-
-    curPosForRead = rowStart + rowLength;
-    curRowIdxForRead++;
-    return true;
-  }
-
-  /**
-   * Moves the reader back to the first row of the block.
-   */
-  @Override
-  public void reset() {
-    curRowIdxForRead = 0;
-    curPosForRead = 0;
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlockUtils.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlockUtils.java b/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlockUtils.java
deleted file mode 100644
index dbc3188..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlockUtils.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.tuple.offheap;
-
-import com.google.common.collect.Lists;
-import org.apache.tajo.storage.Tuple;
-
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.List;
-
-/**
- * Helpers for sorting the rows of an {@link OffHeapRowBlock}.
- */
-public class OffHeapRowBlockUtils {
-
-  /**
-   * Reads every row of the block into its own {@link ZeroCopyTuple} and sorts them.
-   *
-   * @param rowBlock The row block whose rows are sorted
-   * @param comparator The ordering to apply
-   * @return A new list holding one tuple per row, in sorted order
-   */
-  public static List<Tuple> sort(OffHeapRowBlock rowBlock, Comparator<Tuple> comparator) {
-    OffHeapRowBlockReader reader = new OffHeapRowBlockReader(rowBlock);
-    List<Tuple> sorted = Lists.newArrayList();
-
-    // The reader fills the tuple handed to it, so every row needs a fresh instance.
-    for (ZeroCopyTuple row = new ZeroCopyTuple(); reader.next(row); row = new ZeroCopyTuple()) {
-      sorted.add(row);
-    }
-
-    Collections.sort(sorted, comparator);
-    return sorted;
-  }
-
-  /**
-   * Array flavor of {@link #sort(OffHeapRowBlock, Comparator)}.
-   *
-   * The array is sized by {@code rowBlock.rows()}. If the reader stops earlier, the
-   * remaining slots are still null when the array is handed to the comparator.
-   *
-   * @param rowBlock The row block whose rows are sorted
-   * @param comparator The ordering to apply
-   * @return A new array holding one tuple per row, in sorted order
-   */
-  public static Tuple[] sortToArray(OffHeapRowBlock rowBlock, Comparator<Tuple> comparator) {
-    Tuple[] sorted = new Tuple[rowBlock.rows()];
-    OffHeapRowBlockReader reader = new OffHeapRowBlockReader(rowBlock);
-
-    ZeroCopyTuple row = new ZeroCopyTuple();
-    int filled = 0;
-    while (filled < rowBlock.rows() && reader.next(row)) {
-      sorted[filled++] = row;
-      row = new ZeroCopyTuple();
-    }
-
-    Arrays.sort(sorted, comparator);
-    return sorted;
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlockWriter.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlockWriter.java b/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlockWriter.java
deleted file mode 100644
index d177e0c..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlockWriter.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.tuple.offheap;
-
-import org.apache.tajo.common.TajoDataTypes;
-
-/**
- * An {@link OffHeapRowWriter} that writes its rows into an {@link OffHeapRowBlock}.
- *
- * Address, position, and capacity handling are all delegated to the row block; in
- * addition, the block's row count is bumped every time a row is finished.
- */
-public class OffHeapRowBlockWriter extends OffHeapRowWriter {
-  OffHeapRowBlock rowBlock;
-
-  /**
-   * @param rowBlock The row block to write into; its data types define the row schema
-   */
-  OffHeapRowBlockWriter(OffHeapRowBlock rowBlock) {
-    super(rowBlock.dataTypes);
-    this.rowBlock = rowBlock;
-  }
-
-  /**
-   * @return The address reported by the row block
-   */
-  public long address() {
-    return this.rowBlock.address();
-  }
-
-  /**
-   * @return The position reported by the row block
-   */
-  public int position() {
-    return this.rowBlock.position();
-  }
-
-  /**
-   * Moves the row block's position ahead.
-   *
-   * @param length Length to be forwarded
-   */
-  @Override
-  public void forward(int length) {
-    final int advanced = position() + length;
-    rowBlock.position(advanced);
-  }
-
-  /**
-   * Hands the size request over to the row block.
-   *
-   * @param size The size passed to the row block's ensureSize
-   */
-  public void ensureSize(int size) {
-    this.rowBlock.ensureSize(size);
-  }
-
-  /**
-   * Finishes the current row and then increases the row count of the block by one.
-   */
-  @Override
-  public void endRow() {
-    super.endRow();
-    final int rowCount = rowBlock.rows() + 1;
-    rowBlock.setRows(rowCount);
-  }
-
-  /**
-   * @return The data types of the row block
-   */
-  @Override
-  public TajoDataTypes.DataType[] dataTypes() {
-    return this.rowBlock.dataTypes;
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowWriter.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowWriter.java b/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowWriter.java
deleted file mode 100644
index 85c7e0b..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowWriter.java
+++ /dev/null
@@ -1,232 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.tuple.offheap;
-
-import org.apache.tajo.common.TajoDataTypes;
-import org.apache.tajo.datum.IntervalDatum;
-import org.apache.tajo.datum.ProtobufDatum;
-import org.apache.tajo.datum.TextDatum;
-import org.apache.tajo.util.SizeOf;
-import org.apache.tajo.util.UnsafeUtil;
-
-/**
- * Writes rows, field by field, into the memory addressed by {@link #address()} and
- * {@link #position()}.
- *
- * Row Record Structure
- *
- * | row length (4 bytes) | field 1 offset | field 2 offset | ... | field N offset| field 1 | field 2| ... | field N |
- *                              4 bytes          4 bytes               4 bytes
- *
- * Each field offset is relative to the start of the record. A field that was skipped or
- * never written carries {@link OffHeapRowBlock#NULL_FIELD_OFFSET} instead.
- */
-public abstract class OffHeapRowWriter implements RowWriter {
-  /** record size + offset list */
-  private final int headerSize;
-  /** field offsets */
-  private final int [] fieldOffsets;
-  private final TajoDataTypes.DataType [] dataTypes;
-
-  /** index of the field that the next put/skip call fills */
-  private int curFieldIdx;
-  /** byte offset, relative to the record start, at which the next field value is written */
-  private int curOffset;
-
-  /**
-   * @param dataTypes The data types of the row's fields; one offset slot is kept per field
-   */
-  public OffHeapRowWriter(final TajoDataTypes.DataType [] dataTypes) {
-    this.dataTypes = dataTypes;
-    this.fieldOffsets = new int[dataTypes.length];
-    // one int for the row length plus one int per field offset
-    this.headerSize = SizeOf.SIZE_OF_INT * (dataTypes.length + 1);
-  }
-
-  /**
-   * Resets the in-row offset and the field index to zero.
-   */
-  public void clear() {
-    curFieldIdx = 0;
-    curOffset = 0;
-  }
-
-  /**
-   * @return The absolute address at which the current record starts
-   */
-  public long recordStartAddr() {
-    return address() + position();
-  }
-
-  /**
-   * @return The base address that {@link #position()} is relative to
-   */
-  public abstract long address();
-
-  /**
-   * Invoked before each field value is written.
-   *
-   * @param size The number of bytes which are about to be written
-   */
-  public abstract void ensureSize(int size);
-
-  /**
-   * @return The current byte offset within the record being written
-   */
-  public int offset() {
-    return curOffset;
-  }
-
-  /**
-   * Current position
-   *
-   * @return The position
-   */
-  public abstract int position();
-
-  /**
-   * Forward the address;
-   *
-   * @param length Length to be forwarded
-   */
-  public abstract void forward(int length);
-
-  @Override
-  public TajoDataTypes.DataType[] dataTypes() {
-    return dataTypes;
-  }
-
-  /**
-   * Begins a new row: field values start right behind the header.
-   *
-   * @return Always true in this implementation
-   */
-  public boolean startRow() {
-    curFieldIdx = 0;
-    curOffset = headerSize;
-    return true;
-  }
-
-  /**
-   * Writes the row header (row length followed by all field offsets) at the record start
-   * and forwards the position by the row length.
-   */
-  public void endRow() {
-    long headerCursor = address() + position();
-
-    // curOffset is equivalent to a byte length of this row.
-    OffHeapMemory.UNSAFE.putInt(headerCursor, curOffset);
-    headerCursor += SizeOf.SIZE_OF_INT;
-
-    // offsets of the fields which have been put or skipped so far
-    int fieldIdx = 0;
-    while (fieldIdx < curFieldIdx) {
-      OffHeapMemory.UNSAFE.putInt(headerCursor, fieldOffsets[fieldIdx]);
-      headerCursor += SizeOf.SIZE_OF_INT;
-      fieldIdx++;
-    }
-
-    // the remaining fields were never written, so they are marked as null
-    fieldIdx = curFieldIdx;
-    while (fieldIdx < dataTypes.length) {
-      OffHeapMemory.UNSAFE.putInt(headerCursor, OffHeapRowBlock.NULL_FIELD_OFFSET);
-      headerCursor += SizeOf.SIZE_OF_INT;
-      fieldIdx++;
-    }
-
-    forward(curOffset);
-  }
-
-  /**
-   * Marks the current field as null and moves on to the next field.
-   */
-  public void skipField() {
-    final int slot = curFieldIdx++;
-    fieldOffsets[slot] = OffHeapRowBlock.NULL_FIELD_OFFSET;
-  }
-
-  /** Records the current offset as the start of the current field and moves on to the next field. */
-  private void markFieldStart() {
-    final int slot = curFieldIdx++;
-    fieldOffsets[slot] = curOffset;
-  }
-
-  /** Absolute address at which the next value is written. */
-  private long writeAddr() {
-    return recordStartAddr() + curOffset;
-  }
-
-  /** Writes a 4-byte length followed by the bytes themselves as one field. */
-  private void putLengthPrefixed(byte[] val) {
-    final int length = val.length;
-
-    ensureSize(SizeOf.SIZE_OF_INT + length);
-    markFieldStart();
-
-    OffHeapMemory.UNSAFE.putInt(writeAddr(), length);
-    curOffset += SizeOf.SIZE_OF_INT;
-
-    OffHeapMemory.UNSAFE.copyMemory(val, UnsafeUtil.ARRAY_BYTE_BASE_OFFSET, null, writeAddr(), length);
-    curOffset += length;
-  }
-
-  /** Writes a boolean as a single byte: 0x01 for true, 0x00 for false. */
-  public void putBool(boolean val) {
-    ensureSize(SizeOf.SIZE_OF_BOOL);
-    markFieldStart();
-
-    final byte encoded = val ? (byte) 0x01 : (byte) 0x00;
-    OffHeapMemory.UNSAFE.putByte(writeAddr(), encoded);
-    curOffset += SizeOf.SIZE_OF_BOOL;
-  }
-
-  public void putInt2(short val) {
-    ensureSize(SizeOf.SIZE_OF_SHORT);
-    markFieldStart();
-
-    OffHeapMemory.UNSAFE.putShort(writeAddr(), val);
-    curOffset += SizeOf.SIZE_OF_SHORT;
-  }
-
-  public void putInt4(int val) {
-    ensureSize(SizeOf.SIZE_OF_INT);
-    markFieldStart();
-
-    OffHeapMemory.UNSAFE.putInt(writeAddr(), val);
-    curOffset += SizeOf.SIZE_OF_INT;
-  }
-
-  public void putInt8(long val) {
-    ensureSize(SizeOf.SIZE_OF_LONG);
-    markFieldStart();
-
-    OffHeapMemory.UNSAFE.putLong(writeAddr(), val);
-    curOffset += SizeOf.SIZE_OF_LONG;
-  }
-
-  public void putFloat4(float val) {
-    ensureSize(SizeOf.SIZE_OF_FLOAT);
-    markFieldStart();
-
-    OffHeapMemory.UNSAFE.putFloat(writeAddr(), val);
-    curOffset += SizeOf.SIZE_OF_FLOAT;
-  }
-
-  public void putFloat8(double val) {
-    ensureSize(SizeOf.SIZE_OF_DOUBLE);
-    markFieldStart();
-
-    OffHeapMemory.UNSAFE.putDouble(writeAddr(), val);
-    curOffset += SizeOf.SIZE_OF_DOUBLE;
-  }
-
-  /** Encodes the string with {@link TextDatum#DEFAULT_CHARSET} and writes the resulting bytes. */
-  public void putText(String val) {
-    putText(val.getBytes(TextDatum.DEFAULT_CHARSET));
-  }
-
-  /** Writes the bytes as a length-prefixed field. */
-  public void putText(byte[] val) {
-    putLengthPrefixed(val);
-  }
-
-  /** Writes the bytes as a length-prefixed field. */
-  public void putBlob(byte[] val) {
-    putLengthPrefixed(val);
-  }
-
-  /** Stored like an INT8 field. */
-  public void putTimestamp(long val) {
-    putInt8(val);
-  }
-
-  /** Stored like an INT4 field. */
-  public void putDate(int val) {
-    putInt4(val);
-  }
-
-  /** Stored like an INT8 field. */
-  public void putTime(long val) {
-    putInt8(val);
-  }
-
-  /** Writes the months as an int, directly followed by the milliseconds as a long. */
-  public void putInterval(IntervalDatum val) {
-    ensureSize(SizeOf.SIZE_OF_INT + SizeOf.SIZE_OF_LONG);
-    markFieldStart();
-
-    final long monthsAddr = writeAddr();
-    OffHeapMemory.UNSAFE.putInt(monthsAddr, val.getMonths());
-    OffHeapMemory.UNSAFE.putLong(monthsAddr + SizeOf.SIZE_OF_INT, val.getMilliSeconds());
-    curOffset += SizeOf.SIZE_OF_INT + SizeOf.SIZE_OF_LONG;
-  }
-
-  /** Stored like an INT4 field. */
-  public void putInet4(int val) {
-    putInt4(val);
-  }
-
-  /** Stored like a BLOB field holding the datum's byte array. */
-  public void putProtoDatum(ProtobufDatum val) {
-    putBlob(val.asByteArray());
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/ResizableLimitSpec.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/ResizableLimitSpec.java b/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/ResizableLimitSpec.java
deleted file mode 100644
index 14e67b2..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/ResizableLimitSpec.java
+++ /dev/null
@@ -1,142 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.tuple.offheap;
-
-import com.google.common.base.Preconditions;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.tajo.util.FileUtil;
-
-/**
- * It specifies the initial size, the maximum size, and the increasing ratio. In addition,
- * it guarantees that the initial size and the limit are less than or equal to
- * Integer.MAX_VALUE (2^31 - 1) due to ByteBuffer.
- */
-public class ResizableLimitSpec {
-  // Declared before DEFAULT_LIMIT so that it is initialized ahead of any instance creation.
-  private static final Log LOG = LogFactory.getLog(ResizableLimitSpec.class);
-
-  public static final int MAX_SIZE_BYTES = Integer.MAX_VALUE;
-  public static final ResizableLimitSpec DEFAULT_LIMIT = new ResizableLimitSpec(Integer.MAX_VALUE);
-
-  private final long initSize;
-  /** Never greater than Integer.MAX_VALUE; the constructor clamps it. */
-  private final long limitBytes;
-  private final float incRatio;
-  private final float allowedOverflowRatio;
-  private static final float DEFAULT_ALLOWED_OVERFLOW_RATIO = 0.1f;
-  private static final float DEFAULT_INCREASE_RATIO = 1.0f;
-
-  /**
-   * @param initSize The initial size in bytes; the limit defaults to {@link #MAX_SIZE_BYTES}
-   */
-  public ResizableLimitSpec(long initSize) {
-    this(initSize, MAX_SIZE_BYTES, DEFAULT_ALLOWED_OVERFLOW_RATIO);
-  }
-
-  /**
-   * @param initSize The initial size in bytes
-   * @param limitBytes The limit size in bytes
-   */
-  public ResizableLimitSpec(long initSize, long limitBytes) {
-    this(initSize, limitBytes, DEFAULT_ALLOWED_OVERFLOW_RATIO);
-  }
-
-  /**
-   * @param initSize The initial size in bytes
-   * @param limitBytes The limit size in bytes
-   * @param allowedOverflow The fraction of the limit by which the limit is extended
-   */
-  public ResizableLimitSpec(long initSize, long limitBytes, float allowedOverflow) {
-    this(initSize, limitBytes, allowedOverflow, DEFAULT_INCREASE_RATIO);
-  }
-
-  /**
-   * The effective limit is the given limit extended by the allowed overflow ratio and then
-   * clamped to Integer.MAX_VALUE. If the initial size equals the limit, the initial size is
-   * extended and clamped in the same way, so both stay equal.
-   *
-   * @param initSize The initial size in bytes (0 &lt; initSize &lt;= 2^31 - 1)
-   * @param limitBytes The limit size in bytes (0 &lt; limitBytes &lt;= 2^31 - 1)
-   * @param allowedOverflowRatio The fraction of the limit by which the limit is extended
-   * @param incRatio The fraction of the current size that is added on each increase (&gt; 0)
-   * @throws IllegalArgumentException if initSize, limitBytes, or incRatio is out of range
-   */
-  public ResizableLimitSpec(long initSize, long limitBytes, float allowedOverflowRatio, float incRatio) {
-    Preconditions.checkArgument(initSize > 0, "initial size must be greater than 0 bytes.");
-    Preconditions.checkArgument(initSize <= MAX_SIZE_BYTES, "The maximum initial size is 2GB.");
-    Preconditions.checkArgument(limitBytes > 0, "The limit size must be greater than 0 bytes.");
-    Preconditions.checkArgument(limitBytes <= MAX_SIZE_BYTES, "The maximum limit size is 2GB.");
-    Preconditions.checkArgument(incRatio > 0.0f, "Increase Ratio must be greater than 0.");
-
-    // The float arithmetic below is kept as is on purpose; switching to double would
-    // change the computed sizes.
-    if (initSize == limitBytes) {
-      long overflowedSize = (long) (initSize + (initSize * allowedOverflowRatio));
-      overflowedSize = Math.min(overflowedSize, Integer.MAX_VALUE);
-
-      this.initSize = overflowedSize;
-      this.limitBytes = overflowedSize;
-    } else {
-      long overflowedLimit = (long) (limitBytes + (limitBytes * allowedOverflowRatio));
-
-      this.initSize = initSize;
-      this.limitBytes = Math.min(overflowedLimit, Integer.MAX_VALUE);
-    }
-
-    this.allowedOverflowRatio = allowedOverflowRatio;
-    this.incRatio = incRatio;
-  }
-
-  /**
-   * @return The initial size in bytes
-   */
-  public long initialSize() {
-    return initSize;
-  }
-
-  /**
-   * @return The effective limit in bytes, including the allowed overflow
-   */
-  public long limit() {
-    return limitBytes;
-  }
-
-  /**
-   * NOTE(review): despite its name, this returns the current size divided by the limit,
-   * not the remaining fraction. Confirm against the callers before changing it.
-   *
-   * @param currentSize The current size in bytes; values above Integer.MAX_VALUE are clamped
-   * @return currentSize / limit
-   * @throws IllegalArgumentException if currentSize is not greater than 0
-   */
-  public float remainRatio(long currentSize) {
-    Preconditions.checkArgument(currentSize > 0, "Size must be greater than 0 bytes.");
-    if (currentSize > Integer.MAX_VALUE) {
-      currentSize = Integer.MAX_VALUE;
-    }
-    return (float)currentSize / (float)limitBytes;
-  }
-
-  /**
-   * @param currentSize The current size in bytes
-   * @return true if the current size is still below the limit
-   * @throws IllegalArgumentException if currentSize is not greater than 0
-   */
-  public boolean canIncrease(long currentSize) {
-    return remain(currentSize) > 0;
-  }
-
-  /**
-   * @param currentSize The current size in bytes
-   * @return The limit minus the current size
-   * @throws IllegalArgumentException if currentSize is not greater than 0
-   */
-  public long remain(long currentSize) {
-    Preconditions.checkArgument(currentSize > 0, "Size must be greater than 0 bytes.");
-    // limitBytes never exceeds Integer.MAX_VALUE, so no further clamping is needed here.
-    return limitBytes - currentSize;
-  }
-
-  /**
-   * Computes the size to grow to from the given size.
-   *
-   * @param currentSize The current size in bytes
-   * @return The initial size if the current size is below it; otherwise the current size
-   *         increased by the increase ratio, capped at the limit
-   */
-  public int increasedSize(int currentSize) {
-    if (currentSize < initSize) {
-      return (int) initSize;
-    }
-
-    // An int can never exceed Integer.MAX_VALUE, so no upper-bound check on currentSize.
-    long nextSize = (long) (currentSize + ((float) currentSize * incRatio));
-
-    if (nextSize > limitBytes) {
-      LOG.info("Increasing reaches size limit (" + FileUtil.humanReadableByteCount(limitBytes, false) + ")");
-      nextSize = limitBytes;
-    }
-
-    // nextSize <= limitBytes <= Integer.MAX_VALUE holds here.
-    return (int) nextSize;
-  }
-
-  @Override
-  public String toString() {
-    return "init=" + FileUtil.humanReadableByteCount(initSize, false) + ",limit="
-        + FileUtil.humanReadableByteCount(limitBytes, false) + ",overflow_ratio=" + allowedOverflowRatio
-        + ",inc_ratio=" + incRatio;
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/RowWriter.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/RowWriter.java b/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/RowWriter.java
deleted file mode 100644
index a2b2561..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/RowWriter.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/***
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.tuple.offheap;
-
-import org.apache.tajo.common.TajoDataTypes;
-import org.apache.tajo.datum.IntervalDatum;
-import org.apache.tajo.datum.ProtobufDatum;
-
-/**
- * Writes rows one field at a time. The call sequence should be as follows:
- *
- * <pre>
- *   startRow() -->  skipField() or putXXX --> endRow()
- * </pre>
- *
- * The total number of skipField and putXXX invocations must be equivalent to the number of fields.
- */
-public interface RowWriter {
-
-  /**
-   * @return The data types of the fields of a row
-   */
-  TajoDataTypes.DataType [] dataTypes();
-
-  /**
-   * Begins a new row.
-   *
-   * @return Implementation-defined; NOTE(review): the meaning of the flag is not documented
-   */
-  boolean startRow();
-
-  /**
-   * Finishes the current row.
-   */
-  void endRow();
-
-  /**
-   * Leaves the current field without a value and moves on to the next field.
-   */
-  void skipField();
-
-  // Each put method writes the value of the current field and moves on to the next field.
-
-  void putBool(boolean val);
-
-  void putInt2(short val);
-
-  void putInt4(int val);
-
-  void putInt8(long val);
-
-  void putFloat4(float val);
-
-  void putFloat8(double val);
-
-  void putText(String val);
-
-  void putText(byte[] val);
-
-  void putBlob(byte[] val);
-
-  void putTimestamp(long val);
-
-  void putTime(long val);
-
-  void putDate(int val);
-
-  void putInterval(IntervalDatum val);
-
-  void putInet4(int val);
-
-  void putProtoDatum(ProtobufDatum datum);
-}