You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by jh...@apache.org on 2014/01/28 13:35:39 UTC

[02/18] TAJO-520: Move tajo-core-storage to tajo-storage. (jinho)

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bbf9b7bf/tajo-storage/src/main/java/org/apache/tajo/storage/v2/ScheduledInputStream.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/v2/ScheduledInputStream.java b/tajo-storage/src/main/java/org/apache/tajo/storage/v2/ScheduledInputStream.java
new file mode 100644
index 0000000..12b984e
--- /dev/null
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/v2/ScheduledInputStream.java
@@ -0,0 +1,513 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.v2;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.Seekable;
+
+import java.io.*;
+import java.util.LinkedList;
+import java.util.Queue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+public class ScheduledInputStream extends InputStream implements Seekable, Closeable, DataInput {
+  private static final Log LOG = LogFactory.getLog(ScheduledInputStream.class);
+
+	private FSDataInputStream originStream;
+
+  private int currentScanIndex;
+
+  private Queue<ScanData> dataQueue = new LinkedList<ScanData>();
+
+  private ScanData currentScanData;
+
+  private AtomicBoolean closed = new AtomicBoolean(false);
+
+  private boolean eof = false;
+
+  private long pos;
+
+  private AtomicInteger avaliableSize = new AtomicInteger(0);
+
+  private long fileLen;
+
+  private long startOffset;
+
+  private long length;
+
+  private long endOffset;
+
+  private boolean endOfStream = false;
+
+  private Path file;
+
+  private byte readLongBuffer[] = new byte[8];
+
+  private AtomicLong totalReadBytesForFetch = new AtomicLong(0);
+
+  private AtomicLong totalReadBytesFromDisk = new AtomicLong(0);
+
+	public ScheduledInputStream(Path file, FSDataInputStream originStream,
+                              long startOffset, long length, long fileLen) throws IOException {
+		this.originStream = originStream;
+		this.startOffset = startOffset;
+		this.length = length;
+		this.endOffset = startOffset + length;
+		this.fileLen = fileLen;
+    this.file = file;
+		this.pos = this.originStream.getPos();
+
+    LOG.info("Open:" + toString());
+	}
+
+	public int getAvaliableSize() {
+		return avaliableSize.get();
+	}
+
+  public String toString() {
+    return file.getName() + ":" + startOffset + ":" + length;
+  }
+	public boolean readNext(int length) throws IOException {
+		return readNext(length, false);
+	}
+	
+	public boolean readNext(int length, boolean ignoreEOS) throws IOException {
+    synchronized(dataQueue) {
+      if(closed.get() || (!ignoreEOS && endOfStream)) {
+        return false;
+      }
+      int bufLength = ignoreEOS ? length : (int)Math.min(length,  endOffset - originStream.getPos());
+      bufLength = (int)Math.min(bufLength, fileLen - originStream.getPos());
+      if(bufLength == 0) {
+        return false;
+      }
+			byte[] buf = new byte[bufLength];
+
+      try {
+        originStream.readFully(buf);
+      } catch (EOFException e) {
+        LOG.error(e.getMessage(), e);
+        throw e;
+      } catch (Exception e) {
+        throw new IOException(e.getMessage(), e);
+      }
+
+      if(originStream.getPos() == fileLen) {
+        LOG.info("EOF:" + toString());
+        eof = true;
+      }
+      if(!ignoreEOS && originStream.getPos() >= endOffset) {
+        LOG.info("EndOfStream:" + toString());
+        endOfStream = true;
+      }
+
+      if(currentScanData == null) {
+        currentScanData = new ScanData(buf, bufLength);
+        currentScanIndex = 0;
+      } else {
+        dataQueue.offer(new ScanData(buf, bufLength));
+      }
+
+      avaliableSize.addAndGet(bufLength);
+
+      if(LOG.isDebugEnabled()) {
+        LOG.debug("Add DataQueue: queue=" + dataQueue.size() +
+          ", avaliable Size=" + avaliableSize.get() + ", pos=" + getPos() +
+          ", originPos=" + originStream.getPos() + ",endOfStream=" + endOfStream +
+          ", bufLength=" + bufLength + ",ignoreEOS=" + ignoreEOS);
+      }
+
+      totalReadBytesFromDisk.addAndGet(bufLength);
+      dataQueue.notifyAll();
+    }
+    return !eof;
+	}
+	
+	static class ScanData {
+		byte[] data;
+		int length;
+		public ScanData(byte[] buf, int length) {
+			this.data = buf;
+			this.length = length;
+		}
+		
+		@Override
+		public String toString() {
+			return "length=" + length;
+		}
+	}
+
+	@Override
+	public void seek(long pos) throws IOException {
+		synchronized(dataQueue) {
+			dataQueue.clear();
+			currentScanData = null;
+			currentScanIndex = 0;
+			avaliableSize.set(0);
+      originStream.seek(pos);
+      this.pos = pos;
+    }
+	}
+
+	@Override
+	public long getPos() throws IOException {
+		return this.pos;
+	}
+
+	public long getOriginStreamPos() {
+		try {
+			return this.originStream.getPos();
+		} catch (IOException e) {
+			e.printStackTrace();
+			return 0;
+		}
+	}
+	
+	@Override
+	public boolean seekToNewSource(long targetPos) throws IOException {
+		synchronized(dataQueue) {
+			dataQueue.clear();
+			currentScanData = null;
+			currentScanIndex = 0;
+			avaliableSize.set(0);
+      boolean result = originStream.seekToNewSource(targetPos);
+
+      this.pos = originStream.getPos();
+      return result;
+		}
+	}
+
+	@Override
+	public int read() throws IOException {
+		if(noMoreData()) {
+			return -1;
+		}
+		if(currentScanData == null || currentScanIndex >= currentScanData.length) {
+			synchronized(dataQueue) {
+				if(dataQueue.isEmpty()) {
+					if(endOfStream) {
+						readNext(64 * 1024, true);
+					} else {
+						try {
+							dataQueue.wait();
+							if(eof && dataQueue.isEmpty() && currentScanIndex > 0) {
+								//no more data
+								return -1;
+							}
+						} catch (InterruptedException e) {
+						}
+					}
+				}
+				if(!dataQueue.isEmpty() && currentScanIndex > 0) {
+					currentScanData = dataQueue.poll();
+					currentScanIndex = 0;
+				}
+			}
+		} 
+		
+		this.pos++;
+		avaliableSize.decrementAndGet();
+    totalReadBytesForFetch.incrementAndGet();
+
+		return currentScanData.data[currentScanIndex++] & 0xff;
+	}
+	
+	private boolean noMoreData() {
+		return closed.get();
+	}
+	
+	public int read(byte b[], int off, int len) throws IOException {
+		if(noMoreData()) {
+			return -1;
+		}
+		if (b == null) {
+		    throw new NullPointerException();
+		} else if (off < 0 || len < 0 || len > b.length - off) {
+		    throw new IndexOutOfBoundsException();
+		} else if (len == 0) {
+		    return 0;
+		}
+		if(currentScanData == null) {
+			synchronized(dataQueue) {
+				if(dataQueue.isEmpty()) {
+					if(endOfStream) {
+						readNext(64 * 1024, true);
+					} else {
+						try {
+							dataQueue.wait();
+							if(noMoreData()) {
+								return -1;
+							}
+						} catch (InterruptedException e) {
+						}
+					}
+				}
+				if(!dataQueue.isEmpty() && currentScanIndex > 0) {
+					currentScanData = dataQueue.poll();
+					currentScanIndex = 0;
+				}
+			}
+		} 
+		
+		int numRemainBytes = currentScanData.length - currentScanIndex;
+		if(numRemainBytes > len) {
+			System.arraycopy(currentScanData.data, currentScanIndex, b, off, len);
+			currentScanIndex += len;
+			avaliableSize.addAndGet(0 - len);
+			pos += len;
+
+      totalReadBytesForFetch.addAndGet(len);
+			return len;
+		} else {
+			int offset = off;
+			int length = 0;
+			int numCopyBytes = numRemainBytes;
+			while(true) {
+				synchronized(dataQueue) {
+					if(numCopyBytes == 0 && eof && dataQueue.isEmpty()) {
+						return -1;
+					}
+				}
+				System.arraycopy(currentScanData.data, currentScanIndex, b, offset, numCopyBytes);
+				currentScanIndex += numCopyBytes;
+				offset += numCopyBytes;
+				length += numCopyBytes;
+				if(length >= len) {
+					break;
+				}
+				synchronized(dataQueue) {
+					if(dataQueue.isEmpty()) {
+						if(eof) {
+							break;
+						}
+						if(endOfStream) {
+							readNext(64 * 1024, true);
+						} else {
+							try {
+								dataQueue.wait();
+							} catch (InterruptedException e) {
+							}
+						}
+					}
+					if(eof && dataQueue.isEmpty()) {
+						break;
+					}
+					if(!dataQueue.isEmpty() && currentScanIndex > 0) {
+						currentScanData = dataQueue.poll();
+						currentScanIndex = 0;
+					}
+					if(currentScanData == null) {
+						break;
+					}
+				}
+        if(currentScanData.length >= (len - length)) {
+          numCopyBytes = (len - length);
+        } else {
+          numCopyBytes = currentScanData.length;
+        }
+			}  //end of while
+			this.pos += length;
+			avaliableSize.addAndGet(0 - length);
+
+      totalReadBytesForFetch.addAndGet(length);
+			return length;
+		}
+	}
+
+  public long getTotalReadBytesForFetch() {
+    return totalReadBytesForFetch.get();
+  }
+
+  public long getTotalReadBytesFromDisk() {
+    return totalReadBytesFromDisk.get();
+  }
+
+	@Override
+	public void close() throws IOException {
+    LOG.info("Close:" + toString());
+		synchronized(dataQueue) {
+			if(closed.get()) {
+				return;
+			}
+			closed.set(true);
+			originStream.close();
+			dataQueue.clear();
+			currentScanIndex = 0;
+			super.close();
+		}
+	}
+
+	@Override
+	public void readFully(byte[] b) throws IOException {
+		readFully(b, 0, b.length);
+	}
+
+	@Override
+	public void readFully(byte[] b, int off, int len) throws IOException {
+		if (len < 0) {
+		    throw new IndexOutOfBoundsException();
+		}
+		int n = 0;
+		while (n < len) {
+		    int count = read(b, off + n, len - n);
+		    if (count < 0) {
+		    	throw new EOFException();
+		    }
+		    n += count;
+		}
+	}
+
+	@Override
+	public int skipBytes(int bytes) throws IOException {
+		int skipTotal = 0;
+		int currentPos = 0;
+
+		while ((skipTotal<bytes) && ((currentPos = (int)skip(bytes-skipTotal)) > 0)) {
+      skipTotal += currentPos;
+		}
+
+		return skipTotal;
+	}
+
+	@Override
+	public boolean readBoolean() throws IOException {
+		int val = read();
+		if (val < 0) {
+		    throw new EOFException();
+    }
+		return (val != 0);
+	}
+
+	@Override
+	public byte readByte() throws IOException {
+		int val = read();
+		if (val < 0) {
+		    throw new EOFException();
+    }
+		return (byte)(val);
+	}
+
+	@Override
+	public int readUnsignedByte() throws IOException {
+		int val = read();
+		if (val < 0) {
+		    throw new EOFException();
+    }
+		return val;
+	}
+
+	@Override
+	public short readShort() throws IOException {
+    int val1 = read();
+    int val2 = read();
+    if ((val1 | val2) < 0) {
+        throw new EOFException();
+    }
+    return (short)((val1 << 8) + (val2 << 0));
+	}
+
+	@Override
+	public int readUnsignedShort() throws IOException {
+    int val1 = read();
+    int val2 = read();
+    if ((val1 | val2) < 0) {
+        throw new EOFException();
+    }
+    return (val1 << 8) + (val2 << 0);
+	}
+
+	@Override
+	public char readChar() throws IOException {
+    int val1 = read();
+    int val2 = read();
+    if ((val1 | val2) < 0) {
+        throw new EOFException();
+    }
+    return (char)((val1 << 8) + (val2 << 0));
+	}
+
+	@Override
+	public int readInt() throws IOException {
+    int val1 = read();
+    int val2 = read();
+    int val3 = read();
+    int val4 = read();
+    if ((val1 | val2 | val3 | val4) < 0) {
+        throw new EOFException();
+    }
+    return ((val1 << 24) + (val2 << 16) + (val3 << 8) + (val4 << 0));
+	}
+
+	@Override
+	public long readLong() throws IOException {
+    readFully(readLongBuffer, 0, 8);
+    return  (((long) readLongBuffer[0] << 56) +
+            ((long)(readLongBuffer[1] & 255) << 48) +
+		        ((long)(readLongBuffer[2] & 255) << 40) +
+            ((long)(readLongBuffer[3] & 255) << 32) +
+            ((long)(readLongBuffer[4] & 255) << 24) +
+            ((readLongBuffer[5] & 255) << 16) +
+            ((readLongBuffer[6] & 255) <<  8) +
+            ((readLongBuffer[7] & 255) <<  0));
+	}
+
+	@Override
+	public float readFloat() throws IOException {
+		return Float.intBitsToFloat(readInt());
+	}
+
+	@Override
+	public double readDouble() throws IOException {
+		return Double.longBitsToDouble(readLong());
+	}
+
+	@Override
+	public String readLine() throws IOException {
+		throw new IOException("Unsupported operation: readLine");
+	}
+
+	@Override
+	public String readUTF() throws IOException {
+		throw new IOException("Unsupported operation: readUTF");
+	}
+
+	public boolean isEOF() {
+		return eof;
+	}
+
+	public boolean isEndOfStream() {
+		return endOfStream;
+	}
+
+  public void reset() {
+    synchronized(dataQueue) {
+      endOfStream = false;
+      eof = false;
+      closed.set(false);
+      dataQueue.clear();
+      currentScanIndex = 0;
+      currentScanData = null;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bbf9b7bf/tajo-storage/src/main/java/org/apache/tajo/storage/v2/StorageManagerV2.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/v2/StorageManagerV2.java b/tajo-storage/src/main/java/org/apache/tajo/storage/v2/StorageManagerV2.java
new file mode 100644
index 0000000..cffff00
--- /dev/null
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/v2/StorageManagerV2.java
@@ -0,0 +1,140 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.v2;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.catalog.proto.CatalogProtos;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.storage.AbstractStorageManager;
+import org.apache.tajo.storage.Scanner;
+import org.apache.tajo.storage.fragment.Fragment;
+
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.Queue;
+
+public final class StorageManagerV2 extends AbstractStorageManager {
+  private final Log LOG = LogFactory.getLog(StorageManagerV2.class);
+
+	private Queue<FileScannerV2> scanQueue = new LinkedList<FileScannerV2>();
+	
+	private Object scanQueueLock = new Object();
+	
+	private Object scanDataLock = new Object();
+	
+	private ScanScheduler scanScheduler;
+	
+	private StorgaeManagerContext context;
+
+  public StorageManagerV2(TajoConf conf) throws IOException {
+    super(conf);
+		context = new StorgaeManagerContext();
+		scanScheduler = new ScanScheduler(context);
+		scanScheduler.start();
+    LOG.info("StorageManager v2 started...");
+	}
+
+  @Override
+  public Class<? extends Scanner> getScannerClass(CatalogProtos.StoreType storeType) throws IOException {
+    Class<? extends Scanner> scannerClass;
+
+    String handlerName = storeType.name().toLowerCase();
+    String handlerNameKey = handlerName + "_v2";
+
+    scannerClass = SCANNER_HANDLER_CACHE.get(handlerNameKey);
+    if (scannerClass == null) {
+      scannerClass = conf.getClass(String.format("tajo.storage.scanner-handler.v2.%s.class",
+          storeType.name().toLowerCase()), null, Scanner.class);
+      SCANNER_HANDLER_CACHE.put(handlerNameKey, scannerClass);
+    }
+
+    return scannerClass;
+  }
+
+  @Override
+  public Scanner getScanner(TableMeta meta, Schema schema, Fragment fragment, Schema target) throws IOException {
+    Scanner scanner;
+
+    Class<? extends Scanner> scannerClass = getScannerClass(meta.getStoreType());
+    if (scannerClass == null) {
+      throw new IOException("Unknown Storage Type: " + meta.getStoreType());
+    }
+
+    scanner = newScannerInstance(scannerClass, conf, schema, meta, fragment);
+    if (scanner.isProjectable()) {
+      scanner.setTarget(target.toArray());
+    }
+
+    if(scanner instanceof FileScannerV2) {
+      ((FileScannerV2)scanner).setStorageManagerContext(context);
+    }
+    return scanner;
+  }
+
+	public void requestFileScan(FileScannerV2 fileScanner) {
+		synchronized(scanQueueLock) {
+			scanQueue.offer(fileScanner);
+			
+			scanQueueLock.notifyAll();
+		}
+	}
+
+	public StorgaeManagerContext getContext() {
+		return context;
+	}
+
+  public class StorgaeManagerContext {
+		public Object getScanQueueLock() {
+			return scanQueueLock;
+		}
+
+		public Object getScanDataLock() {
+			return scanDataLock;
+		}
+		
+		public Queue<FileScannerV2> getScanQueue() {
+			return scanQueue;
+		}
+
+		public int getMaxReadBytesPerScheduleSlot() {
+			return conf.getIntVar(TajoConf.ConfVars.STORAGE_MANAGER_DISK_SCHEDULER_MAX_READ_BYTES_PER_SLOT);
+		}
+
+    public void requestFileScan(FileScannerV2 fileScanner) {
+      StorageManagerV2.this.requestFileScan(fileScanner);
+    }
+
+    public TajoConf getConf() {
+      return conf;
+    }
+
+    public void incrementReadBytes(int diskId, long[] readBytes) {
+      scanScheduler.incrementReadBytes(diskId, readBytes);
+    }
+  }
+
+	public void stop() {
+		if(scanScheduler != null) {
+			scanScheduler.stopScheduler();
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bbf9b7bf/tajo-storage/src/main/proto/IndexProtos.proto
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/proto/IndexProtos.proto b/tajo-storage/src/main/proto/IndexProtos.proto
new file mode 100644
index 0000000..bcb0cbe
--- /dev/null
+++ b/tajo-storage/src/main/proto/IndexProtos.proto
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+option java_package = "org.apache.tajo.index";
+option java_outer_classname = "IndexProtos";
+option optimize_for = SPEED;
+option java_generic_services = false;
+option java_generate_equals_and_hash = true;
+
+import "CatalogProtos.proto";
+
+message TupleComparatorProto {
+  repeated TupleComparatorSpecProto compSpecs = 1;
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bbf9b7bf/tajo-storage/src/main/resources/storage-default.xml
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/resources/storage-default.xml b/tajo-storage/src/main/resources/storage-default.xml
new file mode 100644
index 0000000..5bf4453
--- /dev/null
+++ b/tajo-storage/src/main/resources/storage-default.xml
@@ -0,0 +1,149 @@
+<?xml version="1.0"?>
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+  -->
+
+<configuration>
+  <property>
+    <name>tajo.storage.manager.v2</name>
+    <value>false</value>
+  </property>
+
+  <property>
+    <name>tajo.storage.manager.maxReadBytes</name>
+    <value>8388608</value>
+    <description></description>
+  </property>
+
+  <property>
+    <name>tajo.storage.manager.concurrency.perDisk</name>
+    <value>1</value>
+    <description></description>
+  </property>
+
+  <!--- Registered Scanner Handler -->
+  <property>
+    <name>tajo.storage.scanner-handler</name>
+    <value>csv,raw,rcfile,row,trevni</value>
+  </property>
+
+  <!--- Fragment Class Configurations -->
+  <property>
+    <name>tajo.storage.fragment.csv.class</name>
+    <value>org.apache.tajo.storage.fragment.FileFragment</value>
+  </property>
+  <property>
+    <name>tajo.storage.fragment.raw.class</name>
+    <value>org.apache.tajo.storage.fragment.FileFragment</value>
+  </property>
+  <property>
+    <name>tajo.storage.fragment.rcfile.class</name>
+    <value>org.apache.tajo.storage.fragment.FileFragment</value>
+  </property>
+  <property>
+    <name>tajo.storage.fragment.row.class</name>
+    <value>org.apache.tajo.storage.fragment.FileFragment</value>
+  </property>
+  <property>
+    <name>tajo.storage.fragment.trevni.class</name>
+    <value>org.apache.tajo.storage.fragment.FileFragment</value>
+  </property>
+
+  <!--- Scanner Handler -->
+  <property>
+    <name>tajo.storage.scanner-handler.csv.class</name>
+    <value>org.apache.tajo.storage.CSVFile$CSVScanner</value>
+  </property>
+
+  <property>
+    <name>tajo.storage.scanner-handler.v2.csv.class</name>
+    <value>org.apache.tajo.storage.v2.CSVFileScanner</value>
+  </property>
+
+  <property>
+    <name>tajo.storage.scanner-handler.raw.class</name>
+    <value>org.apache.tajo.storage.RawFile$RawFileScanner</value>
+  </property>
+
+  <property>
+    <name>tajo.storage.scanner-handler.v2.raw.class</name>
+    <value>org.apache.tajo.storage.RawFile$RawFileScanner</value>
+  </property>
+
+  <property>
+    <name>tajo.storage.scanner-handler.rcfile.class</name>
+    <value>org.apache.tajo.storage.rcfile.RCFile$RCFileScanner</value>
+  </property>
+
+  <property>
+    <name>tajo.storage.scanner-handler.v2.rcfile.class</name>
+    <value>org.apache.tajo.storage.v2.RCFileScanner</value>
+  </property>
+
+  <property>
+    <name>tajo.storage.scanner-handler.rowfile.class</name>
+    <value>org.apache.tajo.storage.RowFile$RowFileScanner</value>
+  </property>
+
+  <property>
+    <name>tajo.storage.scanner-handler.v2.rowfile.class</name>
+    <value>org.apache.tajo.storage.RowFile$RowFileScanner</value>
+  </property>
+
+  <property>
+    <name>tajo.storage.scanner-handler.trevni.class</name>
+    <value>org.apache.tajo.storage.trevni.TrevniScanner</value>
+  </property>
+
+  <property>
+    <name>tajo.storage.scanner-handler.v2.trevni.class</name>
+    <value>org.apache.tajo.storage.trevni.TrevniScanner</value>
+  </property>
+
+  <!--- Appender Handler -->
+  <property>
+    <name>tajo.storage.appender-handler</name>
+    <value>csv,raw,rcfile,row,trevni</value>
+  </property>
+
+  <property>
+    <name>tajo.storage.appender-handler.csv.class</name>
+    <value>org.apache.tajo.storage.CSVFile$CSVAppender</value>
+  </property>
+
+  <property>
+    <name>tajo.storage.appender-handler.raw.class</name>
+    <value>org.apache.tajo.storage.RawFile$RawFileAppender</value>
+  </property>
+
+  <property>
+    <name>tajo.storage.appender-handler.rcfile.class</name>
+    <value>org.apache.tajo.storage.rcfile.RCFile$RCFileAppender</value>
+  </property>
+
+  <property>
+    <name>tajo.storage.appender-handler.rowfile.class</name>
+    <value>org.apache.tajo.storage.RowFile$RowFileAppender</value>
+  </property>
+
+  <property>
+    <name>tajo.storage.appender-handler.trevni.class</name>
+    <value>org.apache.tajo.storage.trevni.TrevniAppender</value>
+  </property>
+</configuration>

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bbf9b7bf/tajo-storage/src/test/java/org/apache/tajo/storage/TestCompressionStorages.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/test/java/org/apache/tajo/storage/TestCompressionStorages.java b/tajo-storage/src/test/java/org/apache/tajo/storage/TestCompressionStorages.java
new file mode 100644
index 0000000..bec1556
--- /dev/null
+++ b/tajo-storage/src/test/java/org/apache/tajo/storage/TestCompressionStorages.java
@@ -0,0 +1,233 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.compress.*;
+import org.apache.hadoop.io.compress.zlib.ZlibFactory;
+import org.apache.hadoop.util.NativeCodeLoader;
+import org.apache.tajo.catalog.CatalogUtil;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
+import org.apache.tajo.catalog.statistics.TableStats;
+import org.apache.tajo.common.TajoDataTypes.Type;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.storage.fragment.FileFragment;
+import org.apache.tajo.util.CommonTestingUtil;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+@RunWith(Parameterized.class)
+public class TestCompressionStorages {
+  private TajoConf conf;
+  private static String TEST_PATH = "target/test-data/TestCompressionStorages";
+
+  private StoreType storeType;
+  private Path testDir;
+  private FileSystem fs;
+
+  public TestCompressionStorages(StoreType type) throws IOException {
+    this.storeType = type;
+    conf = new TajoConf();
+
+    testDir = CommonTestingUtil.getTestDir(TEST_PATH);
+    fs = testDir.getFileSystem(conf);
+  }
+
+  @Parameterized.Parameters
+  public static Collection<Object[]> generateParameters() {
+    return Arrays.asList(new Object[][]{
+        {StoreType.CSV},
+        {StoreType.RCFILE}
+    });
+  }
+
+  @Test
+  public void testDeflateCodecCompressionData() throws IOException {
+    storageCompressionTest(storeType, DeflateCodec.class);
+  }
+
+  @Test
+  public void testGzipCodecCompressionData() throws IOException {
+    if (storeType == StoreType.RCFILE) {
+      if( ZlibFactory.isNativeZlibLoaded(conf)) {
+        storageCompressionTest(storeType, GzipCodec.class);
+      }
+    } else {
+      storageCompressionTest(storeType, GzipCodec.class);
+    }
+  }
+
+  @Test
+  public void testSnappyCodecCompressionData() throws IOException {
+    if (SnappyCodec.isNativeCodeLoaded()) {
+      storageCompressionTest(storeType, SnappyCodec.class);
+    }
+  }
+
+  @Test
+  public void testBzip2CodecCompressionData() throws IOException {
+    storageCompressionTest(storeType, BZip2Codec.class);
+  }
+
+  @Test
+  public void testLz4CodecCompressionData() throws IOException {
+    if(NativeCodeLoader.isNativeCodeLoaded() && Lz4Codec.isNativeCodeLoaded())
+    storageCompressionTest(storeType, Lz4Codec.class);
+  }
+
+  @Test
+  public void testSplitCompressionData() throws IOException {
+    if(StoreType.CSV != storeType) return;
+
+    Schema schema = new Schema();
+    schema.addColumn("id", Type.INT4);
+    schema.addColumn("age", Type.INT8);
+
+    TableMeta meta = CatalogUtil.newTableMeta(storeType);
+    meta.putOption("compression.codec", BZip2Codec.class.getCanonicalName());
+
+    Path tablePath = new Path(testDir, "SplitCompression");
+    Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(meta, schema, tablePath);
+    appender.enableStats();
+    appender.init();
+
+    String extention = "";
+    if (appender instanceof CSVFile.CSVAppender) {
+      extention = ((CSVFile.CSVAppender) appender).getExtension();
+    }
+
+    int tupleNum = 100000;
+    VTuple vTuple;
+
+    for (int i = 0; i < tupleNum; i++) {
+      vTuple = new VTuple(2);
+      vTuple.put(0, DatumFactory.createInt4(i + 1));
+      vTuple.put(1, DatumFactory.createInt8(25l));
+      appender.addTuple(vTuple);
+    }
+    appender.close();
+
+    TableStats stat = appender.getStats();
+    assertEquals(tupleNum, stat.getNumRows().longValue());
+    tablePath = tablePath.suffix(extention);
+
+    FileStatus status = fs.getFileStatus(tablePath);
+    long fileLen = status.getLen();
+    long randomNum = (long) (Math.random() * fileLen) + 1;
+
+    FileFragment[] tablets = new FileFragment[2];
+    tablets[0] = new FileFragment("SplitCompression", tablePath, 0, randomNum);
+    tablets[1] = new FileFragment("SplitCompression", tablePath, randomNum, (fileLen - randomNum));
+
+    Scanner scanner = StorageManagerFactory.getStorageManager(conf).getScanner(meta, schema, tablets[0], schema);
+    assertTrue(scanner.isSplittable());
+    scanner.init();
+    int tupleCnt = 0;
+    Tuple tuple;
+    while ((tuple = scanner.next()) != null) {
+      tupleCnt++;
+    }
+    scanner.close();
+
+    scanner = StorageManagerFactory.getStorageManager(conf).getScanner(meta, schema, tablets[1], schema);
+    assertTrue(scanner.isSplittable());
+    scanner.init();
+    while ((tuple = scanner.next()) != null) {
+      tupleCnt++;
+    }
+
+    scanner.close();
+    assertEquals(tupleNum, tupleCnt);
+  }
+
+  private void storageCompressionTest(StoreType storeType, Class<? extends CompressionCodec> codec) throws IOException {
+    Schema schema = new Schema();
+    schema.addColumn("id", Type.INT4);
+    schema.addColumn("age", Type.FLOAT4);
+    schema.addColumn("name", Type.TEXT);
+
+    TableMeta meta = CatalogUtil.newTableMeta(storeType);
+    meta.putOption("compression.codec", codec.getCanonicalName());
+    meta.putOption("rcfile.serde", TextSerializerDeserializer.class.getName());
+
+    String fileName = "Compression_" + codec.getSimpleName();
+    Path tablePath = new Path(testDir, fileName);
+    Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(meta, schema, tablePath);
+    appender.enableStats();
+
+    appender.init();
+
+    String extension = "";
+    if (appender instanceof CSVFile.CSVAppender) {
+      extension = ((CSVFile.CSVAppender) appender).getExtension();
+    }
+
+    int tupleNum = 100000;
+    VTuple vTuple;
+
+    for (int i = 0; i < tupleNum; i++) {
+      vTuple = new VTuple(3);
+      vTuple.put(0, DatumFactory.createInt4(i + 1));
+      vTuple.put(1, DatumFactory.createFloat4((float) i));
+      vTuple.put(2, DatumFactory.createText(String.valueOf(i)));
+      appender.addTuple(vTuple);
+    }
+    appender.close();
+
+    TableStats stat = appender.getStats();
+    assertEquals(tupleNum, stat.getNumRows().longValue());
+    tablePath = tablePath.suffix(extension);
+    FileStatus status = fs.getFileStatus(tablePath);
+    long fileLen = status.getLen();
+    FileFragment[] tablets = new FileFragment[1];
+    tablets[0] = new FileFragment(fileName, tablePath, 0, fileLen);
+
+    Scanner scanner = StorageManagerFactory.getStorageManager(conf).getScanner(meta, schema, tablets[0], schema);
+
+    if (StoreType.CSV == storeType) {
+      if (SplittableCompressionCodec.class.isAssignableFrom(codec)) {
+        assertTrue(scanner.isSplittable());
+      } else {
+        assertFalse(scanner.isSplittable());
+      }
+    }
+    scanner.init();
+    int tupleCnt = 0;
+    Tuple tuple;
+    while ((tuple = scanner.next()) != null) {
+      tupleCnt++;
+    }
+    scanner.close();
+    assertEquals(tupleNum, tupleCnt);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bbf9b7bf/tajo-storage/src/test/java/org/apache/tajo/storage/TestFrameTuple.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/test/java/org/apache/tajo/storage/TestFrameTuple.java b/tajo-storage/src/test/java/org/apache/tajo/storage/TestFrameTuple.java
new file mode 100644
index 0000000..387fed5
--- /dev/null
+++ b/tajo-storage/src/test/java/org/apache/tajo/storage/TestFrameTuple.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.datum.DatumFactory;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class TestFrameTuple {
+  private Tuple tuple1;
+  private Tuple tuple2;
+
+  @Before
+  public void setUp() throws Exception {
+    tuple1 = new VTuple(11);
+    tuple1.put(new Datum[] {
+        DatumFactory.createBool(true),
+        DatumFactory.createBit((byte) 0x99),
+        DatumFactory.createChar('9'),
+        DatumFactory.createInt2((short) 17),
+        DatumFactory.createInt4(59),
+        DatumFactory.createInt8(23l),
+        DatumFactory.createFloat4(77.9f),
+        DatumFactory.createFloat8(271.9f),
+        DatumFactory.createText("hyunsik"),
+        DatumFactory.createBlob("hyunsik".getBytes()),
+        DatumFactory.createInet4("192.168.0.1")
+    });
+    
+    tuple2 = new VTuple(11);
+    tuple2.put(new Datum[] {
+        DatumFactory.createBool(true),
+        DatumFactory.createBit((byte) 0x99),
+        DatumFactory.createChar('9'),
+        DatumFactory.createInt2((short) 17),
+        DatumFactory.createInt4(59),
+        DatumFactory.createInt8(23l),
+        DatumFactory.createFloat4(77.9f),
+        DatumFactory.createFloat8(271.9f),
+        DatumFactory.createText("hyunsik"),
+        DatumFactory.createBlob("hyunsik".getBytes()),
+        DatumFactory.createInet4("192.168.0.1")
+    });
+  }
+
+  @After
+  public void tearDown() throws Exception {
+  }
+
+  @Test
+  public final void testFrameTuple() {
+    Tuple frame = new FrameTuple(tuple1, tuple2);
+    assertEquals(22, frame.size());
+    for (int i = 0; i < 22; i++) {
+      assertTrue(frame.contains(i));
+    }
+    
+    assertEquals(DatumFactory.createInt8(23l), frame.get(5));
+    assertEquals(DatumFactory.createInt8(23l), frame.get(16));
+    assertEquals(DatumFactory.createInet4("192.168.0.1"), frame.get(10));
+    assertEquals(DatumFactory.createInet4("192.168.0.1"), frame.get(21));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bbf9b7bf/tajo-storage/src/test/java/org/apache/tajo/storage/TestLazyTuple.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/test/java/org/apache/tajo/storage/TestLazyTuple.java b/tajo-storage/src/test/java/org/apache/tajo/storage/TestLazyTuple.java
new file mode 100644
index 0000000..e1430e1
--- /dev/null
+++ b/tajo-storage/src/test/java/org/apache/tajo/storage/TestLazyTuple.java
@@ -0,0 +1,258 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage;
+
+
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.common.TajoDataTypes;
+import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.datum.NullDatum;
+import org.apache.tajo.util.Bytes;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+public class TestLazyTuple {
+
+  Schema schema;
+  byte[][] textRow;
+  byte[] nullbytes;
+  SerializerDeserializer serde;
+
+  @Before
+  public void setUp() {
+    nullbytes = "\\N".getBytes();
+
+    schema = new Schema();
+    schema.addColumn("col1", TajoDataTypes.Type.BOOLEAN);
+    schema.addColumn("col2", TajoDataTypes.Type.BIT);
+    schema.addColumn("col3", TajoDataTypes.Type.CHAR, 7);
+    schema.addColumn("col4", TajoDataTypes.Type.INT2);
+    schema.addColumn("col5", TajoDataTypes.Type.INT4);
+    schema.addColumn("col6", TajoDataTypes.Type.INT8);
+    schema.addColumn("col7", TajoDataTypes.Type.FLOAT4);
+    schema.addColumn("col8", TajoDataTypes.Type.FLOAT8);
+    schema.addColumn("col9", TajoDataTypes.Type.TEXT);
+    schema.addColumn("col10", TajoDataTypes.Type.BLOB);
+    schema.addColumn("col11", TajoDataTypes.Type.INET4);
+    schema.addColumn("col12", TajoDataTypes.Type.INT4);
+    schema.addColumn("col13", TajoDataTypes.Type.NULL_TYPE);
+
+    StringBuilder sb = new StringBuilder();
+    sb.append(DatumFactory.createBool(true)).append('|');
+    sb.append(new String(DatumFactory.createBit((byte) 0x99).asTextBytes())).append('|');
+    sb.append(DatumFactory.createChar("str")).append('|');
+    sb.append(DatumFactory.createInt2((short) 17)).append('|');
+    sb.append(DatumFactory.createInt4(59)).append('|');
+    sb.append(DatumFactory.createInt8(23l)).append('|');
+    sb.append(DatumFactory.createFloat4(77.9f)).append('|');
+    sb.append(DatumFactory.createFloat8(271.9f)).append('|');
+    sb.append(DatumFactory.createText("str2")).append('|');
+    sb.append(DatumFactory.createBlob("jinho".getBytes())).append('|');
+    sb.append(DatumFactory.createInet4("192.168.0.1")).append('|');
+    sb.append(new String(nullbytes)).append('|');
+    sb.append(NullDatum.get());
+    textRow = Bytes.splitPreserveAllTokens(sb.toString().getBytes(), '|');
+    serde = new TextSerializerDeserializer();
+  }
+
+  @Test
+  public void testGetDatum() {
+
+    LazyTuple t1 = new LazyTuple(schema, textRow, -1, nullbytes, serde);
+    assertEquals(DatumFactory.createBool(true), t1.get(0));
+    assertEquals(DatumFactory.createBit((byte) 0x99), t1.get(1));
+    assertEquals(DatumFactory.createChar("str"), t1.get(2));
+    assertEquals(DatumFactory.createInt2((short) 17), t1.get(3));
+    assertEquals(DatumFactory.createInt4(59), t1.get(4));
+    assertEquals(DatumFactory.createInt8(23l), t1.get(5));
+    assertEquals(DatumFactory.createFloat4(77.9f), t1.get(6));
+    assertEquals(DatumFactory.createFloat8(271.9f), t1.get(7));
+    assertEquals(DatumFactory.createText("str2"), t1.get(8));
+    assertEquals(DatumFactory.createBlob("jinho".getBytes()), t1.get(9));
+    assertEquals(DatumFactory.createInet4("192.168.0.1"), t1.get(10));
+    assertEquals(NullDatum.get(), t1.get(11));
+    assertEquals(NullDatum.get(), t1.get(12));
+  }
+
+  @Test
+  public void testContain() {
+    int colNum = schema.getColumnNum();
+
+    LazyTuple t1 = new LazyTuple(schema, new byte[colNum][], -1);
+    t1.put(0, DatumFactory.createInt4(1));
+    t1.put(3, DatumFactory.createInt4(1));
+    t1.put(7, DatumFactory.createInt4(1));
+
+    assertTrue(t1.contains(0));
+    assertFalse(t1.contains(1));
+    assertFalse(t1.contains(2));
+    assertTrue(t1.contains(3));
+    assertFalse(t1.contains(4));
+    assertFalse(t1.contains(5));
+    assertFalse(t1.contains(6));
+    assertTrue(t1.contains(7));
+    assertFalse(t1.contains(8));
+    assertFalse(t1.contains(9));
+    assertFalse(t1.contains(10));
+    assertFalse(t1.contains(11));
+    assertFalse(t1.contains(12));
+  }
+
+  @Test
+  public void testPut() {
+    int colNum = schema.getColumnNum();
+    LazyTuple t1 = new LazyTuple(schema, new byte[colNum][], -1);
+    t1.put(0, DatumFactory.createText("str"));
+    t1.put(1, DatumFactory.createInt4(2));
+    t1.put(11, DatumFactory.createFloat4(0.76f));
+
+    assertTrue(t1.contains(0));
+    assertTrue(t1.contains(1));
+
+    assertEquals(t1.getString(0).toString(), "str");
+    assertEquals(t1.getInt(1).asInt4(), 2);
+    assertTrue(t1.getFloat(11).asFloat4() == 0.76f);
+  }
+
+  @Test
+  public void testEquals() {
+    int colNum = schema.getColumnNum();
+    LazyTuple t1 = new LazyTuple(schema, new byte[colNum][], -1);
+    LazyTuple t2 = new LazyTuple(schema, new byte[colNum][], -1);
+
+    t1.put(0, DatumFactory.createInt4(1));
+    t1.put(1, DatumFactory.createInt4(2));
+    t1.put(3, DatumFactory.createInt4(2));
+
+    t2.put(0, DatumFactory.createInt4(1));
+    t2.put(1, DatumFactory.createInt4(2));
+    t2.put(3, DatumFactory.createInt4(2));
+
+    assertEquals(t1, t2);
+
+    Tuple t3 = new VTuple(colNum);
+    t3.put(0, DatumFactory.createInt4(1));
+    t3.put(1, DatumFactory.createInt4(2));
+    t3.put(3, DatumFactory.createInt4(2));
+    assertEquals(t1, t3);
+    assertEquals(t2, t3);
+
+    LazyTuple t4 = new LazyTuple(schema, new byte[colNum][], -1);
+    assertNotSame(t1, t4);
+  }
+
+  @Test
+  public void testHashCode() {
+    int colNum = schema.getColumnNum();
+    LazyTuple t1 = new LazyTuple(schema, new byte[colNum][], -1);
+    LazyTuple t2 = new LazyTuple(schema, new byte[colNum][], -1);
+
+    t1.put(0, DatumFactory.createInt4(1));
+    t1.put(1, DatumFactory.createInt4(2));
+    t1.put(3, DatumFactory.createInt4(2));
+    t1.put(4, DatumFactory.createText("str"));
+
+    t2.put(0, DatumFactory.createInt4(1));
+    t2.put(1, DatumFactory.createInt4(2));
+    t2.put(3, DatumFactory.createInt4(2));
+    t2.put(4, DatumFactory.createText("str"));
+
+    assertEquals(t1.hashCode(), t2.hashCode());
+
+    Tuple t3 = new VTuple(colNum);
+    t3.put(0, DatumFactory.createInt4(1));
+    t3.put(1, DatumFactory.createInt4(2));
+    t3.put(3, DatumFactory.createInt4(2));
+    t3.put(4, DatumFactory.createText("str"));
+    assertEquals(t1.hashCode(), t3.hashCode());
+    assertEquals(t2.hashCode(), t3.hashCode());
+
+    Tuple t4 = new VTuple(5);
+    t4.put(0, DatumFactory.createInt4(1));
+    t4.put(1, DatumFactory.createInt4(2));
+    t4.put(4, DatumFactory.createInt4(2));
+
+    assertNotSame(t1.hashCode(), t4.hashCode());
+  }
+
+  @Test
+  public void testPutTuple() {
+    int colNum = schema.getColumnNum();
+    LazyTuple t1 = new LazyTuple(schema, new byte[colNum][], -1);
+
+    t1.put(0, DatumFactory.createInt4(1));
+    t1.put(1, DatumFactory.createInt4(2));
+    t1.put(2, DatumFactory.createInt4(3));
+
+
+    Schema schema2 = new Schema();
+    schema2.addColumn("col1", TajoDataTypes.Type.INT8);
+    schema2.addColumn("col2", TajoDataTypes.Type.INT8);
+
+    LazyTuple t2 = new LazyTuple(schema2, new byte[schema2.getColumnNum()][], -1);
+    t2.put(0, DatumFactory.createInt4(4));
+    t2.put(1, DatumFactory.createInt4(5));
+
+    t1.put(3, t2);
+
+    for (int i = 0; i < 5; i++) {
+      assertEquals(i + 1, t1.get(i).asInt4());
+    }
+  }
+
+  @Test
+  public void testInvalidNumber() {
+    byte[][] bytes = Bytes.splitPreserveAllTokens(" 1| |2 ||".getBytes(), '|');
+    Schema schema = new Schema();
+    schema.addColumn("col1", TajoDataTypes.Type.INT2);
+    schema.addColumn("col2", TajoDataTypes.Type.INT4);
+    schema.addColumn("col3", TajoDataTypes.Type.INT8);
+    schema.addColumn("col4", TajoDataTypes.Type.FLOAT4);
+    schema.addColumn("col5", TajoDataTypes.Type.FLOAT8);
+
+    LazyTuple tuple = new LazyTuple(schema, bytes, 0);
+    assertEquals(bytes.length, tuple.size());
+
+    for (int i = 0; i < tuple.size(); i++){
+      assertEquals(NullDatum.get(), tuple.get(i));
+    }
+  }
+
+  @Test
+  public void testClone() throws CloneNotSupportedException {
+    int colNum = schema.getColumnNum();
+    LazyTuple t1 = new LazyTuple(schema, new byte[colNum][], -1);
+
+    t1.put(0, DatumFactory.createInt4(1));
+    t1.put(1, DatumFactory.createInt4(2));
+    t1.put(3, DatumFactory.createInt4(2));
+    t1.put(4, DatumFactory.createText("str"));
+
+    LazyTuple t2 = (LazyTuple) t1.clone();
+    assertNotSame(t1, t2);
+    assertEquals(t1, t2);
+
+    assertSame(t1.get(4), t2.get(4));
+
+    t1.clear();
+    assertFalse(t1.equals(t2));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bbf9b7bf/tajo-storage/src/test/java/org/apache/tajo/storage/TestMergeScanner.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/test/java/org/apache/tajo/storage/TestMergeScanner.java b/tajo-storage/src/test/java/org/apache/tajo/storage/TestMergeScanner.java
new file mode 100644
index 0000000..f2a66d9
--- /dev/null
+++ b/tajo-storage/src/test/java/org/apache/tajo/storage/TestMergeScanner.java
@@ -0,0 +1,179 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.catalog.CatalogUtil;
+import org.apache.tajo.catalog.Options;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
+import org.apache.tajo.catalog.statistics.TableStats;
+import org.apache.tajo.common.TajoDataTypes.Type;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.conf.TajoConf.ConfVars;
+import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.storage.fragment.FileFragment;
+import org.apache.tajo.util.CommonTestingUtil;
+import org.apache.tajo.util.TUtil;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+
+import static org.junit.Assert.*;
+
+@RunWith(Parameterized.class)
+public class TestMergeScanner {
+  private TajoConf conf;
+  AbstractStorageManager sm;
+  private static String TEST_PATH = "target/test-data/TestMergeScanner";
+  private Path testDir;
+  private StoreType storeType;
+  private FileSystem fs;
+
+  public TestMergeScanner(StoreType storeType) {
+    this.storeType = storeType;
+  }
+
+  @Parameters
+  public static Collection<Object[]> generateParameters() {
+    return Arrays.asList(new Object[][] {
+        {StoreType.CSV},
+        {StoreType.RAW},
+        {StoreType.RCFILE},
+        {StoreType.TREVNI},
+        // RowFile requires Byte-buffer read support, so we omitted RowFile.
+        //{StoreType.ROWFILE},
+
+    });
+  }
+
+  @Before
+  public void setup() throws Exception {
+    conf = new TajoConf();
+    conf.setVar(ConfVars.ROOT_DIR, TEST_PATH);
+    conf.setStrings("tajo.storage.projectable-scanner", "rcfile", "trevni");
+    testDir = CommonTestingUtil.getTestDir(TEST_PATH);
+    fs = testDir.getFileSystem(conf);
+    sm = StorageManagerFactory.getStorageManager(conf, testDir);
+  }
+  
+  @Test
+  public void testMultipleFiles() throws IOException {
+    Schema schema = new Schema();
+    schema.addColumn("id", Type.INT4);
+    schema.addColumn("file", Type.TEXT);
+    schema.addColumn("name", Type.TEXT);
+    schema.addColumn("age", Type.INT8);
+    
+    Options options = new Options();
+    TableMeta meta = CatalogUtil.newTableMeta(storeType, options);
+
+    Path table1Path = new Path(testDir, storeType + "_1.data");
+    Appender appender1 = StorageManagerFactory.getStorageManager(conf).getAppender(meta, schema, table1Path);
+    appender1.enableStats();
+    appender1.init();
+    int tupleNum = 10000;
+    VTuple vTuple;
+
+    for(int i = 0; i < tupleNum; i++) {
+      vTuple = new VTuple(4);
+      vTuple.put(0, DatumFactory.createInt4(i + 1));
+      vTuple.put(1, DatumFactory.createText("hyunsik"));
+      vTuple.put(2, DatumFactory.createText("jihoon"));
+      vTuple.put(3, DatumFactory.createInt8(25l));
+      appender1.addTuple(vTuple);
+    }
+    appender1.close();
+    
+    TableStats stat1 = appender1.getStats();
+    if (stat1 != null) {
+      assertEquals(tupleNum, stat1.getNumRows().longValue());
+    }
+
+    Path table2Path = new Path(testDir, storeType + "_2.data");
+    Appender appender2 = StorageManagerFactory.getStorageManager(conf).getAppender(meta, schema, table2Path);
+    appender2.enableStats();
+    appender2.init();
+
+    for(int i = 0; i < tupleNum; i++) {
+      vTuple = new VTuple(4);
+      vTuple.put(0, DatumFactory.createInt4(i + 1));
+      vTuple.put(1, DatumFactory.createText("hyunsik"));
+      vTuple.put(2, DatumFactory.createText("jihoon"));
+      vTuple.put(3, DatumFactory.createInt8(25l));
+      appender2.addTuple(vTuple);
+    }
+    appender2.close();
+
+    TableStats stat2 = appender2.getStats();
+    if (stat2 != null) {
+      assertEquals(tupleNum, stat2.getNumRows().longValue());
+    }
+
+
+    FileStatus status1 = fs.getFileStatus(table1Path);
+    FileStatus status2 = fs.getFileStatus(table2Path);
+    FileFragment[] fragment = new FileFragment[2];
+    fragment[0] = new FileFragment("tablet1", table1Path, 0, status1.getLen());
+    fragment[1] = new FileFragment("tablet1", table2Path, 0, status2.getLen());
+
+    Schema targetSchema = new Schema();
+    targetSchema.addColumn(schema.getColumn(0));
+    targetSchema.addColumn(schema.getColumn(2));
+
+    Scanner scanner = new MergeScanner(conf, schema, meta, TUtil.<FileFragment>newList(fragment), targetSchema);
+    assertEquals(isProjectableStorage(meta.getStoreType()), scanner.isProjectable());
+
+    scanner.init();
+    int totalCounts = 0;
+    Tuple tuple;
+    while ((tuple=scanner.next()) != null) {
+      totalCounts++;
+      if (isProjectableStorage(meta.getStoreType())) {
+        assertNotNull(tuple.get(0));
+        assertNull(tuple.get(1));
+        assertNotNull(tuple.get(2));
+        assertNull(tuple.get(3));
+      }
+    }
+    scanner.close();
+    
+    assertEquals(tupleNum * 2, totalCounts);
+	}
+
+  private static boolean isProjectableStorage(StoreType type) {
+    switch (type) {
+      case RCFILE:
+      case TREVNI:
+      case CSV:
+        return true;
+      default:
+        return false;
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bbf9b7bf/tajo-storage/src/test/java/org/apache/tajo/storage/TestStorageManager.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/test/java/org/apache/tajo/storage/TestStorageManager.java b/tajo-storage/src/test/java/org/apache/tajo/storage/TestStorageManager.java
new file mode 100644
index 0000000..083670a
--- /dev/null
+++ b/tajo-storage/src/test/java/org/apache/tajo/storage/TestStorageManager.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.catalog.CatalogUtil;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
+import org.apache.tajo.common.TajoDataTypes.Type;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.util.CommonTestingUtil;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.junit.Assert.assertEquals;
+
+public class TestStorageManager {
+	private TajoConf conf;
+	private static String TEST_PATH = "target/test-data/TestStorageManager";
+	AbstractStorageManager sm = null;
+  private Path testDir;
+  private FileSystem fs;
+	@Before
+	public void setUp() throws Exception {
+		conf = new TajoConf();
+    testDir = CommonTestingUtil.getTestDir(TEST_PATH);
+    fs = testDir.getFileSystem(conf);
+    sm = StorageManagerFactory.getStorageManager(conf, testDir);
+	}
+
+	@After
+	public void tearDown() throws Exception {
+	}
+
+  @Test
+	public final void testGetScannerAndAppender() throws IOException {
+		Schema schema = new Schema();
+		schema.addColumn("id", Type.INT4);
+		schema.addColumn("age",Type.INT4);
+		schema.addColumn("name",Type.TEXT);
+
+		TableMeta meta = CatalogUtil.newTableMeta(StoreType.CSV);
+		
+		Tuple[] tuples = new Tuple[4];
+		for(int i=0; i < tuples.length; i++) {
+		  tuples[i] = new VTuple(3);
+		  tuples[i].put(new Datum[] {
+          DatumFactory.createInt4(i),
+		      DatumFactory.createInt4(i + 32),
+		      DatumFactory.createText("name" + i)});
+		}
+
+    Path path = StorageUtil.concatPath(testDir, "testGetScannerAndAppender", "table.csv");
+    fs.mkdirs(path.getParent());
+		Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(meta, schema, path);
+    appender.init();
+		for(Tuple t : tuples) {
+		  appender.addTuple(t);
+		}
+		appender.close();
+
+		Scanner scanner = StorageManagerFactory.getStorageManager(conf).getFileScanner(meta, schema, path);
+    scanner.init();
+		int i=0;
+		while(scanner.next() != null) {
+			i++;
+		}
+		assertEquals(4,i);
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bbf9b7bf/tajo-storage/src/test/java/org/apache/tajo/storage/TestStorages.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/test/java/org/apache/tajo/storage/TestStorages.java b/tajo-storage/src/test/java/org/apache/tajo/storage/TestStorages.java
new file mode 100644
index 0000000..16b370c
--- /dev/null
+++ b/tajo-storage/src/test/java/org/apache/tajo/storage/TestStorages.java
@@ -0,0 +1,375 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.QueryId;
+import org.apache.tajo.TajoIdProtos;
+import org.apache.tajo.catalog.CatalogUtil;
+import org.apache.tajo.catalog.Options;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
+import org.apache.tajo.catalog.statistics.TableStats;
+import org.apache.tajo.common.TajoDataTypes.Type;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.datum.NullDatum;
+import org.apache.tajo.datum.ProtobufDatumFactory;
+import org.apache.tajo.storage.fragment.FileFragment;
+import org.apache.tajo.storage.rcfile.RCFile;
+import org.apache.tajo.util.CommonTestingUtil;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+@RunWith(Parameterized.class)
+public class TestStorages {
+	private TajoConf conf;
+	private static String TEST_PATH = "target/test-data/TestStorages";
+
+  private StoreType storeType;
+  private boolean splitable;
+  private boolean statsable;
+  private Path testDir;
+  private FileSystem fs;
+
+  public TestStorages(StoreType type, boolean splitable, boolean statsable) throws IOException {
+    this.storeType = type;
+    this.splitable = splitable;
+    this.statsable = statsable;
+
+    conf = new TajoConf();
+
+    if (storeType == StoreType.RCFILE) {
+      conf.setInt(RCFile.RECORD_INTERVAL_CONF_STR, 100);
+    }
+
+
+    testDir = CommonTestingUtil.getTestDir(TEST_PATH);
+    fs = testDir.getFileSystem(conf);
+  }
+
+  @Parameterized.Parameters
+  public static Collection<Object[]> generateParameters() {
+    return Arrays.asList(new Object[][] {
+        {StoreType.CSV, true, true},
+        {StoreType.RAW, false, false},
+        {StoreType.RCFILE, true, true},
+        {StoreType.TREVNI, false, true},
+    });
+  }
+		
+	@Test
+  public void testSplitable() throws IOException {
+    if (splitable) {
+      Schema schema = new Schema();
+      schema.addColumn("id", Type.INT4);
+      schema.addColumn("age", Type.INT8);
+
+      TableMeta meta = CatalogUtil.newTableMeta(storeType);
+      Path tablePath = new Path(testDir, "Splitable.data");
+      Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(meta, schema, tablePath);
+      appender.enableStats();
+      appender.init();
+      int tupleNum = 10000;
+      VTuple vTuple;
+
+      for(int i = 0; i < tupleNum; i++) {
+        vTuple = new VTuple(2);
+        vTuple.put(0, DatumFactory.createInt4(i + 1));
+        vTuple.put(1, DatumFactory.createInt8(25l));
+        appender.addTuple(vTuple);
+      }
+      appender.close();
+      TableStats stat = appender.getStats();
+      assertEquals(tupleNum, stat.getNumRows().longValue());
+
+      FileStatus status = fs.getFileStatus(tablePath);
+      long fileLen = status.getLen();
+      long randomNum = (long) (Math.random() * fileLen) + 1;
+
+      FileFragment[] tablets = new FileFragment[2];
+      tablets[0] = new FileFragment("Splitable", tablePath, 0, randomNum);
+      tablets[1] = new FileFragment("Splitable", tablePath, randomNum, (fileLen - randomNum));
+
+      Scanner scanner = StorageManagerFactory.getStorageManager(conf).getScanner(meta, schema, tablets[0], schema);
+      assertTrue(scanner.isSplittable());
+      scanner.init();
+      int tupleCnt = 0;
+      while (scanner.next() != null) {
+        tupleCnt++;
+      }
+      scanner.close();
+
+      scanner = StorageManagerFactory.getStorageManager(conf).getScanner(meta, schema, tablets[1], schema);
+      assertTrue(scanner.isSplittable());
+      scanner.init();
+      while (scanner.next() != null) {
+        tupleCnt++;
+      }
+      scanner.close();
+
+      assertEquals(tupleNum, tupleCnt);
+    }
+	}
+
+  @Test
+  public void testProjection() throws IOException {
+    Schema schema = new Schema();
+    schema.addColumn("id", Type.INT4);
+    schema.addColumn("age", Type.INT8);
+    schema.addColumn("score", Type.FLOAT4);
+
+    TableMeta meta = CatalogUtil.newTableMeta(storeType);
+
+    Path tablePath = new Path(testDir, "testProjection.data");
+    Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(meta, schema, tablePath);
+    appender.init();
+    int tupleNum = 10000;
+    VTuple vTuple;
+
+    for(int i = 0; i < tupleNum; i++) {
+      vTuple = new VTuple(3);
+      vTuple.put(0, DatumFactory.createInt4(i + 1));
+      vTuple.put(1, DatumFactory.createInt8(i + 2));
+      vTuple.put(2, DatumFactory.createFloat4(i + 3));
+      appender.addTuple(vTuple);
+    }
+    appender.close();
+
+    FileStatus status = fs.getFileStatus(tablePath);
+    FileFragment fragment = new FileFragment("testReadAndWrite", tablePath, 0, status.getLen());
+
+    Schema target = new Schema();
+    target.addColumn("age", Type.INT8);
+    target.addColumn("score", Type.FLOAT4);
+    Scanner scanner = StorageManagerFactory.getStorageManager(conf).getScanner(meta, schema, fragment, target);
+    scanner.init();
+    int tupleCnt = 0;
+    Tuple tuple;
+    while ((tuple = scanner.next()) != null) {
+      if (storeType == StoreType.RCFILE || storeType == StoreType.TREVNI || storeType == StoreType.CSV) {
+        assertTrue(tuple.get(0) == null);
+      }
+      assertEquals(DatumFactory.createInt8(tupleCnt + 2), tuple.getLong(1));
+      assertEquals(DatumFactory.createFloat4(tupleCnt + 3), tuple.getFloat(2));
+      tupleCnt++;
+    }
+    scanner.close();
+
+    assertEquals(tupleNum, tupleCnt);
+  }
+
+  @Test
+  public void testVariousTypes() throws IOException {
+    Schema schema = new Schema();
+    schema.addColumn("col1", Type.BOOLEAN);
+    schema.addColumn("col2", Type.BIT);
+    schema.addColumn("col3", Type.CHAR, 7);
+    schema.addColumn("col4", Type.INT2);
+    schema.addColumn("col5", Type.INT4);
+    schema.addColumn("col6", Type.INT8);
+    schema.addColumn("col7", Type.FLOAT4);
+    schema.addColumn("col8", Type.FLOAT8);
+    schema.addColumn("col9", Type.TEXT);
+    schema.addColumn("col10", Type.BLOB);
+    schema.addColumn("col11", Type.INET4);
+    schema.addColumn("col12", Type.NULL_TYPE);
+    schema.addColumn("col13", CatalogUtil.newDataType(Type.PROTOBUF, TajoIdProtos.QueryIdProto.class.getName()));
+
+    Options options = new Options();
+    TableMeta meta = CatalogUtil.newTableMeta(storeType, options);
+
+    Path tablePath = new Path(testDir, "testVariousTypes.data");
+    Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(meta, schema, tablePath);
+    appender.init();
+
+    QueryId queryid = new QueryId("12345", 5);
+    ProtobufDatumFactory factory = ProtobufDatumFactory.get(TajoIdProtos.QueryIdProto.class.getName());
+
+    Tuple tuple = new VTuple(13);
+    tuple.put(new Datum[] {
+        DatumFactory.createBool(true),
+        DatumFactory.createBit((byte) 0x99),
+        DatumFactory.createChar("hyunsik"),
+        DatumFactory.createInt2((short) 17),
+        DatumFactory.createInt4(59),
+        DatumFactory.createInt8(23l),
+        DatumFactory.createFloat4(77.9f),
+        DatumFactory.createFloat8(271.9f),
+        DatumFactory.createText("hyunsik"),
+        DatumFactory.createBlob("hyunsik".getBytes()),
+        DatumFactory.createInet4("192.168.0.1"),
+        NullDatum.get(),
+        factory.createDatum(queryid.getProto())
+    });
+    appender.addTuple(tuple);
+    appender.flush();
+    appender.close();
+
+    FileStatus status = fs.getFileStatus(tablePath);
+    FileFragment fragment = new FileFragment("table", tablePath, 0, status.getLen());
+    Scanner scanner =  StorageManagerFactory.getStorageManager(conf).getScanner(meta, schema, fragment);
+    scanner.init();
+
+    Tuple retrieved;
+    while ((retrieved=scanner.next()) != null) {
+      for (int i = 0; i < tuple.size(); i++) {
+        assertEquals(tuple.get(i), retrieved.get(i));
+      }
+    }
+  }
+
+  @Test
+  public void testRCFileTextSerializeDeserialize() throws IOException {
+    if(storeType != StoreType.RCFILE) return;
+
+    Schema schema = new Schema();
+    schema.addColumn("col1", Type.BOOLEAN);
+    schema.addColumn("col2", Type.BIT);
+    schema.addColumn("col3", Type.CHAR, 7);
+    schema.addColumn("col4", Type.INT2);
+    schema.addColumn("col5", Type.INT4);
+    schema.addColumn("col6", Type.INT8);
+    schema.addColumn("col7", Type.FLOAT4);
+    schema.addColumn("col8", Type.FLOAT8);
+    schema.addColumn("col9", Type.TEXT);
+    schema.addColumn("col10", Type.BLOB);
+    schema.addColumn("col11", Type.INET4);
+    schema.addColumn("col12", Type.NULL_TYPE);
+    schema.addColumn("col13", CatalogUtil.newDataType(Type.PROTOBUF, TajoIdProtos.QueryIdProto.class.getName()));
+
+    Options options = new Options();
+    TableMeta meta = CatalogUtil.newTableMeta(storeType, options);
+    meta.putOption(RCFile.SERDE, TextSerializerDeserializer.class.getName());
+
+    Path tablePath = new Path(testDir, "testVariousTypes.data");
+    Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(meta, schema, tablePath);
+    appender.init();
+
+    QueryId queryid = new QueryId("12345", 5);
+    ProtobufDatumFactory factory = ProtobufDatumFactory.get(TajoIdProtos.QueryIdProto.class.getName());
+
+    Tuple tuple = new VTuple(13);
+    tuple.put(new Datum[] {
+        DatumFactory.createBool(true),
+        DatumFactory.createBit((byte) 0x99),
+        DatumFactory.createChar("jinho"),
+        DatumFactory.createInt2((short) 17),
+        DatumFactory.createInt4(59),
+        DatumFactory.createInt8(23l),
+        DatumFactory.createFloat4(77.9f),
+        DatumFactory.createFloat8(271.9f),
+        DatumFactory.createText("jinho"),
+        DatumFactory.createBlob("hyunsik babo".getBytes()),
+        DatumFactory.createInet4("192.168.0.1"),
+        NullDatum.get(),
+        factory.createDatum(queryid.getProto())
+    });
+    appender.addTuple(tuple);
+    appender.flush();
+    appender.close();
+
+    FileStatus status = fs.getFileStatus(tablePath);
+    FileFragment fragment = new FileFragment("table", tablePath, 0, status.getLen());
+    Scanner scanner =  StorageManagerFactory.getStorageManager(conf).getScanner(meta, schema, fragment);
+    scanner.init();
+
+    Tuple retrieved;
+    while ((retrieved=scanner.next()) != null) {
+      for (int i = 0; i < tuple.size(); i++) {
+        assertEquals(tuple.get(i), retrieved.get(i));
+      }
+    }
+  }
+
+  @Test
+  public void testRCFileBinarySerializeDeserialize() throws IOException {
+    if(storeType != StoreType.RCFILE) return;
+
+    Schema schema = new Schema();
+    schema.addColumn("col1", Type.BOOLEAN);
+    schema.addColumn("col2", Type.BIT);
+    schema.addColumn("col3", Type.CHAR, 7);
+    schema.addColumn("col4", Type.INT2);
+    schema.addColumn("col5", Type.INT4);
+    schema.addColumn("col6", Type.INT8);
+    schema.addColumn("col7", Type.FLOAT4);
+    schema.addColumn("col8", Type.FLOAT8);
+    schema.addColumn("col9", Type.TEXT);
+    schema.addColumn("col10", Type.BLOB);
+    schema.addColumn("col11", Type.INET4);
+    schema.addColumn("col12", Type.NULL_TYPE);
+    schema.addColumn("col13", CatalogUtil.newDataType(Type.PROTOBUF, TajoIdProtos.QueryIdProto.class.getName()));
+
+    Options options = new Options();
+    TableMeta meta = CatalogUtil.newTableMeta(storeType, options);
+    meta.putOption(RCFile.SERDE, BinarySerializerDeserializer.class.getName());
+
+    Path tablePath = new Path(testDir, "testVariousTypes.data");
+    Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(meta, schema, tablePath);
+    appender.init();
+
+    QueryId queryid = new QueryId("12345", 5);
+    ProtobufDatumFactory factory = ProtobufDatumFactory.get(TajoIdProtos.QueryIdProto.class.getName());
+
+    Tuple tuple = new VTuple(13);
+    tuple.put(new Datum[] {
+        DatumFactory.createBool(true),
+        DatumFactory.createBit((byte) 0x99),
+        DatumFactory.createChar("jinho"),
+        DatumFactory.createInt2((short) 17),
+        DatumFactory.createInt4(59),
+        DatumFactory.createInt8(23l),
+        DatumFactory.createFloat4(77.9f),
+        DatumFactory.createFloat8(271.9f),
+        DatumFactory.createText("jinho"),
+        DatumFactory.createBlob("hyunsik babo".getBytes()),
+        DatumFactory.createInet4("192.168.0.1"),
+        NullDatum.get(),
+        factory.createDatum(queryid.getProto())
+    });
+    appender.addTuple(tuple);
+    appender.flush();
+    appender.close();
+
+    FileStatus status = fs.getFileStatus(tablePath);
+    FileFragment fragment = new FileFragment("table", tablePath, 0, status.getLen());
+    Scanner scanner =  StorageManagerFactory.getStorageManager(conf).getScanner(meta, schema, fragment);
+    scanner.init();
+
+    Tuple retrieved;
+    while ((retrieved=scanner.next()) != null) {
+      for (int i = 0; i < tuple.size(); i++) {
+        assertEquals(tuple.get(i), retrieved.get(i));
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bbf9b7bf/tajo-storage/src/test/java/org/apache/tajo/storage/TestTupleComparator.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/test/java/org/apache/tajo/storage/TestTupleComparator.java b/tajo-storage/src/test/java/org/apache/tajo/storage/TestTupleComparator.java
new file mode 100644
index 0000000..7092953
--- /dev/null
+++ b/tajo-storage/src/test/java/org/apache/tajo/storage/TestTupleComparator.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.SortSpec;
+import org.apache.tajo.common.TajoDataTypes.Type;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.datum.DatumFactory;
+
+import static org.junit.Assert.assertEquals;
+
+public class TestTupleComparator {
+
+  @Before
+  public void setUp() throws Exception {
+  }
+
+  @After
+  public void tearDown() throws Exception {
+  }
+
+  @Test
+  public final void testCompare() {
+    Schema schema = new Schema();
+    schema.addColumn("col1", Type.INT4);
+    schema.addColumn("col2", Type.INT4);
+    schema.addColumn("col3", Type.INT4);
+    schema.addColumn("col4", Type.INT4);
+    schema.addColumn("col5", Type.TEXT);
+    
+    Tuple tuple1 = new VTuple(5);
+    Tuple tuple2 = new VTuple(5);
+
+    tuple1.put(
+        new Datum[] {
+        DatumFactory.createInt4(9),
+        DatumFactory.createInt4(3),
+        DatumFactory.createInt4(33),
+        DatumFactory.createInt4(4),
+        DatumFactory.createText("abc")});
+    tuple2.put(
+        new Datum[] {
+        DatumFactory.createInt4(1),
+        DatumFactory.createInt4(25),
+        DatumFactory.createInt4(109),
+        DatumFactory.createInt4(4),
+        DatumFactory.createText("abd")});
+
+    SortSpec sortKey1 = new SortSpec(schema.getColumnByFQN("col4"), true, false);
+    SortSpec sortKey2 = new SortSpec(schema.getColumnByFQN("col5"), true, false);
+
+    TupleComparator tc = new TupleComparator(schema,
+        new SortSpec[] {sortKey1, sortKey2});
+    assertEquals(-1, tc.compare(tuple1, tuple2));
+    assertEquals(1, tc.compare(tuple2, tuple1));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bbf9b7bf/tajo-storage/src/test/java/org/apache/tajo/storage/TestVTuple.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/test/java/org/apache/tajo/storage/TestVTuple.java b/tajo-storage/src/test/java/org/apache/tajo/storage/TestVTuple.java
new file mode 100644
index 0000000..05f47a5
--- /dev/null
+++ b/tajo-storage/src/test/java/org/apache/tajo/storage/TestVTuple.java
@@ -0,0 +1,160 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage;
+
+
+import org.junit.Before;
+import org.junit.Test;
+import org.apache.tajo.datum.DatumFactory;
+
+import static org.junit.Assert.*;
+
+public class TestVTuple {
+
+	/**
+	 * @throws java.lang.Exception
+	 */
+	@Before
+	public void setUp() throws Exception {
+		
+	}
+	
+	@Test
+	public void testContain() {
+		VTuple t1 = new VTuple(260);
+		t1.put(0, DatumFactory.createInt4(1));
+		t1.put(1, DatumFactory.createInt4(1));
+		t1.put(27, DatumFactory.createInt4(1));
+		t1.put(96, DatumFactory.createInt4(1));
+		t1.put(257, DatumFactory.createInt4(1));
+		
+		assertTrue(t1.contains(0));
+		assertTrue(t1.contains(1));
+		assertFalse(t1.contains(2));
+		assertFalse(t1.contains(3));
+		assertFalse(t1.contains(4));
+		assertTrue(t1.contains(27));
+		assertFalse(t1.contains(28));
+		assertFalse(t1.contains(95));
+		assertTrue(t1.contains(96));
+		assertFalse(t1.contains(97));
+		assertTrue(t1.contains(257));
+	}
+	
+	@Test
+	public void testPut() {
+		VTuple t1 = new VTuple(260);
+		t1.put(0, DatumFactory.createText("str"));
+		t1.put(1, DatumFactory.createInt4(2));
+		t1.put(257, DatumFactory.createFloat4(0.76f));
+		
+		assertTrue(t1.contains(0));
+		assertTrue(t1.contains(1));
+		
+		assertEquals(t1.getString(0).toString(),"str");
+		assertEquals(t1.getInt(1).asInt4(),2);
+		assertTrue(t1.getFloat(257).asFloat4() == 0.76f);
+	}
+
+  @Test
+	public void testEquals() {
+	  Tuple t1 = new VTuple(5);
+	  Tuple t2 = new VTuple(5);
+	  
+	  t1.put(0, DatumFactory.createInt4(1));
+	  t1.put(1, DatumFactory.createInt4(2));
+	  t1.put(3, DatumFactory.createInt4(2));
+	  
+	  t2.put(0, DatumFactory.createInt4(1));
+    t2.put(1, DatumFactory.createInt4(2));
+    t2.put(3, DatumFactory.createInt4(2));
+    
+    assertEquals(t1,t2);
+    
+    Tuple t3 = new VTuple(5);
+    t2.put(0, DatumFactory.createInt4(1));
+    t2.put(1, DatumFactory.createInt4(2));
+    t2.put(4, DatumFactory.createInt4(2));
+    
+    assertNotSame(t1,t3);
+	}
+	
+	@Test
+	public void testHashCode() {
+	  Tuple t1 = new VTuple(5);
+    Tuple t2 = new VTuple(5);
+    
+    t1.put(0, DatumFactory.createInt4(1));
+    t1.put(1, DatumFactory.createInt4(2));
+    t1.put(3, DatumFactory.createInt4(2));
+    t1.put(4, DatumFactory.createText("hyunsik"));
+    
+    t2.put(0, DatumFactory.createInt4(1));
+    t2.put(1, DatumFactory.createInt4(2));
+    t2.put(3, DatumFactory.createInt4(2));
+    t2.put(4, DatumFactory.createText("hyunsik"));
+    
+    assertEquals(t1.hashCode(),t2.hashCode());
+    
+    Tuple t3 = new VTuple(5);
+    t3.put(0, DatumFactory.createInt4(1));
+    t3.put(1, DatumFactory.createInt4(2));
+    t3.put(4, DatumFactory.createInt4(2));
+    
+    assertNotSame(t1.hashCode(),t3.hashCode());
+	}
+
+  @Test
+  public void testPutTuple() {
+    Tuple t1 = new VTuple(5);
+
+    t1.put(0, DatumFactory.createInt4(1));
+    t1.put(1, DatumFactory.createInt4(2));
+    t1.put(2, DatumFactory.createInt4(3));
+
+    Tuple t2 = new VTuple(2);
+    t2.put(0, DatumFactory.createInt4(4));
+    t2.put(1, DatumFactory.createInt4(5));
+
+    t1.put(3, t2);
+
+    for (int i = 0; i < 5; i++) {
+      assertEquals(i+1, t1.get(i).asInt4());
+    }
+  }
+
+  @Test
+  public void testClone() throws CloneNotSupportedException {
+    Tuple t1 = new VTuple(5);
+
+    t1.put(0, DatumFactory.createInt4(1));
+    t1.put(1, DatumFactory.createInt4(2));
+    t1.put(3, DatumFactory.createInt4(2));
+    t1.put(4, DatumFactory.createText("str"));
+
+    VTuple t2 = (VTuple) t1.clone();
+    assertNotSame(t1, t2);
+    assertEquals(t1, t2);
+
+    assertSame(t1.get(4), t2.get(4));
+
+    t1.clear();
+    assertFalse(t1.equals(t2));
+  }
+}