You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by ji...@apache.org on 2014/12/12 09:22:39 UTC

[25/45] tajo git commit: TAJO-1233: Merge hbase_storage branch to the master branch. (Hyoungjun Kim via hyunsik)

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/UnSafeTuple.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/UnSafeTuple.java b/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/UnSafeTuple.java
deleted file mode 100644
index b742e6d..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/UnSafeTuple.java
+++ /dev/null
@@ -1,311 +0,0 @@
-/***
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.tuple.offheap;
-
-import com.google.common.base.Preconditions;
-import com.google.protobuf.InvalidProtocolBufferException;
-import com.google.protobuf.Message;
-
-import org.apache.tajo.datum.*;
-import org.apache.tajo.exception.UnsupportedException;
-import org.apache.tajo.storage.Tuple;
-import org.apache.tajo.storage.VTuple;
-import org.apache.tajo.util.SizeOf;
-import org.apache.tajo.util.StringUtils;
-import org.apache.tajo.util.UnsafeUtil;
-
-import sun.misc.Unsafe;
-import sun.nio.ch.DirectBuffer;
-
-import java.nio.ByteBuffer;
-import java.nio.ByteOrder;
-import java.nio.charset.Charset;
-
-import static org.apache.tajo.common.TajoDataTypes.DataType;
-
-public abstract class UnSafeTuple implements Tuple {
-  private static final Unsafe UNSAFE = UnsafeUtil.unsafe;
-
-  private DirectBuffer bb;
-  private int relativePos;
-  private int length;
-  private DataType [] types;
-
-  protected void set(ByteBuffer bb, int relativePos, int length, DataType [] types) {
-    this.bb = (DirectBuffer) bb;
-    this.relativePos = relativePos;
-    this.length = length;
-    this.types = types;
-  }
-
-  void set(ByteBuffer bb, DataType [] types) {
-    set(bb, 0, bb.limit(), types);
-  }
-
-  @Override
-  public int size() {
-    return types.length;
-  }
-
-  public ByteBuffer nioBuffer() {
-    return ((ByteBuffer)((ByteBuffer)bb).duplicate().position(relativePos).limit(relativePos + length)).slice();
-  }
-
-  public HeapTuple toHeapTuple() {
-    byte [] bytes = new byte[length];
-    UNSAFE.copyMemory(null, bb.address() + relativePos, bytes, UnsafeUtil.ARRAY_BYTE_BASE_OFFSET, length);
-    return new HeapTuple(bytes, types);
-  }
-
-  public void copyFrom(UnSafeTuple tuple) {
-    Preconditions.checkNotNull(tuple);
-
-    ((ByteBuffer) bb).clear();
-    if (length < tuple.length) {
-      UnsafeUtil.free((ByteBuffer) bb);
-      bb = (DirectBuffer) ByteBuffer.allocateDirect(tuple.length).order(ByteOrder.nativeOrder());
-      this.relativePos = 0;
-      this.length = tuple.length;
-    }
-
-    ((ByteBuffer) bb).put(tuple.nioBuffer());
-  }
-
-  private int getFieldOffset(int fieldId) {
-    return UNSAFE.getInt(bb.address() + relativePos + SizeOf.SIZE_OF_INT + (fieldId * SizeOf.SIZE_OF_INT));
-  }
-
-  public long getFieldAddr(int fieldId) {
-    int fieldOffset = getFieldOffset(fieldId);
-    if (fieldOffset == -1) {
-      throw new RuntimeException("Invalid Field Access: " + fieldId);
-    }
-    return bb.address() + relativePos + fieldOffset;
-  }
-
-  @Override
-  public boolean contains(int fieldid) {
-    return getFieldOffset(fieldid) > OffHeapRowBlock.NULL_FIELD_OFFSET;
-  }
-
-  @Override
-  public boolean isNull(int fieldid) {
-    return getFieldOffset(fieldid) == OffHeapRowBlock.NULL_FIELD_OFFSET;
-  }
-
-  @Override
-  public boolean isNotNull(int fieldid) {
-    return getFieldOffset(fieldid) > OffHeapRowBlock.NULL_FIELD_OFFSET;
-  }
-
-  @Override
-  public void clear() {
-    // nothing to do
-  }
-
-  @Override
-  public void put(int fieldId, Datum value) {
-    throw new UnsupportedException("UnSafeTuple does not support put(int, Datum).");
-  }
-
-  @Override
-  public void put(int fieldId, Datum[] values) {
-    throw new UnsupportedException("UnSafeTuple does not support put(int, Datum []).");
-  }
-
-  @Override
-  public void put(int fieldId, Tuple tuple) {
-    throw new UnsupportedException("UnSafeTuple does not support put(int, Tuple).");
-  }
-
-  @Override
-  public void put(Datum[] values) {
-    throw new UnsupportedException("UnSafeTuple does not support put(Datum []).");
-  }
-
-  @Override
-  public Datum get(int fieldId) {
-    if (isNull(fieldId)) {
-      return NullDatum.get();
-    }
-
-    switch (types[fieldId].getType()) {
-    case BOOLEAN:
-      return DatumFactory.createBool(getBool(fieldId));
-    case INT1:
-    case INT2:
-      return DatumFactory.createInt2(getInt2(fieldId));
-    case INT4:
-      return DatumFactory.createInt4(getInt4(fieldId));
-    case INT8:
-      return DatumFactory.createInt8(getInt4(fieldId));
-    case FLOAT4:
-      return DatumFactory.createFloat4(getFloat4(fieldId));
-    case FLOAT8:
-      return DatumFactory.createFloat8(getFloat8(fieldId));
-    case TEXT:
-      return DatumFactory.createText(getText(fieldId));
-    case TIMESTAMP:
-      return DatumFactory.createTimestamp(getInt8(fieldId));
-    case DATE:
-      return DatumFactory.createDate(getInt4(fieldId));
-    case TIME:
-      return DatumFactory.createTime(getInt8(fieldId));
-    case INTERVAL:
-      return getInterval(fieldId);
-    case INET4:
-      return DatumFactory.createInet4(getInt4(fieldId));
-    case PROTOBUF:
-      return getProtobufDatum(fieldId);
-    default:
-      throw new UnsupportedException("Unknown type: " + types[fieldId]);
-    }
-  }
-
-  @Override
-  public void setOffset(long offset) {
-  }
-
-  @Override
-  public long getOffset() {
-    return 0;
-  }
-
-  @Override
-  public boolean getBool(int fieldId) {
-    return UNSAFE.getByte(getFieldAddr(fieldId)) == 0x01;
-  }
-
-  @Override
-  public byte getByte(int fieldId) {
-    return UNSAFE.getByte(getFieldAddr(fieldId));
-  }
-
-  @Override
-  public char getChar(int fieldId) {
-    return UNSAFE.getChar(getFieldAddr(fieldId));
-  }
-
-  @Override
-  public byte[] getBytes(int fieldId) {
-    long pos = getFieldAddr(fieldId);
-    int len = UNSAFE.getInt(pos);
-    pos += SizeOf.SIZE_OF_INT;
-
-    byte [] bytes = new byte[len];
-    UNSAFE.copyMemory(null, pos, bytes, UnsafeUtil.ARRAY_BYTE_BASE_OFFSET, len);
-    return bytes;
-  }
-
-  @Override
-  public short getInt2(int fieldId) {
-    long addr = getFieldAddr(fieldId);
-    return UNSAFE.getShort(addr);
-  }
-
-  @Override
-  public int getInt4(int fieldId) {
-    return UNSAFE.getInt(getFieldAddr(fieldId));
-  }
-
-  @Override
-  public long getInt8(int fieldId) {
-    return UNSAFE.getLong(getFieldAddr(fieldId));
-  }
-
-  @Override
-  public float getFloat4(int fieldId) {
-    return UNSAFE.getFloat(getFieldAddr(fieldId));
-  }
-
-  @Override
-  public double getFloat8(int fieldId) {
-    return UNSAFE.getDouble(getFieldAddr(fieldId));
-  }
-
-  @Override
-  public String getText(int fieldId) {
-    long pos = getFieldAddr(fieldId);
-    int len = UNSAFE.getInt(pos);
-    pos += SizeOf.SIZE_OF_INT;
-
-    byte [] bytes = new byte[len];
-    UNSAFE.copyMemory(null, pos, bytes, UnsafeUtil.ARRAY_BYTE_BASE_OFFSET, len);
-    return new String(bytes);
-  }
-
-  public IntervalDatum getInterval(int fieldId) {
-    long pos = getFieldAddr(fieldId);
-    int months = UNSAFE.getInt(pos);
-    pos += SizeOf.SIZE_OF_INT;
-    long millisecs = UNSAFE.getLong(pos);
-    return new IntervalDatum(months, millisecs);
-  }
-
-  @Override
-  public Datum getProtobufDatum(int fieldId) {
-    byte [] bytes = getBytes(fieldId);
-
-    ProtobufDatumFactory factory = ProtobufDatumFactory.get(types[fieldId].getCode());
-    Message.Builder builder = factory.newBuilder();
-    try {
-      builder.mergeFrom(bytes);
-    } catch (InvalidProtocolBufferException e) {
-      return NullDatum.get();
-    }
-
-    return new ProtobufDatum(builder.build());
-  }
-
-  @Override
-  public char[] getUnicodeChars(int fieldId) {
-    long pos = getFieldAddr(fieldId);
-    int len = UNSAFE.getInt(pos);
-    pos += SizeOf.SIZE_OF_INT;
-
-    byte [] bytes = new byte[len];
-    UNSAFE.copyMemory(null, pos, bytes, UnsafeUtil.ARRAY_BYTE_BASE_OFFSET, len);
-    return StringUtils.convertBytesToChars(bytes, Charset.forName("UTF-8"));
-  }
-
-  @Override
-  public Tuple clone() throws CloneNotSupportedException {
-    return toHeapTuple();
-  }
-
-  @Override
-  public Datum[] getValues() {
-    Datum [] datums = new Datum[size()];
-    for (int i = 0; i < size(); i++) {
-      if (contains(i)) {
-        datums[i] = get(i);
-      } else {
-        datums[i] = NullDatum.get();
-      }
-    }
-    return datums;
-  }
-
-  @Override
-  public String toString() {
-    return VTuple.toDisplayString(getValues());
-  }
-
-  public abstract void release();
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/UnSafeTupleBytesComparator.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/UnSafeTupleBytesComparator.java b/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/UnSafeTupleBytesComparator.java
deleted file mode 100644
index 73e1e2f..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/UnSafeTupleBytesComparator.java
+++ /dev/null
@@ -1,99 +0,0 @@
-/***
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.tuple.offheap;
-
-import com.google.common.primitives.Longs;
-import com.google.common.primitives.UnsignedLongs;
-import org.apache.tajo.util.SizeOf;
-import org.apache.tajo.util.UnsafeUtil;
-import sun.misc.Unsafe;
-
-import java.nio.ByteOrder;
-
-/**
- * It directly access UTF bytes in UnSafeTuple without any copy. It is used by compiled TupleComparator.
- */
-public class UnSafeTupleBytesComparator {
-  private static final Unsafe UNSAFE = UnsafeUtil.unsafe;
-
-  static final boolean littleEndian =
-      ByteOrder.nativeOrder().equals(ByteOrder.LITTLE_ENDIAN);
-
-  public static int compare(long ptr1, long ptr2) {
-    int lstrLen = UNSAFE.getInt(ptr1);
-    int rstrLen = UNSAFE.getInt(ptr2);
-
-    ptr1 += SizeOf.SIZE_OF_INT;
-    ptr2 += SizeOf.SIZE_OF_INT;
-
-    int minLength = Math.min(lstrLen, rstrLen);
-    int minWords = minLength / Longs.BYTES;
-
-        /*
-         * Compare 8 bytes at a time. Benchmarking shows comparing 8 bytes at a
-         * time is no slower than comparing 4 bytes at a time even on 32-bit.
-         * On the other hand, it is substantially faster on 64-bit.
-         */
-    for (int i = 0; i < minWords * Longs.BYTES; i += Longs.BYTES) {
-      long lw = UNSAFE.getLong(ptr1);
-      long rw = UNSAFE.getLong(ptr2);
-      long diff = lw ^ rw;
-
-      if (diff != 0) {
-        if (!littleEndian) {
-          return UnsignedLongs.compare(lw, rw);
-        }
-
-        // Use binary search
-        int n = 0;
-        int y;
-        int x = (int) diff;
-        if (x == 0) {
-          x = (int) (diff >>> 32);
-          n = 32;
-        }
-
-        y = x << 16;
-        if (y == 0) {
-          n += 16;
-        } else {
-          x = y;
-        }
-
-        y = x << 8;
-        if (y == 0) {
-          n += 8;
-        }
-        return (int) (((lw >>> n) & 0xFFL) - ((rw >>> n) & 0xFFL));
-      }
-
-      ptr1 += SizeOf.SIZE_OF_LONG;
-      ptr2 += SizeOf.SIZE_OF_LONG;
-    }
-
-    // The epilogue to cover the last (minLength % 8) elements.
-    for (int i = minWords * Longs.BYTES; i < minLength; i++) {
-      int result = UNSAFE.getByte(ptr1++) - UNSAFE.getByte(ptr2++);
-      if (result != 0) {
-        return result;
-      }
-    }
-    return lstrLen - rstrLen;
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/ZeroCopyTuple.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/ZeroCopyTuple.java b/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/ZeroCopyTuple.java
deleted file mode 100644
index 51dbb29..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/ZeroCopyTuple.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.tuple.offheap;
-
-import java.nio.ByteBuffer;
-
-import static org.apache.tajo.common.TajoDataTypes.DataType;
-
-public class ZeroCopyTuple extends UnSafeTuple {
-
-  public void set(ByteBuffer bb, int relativePos, int length, DataType [] types) {
-    super.set(bb, relativePos, length, types);
-  }
-
-  @Override
-  public void release() {
-    // nothing to do
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/proto/IndexProtos.proto
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/proto/IndexProtos.proto b/tajo-storage/src/main/proto/IndexProtos.proto
deleted file mode 100644
index f5c8a08..0000000
--- a/tajo-storage/src/main/proto/IndexProtos.proto
+++ /dev/null
@@ -1,31 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-option java_package = "org.apache.tajo.index";
-option java_outer_classname = "IndexProtos";
-option optimize_for = SPEED;
-option java_generic_services = false;
-option java_generate_equals_and_hash = true;
-
-import "CatalogProtos.proto";
-
-message TupleComparatorProto {
-  required SchemaProto schema = 1;
-  repeated SortSpecProto sortSpecs = 2;
-  repeated TupleComparatorSpecProto compSpecs = 3;
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/resources/storage-default.xml
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/resources/storage-default.xml b/tajo-storage/src/main/resources/storage-default.xml
deleted file mode 100644
index e861b7d..0000000
--- a/tajo-storage/src/main/resources/storage-default.xml
+++ /dev/null
@@ -1,175 +0,0 @@
-<?xml version="1.0"?>
-<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
-
-<!--
-  Licensed to the Apache Software Foundation (ASF) under one
-  or more contributor license agreements.  See the NOTICE file
-  distributed with this work for additional information
-  regarding copyright ownership.  The ASF licenses this file
-  to you under the Apache License, Version 2.0 (the
-  "License"); you may not use this file except in compliance
-  with the License.  You may obtain a copy of the License at
-
-      http://www.apache.org/licenses/LICENSE-2.0
-
-  Unless required by applicable law or agreed to in writing, software
-  distributed under the License is distributed on an "AS IS" BASIS,
-  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  See the License for the specific language governing permissions and
-  limitations under the License.
-  -->
-
-<configuration>
-  <property>
-    <name>tajo.storage.manager.maxReadBytes</name>
-    <value>8388608</value>
-    <description></description>
-  </property>
-
-  <property>
-    <name>tajo.storage.manager.concurrency.perDisk</name>
-    <value>1</value>
-    <description></description>
-  </property>
-
-  <!--- Registered Scanner Handler -->
-  <property>
-    <name>tajo.storage.scanner-handler</name>
-    <value>textfile,csv,json,raw,rcfile,row,parquet,sequencefile,avro</value>
-  </property>
-
-  <!--- Fragment Class Configurations -->
-  <property>
-    <name>tajo.storage.fragment.textfile.class</name>
-    <value>org.apache.tajo.storage.fragment.FileFragment</value>
-  </property>
-  <property>
-    <name>tajo.storage.fragment.csv.class</name>
-    <value>org.apache.tajo.storage.fragment.FileFragment</value>
-  </property>
-  <property>
-    <name>tajo.storage.fragment.json.class</name>
-    <value>org.apache.tajo.storage.fragment.FileFragment</value>
-  </property>
-  <property>
-    <name>tajo.storage.fragment.raw.class</name>
-    <value>org.apache.tajo.storage.fragment.FileFragment</value>
-  </property>
-  <property>
-    <name>tajo.storage.fragment.rcfile.class</name>
-    <value>org.apache.tajo.storage.fragment.FileFragment</value>
-  </property>
-  <property>
-    <name>tajo.storage.fragment.row.class</name>
-    <value>org.apache.tajo.storage.fragment.FileFragment</value>
-  </property>
-  <property>
-    <name>tajo.storage.fragment.parquet.class</name>
-    <value>org.apache.tajo.storage.fragment.FileFragment</value>
-  </property>
-  <property>
-    <name>tajo.storage.fragment.sequencefile.class</name>
-    <value>org.apache.tajo.storage.fragment.FileFragment</value>
-  </property>
-  <property>
-    <name>tajo.storage.fragment.avro.class</name>
-    <value>org.apache.tajo.storage.fragment.FileFragment</value>
-  </property>
-
-  <!--- Scanner Handler -->
-  <property>
-    <name>tajo.storage.scanner-handler.textfile.class</name>
-    <value>org.apache.tajo.storage.text.DelimitedTextFile$DelimitedTextFileScanner</value>
-  </property>
-
-  <property>
-    <name>tajo.storage.scanner-handler.csv.class</name>
-    <value>org.apache.tajo.storage.CSVFile$CSVScanner</value>
-  </property>
-
-  <property>
-    <name>tajo.storage.scanner-handler.json.class</name>
-    <value>org.apache.tajo.storage.text.DelimitedTextFile$DelimitedTextFileScanner</value>
-  </property>
-
-  <property>
-    <name>tajo.storage.scanner-handler.raw.class</name>
-    <value>org.apache.tajo.storage.RawFile$RawFileScanner</value>
-  </property>
-
-  <property>
-    <name>tajo.storage.scanner-handler.rcfile.class</name>
-    <value>org.apache.tajo.storage.rcfile.RCFile$RCFileScanner</value>
-  </property>
-
-  <property>
-    <name>tajo.storage.scanner-handler.rowfile.class</name>
-    <value>org.apache.tajo.storage.RowFile$RowFileScanner</value>
-  </property>
-
-  <property>
-    <name>tajo.storage.scanner-handler.parquet.class</name>
-    <value>org.apache.tajo.storage.parquet.ParquetScanner</value>
-  </property>
-
-  <property>
-    <name>tajo.storage.scanner-handler.sequencefile.class</name>
-    <value>org.apache.tajo.storage.sequencefile.SequenceFileScanner</value>
-  </property>
-
-  <property>
-    <name>tajo.storage.scanner-handler.avro.class</name>
-    <value>org.apache.tajo.storage.avro.AvroScanner</value>
-  </property>
-
-  <!--- Appender Handler -->
-  <property>
-    <name>tajo.storage.appender-handler</name>
-    <value>textfile,csv,raw,rcfile,row,parquet,sequencefile,avro</value>
-  </property>
-
-  <property>
-    <name>tajo.storage.appender-handler.textfile.class</name>
-    <value>org.apache.tajo.storage.text.DelimitedTextFile$DelimitedTextFileAppender</value>
-  </property>
-
-  <property>
-    <name>tajo.storage.appender-handler.csv.class</name>
-    <value>org.apache.tajo.storage.CSVFile$CSVAppender</value>
-  </property>
-
-  <property>
-    <name>tajo.storage.appender-handler.json.class</name>
-    <value>org.apache.tajo.storage.text.DelimitedTextFile$DelimitedTextFileAppender</value>
-  </property>
-
-  <property>
-    <name>tajo.storage.appender-handler.raw.class</name>
-    <value>org.apache.tajo.storage.RawFile$RawFileAppender</value>
-  </property>
-
-  <property>
-    <name>tajo.storage.appender-handler.rcfile.class</name>
-    <value>org.apache.tajo.storage.rcfile.RCFile$RCFileAppender</value>
-  </property>
-
-  <property>
-    <name>tajo.storage.appender-handler.rowfile.class</name>
-    <value>org.apache.tajo.storage.RowFile$RowFileAppender</value>
-  </property>
-
-  <property>
-    <name>tajo.storage.appender-handler.parquet.class</name>
-    <value>org.apache.tajo.storage.parquet.ParquetAppender</value>
-  </property>
-
-  <property>
-    <name>tajo.storage.appender-handler.sequencefile.class</name>
-    <value>org.apache.tajo.storage.sequencefile.SequenceFileAppender</value>
-  </property>
-
-  <property>
-    <name>tajo.storage.appender-handler.avro.class</name>
-    <value>org.apache.tajo.storage.avro.AvroAppender</value>
-  </property>
-</configuration>

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/test/java/org/apache/tajo/HttpFileServer.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/test/java/org/apache/tajo/HttpFileServer.java b/tajo-storage/src/test/java/org/apache/tajo/HttpFileServer.java
deleted file mode 100644
index cf8a54e..0000000
--- a/tajo-storage/src/test/java/org/apache/tajo/HttpFileServer.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.net.NetUtils;
-import org.jboss.netty.bootstrap.ServerBootstrap;
-import org.jboss.netty.channel.Channel;
-import org.jboss.netty.channel.ChannelFactory;
-import org.jboss.netty.channel.group.ChannelGroup;
-import org.jboss.netty.channel.group.ChannelGroupFuture;
-import org.jboss.netty.channel.group.DefaultChannelGroup;
-import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
-
-import java.net.InetSocketAddress;
-import java.util.concurrent.Executors;
-
-public class HttpFileServer {
-  private final static Log LOG = LogFactory.getLog(HttpFileServer.class);
-
-  private final InetSocketAddress addr;
-  private InetSocketAddress bindAddr;
-  private ServerBootstrap bootstrap = null;
-  private ChannelFactory factory = null;
-  private ChannelGroup channelGroup = null;
-
-  public HttpFileServer(final InetSocketAddress addr) {
-    this.addr = addr;
-    this.factory = new NioServerSocketChannelFactory(
-        Executors.newCachedThreadPool(), Executors.newCachedThreadPool(),
-        2);
-
-    // Configure the server.
-    this.bootstrap = new ServerBootstrap(factory);
-    // Set up the event pipeline factory.
-    this.bootstrap.setPipelineFactory(new HttpFileServerPipelineFactory());
-    this.channelGroup = new DefaultChannelGroup();
-  }
-
-  public HttpFileServer(String bindaddr) {
-    this(NetUtils.createSocketAddr(bindaddr));
-  }
-
-  public void start() {
-    // Bind and start to accept incoming connections.
-    Channel channel = bootstrap.bind(addr);
-    channelGroup.add(channel);    
-    this.bindAddr = (InetSocketAddress) channel.getLocalAddress();
-    LOG.info("HttpFileServer starts up ("
-        + this.bindAddr.getAddress().getHostAddress() + ":" + this.bindAddr.getPort()
-        + ")");
-  }
-  
-  public InetSocketAddress getBindAddress() {
-    return this.bindAddr;
-  }
-
-  public void stop() {
-    ChannelGroupFuture future = channelGroup.close();
-    future.awaitUninterruptibly();
-    factory.releaseExternalResources();
-
-    LOG.info("HttpFileServer shutdown ("
-        + this.bindAddr.getAddress().getHostAddress() + ":"
-        + this.bindAddr.getPort() + ")");
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/test/java/org/apache/tajo/HttpFileServerHandler.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/test/java/org/apache/tajo/HttpFileServerHandler.java b/tajo-storage/src/test/java/org/apache/tajo/HttpFileServerHandler.java
deleted file mode 100644
index 6c77317..0000000
--- a/tajo-storage/src/test/java/org/apache/tajo/HttpFileServerHandler.java
+++ /dev/null
@@ -1,184 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo;
-
-import org.jboss.netty.buffer.ChannelBuffers;
-import org.jboss.netty.channel.*;
-import org.jboss.netty.handler.codec.frame.TooLongFrameException;
-import org.jboss.netty.handler.codec.http.DefaultHttpResponse;
-import org.jboss.netty.handler.codec.http.HttpRequest;
-import org.jboss.netty.handler.codec.http.HttpResponse;
-import org.jboss.netty.handler.codec.http.HttpResponseStatus;
-import org.jboss.netty.handler.ssl.SslHandler;
-import org.jboss.netty.handler.stream.ChunkedFile;
-import org.jboss.netty.util.CharsetUtil;
-
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.RandomAccessFile;
-import java.io.UnsupportedEncodingException;
-import java.net.URLDecoder;
-
-import static org.jboss.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE;
-import static org.jboss.netty.handler.codec.http.HttpHeaders.isKeepAlive;
-import static org.jboss.netty.handler.codec.http.HttpHeaders.setContentLength;
-import static org.jboss.netty.handler.codec.http.HttpMethod.GET;
-import static org.jboss.netty.handler.codec.http.HttpResponseStatus.*;
-import static org.jboss.netty.handler.codec.http.HttpVersion.HTTP_1_1;
-
-/**
- * this is an implementation copied from HttpStaticFileServerHandler.java of netty 3.6
- */
-public class HttpFileServerHandler extends SimpleChannelUpstreamHandler {
-
-  @Override
-  public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
-    HttpRequest request = (HttpRequest) e.getMessage();
-    if (request.getMethod() != GET) {
-      sendError(ctx, METHOD_NOT_ALLOWED);
-      return;
-    }
-
-    final String path = sanitizeUri(request.getUri());
-    if (path == null) {
-      sendError(ctx, FORBIDDEN);
-      return;
-    }
-
-    File file = new File(path);
-    if (file.isHidden() || !file.exists()) {
-      sendError(ctx, NOT_FOUND);
-      return;
-    }
-    if (!file.isFile()) {
-      sendError(ctx, FORBIDDEN);
-      return;
-    }
-
-    RandomAccessFile raf;
-    try {
-      raf = new RandomAccessFile(file, "r");
-    } catch (FileNotFoundException fnfe) {
-      sendError(ctx, NOT_FOUND);
-      return;
-    }
-    long fileLength = raf.length();
-
-    HttpResponse response = new DefaultHttpResponse(HTTP_1_1, OK);
-    setContentLength(response, fileLength);
-    setContentTypeHeader(response);
-
-    Channel ch = e.getChannel();
-
-    // Write the initial line and the header.
-    ch.write(response);
-
-    // Write the content.
-    ChannelFuture writeFuture;
-    if (ch.getPipeline().get(SslHandler.class) != null) {
-      // Cannot use zero-copy with HTTPS.
-      writeFuture = ch.write(new ChunkedFile(raf, 0, fileLength, 8192));
-    } else {
-      // No encryption - use zero-copy.
-      final FileRegion region =
-          new DefaultFileRegion(raf.getChannel(), 0, fileLength);
-      writeFuture = ch.write(region);
-      writeFuture.addListener(new ChannelFutureProgressListener() {
-        public void operationComplete(ChannelFuture future) {
-          region.releaseExternalResources();
-        }
-
-        public void operationProgressed(
-            ChannelFuture future, long amount, long current, long total) {
-          System.out.printf("%s: %d / %d (+%d)%n", path, current, total, amount);
-        }
-      });
-    }
-
-    // Decide whether to close the connection or not.
-    if (!isKeepAlive(request)) {
-      // Close the connection when the whole content is written out.
-      writeFuture.addListener(ChannelFutureListener.CLOSE);
-    }
-  }
-
-  @Override
-  public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
-      throws Exception {
-    Channel ch = e.getChannel();
-    Throwable cause = e.getCause();
-    if (cause instanceof TooLongFrameException) {
-      sendError(ctx, BAD_REQUEST);
-      return;
-    }
-
-    cause.printStackTrace();
-    if (ch.isConnected()) {
-      sendError(ctx, INTERNAL_SERVER_ERROR);
-    }
-  }
-
-  private static String sanitizeUri(String uri) {
-    // Decode the path.
-    try {
-      uri = URLDecoder.decode(uri, "UTF-8");
-    } catch (UnsupportedEncodingException e) {
-      try {
-        uri = URLDecoder.decode(uri, "ISO-8859-1");
-      } catch (UnsupportedEncodingException e1) {
-        throw new Error();
-      }
-    }
-
-    // Convert file separators.
-    uri = uri.replace('/', File.separatorChar);
-
-    // Simplistic dumb security check.
-    // You will have to do something serious in the production environment.
-    if (uri.contains(File.separator + '.') ||
-        uri.contains('.' + File.separator) ||
-        uri.startsWith(".") || uri.endsWith(".")) {
-      return null;
-    }
-
-    return uri;
-  }
-
-  private static void sendError(ChannelHandlerContext ctx, HttpResponseStatus status) {
-    HttpResponse response = new DefaultHttpResponse(HTTP_1_1, status);
-    response.setHeader(CONTENT_TYPE, "text/plain; charset=UTF-8");
-    response.setContent(ChannelBuffers.copiedBuffer(
-        "Failure: " + status.toString() + "\r\n",
-        CharsetUtil.UTF_8));
-
-    // Close the connection as soon as the error message is sent.
-    ctx.getChannel().write(response).addListener(ChannelFutureListener.CLOSE);
-  }
-
-  /**
-   * Sets the content type header for the HTTP Response
-   *
-   * @param response
-   *            HTTP response
-   */
-  private static void setContentTypeHeader(HttpResponse response) {
-    response.setHeader(CONTENT_TYPE, "text/plain; charset=UTF-8");
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/test/java/org/apache/tajo/HttpFileServerPipelineFactory.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/test/java/org/apache/tajo/HttpFileServerPipelineFactory.java b/tajo-storage/src/test/java/org/apache/tajo/HttpFileServerPipelineFactory.java
deleted file mode 100644
index cecf93b..0000000
--- a/tajo-storage/src/test/java/org/apache/tajo/HttpFileServerPipelineFactory.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo;
-
-import org.jboss.netty.channel.ChannelPipeline;
-import org.jboss.netty.channel.ChannelPipelineFactory;
-import org.jboss.netty.handler.codec.http.HttpChunkAggregator;
-import org.jboss.netty.handler.codec.http.HttpRequestDecoder;
-import org.jboss.netty.handler.codec.http.HttpResponseEncoder;
-import org.jboss.netty.handler.stream.ChunkedWriteHandler;
-
-import static org.jboss.netty.channel.Channels.pipeline;
-
-// Uncomment the following lines if you want HTTPS
-//import javax.net.ssl.SSLEngine;
-//import org.jboss.netty.example.securechat.SecureChatSslContextFactory;
-//import org.jboss.netty.handler.ssl.SslHandler;
-
-//this class is copied from HttpStaticFileServerPipelineFactory.java of netty 3.6
-public class HttpFileServerPipelineFactory implements ChannelPipelineFactory {
-  public ChannelPipeline getPipeline() throws Exception {
-    // Create a default pipeline implementation.
-    ChannelPipeline pipeline = pipeline();
-
-    // Uncomment the following lines if you want HTTPS
-    //SSLEngine engine = SecureChatSslContextFactory.getServerContext().createSSLEngine();
-    //engine.setUseClientMode(false);
-    //pipeline.addLast("ssl", new SslHandler(engine));
-
-    pipeline.addLast("decoder", new HttpRequestDecoder());
-    pipeline.addLast("aggregator", new HttpChunkAggregator(65536));
-    pipeline.addLast("encoder", new HttpResponseEncoder());
-    pipeline.addLast("chunkedWriter", new ChunkedWriteHandler());
-
-    pipeline.addLast("handler", new HttpFileServerHandler());
-    return pipeline;
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/test/java/org/apache/tajo/storage/TestCompressionStorages.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/test/java/org/apache/tajo/storage/TestCompressionStorages.java b/tajo-storage/src/test/java/org/apache/tajo/storage/TestCompressionStorages.java
deleted file mode 100644
index fd5a63e..0000000
--- a/tajo-storage/src/test/java/org/apache/tajo/storage/TestCompressionStorages.java
+++ /dev/null
@@ -1,185 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage;
-
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.compress.*;
-import org.apache.hadoop.io.compress.zlib.ZlibFactory;
-import org.apache.hadoop.util.NativeCodeLoader;
-import org.apache.tajo.catalog.CatalogUtil;
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.catalog.TableMeta;
-import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
-import org.apache.tajo.catalog.statistics.TableStats;
-import org.apache.tajo.common.TajoDataTypes.Type;
-import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.datum.DatumFactory;
-import org.apache.tajo.storage.fragment.FileFragment;
-import org.apache.tajo.storage.sequencefile.SequenceFileScanner;
-import org.apache.tajo.storage.text.DelimitedTextFile;
-import org.apache.tajo.util.CommonTestingUtil;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.Collection;
-
-import static org.junit.Assert.*;
-
-@RunWith(Parameterized.class)
-public class TestCompressionStorages {
-  private TajoConf conf;
-  private static String TEST_PATH = "target/test-data/TestCompressionStorages";
-
-  private StoreType storeType;
-  private Path testDir;
-  private FileSystem fs;
-
-  public TestCompressionStorages(StoreType type) throws IOException {
-    this.storeType = type;
-    conf = new TajoConf();
-
-    testDir = CommonTestingUtil.getTestDir(TEST_PATH);
-    fs = testDir.getFileSystem(conf);
-  }
-
-  @Parameterized.Parameters
-  public static Collection<Object[]> generateParameters() {
-    return Arrays.asList(new Object[][]{
-        {StoreType.CSV},
-        {StoreType.RCFILE},
-        {StoreType.SEQUENCEFILE},
-        {StoreType.TEXTFILE}
-    });
-  }
-
-  @Test
-  public void testDeflateCodecCompressionData() throws IOException {
-    storageCompressionTest(storeType, DeflateCodec.class);
-  }
-
-  @Test
-  public void testGzipCodecCompressionData() throws IOException {
-    if (storeType == StoreType.RCFILE) {
-      if( ZlibFactory.isNativeZlibLoaded(conf)) {
-        storageCompressionTest(storeType, GzipCodec.class);
-      }
-    } else if (storeType == StoreType.SEQUENCEFILE) {
-      if( ZlibFactory.isNativeZlibLoaded(conf)) {
-        storageCompressionTest(storeType, GzipCodec.class);
-      }
-    } else {
-      storageCompressionTest(storeType, GzipCodec.class);
-    }
-  }
-
-  @Test
-  public void testSnappyCodecCompressionData() throws IOException {
-    if (SnappyCodec.isNativeCodeLoaded()) {
-      storageCompressionTest(storeType, SnappyCodec.class);
-    }
-  }
-
-  @Test
-  public void testLz4CodecCompressionData() throws IOException {
-    if(NativeCodeLoader.isNativeCodeLoaded() && Lz4Codec.isNativeCodeLoaded())
-    storageCompressionTest(storeType, Lz4Codec.class);
-  }
-
-  private void storageCompressionTest(StoreType storeType, Class<? extends CompressionCodec> codec) throws IOException {
-    Schema schema = new Schema();
-    schema.addColumn("id", Type.INT4);
-    schema.addColumn("age", Type.FLOAT4);
-    schema.addColumn("name", Type.TEXT);
-
-    TableMeta meta = CatalogUtil.newTableMeta(storeType);
-    meta.putOption("compression.codec", codec.getCanonicalName());
-    meta.putOption("compression.type", SequenceFile.CompressionType.BLOCK.name());
-    meta.putOption("rcfile.serde", TextSerializerDeserializer.class.getName());
-    meta.putOption("sequencefile.serde", TextSerializerDeserializer.class.getName());
-
-    String fileName = "Compression_" + codec.getSimpleName();
-    Path tablePath = new Path(testDir, fileName);
-    Appender appender = StorageManager.getStorageManager(conf).getAppender(meta, schema, tablePath);
-    appender.enableStats();
-
-    appender.init();
-
-    String extension = "";
-    if (appender instanceof CSVFile.CSVAppender) {
-      extension = ((CSVFile.CSVAppender) appender).getExtension();
-    } else if (appender instanceof DelimitedTextFile.DelimitedTextFileAppender) {
-      extension = ((DelimitedTextFile.DelimitedTextFileAppender) appender).getExtension();
-    }
-
-    int tupleNum = 100000;
-    VTuple vTuple;
-
-    for (int i = 0; i < tupleNum; i++) {
-      vTuple = new VTuple(3);
-      vTuple.put(0, DatumFactory.createInt4(i + 1));
-      vTuple.put(1, DatumFactory.createFloat4((float) i));
-      vTuple.put(2, DatumFactory.createText(String.valueOf(i)));
-      appender.addTuple(vTuple);
-    }
-    appender.close();
-
-    TableStats stat = appender.getStats();
-    assertEquals(tupleNum, stat.getNumRows().longValue());
-    tablePath = tablePath.suffix(extension);
-    FileStatus status = fs.getFileStatus(tablePath);
-    long fileLen = status.getLen();
-    FileFragment[] tablets = new FileFragment[1];
-    tablets[0] = new FileFragment(fileName, tablePath, 0, fileLen);
-
-    Scanner scanner = StorageManager.getStorageManager(conf).getScanner(meta, schema, tablets[0], schema);
-
-    if (StoreType.CSV == storeType) {
-      if (SplittableCompressionCodec.class.isAssignableFrom(codec)) {
-        assertTrue(scanner.isSplittable());
-      } else {
-        assertFalse(scanner.isSplittable());
-      }
-    }
-    scanner.init();
-
-    if (storeType == StoreType.SEQUENCEFILE) {
-      assertTrue(scanner instanceof SequenceFileScanner);
-      Writable key = ((SequenceFileScanner) scanner).getKey();
-      assertEquals(key.getClass().getCanonicalName(), LongWritable.class.getCanonicalName());
-    }
-
-    int tupleCnt = 0;
-    Tuple tuple;
-    while ((tuple = scanner.next()) != null) {
-      tupleCnt++;
-    }
-    scanner.close();
-    assertEquals(tupleNum, tupleCnt);
-    assertNotSame(appender.getStats().getNumBytes().longValue(), scanner.getInputStats().getNumBytes().longValue());
-    assertEquals(appender.getStats().getNumRows().longValue(), scanner.getInputStats().getNumRows().longValue());
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/test/java/org/apache/tajo/storage/TestDelimitedTextFile.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/test/java/org/apache/tajo/storage/TestDelimitedTextFile.java b/tajo-storage/src/test/java/org/apache/tajo/storage/TestDelimitedTextFile.java
deleted file mode 100644
index 93fb12b..0000000
--- a/tajo-storage/src/test/java/org/apache/tajo/storage/TestDelimitedTextFile.java
+++ /dev/null
@@ -1,164 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage;
-
-import com.google.common.base.Preconditions;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.tajo.catalog.CatalogUtil;
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.catalog.TableMeta;
-import org.apache.tajo.catalog.proto.CatalogProtos;
-import org.apache.tajo.common.TajoDataTypes.Type;
-import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.datum.Datum;
-import org.apache.tajo.datum.DatumFactory;
-import org.apache.tajo.datum.NullDatum;
-import org.apache.tajo.storage.fragment.FileFragment;
-import org.apache.tajo.util.FileUtil;
-import org.junit.Test;
-
-import java.io.File;
-import java.io.IOException;
-import java.net.URL;
-
-import static org.junit.Assert.*;
-
-public class TestDelimitedTextFile {
-
-  private static Schema schema = new Schema();
-
-  private static Tuple baseTuple = new VTuple(10);
-
-  static {
-    schema.addColumn("col1", Type.BOOLEAN);
-    schema.addColumn("col2", Type.CHAR, 7);
-    schema.addColumn("col3", Type.INT2);
-    schema.addColumn("col4", Type.INT4);
-    schema.addColumn("col5", Type.INT8);
-    schema.addColumn("col6", Type.FLOAT4);
-    schema.addColumn("col7", Type.FLOAT8);
-    schema.addColumn("col8", Type.TEXT);
-    schema.addColumn("col9", Type.BLOB);
-    schema.addColumn("col10", Type.INET4);
-
-    baseTuple.put(new Datum[] {
-        DatumFactory.createBool(true),                // 0
-        DatumFactory.createChar("hyunsik"),           // 1
-        DatumFactory.createInt2((short) 17),          // 2
-        DatumFactory.createInt4(59),                  // 3
-        DatumFactory.createInt8(23l),                 // 4
-        DatumFactory.createFloat4(77.9f),             // 5
-        DatumFactory.createFloat8(271.9d),            // 6
-        DatumFactory.createText("hyunsik"),           // 7
-        DatumFactory.createBlob("hyunsik".getBytes()),// 8
-        DatumFactory.createInet4("192.168.0.1"),      // 9
-    });
-  }
-
-  public static Path getResourcePath(String path, String suffix) {
-    URL resultBaseURL = ClassLoader.getSystemResource(path);
-    return new Path(resultBaseURL.toString(), suffix);
-  }
-
-  public static Path getResultPath(Class clazz, String fileName) {
-    return new Path (getResourcePath("results", clazz.getSimpleName()), fileName);
-  }
-
-  public static String getResultText(Class clazz, String fileName) throws IOException {
-    FileSystem localFS = FileSystem.getLocal(new Configuration());
-    Path path = getResultPath(clazz, fileName);
-    Preconditions.checkState(localFS.exists(path) && localFS.isFile(path));
-    return FileUtil.readTextFile(new File(path.toUri()));
-  }
-
-  private static final FileFragment getFileFragment(String fileName) throws IOException {
-    TajoConf conf = new TajoConf();
-    Path tablePath = new Path(getResourcePath("dataset", "TestDelimitedTextFile"), fileName);
-    FileSystem fs = FileSystem.getLocal(conf);
-    FileStatus status = fs.getFileStatus(tablePath);
-    return new FileFragment("table", tablePath, 0, status.getLen());
-  }
-
-  @Test
-  public void testIgnoreAllErrors() throws IOException {
-    TajoConf conf = new TajoConf();
-
-    TableMeta meta = CatalogUtil.newTableMeta(CatalogProtos.StoreType.JSON);
-    meta.putOption(StorageUtil.TEXT_ERROR_TOLERANCE_MAXNUM, "-1");
-    FileFragment fragment =  getFileFragment("testErrorTolerance1.json");
-    Scanner scanner =  StorageManager.getStorageManager(conf).getScanner(meta, schema, fragment);
-    scanner.init();
-
-    Tuple tuple;
-    int i = 0;
-    while ((tuple = scanner.next()) != null) {
-      assertEquals(baseTuple, tuple);
-      i++;
-    }
-    assertEquals(3, i);
-    scanner.close();
-  }
-
-  @Test
-  public void testIgnoreOneErrorTolerance() throws IOException {
-
-
-    TajoConf conf = new TajoConf();
-
-    TableMeta meta = CatalogUtil.newTableMeta(CatalogProtos.StoreType.JSON);
-    meta.putOption(StorageUtil.TEXT_ERROR_TOLERANCE_MAXNUM, "1");
-    FileFragment fragment =  getFileFragment("testErrorTolerance1.json");
-    Scanner scanner =  StorageManager.getStorageManager(conf).getScanner(meta, schema, fragment);
-    scanner.init();
-
-    assertNotNull(scanner.next());
-    assertNotNull(scanner.next());
-    try {
-      scanner.next();
-    } catch (IOException ioe) {
-      System.out.println(ioe);
-      return;
-    } finally {
-      scanner.close();
-    }
-    fail();
-  }
-
-  @Test
-  public void testNoErrorTolerance() throws IOException {
-    TajoConf conf = new TajoConf();
-    TableMeta meta = CatalogUtil.newTableMeta(CatalogProtos.StoreType.JSON);
-    meta.putOption(StorageUtil.TEXT_ERROR_TOLERANCE_MAXNUM, "0");
-    FileFragment fragment =  getFileFragment("testErrorTolerance2.json");
-    Scanner scanner =  StorageManager.getStorageManager(conf).getScanner(meta, schema, fragment);
-    scanner.init();
-
-    try {
-      scanner.next();
-    } catch (IOException ioe) {
-      return;
-    } finally {
-      scanner.close();
-    }
-    fail();
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/test/java/org/apache/tajo/storage/TestFileSystems.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/test/java/org/apache/tajo/storage/TestFileSystems.java b/tajo-storage/src/test/java/org/apache/tajo/storage/TestFileSystems.java
deleted file mode 100644
index bec0daf..0000000
--- a/tajo-storage/src/test/java/org/apache/tajo/storage/TestFileSystems.java
+++ /dev/null
@@ -1,138 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.??See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.??The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.??You may obtain a copy of the License at
- *
- *?????http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocalFileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.tajo.catalog.CatalogUtil;
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.catalog.TableMeta;
-import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
-import org.apache.tajo.common.TajoDataTypes.Type;
-import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.datum.Datum;
-import org.apache.tajo.datum.DatumFactory;
-import org.apache.tajo.storage.fragment.FileFragment;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-import java.io.IOException;
-import java.net.URI;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.List;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-@RunWith(Parameterized.class)
-public class TestFileSystems {
-
-  private static String TEST_PATH = "target/test-data/TestFileSystem";
-  private Configuration conf;
-  private StorageManager sm;
-  private FileSystem fs;
-  private Path testDir;
-
-  public TestFileSystems(FileSystem fs) throws IOException {
-    this.fs = fs;
-    this.conf = fs.getConf();
-    this.testDir = getTestDir(this.fs, TEST_PATH);
-    this.sm = StorageManager.getStorageManager(new TajoConf(this.conf));
-  }
-
-  public Path getTestDir(FileSystem fs, String dir) throws IOException {
-    Path path = new Path(dir);
-    if (fs.exists(path))
-      fs.delete(path, true);
-
-    fs.mkdirs(path);
-
-    return fs.makeQualified(path);
-  }
-
-  @Parameterized.Parameters
-  public static Collection<Object[]> generateParameters() throws IOException {
-    return Arrays.asList(new Object[][]{
-        {FileSystem.getLocal(new TajoConf())},
-    });
-  }
-
-  @Before
-  public void setup() throws IOException {
-    if (!(fs instanceof LocalFileSystem)) {
-      conf.set("fs.local.block.size", "10");
-      fs.initialize(URI.create(fs.getScheme() + ":///"), conf);
-      fs.setConf(conf);
-    }
-  }
-
-  @After
-  public void tearDown() throws IOException {
-    if (!(fs instanceof LocalFileSystem)) {
-      fs.setConf(new TajoConf());
-    }
-  }
-
-  @Test
-  public void testBlockSplit() throws IOException {
-
-    Schema schema = new Schema();
-    schema.addColumn("id", Type.INT4);
-    schema.addColumn("age", Type.INT4);
-    schema.addColumn("name", Type.TEXT);
-
-    TableMeta meta = CatalogUtil.newTableMeta(StoreType.CSV);
-
-    Tuple[] tuples = new Tuple[4];
-    for (int i = 0; i < tuples.length; i++) {
-      tuples[i] = new VTuple(3);
-      tuples[i]
-          .put(new Datum[]{DatumFactory.createInt4(i),
-              DatumFactory.createInt4(i + 32),
-              DatumFactory.createText("name" + i)});
-    }
-
-    Path path = StorageUtil.concatPath(testDir, "testGetScannerAndAppender",
-        "table.csv");
-    fs.mkdirs(path.getParent());
-
-    Appender appender = sm.getAppender(meta, schema, path);
-    appender.init();
-    for (Tuple t : tuples) {
-      appender.addTuple(t);
-    }
-    appender.close();
-    FileStatus fileStatus = fs.getFileStatus(path);
-
-    List<FileFragment> splits = sm.getSplits("table", meta, schema, path);
-    int splitSize = (int) Math.ceil(fileStatus.getLen() / (double) fileStatus.getBlockSize());
-    assertEquals(splitSize, splits.size());
-
-    for (FileFragment fragment : splits) {
-      assertTrue(fragment.getEndKey() <= fileStatus.getBlockSize());
-    }
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/test/java/org/apache/tajo/storage/TestFrameTuple.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/test/java/org/apache/tajo/storage/TestFrameTuple.java b/tajo-storage/src/test/java/org/apache/tajo/storage/TestFrameTuple.java
deleted file mode 100644
index 387fed5..0000000
--- a/tajo-storage/src/test/java/org/apache/tajo/storage/TestFrameTuple.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage;
-
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.apache.tajo.datum.Datum;
-import org.apache.tajo.datum.DatumFactory;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-public class TestFrameTuple {
-  private Tuple tuple1;
-  private Tuple tuple2;
-
-  @Before
-  public void setUp() throws Exception {
-    tuple1 = new VTuple(11);
-    tuple1.put(new Datum[] {
-        DatumFactory.createBool(true),
-        DatumFactory.createBit((byte) 0x99),
-        DatumFactory.createChar('9'),
-        DatumFactory.createInt2((short) 17),
-        DatumFactory.createInt4(59),
-        DatumFactory.createInt8(23l),
-        DatumFactory.createFloat4(77.9f),
-        DatumFactory.createFloat8(271.9f),
-        DatumFactory.createText("hyunsik"),
-        DatumFactory.createBlob("hyunsik".getBytes()),
-        DatumFactory.createInet4("192.168.0.1")
-    });
-    
-    tuple2 = new VTuple(11);
-    tuple2.put(new Datum[] {
-        DatumFactory.createBool(true),
-        DatumFactory.createBit((byte) 0x99),
-        DatumFactory.createChar('9'),
-        DatumFactory.createInt2((short) 17),
-        DatumFactory.createInt4(59),
-        DatumFactory.createInt8(23l),
-        DatumFactory.createFloat4(77.9f),
-        DatumFactory.createFloat8(271.9f),
-        DatumFactory.createText("hyunsik"),
-        DatumFactory.createBlob("hyunsik".getBytes()),
-        DatumFactory.createInet4("192.168.0.1")
-    });
-  }
-
-  @After
-  public void tearDown() throws Exception {
-  }
-
-  @Test
-  public final void testFrameTuple() {
-    Tuple frame = new FrameTuple(tuple1, tuple2);
-    assertEquals(22, frame.size());
-    for (int i = 0; i < 22; i++) {
-      assertTrue(frame.contains(i));
-    }
-    
-    assertEquals(DatumFactory.createInt8(23l), frame.get(5));
-    assertEquals(DatumFactory.createInt8(23l), frame.get(16));
-    assertEquals(DatumFactory.createInet4("192.168.0.1"), frame.get(10));
-    assertEquals(DatumFactory.createInet4("192.168.0.1"), frame.get(21));
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/test/java/org/apache/tajo/storage/TestLazyTuple.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/test/java/org/apache/tajo/storage/TestLazyTuple.java b/tajo-storage/src/test/java/org/apache/tajo/storage/TestLazyTuple.java
deleted file mode 100644
index c6149f7..0000000
--- a/tajo-storage/src/test/java/org/apache/tajo/storage/TestLazyTuple.java
+++ /dev/null
@@ -1,258 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage;
-
-
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.common.TajoDataTypes;
-import org.apache.tajo.datum.DatumFactory;
-import org.apache.tajo.datum.NullDatum;
-import org.apache.tajo.util.BytesUtils;
-import org.junit.Before;
-import org.junit.Test;
-
-import static org.junit.Assert.*;
-
-public class TestLazyTuple {
-
-  Schema schema;
-  byte[][] textRow;
-  byte[] nullbytes;
-  SerializerDeserializer serde;
-
-  @Before
-  public void setUp() {
-    nullbytes = "\\N".getBytes();
-
-    schema = new Schema();
-    schema.addColumn("col1", TajoDataTypes.Type.BOOLEAN);
-    schema.addColumn("col2", TajoDataTypes.Type.BIT);
-    schema.addColumn("col3", TajoDataTypes.Type.CHAR, 7);
-    schema.addColumn("col4", TajoDataTypes.Type.INT2);
-    schema.addColumn("col5", TajoDataTypes.Type.INT4);
-    schema.addColumn("col6", TajoDataTypes.Type.INT8);
-    schema.addColumn("col7", TajoDataTypes.Type.FLOAT4);
-    schema.addColumn("col8", TajoDataTypes.Type.FLOAT8);
-    schema.addColumn("col9", TajoDataTypes.Type.TEXT);
-    schema.addColumn("col10", TajoDataTypes.Type.BLOB);
-    schema.addColumn("col11", TajoDataTypes.Type.INET4);
-    schema.addColumn("col12", TajoDataTypes.Type.INT4);
-    schema.addColumn("col13", TajoDataTypes.Type.NULL_TYPE);
-
-    StringBuilder sb = new StringBuilder();
-    sb.append(DatumFactory.createBool(true)).append('|');
-    sb.append(new String(DatumFactory.createBit((byte) 0x99).asTextBytes())).append('|');
-    sb.append(DatumFactory.createChar("str")).append('|');
-    sb.append(DatumFactory.createInt2((short) 17)).append('|');
-    sb.append(DatumFactory.createInt4(59)).append('|');
-    sb.append(DatumFactory.createInt8(23l)).append('|');
-    sb.append(DatumFactory.createFloat4(77.9f)).append('|');
-    sb.append(DatumFactory.createFloat8(271.9f)).append('|');
-    sb.append(DatumFactory.createText("str2")).append('|');
-    sb.append(DatumFactory.createBlob("jinho".getBytes())).append('|');
-    sb.append(DatumFactory.createInet4("192.168.0.1")).append('|');
-    sb.append(new String(nullbytes)).append('|');
-    sb.append(NullDatum.get());
-    textRow = BytesUtils.splitPreserveAllTokens(sb.toString().getBytes(), '|');
-    serde = new TextSerializerDeserializer();
-  }
-
-  @Test
-  public void testGetDatum() {
-
-    LazyTuple t1 = new LazyTuple(schema, textRow, -1, nullbytes, serde);
-    assertEquals(DatumFactory.createBool(true), t1.get(0));
-    assertEquals(DatumFactory.createBit((byte) 0x99), t1.get(1));
-    assertEquals(DatumFactory.createChar("str"), t1.get(2));
-    assertEquals(DatumFactory.createInt2((short) 17), t1.get(3));
-    assertEquals(DatumFactory.createInt4(59), t1.get(4));
-    assertEquals(DatumFactory.createInt8(23l), t1.get(5));
-    assertEquals(DatumFactory.createFloat4(77.9f), t1.get(6));
-    assertEquals(DatumFactory.createFloat8(271.9f), t1.get(7));
-    assertEquals(DatumFactory.createText("str2"), t1.get(8));
-    assertEquals(DatumFactory.createBlob("jinho".getBytes()), t1.get(9));
-    assertEquals(DatumFactory.createInet4("192.168.0.1"), t1.get(10));
-    assertEquals(NullDatum.get(), t1.get(11));
-    assertEquals(NullDatum.get(), t1.get(12));
-  }
-
-  @Test
-  public void testContain() {
-    int colNum = schema.size();
-
-    LazyTuple t1 = new LazyTuple(schema, new byte[colNum][], -1);
-    t1.put(0, DatumFactory.createInt4(1));
-    t1.put(3, DatumFactory.createInt4(1));
-    t1.put(7, DatumFactory.createInt4(1));
-
-    assertTrue(t1.contains(0));
-    assertFalse(t1.contains(1));
-    assertFalse(t1.contains(2));
-    assertTrue(t1.contains(3));
-    assertFalse(t1.contains(4));
-    assertFalse(t1.contains(5));
-    assertFalse(t1.contains(6));
-    assertTrue(t1.contains(7));
-    assertFalse(t1.contains(8));
-    assertFalse(t1.contains(9));
-    assertFalse(t1.contains(10));
-    assertFalse(t1.contains(11));
-    assertFalse(t1.contains(12));
-  }
-
-  @Test
-  public void testPut() {
-    int colNum = schema.size();
-    LazyTuple t1 = new LazyTuple(schema, new byte[colNum][], -1);
-    t1.put(0, DatumFactory.createText("str"));
-    t1.put(1, DatumFactory.createInt4(2));
-    t1.put(11, DatumFactory.createFloat4(0.76f));
-
-    assertTrue(t1.contains(0));
-    assertTrue(t1.contains(1));
-
-    assertEquals(t1.getText(0), "str");
-    assertEquals(t1.get(1).asInt4(), 2);
-    assertTrue(t1.get(11).asFloat4() == 0.76f);
-  }
-
-  @Test
-  public void testEquals() {
-    int colNum = schema.size();
-    LazyTuple t1 = new LazyTuple(schema, new byte[colNum][], -1);
-    LazyTuple t2 = new LazyTuple(schema, new byte[colNum][], -1);
-
-    t1.put(0, DatumFactory.createInt4(1));
-    t1.put(1, DatumFactory.createInt4(2));
-    t1.put(3, DatumFactory.createInt4(2));
-
-    t2.put(0, DatumFactory.createInt4(1));
-    t2.put(1, DatumFactory.createInt4(2));
-    t2.put(3, DatumFactory.createInt4(2));
-
-    assertEquals(t1, t2);
-
-    Tuple t3 = new VTuple(colNum);
-    t3.put(0, DatumFactory.createInt4(1));
-    t3.put(1, DatumFactory.createInt4(2));
-    t3.put(3, DatumFactory.createInt4(2));
-    assertEquals(t1, t3);
-    assertEquals(t2, t3);
-
-    LazyTuple t4 = new LazyTuple(schema, new byte[colNum][], -1);
-    assertNotSame(t1, t4);
-  }
-
-  @Test
-  public void testHashCode() {
-    int colNum = schema.size();
-    LazyTuple t1 = new LazyTuple(schema, new byte[colNum][], -1);
-    LazyTuple t2 = new LazyTuple(schema, new byte[colNum][], -1);
-
-    t1.put(0, DatumFactory.createInt4(1));
-    t1.put(1, DatumFactory.createInt4(2));
-    t1.put(3, DatumFactory.createInt4(2));
-    t1.put(4, DatumFactory.createText("str"));
-
-    t2.put(0, DatumFactory.createInt4(1));
-    t2.put(1, DatumFactory.createInt4(2));
-    t2.put(3, DatumFactory.createInt4(2));
-    t2.put(4, DatumFactory.createText("str"));
-
-    assertEquals(t1.hashCode(), t2.hashCode());
-
-    Tuple t3 = new VTuple(colNum);
-    t3.put(0, DatumFactory.createInt4(1));
-    t3.put(1, DatumFactory.createInt4(2));
-    t3.put(3, DatumFactory.createInt4(2));
-    t3.put(4, DatumFactory.createText("str"));
-    assertEquals(t1.hashCode(), t3.hashCode());
-    assertEquals(t2.hashCode(), t3.hashCode());
-
-    Tuple t4 = new VTuple(5);
-    t4.put(0, DatumFactory.createInt4(1));
-    t4.put(1, DatumFactory.createInt4(2));
-    t4.put(4, DatumFactory.createInt4(2));
-
-    assertNotSame(t1.hashCode(), t4.hashCode());
-  }
-
-  @Test
-  public void testPutTuple() {
-    int colNum = schema.size();
-    LazyTuple t1 = new LazyTuple(schema, new byte[colNum][], -1);
-
-    t1.put(0, DatumFactory.createInt4(1));
-    t1.put(1, DatumFactory.createInt4(2));
-    t1.put(2, DatumFactory.createInt4(3));
-
-
-    Schema schema2 = new Schema();
-    schema2.addColumn("col1", TajoDataTypes.Type.INT8);
-    schema2.addColumn("col2", TajoDataTypes.Type.INT8);
-
-    LazyTuple t2 = new LazyTuple(schema2, new byte[schema2.size()][], -1);
-    t2.put(0, DatumFactory.createInt4(4));
-    t2.put(1, DatumFactory.createInt4(5));
-
-    t1.put(3, t2);
-
-    for (int i = 0; i < 5; i++) {
-      assertEquals(i + 1, t1.get(i).asInt4());
-    }
-  }
-
-  @Test
-  public void testInvalidNumber() {
-    byte[][] bytes = BytesUtils.splitPreserveAllTokens(" 1| |2 ||".getBytes(), '|');
-    Schema schema = new Schema();
-    schema.addColumn("col1", TajoDataTypes.Type.INT2);
-    schema.addColumn("col2", TajoDataTypes.Type.INT4);
-    schema.addColumn("col3", TajoDataTypes.Type.INT8);
-    schema.addColumn("col4", TajoDataTypes.Type.FLOAT4);
-    schema.addColumn("col5", TajoDataTypes.Type.FLOAT8);
-
-    LazyTuple tuple = new LazyTuple(schema, bytes, 0);
-    assertEquals(bytes.length, tuple.size());
-
-    for (int i = 0; i < tuple.size(); i++){
-      assertEquals(NullDatum.get(), tuple.get(i));
-    }
-  }
-
-  @Test
-  public void testClone() throws CloneNotSupportedException {
-    int colNum = schema.size();
-    LazyTuple t1 = new LazyTuple(schema, new byte[colNum][], -1);
-
-    t1.put(0, DatumFactory.createInt4(1));
-    t1.put(1, DatumFactory.createInt4(2));
-    t1.put(3, DatumFactory.createInt4(2));
-    t1.put(4, DatumFactory.createText("str"));
-
-    LazyTuple t2 = (LazyTuple) t1.clone();
-    assertNotSame(t1, t2);
-    assertEquals(t1, t2);
-
-    assertSame(t1.get(4), t2.get(4));
-
-    t1.clear();
-    assertFalse(t1.equals(t2));
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/test/java/org/apache/tajo/storage/TestLineReader.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/test/java/org/apache/tajo/storage/TestLineReader.java b/tajo-storage/src/test/java/org/apache/tajo/storage/TestLineReader.java
deleted file mode 100644
index bfaba04..0000000
--- a/tajo-storage/src/test/java/org/apache/tajo/storage/TestLineReader.java
+++ /dev/null
@@ -1,220 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage;
-
-import io.netty.buffer.ByteBuf;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.io.compress.DeflateCodec;
-import org.apache.tajo.catalog.CatalogUtil;
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.catalog.TableMeta;
-import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
-import org.apache.tajo.common.TajoDataTypes.Type;
-import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.datum.DatumFactory;
-import org.apache.tajo.datum.NullDatum;
-import org.apache.tajo.storage.fragment.FileFragment;
-import org.apache.tajo.storage.text.ByteBufLineReader;
-import org.apache.tajo.storage.text.DelimitedLineReader;
-import org.apache.tajo.storage.text.DelimitedTextFile;
-import org.apache.tajo.util.CommonTestingUtil;
-import org.apache.tajo.util.FileUtil;
-import org.junit.Test;
-
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.nio.charset.Charset;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import static org.junit.Assert.*;
-
-public class TestLineReader {
-	private static String TEST_PATH = "target/test-data/TestLineReader";
-
-  @Test
-  public void testByteBufLineReader() throws IOException {
-    TajoConf conf = new TajoConf();
-    Path testDir = CommonTestingUtil.getTestDir(TEST_PATH);
-    FileSystem fs = testDir.getFileSystem(conf);
-
-    Schema schema = new Schema();
-    schema.addColumn("id", Type.INT4);
-    schema.addColumn("age", Type.INT8);
-    schema.addColumn("comment", Type.TEXT);
-    schema.addColumn("comment2", Type.TEXT);
-
-    TableMeta meta = CatalogUtil.newTableMeta(StoreType.TEXTFILE);
-    Path tablePath = new Path(testDir, "line.data");
-    FileAppender appender = (FileAppender) StorageManager.getStorageManager(conf).getAppender(meta, schema,
-        tablePath);
-    appender.enableStats();
-    appender.init();
-    int tupleNum = 10000;
-    VTuple vTuple;
-
-    for (int i = 0; i < tupleNum; i++) {
-      vTuple = new VTuple(4);
-      vTuple.put(0, DatumFactory.createInt4(i + 1));
-      vTuple.put(1, DatumFactory.createInt8(25l));
-      vTuple.put(2, DatumFactory.createText("emiya muljomdao"));
-      vTuple.put(3, NullDatum.get());
-      appender.addTuple(vTuple);
-    }
-    appender.close();
-
-    FileStatus status = fs.getFileStatus(tablePath);
-
-    ByteBufInputChannel channel = new ByteBufInputChannel(fs.open(tablePath));
-    ByteBufLineReader reader = new ByteBufLineReader(channel);
-
-    long totalRead = 0;
-    int i = 0;
-    AtomicInteger bytes = new AtomicInteger();
-    for(;;){
-      ByteBuf buf = reader.readLineBuf(bytes);
-      totalRead += bytes.get();
-      if(buf == null) break;
-      i++;
-    }
-    IOUtils.cleanup(null, reader, channel, fs);
-    assertEquals(tupleNum, i);
-    assertEquals(status.getLen(), totalRead);
-    assertEquals(status.getLen(), reader.readBytes());
-  }
-
-  @Test
-  public void testLineDelimitedReader() throws IOException {
-    TajoConf conf = new TajoConf();
-    Path testDir = CommonTestingUtil.getTestDir(TEST_PATH);
-    FileSystem fs = testDir.getFileSystem(conf);
-
-    Schema schema = new Schema();
-    schema.addColumn("id", Type.INT4);
-    schema.addColumn("age", Type.INT8);
-    schema.addColumn("comment", Type.TEXT);
-    schema.addColumn("comment2", Type.TEXT);
-
-    TableMeta meta = CatalogUtil.newTableMeta(StoreType.TEXTFILE);
-    meta.putOption("compression.codec", DeflateCodec.class.getCanonicalName());
-
-    Path tablePath = new Path(testDir, "line1." + DeflateCodec.class.getSimpleName());
-    FileAppender appender = (FileAppender) StorageManager.getStorageManager(conf).getAppender(meta, schema,
-        tablePath);
-    appender.enableStats();
-    appender.init();
-    int tupleNum = 10000;
-    VTuple vTuple;
-
-    long splitOffset = 0;
-    for (int i = 0; i < tupleNum; i++) {
-      vTuple = new VTuple(4);
-      vTuple.put(0, DatumFactory.createInt4(i + 1));
-      vTuple.put(1, DatumFactory.createInt8(25l));
-      vTuple.put(2, DatumFactory.createText("emiya muljomdao"));
-      vTuple.put(3, NullDatum.get());
-      appender.addTuple(vTuple);
-
-      if(i == (tupleNum / 2)){
-        splitOffset = appender.getOffset();
-      }
-    }
-    String extension = ((DelimitedTextFile.DelimitedTextFileAppender) appender).getExtension();
-    appender.close();
-
-    tablePath = tablePath.suffix(extension);
-    FileFragment fragment = new FileFragment("table", tablePath, 0, splitOffset);
-    DelimitedLineReader reader = new DelimitedLineReader(conf, fragment); // if file is compressed, will read to EOF
-    assertTrue(reader.isCompressed());
-    assertFalse(reader.isReadable());
-    reader.init();
-    assertTrue(reader.isReadable());
-
-
-    int i = 0;
-    while(reader.isReadable()){
-      ByteBuf buf = reader.readLine();
-      if(buf == null) break;
-      i++;
-    }
-
-    IOUtils.cleanup(null, reader, fs);
-    assertEquals(tupleNum, i);
-
-  }
-
-  @Test
-  public void testByteBufLineReaderWithoutTerminating() throws IOException {
-    String path = FileUtil.getResourcePath("dataset/testLineText.txt").getFile();
-    File file = new File(path);
-    String data = FileUtil.readTextFile(file);
-
-    ByteBufInputChannel channel = new ByteBufInputChannel(new FileInputStream(file));
-    ByteBufLineReader reader = new ByteBufLineReader(channel);
-
-    long totalRead = 0;
-    int i = 0;
-    AtomicInteger bytes = new AtomicInteger();
-    for(;;){
-      ByteBuf buf = reader.readLineBuf(bytes);
-      totalRead += bytes.get();
-      if(buf == null) break;
-      i++;
-    }
-    IOUtils.cleanup(null, reader);
-    assertEquals(file.length(), totalRead);
-    assertEquals(file.length(), reader.readBytes());
-    assertEquals(data.split("\n").length, i);
-  }
-
-  @Test
-  public void testCRLFLine() throws IOException {
-    TajoConf conf = new TajoConf();
-    Path testFile = new Path(CommonTestingUtil.getTestDir(TEST_PATH), "testCRLFLineText.txt");
-
-    FileSystem fs = testFile.getFileSystem(conf);
-    FSDataOutputStream outputStream = fs.create(testFile, true);
-    outputStream.write("0\r\n1\r\n".getBytes());
-    outputStream.flush();
-    IOUtils.closeStream(outputStream);
-
-    ByteBufInputChannel channel = new ByteBufInputChannel(fs.open(testFile));
-    ByteBufLineReader reader = new ByteBufLineReader(channel, BufferPool.directBuffer(2));
-    FileStatus status = fs.getFileStatus(testFile);
-
-    long totalRead = 0;
-    int i = 0;
-    AtomicInteger bytes = new AtomicInteger();
-    for(;;){
-      ByteBuf buf = reader.readLineBuf(bytes);
-      totalRead += bytes.get();
-      if(buf == null) break;
-      String row  = buf.toString(Charset.defaultCharset());
-      assertEquals(i, Integer.parseInt(row));
-      i++;
-    }
-    IOUtils.cleanup(null, reader);
-    assertEquals(status.getLen(), totalRead);
-    assertEquals(status.getLen(), reader.readBytes());
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/test/java/org/apache/tajo/storage/TestMergeScanner.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/test/java/org/apache/tajo/storage/TestMergeScanner.java b/tajo-storage/src/test/java/org/apache/tajo/storage/TestMergeScanner.java
deleted file mode 100644
index e6714b5..0000000
--- a/tajo-storage/src/test/java/org/apache/tajo/storage/TestMergeScanner.java
+++ /dev/null
@@ -1,201 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage;
-
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.tajo.catalog.CatalogUtil;
-import org.apache.tajo.util.KeyValueSet;
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.catalog.TableMeta;
-import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
-import org.apache.tajo.catalog.statistics.TableStats;
-import org.apache.tajo.common.TajoDataTypes.Type;
-import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.conf.TajoConf.ConfVars;
-import org.apache.tajo.datum.DatumFactory;
-import org.apache.tajo.storage.fragment.FileFragment;
-import org.apache.tajo.util.CommonTestingUtil;
-import org.apache.tajo.util.TUtil;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameters;
-
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.Collection;
-
-import static org.junit.Assert.*;
-
-@RunWith(Parameterized.class)
-public class TestMergeScanner {
-  private TajoConf conf;
-  StorageManager sm;
-  private static String TEST_PATH = "target/test-data/TestMergeScanner";
-
-  private static String TEST_MULTIPLE_FILES_AVRO_SCHEMA =
-      "{\n" +
-      "  \"type\": \"record\",\n" +
-      "  \"namespace\": \"org.apache.tajo\",\n" +
-      "  \"name\": \"testMultipleFiles\",\n" +
-      "  \"fields\": [\n" +
-      "    { \"name\": \"id\", \"type\": \"int\" },\n" +
-      "    { \"name\": \"file\", \"type\": \"string\" },\n" +
-      "    { \"name\": \"name\", \"type\": \"string\" },\n" +
-      "    { \"name\": \"age\", \"type\": \"long\" }\n" +
-      "  ]\n" +
-      "}\n";
-
-  private Path testDir;
-  private StoreType storeType;
-  private FileSystem fs;
-
-  public TestMergeScanner(StoreType storeType) {
-    this.storeType = storeType;
-  }
-
-  @Parameters
-  public static Collection<Object[]> generateParameters() {
-    return Arrays.asList(new Object[][] {
-        {StoreType.CSV},
-        {StoreType.RAW},
-        {StoreType.RCFILE},
-        {StoreType.PARQUET},
-        {StoreType.SEQUENCEFILE},
-        {StoreType.AVRO},
-        // RowFile requires Byte-buffer read support, so we omitted RowFile.
-        //{StoreType.ROWFILE},
-    });
-  }
-
-  @Before
-  public void setup() throws Exception {
-    conf = new TajoConf();
-    conf.setVar(ConfVars.ROOT_DIR, TEST_PATH);
-    conf.setStrings("tajo.storage.projectable-scanner", "rcfile", "parquet", "avro");
-    testDir = CommonTestingUtil.getTestDir(TEST_PATH);
-    fs = testDir.getFileSystem(conf);
-    sm = StorageManager.getStorageManager(conf, testDir);
-  }
-
-  @Test
-  public void testMultipleFiles() throws IOException {
-    Schema schema = new Schema();
-    schema.addColumn("id", Type.INT4);
-    schema.addColumn("file", Type.TEXT);
-    schema.addColumn("name", Type.TEXT);
-    schema.addColumn("age", Type.INT8);
-
-    KeyValueSet options = new KeyValueSet();
-    TableMeta meta = CatalogUtil.newTableMeta(storeType, options);
-    meta.setOptions(CatalogUtil.newPhysicalProperties(storeType));
-    if (storeType == StoreType.AVRO) {
-      meta.putOption(StorageConstants.AVRO_SCHEMA_LITERAL,
-                     TEST_MULTIPLE_FILES_AVRO_SCHEMA);
-    }
-
-    Path table1Path = new Path(testDir, storeType + "_1.data");
-    Appender appender1 = StorageManager.getStorageManager(conf).getAppender(meta, schema, table1Path);
-    appender1.enableStats();
-    appender1.init();
-    int tupleNum = 10000;
-    VTuple vTuple;
-
-    for(int i = 0; i < tupleNum; i++) {
-      vTuple = new VTuple(4);
-      vTuple.put(0, DatumFactory.createInt4(i + 1));
-      vTuple.put(1, DatumFactory.createText("hyunsik"));
-      vTuple.put(2, DatumFactory.createText("jihoon"));
-      vTuple.put(3, DatumFactory.createInt8(25l));
-      appender1.addTuple(vTuple);
-    }
-    appender1.close();
-
-    TableStats stat1 = appender1.getStats();
-    if (stat1 != null) {
-      assertEquals(tupleNum, stat1.getNumRows().longValue());
-    }
-
-    Path table2Path = new Path(testDir, storeType + "_2.data");
-    Appender appender2 = StorageManager.getStorageManager(conf).getAppender(meta, schema, table2Path);
-    appender2.enableStats();
-    appender2.init();
-
-    for(int i = 0; i < tupleNum; i++) {
-      vTuple = new VTuple(4);
-      vTuple.put(0, DatumFactory.createInt4(i + 1));
-      vTuple.put(1, DatumFactory.createText("hyunsik"));
-      vTuple.put(2, DatumFactory.createText("jihoon"));
-      vTuple.put(3, DatumFactory.createInt8(25l));
-      appender2.addTuple(vTuple);
-    }
-    appender2.close();
-
-    TableStats stat2 = appender2.getStats();
-    if (stat2 != null) {
-      assertEquals(tupleNum, stat2.getNumRows().longValue());
-    }
-
-
-    FileStatus status1 = fs.getFileStatus(table1Path);
-    FileStatus status2 = fs.getFileStatus(table2Path);
-    FileFragment[] fragment = new FileFragment[2];
-    fragment[0] = new FileFragment("tablet1", table1Path, 0, status1.getLen());
-    fragment[1] = new FileFragment("tablet1", table2Path, 0, status2.getLen());
-
-    Schema targetSchema = new Schema();
-    targetSchema.addColumn(schema.getColumn(0));
-    targetSchema.addColumn(schema.getColumn(2));
-
-    Scanner scanner = new MergeScanner(conf, schema, meta, TUtil.<FileFragment>newList(fragment), targetSchema);
-    assertEquals(isProjectableStorage(meta.getStoreType()), scanner.isProjectable());
-
-    scanner.init();
-    int totalCounts = 0;
-    Tuple tuple;
-    while ((tuple = scanner.next()) != null) {
-      totalCounts++;
-      if (isProjectableStorage(meta.getStoreType())) {
-        assertNotNull(tuple.get(0));
-        assertNull(tuple.get(1));
-        assertNotNull(tuple.get(2));
-        assertNull(tuple.get(3));
-      }
-    }
-    scanner.close();
-
-    assertEquals(tupleNum * 2, totalCounts);
-	}
-
-  private static boolean isProjectableStorage(StoreType type) {
-    switch (type) {
-      case RCFILE:
-      case PARQUET:
-      case SEQUENCEFILE:
-      case CSV:
-      case AVRO:
-        return true;
-      default:
-        return false;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/test/java/org/apache/tajo/storage/TestSplitProcessor.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/test/java/org/apache/tajo/storage/TestSplitProcessor.java b/tajo-storage/src/test/java/org/apache/tajo/storage/TestSplitProcessor.java
deleted file mode 100644
index 12ea551..0000000
--- a/tajo-storage/src/test/java/org/apache/tajo/storage/TestSplitProcessor.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage;
-
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.Unpooled;
-import io.netty.util.CharsetUtil;
-import org.apache.tajo.storage.text.FieldSplitProcessor;
-import org.apache.tajo.storage.text.LineSplitProcessor;
-import org.junit.Test;
-
-import java.io.IOException;
-
-import static io.netty.util.ReferenceCountUtil.releaseLater;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-public class TestSplitProcessor {
-
-  @Test
-  public void testFieldSplitProcessor() throws IOException {
-    String data = "abc||de";
-    final ByteBuf buf = releaseLater(
-        Unpooled.copiedBuffer(data, CharsetUtil.ISO_8859_1));
-
-    final int len = buf.readableBytes();
-    FieldSplitProcessor processor = new FieldSplitProcessor('|');
-
-    assertEquals(3, buf.forEachByte(0, len, processor));
-    assertEquals(4, buf.forEachByte(4, len - 4, processor));
-    assertEquals(-1, buf.forEachByte(5, len - 5, processor));
-
-  }
-
-  @Test
-  public void testLineSplitProcessor() throws IOException {
-    String data = "abc\r\n\n";
-    final ByteBuf buf = releaseLater(
-        Unpooled.copiedBuffer(data, CharsetUtil.ISO_8859_1));
-
-    final int len = buf.readableBytes();
-    LineSplitProcessor processor = new LineSplitProcessor();
-
-    //find CR
-    assertEquals(3, buf.forEachByte(0, len, processor));
-
-    // find CRLF
-    assertEquals(4, buf.forEachByte(4, len - 4, processor));
-    assertEquals(buf.getByte(4), '\n');
-    // need to skip LF
-    assertTrue(processor.isPrevCharCR());
-
-    // find LF
-    assertEquals(5, buf.forEachByte(5, len - 5, processor)); //line length is zero
-  }
-}