You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hj...@apache.org on 2014/12/03 06:30:34 UTC

[19/30] tajo git commit: TAJO-1122: Refactor the tajo-storage project structure.

http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ParquetReader.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ParquetReader.java b/tajo-storage/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ParquetReader.java
deleted file mode 100644
index 0fb2c3a..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ParquetReader.java
+++ /dev/null
@@ -1,151 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage.thirdparty.parquet;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-
-import parquet.filter.UnboundRecordFilter;
-import parquet.hadoop.*;
-import parquet.hadoop.api.InitContext;
-import parquet.hadoop.api.ReadSupport;
-import parquet.hadoop.api.ReadSupport.ReadContext;
-import parquet.hadoop.metadata.BlockMetaData;
-import parquet.hadoop.metadata.GlobalMetaData;
-import parquet.schema.MessageType;
-
-/**
- * Read records from a Parquet file.
- */
-public class ParquetReader<T> implements Closeable {
-
-  private ReadSupport<T> readSupport;
-  private UnboundRecordFilter filter;
-  private Configuration conf;
-  private ReadContext readContext;
-  private Iterator<Footer> footersIterator;
-  private InternalParquetRecordReader<T> reader;
-  private GlobalMetaData globalMetaData;
-
-  /**
-   * @param file the file to read
-   * @param readSupport to materialize records
-   * @throws IOException
-   */
-  public ParquetReader(Path file, ReadSupport<T> readSupport) throws IOException {
-    this(file, readSupport, null);
-  }
-
-  /**
-   * @param conf the configuration
-   * @param file the file to read
-   * @param readSupport to materialize records
-   * @throws IOException
-   */
-  public ParquetReader(Configuration conf, Path file, ReadSupport<T> readSupport) throws IOException {
-    this(conf, file, readSupport, null);
-  }
-
-  /**
-   * @param file the file to read
-   * @param readSupport to materialize records
-   * @param filter the filter to use to filter records
-   * @throws IOException
-   */
-  public ParquetReader(Path file, ReadSupport<T> readSupport, UnboundRecordFilter filter) throws IOException {
-    this(new Configuration(), file, readSupport, filter);
-  }
-
-  /**
-   * @param conf the configuration
-   * @param file the file to read
-   * @param readSupport to materialize records
-   * @param filter the filter to use to filter records
-   * @throws IOException
-   */
-  public ParquetReader(Configuration conf, Path file, ReadSupport<T> readSupport, UnboundRecordFilter filter) throws IOException {
-    this.readSupport = readSupport;
-    this.filter = filter;
-    this.conf = conf;
-
-    FileSystem fs = file.getFileSystem(conf);
-    List<FileStatus> statuses = Arrays.asList(fs.listStatus(file));
-    List<Footer> footers = ParquetFileReader.readAllFootersInParallelUsingSummaryFiles(conf, statuses);
-    this.footersIterator = footers.iterator();
-    globalMetaData = ParquetFileWriter.getGlobalMetaData(footers);
-
-    List<BlockMetaData> blocks = new ArrayList<BlockMetaData>();
-    for (Footer footer : footers) {
-      blocks.addAll(footer.getParquetMetadata().getBlocks());
-    }
-
-    MessageType schema = globalMetaData.getSchema();
-    Map<String, Set<String>> extraMetadata = globalMetaData.getKeyValueMetaData();
-    readContext = readSupport.init(new InitContext(conf, extraMetadata, schema));
-  }
-
-  /**
-   * @return the next record or null if finished
-   * @throws IOException
-   */
-  public T read() throws IOException {
-    try {
-      if (reader != null && reader.nextKeyValue()) {
-        return reader.getCurrentValue();
-      } else {
-        initReader();
-        return reader == null ? null : read();
-      }
-    } catch (InterruptedException e) {
-      throw new IOException(e);
-    }
-  }
-
-  private void initReader() throws IOException {
-    if (reader != null) {
-      reader.close();
-      reader = null;
-    }
-    if (footersIterator.hasNext()) {
-      Footer footer = footersIterator.next();
-      reader = new InternalParquetRecordReader<T>(readSupport, filter);
-      reader.initialize(
-          readContext.getRequestedSchema(), globalMetaData.getSchema(), footer.getParquetMetadata().getFileMetaData().getKeyValueMetaData(),
-          readContext.getReadSupportMetadata(), footer.getFile(), footer.getParquetMetadata().getBlocks(), conf);
-    }
-  }
-
-  @Override
-  public void close() throws IOException {
-    if (reader != null) {
-      reader.close();
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ParquetWriter.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ParquetWriter.java b/tajo-storage/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ParquetWriter.java
deleted file mode 100644
index 0447a47..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ParquetWriter.java
+++ /dev/null
@@ -1,224 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage.thirdparty.parquet;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import parquet.column.ParquetProperties;
-import parquet.hadoop.api.WriteSupport;
-import parquet.hadoop.metadata.CompressionCodecName;
-import parquet.schema.MessageType;
-
-import java.io.Closeable;
-import java.io.IOException;
-
-public class ParquetWriter<T> implements Closeable {
-
-  public static final int DEFAULT_BLOCK_SIZE = 128 * 1024 * 1024;
-  public static final int DEFAULT_PAGE_SIZE = 1 * 1024 * 1024;
-  public static final CompressionCodecName DEFAULT_COMPRESSION_CODEC_NAME =
-      CompressionCodecName.UNCOMPRESSED;
-  public static final boolean DEFAULT_IS_DICTIONARY_ENABLED = true;
-  public static final boolean DEFAULT_IS_VALIDATING_ENABLED = false;
-  public static final ParquetProperties.WriterVersion DEFAULT_WRITER_VERSION =
-      ParquetProperties.WriterVersion.PARQUET_1_0;
-
-  private final InternalParquetRecordWriter<T> writer;
-
-  /**
-   * Create a new ParquetWriter.
-   * (with dictionary encoding enabled and validation off)
-   *
-   * @param file the file to create
-   * @param writeSupport the implementation to write a record to a RecordConsumer
-   * @param compressionCodecName the compression codec to use
-   * @param blockSize the block size threshold
-   * @param pageSize the page size threshold
-   * @throws java.io.IOException
-   * @see #ParquetWriter(org.apache.hadoop.fs.Path, parquet.hadoop.api.WriteSupport, CompressionCodecName, int, int, boolean, boolean)
-   */
-  public ParquetWriter(Path file, WriteSupport<T> writeSupport, CompressionCodecName compressionCodecName, int blockSize, int pageSize) throws IOException {
-    this(file, writeSupport, compressionCodecName, blockSize, pageSize,
-        DEFAULT_IS_DICTIONARY_ENABLED, DEFAULT_IS_VALIDATING_ENABLED);
-  }
-
-  /**
-   * Create a new ParquetWriter.
-   *
-   * @param file the file to create
-   * @param writeSupport the implementation to write a record to a RecordConsumer
-   * @param compressionCodecName the compression codec to use
-   * @param blockSize the block size threshold
-   * @param pageSize the page size threshold (both data and dictionary)
-   * @param enableDictionary to turn dictionary encoding on
-   * @param validating to turn on validation using the schema
-   * @throws IOException
-   * @see #ParquetWriter(Path, WriteSupport, CompressionCodecName, int, int, int, boolean, boolean)
-   */
-  public ParquetWriter(
-      Path file,
-      WriteSupport<T> writeSupport,
-      CompressionCodecName compressionCodecName,
-      int blockSize,
-      int pageSize,
-      boolean enableDictionary,
-      boolean validating) throws IOException {
-    this(file, writeSupport, compressionCodecName, blockSize, pageSize, pageSize, enableDictionary, validating);
-  }
-
-  /**
-   * Create a new ParquetWriter.
-   *
-   * @param file the file to create
-   * @param writeSupport the implementation to write a record to a RecordConsumer
-   * @param compressionCodecName the compression codec to use
-   * @param blockSize the block size threshold
-   * @param pageSize the page size threshold
-   * @param dictionaryPageSize the page size threshold for the dictionary pages
-   * @param enableDictionary to turn dictionary encoding on
-   * @param validating to turn on validation using the schema
-   * @throws IOException
-   * @see #ParquetWriter(Path, WriteSupport, CompressionCodecName, int, int, int, boolean, boolean, parquet.column.ParquetProperties.WriterVersion)
-   */
-  public ParquetWriter(
-      Path file,
-      WriteSupport<T> writeSupport,
-      CompressionCodecName compressionCodecName,
-      int blockSize,
-      int pageSize,
-      int dictionaryPageSize,
-      boolean enableDictionary,
-      boolean validating) throws IOException {
-    this(file, writeSupport, compressionCodecName, blockSize, pageSize,
-        dictionaryPageSize, enableDictionary, validating,
-        DEFAULT_WRITER_VERSION);
-  }
-
-  /**
-   * Create a new ParquetWriter.
-   *
-   * Directly instantiates a Hadoop {@link org.apache.hadoop.conf.Configuration} which reads
-   * configuration from the classpath.
-   *
-   * @param file the file to create
-   * @param writeSupport the implementation to write a record to a RecordConsumer
-   * @param compressionCodecName the compression codec to use
-   * @param blockSize the block size threshold
-   * @param pageSize the page size threshold
-   * @param dictionaryPageSize the page size threshold for the dictionary pages
-   * @param enableDictionary to turn dictionary encoding on
-   * @param validating to turn on validation using the schema
-   * @param writerVersion version of parquetWriter from {@link ParquetProperties.WriterVersion}
-   * @throws IOException
-   * @see #ParquetWriter(Path, WriteSupport, CompressionCodecName, int, int, int, boolean, boolean, parquet.column.ParquetProperties.WriterVersion, org.apache.hadoop.conf.Configuration)
-   */
-  public ParquetWriter(
-      Path file,
-      WriteSupport<T> writeSupport,
-      CompressionCodecName compressionCodecName,
-      int blockSize,
-      int pageSize,
-      int dictionaryPageSize,
-      boolean enableDictionary,
-      boolean validating,
-      ParquetProperties.WriterVersion writerVersion) throws IOException {
-    this(file, writeSupport, compressionCodecName, blockSize, pageSize, dictionaryPageSize, enableDictionary, validating, writerVersion, new Configuration());
-  }
-
-  /**
-   * Create a new ParquetWriter.
-   *
-   * @param file the file to create
-   * @param writeSupport the implementation to write a record to a RecordConsumer
-   * @param compressionCodecName the compression codec to use
-   * @param blockSize the block size threshold
-   * @param pageSize the page size threshold
-   * @param dictionaryPageSize the page size threshold for the dictionary pages
-   * @param enableDictionary to turn dictionary encoding on
-   * @param validating to turn on validation using the schema
-   * @param writerVersion version of parquetWriter from {@link ParquetProperties.WriterVersion}
-   * @param conf Hadoop configuration to use while accessing the filesystem
-   * @throws IOException
-   */
-  public ParquetWriter(
-      Path file,
-      WriteSupport<T> writeSupport,
-      CompressionCodecName compressionCodecName,
-      int blockSize,
-      int pageSize,
-      int dictionaryPageSize,
-      boolean enableDictionary,
-      boolean validating,
-      ParquetProperties.WriterVersion writerVersion,
-      Configuration conf) throws IOException {
-
-    WriteSupport.WriteContext writeContext = writeSupport.init(conf);
-    MessageType schema = writeContext.getSchema();
-
-    ParquetFileWriter fileWriter = new ParquetFileWriter(conf, schema, file);
-    fileWriter.start();
-
-    CodecFactory codecFactory = new CodecFactory(conf);
-    CodecFactory.BytesCompressor compressor =	codecFactory.getCompressor(compressionCodecName, 0);
-    this.writer = new InternalParquetRecordWriter<T>(
-        fileWriter,
-        writeSupport,
-        schema,
-        writeContext.getExtraMetaData(),
-        blockSize,
-        pageSize,
-        compressor,
-        dictionaryPageSize,
-        enableDictionary,
-        validating,
-        writerVersion);
-  }
-
-  /**
-   * Create a new ParquetWriter.  The default block size is 50 MB.The default
-   * page size is 1 MB.  Default compression is no compression. Dictionary encoding is disabled.
-   *
-   * @param file the file to create
-   * @param writeSupport the implementation to write a record to a RecordConsumer
-   * @throws IOException
-   */
-  public ParquetWriter(Path file, WriteSupport<T> writeSupport) throws IOException {
-    this(file, writeSupport, DEFAULT_COMPRESSION_CODEC_NAME, DEFAULT_BLOCK_SIZE, DEFAULT_PAGE_SIZE);
-  }
-
-  public void write(T object) throws IOException {
-    try {
-      writer.write(object);
-    } catch (InterruptedException e) {
-      throw new IOException(e);
-    }
-  }
-
-  public long getEstimatedWrittenSize() throws IOException {
-    return this.writer.getEstimatedWrittenSize();
-  }
-
-  @Override
-  public void close() throws IOException {
-    try {
-      writer.close();
-    } catch (InterruptedException e) {
-      throw new IOException(e);
-    }
-  }}

http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/main/java/org/apache/tajo/tuple/BaseTupleBuilder.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/tuple/BaseTupleBuilder.java b/tajo-storage/src/main/java/org/apache/tajo/tuple/BaseTupleBuilder.java
deleted file mode 100644
index c1835df..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/tuple/BaseTupleBuilder.java
+++ /dev/null
@@ -1,112 +0,0 @@
-/***
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.tuple;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.catalog.SchemaUtil;
-import org.apache.tajo.storage.Tuple;
-import org.apache.tajo.tuple.offheap.HeapTuple;
-import org.apache.tajo.tuple.offheap.OffHeapRowWriter;
-import org.apache.tajo.tuple.offheap.ZeroCopyTuple;
-import org.apache.tajo.unit.StorageUnit;
-import org.apache.tajo.util.Deallocatable;
-import org.apache.tajo.util.FileUtil;
-import org.apache.tajo.util.UnsafeUtil;
-import sun.misc.Unsafe;
-import sun.nio.ch.DirectBuffer;
-
-import java.nio.ByteBuffer;
-import java.nio.ByteOrder;
-
-public class BaseTupleBuilder extends OffHeapRowWriter implements TupleBuilder, Deallocatable {
-  private static final Log LOG = LogFactory.getLog(BaseTupleBuilder.class);
-
-  private static final Unsafe UNSAFE = UnsafeUtil.unsafe;
-
-  // buffer
-  private ByteBuffer buffer;
-  private long address;
-
-  public BaseTupleBuilder(Schema schema) {
-    super(SchemaUtil.toDataTypes(schema));
-    buffer = ByteBuffer.allocateDirect(64 * StorageUnit.KB).order(ByteOrder.nativeOrder());
-    address = UnsafeUtil.getAddress(buffer);
-  }
-
-  @Override
-  public long address() {
-    return address;
-  }
-
-  public void ensureSize(int size) {
-    if (buffer.remaining() - size < 0) { // check the remain size
-      // enlarge new buffer and copy writing data
-      int newBlockSize = UnsafeUtil.alignedSize(buffer.capacity() * 2);
-      ByteBuffer newByteBuf = ByteBuffer.allocateDirect(newBlockSize);
-      long newAddress = ((DirectBuffer)newByteBuf).address();
-      UNSAFE.copyMemory(this.address, newAddress, buffer.limit());
-      LOG.debug("Increase the buffer size to " + FileUtil.humanReadableByteCount(newBlockSize, false));
-
-      // release existing buffer and replace variables
-      UnsafeUtil.free(buffer);
-      buffer = newByteBuf;
-      address = newAddress;
-    }
-  }
-
-  @Override
-  public int position() {
-    return 0;
-  }
-
-  @Override
-  public void forward(int length) {
-  }
-
-  @Override
-  public void endRow() {
-    super.endRow();
-    buffer.position(0).limit(offset());
-  }
-
-  @Override
-  public Tuple build() {
-    return buildToHeapTuple();
-  }
-
-  public HeapTuple buildToHeapTuple() {
-    byte [] bytes = new byte[buffer.limit()];
-    UNSAFE.copyMemory(null, address, bytes, UnsafeUtil.ARRAY_BOOLEAN_BASE_OFFSET, buffer.limit());
-    return new HeapTuple(bytes, dataTypes());
-  }
-
-  public ZeroCopyTuple buildToZeroCopyTuple() {
-    ZeroCopyTuple zcTuple = new ZeroCopyTuple();
-    zcTuple.set(buffer, 0, buffer.limit(), dataTypes());
-    return zcTuple;
-  }
-
-  public void release() {
-    UnsafeUtil.free(buffer);
-    buffer = null;
-    address = 0;
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/main/java/org/apache/tajo/tuple/RowBlockReader.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/tuple/RowBlockReader.java b/tajo-storage/src/main/java/org/apache/tajo/tuple/RowBlockReader.java
deleted file mode 100644
index be734e1..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/tuple/RowBlockReader.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.tuple;
-
-import org.apache.tajo.storage.Tuple;
-
-public interface RowBlockReader<T extends Tuple> {
-
-  /**
-   * Return for each tuple
-   *
-   * @return True if tuple block is filled with tuples. Otherwise, It will return false.
-   */
-  public boolean next(T tuple);
-
-  public void reset();
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/main/java/org/apache/tajo/tuple/TupleBuilder.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/tuple/TupleBuilder.java b/tajo-storage/src/main/java/org/apache/tajo/tuple/TupleBuilder.java
deleted file mode 100644
index c43c018..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/tuple/TupleBuilder.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/***
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.tuple;
-
-import org.apache.tajo.storage.Tuple;
-import org.apache.tajo.tuple.offheap.RowWriter;
-
-public interface TupleBuilder extends RowWriter {
-  public Tuple build();
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/DirectBufTuple.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/DirectBufTuple.java b/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/DirectBufTuple.java
deleted file mode 100644
index 9662d5a..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/DirectBufTuple.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.tuple.offheap;
-
-import org.apache.tajo.util.Deallocatable;
-import org.apache.tajo.util.UnsafeUtil;
-
-import java.nio.ByteBuffer;
-import java.nio.ByteOrder;
-
-import static org.apache.tajo.common.TajoDataTypes.DataType;
-
-public class DirectBufTuple extends UnSafeTuple implements Deallocatable {
-  private ByteBuffer bb;
-
-  public DirectBufTuple(int length, DataType[] types) {
-    bb = ByteBuffer.allocateDirect(length).order(ByteOrder.nativeOrder());
-    set(bb, 0, length, types);
-  }
-
-  @Override
-  public void release() {
-    UnsafeUtil.free(bb);
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/FixedSizeLimitSpec.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/FixedSizeLimitSpec.java b/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/FixedSizeLimitSpec.java
deleted file mode 100644
index a327123..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/FixedSizeLimitSpec.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.tuple.offheap;
-
-/**
- * Fixed size limit specification
- */
-public class FixedSizeLimitSpec extends ResizableLimitSpec {
-  public FixedSizeLimitSpec(long size) {
-    super(size, size);
-  }
-
-  public FixedSizeLimitSpec(long size, float allowedOverflowRatio) {
-    super(size, size, allowedOverflowRatio);
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/HeapTuple.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/HeapTuple.java b/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/HeapTuple.java
deleted file mode 100644
index 33f9f1c..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/HeapTuple.java
+++ /dev/null
@@ -1,272 +0,0 @@
-/***
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.tuple.offheap;
-
-import com.google.protobuf.InvalidProtocolBufferException;
-import com.google.protobuf.Message;
-
-import org.apache.tajo.datum.*;
-import org.apache.tajo.exception.UnsupportedException;
-import org.apache.tajo.storage.Tuple;
-import org.apache.tajo.storage.VTuple;
-import org.apache.tajo.util.SizeOf;
-import org.apache.tajo.util.StringUtils;
-import org.apache.tajo.util.UnsafeUtil;
-
-import sun.misc.Unsafe;
-
-import java.nio.ByteBuffer;
-import java.nio.charset.Charset;
-
-import static org.apache.tajo.common.TajoDataTypes.DataType;
-
-public class HeapTuple implements Tuple {
-  private static final Unsafe UNSAFE = UnsafeUtil.unsafe;
-  private static final long BASE_OFFSET = UnsafeUtil.ARRAY_BYTE_BASE_OFFSET;
-
-  private final byte [] data;
-  private final DataType [] types;
-
-  public HeapTuple(final byte [] bytes, final DataType [] types) {
-    this.data = bytes;
-    this.types = types;
-  }
-
-  @Override
-  public int size() {
-    return data.length;
-  }
-
-  public ByteBuffer nioBuffer() {
-    return ByteBuffer.wrap(data);
-  }
-
-  private int getFieldOffset(int fieldId) {
-    return UNSAFE.getInt(data, BASE_OFFSET + SizeOf.SIZE_OF_INT + (fieldId * SizeOf.SIZE_OF_INT));
-  }
-
-  private int checkNullAndGetOffset(int fieldId) {
-    int offset = getFieldOffset(fieldId);
-    if (offset == OffHeapRowBlock.NULL_FIELD_OFFSET) {
-      throw new RuntimeException("Invalid Field Access: " + fieldId);
-    }
-    return offset;
-  }
-
-  @Override
-  public boolean contains(int fieldid) {
-    return getFieldOffset(fieldid) > OffHeapRowBlock.NULL_FIELD_OFFSET;
-  }
-
-  @Override
-  public boolean isNull(int fieldid) {
-    return getFieldOffset(fieldid) == OffHeapRowBlock.NULL_FIELD_OFFSET;
-  }
-
-  @Override
-  public boolean isNotNull(int fieldid) {
-    return getFieldOffset(fieldid) > OffHeapRowBlock.NULL_FIELD_OFFSET;
-  }
-
-  @Override
-  public void clear() {
-    // nothing to do
-  }
-
-  @Override
-  public void put(int fieldId, Datum value) {
-    throw new UnsupportedException("UnSafeTuple does not support put(int, Datum).");
-  }
-
-  @Override
-  public void put(int fieldId, Datum[] values) {
-    throw new UnsupportedException("UnSafeTuple does not support put(int, Datum []).");
-  }
-
-  @Override
-  public void put(int fieldId, Tuple tuple) {
-    throw new UnsupportedException("UnSafeTuple does not support put(int, Tuple).");
-  }
-
-  @Override
-  public void put(Datum[] values) {
-    throw new UnsupportedException("UnSafeTuple does not support put(Datum []).");
-  }
-
-  @Override
-  public Datum get(int fieldId) {
-    if (isNull(fieldId)) {
-      return NullDatum.get();
-    }
-
-    switch (types[fieldId].getType()) {
-    case BOOLEAN:
-      return DatumFactory.createBool(getBool(fieldId));
-    case INT1:
-    case INT2:
-      return DatumFactory.createInt2(getInt2(fieldId));
-    case INT4:
-      return DatumFactory.createInt4(getInt4(fieldId));
-    case INT8:
-      return DatumFactory.createInt8(getInt4(fieldId));
-    case FLOAT4:
-      return DatumFactory.createFloat4(getFloat4(fieldId));
-    case FLOAT8:
-      return DatumFactory.createFloat8(getFloat8(fieldId));
-    case TEXT:
-      return DatumFactory.createText(getText(fieldId));
-    case TIMESTAMP:
-      return DatumFactory.createTimestamp(getInt8(fieldId));
-    case DATE:
-      return DatumFactory.createDate(getInt4(fieldId));
-    case TIME:
-      return DatumFactory.createTime(getInt8(fieldId));
-    case INTERVAL:
-      return getInterval(fieldId);
-    case INET4:
-      return DatumFactory.createInet4(getInt4(fieldId));
-    case PROTOBUF:
-      return getProtobufDatum(fieldId);
-    default:
-      throw new UnsupportedException("Unknown type: " + types[fieldId]);
-    }
-  }
-
-  @Override
-  public void setOffset(long offset) {
-  }
-
-  @Override
-  public long getOffset() {
-    return 0;
-  }
-
-  @Override
-  public boolean getBool(int fieldId) {
-    return UNSAFE.getByte(data, BASE_OFFSET + checkNullAndGetOffset(fieldId)) == 0x01;
-  }
-
-  @Override
-  public byte getByte(int fieldId) {
-    return UNSAFE.getByte(data, BASE_OFFSET + checkNullAndGetOffset(fieldId));
-  }
-
-  @Override
-  public char getChar(int fieldId) {
-    return UNSAFE.getChar(data, BASE_OFFSET + checkNullAndGetOffset(fieldId));
-  }
-
-  @Override
-  public byte[] getBytes(int fieldId) {
-    long pos = checkNullAndGetOffset(fieldId);
-    int len = UNSAFE.getInt(data, BASE_OFFSET + pos);
-    pos += SizeOf.SIZE_OF_INT;
-
-    byte [] bytes = new byte[len];
-    UNSAFE.copyMemory(data, BASE_OFFSET + pos, bytes, UnsafeUtil.ARRAY_BYTE_BASE_OFFSET, len);
-    return bytes;
-  }
-
-  @Override
-  public short getInt2(int fieldId) {
-    return UNSAFE.getShort(data, BASE_OFFSET + checkNullAndGetOffset(fieldId));
-  }
-
-  @Override
-  public int getInt4(int fieldId) {
-    return UNSAFE.getInt(data, BASE_OFFSET + checkNullAndGetOffset(fieldId));
-  }
-
-  @Override
-  public long getInt8(int fieldId) {
-    return UNSAFE.getLong(data, BASE_OFFSET + checkNullAndGetOffset(fieldId));
-  }
-
-  @Override
-  public float getFloat4(int fieldId) {
-    return UNSAFE.getFloat(data, BASE_OFFSET + checkNullAndGetOffset(fieldId));
-  }
-
-  @Override
-  public double getFloat8(int fieldId) {
-    return UNSAFE.getDouble(data, BASE_OFFSET + checkNullAndGetOffset(fieldId));
-  }
-
-  @Override
-  public String getText(int fieldId) {
-    return new String(getBytes(fieldId));
-  }
-
-  public IntervalDatum getInterval(int fieldId) {
-    long pos = checkNullAndGetOffset(fieldId);
-    int months = UNSAFE.getInt(data, BASE_OFFSET + pos);
-    pos += SizeOf.SIZE_OF_INT;
-    long millisecs = UNSAFE.getLong(data, BASE_OFFSET + pos);
-    return new IntervalDatum(months, millisecs);
-  }
-
-  @Override
-  public Datum getProtobufDatum(int fieldId) {
-    byte [] bytes = getBytes(fieldId);
-
-    ProtobufDatumFactory factory = ProtobufDatumFactory.get(types[fieldId].getCode());
-    Message.Builder builder = factory.newBuilder();
-    try {
-      builder.mergeFrom(bytes);
-    } catch (InvalidProtocolBufferException e) {
-      return NullDatum.get();
-    }
-
-    return new ProtobufDatum(builder.build());
-  }
-
-  @Override
-  public char[] getUnicodeChars(int fieldId) {
-    long pos = checkNullAndGetOffset(fieldId);
-    int len = UNSAFE.getInt(data, BASE_OFFSET + pos);
-    pos += SizeOf.SIZE_OF_INT;
-
-    byte [] bytes = new byte[len];
-    UNSAFE.copyMemory(data, BASE_OFFSET + pos, bytes, UnsafeUtil.ARRAY_BYTE_BASE_OFFSET, len);
-    return StringUtils.convertBytesToChars(bytes, Charset.forName("UTF-8"));
-  }
-
-  @Override
-  public Tuple clone() throws CloneNotSupportedException {
-    return this;
-  }
-
-  @Override
-  public Datum[] getValues() {
-    Datum [] datums = new Datum[size()];
-    for (int i = 0; i < size(); i++) {
-      if (contains(i)) {
-        datums[i] = get(i);
-      } else {
-        datums[i] = NullDatum.get();
-      }
-    }
-    return datums;
-  }
-
-  @Override
-  public String toString() {
-    return VTuple.toDisplayString(getValues());
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapMemory.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapMemory.java b/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapMemory.java
deleted file mode 100644
index 2f8e349..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapMemory.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.tuple.offheap;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.tajo.util.Deallocatable;
-import org.apache.tajo.util.FileUtil;
-import org.apache.tajo.util.UnsafeUtil;
-import sun.misc.Unsafe;
-import sun.nio.ch.DirectBuffer;
-
-import java.nio.ByteBuffer;
-import java.nio.ByteOrder;
-
-public class OffHeapMemory implements Deallocatable {
-  private static final Log LOG = LogFactory.getLog(OffHeapMemory.class);
-
-  protected static final Unsafe UNSAFE = UnsafeUtil.unsafe;
-
-  protected ByteBuffer buffer;
-  protected int memorySize;
-  protected ResizableLimitSpec limitSpec;
-  protected long address;
-
-  @VisibleForTesting
-  protected OffHeapMemory(ByteBuffer buffer, ResizableLimitSpec limitSpec) {
-    this.buffer = buffer;
-    this.address = ((DirectBuffer) buffer).address();
-    this.memorySize = buffer.limit();
-    this.limitSpec = limitSpec;
-  }
-
-  public OffHeapMemory(ResizableLimitSpec limitSpec) {
-    this(ByteBuffer.allocateDirect((int) limitSpec.initialSize()).order(ByteOrder.nativeOrder()), limitSpec);
-  }
-
-  public long address() {
-    return address;
-  }
-
-  public long size() {
-    return memorySize;
-  }
-
-  public void resize(int newSize) {
-    Preconditions.checkArgument(newSize > 0, "Size must be greater than 0 bytes");
-
-    if (newSize > limitSpec.limit()) {
-      throw new RuntimeException("Resize cannot exceed the size limit");
-    }
-
-    if (newSize < memorySize) {
-      LOG.warn("The size reduction is ignored.");
-    }
-
-    int newBlockSize = UnsafeUtil.alignedSize(newSize);
-    ByteBuffer newByteBuf = ByteBuffer.allocateDirect(newBlockSize);
-    long newAddress = ((DirectBuffer)newByteBuf).address();
-
-    UNSAFE.copyMemory(this.address, newAddress, memorySize);
-
-    UnsafeUtil.free(buffer);
-    this.memorySize = newSize;
-    this.buffer = newByteBuf;
-    this.address = newAddress;
-  }
-
-  public java.nio.Buffer nioBuffer() {
-    return (ByteBuffer) buffer.position(0).limit(memorySize);
-  }
-
-  @Override
-  public void release() {
-    UnsafeUtil.free(this.buffer);
-    this.buffer = null;
-    this.address = 0;
-    this.memorySize = 0;
-  }
-
-  public String toString() {
-    return "memory=" + FileUtil.humanReadableByteCount(memorySize, false) + "," + limitSpec;
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlock.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlock.java b/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlock.java
deleted file mode 100644
index 689efb7..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlock.java
+++ /dev/null
@@ -1,176 +0,0 @@
-/***
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.tuple.offheap;
-
-import com.google.common.annotations.VisibleForTesting;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.catalog.SchemaUtil;
-import org.apache.tajo.catalog.statistics.TableStats;
-import org.apache.tajo.util.Deallocatable;
-import org.apache.tajo.util.FileUtil;
-import org.apache.tajo.util.SizeOf;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.channels.FileChannel;
-
-import static org.apache.tajo.common.TajoDataTypes.DataType;
-
-public class OffHeapRowBlock extends OffHeapMemory implements Deallocatable {
-  private static final Log LOG = LogFactory.getLog(OffHeapRowBlock.class);
-
-  public static final int NULL_FIELD_OFFSET = -1;
-
-  DataType [] dataTypes;
-
-  // Basic States
-  private int maxRowNum = Integer.MAX_VALUE; // optional
-  private int rowNum;
-  protected int position = 0;
-
-  private OffHeapRowBlockWriter builder;
-
-  private OffHeapRowBlock(ByteBuffer buffer, Schema schema, ResizableLimitSpec limitSpec) {
-    super(buffer, limitSpec);
-    initialize(schema);
-  }
-
-  public OffHeapRowBlock(Schema schema, ResizableLimitSpec limitSpec) {
-    super(limitSpec);
-    initialize(schema);
-  }
-
-  private void initialize(Schema schema) {
-    dataTypes = SchemaUtil.toDataTypes(schema);
-
-    this.builder = new OffHeapRowBlockWriter(this);
-  }
-
-  @VisibleForTesting
-  public OffHeapRowBlock(Schema schema, int bytes) {
-    this(schema, new ResizableLimitSpec(bytes));
-  }
-
-  @VisibleForTesting
-  public OffHeapRowBlock(Schema schema, ByteBuffer buffer) {
-    this(buffer, schema, ResizableLimitSpec.DEFAULT_LIMIT);
-  }
-
-  public void position(int pos) {
-    this.position = pos;
-  }
-
-  public void clear() {
-    this.position = 0;
-    this.rowNum = 0;
-
-    builder.clear();
-  }
-
-  @Override
-  public ByteBuffer nioBuffer() {
-    return (ByteBuffer) buffer.position(0).limit(position);
-  }
-
-  public int position() {
-    return position;
-  }
-
-  public long usedMem() {
-    return position;
-  }
-
-  /**
-   * Ensure that this buffer has enough remaining space to add the size.
-   * Creates and copies to a new buffer if necessary
-   *
-   * @param size Size to add
-   */
-  public void ensureSize(int size) {
-    if (remain() - size < 0) {
-      if (!limitSpec.canIncrease(memorySize)) {
-        throw new RuntimeException("Cannot increase RowBlock anymore.");
-      }
-
-      int newBlockSize = limitSpec.increasedSize(memorySize);
-      resize(newBlockSize);
-      LOG.info("Increase DirectRowBlock to " + FileUtil.humanReadableByteCount(newBlockSize, false));
-    }
-  }
-
-  public long remain() {
-    return memorySize - position - builder.offset();
-  }
-
-  public int maxRowNum() {
-    return maxRowNum;
-  }
-  public int rows() {
-    return rowNum;
-  }
-
-  public void setRows(int rowNum) {
-    this.rowNum = rowNum;
-  }
-
-  public boolean copyFromChannel(FileChannel channel, TableStats stats) throws IOException {
-    if (channel.position() < channel.size()) {
-      clear();
-
-      buffer.clear();
-      channel.read(buffer);
-      memorySize = buffer.position();
-
-      while (position < memorySize) {
-        long recordPtr = address + position;
-
-        if (remain() < SizeOf.SIZE_OF_INT) {
-          channel.position(channel.position() - remain());
-          memorySize = (int) (memorySize - remain());
-          return true;
-        }
-
-        int recordSize = UNSAFE.getInt(recordPtr);
-
-        if (remain() < recordSize) {
-          channel.position(channel.position() - remain());
-          memorySize = (int) (memorySize - remain());
-          return true;
-        }
-
-        position += recordSize;
-        rowNum++;
-      }
-
-      return true;
-    } else {
-      return false;
-    }
-  }
-
-  public RowWriter getWriter() {
-    return builder;
-  }
-
-  public OffHeapRowBlockReader getReader() {
-    return new OffHeapRowBlockReader(this);
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlockReader.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlockReader.java b/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlockReader.java
deleted file mode 100644
index 4a9313f..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlockReader.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/***
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.tuple.offheap;
-
-import org.apache.tajo.tuple.RowBlockReader;
-import org.apache.tajo.util.UnsafeUtil;
-import sun.misc.Unsafe;
-
-public class OffHeapRowBlockReader implements RowBlockReader<ZeroCopyTuple> {
-  private static final Unsafe UNSAFE = UnsafeUtil.unsafe;
-  OffHeapRowBlock rowBlock;
-
-  // Read States
-  private int curRowIdxForRead;
-  private int curPosForRead;
-
-  public OffHeapRowBlockReader(OffHeapRowBlock rowBlock) {
-    this.rowBlock = rowBlock;
-  }
-
-  public long remainForRead() {
-    return rowBlock.memorySize - curPosForRead;
-  }
-
-  @Override
-  public boolean next(ZeroCopyTuple tuple) {
-    if (curRowIdxForRead < rowBlock.rows()) {
-
-      long recordStartPtr = rowBlock.address() + curPosForRead;
-      int recordLen = UNSAFE.getInt(recordStartPtr);
-      tuple.set(rowBlock.buffer, curPosForRead, recordLen, rowBlock.dataTypes);
-
-      curPosForRead += recordLen;
-      curRowIdxForRead++;
-
-      return true;
-    } else {
-      return false;
-    }
-  }
-
-  @Override
-  public void reset() {
-    curPosForRead = 0;
-    curRowIdxForRead = 0;
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlockUtils.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlockUtils.java b/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlockUtils.java
deleted file mode 100644
index dbc3188..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlockUtils.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.tuple.offheap;
-
-import com.google.common.collect.Lists;
-import org.apache.tajo.storage.Tuple;
-
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.List;
-
-public class OffHeapRowBlockUtils {
-
-  public static List<Tuple> sort(OffHeapRowBlock rowBlock, Comparator<Tuple> comparator) {
-    List<Tuple> tupleList = Lists.newArrayList();
-    ZeroCopyTuple zcTuple = new ZeroCopyTuple();
-    OffHeapRowBlockReader reader = new OffHeapRowBlockReader(rowBlock);
-    while(reader.next(zcTuple)) {
-      tupleList.add(zcTuple);
-      zcTuple = new ZeroCopyTuple();
-    }
-    Collections.sort(tupleList, comparator);
-    return tupleList;
-  }
-
-  public static Tuple[] sortToArray(OffHeapRowBlock rowBlock, Comparator<Tuple> comparator) {
-    Tuple[] tuples = new Tuple[rowBlock.rows()];
-    ZeroCopyTuple zcTuple = new ZeroCopyTuple();
-    OffHeapRowBlockReader reader = new OffHeapRowBlockReader(rowBlock);
-    for (int i = 0; i < rowBlock.rows() && reader.next(zcTuple); i++) {
-      tuples[i] = zcTuple;
-      zcTuple = new ZeroCopyTuple();
-    }
-    Arrays.sort(tuples, comparator);
-    return tuples;
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlockWriter.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlockWriter.java b/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlockWriter.java
deleted file mode 100644
index d177e0c..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlockWriter.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.tuple.offheap;
-
-import org.apache.tajo.common.TajoDataTypes;
-
-public class OffHeapRowBlockWriter extends OffHeapRowWriter {
-  OffHeapRowBlock rowBlock;
-
-  OffHeapRowBlockWriter(OffHeapRowBlock rowBlock) {
-    super(rowBlock.dataTypes);
-    this.rowBlock = rowBlock;
-  }
-
-  public long address() {
-    return rowBlock.address();
-  }
-
-  public int position() {
-    return rowBlock.position();
-  }
-
-  @Override
-  public void forward(int length) {
-    rowBlock.position(position() + length);
-  }
-
-  public void ensureSize(int size) {
-    rowBlock.ensureSize(size);
-  }
-
-  @Override
-  public void endRow() {
-    super.endRow();
-    rowBlock.setRows(rowBlock.rows() + 1);
-  }
-
-  @Override
-  public TajoDataTypes.DataType[] dataTypes() {
-    return rowBlock.dataTypes;
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowWriter.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowWriter.java b/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowWriter.java
deleted file mode 100644
index 85c7e0b..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowWriter.java
+++ /dev/null
@@ -1,232 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.tuple.offheap;
-
-import org.apache.tajo.common.TajoDataTypes;
-import org.apache.tajo.datum.IntervalDatum;
-import org.apache.tajo.datum.ProtobufDatum;
-import org.apache.tajo.datum.TextDatum;
-import org.apache.tajo.util.SizeOf;
-import org.apache.tajo.util.UnsafeUtil;
-
-/**
- *
- * Row Record Structure
- *
- * | row length (4 bytes) | field 1 offset | field 2 offset | ... | field N offset| field 1 | field 2| ... | field N |
- *                              4 bytes          4 bytes               4 bytes
- *
- */
-public abstract class OffHeapRowWriter implements RowWriter {
-  /** record size + offset list */
-  private final int headerSize;
-  /** field offsets */
-  private final int [] fieldOffsets;
-  private final TajoDataTypes.DataType [] dataTypes;
-
-  private int curFieldIdx;
-  private int curOffset;
-
-  public OffHeapRowWriter(final TajoDataTypes.DataType [] dataTypes) {
-    this.dataTypes = dataTypes;
-    fieldOffsets = new int[dataTypes.length];
-    headerSize = SizeOf.SIZE_OF_INT * (dataTypes.length + 1);
-  }
-
-  public void clear() {
-    curOffset = 0;
-    curFieldIdx = 0;
-  }
-
-  public long recordStartAddr() {
-    return address() + position();
-  }
-
-  public abstract long address();
-
-  public abstract void ensureSize(int size);
-
-  public int offset() {
-    return curOffset;
-  }
-
-  /**
-   * Current position
-   *
-   * @return The position
-   */
-  public abstract int position();
-
-  /**
-   * Forward the address;
-   *
-   * @param length Length to be forwarded
-   */
-  public abstract void forward(int length);
-
-  @Override
-  public TajoDataTypes.DataType[] dataTypes() {
-    return dataTypes;
-  }
-
-  public boolean startRow() {
-    curOffset = headerSize;
-    curFieldIdx = 0;
-    return true;
-  }
-
-  public void endRow() {
-    long rowHeaderPos = address() + position();
-    OffHeapMemory.UNSAFE.putInt(rowHeaderPos, curOffset);
-    rowHeaderPos += SizeOf.SIZE_OF_INT;
-
-    for (int i = 0; i < curFieldIdx; i++) {
-      OffHeapMemory.UNSAFE.putInt(rowHeaderPos, fieldOffsets[i]);
-      rowHeaderPos += SizeOf.SIZE_OF_INT;
-    }
-    for (int i = curFieldIdx; i < dataTypes.length; i++) {
-      OffHeapMemory.UNSAFE.putInt(rowHeaderPos, OffHeapRowBlock.NULL_FIELD_OFFSET);
-      rowHeaderPos += SizeOf.SIZE_OF_INT;
-    }
-
-    // rowOffset is equivalent to a byte length of this row.
-    forward(curOffset);
-  }
-
-  public void skipField() {
-    fieldOffsets[curFieldIdx++] = OffHeapRowBlock.NULL_FIELD_OFFSET;
-  }
-
-  private void forwardField() {
-    fieldOffsets[curFieldIdx++] = curOffset;
-  }
-
-  public void putBool(boolean val) {
-    ensureSize(SizeOf.SIZE_OF_BOOL);
-    forwardField();
-
-    OffHeapMemory.UNSAFE.putByte(recordStartAddr() + curOffset, (byte) (val ? 0x01 : 0x00));
-
-    curOffset += SizeOf.SIZE_OF_BOOL;
-  }
-
-  public void putInt2(short val) {
-    ensureSize(SizeOf.SIZE_OF_SHORT);
-    forwardField();
-
-    OffHeapMemory.UNSAFE.putShort(recordStartAddr() + curOffset, val);
-    curOffset += SizeOf.SIZE_OF_SHORT;
-  }
-
-  public void putInt4(int val) {
-    ensureSize(SizeOf.SIZE_OF_INT);
-    forwardField();
-
-    OffHeapMemory.UNSAFE.putInt(recordStartAddr() + curOffset, val);
-    curOffset += SizeOf.SIZE_OF_INT;
-  }
-
-  public void putInt8(long val) {
-    ensureSize(SizeOf.SIZE_OF_LONG);
-    forwardField();
-
-    OffHeapMemory.UNSAFE.putLong(recordStartAddr() + curOffset, val);
-    curOffset += SizeOf.SIZE_OF_LONG;
-  }
-
-  public void putFloat4(float val) {
-    ensureSize(SizeOf.SIZE_OF_FLOAT);
-    forwardField();
-
-    OffHeapMemory.UNSAFE.putFloat(recordStartAddr() + curOffset, val);
-    curOffset += SizeOf.SIZE_OF_FLOAT;
-  }
-
-  public void putFloat8(double val) {
-    ensureSize(SizeOf.SIZE_OF_DOUBLE);
-    forwardField();
-
-    OffHeapMemory.UNSAFE.putDouble(recordStartAddr() + curOffset, val);
-    curOffset += SizeOf.SIZE_OF_DOUBLE;
-  }
-
-  public void putText(String val) {
-    byte[] bytes = val.getBytes(TextDatum.DEFAULT_CHARSET);
-    putText(bytes);
-  }
-
-  public void putText(byte[] val) {
-    int bytesLen = val.length;
-
-    ensureSize(SizeOf.SIZE_OF_INT + bytesLen);
-    forwardField();
-
-    OffHeapMemory.UNSAFE.putInt(recordStartAddr() + curOffset, bytesLen);
-    curOffset += SizeOf.SIZE_OF_INT;
-
-    OffHeapMemory.UNSAFE.copyMemory(val, UnsafeUtil.ARRAY_BYTE_BASE_OFFSET, null,
-        recordStartAddr() + curOffset, bytesLen);
-    curOffset += bytesLen;
-  }
-
-  public void putBlob(byte[] val) {
-    int bytesLen = val.length;
-
-    ensureSize(SizeOf.SIZE_OF_INT + bytesLen);
-    forwardField();
-
-    OffHeapMemory.UNSAFE.putInt(recordStartAddr() + curOffset, bytesLen);
-    curOffset += SizeOf.SIZE_OF_INT;
-
-    OffHeapMemory.UNSAFE.copyMemory(val, UnsafeUtil.ARRAY_BYTE_BASE_OFFSET, null,
-        recordStartAddr() + curOffset, bytesLen);
-    curOffset += bytesLen;
-  }
-
-  public void putTimestamp(long val) {
-    putInt8(val);
-  }
-
-  public void putDate(int val) {
-    putInt4(val);
-  }
-
-  public void putTime(long val) {
-    putInt8(val);
-  }
-
-  public void putInterval(IntervalDatum val) {
-    ensureSize(SizeOf.SIZE_OF_INT + SizeOf.SIZE_OF_LONG);
-    forwardField();
-
-    long offset = recordStartAddr() + curOffset;
-    OffHeapMemory.UNSAFE.putInt(offset, val.getMonths());
-    offset += SizeOf.SIZE_OF_INT;
-    OffHeapMemory.UNSAFE.putLong(offset, val.getMilliSeconds());
-    curOffset += SizeOf.SIZE_OF_INT + SizeOf.SIZE_OF_LONG;
-  }
-
-  public void putInet4(int val) {
-    putInt4(val);
-  }
-
-  public void putProtoDatum(ProtobufDatum val) {
-    putBlob(val.asByteArray());
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/ResizableLimitSpec.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/ResizableLimitSpec.java b/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/ResizableLimitSpec.java
deleted file mode 100644
index 14e67b2..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/ResizableLimitSpec.java
+++ /dev/null
@@ -1,142 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.tuple.offheap;
-
-import com.google.common.base.Preconditions;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.tajo.util.FileUtil;
-
-/**
- * It specifies the maximum size or increasing ratio. In addition,
- * it guarantees that all numbers are less than or equal to Integer.MAX_VALUE 2^31
- * due to ByteBuffer.
- */
-public class ResizableLimitSpec {
-  private final Log LOG = LogFactory.getLog(ResizableLimitSpec.class);
-
-  public static final int MAX_SIZE_BYTES = Integer.MAX_VALUE;
-  public static final ResizableLimitSpec DEFAULT_LIMIT = new ResizableLimitSpec(Integer.MAX_VALUE);
-
-  private final long initSize;
-  private final long limitBytes;
-  private final float incRatio;
-  private final float allowedOVerflowRatio;
-  private final static float DEFAULT_ALLOWED_OVERFLOW_RATIO = 0.1f;
-  private final static float DEFAULT_INCREASE_RATIO = 1.0f;
-
-  public ResizableLimitSpec(long initSize) {
-    this(initSize, MAX_SIZE_BYTES, DEFAULT_ALLOWED_OVERFLOW_RATIO);
-  }
-
-  public ResizableLimitSpec(long initSize, long limitBytes) {
-    this(initSize, limitBytes, DEFAULT_ALLOWED_OVERFLOW_RATIO);
-  }
-
-  public ResizableLimitSpec(long initSize, long limitBytes, float allowedOverflow) {
-    this(initSize, limitBytes, allowedOverflow, DEFAULT_INCREASE_RATIO);
-  }
-
-  public ResizableLimitSpec(long initSize, long limitBytes, float allowedOverflowRatio, float incRatio) {
-    Preconditions.checkArgument(initSize > 0, "initial size must be greater than 0 bytes.");
-    Preconditions.checkArgument(initSize <= MAX_SIZE_BYTES, "The maximum initial size is 2GB.");
-    Preconditions.checkArgument(limitBytes > 0, "The limit size must be greater than 0 bytes.");
-    Preconditions.checkArgument(limitBytes <= MAX_SIZE_BYTES, "The maximum limit size is 2GB.");
-    Preconditions.checkArgument(incRatio > 0.0f, "Increase Ratio must be greater than 0.");
-
-    if (initSize == limitBytes) {
-      long overflowedSize = (long) (initSize + (initSize * allowedOverflowRatio));
-
-      if (overflowedSize > Integer.MAX_VALUE) {
-        overflowedSize = Integer.MAX_VALUE;
-      }
-
-      this.initSize = overflowedSize;
-      this.limitBytes = overflowedSize;
-    } else {
-      this.initSize = initSize;
-      limitBytes = (long) (limitBytes + (limitBytes * allowedOverflowRatio));
-
-      if (limitBytes > Integer.MAX_VALUE) {
-        this.limitBytes = Integer.MAX_VALUE;
-      } else {
-        this.limitBytes = limitBytes;
-      }
-    }
-
-    this.allowedOVerflowRatio = allowedOverflowRatio;
-    this.incRatio = incRatio;
-  }
-
-  public long initialSize() {
-    return initSize;
-  }
-
-  public long limit() {
-    return limitBytes;
-  }
-
-  public float remainRatio(long currentSize) {
-    Preconditions.checkArgument(currentSize > 0, "Size must be greater than 0 bytes.");
-    if (currentSize > Integer.MAX_VALUE) {
-      currentSize = Integer.MAX_VALUE;
-    }
-    return (float)currentSize / (float)limitBytes;
-  }
-
-  public boolean canIncrease(long currentSize) {
-    return remain(currentSize) > 0;
-  }
-
-  public long remain(long currentSize) {
-    Preconditions.checkArgument(currentSize > 0, "Size must be greater than 0 bytes.");
-    return limitBytes > Integer.MAX_VALUE ? Integer.MAX_VALUE - currentSize : limitBytes - currentSize;
-  }
-
-  public int increasedSize(int currentSize) {
-    if (currentSize < initSize) {
-      return (int) initSize;
-    }
-
-    if (currentSize > Integer.MAX_VALUE) {
-      LOG.warn("Current size already exceeds the maximum size (" + Integer.MAX_VALUE + " bytes)");
-      return Integer.MAX_VALUE;
-    }
-    long nextSize = (long) (currentSize + ((float) currentSize * incRatio));
-
-    if (nextSize > limitBytes) {
-      LOG.info("Increasing reaches size limit (" + FileUtil.humanReadableByteCount(limitBytes, false) + ")");
-      nextSize = limitBytes;
-    }
-
-    if (nextSize > Integer.MAX_VALUE) {
-      LOG.info("Increasing reaches maximum size (" + FileUtil.humanReadableByteCount(Integer.MAX_VALUE, false) + ")");
-      nextSize = Integer.MAX_VALUE;
-    }
-
-    return (int) nextSize;
-  }
-
-  @Override
-  public String toString() {
-    return "init=" + FileUtil.humanReadableByteCount(initSize, false) + ",limit="
-        + FileUtil.humanReadableByteCount(limitBytes, false) + ",overflow_ratio=" + allowedOVerflowRatio
-        + ",inc_ratio=" + incRatio;
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/RowWriter.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/RowWriter.java b/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/RowWriter.java
deleted file mode 100644
index a2b2561..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/RowWriter.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/***
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.tuple.offheap;
-
-import org.apache.tajo.common.TajoDataTypes;
-import org.apache.tajo.datum.IntervalDatum;
-import org.apache.tajo.datum.ProtobufDatum;
-
-/**
- * The call sequence should be as follows:
- *
- * <pre>
- *   startRow() -->  skipField() or putXXX --> endRow()
- * </pre>
- *
- * The total number of skipField and putXXX invocations must be equivalent to the number of fields.
- */
-public interface RowWriter {
-
-  public TajoDataTypes.DataType [] dataTypes();
-
-  public boolean startRow();
-
-  public void endRow();
-
-  public void skipField();
-
-  public void putBool(boolean val);
-
-  public void putInt2(short val);
-
-  public void putInt4(int val);
-
-  public void putInt8(long val);
-
-  public void putFloat4(float val);
-
-  public void putFloat8(double val);
-
-  public void putText(String val);
-
-  public void putText(byte[] val);
-
-  public void putBlob(byte[] val);
-
-  public void putTimestamp(long val);
-
-  public void putTime(long val);
-
-  public void putDate(int val);
-
-  public void putInterval(IntervalDatum val);
-
-  public void putInet4(int val);
-
-  public void putProtoDatum(ProtobufDatum datum);
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/UnSafeTuple.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/UnSafeTuple.java b/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/UnSafeTuple.java
deleted file mode 100644
index b742e6d..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/UnSafeTuple.java
+++ /dev/null
@@ -1,311 +0,0 @@
-/***
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.tuple.offheap;
-
-import com.google.common.base.Preconditions;
-import com.google.protobuf.InvalidProtocolBufferException;
-import com.google.protobuf.Message;
-
-import org.apache.tajo.datum.*;
-import org.apache.tajo.exception.UnsupportedException;
-import org.apache.tajo.storage.Tuple;
-import org.apache.tajo.storage.VTuple;
-import org.apache.tajo.util.SizeOf;
-import org.apache.tajo.util.StringUtils;
-import org.apache.tajo.util.UnsafeUtil;
-
-import sun.misc.Unsafe;
-import sun.nio.ch.DirectBuffer;
-
-import java.nio.ByteBuffer;
-import java.nio.ByteOrder;
-import java.nio.charset.Charset;
-
-import static org.apache.tajo.common.TajoDataTypes.DataType;
-
-public abstract class UnSafeTuple implements Tuple {
-  private static final Unsafe UNSAFE = UnsafeUtil.unsafe;
-
-  private DirectBuffer bb;
-  private int relativePos;
-  private int length;
-  private DataType [] types;
-
-  protected void set(ByteBuffer bb, int relativePos, int length, DataType [] types) {
-    this.bb = (DirectBuffer) bb;
-    this.relativePos = relativePos;
-    this.length = length;
-    this.types = types;
-  }
-
-  void set(ByteBuffer bb, DataType [] types) {
-    set(bb, 0, bb.limit(), types);
-  }
-
-  @Override
-  public int size() {
-    return types.length;
-  }
-
-  public ByteBuffer nioBuffer() {
-    return ((ByteBuffer)((ByteBuffer)bb).duplicate().position(relativePos).limit(relativePos + length)).slice();
-  }
-
-  public HeapTuple toHeapTuple() {
-    byte [] bytes = new byte[length];
-    UNSAFE.copyMemory(null, bb.address() + relativePos, bytes, UnsafeUtil.ARRAY_BYTE_BASE_OFFSET, length);
-    return new HeapTuple(bytes, types);
-  }
-
-  public void copyFrom(UnSafeTuple tuple) {
-    Preconditions.checkNotNull(tuple);
-
-    ((ByteBuffer) bb).clear();
-    if (length < tuple.length) {
-      UnsafeUtil.free((ByteBuffer) bb);
-      bb = (DirectBuffer) ByteBuffer.allocateDirect(tuple.length).order(ByteOrder.nativeOrder());
-      this.relativePos = 0;
-      this.length = tuple.length;
-    }
-
-    ((ByteBuffer) bb).put(tuple.nioBuffer());
-  }
-
-  private int getFieldOffset(int fieldId) {
-    return UNSAFE.getInt(bb.address() + relativePos + SizeOf.SIZE_OF_INT + (fieldId * SizeOf.SIZE_OF_INT));
-  }
-
-  public long getFieldAddr(int fieldId) {
-    int fieldOffset = getFieldOffset(fieldId);
-    if (fieldOffset == -1) {
-      throw new RuntimeException("Invalid Field Access: " + fieldId);
-    }
-    return bb.address() + relativePos + fieldOffset;
-  }
-
-  @Override
-  public boolean contains(int fieldid) {
-    return getFieldOffset(fieldid) > OffHeapRowBlock.NULL_FIELD_OFFSET;
-  }
-
-  @Override
-  public boolean isNull(int fieldid) {
-    return getFieldOffset(fieldid) == OffHeapRowBlock.NULL_FIELD_OFFSET;
-  }
-
-  @Override
-  public boolean isNotNull(int fieldid) {
-    return getFieldOffset(fieldid) > OffHeapRowBlock.NULL_FIELD_OFFSET;
-  }
-
-  @Override
-  public void clear() {
-    // nothing to do
-  }
-
-  @Override
-  public void put(int fieldId, Datum value) {
-    throw new UnsupportedException("UnSafeTuple does not support put(int, Datum).");
-  }
-
-  @Override
-  public void put(int fieldId, Datum[] values) {
-    throw new UnsupportedException("UnSafeTuple does not support put(int, Datum []).");
-  }
-
-  @Override
-  public void put(int fieldId, Tuple tuple) {
-    throw new UnsupportedException("UnSafeTuple does not support put(int, Tuple).");
-  }
-
-  @Override
-  public void put(Datum[] values) {
-    throw new UnsupportedException("UnSafeTuple does not support put(Datum []).");
-  }
-
-  @Override
-  public Datum get(int fieldId) {
-    if (isNull(fieldId)) {
-      return NullDatum.get();
-    }
-
-    switch (types[fieldId].getType()) {
-    case BOOLEAN:
-      return DatumFactory.createBool(getBool(fieldId));
-    case INT1:
-    case INT2:
-      return DatumFactory.createInt2(getInt2(fieldId));
-    case INT4:
-      return DatumFactory.createInt4(getInt4(fieldId));
-    case INT8:
-      return DatumFactory.createInt8(getInt4(fieldId));
-    case FLOAT4:
-      return DatumFactory.createFloat4(getFloat4(fieldId));
-    case FLOAT8:
-      return DatumFactory.createFloat8(getFloat8(fieldId));
-    case TEXT:
-      return DatumFactory.createText(getText(fieldId));
-    case TIMESTAMP:
-      return DatumFactory.createTimestamp(getInt8(fieldId));
-    case DATE:
-      return DatumFactory.createDate(getInt4(fieldId));
-    case TIME:
-      return DatumFactory.createTime(getInt8(fieldId));
-    case INTERVAL:
-      return getInterval(fieldId);
-    case INET4:
-      return DatumFactory.createInet4(getInt4(fieldId));
-    case PROTOBUF:
-      return getProtobufDatum(fieldId);
-    default:
-      throw new UnsupportedException("Unknown type: " + types[fieldId]);
-    }
-  }
-
-  @Override
-  public void setOffset(long offset) {
-  }
-
-  @Override
-  public long getOffset() {
-    return 0;
-  }
-
-  @Override
-  public boolean getBool(int fieldId) {
-    return UNSAFE.getByte(getFieldAddr(fieldId)) == 0x01;
-  }
-
-  @Override
-  public byte getByte(int fieldId) {
-    return UNSAFE.getByte(getFieldAddr(fieldId));
-  }
-
-  @Override
-  public char getChar(int fieldId) {
-    return UNSAFE.getChar(getFieldAddr(fieldId));
-  }
-
-  @Override
-  public byte[] getBytes(int fieldId) {
-    long pos = getFieldAddr(fieldId);
-    int len = UNSAFE.getInt(pos);
-    pos += SizeOf.SIZE_OF_INT;
-
-    byte [] bytes = new byte[len];
-    UNSAFE.copyMemory(null, pos, bytes, UnsafeUtil.ARRAY_BYTE_BASE_OFFSET, len);
-    return bytes;
-  }
-
-  @Override
-  public short getInt2(int fieldId) {
-    long addr = getFieldAddr(fieldId);
-    return UNSAFE.getShort(addr);
-  }
-
-  @Override
-  public int getInt4(int fieldId) {
-    return UNSAFE.getInt(getFieldAddr(fieldId));
-  }
-
-  @Override
-  public long getInt8(int fieldId) {
-    return UNSAFE.getLong(getFieldAddr(fieldId));
-  }
-
-  @Override
-  public float getFloat4(int fieldId) {
-    return UNSAFE.getFloat(getFieldAddr(fieldId));
-  }
-
-  @Override
-  public double getFloat8(int fieldId) {
-    return UNSAFE.getDouble(getFieldAddr(fieldId));
-  }
-
-  @Override
-  public String getText(int fieldId) {
-    long pos = getFieldAddr(fieldId);
-    int len = UNSAFE.getInt(pos);
-    pos += SizeOf.SIZE_OF_INT;
-
-    byte [] bytes = new byte[len];
-    UNSAFE.copyMemory(null, pos, bytes, UnsafeUtil.ARRAY_BYTE_BASE_OFFSET, len);
-    return new String(bytes);
-  }
-
-  public IntervalDatum getInterval(int fieldId) {
-    long pos = getFieldAddr(fieldId);
-    int months = UNSAFE.getInt(pos);
-    pos += SizeOf.SIZE_OF_INT;
-    long millisecs = UNSAFE.getLong(pos);
-    return new IntervalDatum(months, millisecs);
-  }
-
-  @Override
-  public Datum getProtobufDatum(int fieldId) {
-    byte [] bytes = getBytes(fieldId);
-
-    ProtobufDatumFactory factory = ProtobufDatumFactory.get(types[fieldId].getCode());
-    Message.Builder builder = factory.newBuilder();
-    try {
-      builder.mergeFrom(bytes);
-    } catch (InvalidProtocolBufferException e) {
-      return NullDatum.get();
-    }
-
-    return new ProtobufDatum(builder.build());
-  }
-
-  @Override
-  public char[] getUnicodeChars(int fieldId) {
-    long pos = getFieldAddr(fieldId);
-    int len = UNSAFE.getInt(pos);
-    pos += SizeOf.SIZE_OF_INT;
-
-    byte [] bytes = new byte[len];
-    UNSAFE.copyMemory(null, pos, bytes, UnsafeUtil.ARRAY_BYTE_BASE_OFFSET, len);
-    return StringUtils.convertBytesToChars(bytes, Charset.forName("UTF-8"));
-  }
-
-  @Override
-  public Tuple clone() throws CloneNotSupportedException {
-    return toHeapTuple();
-  }
-
-  @Override
-  public Datum[] getValues() {
-    Datum [] datums = new Datum[size()];
-    for (int i = 0; i < size(); i++) {
-      if (contains(i)) {
-        datums[i] = get(i);
-      } else {
-        datums[i] = NullDatum.get();
-      }
-    }
-    return datums;
-  }
-
-  @Override
-  public String toString() {
-    return VTuple.toDisplayString(getValues());
-  }
-
-  public abstract void release();
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/UnSafeTupleBytesComparator.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/UnSafeTupleBytesComparator.java b/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/UnSafeTupleBytesComparator.java
deleted file mode 100644
index 73e1e2f..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/UnSafeTupleBytesComparator.java
+++ /dev/null
@@ -1,99 +0,0 @@
-/***
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.tuple.offheap;
-
-import com.google.common.primitives.Longs;
-import com.google.common.primitives.UnsignedLongs;
-import org.apache.tajo.util.SizeOf;
-import org.apache.tajo.util.UnsafeUtil;
-import sun.misc.Unsafe;
-
-import java.nio.ByteOrder;
-
-/**
- * It directly access UTF bytes in UnSafeTuple without any copy. It is used by compiled TupleComparator.
- */
-public class UnSafeTupleBytesComparator {
-  private static final Unsafe UNSAFE = UnsafeUtil.unsafe;
-
-  static final boolean littleEndian =
-      ByteOrder.nativeOrder().equals(ByteOrder.LITTLE_ENDIAN);
-
-  public static int compare(long ptr1, long ptr2) {
-    int lstrLen = UNSAFE.getInt(ptr1);
-    int rstrLen = UNSAFE.getInt(ptr2);
-
-    ptr1 += SizeOf.SIZE_OF_INT;
-    ptr2 += SizeOf.SIZE_OF_INT;
-
-    int minLength = Math.min(lstrLen, rstrLen);
-    int minWords = minLength / Longs.BYTES;
-
-        /*
-         * Compare 8 bytes at a time. Benchmarking shows comparing 8 bytes at a
-         * time is no slower than comparing 4 bytes at a time even on 32-bit.
-         * On the other hand, it is substantially faster on 64-bit.
-         */
-    for (int i = 0; i < minWords * Longs.BYTES; i += Longs.BYTES) {
-      long lw = UNSAFE.getLong(ptr1);
-      long rw = UNSAFE.getLong(ptr2);
-      long diff = lw ^ rw;
-
-      if (diff != 0) {
-        if (!littleEndian) {
-          return UnsignedLongs.compare(lw, rw);
-        }
-
-        // Use binary search
-        int n = 0;
-        int y;
-        int x = (int) diff;
-        if (x == 0) {
-          x = (int) (diff >>> 32);
-          n = 32;
-        }
-
-        y = x << 16;
-        if (y == 0) {
-          n += 16;
-        } else {
-          x = y;
-        }
-
-        y = x << 8;
-        if (y == 0) {
-          n += 8;
-        }
-        return (int) (((lw >>> n) & 0xFFL) - ((rw >>> n) & 0xFFL));
-      }
-
-      ptr1 += SizeOf.SIZE_OF_LONG;
-      ptr2 += SizeOf.SIZE_OF_LONG;
-    }
-
-    // The epilogue to cover the last (minLength % 8) elements.
-    for (int i = minWords * Longs.BYTES; i < minLength; i++) {
-      int result = UNSAFE.getByte(ptr1++) - UNSAFE.getByte(ptr2++);
-      if (result != 0) {
-        return result;
-      }
-    }
-    return lstrLen - rstrLen;
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/ZeroCopyTuple.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/ZeroCopyTuple.java b/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/ZeroCopyTuple.java
deleted file mode 100644
index 51dbb29..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/ZeroCopyTuple.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.tuple.offheap;
-
-import java.nio.ByteBuffer;
-
-import static org.apache.tajo.common.TajoDataTypes.DataType;
-
-public class ZeroCopyTuple extends UnSafeTuple {
-
-  public void set(ByteBuffer bb, int relativePos, int length, DataType [] types) {
-    super.set(bb, relativePos, length, types);
-  }
-
-  @Override
-  public void release() {
-    // nothing to do
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/main/proto/IndexProtos.proto
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/proto/IndexProtos.proto b/tajo-storage/src/main/proto/IndexProtos.proto
deleted file mode 100644
index f5c8a08..0000000
--- a/tajo-storage/src/main/proto/IndexProtos.proto
+++ /dev/null
@@ -1,31 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-option java_package = "org.apache.tajo.index";
-option java_outer_classname = "IndexProtos";
-option optimize_for = SPEED;
-option java_generic_services = false;
-option java_generate_equals_and_hash = true;
-
-import "CatalogProtos.proto";
-
-message TupleComparatorProto {
-  required SchemaProto schema = 1;
-  repeated SortSpecProto sortSpecs = 2;
-  repeated TupleComparatorSpecProto compSpecs = 3;
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/main/proto/StorageFragmentProtos.proto
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/proto/StorageFragmentProtos.proto b/tajo-storage/src/main/proto/StorageFragmentProtos.proto
deleted file mode 100644
index dd79d74..0000000
--- a/tajo-storage/src/main/proto/StorageFragmentProtos.proto
+++ /dev/null
@@ -1,35 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-option java_package = "org.apache.tajo.storage.fragment";
-option java_outer_classname = "StorageFragmentProtos";
-option optimize_for = SPEED;
-option java_generic_services = false;
-option java_generate_equals_and_hash = true;
-
-import "CatalogProtos.proto";
-
-message HBaseFragmentProto {
-  required string tableName = 1;
-  required string hbaseTableName = 2;
-  required bytes startRow = 3;
-  required bytes stopRow = 4;
-  required bool last = 5;
-  required int64 length = 6;
-  optional string regionLocation = 7;
-}
\ No newline at end of file