Posted to commits@tajo.apache.org by ji...@apache.org on 2014/12/12 09:22:22 UTC

[08/45] tajo git commit: TAJO-1233: Merge hbase_storage branch to the master branch. (Hyoungjun Kim via hyunsik)

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/index/TestBSTIndex.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/index/TestBSTIndex.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/index/TestBSTIndex.java
new file mode 100644
index 0000000..383740d
--- /dev/null
+++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/index/TestBSTIndex.java
@@ -0,0 +1,947 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.index;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.catalog.*;
+import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
+import org.apache.tajo.common.TajoDataTypes.Type;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.storage.*;
+import org.apache.tajo.storage.fragment.FileFragment;
+import org.apache.tajo.storage.index.bst.BSTIndex;
+import org.apache.tajo.storage.index.bst.BSTIndex.BSTIndexReader;
+import org.apache.tajo.storage.index.bst.BSTIndex.BSTIndexWriter;
+import org.apache.tajo.util.CommonTestingUtil;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Random;
+
+import static org.junit.Assert.*;
+
+@RunWith(Parameterized.class)
+public class TestBSTIndex {
+  private TajoConf conf;
+  private Schema schema;
+  private TableMeta meta;
+
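+  // TUPLE_NUM bounds the row count each test writes; LOAD_NUM is passed
+  // to BSTIndexWriter.setLoadNum() before index entries are written.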
+  private static final int TUPLE_NUM = 10000;
+  private static final int LOAD_NUM = 100;
+  private static final String TEST_PATH = "target/test-data/TestIndex";
+  private Path testDir;
+  private FileSystem fs;
+  private StoreType storeType;
+
+  public TestBSTIndex(StoreType type) {
+    this.storeType = type;
+    conf = new TajoConf();
+    conf.setVar(TajoConf.ConfVars.ROOT_DIR, TEST_PATH);
+    schema = new Schema();
+    schema.addColumn(new Column("int", Type.INT4));
+    schema.addColumn(new Column("long", Type.INT8));
+    schema.addColumn(new Column("double", Type.FLOAT8));
+    schema.addColumn(new Column("float", Type.FLOAT4));
+    schema.addColumn(new Column("string", Type.TEXT));
+  }
+
+
+  @Parameterized.Parameters
+  public static Collection<Object[]> generateParameters() {
+    return Arrays.asList(new Object[][]{
+        {StoreType.CSV},
+        {StoreType.RAW}
+    });
+  }
+
+  @Before
+  public void setUp() throws Exception {
+    testDir = CommonTestingUtil.getTestDir(TEST_PATH);
+    fs = testDir.getFileSystem(conf);
+  }
+
+  @Test
+  public void testFindValue() throws IOException {
+    meta = CatalogUtil.newTableMeta(storeType);
+
+    Path tablePath = new Path(testDir, "testFindValue_" + storeType);
+    Appender appender = ((FileStorageManager)StorageManager.getFileStorageManager(conf)).getAppender(meta, schema, tablePath);
+    appender.init();
+    Tuple tuple;
+    for (int i = 0; i < TUPLE_NUM; i++) {
+      tuple = new VTuple(5);
+      tuple.put(0, DatumFactory.createInt4(i));
+      tuple.put(1, DatumFactory.createInt8(i));
+      tuple.put(2, DatumFactory.createFloat8(i));
+      tuple.put(3, DatumFactory.createFloat4(i));
+      tuple.put(4, DatumFactory.createText("field_" + i));
+      appender.addTuple(tuple);
+    }
+    appender.close();
+
+    FileStatus status = fs.getFileStatus(tablePath);
+    long fileLen = status.getLen();
+    FileFragment tablet = new FileFragment("table1_1", status.getPath(), 0, fileLen);
+
+    SortSpec[] sortKeys = new SortSpec[2];
+    sortKeys[0] = new SortSpec(schema.getColumn("long"), true, false);
+    sortKeys[1] = new SortSpec(schema.getColumn("double"), true, false);
+
+    Schema keySchema = new Schema();
+    keySchema.addColumn(new Column("long", Type.INT8));
+    keySchema.addColumn(new Column("double", Type.FLOAT8));
+
+    BaseTupleComparator comp = new BaseTupleComparator(keySchema, sortKeys);
+
+    BSTIndex bst = new BSTIndex(conf);
+    BSTIndexWriter creater = bst.getIndexWriter(new Path(testDir, "testFindValue_" + storeType + ".idx"),
+        BSTIndex.TWO_LEVEL_INDEX,
+        keySchema, comp);
+    creater.setLoadNum(LOAD_NUM);
+    creater.open();
+
+    SeekableScanner scanner = FileStorageManager.getSeekableScanner(conf, meta, schema, tablet, schema);
+    scanner.init();
+
+    Tuple keyTuple;
+    long offset;
+    while (true) {
+      keyTuple = new VTuple(2);
+      offset = scanner.getNextOffset();
+      tuple = scanner.next();
+      if (tuple == null) break;
+
+      keyTuple.put(0, tuple.get(1));
+      keyTuple.put(1, tuple.get(2));
+      creater.write(keyTuple, offset);
+    }
+
+    creater.flush();
+    creater.close();
+    scanner.close();
+
+    tuple = new VTuple(keySchema.size());
+    BSTIndexReader reader = bst.getIndexReader(new Path(testDir, "testFindValue_" + storeType + ".idx"), keySchema, comp);
+    reader.open();
+    scanner = FileStorageManager.getSeekableScanner(conf, meta, schema, tablet, schema);
+    scanner.init();
+
+    for (int i = 0; i < TUPLE_NUM - 1; i++) {
+      tuple.put(0, DatumFactory.createInt8(i));
+      tuple.put(1, DatumFactory.createFloat8(i));
+      long offsets = reader.find(tuple);
+      scanner.seek(offsets);
+      tuple = scanner.next();
+      assertTrue("seek check [" + (i) + " ," + (tuple.get(1).asInt8()) + "]", (i) == (tuple.get(1).asInt8()));
+      assertTrue("seek check [" + (i) + " ," + (tuple.get(2).asFloat8()) + "]", (i) == (tuple.get(2).asFloat8()));
+
+      offsets = reader.next();
+      if (offsets == -1) {
+        continue;
+      }
+      scanner.seek(offsets);
+      tuple = scanner.next();
+      assertTrue("[seek check " + (i + 1) + " ]", (i + 1) == (tuple.get(0).asInt4()));
+      assertTrue("[seek check " + (i + 1) + " ]", (i + 1) == (tuple.get(1).asInt8()));
+    }
+    reader.close();
+    scanner.close();
+  }
+
+  @Test
+  public void testBuildIndexWithAppender() throws IOException {
+    meta = CatalogUtil.newTableMeta(storeType);
+
+    Path tablePath = new Path(testDir, "testBuildIndexWithAppender_" + storeType);
+    FileAppender appender = (FileAppender) ((FileStorageManager)StorageManager.getFileStorageManager(conf))
+        .getAppender(meta, schema, tablePath);
+    appender.init();
+
+    SortSpec[] sortKeys = new SortSpec[2];
+    sortKeys[0] = new SortSpec(schema.getColumn("long"), true, false);
+    sortKeys[1] = new SortSpec(schema.getColumn("double"), true, false);
+
+    Schema keySchema = new Schema();
+    keySchema.addColumn(new Column("long", Type.INT8));
+    keySchema.addColumn(new Column("double", Type.FLOAT8));
+
+    BaseTupleComparator comp = new BaseTupleComparator(keySchema, sortKeys);
+
+    BSTIndex bst = new BSTIndex(conf);
+    BSTIndexWriter creater = bst.getIndexWriter(new Path(testDir, "testBuildIndexWithAppender_" + storeType + ".idx"),
+        BSTIndex.TWO_LEVEL_INDEX, keySchema, comp);
+    creater.setLoadNum(LOAD_NUM);
+    creater.open();
+
+    Tuple tuple;
+    long offset;
+    for (int i = 0; i < TUPLE_NUM; i++) {
+      tuple = new VTuple(5);
+      tuple.put(0, DatumFactory.createInt4(i));
+      tuple.put(1, DatumFactory.createInt8(i));
+      tuple.put(2, DatumFactory.createFloat8(i));
+      tuple.put(3, DatumFactory.createFloat4(i));
+      tuple.put(4, DatumFactory.createText("field_" + i));
+
+      offset = appender.getOffset();
+      appender.addTuple(tuple);
+      creater.write(tuple, offset);
+    }
+    appender.flush();
+    appender.close();
+
+    creater.flush();
+    creater.close();
+
+
+    FileStatus status = fs.getFileStatus(tablePath);
+    long fileLen = status.getLen();
+    FileFragment tablet = new FileFragment("table1_1", status.getPath(), 0, fileLen);
+
+    tuple = new VTuple(keySchema.size());
+    BSTIndexReader reader = bst.getIndexReader(new Path(testDir, "testBuildIndexWithAppender_" + storeType + ".idx"),
+        keySchema, comp);
+    reader.open();
+    SeekableScanner scanner = FileStorageManager.getSeekableScanner(conf, meta, schema, tablet, schema);
+    scanner.init();
+
+    for (int i = 0; i < TUPLE_NUM - 1; i++) {
+      tuple.put(0, DatumFactory.createInt8(i));
+      tuple.put(1, DatumFactory.createFloat8(i));
+      long offsets = reader.find(tuple);
+      scanner.seek(offsets);
+      tuple = scanner.next();
+      assertTrue("[seek check " + (i) + " ]", (i) == (tuple.get(1).asInt8()));
+      assertTrue("[seek check " + (i) + " ]", (i) == (tuple.get(2).asFloat8()));
+
+      offsets = reader.next();
+      if (offsets == -1) {
+        continue;
+      }
+      scanner.seek(offsets);
+      tuple = scanner.next();
+      assertTrue("[seek check " + (i + 1) + " ]", (i + 1) == (tuple.get(0).asInt4()));
+      assertTrue("[seek check " + (i + 1) + " ]", (i + 1) == (tuple.get(1).asInt8()));
+    }
+    reader.close();
+    scanner.close();
+  }
+
+  @Test
+  public void testFindOmittedValue() throws IOException {
+    meta = CatalogUtil.newTableMeta(storeType);
+
+    Path tablePath = StorageUtil.concatPath(testDir, "testFindOmittedValue_" + storeType);
+    Appender appender = ((FileStorageManager)StorageManager.getFileStorageManager(conf)).getAppender(meta, schema, tablePath);
+    appender.init();
+    Tuple tuple;
+    for (int i = 0; i < TUPLE_NUM; i += 2) {
+      tuple = new VTuple(5);
+      tuple.put(0, DatumFactory.createInt4(i));
+      tuple.put(1, DatumFactory.createInt8(i));
+      tuple.put(2, DatumFactory.createFloat8(i));
+      tuple.put(3, DatumFactory.createFloat4(i));
+      tuple.put(4, DatumFactory.createText("field_" + i));
+      appender.addTuple(tuple);
+    }
+    appender.close();
+
+    FileStatus status = fs.getFileStatus(tablePath);
+    FileFragment tablet = new FileFragment("table1_1", status.getPath(), 0, status.getLen());
+
+    SortSpec[] sortKeys = new SortSpec[2];
+    sortKeys[0] = new SortSpec(schema.getColumn("long"), true, false);
+    sortKeys[1] = new SortSpec(schema.getColumn("double"), true, false);
+
+    Schema keySchema = new Schema();
+    keySchema.addColumn(new Column("long", Type.INT8));
+    keySchema.addColumn(new Column("double", Type.FLOAT8));
+
+    BaseTupleComparator comp = new BaseTupleComparator(keySchema, sortKeys);
+
+    BSTIndex bst = new BSTIndex(conf);
+    BSTIndexWriter creater = bst.getIndexWriter(new Path(testDir, "testFindOmittedValue_" + storeType + ".idx"),
+        BSTIndex.TWO_LEVEL_INDEX, keySchema, comp);
+    creater.setLoadNum(LOAD_NUM);
+    creater.open();
+
+    SeekableScanner scanner = FileStorageManager.getSeekableScanner(conf, meta, schema, tablet, schema);
+    scanner.init();
+
+    Tuple keyTuple;
+    long offset;
+    while (true) {
+      keyTuple = new VTuple(2);
+      offset = scanner.getNextOffset();
+      tuple = scanner.next();
+      if (tuple == null) break;
+
+      keyTuple.put(0, tuple.get(1));
+      keyTuple.put(1, tuple.get(2));
+      creater.write(keyTuple, offset);
+    }
+
+    creater.flush();
+    creater.close();
+    scanner.close();
+
+    BSTIndexReader reader = bst.getIndexReader(new Path(testDir, "testFindOmittedValue_" + storeType + ".idx"),
+        keySchema, comp);
+    reader.open();
+    for (int i = 1; i < TUPLE_NUM - 1; i += 2) {
+      keyTuple.put(0, DatumFactory.createInt8(i));
+      keyTuple.put(1, DatumFactory.createFloat8(i));
+      long offsets = reader.find(keyTuple);
+      assertEquals(-1, offsets);
+    }
+    reader.close();
+  }
+
+  @Test
+  public void testFindNextKeyValue() throws IOException {
+    meta = CatalogUtil.newTableMeta(storeType);
+
+    Path tablePath = new Path(testDir, "testFindNextKeyValue_" + storeType);
+    Appender appender = ((FileStorageManager)StorageManager.getFileStorageManager(conf)).getAppender(meta, schema, tablePath);
+    appender.init();
+    Tuple tuple;
+    for (int i = 0; i < TUPLE_NUM; i++) {
+      tuple = new VTuple(5);
+      tuple.put(0, DatumFactory.createInt4(i));
+      tuple.put(1, DatumFactory.createInt8(i));
+      tuple.put(2, DatumFactory.createFloat8(i));
+      tuple.put(3, DatumFactory.createFloat4(i));
+      tuple.put(4, DatumFactory.createText("field_" + i));
+      appender.addTuple(tuple);
+    }
+    appender.close();
+
+    FileStatus status = fs.getFileStatus(tablePath);
+    long fileLen = status.getLen();
+    FileFragment tablet = new FileFragment("table1_1", status.getPath(), 0, fileLen);
+
+    SortSpec[] sortKeys = new SortSpec[2];
+    sortKeys[0] = new SortSpec(schema.getColumn("int"), true, false);
+    sortKeys[1] = new SortSpec(schema.getColumn("long"), true, false);
+
+    Schema keySchema = new Schema();
+    keySchema.addColumn(new Column("int", Type.INT4));
+    keySchema.addColumn(new Column("long", Type.INT8));
+
+    BaseTupleComparator comp = new BaseTupleComparator(keySchema, sortKeys);
+
+    BSTIndex bst = new BSTIndex(conf);
+    BSTIndexWriter creater = bst.getIndexWriter(new Path(testDir, "testFindNextKeyValue_" + storeType + ".idx"),
+        BSTIndex.TWO_LEVEL_INDEX, keySchema, comp);
+    creater.setLoadNum(LOAD_NUM);
+    creater.open();
+
+    SeekableScanner scanner = FileStorageManager.getSeekableScanner(conf, meta, schema, tablet, schema);
+    scanner.init();
+
+    Tuple keyTuple;
+    long offset;
+    while (true) {
+      keyTuple = new VTuple(2);
+      offset = scanner.getNextOffset();
+      tuple = scanner.next();
+      if (tuple == null) break;
+
+      keyTuple.put(0, tuple.get(0));
+      keyTuple.put(1, tuple.get(1));
+      creater.write(keyTuple, offset);
+    }
+
+    creater.flush();
+    creater.close();
+    scanner.close();
+
+    BSTIndexReader reader = bst.getIndexReader(new Path(testDir, "testFindNextKeyValue_" + storeType + ".idx"),
+        keySchema, comp);
+    reader.open();
+    scanner = FileStorageManager.getSeekableScanner(conf, meta, schema, tablet, schema);
+    scanner.init();
+
+    Tuple result;
+    for (int i = 0; i < TUPLE_NUM - 1; i++) {
+      keyTuple = new VTuple(2);
+      keyTuple.put(0, DatumFactory.createInt4(i));
+      keyTuple.put(1, DatumFactory.createInt8(i));
+      long offsets = reader.find(keyTuple, true);
+      scanner.seek(offsets);
+      result = scanner.next();
+      assertTrue("[seek check " + (i + 1) + " ]",
+          (i + 1) == (result.get(0).asInt4()));
+      assertTrue("[seek check " + (i + 1) + " ]", (i + 1) == (result.get(1).asInt8()));
+
+      offsets = reader.next();
+      if (offsets == -1) {
+        continue;
+      }
+      scanner.seek(offsets);
+      result = scanner.next();
+      assertTrue("[seek check " + (i + 2) + " ]", (i + 2) == (result.get(0).asInt8()));
+      assertTrue("[seek check " + (i + 2) + " ]", (i + 2) == (result.get(1).asFloat8()));
+    }
+    reader.close();
+    scanner.close();
+  }
+
+  @Test
+  public void testFindNextKeyOmittedValue() throws IOException {
+    meta = CatalogUtil.newTableMeta(storeType);
+
+    Path tablePath = new Path(testDir, "testFindNextKeyOmittedValue_" + storeType);
+    Appender appender = ((FileStorageManager)StorageManager.getFileStorageManager(conf))
+        .getAppender(meta, schema, tablePath);
+    appender.init();
+    Tuple tuple;
+    for (int i = 0; i < TUPLE_NUM; i += 2) {
+      tuple = new VTuple(5);
+      tuple.put(0, DatumFactory.createInt4(i));
+      tuple.put(1, DatumFactory.createInt8(i));
+      tuple.put(2, DatumFactory.createFloat8(i));
+      tuple.put(3, DatumFactory.createFloat4(i));
+      tuple.put(4, DatumFactory.createText("field_" + i));
+      appender.addTuple(tuple);
+    }
+    appender.close();
+
+    FileStatus status = fs.getFileStatus(tablePath);
+    long fileLen = status.getLen();
+    FileFragment tablet = new FileFragment("table1_1", status.getPath(), 0, fileLen);
+
+    SortSpec[] sortKeys = new SortSpec[2];
+    sortKeys[0] = new SortSpec(schema.getColumn("int"), true, false);
+    sortKeys[1] = new SortSpec(schema.getColumn("long"), true, false);
+
+    Schema keySchema = new Schema();
+    keySchema.addColumn(new Column("int", Type.INT4));
+    keySchema.addColumn(new Column("long", Type.INT8));
+
+    BaseTupleComparator comp = new BaseTupleComparator(keySchema, sortKeys);
+
+    BSTIndex bst = new BSTIndex(conf);
+    BSTIndexWriter creater = bst.getIndexWriter(new Path(testDir, "testFindNextKeyOmittedValue_" + storeType + ".idx"),
+        BSTIndex.TWO_LEVEL_INDEX, keySchema, comp);
+    creater.setLoadNum(LOAD_NUM);
+    creater.open();
+
+    SeekableScanner scanner = FileStorageManager.getSeekableScanner(conf, meta, schema, tablet, schema);
+    scanner.init();
+
+    Tuple keyTuple;
+    long offset;
+    while (true) {
+      keyTuple = new VTuple(2);
+      offset = scanner.getNextOffset();
+      tuple = scanner.next();
+      if (tuple == null) break;
+
+      keyTuple.put(0, tuple.get(0));
+      keyTuple.put(1, tuple.get(1));
+      creater.write(keyTuple, offset);
+    }
+
+    creater.flush();
+    creater.close();
+    scanner.close();
+
+    BSTIndexReader reader = bst.getIndexReader(new Path(testDir, "testFindNextKeyOmittedValue_" + storeType + ".idx"),
+        keySchema, comp);
+    reader.open();
+    scanner = FileStorageManager.getSeekableScanner(conf, meta, schema, tablet, schema);
+    scanner.init();
+
+    Tuple result;
+    for (int i = 1; i < TUPLE_NUM - 1; i += 2) {
+      keyTuple = new VTuple(2);
+      keyTuple.put(0, DatumFactory.createInt4(i));
+      keyTuple.put(1, DatumFactory.createInt8(i));
+      long offsets = reader.find(keyTuple, true);
+      scanner.seek(offsets);
+      result = scanner.next();
+      assertTrue("[seek check " + (i + 1) + " ]", (i + 1) == (result.get(0).asInt4()));
+      assertTrue("[seek check " + (i + 1) + " ]", (i + 1) == (result.get(1).asInt8()));
+    }
+    reader.close();
+    scanner.close();
+  }
+
+  @Test
+  public void testFindMinValue() throws IOException {
+    meta = CatalogUtil.newTableMeta(storeType);
+
+    Path tablePath = new Path(testDir, "testFindMinValue_" + storeType);
+    Appender appender = ((FileStorageManager)StorageManager.getFileStorageManager(conf)).getAppender(meta, schema, tablePath);
+    appender.init();
+
+    Tuple tuple;
+    for (int i = 5; i < TUPLE_NUM + 5; i++) {
+      tuple = new VTuple(5);
+      tuple.put(0, DatumFactory.createInt4(i));
+      tuple.put(1, DatumFactory.createInt8(i));
+      tuple.put(2, DatumFactory.createFloat8(i));
+      tuple.put(3, DatumFactory.createFloat4(i));
+      tuple.put(4, DatumFactory.createText("field_" + i));
+      appender.addTuple(tuple);
+    }
+    appender.close();
+
+    FileStatus status = fs.getFileStatus(tablePath);
+    long fileLen = status.getLen();
+    FileFragment tablet = new FileFragment("table1_1", status.getPath(), 0, fileLen);
+
+    SortSpec[] sortKeys = new SortSpec[2];
+    sortKeys[0] = new SortSpec(schema.getColumn("long"), true, false);
+    sortKeys[1] = new SortSpec(schema.getColumn("double"), true, false);
+
+    Schema keySchema = new Schema();
+    keySchema.addColumn(new Column("long", Type.INT8));
+    keySchema.addColumn(new Column("double", Type.FLOAT8));
+
+    BaseTupleComparator comp = new BaseTupleComparator(keySchema, sortKeys);
+    BSTIndex bst = new BSTIndex(conf);
+    BSTIndexWriter creater = bst.getIndexWriter(new Path(testDir, "testFindMinValue_" + storeType + ".idx"),
+        BSTIndex.TWO_LEVEL_INDEX, keySchema, comp);
+    creater.setLoadNum(LOAD_NUM);
+    creater.open();
+
+    SeekableScanner scanner = FileStorageManager.getSeekableScanner(conf, meta, schema, tablet, schema);
+    scanner.init();
+
+    Tuple keyTuple;
+    long offset;
+    while (true) {
+      keyTuple = new VTuple(2);
+      offset = scanner.getNextOffset();
+      tuple = scanner.next();
+      if (tuple == null) break;
+
+      keyTuple.put(0, tuple.get(1));
+      keyTuple.put(1, tuple.get(2));
+      creater.write(keyTuple, offset);
+    }
+
+    creater.flush();
+    creater.close();
+    scanner.close();
+
+    tuple = new VTuple(keySchema.size());
+
+    BSTIndexReader reader = bst.getIndexReader(new Path(testDir, "testFindMinValue_" + storeType + ".idx"),
+        keySchema, comp);
+    reader.open();
+    scanner = FileStorageManager.getSeekableScanner(conf, meta, schema, tablet, schema);
+    scanner.init();
+
+    tuple.put(0, DatumFactory.createInt8(0));
+    tuple.put(1, DatumFactory.createFloat8(0));
+
+    offset = reader.find(tuple);
+    assertEquals(-1, offset);
+
+    offset = reader.find(tuple, true);
+    assertTrue(offset >= 0);
+    scanner.seek(offset);
+    tuple = scanner.next();
+    assertEquals(5l, tuple.get(1).asInt8());
+    assertEquals(5.0d, tuple.get(2).asFloat8(), 0.01);
+    reader.close();
+    scanner.close();
+  }
+
+  @Test
+  public void testMinMax() throws IOException {
+    meta = CatalogUtil.newTableMeta(storeType);
+
+    Path tablePath = new Path(testDir, "testMinMax_" + storeType);
+    Appender appender = ((FileStorageManager)StorageManager.getFileStorageManager(conf)).getAppender(meta, schema, tablePath);
+    appender.init();
+    Tuple tuple;
+    for (int i = 5; i < TUPLE_NUM; i += 2) {
+      tuple = new VTuple(5);
+      tuple.put(0, DatumFactory.createInt4(i));
+      tuple.put(1, DatumFactory.createInt8(i));
+      tuple.put(2, DatumFactory.createFloat8(i));
+      tuple.put(3, DatumFactory.createFloat4(i));
+      tuple.put(4, DatumFactory.createText("field_" + i));
+      appender.addTuple(tuple);
+    }
+    appender.close();
+
+    FileStatus status = fs.getFileStatus(tablePath);
+    long fileLen = status.getLen();
+    FileFragment tablet = new FileFragment("table1_1", status.getPath(), 0, fileLen);
+
+    SortSpec[] sortKeys = new SortSpec[2];
+    sortKeys[0] = new SortSpec(schema.getColumn("int"), true, false);
+    sortKeys[1] = new SortSpec(schema.getColumn("long"), true, false);
+
+    Schema keySchema = new Schema();
+    keySchema.addColumn(new Column("int", Type.INT4));
+    keySchema.addColumn(new Column("long", Type.INT8));
+
+    BaseTupleComparator comp = new BaseTupleComparator(keySchema, sortKeys);
+
+    BSTIndex bst = new BSTIndex(conf);
+    BSTIndexWriter creater = bst.getIndexWriter(new Path(testDir, "testMinMax_" + storeType + ".idx"),
+        BSTIndex.TWO_LEVEL_INDEX, keySchema, comp);
+    creater.setLoadNum(LOAD_NUM);
+    creater.open();
+
+    SeekableScanner scanner = FileStorageManager.getSeekableScanner(conf, meta, schema, tablet, schema);
+    scanner.init();
+
+    Tuple keyTuple;
+    long offset;
+    while (true) {
+      keyTuple = new VTuple(2);
+      offset = scanner.getNextOffset();
+      tuple = scanner.next();
+      if (tuple == null) break;
+
+      keyTuple.put(0, tuple.get(0));
+      keyTuple.put(1, tuple.get(1));
+      creater.write(keyTuple, offset);
+    }
+
+    creater.flush();
+    creater.close();
+    scanner.close();
+
+    BSTIndexReader reader = bst.getIndexReader(new Path(testDir, "testMinMax_" + storeType + ".idx"),
+        keySchema, comp);
+    reader.open();
+
+    Tuple min = reader.getFirstKey();
+    assertEquals(5, min.get(0).asInt4());
+    assertEquals(5l, min.get(1).asInt8());
+
+    Tuple max = reader.getLastKey();
+    assertEquals(TUPLE_NUM - 1, max.get(0).asInt4());
+    assertEquals(TUPLE_NUM - 1, max.get(1).asInt8());
+    reader.close();
+  }
+
+  private class ConcurrentAccessor implements Runnable {
+    final BSTIndexReader reader;
+    final Random rnd = new Random(System.currentTimeMillis());
+    boolean failed = false;
+
+    ConcurrentAccessor(BSTIndexReader reader) {
+      this.reader = reader;
+    }
+
+    public boolean isFailed() {
+      return this.failed;
+    }
+
+    @Override
+    public void run() {
+      Tuple findKey = new VTuple(2);
+      int keyVal;
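+      // Rows 0..TUPLE_NUM-1 all exist in the index built by
+      // testConcurrentAccess, so every lookup must succeed.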
+      for (int i = 0; i < 10000; i++) {
+        keyVal = rnd.nextInt(10000);
+        findKey.put(0, DatumFactory.createInt4(keyVal));
+        findKey.put(1, DatumFactory.createInt8(keyVal));
+        try {
+          assertTrue(reader.find(findKey) != -1);
+        } catch (Exception e) {
+          e.printStackTrace();
+          this.failed = true;
+        }
+      }
+    }
+  }
+
+  @Test
+  public void testConcurrentAccess() throws IOException, InterruptedException {
+    meta = CatalogUtil.newTableMeta(storeType);
+
+    Path tablePath = new Path(testDir, "testConcurrentAccess_" + storeType);
+    Appender appender = ((FileStorageManager)StorageManager.getFileStorageManager(conf)).getAppender(meta, schema, tablePath);
+    appender.init();
+
+    Tuple tuple;
+    for (int i = 0; i < TUPLE_NUM; i++) {
+      tuple = new VTuple(5);
+      tuple.put(0, DatumFactory.createInt4(i));
+      tuple.put(1, DatumFactory.createInt8(i));
+      tuple.put(2, DatumFactory.createFloat8(i));
+      tuple.put(3, DatumFactory.createFloat4(i));
+      tuple.put(4, DatumFactory.createText("field_" + i));
+      appender.addTuple(tuple);
+    }
+    appender.close();
+
+    FileStatus status = fs.getFileStatus(tablePath);
+    long fileLen = status.getLen();
+    FileFragment tablet = new FileFragment("table1_1", status.getPath(), 0, fileLen);
+
+    SortSpec[] sortKeys = new SortSpec[2];
+    sortKeys[0] = new SortSpec(schema.getColumn("int"), true, false);
+    sortKeys[1] = new SortSpec(schema.getColumn("long"), true, false);
+
+    Schema keySchema = new Schema();
+    keySchema.addColumn(new Column("int", Type.INT4));
+    keySchema.addColumn(new Column("long", Type.INT8));
+
+    BaseTupleComparator comp = new BaseTupleComparator(keySchema, sortKeys);
+
+    BSTIndex bst = new BSTIndex(conf);
+    BSTIndexWriter creater = bst.getIndexWriter(new Path(testDir, "testConcurrentAccess_" + storeType + ".idx"),
+        BSTIndex.TWO_LEVEL_INDEX, keySchema, comp);
+    creater.setLoadNum(LOAD_NUM);
+    creater.open();
+
+    SeekableScanner scanner = FileStorageManager.getSeekableScanner(conf, meta, schema, tablet, schema);
+    scanner.init();
+
+    Tuple keyTuple;
+    long offset;
+    while (true) {
+      keyTuple = new VTuple(2);
+      offset = scanner.getNextOffset();
+      tuple = scanner.next();
+      if (tuple == null) break;
+
+      keyTuple.put(0, tuple.get(0));
+      keyTuple.put(1, tuple.get(1));
+      creater.write(keyTuple, offset);
+    }
+
+    creater.flush();
+    creater.close();
+    scanner.close();
+
+    BSTIndexReader reader = bst.getIndexReader(new Path(testDir, "testConcurrentAccess_" + storeType + ".idx"),
+        keySchema, comp);
+    reader.open();
+
+    Thread[] threads = new Thread[5];
+    ConcurrentAccessor[] accs = new ConcurrentAccessor[5];
+    for (int i = 0; i < threads.length; i++) {
+      accs[i] = new ConcurrentAccessor(reader);
+      threads[i] = new Thread(accs[i]);
+      threads[i].start();
+    }
+
+    for (int i = 0; i < threads.length; i++) {
+      threads[i].join();
+      assertFalse(accs[i].isFailed());
+    }
+    reader.close();
+  }
+
+
+  @Test
+  public void testFindValueDescOrder() throws IOException {
+    meta = CatalogUtil.newTableMeta(storeType);
+
+    Path tablePath = new Path(testDir, "testFindValueDescOrder_" + storeType);
+    Appender appender = ((FileStorageManager)StorageManager.getFileStorageManager(conf)).getAppender(meta, schema, tablePath);
+    appender.init();
+
+    Tuple tuple;
+    for (int i = (TUPLE_NUM - 1); i >= 0; i--) {
+      tuple = new VTuple(5);
+      tuple.put(0, DatumFactory.createInt4(i));
+      tuple.put(1, DatumFactory.createInt8(i));
+      tuple.put(2, DatumFactory.createFloat8(i));
+      tuple.put(3, DatumFactory.createFloat4(i));
+      tuple.put(4, DatumFactory.createText("field_" + i));
+      appender.addTuple(tuple);
+    }
+    appender.close();
+
+    FileStatus status = fs.getFileStatus(tablePath);
+    long fileLen = status.getLen();
+    FileFragment tablet = new FileFragment("table1_1", status.getPath(), 0, fileLen);
+
+    SortSpec[] sortKeys = new SortSpec[2];
+    sortKeys[0] = new SortSpec(schema.getColumn("long"), false, false);
+    sortKeys[1] = new SortSpec(schema.getColumn("double"), false, false);
+
+    Schema keySchema = new Schema();
+    keySchema.addColumn(new Column("long", Type.INT8));
+    keySchema.addColumn(new Column("double", Type.FLOAT8));
+
+    BaseTupleComparator comp = new BaseTupleComparator(keySchema, sortKeys);
+
+
+    BSTIndex bst = new BSTIndex(conf);
+    BSTIndexWriter creater = bst.getIndexWriter(new Path(testDir, "testFindValueDescOrder_" + storeType + ".idx"),
+        BSTIndex.TWO_LEVEL_INDEX, keySchema, comp);
+    creater.setLoadNum(LOAD_NUM);
+    creater.open();
+
+    SeekableScanner scanner = FileStorageManager.getSeekableScanner(conf, meta, schema, tablet, schema);
+    scanner.init();
+
+    Tuple keyTuple;
+    long offset;
+    while (true) {
+      keyTuple = new VTuple(2);
+      offset = scanner.getNextOffset();
+      tuple = scanner.next();
+      if (tuple == null) break;
+
+      keyTuple.put(0, tuple.get(1));
+      keyTuple.put(1, tuple.get(2));
+      creater.write(keyTuple, offset);
+    }
+
+    creater.flush();
+    creater.close();
+    scanner.close();
+
+    tuple = new VTuple(keySchema.size());
+
+    BSTIndexReader reader = bst.getIndexReader(new Path(testDir, "testFindValueDescOrder_" + storeType + ".idx"),
+        keySchema, comp);
+    reader.open();
+    scanner = FileStorageManager.getSeekableScanner(conf, meta, schema, tablet, schema);
+    scanner.init();
+
+    for (int i = (TUPLE_NUM - 1); i > 0; i--) {
+      tuple.put(0, DatumFactory.createInt8(i));
+      tuple.put(1, DatumFactory.createFloat8(i));
+      long offsets = reader.find(tuple);
+      scanner.seek(offsets);
+      tuple = scanner.next();
+      assertTrue("seek check [" + (i) + " ," + (tuple.get(1).asInt8()) + "]", (i) == (tuple.get(1).asInt8()));
+      assertTrue("seek check [" + (i) + " ," + (tuple.get(2).asFloat8()) + "]", (i) == (tuple.get(2).asFloat8()));
+
+      offsets = reader.next();
+      if (offsets == -1) {
+        continue;
+      }
+      scanner.seek(offsets);
+      tuple = scanner.next();
+      assertTrue("[seek check " + (i - 1) + " ]", (i - 1) == (tuple.get(0).asInt4()));
+      assertTrue("[seek check " + (i - 1) + " ]", (i - 1) == (tuple.get(1).asInt8()));
+    }
+    reader.close();
+    scanner.close();
+  }
+
+  @Test
+  public void testFindNextKeyValueDescOrder() throws IOException {
+    meta = CatalogUtil.newTableMeta(storeType);
+
+    Path tablePath = new Path(testDir, "testFindNextKeyValueDescOrder_" + storeType);
+    Appender appender = ((FileStorageManager)StorageManager.getFileStorageManager(conf)).getAppender(meta, schema, tablePath);
+    appender.init();
+
+    Tuple tuple;
+    for (int i = (TUPLE_NUM - 1); i >= 0; i--) {
+      tuple = new VTuple(5);
+      tuple.put(0, DatumFactory.createInt4(i));
+      tuple.put(1, DatumFactory.createInt8(i));
+      tuple.put(2, DatumFactory.createFloat8(i));
+      tuple.put(3, DatumFactory.createFloat4(i));
+      tuple.put(4, DatumFactory.createText("field_" + i));
+      appender.addTuple(tuple);
+    }
+    appender.close();
+
+    FileStatus status = fs.getFileStatus(tablePath);
+    long fileLen = status.getLen();
+    FileFragment tablet = new FileFragment("table1_1", status.getPath(), 0, fileLen);
+
+    SortSpec[] sortKeys = new SortSpec[2];
+    sortKeys[0] = new SortSpec(schema.getColumn("int"), false, false);
+    sortKeys[1] = new SortSpec(schema.getColumn("long"), false, false);
+
+    Schema keySchema = new Schema();
+    keySchema.addColumn(new Column("int", Type.INT4));
+    keySchema.addColumn(new Column("long", Type.INT8));
+
+    BaseTupleComparator comp = new BaseTupleComparator(keySchema, sortKeys);
+
+    BSTIndex bst = new BSTIndex(conf);
+    BSTIndexWriter creater = bst.getIndexWriter(new Path(testDir,
+        "testFindNextKeyValueDescOrder_" + storeType + ".idx"), BSTIndex.TWO_LEVEL_INDEX, keySchema, comp);
+    creater.setLoadNum(LOAD_NUM);
+    creater.open();
+
+    SeekableScanner scanner = FileStorageManager.getSeekableScanner(conf, meta, schema, tablet, schema);
+    scanner.init();
+
+    Tuple keyTuple;
+    long offset;
+    while (true) {
+      keyTuple = new VTuple(2);
+      offset = scanner.getNextOffset();
+      tuple = scanner.next();
+      if (tuple == null) break;
+
+      keyTuple.put(0, tuple.get(0));
+      keyTuple.put(1, tuple.get(1));
+      creater.write(keyTuple, offset);
+    }
+
+    creater.flush();
+    creater.close();
+    scanner.close();
+
+
+    BSTIndexReader reader = bst.getIndexReader(new Path(testDir, "testFindNextKeyValueDescOrder_" + storeType + ".idx"),
+        keySchema, comp);
+    reader.open();
+
+    assertEquals(keySchema, reader.getKeySchema());
+    assertEquals(comp, reader.getComparator());
+
+    scanner = FileStorageManager.getSeekableScanner(conf, meta, schema, tablet, schema);
+    scanner.init();
+
+    Tuple result;
+    for (int i = (TUPLE_NUM - 1); i > 0; i--) {
+      keyTuple = new VTuple(2);
+      keyTuple.put(0, DatumFactory.createInt4(i));
+      keyTuple.put(1, DatumFactory.createInt8(i));
+      long offsets = reader.find(keyTuple, true);
+      scanner.seek(offsets);
+      result = scanner.next();
+      assertTrue("[seek check " + (i - 1) + " ]",
+          (i - 1) == (result.get(0).asInt4()));
+      assertTrue("[seek check " + (i - 1) + " ]", (i - 1) == (result.get(1).asInt8()));
+
+      offsets = reader.next();
+      if (offsets == -1) {
+        continue;
+      }
+      scanner.seek(offsets);
+      result = scanner.next();
+      assertTrue("[seek check " + (i - 2) + " ]", (i - 2) == (result.get(0).asInt8()));
+      assertTrue("[seek check " + (i - 2) + " ]", (i - 2) == (result.get(1).asFloat8()));
+    }
+    reader.close();
+    scanner.close();
+  }
+}
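
Taken together, the tests above exercise one write/probe lifecycle. A condensed
sketch of it, reusing the classes and calls from TestBSTIndex (error handling
omitted; projectKey is a hypothetical stand-in for the inline key projection,
and the exact meaning of the load number is an assumption based on how the
tests use it):

    // Build: scan sorted data, capturing each tuple's offset before reading
    // it, and hand (key, offset) pairs to the index writer in scan order.
    static long buildAndProbe(TajoConf conf, Path indexPath, Schema keySchema,
                              BaseTupleComparator comp, SeekableScanner scanner,
                              Tuple probeKey) throws IOException {
      BSTIndex bst = new BSTIndex(conf);
      BSTIndexWriter writer =
          bst.getIndexWriter(indexPath, BSTIndex.TWO_LEVEL_INDEX, keySchema, comp);
      writer.setLoadNum(100);                  // assumed: entries batched per index block
      writer.open();
      while (true) {
        long offset = scanner.getNextOffset(); // offset of the tuple about to be read
        Tuple t = scanner.next();
        if (t == null) break;
        writer.write(projectKey(t), offset);   // projectKey: hypothetical key projection
      }
      writer.flush();
      writer.close();

      // Probe: find() returns the stored offset for an exact key, or -1 when
      // the key was never written (see testFindOmittedValue).
      BSTIndexReader reader = bst.getIndexReader(indexPath, keySchema, comp);
      reader.open();
      long hit = reader.find(probeKey);
      reader.close();
      return hit;
    }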

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/index/TestSingleCSVFileBSTIndex.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/index/TestSingleCSVFileBSTIndex.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/index/TestSingleCSVFileBSTIndex.java
new file mode 100644
index 0000000..d7c9f49
--- /dev/null
+++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/index/TestSingleCSVFileBSTIndex.java
@@ -0,0 +1,248 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.index;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.catalog.*;
+import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
+import org.apache.tajo.common.TajoDataTypes.Type;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.conf.TajoConf.ConfVars;
+import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.storage.*;
+import org.apache.tajo.storage.fragment.FileFragment;
+import org.apache.tajo.storage.index.bst.BSTIndex;
+import org.apache.tajo.storage.index.bst.BSTIndex.BSTIndexReader;
+import org.apache.tajo.storage.index.bst.BSTIndex.BSTIndexWriter;
+import org.apache.tajo.util.CommonTestingUtil;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.apache.tajo.storage.CSVFile.CSVScanner;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class TestSingleCSVFileBSTIndex {
+  
+  private TajoConf conf;
+  private Schema schema;
+  private TableMeta meta;
+  private FileSystem fs;
+
+  private static final int TUPLE_NUM = 10000;
+  private static final int LOAD_NUM = 100;
+  private static final String TEST_PATH = "target/test-data/TestSingleCSVFileBSTIndex";
+  private Path testDir;
+  
+  public TestSingleCSVFileBSTIndex() {
+    conf = new TajoConf();
+    conf.setVar(ConfVars.ROOT_DIR, TEST_PATH);
+    schema = new Schema();
+    schema.addColumn(new Column("int", Type.INT4));
+    schema.addColumn(new Column("long", Type.INT8));
+    schema.addColumn(new Column("double", Type.FLOAT8));
+    schema.addColumn(new Column("float", Type.FLOAT4));
+    schema.addColumn(new Column("string", Type.TEXT));
+  }
+
+  @Before
+  public void setUp() throws Exception {
+    testDir = CommonTestingUtil.getTestDir(TEST_PATH);
+    fs = testDir.getFileSystem(conf);
+  }
+
+  @Test
+  public void testFindValueInSingleCSV() throws IOException {
+    meta = CatalogUtil.newTableMeta(StoreType.CSV);
+
+    Path tablePath = StorageUtil.concatPath(testDir, "testFindValueInSingleCSV", "table.csv");
+    fs.mkdirs(tablePath.getParent());
+
+    Appender appender = ((FileStorageManager)StorageManager.getFileStorageManager(conf)).getAppender(meta, schema, tablePath);
+    appender.init();
+    Tuple tuple;
+    for (int i = 0; i < TUPLE_NUM; i++) {
+      tuple = new VTuple(5);
+      tuple.put(0, DatumFactory.createInt4(i));
+      tuple.put(1, DatumFactory.createInt8(i));
+      tuple.put(2, DatumFactory.createFloat8(i));
+      tuple.put(3, DatumFactory.createFloat4(i));
+      tuple.put(4, DatumFactory.createText("field_" + i));
+      appender.addTuple(tuple);
+    }
+    appender.close();
+
+    FileStatus status = fs.getFileStatus(tablePath);
+    long fileLen = status.getLen();
+    FileFragment tablet = new FileFragment("table1_1", status.getPath(), 0, fileLen);
+
+    SortSpec[] sortKeys = new SortSpec[2];
+    sortKeys[0] = new SortSpec(schema.getColumn("long"), true, false);
+    sortKeys[1] = new SortSpec(schema.getColumn("double"), true, false);
+
+    Schema keySchema = new Schema();
+    keySchema.addColumn(new Column("long", Type.INT8));
+    keySchema.addColumn(new Column("double", Type.FLOAT8));
+
+    BaseTupleComparator comp = new BaseTupleComparator(keySchema, sortKeys);
+
+    BSTIndex bst = new BSTIndex(conf);
+    BSTIndexWriter creater = bst.getIndexWriter(new Path(testDir,
+        "FindValueInCSV.idx"), BSTIndex.TWO_LEVEL_INDEX, keySchema, comp);
+    creater.setLoadNum(LOAD_NUM);
+    creater.open();
+
+    SeekableScanner fileScanner = new CSVScanner(conf, schema, meta, tablet);
+    fileScanner.init();
+    Tuple keyTuple;
+    long offset;
+    while (true) {
+      keyTuple = new VTuple(2);
+      offset = fileScanner.getNextOffset();
+      tuple = fileScanner.next();
+      if (tuple == null)
+        break;
+
+      keyTuple.put(0, tuple.get(1));
+      keyTuple.put(1, tuple.get(2));
+      creater.write(keyTuple, offset);
+    }
+
+    creater.flush();
+    creater.close();
+    fileScanner.close();
+
+    tuple = new VTuple(keySchema.size());
+    BSTIndexReader reader = bst.getIndexReader(new Path(testDir,
+        "FindValueInCSV.idx"), keySchema, comp);
+    reader.open();
+    fileScanner = new CSVScanner(conf, schema, meta, tablet);
+    fileScanner.init();
+    for (int i = 0; i < TUPLE_NUM - 1; i++) {
+      tuple.put(0, DatumFactory.createInt8(i));
+      tuple.put(1, DatumFactory.createFloat8(i));
+      long offsets = reader.find(tuple);
+      fileScanner.seek(offsets);
+      tuple = fileScanner.next();
+      assertEquals(i, tuple.get(1).asInt8());
+      assertEquals(i, tuple.get(2).asFloat8(), 0.01);
+
+      offsets = reader.next();
+      if (offsets == -1) {
+        continue;
+      }
+      fileScanner.seek(offsets);
+      tuple = fileScanner.next();
+      assertTrue("[seek check " + (i + 1) + " ]",
+          (i + 1) == (tuple.get(0).asInt4()));
+      assertTrue("[seek check " + (i + 1) + " ]",
+          (i + 1) == (tuple.get(1).asInt8()));
+    }
+    reader.close();
+    fileScanner.close();
+  }
+
+  @Test
+  public void testFindNextKeyValueInSingleCSV() throws IOException {
+    meta = CatalogUtil.newTableMeta(StoreType.CSV);
+
+    Path tablePath = StorageUtil.concatPath(testDir, "testFindNextKeyValueInSingleCSV",
+        "table1.csv");
+    fs.mkdirs(tablePath.getParent());
+    Appender appender = ((FileStorageManager)StorageManager.getFileStorageManager(conf)).getAppender(meta, schema, tablePath);
+    appender.init();
+    Tuple tuple;
+    for(int i = 0 ; i < TUPLE_NUM; i ++ ) {
+      tuple = new VTuple(5);
+      tuple.put(0, DatumFactory.createInt4(i));
+      tuple.put(1, DatumFactory.createInt8(i));
+      tuple.put(2, DatumFactory.createFloat8(i));
+      tuple.put(3, DatumFactory.createFloat4(i));
+      tuple.put(4, DatumFactory.createText("field_" + i));
+      appender.addTuple(tuple);
+    }
+    appender.close();
+
+    FileStatus status = fs.getFileStatus(tablePath);
+    long fileLen = status.getLen();
+    FileFragment tablet = new FileFragment("table1_1", status.getPath(), 0, fileLen);
+    
+    SortSpec [] sortKeys = new SortSpec[2];
+    sortKeys[0] = new SortSpec(schema.getColumn("int"), true, false);
+    sortKeys[1] = new SortSpec(schema.getColumn("long"), true, false);
+
+    Schema keySchema = new Schema();
+    keySchema.addColumn(new Column("int", Type.INT4));
+    keySchema.addColumn(new Column("long", Type.INT8));
+
+    BaseTupleComparator comp = new BaseTupleComparator(keySchema, sortKeys);
+    
+    BSTIndex bst = new BSTIndex(conf);
+    BSTIndexWriter creater = bst.getIndexWriter(new Path(testDir, "FindNextKeyValueInCSV.idx"),
+        BSTIndex.TWO_LEVEL_INDEX, keySchema, comp);
+    creater.setLoadNum(LOAD_NUM);
+    creater.open();
+    
+    SeekableScanner fileScanner  = new CSVScanner(conf, schema, meta, tablet);
+    fileScanner.init();
+    Tuple keyTuple;
+    long offset;
+    while (true) {
+      keyTuple = new VTuple(2);
+      offset = fileScanner.getNextOffset();
+      tuple = fileScanner.next();
+      if (tuple == null) break;
+      
+      keyTuple.put(0, tuple.get(0));
+      keyTuple.put(1, tuple.get(1));
+      creater.write(keyTuple, offset);
+    }
+    
+    creater.flush();
+    creater.close();
+    fileScanner.close();    
+    
+    BSTIndexReader reader = bst.getIndexReader(new Path(testDir, "FindNextKeyValueInCSV.idx"), keySchema, comp);
+    reader.open();
+    fileScanner  = new CSVScanner(conf, schema, meta, tablet);
+    fileScanner.init();
+    Tuple result;
+    for(int i = 0 ; i < TUPLE_NUM -1 ; i ++) {
+      keyTuple = new VTuple(2);
+      keyTuple.put(0, DatumFactory.createInt4(i));
+      keyTuple.put(1, DatumFactory.createInt8(i));
+      long offsets = reader.find(keyTuple, true);
+      fileScanner.seek(offsets);
+      result = fileScanner.next();
+      assertTrue("[seek check " + (i + 1) + " ]" , (i + 1) == (result.get(0).asInt4()));
+      assertTrue("[seek check " + (i + 1) + " ]" , (i + 1) == (result.get(1).asInt8()));
+      
+      offsets = reader.next();
+      if (offsets == -1) {
+        continue;
+      }
+      fileScanner.seek(offsets);
+      result = fileScanner.next();
+      assertTrue("[seek check " + (i + 2) + " ]" , (i + 2) == (result.get(0).asInt8()));
+      assertTrue("[seek check " + (i + 2) + " ]" , (i + 2) == (result.get(1).asFloat8()));
+    }
+  }
+}
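
Both single-file CSV tests lean on the reader's next-key mode. As exercised
here, reader.find(keyTuple, true) lands on the smallest indexed key strictly
greater than the probe (the assertions check i + 1), and reader.next()
advances one more entry, returning -1 once the index is exhausted. The probing
step reduces to the following, with variable names reused from the test:

    long off = reader.find(keyTuple, true);    // smallest indexed key after keyTuple
    fileScanner.seek(off);
    Tuple hit = fileScanner.next();            // the row keyed i + 1 in these tests

    off = reader.next();                       // following entry, or -1 at the end
    if (off != -1) {
      fileScanner.seek(off);
      hit = fileScanner.next();                // the row keyed i + 2
    }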

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/json/TestJsonSerDe.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/json/TestJsonSerDe.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/json/TestJsonSerDe.java
new file mode 100644
index 0000000..70282d9
--- /dev/null
+++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/json/TestJsonSerDe.java
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.json;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.catalog.CatalogUtil;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.catalog.proto.CatalogProtos;
+import org.apache.tajo.common.TajoDataTypes;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.datum.NullDatum;
+import org.apache.tajo.storage.Scanner;
+import org.apache.tajo.storage.StorageManager;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.storage.VTuple;
+import org.apache.tajo.storage.fragment.FileFragment;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.net.URL;
+
+import static org.junit.Assert.*;
+
+public class TestJsonSerDe {
+  private static Schema schema = new Schema();
+
+  static {
+    schema.addColumn("col1", TajoDataTypes.Type.BOOLEAN);
+    schema.addColumn("col2", TajoDataTypes.Type.CHAR, 7);
+    schema.addColumn("col3", TajoDataTypes.Type.INT2);
+    schema.addColumn("col4", TajoDataTypes.Type.INT4);
+    schema.addColumn("col5", TajoDataTypes.Type.INT8);
+    schema.addColumn("col6", TajoDataTypes.Type.FLOAT4);
+    schema.addColumn("col7", TajoDataTypes.Type.FLOAT8);
+    schema.addColumn("col8", TajoDataTypes.Type.TEXT);
+    schema.addColumn("col9", TajoDataTypes.Type.BLOB);
+    schema.addColumn("col10", TajoDataTypes.Type.INET4);
+    schema.addColumn("col11", TajoDataTypes.Type.NULL_TYPE);
+  }
+
+  public static Path getResourcePath(String path, String suffix) {
+    URL resultBaseURL = ClassLoader.getSystemResource(path);
+    return new Path(resultBaseURL.toString(), suffix);
+  }
+
+  @Test
+  public void testVariousType() throws IOException {
+    TajoConf conf = new TajoConf();
+
+    TableMeta meta = CatalogUtil.newTableMeta(CatalogProtos.StoreType.JSON);
+    Path tablePath = new Path(getResourcePath("dataset", "TestJsonSerDe"), "testVariousType.json");
+    FileSystem fs = FileSystem.getLocal(conf);
+    FileStatus status = fs.getFileStatus(tablePath);
+    FileFragment fragment = new FileFragment("table", tablePath, 0, status.getLen());
+    Scanner scanner =  StorageManager.getFileStorageManager(conf).getScanner(meta, schema, fragment);
+    scanner.init();
+
+    Tuple tuple = scanner.next();
+    assertNotNull(tuple);
+    assertNull(scanner.next());
+    scanner.close();
+
+    Tuple baseTuple = new VTuple(11);
+    baseTuple.put(new Datum[] {
+        DatumFactory.createBool(true),                  // 0
+        DatumFactory.createChar("hyunsik"),             // 1
+        DatumFactory.createInt2((short) 17),            // 2
+        DatumFactory.createInt4(59),                    // 3
+        DatumFactory.createInt8(23l),                   // 4
+        DatumFactory.createFloat4(77.9f),               // 5
+        DatumFactory.createFloat8(271.9d),              // 6
+        DatumFactory.createText("hyunsik"),             // 7
+        DatumFactory.createBlob("hyunsik".getBytes()),  // 8
+        DatumFactory.createInet4("192.168.0.1"),        // 9
+        NullDatum.get(),                                // 10
+    });
+
+    assertEquals(baseTuple, tuple);
+  }
+}
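
The fixture testVariousType.json itself is not part of this diff. Judging from
the expected tuple, its single record presumably looks roughly like the line
below; the BLOB (col9) and INET4 (col10) encodings are assumptions, and col11
(NULL_TYPE) is presumed absent from the record:

    { "col1": true, "col2": "hyunsik", "col3": 17, "col4": 59, "col5": 23,
      "col6": 77.9, "col7": 271.9, "col8": "hyunsik",
      "col9": "aHl1bnNpaw==", "col10": "192.168.0.1" }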

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/parquet/TestReadWrite.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/parquet/TestReadWrite.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/parquet/TestReadWrite.java
new file mode 100644
index 0000000..109fed9
--- /dev/null
+++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/parquet/TestReadWrite.java
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.parquet;
+
+import com.google.common.base.Charsets;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.common.TajoDataTypes.Type;
+import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.datum.NullDatum;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.storage.VTuple;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.*;
+
+public class TestReadWrite {
+  private static final String HELLO = "hello";
+
+  private Path createTmpFile() throws IOException {
+    File tmp = File.createTempFile(getClass().getSimpleName(), ".tmp");
+    tmp.deleteOnExit();
+    tmp.delete();
+
+    // it prevents accessing HDFS namenode of TajoTestingCluster.
+    LocalFileSystem localFS = LocalFileSystem.getLocal(new Configuration());
+    return localFS.makeQualified(new Path(tmp.getPath()));
+  }
+
+  private Schema createAllTypesSchema() {
+    List<Column> columns = new ArrayList<Column>();
+    columns.add(new Column("myboolean", Type.BOOLEAN));
+    columns.add(new Column("mybit", Type.BIT));
+    columns.add(new Column("mychar", Type.CHAR));
+    columns.add(new Column("myint2", Type.INT2));
+    columns.add(new Column("myint4", Type.INT4));
+    columns.add(new Column("myint8", Type.INT8));
+    columns.add(new Column("myfloat4", Type.FLOAT4));
+    columns.add(new Column("myfloat8", Type.FLOAT8));
+    columns.add(new Column("mytext", Type.TEXT));
+    columns.add(new Column("myblob", Type.BLOB));
+    columns.add(new Column("mynull", Type.NULL_TYPE));
+    Column[] columnsArray = new Column[columns.size()];
+    columnsArray = columns.toArray(columnsArray);
+    return new Schema(columnsArray);
+  }
+
+  @Test
+  public void testAll() throws Exception {
+    Path file = createTmpFile();
+    Schema schema = createAllTypesSchema();
+    Tuple tuple = new VTuple(schema.size());
+    tuple.put(0, DatumFactory.createBool(true));
+    tuple.put(1, DatumFactory.createBit((byte)128));
+    tuple.put(2, DatumFactory.createChar('t'));
+    tuple.put(3, DatumFactory.createInt2((short)2048));
+    tuple.put(4, DatumFactory.createInt4(4096));
+    tuple.put(5, DatumFactory.createInt8(8192L));
+    tuple.put(6, DatumFactory.createFloat4(0.2f));
+    tuple.put(7, DatumFactory.createFloat8(4.1));
+    tuple.put(8, DatumFactory.createText(HELLO));
+    tuple.put(9, DatumFactory.createBlob(HELLO.getBytes(Charsets.UTF_8)));
+    tuple.put(10, NullDatum.get());
+
+    TajoParquetWriter writer = new TajoParquetWriter(file, schema);
+    writer.write(tuple);
+    writer.close();
+
+    TajoParquetReader reader = new TajoParquetReader(file, schema);
+    tuple = reader.read();
+
+    assertNotNull(tuple);
+    assertEquals(true, tuple.getBool(0));
+    assertEquals((byte)128, tuple.getByte(1));
+    assertTrue(String.valueOf('t').equals(String.valueOf(tuple.getChar(2))));
+    assertEquals((short)2048, tuple.getInt2(3));
+    assertEquals(4096, tuple.getInt4(4));
+    assertEquals(8192L, tuple.getInt8(5));
+    assertEquals(new Float(0.2f), new Float(tuple.getFloat4(6)));
+    assertEquals(new Double(4.1), new Double(tuple.getFloat8(7)));
+    assertTrue(HELLO.equals(tuple.getText(8)));
+    assertArrayEquals(HELLO.getBytes(Charsets.UTF_8), tuple.getBytes(9));
+    assertEquals(NullDatum.get(), tuple.get(10));
+  }
+}
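
TestReadWrite boils down to one write/read round trip through the Parquet
classes added elsewhere in this commit. Stripped of the assertions, the
essential calls are the ones below (whether read() returns null once the file
is exhausted is an assumption; the test only reads back the single row it
wrote):

    TajoParquetWriter writer = new TajoParquetWriter(file, schema);
    writer.write(tuple);                  // one row per call
    writer.close();

    TajoParquetReader reader = new TajoParquetReader(file, schema);
    Tuple roundTripped = reader.read();   // presumably null when exhausted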

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/parquet/TestSchemaConverter.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/parquet/TestSchemaConverter.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/parquet/TestSchemaConverter.java
new file mode 100644
index 0000000..517e00e
--- /dev/null
+++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/parquet/TestSchemaConverter.java
@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.parquet;
+
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.common.TajoDataTypes.Type;
+import org.junit.Test;
+import parquet.schema.MessageType;
+import parquet.schema.MessageTypeParser;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for {@link TajoSchemaConverter}.
+ */
+public class TestSchemaConverter {
+  private static final String ALL_PARQUET_SCHEMA =
+      "message table_schema {\n" +
+      "  optional boolean myboolean;\n" +
+      "  optional int32 myint;\n" +
+      "  optional int64 mylong;\n" +
+      "  optional float myfloat;\n" +
+      "  optional double mydouble;\n" +
+      "  optional binary mybytes;\n" +
+      "  optional binary mystring (UTF8);\n" +
+      "  optional fixed_len_byte_array(1) myfixed;\n" +
+      "}\n";
+
+  private static final String CONVERTED_ALL_PARQUET_SCHEMA =
+      "message table_schema {\n" +
+      "  optional boolean myboolean;\n" +
+      "  optional int32 mybit;\n" +
+      "  optional binary mychar (UTF8);\n" +
+      "  optional int32 myint2;\n" +
+      "  optional int32 myint4;\n" +
+      "  optional int64 myint8;\n" +
+      "  optional float myfloat4;\n" +
+      "  optional double myfloat8;\n" +
+      "  optional binary mytext (UTF8);\n" +
+      "  optional binary myblob;\n" +
+      // NULL_TYPE fields are not encoded.
+      "  optional binary myinet4;\n" +
+      "  optional binary myprotobuf;\n" +
+      "}\n";
+
+  private Schema createAllTypesSchema() {
+    List<Column> columns = new ArrayList<Column>();
+    columns.add(new Column("myboolean", Type.BOOLEAN));
+    columns.add(new Column("mybit", Type.BIT));
+    columns.add(new Column("mychar", Type.CHAR));
+    columns.add(new Column("myint2", Type.INT2));
+    columns.add(new Column("myint4", Type.INT4));
+    columns.add(new Column("myint8", Type.INT8));
+    columns.add(new Column("myfloat4", Type.FLOAT4));
+    columns.add(new Column("myfloat8", Type.FLOAT8));
+    columns.add(new Column("mytext", Type.TEXT));
+    columns.add(new Column("myblob", Type.BLOB));
+    columns.add(new Column("mynull", Type.NULL_TYPE));
+    columns.add(new Column("myinet4", Type.INET4));
+    columns.add(new Column("myprotobuf", Type.PROTOBUF));
+    return new Schema(columns.toArray(new Column[columns.size()]));
+  }
+
+  private Schema createAllTypesConvertedSchema() {
+    List<Column> columns = new ArrayList<Column>();
+    columns.add(new Column("myboolean", Type.BOOLEAN));
+    columns.add(new Column("myint", Type.INT4));
+    columns.add(new Column("mylong", Type.INT8));
+    columns.add(new Column("myfloat", Type.FLOAT4));
+    columns.add(new Column("mydouble", Type.FLOAT8));
+    columns.add(new Column("mybytes", Type.BLOB));
+    columns.add(new Column("mystring", Type.TEXT));
+    columns.add(new Column("myfixed", Type.BLOB));
+    return new Schema(columns.toArray(new Column[columns.size()]));
+  }
+
+  private void testTajoToParquetConversion(
+      Schema tajoSchema, String schemaString) throws Exception {
+    TajoSchemaConverter converter = new TajoSchemaConverter();
+    MessageType schema = converter.convert(tajoSchema);
+    MessageType expected = MessageTypeParser.parseMessageType(schemaString);
+    assertEquals("converting " + schema + " to " + schemaString,
+                 expected.toString(), schema.toString());
+  }
+
+  private void testParquetToTajoConversion(
+      Schema tajoSchema, String schemaString) throws Exception {
+    TajoSchemaConverter converter = new TajoSchemaConverter();
+    Schema schema = converter.convert(
+        MessageTypeParser.parseMessageType(schemaString));
+    assertEquals("converting " + schemaString + " to " + tajoSchema,
+                 tajoSchema.toString(), schema.toString());
+  }
+
+  @Test
+  public void testAllTypesToParquet() throws Exception {
+    Schema schema = createAllTypesSchema();
+    testTajoToParquetConversion(schema, CONVERTED_ALL_PARQUET_SCHEMA);
+  }
+
+  @Test
+  public void testAllTypesToTajo() throws Exception {
+    Schema schema = createAllTypesConvertedSchema();
+    testParquetToTajoConversion(schema, ALL_PARQUET_SCHEMA);
+  }
+}
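
[Editor's note] The two fixtures document the type mapping in both directions: going out, BIT and INT2 widen to int32, CHAR and TEXT become UTF8-annotated binary, BLOB/INET4/PROTOBUF become plain binary, and NULL_TYPE columns are dropped entirely; coming back, fixed_len_byte_array collapses to BLOB. A usage sketch of the converter outside JUnit, using only the class and the two convert overloads exercised above (the schemas themselves are illustrative):

    TajoSchemaConverter converter = new TajoSchemaConverter();

    // Tajo -> Parquet: a NULL_TYPE column would simply not appear in the output.
    Schema tajoSchema = new Schema();
    tajoSchema.addColumn(new Column("id", Type.INT4));
    tajoSchema.addColumn(new Column("payload", Type.BLOB));
    MessageType parquetSchema = converter.convert(tajoSchema);

    // Parquet -> Tajo: UTF8-annotated binary comes back as TEXT.
    Schema backAgain = converter.convert(MessageTypeParser.parseMessageType(
        "message t {\n" +
        "  optional binary name (UTF8);\n" +
        "}"));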

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-hdfs/src/test/resources/dataset/TestDelimitedTextFile/testErrorTolerance1.json
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/test/resources/dataset/TestDelimitedTextFile/testErrorTolerance1.json b/tajo-storage/tajo-storage-hdfs/src/test/resources/dataset/TestDelimitedTextFile/testErrorTolerance1.json
new file mode 100644
index 0000000..739dfe7
--- /dev/null
+++ b/tajo-storage/tajo-storage-hdfs/src/test/resources/dataset/TestDelimitedTextFile/testErrorTolerance1.json
@@ -0,0 +1,6 @@
+{"col1": "true", "col2": "hyunsik", "col3": 17, "col4": 59, "col5": 23, "col6": 77.9, "col7": 271.9, "col8": "hyunsik", "col9": "hyunsik", "col10": "192.168.0.1"}
+{"col1": "true", "col2": "hyunsik", "col3": 17, "col4": 59, "col5": 23, "col6": 77.9, "col7": 271.9, "col8": "hyunsik", "col9": "hyunsik", "col10": "192.168.0.1"
+{"col1": "true", "col2": "hyunsik", "col3": 17, "col4": 59, "col5": 23, "col6": 77.9, "col7": 271.9, "col8": "hyunsik", "col9": "hyunsik", "col10": "192.168.0.1"}
+{"col1": "true", "col2": "hyunsik", "col3": 17, "col4": 59, "col5": 23, "col6": 77.9, "col7": 271.9, "col8": "hyunsik", "col9": "hyunsik", "col10": "192.168.0.1"
+{"col1": "true", "col2": "hyunsik", "col3": 17, "col4": 59, "col5": 23, "col6": 77.9, "col7": 271.9, "col8": "hyunsik", "col9": "hyunsik", "col10": "192.168.0.1"
+{"col1": "true", "col2": "hyunsik", "col3": 17, "col4": 59, "col5": 23, "col6": 77.9, "col7": 271.9, "col8": "hyunsik", "col9": "hyunsik", "col10": "192.168.0.1"}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-hdfs/src/test/resources/dataset/TestDelimitedTextFile/testErrorTolerance2.json
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/test/resources/dataset/TestDelimitedTextFile/testErrorTolerance2.json b/tajo-storage/tajo-storage-hdfs/src/test/resources/dataset/TestDelimitedTextFile/testErrorTolerance2.json
new file mode 100644
index 0000000..8256b72
--- /dev/null
+++ b/tajo-storage/tajo-storage-hdfs/src/test/resources/dataset/TestDelimitedTextFile/testErrorTolerance2.json
@@ -0,0 +1,4 @@
+{"col1": "true", "col2": "hyunsik", "col3": 17, "col4": 59, "col5": 23, "col6": 77.9, "col7": 271.9, "col8": "hyunsik", "col9": "hyunsik", "col10": "192.168.0.1"
+{"col1": "true", "col2": "hyunsik", "col3": 17, "col4": 59, "col5": 23, "col6": 77.9, "col7": 271.9, "col8": "hyunsik", "col9": "hyunsik", "col10": "192.168.0.1"}
+{"col1": "true", "col2": "hyunsik", "col3": 17, "col4": 59, "col5": 23, "col6": 77.9, "col7": 271.9, "col8": "hyunsik", "col9": "hyunsik", "col10": "192.168.0.1"}
+{"col1": "true", "col2": "hyunsik", "col3": 17, "col4": 59, "col5": 23, "col6": 77.9, "col7": 271.9, "col8": "hyunsik", "col9": "hyunsik", "col10": "192.168.0.1"}
\ No newline at end of file
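
[Editor's note] Both fixtures are deliberately corrupt: in testErrorTolerance1.json rows 2, 4, and 5 are cut off before the closing brace (three bad rows out of six), while in testErrorTolerance2.json only the first row is broken (one out of four). That gives the delimited-text scanner's error-tolerance tests one fixture that should exceed a strict error threshold and one that should stay under it. A quick way to see the shape of the fixtures (a sketch, not part of the patch; the closing-brace heuristic matches only these files):

    // Count the intentionally broken rows: they are the ones cut off
    // before the closing brace.
    static int countBrokenRows(java.nio.file.Path fixture) throws java.io.IOException {
      int broken = 0;
      for (String line : java.nio.file.Files.readAllLines(
          fixture, java.nio.charset.StandardCharsets.UTF_8)) {
        if (!line.trim().endsWith("}")) {
          broken++;
        }
      }
      return broken;
    }
    // testErrorTolerance1.json -> 3 broken rows out of 6 (rows 2, 4, 5)
    // testErrorTolerance2.json -> 1 broken row out of 4 (row 1)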

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-hdfs/src/test/resources/dataset/TestJsonSerDe/testVariousType.json
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/test/resources/dataset/TestJsonSerDe/testVariousType.json b/tajo-storage/tajo-storage-hdfs/src/test/resources/dataset/TestJsonSerDe/testVariousType.json
new file mode 100644
index 0000000..8ee3408
--- /dev/null
+++ b/tajo-storage/tajo-storage-hdfs/src/test/resources/dataset/TestJsonSerDe/testVariousType.json
@@ -0,0 +1 @@
+{"col1": "true", "col2": "hyunsik", "col3": 17, "col4": 59, "col5": 23, "col6": 77.9, "col7": 271.9, "col8": "hyunsik", "col9": "hyunsik", "col10": "192.168.0.1"}

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-hdfs/src/test/resources/dataset/testLineText.txt
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/test/resources/dataset/testLineText.txt b/tajo-storage/tajo-storage-hdfs/src/test/resources/dataset/testLineText.txt
new file mode 100644
index 0000000..7403c26
--- /dev/null
+++ b/tajo-storage/tajo-storage-hdfs/src/test/resources/dataset/testLineText.txt
@@ -0,0 +1,2 @@
+1|25|emiya muljomdao
+2|25|emiya muljomdao
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-hdfs/src/test/resources/dataset/testVariousTypes.avsc
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/test/resources/dataset/testVariousTypes.avsc b/tajo-storage/tajo-storage-hdfs/src/test/resources/dataset/testVariousTypes.avsc
new file mode 100644
index 0000000..d4250a9
--- /dev/null
+++ b/tajo-storage/tajo-storage-hdfs/src/test/resources/dataset/testVariousTypes.avsc
@@ -0,0 +1,20 @@
+{
+  "type": "record",
+  "namespace": "org.apache.tajo",
+  "name": "testVariousTypes",
+  "fields": [
+    { "name": "col1", "type": "boolean" },
+    { "name": "col2", "type": "string" },
+    { "name": "col3", "type": "int" },
+    { "name": "col4", "type": "int" },
+    { "name": "col5", "type": "long" },
+    { "name": "col6", "type": "float" },
+    { "name": "col7", "type": "double" },
+    { "name": "col8", "type": "string" },
+    { "name": "col9", "type": "bytes" },
+    { "name": "col10", "type": "bytes" },
+    { "name": "col11", "type": "null" },
+    { "name": "col12", "type": "bytes" }
+  ]
+}
+
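
[Editor's note] This is the Avro counterpart of the various-types fixtures: twelve fields, with col9, col10, and col12 declared as bytes and col11 as Avro's null type. A short sketch of loading it with the stock Avro parser (org.apache.avro.Schema from the avro artifact; the file path is assumed relative to the test resources):

    // Parse the fixture with Avro's standard schema parser.
    org.apache.avro.Schema avro = new org.apache.avro.Schema.Parser()
        .parse(new java.io.File("src/test/resources/dataset/testVariousTypes.avsc"));
    assertEquals("testVariousTypes", avro.getName());
    assertEquals("org.apache.tajo", avro.getNamespace());
    assertEquals(12, avro.getFields().size());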

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-hdfs/src/test/resources/storage-default.xml
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/test/resources/storage-default.xml b/tajo-storage/tajo-storage-hdfs/src/test/resources/storage-default.xml
new file mode 100644
index 0000000..737284b
--- /dev/null
+++ b/tajo-storage/tajo-storage-hdfs/src/test/resources/storage-default.xml
@@ -0,0 +1,178 @@
+<?xml version="1.0"?>
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+  -->
+
+<configuration>
+  <property>
+    <name>fs.s3.impl</name>
+    <value>org.apache.tajo.storage.s3.SmallBlockS3FileSystem</value>
+  </property>
+
+  <!-- Storage Manager Configuration -->
+  <property>
+    <name>tajo.storage.manager.hdfs.class</name>
+    <value>org.apache.tajo.storage.FileStorageManager</value>
+  </property>
+  <property>
+    <name>tajo.storage.manager.hbase.class</name>
+    <value>org.apache.tajo.storage.hbase.HBaseStorageManager</value>
+  </property>
+
+  <!-- Registered Scanner Handler -->
+  <property>
+    <name>tajo.storage.scanner-handler</name>
+    <value>textfile,csv,json,raw,rcfile,row,parquet,sequencefile,avro</value>
+  </property>
+
+  <!-- Fragment Class Configurations -->
+  <property>
+    <name>tajo.storage.fragment.textfile.class</name>
+    <value>org.apache.tajo.storage.fragment.FileFragment</value>
+  </property>
+  <property>
+    <name>tajo.storage.fragment.csv.class</name>
+    <value>org.apache.tajo.storage.fragment.FileFragment</value>
+  </property>
+  <property>
+    <name>tajo.storage.fragment.json.class</name>
+    <value>org.apache.tajo.storage.fragment.FileFragment</value>
+  </property>
+  <property>
+    <name>tajo.storage.fragment.raw.class</name>
+    <value>org.apache.tajo.storage.fragment.FileFragment</value>
+  </property>
+  <property>
+    <name>tajo.storage.fragment.rcfile.class</name>
+    <value>org.apache.tajo.storage.fragment.FileFragment</value>
+  </property>
+  <property>
+    <name>tajo.storage.fragment.row.class</name>
+    <value>org.apache.tajo.storage.fragment.FileFragment</value>
+  </property>
+  <property>
+    <name>tajo.storage.fragment.parquet.class</name>
+    <value>org.apache.tajo.storage.fragment.FileFragment</value>
+  </property>
+  <property>
+    <name>tajo.storage.fragment.sequencefile.class</name>
+    <value>org.apache.tajo.storage.fragment.FileFragment</value>
+  </property>
+  <property>
+    <name>tajo.storage.fragment.avro.class</name>
+    <value>org.apache.tajo.storage.fragment.FileFragment</value>
+  </property>
+
+  <!-- Scanner Handler -->
+  <property>
+    <name>tajo.storage.scanner-handler.textfile.class</name>
+    <value>org.apache.tajo.storage.text.DelimitedTextFile$DelimitedTextFileScanner</value>
+  </property>
+
+  <property>
+    <name>tajo.storage.scanner-handler.csv.class</name>
+    <value>org.apache.tajo.storage.CSVFile$CSVScanner</value>
+  </property>
+
+  <property>
+    <name>tajo.storage.scanner-handler.json.class</name>
+    <value>org.apache.tajo.storage.text.DelimitedTextFile$DelimitedTextFileScanner</value>
+  </property>
+
+  <property>
+    <name>tajo.storage.scanner-handler.raw.class</name>
+    <value>org.apache.tajo.storage.RawFile$RawFileScanner</value>
+  </property>
+
+  <property>
+    <name>tajo.storage.scanner-handler.rcfile.class</name>
+    <value>org.apache.tajo.storage.rcfile.RCFile$RCFileScanner</value>
+  </property>
+
+  <property>
+    <name>tajo.storage.scanner-handler.rowfile.class</name>
+    <value>org.apache.tajo.storage.RowFile$RowFileScanner</value>
+  </property>
+
+  <property>
+    <name>tajo.storage.scanner-handler.parquet.class</name>
+    <value>org.apache.tajo.storage.parquet.ParquetScanner</value>
+  </property>
+
+  <property>
+    <name>tajo.storage.scanner-handler.sequencefile.class</name>
+    <value>org.apache.tajo.storage.sequencefile.SequenceFileScanner</value>
+  </property>
+
+  <property>
+    <name>tajo.storage.scanner-handler.avro.class</name>
+    <value>org.apache.tajo.storage.avro.AvroScanner</value>
+  </property>
+
+  <!-- Appender Handler -->
+  <property>
+    <name>tajo.storage.appender-handler</name>
+    <value>textfile,csv,raw,rcfile,row,parquet,sequencefile,avro</value>
+  </property>
+
+  <property>
+    <name>tajo.storage.appender-handler.textfile.class</name>
+    <value>org.apache.tajo.storage.text.DelimitedTextFile$DelimitedTextFileAppender</value>
+  </property>
+
+  <property>
+    <name>tajo.storage.appender-handler.csv.class</name>
+    <value>org.apache.tajo.storage.CSVFile$CSVAppender</value>
+  </property>
+
+  <property>
+    <name>tajo.storage.appender-handler.json.class</name>
+    <value>org.apache.tajo.storage.text.DelimitedTextFile$DelimitedTextFileAppender</value>
+  </property>
+
+  <property>
+    <name>tajo.storage.appender-handler.raw.class</name>
+    <value>org.apache.tajo.storage.RawFile$RawFileAppender</value>
+  </property>
+
+  <property>
+    <name>tajo.storage.appender-handler.rcfile.class</name>
+    <value>org.apache.tajo.storage.rcfile.RCFile$RCFileAppender</value>
+  </property>
+
+  <property>
+    <name>tajo.storage.appender-handler.rowfile.class</name>
+    <value>org.apache.tajo.storage.RowFile$RowFileAppender</value>
+  </property>
+
+  <property>
+    <name>tajo.storage.appender-handler.parquet.class</name>
+    <value>org.apache.tajo.storage.parquet.ParquetAppender</value>
+  </property>
+
+  <property>
+    <name>tajo.storage.appender-handler.sequencefile.class</name>
+    <value>org.apache.tajo.storage.sequencefile.SequenceFileAppender</value>
+  </property>
+
+  <property>
+    <name>tajo.storage.appender-handler.avro.class</name>
+    <value>org.apache.tajo.storage.avro.AvroAppender</value>
+  </property>
+</configuration>
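
[Editor's note] The file is a flat registry: for each format name listed in tajo.storage.scanner-handler and tajo.storage.appender-handler, the concrete class is resolved from tajo.storage.scanner-handler.<format>.class (and the appender analogue), with a matching tajo.storage.fragment.<format>.class per format. Note that the scanner list registers 'row' while the class keys spell it 'rowfile', and 'json' has scanner and appender classes but appears only in the scanner list. A hedged sketch of the lookup against Hadoop's Configuration (the key layout is taken from this file; the loading code inside Tajo itself may differ):

    // Resolve a scanner implementation from the registry keys above.
    Configuration conf = new Configuration();
    conf.addResource("storage-default.xml");   // must be on the classpath

    String format = "parquet";
    Class<?> scanner = conf.getClass(
        "tajo.storage.scanner-handler." + format + ".class", null);
    // -> org.apache.tajo.storage.parquet.ParquetScanner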