You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by jh...@apache.org on 2013/12/20 06:36:32 UTC

git commit: TAJO-435: Improve intermediate file. (jinho)

Updated Branches:
  refs/heads/master f58f6ee82 -> f08724f9c


TAJO-435: Improve intermediate file. (jinho)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tajo/commit/f08724f9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tajo/tree/f08724f9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tajo/diff/f08724f9

Branch: refs/heads/master
Commit: f08724f9c8bb80d93db0d5517fe6351a9a4d6e63
Parents: f58f6ee
Author: jinossy <ji...@gmail.com>
Authored: Fri Dec 20 14:35:53 2013 +0900
Committer: jinossy <ji...@gmail.com>
Committed: Fri Dec 20 14:35:53 2013 +0900

----------------------------------------------------------------------
 CHANGES.txt                                     |   2 +
 .../java/org/apache/tajo/conf/TajoConf.java     |   1 +
 .../engine/planner/PhysicalPlannerImpl.java     |   4 +-
 .../tajo/engine/planner/global/DataChannel.java |  10 +-
 .../engine/planner/global/GlobalPlanner.java    |  14 +-
 .../planner/physical/IndexedStoreExec.java      |   6 +-
 .../planner/physical/PartitionedStoreExec.java  |   4 +-
 .../planner/physical/TestPhysicalPlanner.java   |  14 +-
 .../tajo/worker/TestRangeRetrieverHandler.java  |  15 +-
 .../java/org/apache/tajo/storage/RawFile.java   |  76 +-
 .../apache/tajo/storage/index/TestBSTIndex.java | 845 ++++++-------------
 11 files changed, 348 insertions(+), 643 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f08724f9/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index cceb1cb..d7872b4 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -97,6 +97,8 @@ Release 0.8.0 - unreleased
 
   IMPROVEMENTS
 
+    TAJO-435: Improve intermediate file. (jinho)
+
     TAJO-424: Make serializer/deserializer configurable in CSVFile. (jinho)
 
     TAJO-419: Add missing visitor methods of AlgebraVisitor and

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f08724f9/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
index fb1c29b..5420692 100644
--- a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
+++ b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
@@ -136,6 +136,7 @@ public class TajoConf extends YarnConfiguration {
     //////////////////////////////////
     PULLSERVER_PORT("tajo.pullserver.port", 0),
     SHUFFLE_SSL_ENABLED_KEY("tajo.pullserver.ssl.enabled", false),
+    SHUFFLE_FILE_FORMAT("tajo.shuffle.file-format", "RAW"),
 
     //////////////////////////////////
     // Storage Configuration

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f08724f9/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
index 73395a6..5120106 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
@@ -90,7 +90,7 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
                                            PhysicalExec execPlan) throws IOException {
     DataChannel channel = context.getDataChannel();
     StoreTableNode storeTableNode = new StoreTableNode(UNGENERATED_PID, channel.getTargetId().toString());
-    storeTableNode.setStorageType(CatalogProtos.StoreType.CSV);
+    if(context.isInterQuery()) storeTableNode.setStorageType(context.getDataChannel().getStoreType());
     storeTableNode.setInSchema(plan.getOutSchema());
     storeTableNode.setOutSchema(plan.getOutSchema());
     if (channel.getPartitionType() != PartitionType.NONE_PARTITION) {
@@ -773,7 +773,7 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
 
     FragmentProto [] fragmentProtos = ctx.getTables(annotation.getTableName());
     List<FileFragment> fragments =
-        FragmentConvertor.convert(ctx.getConf(), CatalogProtos.StoreType.CSV, fragmentProtos);
+        FragmentConvertor.convert(ctx.getConf(), ctx.getDataChannel().getStoreType(), fragmentProtos);
 
     String indexName = IndexUtil.getIndexNameOfFrag(fragments.get(0), annotation.getSortKeys());
     Path indexPath = new Path(sm.getTablePath(annotation.getTableName()), "index");

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f08724f9/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/DataChannel.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/DataChannel.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/DataChannel.java
index 0401718..556c7ff 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/DataChannel.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/DataChannel.java
@@ -36,7 +36,7 @@ public class DataChannel {
 
   private Schema schema;
 
-  private StoreType storeType = StoreType.CSV;
+  private StoreType storeType = StoreType.RAW;
 
   public DataChannel(ExecutionBlockId srcId, ExecutionBlockId targetId) {
     this.srcId = srcId;
@@ -77,6 +77,10 @@ public class DataChannel {
     if (proto.hasPartitionNum()) {
       this.partitionNum = proto.getPartitionNum();
     }
+
+    if (proto.hasStoreType()) {
+      this.storeType = proto.getStoreType();
+    }
   }
 
   public ExecutionBlockId getSrcId() {
@@ -163,6 +167,10 @@ public class DataChannel {
     if (partitionNum != null) {
       builder.setPartitionNum(partitionNum);
     }
+
+    if(storeType != null){
+      builder.setStoreType(storeType);
+    }
     return builder.build();
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f08724f9/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
index abf5620..ea3c366 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
@@ -27,6 +27,7 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.Path;
 import org.apache.tajo.algebra.JoinType;
 import org.apache.tajo.catalog.*;
+import org.apache.tajo.catalog.proto.CatalogProtos;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.engine.eval.AggregationFunctionCallEval;
 import org.apache.tajo.engine.eval.EvalTreeUtil;
@@ -43,12 +44,13 @@ public class GlobalPlanner {
   private static Log LOG = LogFactory.getLog(GlobalPlanner.class);
 
   private TajoConf conf;
-  private AbstractStorageManager sm;
+  private CatalogProtos.StoreType storeType;
 
   public GlobalPlanner(final TajoConf conf, final AbstractStorageManager sm)
       throws IOException {
     this.conf = conf;
-    this.sm = sm;
+    this.storeType = CatalogProtos.StoreType.valueOf(conf.getVar(TajoConf.ConfVars.SHUFFLE_FILE_FORMAT).toUpperCase());
+    Preconditions.checkArgument(storeType != null);
   }
 
   public class GlobalPlanContext {
@@ -76,6 +78,7 @@ public class GlobalPlanner {
     if (childExecBlock.getPlan() != null) {
       ExecutionBlock terminalBlock = masterPlan.createTerminalBlock();
       DataChannel dataChannel = new DataChannel(childExecBlock, terminalBlock, NONE_PARTITION, 1);
+      dataChannel.setStoreType(CatalogProtos.StoreType.CSV);
       dataChannel.setSchema(lastNode.getOutSchema());
       masterPlan.addConnect(dataChannel);
       masterPlan.setTerminal(terminalBlock);
@@ -99,6 +102,7 @@ public class GlobalPlanner {
     ExecutionBlock childBlock = leftTable ? leftBlock : rightBlock;
 
     DataChannel channel = new DataChannel(childBlock, parent, HASH_PARTITION, 32);
+    channel.setStoreType(storeType);
     if (join.getJoinType() != JoinType.CROSS) {
       Column [][] joinColumns = PlannerUtil.joinJoinKeyForEachTable(join.getJoinQual(),
           leftBlock.getPlan().getOutSchema(), rightBlock.getPlan().getOutSchema());
@@ -213,6 +217,7 @@ public class GlobalPlanner {
     channel = new DataChannel(childBlock, currentBlock, HASH_PARTITION, 32);
     channel.setPartitionKey(groupbyNode.getGroupingColumns());
     channel.setSchema(topMostOfFirstPhase.getOutSchema());
+    channel.setStoreType(storeType);
 
     // setup current block with channel
     ScanNode scanNode = buildInputExecutor(context.plan.getLogicalPlan(), channel);
@@ -271,6 +276,7 @@ public class GlobalPlanner {
           channel.setPartitionKey(firstPhaseGroupBy.getGroupingColumns());
         }
         channel.setSchema(firstPhaseGroupBy.getOutSchema());
+        channel.setStoreType(storeType);
 
         ScanNode scanNode = buildInputExecutor(masterPlan.getLogicalPlan(), channel);
         groupbyNode.setChild(scanNode);
@@ -299,6 +305,7 @@ public class GlobalPlanner {
     DataChannel channel = new DataChannel(childBlock, currentBlock, RANGE_PARTITION, 32);
     channel.setPartitionKey(PlannerUtil.sortSpecsToSchema(currentNode.getSortKeys()).toArray());
     channel.setSchema(firstSortNode.getOutSchema());
+    channel.setStoreType(storeType);
 
     ScanNode secondScan = buildInputExecutor(masterPlan.getLogicalPlan(), channel);
     currentNode.setChild(secondScan);
@@ -359,6 +366,8 @@ public class GlobalPlanner {
         DataChannel newChannel = new DataChannel(block, newExecBlock, HASH_PARTITION, 1);
         newChannel.setPartitionKey(new Column[]{});
         newChannel.setSchema(node.getOutSchema());
+        newChannel.setStoreType(storeType);
+
         ScanNode scanNode = buildInputExecutor(plan, newChannel);
         LimitNode parentLimit = PlannerUtil.clone(context.plan.getLogicalPlan(), node);
         parentLimit.setChild(scanNode);
@@ -464,6 +473,7 @@ public class GlobalPlanner {
 
       for (ExecutionBlock childBlocks : queryBlockBlocks) {
         DataChannel channel = new DataChannel(childBlocks, execBlock, NONE_PARTITION, 1);
+        channel.setStoreType(storeType);
         context.plan.addConnect(channel);
       }
 

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f08724f9/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/IndexedStoreExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/IndexedStoreExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/IndexedStoreExec.java
index 596c470..afb4d3c 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/IndexedStoreExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/IndexedStoreExec.java
@@ -23,13 +23,13 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RawLocalFileSystem;
-import org.apache.tajo.worker.TaskAttemptContext;
 import org.apache.tajo.catalog.*;
 import org.apache.tajo.catalog.proto.CatalogProtos;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.engine.planner.PlannerUtil;
 import org.apache.tajo.storage.*;
 import org.apache.tajo.storage.index.bst.BSTIndex;
+import org.apache.tajo.worker.TaskAttemptContext;
 
 import java.io.IOException;
 
@@ -67,8 +67,8 @@ public class IndexedStoreExec extends UnaryPhysicalExec {
     this.comp = new TupleComparator(keySchema, sortSpecs);
     Path storeTablePath = new Path(context.getWorkDir(), "output");
     LOG.info("Output data directory: " + storeTablePath);
-    this.meta = CatalogUtil
-        .newTableMeta(CatalogProtos.StoreType.CSV);
+    this.meta = CatalogUtil.newTableMeta(context.getDataChannel() != null ?
+        context.getDataChannel().getStoreType() : CatalogProtos.StoreType.RAW);
     FileSystem fs = new RawLocalFileSystem();
     fs.mkdirs(storeTablePath);
     this.appender = (FileAppender) StorageManagerFactory.getStorageManager(context.getConf()).getAppender(meta,

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f08724f9/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/PartitionedStoreExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/PartitionedStoreExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/PartitionedStoreExec.java
index 2e53229..bcea189 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/PartitionedStoreExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/PartitionedStoreExec.java
@@ -28,7 +28,6 @@ import org.apache.hadoop.fs.RawLocalFileSystem;
 import org.apache.tajo.catalog.CatalogUtil;
 import org.apache.tajo.catalog.Column;
 import org.apache.tajo.catalog.TableMeta;
-import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
 import org.apache.tajo.catalog.statistics.StatisticsUtil;
 import org.apache.tajo.catalog.statistics.TableStats;
 import org.apache.tajo.engine.planner.logical.StoreTableNode;
@@ -66,8 +65,7 @@ public final class PartitionedStoreExec extends UnaryPhysicalExec {
     super(context, plan.getInSchema(), plan.getOutSchema(), child);
     Preconditions.checkArgument(plan.hasPartitionKey());
     this.plan = plan;
-    this.meta = CatalogUtil.newTableMeta(StoreType.CSV);
-    
+    this.meta = CatalogUtil.newTableMeta(context.getDataChannel().getStoreType());
     // about the partitions
     this.numPartitions = this.plan.getNumPartitions();
     int i = 0;

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f08724f9/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java
index b4c66a1..1a46af6 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java
@@ -455,7 +455,7 @@ public class TestPhysicalPlanner {
     ctx.setDataChannel(dataChannel);
     LogicalNode rootNode = optimizer.optimize(plan);
 
-    TableMeta outputMeta = CatalogUtil.newTableMeta(StoreType.CSV);
+    TableMeta outputMeta = CatalogUtil.newTableMeta(dataChannel.getStoreType());
 
     FileSystem fs = sm.getFileSystem();
 
@@ -513,7 +513,7 @@ public class TestPhysicalPlanner {
     ctx.setDataChannel(dataChannel);
     optimizer.optimize(plan);
 
-    TableMeta outputMeta = CatalogUtil.newTableMeta(StoreType.CSV);
+    TableMeta outputMeta = CatalogUtil.newTableMeta(dataChannel.getStoreType());
 
     PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf,sm);
     PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode);
@@ -597,7 +597,7 @@ public class TestPhysicalPlanner {
     System.out.println(rootNode.toString());
 
     // Set all aggregation functions to the first phase mode
-    GroupbyNode groupbyNode = (GroupbyNode) PlannerUtil.findTopNode(rootNode, NodeType.GROUP_BY);
+    GroupbyNode groupbyNode = PlannerUtil.findTopNode(rootNode, NodeType.GROUP_BY);
     for (Target target : groupbyNode.getTargets()) {
       for (EvalNode eval : EvalTreeUtil.findDistinctAggFunction(target.getEvalTree())) {
         if (eval instanceof AggregationFunctionCallEval) {
@@ -795,12 +795,13 @@ public class TestPhysicalPlanner {
         keySchema, comp);
     reader.open();
     Path outputPath = StorageUtil.concatPath(workDir, "output", "output");
-    TableMeta meta = CatalogUtil.newTableMeta(StoreType.CSV, new Options());
+    TableMeta meta = CatalogUtil.newTableMeta(channel.getStoreType(), new Options());
     SeekableScanner scanner =
-        StorageManagerFactory.getSeekableScanner(conf, meta, employee.getSchema(), outputPath);
+        StorageManagerFactory.getSeekableScanner(conf, meta, exec.getSchema(), outputPath);
     scanner.init();
 
     int cnt = 0;
+
     while(scanner.next() != null) {
       cnt++;
     }
@@ -814,7 +815,8 @@ public class TestPhysicalPlanner {
       long offsets = reader.find(keytuple);
       scanner.seek(offsets);
       tuple = scanner.next();
-      assertTrue("[seek check " + (i) + " ]" , ("name_" + i).equals(tuple.get(0).asChars()));
+
+      assertTrue("[seek check " + (i) + " ]", ("name_" + i).equals(tuple.get(0).asChars()));
       assertTrue("[seek check " + (i) + " ]" , i == tuple.get(1).asInt4());
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f08724f9/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/worker/TestRangeRetrieverHandler.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/worker/TestRangeRetrieverHandler.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/worker/TestRangeRetrieverHandler.java
index 43ea5f8..2d97e7a 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/worker/TestRangeRetrieverHandler.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/worker/TestRangeRetrieverHandler.java
@@ -35,7 +35,10 @@ import org.apache.tajo.datum.DatumFactory;
 import org.apache.tajo.engine.parser.SQLAnalyzer;
 import org.apache.tajo.engine.planner.*;
 import org.apache.tajo.engine.planner.logical.LogicalNode;
-import org.apache.tajo.engine.planner.physical.*;
+import org.apache.tajo.engine.planner.physical.IndexedStoreExec;
+import org.apache.tajo.engine.planner.physical.MemSortExec;
+import org.apache.tajo.engine.planner.physical.PhysicalExec;
+import org.apache.tajo.engine.planner.physical.ProjectionExec;
 import org.apache.tajo.storage.*;
 import org.apache.tajo.storage.fragment.FileFragment;
 import org.apache.tajo.storage.index.bst.BSTIndex;
@@ -130,9 +133,9 @@ public class TestRangeRetrieverHandler {
 
     FileFragment[] frags = StorageManager.splitNG(conf, "employee", employeeMeta, tableDir, Integer.MAX_VALUE);
 
-    TaskAttemptContext
-        ctx = new TaskAttemptContext(conf, LocalTajoTestingUtility.newQueryUnitAttemptId(),
+    TaskAttemptContext ctx = new TaskAttemptContext(conf, LocalTajoTestingUtility.newQueryUnitAttemptId(),
         new FileFragment[] {frags[0]}, testDir);
+
     Expr expr = analyzer.parse(SORT_QUERY[0]);
     LogicalPlan plan = planner.createPlan(expr);
     LogicalNode rootNode = optimizer.optimize(plan);
@@ -159,7 +162,8 @@ public class TestRangeRetrieverHandler {
         new Path(testDir, "output/index"), keySchema, comp);
     reader.open();
 
-    SeekableScanner scanner = StorageManagerFactory.getSeekableScanner(conf, employeeMeta, schema,
+    TableMeta meta = CatalogUtil.newTableMeta(StoreType.RAW, new Options());
+    SeekableScanner scanner = StorageManagerFactory.getSeekableScanner(conf, meta, schema,
         StorageUtil.concatPath(testDir, "output", "output"));
 
     scanner.init();
@@ -270,7 +274,8 @@ public class TestRangeRetrieverHandler {
     BSTIndex.BSTIndexReader reader = bst.getIndexReader(
         new Path(testDir, "output/index"), keySchema, comp);
     reader.open();
-    SeekableScanner scanner = StorageManagerFactory.getSeekableScanner(conf, meta, schema,
+    TableMeta outputMeta = CatalogUtil.newTableMeta(StoreType.RAW, new Options());
+    SeekableScanner scanner = StorageManagerFactory.getSeekableScanner(conf, outputMeta, schema,
         StorageUtil.concatPath(testDir, "output", "output"));
     scanner.init();
     int cnt = 0;

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f08724f9/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/RawFile.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/RawFile.java b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/RawFile.java
index e3ddd09..7cce7e5 100644
--- a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/RawFile.java
+++ b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/RawFile.java
@@ -18,7 +18,6 @@
 
 package org.apache.tajo.storage;
 
-import com.google.protobuf.CodedInputStream;
 import com.google.protobuf.Message;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -35,6 +34,7 @@ import org.apache.tajo.storage.fragment.FileFragment;
 import org.apache.tajo.util.BitArray;
 
 import java.io.File;
+import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.RandomAccessFile;
 import java.net.URI;
@@ -58,6 +58,7 @@ public class RawFile {
     private static final int RECORD_SIZE = 4;
     private boolean eof = false;
     private long fileSize;
+    private FileInputStream fis;
 
     public RawFileScanner(Configuration conf, Schema schema, TableMeta meta, Path path) throws IOException {
       super(conf, schema, meta, null);
@@ -74,15 +75,15 @@ public class RawFile {
       //Preconditions.checkArgument(FileUtil.isLocalPath(path));
       // TODO - to make it unified one.
       URI uri = path.toUri();
-      RandomAccessFile raf = new RandomAccessFile(new File(uri), "r");
-      channel = raf.getChannel();
+      fis = new FileInputStream(new File(uri));
+      channel = fis.getChannel();
       fileSize = channel.size();
 
       if (LOG.isDebugEnabled()) {
         LOG.debug("RawFileScanner open:" + path + "," + channel.position() + ", size :" + channel.size());
       }
 
-      buffer = ByteBuffer.allocateDirect(65535 * 4);
+      buffer = ByteBuffer.allocateDirect(128 * 1024);
 
       columnTypes = new DataType[schema.getColumnNum()];
       for (int i = 0; i < schema.getColumnNum(); i++) {
@@ -103,17 +104,27 @@ public class RawFile {
 
     @Override
     public long getNextOffset() throws IOException {
-      return channel.position();
+      return channel.position() - buffer.remaining();
     }
 
     @Override
     public void seek(long offset) throws IOException {
-      channel.position(offset);
+      long currentPos = channel.position();
+      if(currentPos < offset &&  offset < currentPos + buffer.limit()){
+        buffer.position((int)(offset - currentPos));
+      } else {
+        buffer.clear();
+        channel.position(offset);
+        channel.read(buffer);
+        buffer.flip();
+        eof = false;
+      }
     }
 
     private boolean fillBuffer() throws IOException {
       buffer.compact();
       if (channel.read(buffer) == -1) {
+        eof = true;
         return false;
       } else {
         buffer.flip();
@@ -132,18 +143,15 @@ public class RawFile {
       }
 
       // backup the buffer state
-      int recordOffset = buffer.position();
       int bufferLimit = buffer.limit();
-
       int recordSize = buffer.getInt();
       int nullFlagSize = buffer.getShort();
+
       buffer.limit(buffer.position() + nullFlagSize);
       nullFlags.fromByteBuffer(buffer);
-
       // restore the start of record contents
       buffer.limit(bufferLimit);
-      buffer.position(recordOffset + headerSize);
-
+      //buffer.position(recordOffset + headerSize);
       if (buffer.remaining() < (recordSize - headerSize)) {
         if (!fillBuffer()) {
           return null;
@@ -249,26 +257,6 @@ public class RawFile {
       return tuple;
     }
 
-    /**
-     * It reads a variable byte array whose length is represented as a variable unsigned integer.
-     *
-     * @return A byte array read
-     */
-    private byte [] getColumnBytes() throws IOException {
-      byte [] lenBytesLen = new byte[4];
-      buffer.mark();
-      buffer.get(lenBytesLen);
-      CodedInputStream ins = CodedInputStream.newInstance(lenBytesLen);
-      int bytesLen = ins.readUInt32(); // get a variable unsigned integer length to be read
-      int read = ins.getTotalBytesRead();
-      buffer.reset();
-      buffer.position(buffer.position() + read);
-
-      byte [] rawBytes = new byte[bytesLen];
-      buffer.get(rawBytes);
-      return rawBytes;
-    }
-
     @Override
     public void reset() throws IOException {
       // clear the buffer
@@ -284,6 +272,7 @@ public class RawFile {
     public void close() throws IOException {
       buffer.clear();
       channel.close();
+      fis.close();
     }
 
     @Override
@@ -311,6 +300,7 @@ public class RawFile {
     private BitArray nullFlags;
     private int headerSize = 0;
     private static final int RECORD_SIZE = 4;
+    private long pos;
 
     private TableStatistics stats;
 
@@ -324,13 +314,14 @@ public class RawFile {
       File file = new File(path.toUri());
       randomAccessFile = new RandomAccessFile(file, "rw");
       channel = randomAccessFile.getChannel();
+      pos = 0;
 
       columnTypes = new DataType[schema.getColumnNum()];
       for (int i = 0; i < schema.getColumnNum(); i++) {
         columnTypes[i] = schema.getColumn(i).getDataType();
       }
 
-      buffer = ByteBuffer.allocateDirect(65535);
+      buffer = ByteBuffer.allocateDirect(64 * 1024);
 
       // comput the number of bytes, representing the null flags
 
@@ -346,7 +337,7 @@ public class RawFile {
 
     @Override
     public long getOffset() throws IOException {
-      return channel.position();
+      return pos;
     }
 
     private void flushBuffer() throws IOException {
@@ -386,8 +377,7 @@ public class RawFile {
 
       // skip the row header
       int recordOffset = buffer.position();
-      buffer.position(buffer.position() + headerSize);
-
+      buffer.position(recordOffset + headerSize);
       // reset the null flags
       nullFlags.clear();
       for (int i = 0; i < schema.getColumnNum(); i++) {
@@ -496,13 +486,15 @@ public class RawFile {
       }
 
       // write a record header
-      int pos = buffer.position();
+      int bufferPos = buffer.position();
       buffer.position(recordOffset);
-      buffer.putInt(pos - recordOffset);
+      buffer.putInt(bufferPos - recordOffset);
       byte [] flags = nullFlags.toArray();
       buffer.putShort((short) flags.length);
       buffer.put(flags);
-      buffer.position(pos);
+
+      pos += bufferPos - recordOffset;
+      buffer.position(bufferPos);
 
       if (enabledStats) {
         stats.incrementRow();
@@ -512,12 +504,18 @@ public class RawFile {
     @Override
     public void flush() throws IOException {
       flushBuffer();
-      channel.force(true);
     }
 
     @Override
     public void close() throws IOException {
       flush();
+      if (enabledStats) {
+        stats.setNumBytes(getOffset());
+      }
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("RawFileAppender written: " + getOffset() + " bytes, path: " + path);
+      }
+      channel.close();
       randomAccessFile.close();
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f08724f9/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/index/TestBSTIndex.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/index/TestBSTIndex.java b/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/index/TestBSTIndex.java
index bd152f3..34c362c 100644
--- a/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/index/TestBSTIndex.java
+++ b/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/index/TestBSTIndex.java
@@ -34,12 +34,19 @@ import org.apache.tajo.storage.index.bst.BSTIndex.BSTIndexWriter;
 import org.apache.tajo.util.CommonTestingUtil;
 import org.junit.Before;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
 import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Random;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
+@RunWith(Parameterized.class)
 public class TestBSTIndex {
   private TajoConf conf;
   private Schema schema;
@@ -50,8 +57,10 @@ public class TestBSTIndex {
   private static final String TEST_PATH = "target/test-data/TestIndex";
   private Path testDir;
   private FileSystem fs;
-  
-  public TestBSTIndex() {
+  private StoreType storeType;
+
+  public TestBSTIndex(StoreType type) {
+    this.storeType = type;
     conf = new TajoConf();
     conf.setVar(TajoConf.ConfVars.ROOT_DIR, TEST_PATH);
     schema = new Schema();
@@ -62,37 +71,45 @@ public class TestBSTIndex {
     schema.addColumn(new Column("string", Type.TEXT));
   }
 
-   
+
+  @Parameterized.Parameters
+  public static Collection<Object[]> generateParameters() {
+    return Arrays.asList(new Object[][]{
+        {StoreType.CSV},
+        {StoreType.RAW}
+    });
+  }
+
   @Before
   public void setUp() throws Exception {
     testDir = CommonTestingUtil.getTestDir(TEST_PATH);
     fs = testDir.getFileSystem(conf);
   }
-  
+
   @Test
-  public void testFindValueInCSV() throws IOException {
-    meta = CatalogUtil.newTableMeta(StoreType.CSV);
-    
-    Path tablePath = new Path(testDir, "FindValueInCSV.csv");
-    Appender appender  = StorageManagerFactory.getStorageManager(conf).getAppender(meta, schema, tablePath);
+  public void testFindValue() throws IOException {
+    meta = CatalogUtil.newTableMeta(storeType);
+
+    Path tablePath = new Path(testDir, "testFindValue_" + storeType);
+    Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(meta, schema, tablePath);
     appender.init();
     Tuple tuple;
-    for(int i = 0 ; i < TUPLE_NUM; i ++ ) {
-        tuple = new VTuple(5);
-        tuple.put(0, DatumFactory.createInt4(i));
-        tuple.put(1, DatumFactory.createInt8(i));
-        tuple.put(2, DatumFactory.createFloat8(i));
-        tuple.put(3, DatumFactory.createFloat4(i));
-        tuple.put(4, DatumFactory.createText("field_" + i));
-        appender.addTuple(tuple);
-      }
+    for (int i = 0; i < TUPLE_NUM; i++) {
+      tuple = new VTuple(5);
+      tuple.put(0, DatumFactory.createInt4(i));
+      tuple.put(1, DatumFactory.createInt8(i));
+      tuple.put(2, DatumFactory.createFloat8(i));
+      tuple.put(3, DatumFactory.createFloat4(i));
+      tuple.put(4, DatumFactory.createText("field_" + i));
+      appender.addTuple(tuple);
+    }
     appender.close();
 
     FileStatus status = fs.getFileStatus(tablePath);
     long fileLen = status.getLen();
     FileFragment tablet = new FileFragment("table1_1", status.getPath(), 0, fileLen);
-    
-    SortSpec [] sortKeys = new SortSpec[2];
+
+    SortSpec[] sortKeys = new SortSpec[2];
     sortKeys[0] = new SortSpec(schema.getColumnByFQN("long"), true, false);
     sortKeys[1] = new SortSpec(schema.getColumnByFQN("double"), true, false);
 
@@ -101,11 +118,11 @@ public class TestBSTIndex {
     keySchema.addColumn(new Column("double", Type.FLOAT8));
 
     TupleComparator comp = new TupleComparator(keySchema, sortKeys);
-    
+
     BSTIndex bst = new BSTIndex(conf);
-    BSTIndexWriter creater = bst.getIndexWriter(new Path(testDir, "FindValueInCSV.idx"),
+    BSTIndexWriter creater = bst.getIndexWriter(new Path(testDir, "testFindValue_" + storeType + ".idx"),
         BSTIndex.TWO_LEVEL_INDEX,
-        keySchema, comp);    
+        keySchema, comp);
     creater.setLoadNum(LOAD_NUM);
     creater.open();
 
@@ -119,52 +136,54 @@ public class TestBSTIndex {
       offset = scanner.getNextOffset();
       tuple = scanner.next();
       if (tuple == null) break;
-      
+
       keyTuple.put(0, tuple.get(1));
       keyTuple.put(1, tuple.get(2));
       creater.write(keyTuple, offset);
     }
-    
+
     creater.flush();
     creater.close();
     scanner.close();
-    
+
     tuple = new VTuple(keySchema.getColumnNum());
-    BSTIndexReader reader = bst.getIndexReader(new Path(testDir, "FindValueInCSV.idx"), keySchema, comp);
+    BSTIndexReader reader = bst.getIndexReader(new Path(testDir, "testFindValue_" + storeType + ".idx"), keySchema, comp);
     reader.open();
     scanner = StorageManagerFactory.getSeekableScanner(conf, meta, schema, tablet, schema);
     scanner.init();
 
-    for(int i = 0 ; i < TUPLE_NUM -1 ; i ++) {
+    for (int i = 0; i < TUPLE_NUM - 1; i++) {
       tuple.put(0, DatumFactory.createInt8(i));
       tuple.put(1, DatumFactory.createFloat8(i));
       long offsets = reader.find(tuple);
       scanner.seek(offsets);
       tuple = scanner.next();
-      assertTrue("seek check [" + (i) + " ," +(tuple.get(1).asInt8())+ "]" , (i) == (tuple.get(1).asInt8()));
-      assertTrue("seek check [" + (i) + " ,"  +(tuple.get(2).asFloat8())+"]" , (i) == (tuple.get(2).asFloat8()));
-      
+      assertTrue("seek check [" + (i) + " ," + (tuple.get(1).asInt8()) + "]", (i) == (tuple.get(1).asInt8()));
+      assertTrue("seek check [" + (i) + " ," + (tuple.get(2).asFloat8()) + "]", (i) == (tuple.get(2).asFloat8()));
+
       offsets = reader.next();
       if (offsets == -1) {
         continue;
       }
       scanner.seek(offsets);
       tuple = scanner.next();
-      assertTrue("[seek check " + (i + 1) + " ]" , (i + 1) == (tuple.get(0).asInt4()));
-      assertTrue("[seek check " + (i + 1) + " ]" , (i + 1) == (tuple.get(1).asInt8()));
+      assertTrue("[seek check " + (i + 1) + " ]", (i + 1) == (tuple.get(0).asInt4()));
+      assertTrue("[seek check " + (i + 1) + " ]", (i + 1) == (tuple.get(1).asInt8()));
     }
+    reader.close();
+    scanner.close();
   }
 
   @Test
   public void testBuildIndexWithAppender() throws IOException {
-    meta = CatalogUtil.newTableMeta(StoreType.CSV);
+    meta = CatalogUtil.newTableMeta(storeType);
 
-    Path tablePath = new Path(testDir, "BuildIndexWithAppender.csv");
-    FileAppender appender  = (FileAppender) StorageManagerFactory.getStorageManager(conf).getAppender(meta, schema,
+    Path tablePath = new Path(testDir, "testBuildIndexWithAppender_" + storeType);
+    FileAppender appender = (FileAppender) StorageManagerFactory.getStorageManager(conf).getAppender(meta, schema,
         tablePath);
     appender.init();
 
-    SortSpec [] sortKeys = new SortSpec[2];
+    SortSpec[] sortKeys = new SortSpec[2];
     sortKeys[0] = new SortSpec(schema.getColumnByFQN("long"), true, false);
     sortKeys[1] = new SortSpec(schema.getColumnByFQN("double"), true, false);
 
@@ -175,14 +194,14 @@ public class TestBSTIndex {
     TupleComparator comp = new TupleComparator(keySchema, sortKeys);
 
     BSTIndex bst = new BSTIndex(conf);
-    BSTIndexWriter creater = bst.getIndexWriter(new Path(testDir, "BuildIndexWithAppender.idx"),
+    BSTIndexWriter creater = bst.getIndexWriter(new Path(testDir, "testBuildIndexWithAppender_" + storeType + ".idx"),
         BSTIndex.TWO_LEVEL_INDEX, keySchema, comp);
     creater.setLoadNum(LOAD_NUM);
     creater.open();
 
     Tuple tuple;
     long offset;
-    for(int i = 0 ; i < TUPLE_NUM; i ++ ) {
+    for (int i = 0; i < TUPLE_NUM; i++) {
       tuple = new VTuple(5);
       tuple.put(0, DatumFactory.createInt4(i));
       tuple.put(1, DatumFactory.createInt8(i));
@@ -206,20 +225,20 @@ public class TestBSTIndex {
     FileFragment tablet = new FileFragment("table1_1", status.getPath(), 0, fileLen);
 
     tuple = new VTuple(keySchema.getColumnNum());
-    BSTIndexReader reader = bst.getIndexReader(new Path(testDir, "BuildIndexWithAppender.idx"),
+    BSTIndexReader reader = bst.getIndexReader(new Path(testDir, "testBuildIndexWithAppender_" + storeType + ".idx"),
         keySchema, comp);
     reader.open();
     SeekableScanner scanner = StorageManagerFactory.getSeekableScanner(conf, meta, schema, tablet, schema);
     scanner.init();
 
-    for(int i = 0 ; i < TUPLE_NUM -1 ; i ++) {
+    for (int i = 0; i < TUPLE_NUM - 1; i++) {
       tuple.put(0, DatumFactory.createInt8(i));
       tuple.put(1, DatumFactory.createFloat8(i));
       long offsets = reader.find(tuple);
       scanner.seek(offsets);
       tuple = scanner.next();
       assertTrue("[seek check " + (i) + " ]", (i) == (tuple.get(1).asInt8()));
-      assertTrue("[seek check " + (i) + " ]" , (i) == (tuple.get(2).asFloat8()));
+      assertTrue("[seek check " + (i) + " ]", (i) == (tuple.get(2).asFloat8()));
 
       offsets = reader.next();
       if (offsets == -1) {
@@ -227,34 +246,36 @@ public class TestBSTIndex {
       }
       scanner.seek(offsets);
       tuple = scanner.next();
-      assertTrue("[seek check " + (i + 1) + " ]" , (i + 1) == (tuple.get(0).asInt4()));
-      assertTrue("[seek check " + (i + 1) + " ]" , (i + 1) == (tuple.get(1).asInt8()));
+      assertTrue("[seek check " + (i + 1) + " ]", (i + 1) == (tuple.get(0).asInt4()));
+      assertTrue("[seek check " + (i + 1) + " ]", (i + 1) == (tuple.get(1).asInt8()));
     }
+    reader.close();
+    scanner.close();
   }
-  
+
   @Test
-  public void testFindOmittedValueInCSV() throws IOException {
-    meta = CatalogUtil.newTableMeta(StoreType.CSV);
-    
-    Path tablePath = StorageUtil.concatPath(testDir, "FindOmittedValueInCSV.csv");
+  public void testFindOmittedValue() throws IOException {
+    meta = CatalogUtil.newTableMeta(storeType);
+
+    Path tablePath = StorageUtil.concatPath(testDir, "testFindOmittedValue_" + storeType);
     Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(meta, schema, tablePath);
     appender.init();
     Tuple tuple;
-    for(int i = 0 ; i < TUPLE_NUM; i += 2 ) {
-        tuple = new VTuple(5);
-        tuple.put(0, DatumFactory.createInt4(i));
-        tuple.put(1, DatumFactory.createInt8(i));
-        tuple.put(2, DatumFactory.createFloat8(i));
-        tuple.put(3, DatumFactory.createFloat4(i));
-        tuple.put(4, DatumFactory.createText("field_" + i));
-        appender.addTuple(tuple);
-      }
+    for (int i = 0; i < TUPLE_NUM; i += 2) {
+      tuple = new VTuple(5);
+      tuple.put(0, DatumFactory.createInt4(i));
+      tuple.put(1, DatumFactory.createInt8(i));
+      tuple.put(2, DatumFactory.createFloat8(i));
+      tuple.put(3, DatumFactory.createFloat4(i));
+      tuple.put(4, DatumFactory.createText("field_" + i));
+      appender.addTuple(tuple);
+    }
     appender.close();
 
     FileStatus status = fs.getFileStatus(tablePath);
     FileFragment tablet = new FileFragment("table1_1", status.getPath(), 0, status.getLen());
-    
-    SortSpec [] sortKeys = new SortSpec[2];
+
+    SortSpec[] sortKeys = new SortSpec[2];
     sortKeys[0] = new SortSpec(schema.getColumnByFQN("long"), true, false);
     sortKeys[1] = new SortSpec(schema.getColumnByFQN("double"), true, false);
 
@@ -263,14 +284,14 @@ public class TestBSTIndex {
     keySchema.addColumn(new Column("double", Type.FLOAT8));
 
     TupleComparator comp = new TupleComparator(keySchema, sortKeys);
-    
+
     BSTIndex bst = new BSTIndex(conf);
-    BSTIndexWriter creater = bst.getIndexWriter(new Path(testDir, "FindOmittedValueInCSV.idx"),
+    BSTIndexWriter creater = bst.getIndexWriter(new Path(testDir, "testFindOmittedValue_" + storeType + ".idx"),
         BSTIndex.TWO_LEVEL_INDEX, keySchema, comp);
     creater.setLoadNum(LOAD_NUM);
     creater.open();
 
-    SeekableScanner scanner  = StorageManagerFactory.getSeekableScanner(conf, meta, schema, tablet, schema);
+    SeekableScanner scanner = StorageManagerFactory.getSeekableScanner(conf, meta, schema, tablet, schema);
     scanner.init();
 
     Tuple keyTuple;
@@ -280,35 +301,37 @@ public class TestBSTIndex {
       offset = scanner.getNextOffset();
       tuple = scanner.next();
       if (tuple == null) break;
-      
+
       keyTuple.put(0, tuple.get(1));
       keyTuple.put(1, tuple.get(2));
       creater.write(keyTuple, offset);
     }
-    
+
     creater.flush();
     creater.close();
     scanner.close();
 
-    BSTIndexReader reader = bst.getIndexReader(new Path(testDir, "FindOmittedValueInCSV.idx"), keySchema, comp);
+    BSTIndexReader reader = bst.getIndexReader(new Path(testDir, "testFindOmittedValue_" + storeType + ".idx"),
+        keySchema, comp);
     reader.open();
-    for(int i = 1 ; i < TUPLE_NUM -1 ; i+=2) {
+    for (int i = 1; i < TUPLE_NUM - 1; i += 2) {
       keyTuple.put(0, DatumFactory.createInt8(i));
       keyTuple.put(1, DatumFactory.createFloat8(i));
       long offsets = reader.find(keyTuple);
       assertEquals(-1, offsets);
     }
+    reader.close();
   }
-  
+
   @Test
-  public void testFindNextKeyValueInCSV() throws IOException {
-    meta = CatalogUtil.newTableMeta(StoreType.CSV);
+  public void testFindNextKeyValue() throws IOException {
+    meta = CatalogUtil.newTableMeta(storeType);
 
-    Path tablePath = new Path(testDir, "FindNextKeyValueInCSV.csv");
+    Path tablePath = new Path(testDir, "testFindNextKeyValue_" + storeType);
     Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(meta, schema, tablePath);
     appender.init();
     Tuple tuple;
-    for(int i = 0 ; i < TUPLE_NUM; i ++ ) {
+    for (int i = 0; i < TUPLE_NUM; i++) {
       tuple = new VTuple(5);
       tuple.put(0, DatumFactory.createInt4(i));
       tuple.put(1, DatumFactory.createInt8(i));
@@ -322,8 +345,8 @@ public class TestBSTIndex {
     FileStatus status = fs.getFileStatus(tablePath);
     long fileLen = status.getLen();
     FileFragment tablet = new FileFragment("table1_1", status.getPath(), 0, fileLen);
-    
-    SortSpec [] sortKeys = new SortSpec[2];
+
+    SortSpec[] sortKeys = new SortSpec[2];
     sortKeys[0] = new SortSpec(schema.getColumnByFQN("int"), true, false);
     sortKeys[1] = new SortSpec(schema.getColumnByFQN("long"), true, false);
 
@@ -332,14 +355,14 @@ public class TestBSTIndex {
     keySchema.addColumn(new Column("long", Type.INT8));
 
     TupleComparator comp = new TupleComparator(keySchema, sortKeys);
-    
+
     BSTIndex bst = new BSTIndex(conf);
-    BSTIndexWriter creater = bst.getIndexWriter(new Path(testDir, "FindNextKeyValueInCSV.idx"),
+    BSTIndexWriter creater = bst.getIndexWriter(new Path(testDir, "testFindNextKeyValue_" + storeType + ".idx"),
         BSTIndex.TWO_LEVEL_INDEX, keySchema, comp);
     creater.setLoadNum(LOAD_NUM);
     creater.open();
 
-    SeekableScanner scanner  = StorageManagerFactory.getSeekableScanner(conf, meta, schema, tablet, schema);
+    SeekableScanner scanner = StorageManagerFactory.getSeekableScanner(conf, meta, schema, tablet, schema);
     scanner.init();
 
     Tuple keyTuple;
@@ -349,24 +372,24 @@ public class TestBSTIndex {
       offset = scanner.getNextOffset();
       tuple = scanner.next();
       if (tuple == null) break;
-      
+
       keyTuple.put(0, tuple.get(0));
       keyTuple.put(1, tuple.get(1));
       creater.write(keyTuple, offset);
     }
-    
+
     creater.flush();
     creater.close();
     scanner.close();
-    
-    BSTIndexReader reader = bst.getIndexReader(new Path(testDir, "FindNextKeyValueInCSV.idx"),
+
+    BSTIndexReader reader = bst.getIndexReader(new Path(testDir, "testFindNextKeyValue_" + storeType + ".idx"),
         keySchema, comp);
     reader.open();
-    scanner  = StorageManagerFactory.getSeekableScanner(conf, meta, schema, tablet, schema);
+    scanner = StorageManagerFactory.getSeekableScanner(conf, meta, schema, tablet, schema);
     scanner.init();
 
     Tuple result;
-    for(int i = 0 ; i < TUPLE_NUM -1 ; i ++) {
+    for (int i = 0; i < TUPLE_NUM - 1; i++) {
       keyTuple = new VTuple(2);
       keyTuple.put(0, DatumFactory.createInt4(i));
       keyTuple.put(1, DatumFactory.createInt8(i));
@@ -375,28 +398,30 @@ public class TestBSTIndex {
       result = scanner.next();
       assertTrue("[seek check " + (i + 1) + " ]",
           (i + 1) == (result.get(0).asInt4()));
-      assertTrue("[seek check " + (i + 1) + " ]" , (i + 1) == (result.get(1).asInt8()));
-      
+      assertTrue("[seek check " + (i + 1) + " ]", (i + 1) == (result.get(1).asInt8()));
+
       offsets = reader.next();
       if (offsets == -1) {
         continue;
       }
       scanner.seek(offsets);
       result = scanner.next();
-      assertTrue("[seek check " + (i + 2) + " ]" , (i + 2) == (result.get(0).asInt8()));
-      assertTrue("[seek check " + (i + 2) + " ]" , (i + 2) == (result.get(1).asFloat8()));
+      assertTrue("[seek check " + (i + 2) + " ]", (i + 2) == (result.get(0).asInt8()));
+      assertTrue("[seek check " + (i + 2) + " ]", (i + 2) == (result.get(1).asFloat8()));
     }
+    reader.close();
+    scanner.close();
   }
-  
+
   @Test
-  public void testFindNextKeyOmittedValueInCSV() throws IOException {
-    meta = CatalogUtil.newTableMeta(StoreType.CSV);
+  public void testFindNextKeyOmittedValue() throws IOException {
+    meta = CatalogUtil.newTableMeta(storeType);
 
-    Path tablePath = new Path(testDir, "FindNextKeyOmittedValueInCSV.csv");
+    Path tablePath = new Path(testDir, "testFindNextKeyOmittedValue_" + storeType);
     Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(meta, schema, tablePath);
     appender.init();
     Tuple tuple;
-    for(int i = 0 ; i < TUPLE_NUM; i+=2) {
+    for (int i = 0; i < TUPLE_NUM; i += 2) {
       tuple = new VTuple(5);
       tuple.put(0, DatumFactory.createInt4(i));
       tuple.put(1, DatumFactory.createInt8(i));
@@ -410,8 +435,8 @@ public class TestBSTIndex {
     FileStatus status = fs.getFileStatus(tablePath);
     long fileLen = status.getLen();
     FileFragment tablet = new FileFragment("table1_1", status.getPath(), 0, fileLen);
-    
-    SortSpec [] sortKeys = new SortSpec[2];
+
+    SortSpec[] sortKeys = new SortSpec[2];
     sortKeys[0] = new SortSpec(schema.getColumnByFQN("int"), true, false);
     sortKeys[1] = new SortSpec(schema.getColumnByFQN("long"), true, false);
 
@@ -420,167 +445,16 @@ public class TestBSTIndex {
     keySchema.addColumn(new Column("long", Type.INT8));
 
     TupleComparator comp = new TupleComparator(keySchema, sortKeys);
-    
-    BSTIndex bst = new BSTIndex(conf);
-    BSTIndexWriter creater = bst.getIndexWriter(new Path(testDir,
-        "FindNextKeyOmittedValueInCSV.idx"), BSTIndex.TWO_LEVEL_INDEX, keySchema, comp);
-    creater.setLoadNum(LOAD_NUM);
-    creater.open();
-
-    SeekableScanner scanner  = StorageManagerFactory.getSeekableScanner(conf, meta, schema, tablet, schema);
-    scanner.init();
-
-    Tuple keyTuple;
-    long offset;
-    while (true) {
-      keyTuple = new VTuple(2);
-      offset = scanner.getNextOffset();
-      tuple = scanner.next();
-      if (tuple == null) break;
-      
-      keyTuple.put(0, tuple.get(0));
-      keyTuple.put(1, tuple.get(1));
-      creater.write(keyTuple, offset);
-    }
-    
-    creater.flush();
-    creater.close();
-    scanner.close();
-    
-    BSTIndexReader reader = bst.getIndexReader(new Path(testDir, "FindNextKeyOmittedValueInCSV.idx"),
-        keySchema, comp);
-    reader.open();
-    scanner  = StorageManagerFactory.getSeekableScanner(conf, meta, schema, tablet, schema);
-    scanner.init();
-
-    Tuple result;
-    for(int i = 1 ; i < TUPLE_NUM -1 ; i+=2) {
-      keyTuple = new VTuple(2);
-      keyTuple.put(0, DatumFactory.createInt4(i));
-      keyTuple.put(1, DatumFactory.createInt8(i));
-      long offsets = reader.find(keyTuple, true);
-      scanner.seek(offsets);
-      result = scanner.next();
-      assertTrue("[seek check " + (i + 1) + " ]" , (i + 1) == (result.get(0).asInt4()));
-      assertTrue("[seek check " + (i + 1) + " ]" , (i + 1) == (result.get(1).asInt8()));
-    }
-  }
-
-  /*
-  @Test
-  public void testFindValueInRaw() throws IOException {
-    meta = TCatUtil.newTableMeta(schema, StoreType.RAW);
-    
-    sm.initTableBase(meta, "table1");
-    Appender appender  = sm.getAppender(meta, "table1", "table1.csv");
-    Tuple tuple;
-    for(int i = 0 ; i < TUPLE_NUM; i ++ ) {
-        tuple = new VTuple(5);
-        tuple.put(0, DatumFactory.createInt4(i));
-        tuple.put(1, DatumFactory.createInt8(i));
-        tuple.put(2, DatumFactory.createFloat8(i));
-        tuple.put(3, DatumFactory.createFloat4(i));
-        tuple.put(4, DatumFactory.createText("field_"+i));
-        appender.addTuple(tuple);
-      }
-    appender.close();
-    
-    appender.close();
 
-    FileStatus status = sm.listTableFiles("table1")[0];
-    long fileLen = status.getLen();
-    FileFragment tablet = new FileFragment("table1_1", status.getPath(), meta, 0, fileLen, null);
-    
-    SortSpec [] sortKeys = new SortSpec[2];
-    sortKeys[0] = new SortSpec(schema.getColumnByFQN("long"), false, false);
-    sortKeys[1] = new SortSpec(schema.getColumnByFQN("double"), true, false);
-
-    Schema keySchema = new Schema();
-    keySchema.addColumn(new Column("long", DataType.LONG));
-    keySchema.addColumn(new Column("double", DataType.DOUBLE));
-
-    TupleComparator comp = new TupleComparator(keySchema, sortKeys);
-    
     BSTIndex bst = new BSTIndex(conf);
-    BSTIndexWriter creater = bst.getIndexWriter(new Path(testDir, "FindValueInRawBSTIndex.idx"),
+    BSTIndexWriter creater = bst.getIndexWriter(new Path(testDir, "testFindNextKeyOmittedValue_" + storeType + ".idx"),
         BSTIndex.TWO_LEVEL_INDEX, keySchema, comp);
     creater.setLoadNum(LOAD_NUM);
     creater.open();
 
-    SeekableScanner scanner  = (SeekableScanner)(sm.getFileScanner(meta, new FileFragment[]{tablet}));
-    Tuple keyTuple;
-    long offset;
-    while (true) {
-      keyTuple = new VTuple(2);
-      offset = scanner.getNextOffset();
-      tuple = scanner.next();
-      if (tuple == null) break;
-      
-      keyTuple.put(0, tuple.get(1));
-      keyTuple.put(1, tuple.get(2));
-      creater.write(keyTuple, offset);
-    }
-    
-    creater.flush();
-    creater.close();
-    scanner.close();
-    
-    tuple = new VTuple(keySchema.getColumnNum());
-    BSTIndexReader reader = bst.getIndexReader(new Path(testDir, "FindValueInRawBSTIndex.idx"), keySchema, comp);
-    reader.open();
-    scanner  = (SeekableScanner)(sm.getFileScanner(meta, new FileFragment[]{tablet}));
-    for(int i = 0 ; i < TUPLE_NUM -1 ; i ++) {
-      tuple.put(0, DatumFactory.createInt8(i));
-      tuple.put(1, DatumFactory.createFloat8(i));
-      long offsets = reader.find(tuple, false);
-      scanner.seek(offsets);
-      tuple = scanner.next();
-      assertTrue("[seek check " + (i) + " ]" , (i) == (tuple.get(1).asLong()));
-      assertTrue("[seek check " + (i) + " ]" , (i) == (tuple.get(2).asDouble()));
-    }
-  }
-  
-  @Test
-  public void testFindOmittedValueInRaw() throws IOException {
-    meta = TCatUtil.newTableMeta(schema, StoreType.RAW);
-    
-    sm.initTableBase(meta, "table1");
-    Appender appender  = sm.getAppender(meta, "table1", "table1.csv");
-    Tuple tuple;
-    for(int i = 0 ; i < TUPLE_NUM; i += 2 ) {
-        tuple = new VTuple(5);
-        tuple.put(0, DatumFactory.createInt4(i));
-        tuple.put(1, DatumFactory.createInt8(i));
-        tuple.put(2, DatumFactory.createFloat8(i));
-        tuple.put(3, DatumFactory.createFloat4(i));
-        tuple.put(4, DatumFactory.createText("field_"+i));
-        appender.addTuple(tuple);
-      }
-    appender.close();
-    
-    appender.close();
-
-    FileStatus status = sm.listTableFiles("table1")[0];
-    long fileLen = status.getLen();
-    FileFragment tablet = new FileFragment("table1_1", status.getPath(), meta, 0, fileLen, null);
-    
-    SortSpec [] sortKeys = new SortSpec[2];
-    sortKeys[0] = new SortSpec(schema.getColumnByFQN("long"), false, false);
-    sortKeys[1] = new SortSpec(schema.getColumnByFQN("double"), true, false);
-
-    Schema keySchema = new Schema();
-    keySchema.addColumn(new Column("long", DataType.LONG));
-    keySchema.addColumn(new Column("double", DataType.DOUBLE));
-
-    TupleComparator comp = new TupleComparator(keySchema, sortKeys);
-    
-    BSTIndex bst = new BSTIndex(conf);
-    BSTIndexWriter creater = bst.getIndexWriter(new Path(testDir, "FindOmittedValueInRaw.idx"),
-        BSTIndex.TWO_LEVEL_INDEX, keySchema, comp);
-    creater.setLoadNum(LOAD_NUM);
-    creater.open();
+    SeekableScanner scanner = StorageManagerFactory.getSeekableScanner(conf, meta, schema, tablet, schema);
+    scanner.init();
 
-    SeekableScanner scanner  = (SeekableScanner)(sm.getFileScanner(meta, new FileFragment[]{tablet}));
     Tuple keyTuple;
     long offset;
     while (true) {
@@ -588,305 +462,78 @@ public class TestBSTIndex {
       offset = scanner.getNextOffset();
       tuple = scanner.next();
       if (tuple == null) break;
-      
-      keyTuple.put(0, tuple.get(1));
-      keyTuple.put(1, tuple.get(2));
-      creater.write(keyTuple, offset);
-    }
-    
-    creater.flush();
-    creater.close();
-    scanner.close();
-    
-    tuple = new VTuple(keySchema.getColumnNum());
-    BSTIndexReader reader = bst.getIndexReader(new Path(testDir, "FindOmittedValueInRaw.idx"),
-        keySchema, comp);
-    reader.open();
-    for(int i = 1 ; i < TUPLE_NUM -1 ; i+=2) {
-      tuple.put(0, DatumFactory.createInt8(i));
-      tuple.put(1, DatumFactory.createFloat8(i));
-      long offsets = reader.find(tuple, false);
-      assertEquals(-1, offsets);
-    }
-  }
-  
-  @Test
-  public void testFindNextKeyValueInRaw() throws IOException {
-    meta = TCatUtil.newTableMeta(schema, StoreType.RAW);
 
-    sm.initTableBase(meta, "table1");
-    Appender appender = sm.getAppender(meta, "table1", "table1.csv");
-    Tuple tuple;
-    for(int i = 0 ; i < TUPLE_NUM; i ++ ) {
-      tuple = new VTuple(5);
-      tuple.put(0, DatumFactory.createInt4(i));
-      tuple.put(1, DatumFactory.createInt8(i));
-      tuple.put(2, DatumFactory.createFloat8(i));
-      tuple.put(3, DatumFactory.createFloat4(i));
-      tuple.put(4, DatumFactory.createText("field_"+i));
-      appender.addTuple(tuple);
-    }
-    appender.close();
-
-    FileStatus status = sm.listTableFiles("table1")[0];
-    long fileLen = status.getLen();
-    FileFragment tablet = new FileFragment("table1_1", status.getPath(), meta, 0, fileLen, null);
-    
-    SortSpec [] sortKeys = new SortSpec[2];
-    sortKeys[0] = new SortSpec(schema.getColumnByFQN("int"), true, false);
-    sortKeys[1] = new SortSpec(schema.getColumnByFQN("long"), true, false);
-
-    Schema keySchema = new Schema();
-    keySchema.addColumn(new Column("int", DataType.INT));
-    keySchema.addColumn(new Column("long", DataType.LONG));
-
-    TupleComparator comp = new TupleComparator(keySchema, sortKeys);
-    
-    BSTIndex bst = new BSTIndex(conf);
-    BSTIndexWriter creater = bst.getIndexWriter(new Path(testDir, "FindOmittedValueInRaw.idx"),
-        BSTIndex.TWO_LEVEL_INDEX, keySchema, comp);
-    creater.setLoadNum(LOAD_NUM);
-    creater.open();
-
-    SeekableScanner scanner  = (SeekableScanner)(sm.getFileScanner(meta, new FileFragment[]{tablet}));
-    Tuple keyTuple;
-    long offset;
-    while (true) {
-      keyTuple = new VTuple(2);
-      offset = scanner.getNextOffset();
-      tuple = scanner.next();
-      if (tuple == null) break;
-      
       keyTuple.put(0, tuple.get(0));
       keyTuple.put(1, tuple.get(1));
       creater.write(keyTuple, offset);
     }
-    
-    creater.flush();
-    creater.close();
-    scanner.close();
-    
-    BSTIndexReader reader = bst.getIndexReader(new Path(testDir, "FindOmittedValueInRaw.idx"), keySchema, comp);
-    reader.open();
-    scanner  = (SeekableScanner)(sm.getFileScanner(meta, new FileFragment[]{tablet}));
-    Tuple result;
-    for(int i = 0 ; i < TUPLE_NUM -1 ; i ++) {
-      keyTuple = new VTuple(2);
-      keyTuple.put(0, DatumFactory.createInt4(i));
-      keyTuple.put(1, DatumFactory.createInt8(i));
-      long offsets = reader.find(keyTuple, true);
-      scanner.seek(offsets);
-      result = scanner.next();
-      assertTrue("[seek check " + (i + 1) + " ]",
-          (i + 1) == (result.get(0).asInt()));
-      assertTrue("[seek check " + (i + 1) + " ]" , (i + 1) == (result.get(1).asLong()));
-      
-      offsets = reader.next();
-      if (offsets == -1) {
-        continue;
-      }
-      scanner.seek(offsets);
-      result = scanner.next();
-      assertTrue("[seek check " + (i + 2) + " ]" , (i + 2) == (result.get(0).asLong()));
-      assertTrue("[seek check " + (i + 2) + " ]" , (i + 2) == (result.get(1).asDouble()));
-    }
-  }
-  
-  @Test
-  public void testFindNextKeyOmittedValueInRaw() throws IOException {
-    meta = TCatUtil.newTableMeta(schema, StoreType.RAW);
 
-    sm.initTableBase(meta, "table1");
-    Appender appender = sm.getAppender(meta, "table1", "table1.csv");
-    Tuple tuple;
-    for(int i = 0 ; i < TUPLE_NUM; i+=2) {
-      tuple = new VTuple(5);
-      tuple.put(0, DatumFactory.createInt4(i));
-      tuple.put(1, DatumFactory.createInt8(i));
-      tuple.put(2, DatumFactory.createFloat8(i));
-      tuple.put(3, DatumFactory.createFloat4(i));
-      tuple.put(4, DatumFactory.createText("field_"+i));
-      appender.addTuple(tuple);
-    }
-    appender.close();
-
-    FileStatus status = sm.listTableFiles("table1")[0];
-    long fileLen = status.getLen();
-    FileFragment tablet = new FileFragment("table1_1", status.getPath(), meta, 0, fileLen, null);
-    
-    SortSpec [] sortKeys = new SortSpec[2];
-    sortKeys[0] = new SortSpec(schema.getColumnByFQN("int"), true, false);
-    sortKeys[1] = new SortSpec(schema.getColumnByFQN("long"), true, false);
-
-    Schema keySchema = new Schema();
-    keySchema.addColumn(new Column("int", DataType.INT));
-    keySchema.addColumn(new Column("long", DataType.LONG));
-
-    TupleComparator comp = new TupleComparator(keySchema, sortKeys);
-    
-    BSTIndex bst = new BSTIndex(conf);
-    BSTIndexWriter creater = bst.getIndexWriter(new Path(testDir, "FindNextKeyOmittedValueInRaw.idx"),
-        BSTIndex.TWO_LEVEL_INDEX, keySchema, comp);
-    creater.setLoadNum(LOAD_NUM);
-    creater.open();
-
-    SeekableScanner scanner  = (SeekableScanner)(sm.getFileScanner(meta, new FileFragment[]{tablet}));
-    Tuple keyTuple;
-    long offset;
-    while (true) {
-      keyTuple = new VTuple(2);
-      offset = scanner.getNextOffset();
-      tuple = scanner.next();
-      if (tuple == null) break;
-      
-      keyTuple.put(0, tuple.get(0));
-      keyTuple.put(1, tuple.get(1));
-      creater.write(keyTuple, offset);
-    }
-    
     creater.flush();
     creater.close();
     scanner.close();
-    
-    BSTIndexReader reader = bst.getIndexReader(new Path(testDir, "FindNextKeyOmittedValueInRaw.idx"),
+
+    BSTIndexReader reader = bst.getIndexReader(new Path(testDir, "testFindNextKeyOmittedValue_" + storeType + ".idx"),
         keySchema, comp);
     reader.open();
-    scanner  = (SeekableScanner)(sm.getFileScanner(meta, new FileFragment[]{tablet}));
+    scanner = StorageManagerFactory.getSeekableScanner(conf, meta, schema, tablet, schema);
+    scanner.init();
+
     Tuple result;
-    for(int i = 1 ; i < TUPLE_NUM -1 ; i+=2) {
+    for (int i = 1; i < TUPLE_NUM - 1; i += 2) {
       keyTuple = new VTuple(2);
       keyTuple.put(0, DatumFactory.createInt4(i));
       keyTuple.put(1, DatumFactory.createInt8(i));
       long offsets = reader.find(keyTuple, true);
       scanner.seek(offsets);
       result = scanner.next();
-      assertTrue("[seek check " + (i + 1) + " ]" , (i + 1) == (result.get(0).asInt()));
-      assertTrue("[seek check " + (i + 1) + " ]" , (i + 1) == (result.get(1).asLong()));      
-    }
-  }
-  
-  @Test
-  public void testNextInRaw() throws IOException {
-    meta = TCatUtil.newTableMeta(schema, StoreType.RAW);
-
-    sm.initTableBase(meta, "table1");
-    Appender appender = sm.getAppender(meta, "table1", "table1.csv");
-    Tuple tuple;
-    for(int i = 0 ; i < TUPLE_NUM; i ++ ) {
-      tuple = new VTuple(5);
-      tuple.put(0, DatumFactory.createInt4(i));
-      tuple.put(1, DatumFactory.createInt8(i));
-      tuple.put(2, DatumFactory.createFloat8(i));
-      tuple.put(3, DatumFactory.createFloat4(i));
-      tuple.put(4, DatumFactory.createText("field_"+i));
-      appender.addTuple(tuple);
+      assertTrue("[seek check " + (i + 1) + " ]", (i + 1) == (result.get(0).asInt4()));
+      assertTrue("[seek check " + (i + 1) + " ]", (i + 1) == (result.get(1).asInt8()));
     }
-    appender.close();
-
-    FileStatus status = sm.listTableFiles("table1")[0];
-    long fileLen = status.getLen();
-    FileFragment tablet = new FileFragment("table1_1", status.getPath(), meta, 0, fileLen, null);
-    
-    SortSpec [] sortKeys = new SortSpec[2];
-    sortKeys[0] = new SortSpec(schema.getColumnByFQN("int"), true, false);
-    sortKeys[1] = new SortSpec(schema.getColumnByFQN("long"), true, false);
-
-    Schema keySchema = new Schema();
-    keySchema.addColumn(new Column("int", DataType.INT));
-    keySchema.addColumn(new Column("long", DataType.LONG));
-
-    TupleComparator comp = new TupleComparator(keySchema, sortKeys);
-    
-    BSTIndex bst = new BSTIndex(conf);
-    BSTIndexWriter creater = bst.getIndexWriter(new Path(testDir, "FindNextKeyOmittedValueInRaw.idx"),
-        BSTIndex.TWO_LEVEL_INDEX, keySchema, comp);
-    creater.setLoadNum(LOAD_NUM);
-    creater.open();
-
-    SeekableScanner scanner  = (SeekableScanner)(sm.getFileScanner(meta, new FileFragment[]{tablet}));
-    Tuple keyTuple;
-    long offset;
-    while (true) {
-      keyTuple = new VTuple(2);
-      offset = scanner.getNextOffset();
-      tuple = scanner.next();
-      if (tuple == null) break;
-      
-      keyTuple.put(0, tuple.get(0));
-      keyTuple.put(1, tuple.get(1));
-      creater.write(keyTuple, offset);
-    }
-    
-    creater.flush();
-    creater.close();
     scanner.close();
-    
-    BSTIndexReader reader = bst.getIndexReader(new Path(testDir, "FindNextKeyOmittedValueInRaw.idx"),
-        keySchema, comp);
-    reader.open();
-    scanner  = (SeekableScanner)(sm.getFileScanner(meta, new FileFragment[]{tablet}));
-    Tuple result;
-
-    keyTuple = new VTuple(2);
-    keyTuple.put(0, DatumFactory.createInt4(0));
-    keyTuple.put(1, DatumFactory.createInt8(0));
-    long offsets = reader.find(keyTuple);
-    scanner.seek(offsets);
-    result = scanner.next();
-    assertTrue("[seek check " + 0 + " ]" , (0) == (result.get(0).asInt()));
-    assertTrue("[seek check " + 0 + " ]" , (0) == (result.get(1).asLong()));
-      
-    for (int i = 1; i < TUPLE_NUM; i++) {
-      offsets = reader.next();
-      
-      scanner.seek(offsets);
-      result = scanner.next();
-      assertEquals(i, result.get(0).asInt());
-      assertEquals(i, result.get(1).asLong());
-    }
   }
 
   @Test
   public void testFindMinValue() throws IOException {
-    meta = TCatUtil.newTableMeta(schema, StoreType.CSV);
+    meta = CatalogUtil.newTableMeta(storeType);
+
+    Path tablePath = new Path(testDir, "testFindMinValue" + storeType);
+    Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(meta, schema, tablePath);
+    appender.init();
 
-    sm.initTableBase(meta, "table1");
-    Appender appender  = sm.getAppender(meta, "table1", "table1.csv");
     Tuple tuple;
-    for(int i = 5 ; i < TUPLE_NUM + 5; i ++ ) {
+    for (int i = 5; i < TUPLE_NUM + 5; i++) {
       tuple = new VTuple(5);
       tuple.put(0, DatumFactory.createInt4(i));
       tuple.put(1, DatumFactory.createInt8(i));
       tuple.put(2, DatumFactory.createFloat8(i));
       tuple.put(3, DatumFactory.createFloat4(i));
-      tuple.put(4, DatumFactory.createText("field_"+i));
+      tuple.put(4, DatumFactory.createText("field_" + i));
       appender.addTuple(tuple);
     }
     appender.close();
 
-    appender.close();
-
-    FileStatus status = sm.listTableFiles("table1")[0];
+    FileStatus status = fs.getFileStatus(tablePath);
     long fileLen = status.getLen();
-    FileFragment tablet = new FileFragment("table1_1", status.getPath(), meta, 0, fileLen, null);
+    FileFragment tablet = new FileFragment("table1_1", status.getPath(), 0, fileLen);
 
-    SortSpec [] sortKeys = new SortSpec[2];
+    SortSpec[] sortKeys = new SortSpec[2];
     sortKeys[0] = new SortSpec(schema.getColumnByFQN("long"), true, false);
     sortKeys[1] = new SortSpec(schema.getColumnByFQN("double"), true, false);
 
     Schema keySchema = new Schema();
-    keySchema.addColumn(new Column("long", DataType.LONG));
-    keySchema.addColumn(new Column("double", DataType.DOUBLE));
+    keySchema.addColumn(new Column("long", Type.INT8));
+    keySchema.addColumn(new Column("double", Type.FLOAT8));
 
     TupleComparator comp = new TupleComparator(keySchema, sortKeys);
-
     BSTIndex bst = new BSTIndex(conf);
-    BSTIndexWriter creater = bst.getIndexWriter(new Path(testDir, "Test.idx"), BSTIndex.TWO_LEVEL_INDEX,
-        keySchema, comp);
+    BSTIndexWriter creater = bst.getIndexWriter(new Path(testDir, "testFindMinValue_" + storeType + ".idx"),
+        BSTIndex.TWO_LEVEL_INDEX, keySchema, comp);
     creater.setLoadNum(LOAD_NUM);
     creater.open();
 
-    SeekableScanner scanner  = (SeekableScanner)(sm.getFileScanner(meta, new FileFragment[]{tablet}));
+    SeekableScanner scanner = StorageManagerFactory.getSeekableScanner(conf, meta, schema, tablet, schema);
+    scanner.init();
+
     Tuple keyTuple;
     long offset;
     while (true) {
@@ -905,62 +552,71 @@ public class TestBSTIndex {
     scanner.close();
 
     tuple = new VTuple(keySchema.getColumnNum());
-    BSTIndexReader reader = bst.getIndexReader(new Path(testDir, "Test.idx"), keySchema, comp);
+
+    BSTIndexReader reader = bst.getIndexReader(new Path(testDir, "testFindMinValue_" + storeType + ".idx"),
+        keySchema, comp);
     reader.open();
-    scanner  = (SeekableScanner)(sm.getFileScanner(meta, new FileFragment[]{tablet}));
+    scanner = StorageManagerFactory.getSeekableScanner(conf, meta, schema, tablet, schema);
+    scanner.init();
+
     tuple.put(0, DatumFactory.createInt8(0));
     tuple.put(1, DatumFactory.createFloat8(0));
 
     offset = reader.find(tuple);
     assertEquals(-1, offset);
 
-    offset = reader.find(tuple , true);
+    offset = reader.find(tuple, true);
     assertTrue(offset >= 0);
     scanner.seek(offset);
     tuple = scanner.next();
-    assertEquals(5, tuple.get(1).asInt());
-    assertEquals(5l, tuple.get(2).asLong());
+    assertEquals(5, tuple.get(1).asInt4());
+    assertEquals(5l, tuple.get(2).asInt8());
+    reader.close();
+    scanner.close();
   }
 
   @Test
   public void testMinMax() throws IOException {
-    meta = TCatUtil.newTableMeta(schema, StoreType.RAW);
+    meta = CatalogUtil.newTableMeta(storeType);
 
-    sm.initTableBase(meta, "table1");
-    Appender appender = sm.getAppender(meta, "table1", "table1.csv");
+    Path tablePath = new Path(testDir, "testMinMax_" + storeType);
+    Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(meta, schema, tablePath);
+    appender.init();
     Tuple tuple;
-    for(int i = 5 ; i < TUPLE_NUM; i ++ ) {
+    for (int i = 5; i < TUPLE_NUM; i += 2) {
       tuple = new VTuple(5);
       tuple.put(0, DatumFactory.createInt4(i));
       tuple.put(1, DatumFactory.createInt8(i));
       tuple.put(2, DatumFactory.createFloat8(i));
       tuple.put(3, DatumFactory.createFloat4(i));
-      tuple.put(4, DatumFactory.createText("field_"+i));
+      tuple.put(4, DatumFactory.createText("field_" + i));
       appender.addTuple(tuple);
     }
     appender.close();
 
-    FileStatus status = sm.listTableFiles("table1")[0];
+    FileStatus status = fs.getFileStatus(tablePath);
     long fileLen = status.getLen();
-    FileFragment tablet = new FileFragment("table1_1", status.getPath(), meta, 0, fileLen, null);
+    FileFragment tablet = new FileFragment("table1_1", status.getPath(), 0, fileLen);
 
-    SortSpec [] sortKeys = new SortSpec[2];
+    SortSpec[] sortKeys = new SortSpec[2];
     sortKeys[0] = new SortSpec(schema.getColumnByFQN("int"), true, false);
     sortKeys[1] = new SortSpec(schema.getColumnByFQN("long"), true, false);
 
     Schema keySchema = new Schema();
-    keySchema.addColumn(new Column("int", DataType.INT));
-    keySchema.addColumn(new Column("long", DataType.LONG));
+    keySchema.addColumn(new Column("int", Type.INT4));
+    keySchema.addColumn(new Column("long", Type.INT8));
 
     TupleComparator comp = new TupleComparator(keySchema, sortKeys);
 
     BSTIndex bst = new BSTIndex(conf);
-    BSTIndexWriter creater = bst.getIndexWriter(new Path(testDir, "FindNextKeyOmittedValueInRaw.idx"),
+    BSTIndexWriter creater = bst.getIndexWriter(new Path(testDir, "testMinMax_" + storeType + ".idx"),
         BSTIndex.TWO_LEVEL_INDEX, keySchema, comp);
     creater.setLoadNum(LOAD_NUM);
     creater.open();
 
-    SeekableScanner scanner  = (SeekableScanner)(sm.getFileScanner(meta, new FileFragment[]{tablet}));
+    SeekableScanner scanner = StorageManagerFactory.getSeekableScanner(conf, meta, schema, tablet, schema);
+    scanner.init();
+
     Tuple keyTuple;
     long offset;
     while (true) {
@@ -978,17 +634,18 @@ public class TestBSTIndex {
     creater.close();
     scanner.close();
 
-    BSTIndexReader reader = bst.getIndexReader(new Path(testDir, "FindNextKeyOmittedValueInRaw.idx"),
+    BSTIndexReader reader = bst.getIndexReader(new Path(testDir, "testMinMax_" + storeType + ".idx"),
         keySchema, comp);
     reader.open();
 
     Tuple min = reader.getFirstKey();
-    assertEquals(5, min.get(0).asInt());
-    assertEquals(5l, min.get(0).asLong());
+    assertEquals(5, min.get(0).asInt4());
+    assertEquals(5l, min.get(0).asInt8());
 
     Tuple max = reader.getLastKey();
-    assertEquals(TUPLE_NUM - 1, max.get(0).asInt());
-    assertEquals(TUPLE_NUM - 1, max.get(0).asLong());
+    assertEquals(TUPLE_NUM - 1, max.get(0).asInt4());
+    assertEquals(TUPLE_NUM - 1, max.get(0).asInt8());
+    reader.close();
   }
 
   private class ConcurrentAccessor implements Runnable {
@@ -1024,43 +681,47 @@ public class TestBSTIndex {
 
   @Test
   public void testConcurrentAccess() throws IOException, InterruptedException {
-    meta = TCatUtil.newTableMeta(schema, StoreType.RAW);
+    meta = CatalogUtil.newTableMeta(storeType);
+
+    Path tablePath = new Path(testDir, "testConcurrentAccess_" + storeType);
+    Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(meta, schema, tablePath);
+    appender.init();
 
-    sm.initTableBase(meta, "table1");
-    Appender appender = sm.getAppender(meta, "table1", "table1.csv");
     Tuple tuple;
-    for(int i = 0 ; i < TUPLE_NUM; i ++ ) {
+    for (int i = 0; i < TUPLE_NUM; i++) {
       tuple = new VTuple(5);
       tuple.put(0, DatumFactory.createInt4(i));
       tuple.put(1, DatumFactory.createInt8(i));
       tuple.put(2, DatumFactory.createFloat8(i));
       tuple.put(3, DatumFactory.createFloat4(i));
-      tuple.put(4, DatumFactory.createText("field_"+i));
+      tuple.put(4, DatumFactory.createText("field_" + i));
       appender.addTuple(tuple);
     }
     appender.close();
 
-    FileStatus status = sm.listTableFiles("table1")[0];
+    FileStatus status = fs.getFileStatus(tablePath);
     long fileLen = status.getLen();
-    FileFragment tablet = new FileFragment("table1_1", status.getPath(), meta, 0, fileLen, null);
+    FileFragment tablet = new FileFragment("table1_1", status.getPath(), 0, fileLen);
 
-    SortSpec [] sortKeys = new SortSpec[2];
+    SortSpec[] sortKeys = new SortSpec[2];
     sortKeys[0] = new SortSpec(schema.getColumnByFQN("int"), true, false);
     sortKeys[1] = new SortSpec(schema.getColumnByFQN("long"), true, false);
 
     Schema keySchema = new Schema();
-    keySchema.addColumn(new Column("int", DataType.INT));
-    keySchema.addColumn(new Column("long", DataType.LONG));
+    keySchema.addColumn(new Column("int", Type.INT4));
+    keySchema.addColumn(new Column("long", Type.INT8));
 
     TupleComparator comp = new TupleComparator(keySchema, sortKeys);
 
     BSTIndex bst = new BSTIndex(conf);
-    BSTIndexWriter creater = bst.getIndexWriter(new Path(testDir, "ConcurrentAccess.idx"),
+    BSTIndexWriter creater = bst.getIndexWriter(new Path(testDir, "testConcurrentAccess_" + storeType + ".idx"),
         BSTIndex.TWO_LEVEL_INDEX, keySchema, comp);
     creater.setLoadNum(LOAD_NUM);
     creater.open();
 
-    SeekableScanner scanner  = (SeekableScanner)(sm.getFileScanner(meta, new FileFragment[]{tablet}));
+    SeekableScanner scanner = StorageManagerFactory.getSeekableScanner(conf, meta, schema, tablet, schema);
+    scanner.init();
+
     Tuple keyTuple;
     long offset;
     while (true) {
@@ -1078,12 +739,12 @@ public class TestBSTIndex {
     creater.close();
     scanner.close();
 
-    BSTIndexReader reader = bst.getIndexReader(new Path(testDir, "ConcurrentAccess.idx"),
+    BSTIndexReader reader = bst.getIndexReader(new Path(testDir, "testConcurrentAccess_" + storeType + ".idx"),
         keySchema, comp);
     reader.open();
 
-    Thread [] threads = new Thread[5];
-    ConcurrentAccessor [] accs = new ConcurrentAccessor[5];
+    Thread[] threads = new Thread[5];
+    ConcurrentAccessor[] accs = new ConcurrentAccessor[5];
     for (int i = 0; i < threads.length; i++) {
       accs[i] = new ConcurrentAccessor(reader);
       threads[i] = new Thread(accs[i]);
@@ -1094,49 +755,54 @@ public class TestBSTIndex {
       threads[i].join();
       assertFalse(accs[i].isFailed());
     }
+    reader.close();
   }
 
+
   @Test
-  public void testFindValueInCSVDescOrder() throws IOException {
-    meta = TCatUtil.newTableMeta(schema, StoreType.CSV);
+  public void testFindValueDescOrder() throws IOException {
+    meta = CatalogUtil.newTableMeta(storeType);
+
+    Path tablePath = new Path(testDir, "testFindValueDescOrder_" + storeType);
+    Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(meta, schema, tablePath);
+    appender.init();
 
-    sm.initTableBase(meta, "table1");
-    Appender appender  = sm.getAppender(meta, "table1", "table1.csv");
     Tuple tuple;
-    for(int i = (TUPLE_NUM - 1); i >= 0; i -- ) {
+    for (int i = (TUPLE_NUM - 1); i >= 0; i--) {
       tuple = new VTuple(5);
       tuple.put(0, DatumFactory.createInt4(i));
       tuple.put(1, DatumFactory.createInt8(i));
       tuple.put(2, DatumFactory.createFloat8(i));
       tuple.put(3, DatumFactory.createFloat4(i));
-      tuple.put(4, DatumFactory.createText("field_"+i));
+      tuple.put(4, DatumFactory.createText("field_" + i));
       appender.addTuple(tuple);
     }
     appender.close();
 
-    appender.close();
-
-    FileStatus status = sm.listTableFiles("table1")[0];
+    FileStatus status = fs.getFileStatus(tablePath);
     long fileLen = status.getLen();
-    FileFragment tablet = new FileFragment("table1_1", status.getPath(), meta, 0, fileLen, null);
+    FileFragment tablet = new FileFragment("table1_1", status.getPath(), 0, fileLen);
 
-    SortSpec [] sortKeys = new SortSpec[2];
+    SortSpec[] sortKeys = new SortSpec[2];
     sortKeys[0] = new SortSpec(schema.getColumnByFQN("long"), false, false);
     sortKeys[1] = new SortSpec(schema.getColumnByFQN("double"), false, false);
 
     Schema keySchema = new Schema();
-    keySchema.addColumn(new Column("long", DataType.LONG));
-    keySchema.addColumn(new Column("double", DataType.DOUBLE));
+    keySchema.addColumn(new Column("long", Type.INT8));
+    keySchema.addColumn(new Column("double", Type.FLOAT8));
 
     TupleComparator comp = new TupleComparator(keySchema, sortKeys);
 
+
     BSTIndex bst = new BSTIndex(conf);
-    BSTIndexWriter creater = bst.getIndexWriter(new Path(testDir, "FindValueInCSV.idx"), BSTIndex.TWO_LEVEL_INDEX,
-        keySchema, comp);
+    BSTIndexWriter creater = bst.getIndexWriter(new Path(testDir, "testFindValueDescOrder_" + storeType + ".idx"),
+        BSTIndex.TWO_LEVEL_INDEX, keySchema, comp);
     creater.setLoadNum(LOAD_NUM);
     creater.open();
 
-    SeekableScanner scanner  = (SeekableScanner)(sm.getFileScanner(meta, new FileFragment[]{tablet}));
+    SeekableScanner scanner = StorageManagerFactory.getSeekableScanner(conf, meta, schema, tablet, schema);
+    scanner.init();
+
     Tuple keyTuple;
     long offset;
     while (true) {
@@ -1155,17 +821,21 @@ public class TestBSTIndex {
     scanner.close();
 
     tuple = new VTuple(keySchema.getColumnNum());
-    BSTIndexReader reader = bst.getIndexReader(new Path(testDir, "FindValueInCSV.idx"), keySchema, comp);
+
+    BSTIndexReader reader = bst.getIndexReader(new Path(testDir, "testFindValueDescOrder_" + storeType + ".idx"),
+        keySchema, comp);
     reader.open();
-    scanner  = (SeekableScanner)(sm.getFileScanner(meta, new FileFragment[]{tablet}));
-    for(int i = (TUPLE_NUM - 1) ; i > 0  ; i --) {
+    scanner = StorageManagerFactory.getSeekableScanner(conf, meta, schema, tablet, schema);
+    scanner.init();
+
+    for (int i = (TUPLE_NUM - 1); i > 0; i--) {
       tuple.put(0, DatumFactory.createInt8(i));
       tuple.put(1, DatumFactory.createFloat8(i));
       long offsets = reader.find(tuple);
       scanner.seek(offsets);
       tuple = scanner.next();
-      assertTrue("seek check [" + (i) + " ," +(tuple.get(1).asLong())+ "]" , (i) == (tuple.get(1).asLong()));
-      assertTrue("seek check [" + (i) + " ,"  +(tuple.get(2).asDouble())+"]" , (i) == (tuple.get(2).asDouble()));
+      assertTrue("seek check [" + (i) + " ," + (tuple.get(1).asInt8()) + "]", (i) == (tuple.get(1).asInt8()));
+      assertTrue("seek check [" + (i) + " ," + (tuple.get(2).asFloat8()) + "]", (i) == (tuple.get(2).asFloat8()));
 
       offsets = reader.next();
       if (offsets == -1) {
@@ -1173,50 +843,56 @@ public class TestBSTIndex {
       }
       scanner.seek(offsets);
       tuple = scanner.next();
-      assertTrue("[seek check " + (i - 1) + " ]" , (i - 1) == (tuple.get(0).asInt()));
-      assertTrue("[seek check " + (i - 1) + " ]" , (i - 1) == (tuple.get(1).asLong()));
+      assertTrue("[seek check " + (i - 1) + " ]", (i - 1) == (tuple.get(0).asInt4()));
+      assertTrue("[seek check " + (i - 1) + " ]", (i - 1) == (tuple.get(1).asInt8()));
     }
+    reader.close();
+    scanner.close();
   }
 
   @Test
-  public void testFindNextKeyValueInCSVDescOrder() throws IOException {
-    meta = TCatUtil.newTableMeta(schema, StoreType.CSV);
+  public void testFindNextKeyValueDescOrder() throws IOException {
+    meta = CatalogUtil.newTableMeta(storeType);
+
+    Path tablePath = new Path(testDir, "testFindNextKeyValueDescOrder_" + storeType);
+    Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(meta, schema, tablePath);
+    appender.init();
 
-    sm.initTableBase(meta, "table1");
-    Appender appender = sm.getAppender(meta, "table1", "table1.csv");
     Tuple tuple;
-    for(int i = (TUPLE_NUM - 1); i >= 0; i --) {
+    for (int i = (TUPLE_NUM - 1); i >= 0; i--) {
       tuple = new VTuple(5);
       tuple.put(0, DatumFactory.createInt4(i));
       tuple.put(1, DatumFactory.createInt8(i));
       tuple.put(2, DatumFactory.createFloat8(i));
       tuple.put(3, DatumFactory.createFloat4(i));
-      tuple.put(4, DatumFactory.createText("field_"+i));
+      tuple.put(4, DatumFactory.createText("field_" + i));
       appender.addTuple(tuple);
     }
     appender.close();
 
-    FileStatus status = sm.listTableFiles("table1")[0];
+    FileStatus status = fs.getFileStatus(tablePath);
     long fileLen = status.getLen();
-    FileFragment tablet = new FileFragment("table1_1", status.getPath(), meta, 0, fileLen, null);
+    FileFragment tablet = new FileFragment("table1_1", status.getPath(), 0, fileLen);
 
-    SortSpec [] sortKeys = new SortSpec[2];
+    SortSpec[] sortKeys = new SortSpec[2];
     sortKeys[0] = new SortSpec(schema.getColumnByFQN("int"), false, false);
     sortKeys[1] = new SortSpec(schema.getColumnByFQN("long"), false, false);
 
     Schema keySchema = new Schema();
-    keySchema.addColumn(new Column("int", DataType.INT));
-    keySchema.addColumn(new Column("long", DataType.LONG));
+    keySchema.addColumn(new Column("int", Type.INT4));
+    keySchema.addColumn(new Column("long", Type.INT8));
 
     TupleComparator comp = new TupleComparator(keySchema, sortKeys);
 
     BSTIndex bst = new BSTIndex(conf);
-    BSTIndexWriter creater = bst.getIndexWriter(new Path(testDir, "FindNextKeyValueInCSV.idx"),
-        BSTIndex.TWO_LEVEL_INDEX, keySchema, comp);
+    BSTIndexWriter creater = bst.getIndexWriter(new Path(testDir,
+        "testFindNextKeyValueDescOrder_" + storeType + ".idx"), BSTIndex.TWO_LEVEL_INDEX, keySchema, comp);
     creater.setLoadNum(LOAD_NUM);
     creater.open();
 
-    SeekableScanner scanner  = (SeekableScanner)(sm.getFileScanner(meta, new FileFragment[]{tablet}));
+    SeekableScanner scanner = StorageManagerFactory.getSeekableScanner(conf, meta, schema, tablet, schema);
+    scanner.init();
+
     Tuple keyTuple;
     long offset;
     while (true) {
@@ -1234,15 +910,19 @@ public class TestBSTIndex {
     creater.close();
     scanner.close();
 
-    BSTIndexReader reader = bst.getIndexReader(new Path(testDir, "FindNextKeyValueInCSV.idx"), keySchema, comp);
+
+    BSTIndexReader reader = bst.getIndexReader(new Path(testDir, "testFindNextKeyValueDescOrder_" + storeType + ".idx"),
+        keySchema, comp);
     reader.open();
 
     assertEquals(keySchema, reader.getKeySchema());
     assertEquals(comp, reader.getComparator());
 
-    scanner  = (SeekableScanner)(sm.getFileScanner(meta, new FileFragment[]{tablet}));
+    scanner = StorageManagerFactory.getSeekableScanner(conf, meta, schema, tablet, schema);
+    scanner.init();
+
     Tuple result;
-    for(int i = (TUPLE_NUM - 1) ; i > 0 ; i --) {
+    for (int i = (TUPLE_NUM - 1); i > 0; i--) {
       keyTuple = new VTuple(2);
       keyTuple.put(0, DatumFactory.createInt4(i));
       keyTuple.put(1, DatumFactory.createInt8(i));
@@ -1250,8 +930,8 @@ public class TestBSTIndex {
       scanner.seek(offsets);
       result = scanner.next();
       assertTrue("[seek check " + (i - 1) + " ]",
-          (i - 1) == (result.get(0).asInt()));
-      assertTrue("[seek check " + (i - 1) + " ]" , (i - 1) == (result.get(1).asLong()));
+          (i - 1) == (result.get(0).asInt4()));
+      assertTrue("[seek check " + (i - 1) + " ]", (i - 1) == (result.get(1).asInt8()));
 
       offsets = reader.next();
       if (offsets == -1) {
@@ -1259,9 +939,10 @@ public class TestBSTIndex {
       }
       scanner.seek(offsets);
       result = scanner.next();
-      assertTrue("[seek check " + (i - 2) + " ]" , (i - 2) == (result.get(0).asLong()));
-      assertTrue("[seek check " + (i - 2) + " ]" , (i - 2) == (result.get(1).asDouble()));
+      assertTrue("[seek check " + (i - 2) + " ]", (i - 2) == (result.get(0).asInt8()));
+      assertTrue("[seek check " + (i - 2) + " ]", (i - 2) == (result.get(1).asFloat8()));
     }
+    reader.close();
+    scanner.close();
   }
-  */
 }