Posted to commits@tajo.apache.org by hy...@apache.org on 2014/12/12 06:59:36 UTC

[02/32] tajo git commit: TAJO-1233: Merge hbase_storage branch to the master branch. (Hyoungjun Kim via hyunsik)

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestLineReader.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestLineReader.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestLineReader.java
new file mode 100644
index 0000000..07e8dd7
--- /dev/null
+++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestLineReader.java
@@ -0,0 +1,220 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage;
+
+import io.netty.buffer.ByteBuf;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.compress.DeflateCodec;
+import org.apache.tajo.catalog.CatalogUtil;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
+import org.apache.tajo.common.TajoDataTypes.Type;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.datum.NullDatum;
+import org.apache.tajo.storage.fragment.FileFragment;
+import org.apache.tajo.storage.text.ByteBufLineReader;
+import org.apache.tajo.storage.text.DelimitedLineReader;
+import org.apache.tajo.storage.text.DelimitedTextFile;
+import org.apache.tajo.util.CommonTestingUtil;
+import org.apache.tajo.util.FileUtil;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.junit.Assert.*;
+
+public class TestLineReader {
+  private static String TEST_PATH = "target/test-data/TestLineReader";
+
+  @Test
+  public void testByteBufLineReader() throws IOException {
+    TajoConf conf = new TajoConf();
+    Path testDir = CommonTestingUtil.getTestDir(TEST_PATH);
+    FileSystem fs = testDir.getFileSystem(conf);
+
+    Schema schema = new Schema();
+    schema.addColumn("id", Type.INT4);
+    schema.addColumn("age", Type.INT8);
+    schema.addColumn("comment", Type.TEXT);
+    schema.addColumn("comment2", Type.TEXT);
+
+    TableMeta meta = CatalogUtil.newTableMeta(StoreType.TEXTFILE);
+    Path tablePath = new Path(testDir, "line.data");
+    FileAppender appender = (FileAppender) StorageManager.getFileStorageManager(conf).getAppender(
+        null, null, meta, schema, tablePath);
+    appender.enableStats();
+    appender.init();
+    int tupleNum = 10000;
+    VTuple vTuple;
+
+    for (int i = 0; i < tupleNum; i++) {
+      vTuple = new VTuple(4);
+      vTuple.put(0, DatumFactory.createInt4(i + 1));
+      vTuple.put(1, DatumFactory.createInt8(25L));
+      vTuple.put(2, DatumFactory.createText("emiya muljomdao"));
+      vTuple.put(3, NullDatum.get());
+      appender.addTuple(vTuple);
+    }
+    appender.close();
+
+    FileStatus status = fs.getFileStatus(tablePath);
+
+    ByteBufInputChannel channel = new ByteBufInputChannel(fs.open(tablePath));
+    ByteBufLineReader reader = new ByteBufLineReader(channel);
+
+    long totalRead = 0;
+    int i = 0;
+    AtomicInteger bytes = new AtomicInteger();
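+    // readLineBuf returns the next line as a ByteBuf, or null at EOF, and stores
+    // the number of bytes it consumed for that call (line delimiter included) in
+    // 'bytes'; summing those counts should reproduce the exact file length
+    // asserted below.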
+    for(;;){
+      ByteBuf buf = reader.readLineBuf(bytes);
+      totalRead += bytes.get();
+      if(buf == null) break;
+      i++;
+    }
+    IOUtils.cleanup(null, reader, channel, fs);
+    assertEquals(tupleNum, i);
+    assertEquals(status.getLen(), totalRead);
+    assertEquals(status.getLen(), reader.readBytes());
+  }
+
+  @Test
+  public void testLineDelimitedReader() throws IOException {
+    TajoConf conf = new TajoConf();
+    Path testDir = CommonTestingUtil.getTestDir(TEST_PATH);
+    FileSystem fs = testDir.getFileSystem(conf);
+
+    Schema schema = new Schema();
+    schema.addColumn("id", Type.INT4);
+    schema.addColumn("age", Type.INT8);
+    schema.addColumn("comment", Type.TEXT);
+    schema.addColumn("comment2", Type.TEXT);
+
+    TableMeta meta = CatalogUtil.newTableMeta(StoreType.TEXTFILE);
+    meta.putOption("compression.codec", DeflateCodec.class.getCanonicalName());
+
+    Path tablePath = new Path(testDir, "line1." + DeflateCodec.class.getSimpleName());
+    FileAppender appender = (FileAppender) StorageManager.getFileStorageManager(conf).getAppender(
+        null, null, meta, schema, tablePath);
+    appender.enableStats();
+    appender.init();
+    int tupleNum = 10000;
+    VTuple vTuple;
+
+    long splitOffset = 0;
+    for (int i = 0; i < tupleNum; i++) {
+      vTuple = new VTuple(4);
+      vTuple.put(0, DatumFactory.createInt4(i + 1));
+      vTuple.put(1, DatumFactory.createInt8(25L));
+      vTuple.put(2, DatumFactory.createText("emiya muljomdao"));
+      vTuple.put(3, NullDatum.get());
+      appender.addTuple(vTuple);
+
+      if(i == (tupleNum / 2)){
+        splitOffset = appender.getOffset();
+      }
+    }
+    String extension = ((DelimitedTextFile.DelimitedTextFileAppender) appender).getExtension();
+    appender.close();
+
+    tablePath = tablePath.suffix(extension);
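+    // The fragment deliberately covers only about the first half of the file:
+    // since the file is deflate-compressed, and therefore not splittable, the
+    // reader is expected to ignore the fragment length and read every line to EOF.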
+    FileFragment fragment = new FileFragment("table", tablePath, 0, splitOffset);
+    DelimitedLineReader reader = new DelimitedLineReader(conf, fragment); // if file is compressed, will read to EOF
+    assertTrue(reader.isCompressed());
+    assertFalse(reader.isReadable());
+    reader.init();
+    assertTrue(reader.isReadable());
+
+    int i = 0;
+    while(reader.isReadable()){
+      ByteBuf buf = reader.readLine();
+      if(buf == null) break;
+      i++;
+    }
+
+    IOUtils.cleanup(null, reader, fs);
+    assertEquals(tupleNum, i);
+  }
+
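+  // The fixture file is assumed to have no terminating newline (hence the test
+  // name); the reader must still return the final unterminated line and account
+  // for all of its bytes.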
+  @Test
+  public void testByteBufLineReaderWithoutTerminating() throws IOException {
+    String path = FileUtil.getResourcePath("dataset/testLineText.txt").getFile();
+    File file = new File(path);
+    String data = FileUtil.readTextFile(file);
+
+    ByteBufInputChannel channel = new ByteBufInputChannel(new FileInputStream(file));
+    ByteBufLineReader reader = new ByteBufLineReader(channel);
+
+    long totalRead = 0;
+    int i = 0;
+    AtomicInteger bytes = new AtomicInteger();
+    for(;;){
+      ByteBuf buf = reader.readLineBuf(bytes);
+      totalRead += bytes.get();
+      if(buf == null) break;
+      i++;
+    }
+    IOUtils.cleanup(null, reader);
+    assertEquals(file.length(), totalRead);
+    assertEquals(file.length(), reader.readBytes());
+    assertEquals(data.split("\n").length, i);
+  }
+
+  @Test
+  public void testCRLFLine() throws IOException {
+    TajoConf conf = new TajoConf();
+    Path testFile = new Path(CommonTestingUtil.getTestDir(TEST_PATH), "testCRLFLineText.txt");
+
+    FileSystem fs = testFile.getFileSystem(conf);
+    FSDataOutputStream outputStream = fs.create(testFile, true);
+    outputStream.write("0\r\n1\r\n".getBytes());
+    outputStream.flush();
+    IOUtils.closeStream(outputStream);
+
+    ByteBufInputChannel channel = new ByteBufInputChannel(fs.open(testFile));
+    ByteBufLineReader reader = new ByteBufLineReader(channel, BufferPool.directBuffer(2));
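+    // A deliberately tiny 2-byte read buffer forces a refill in the middle of
+    // each CR/LF pair, exercising line breaks that straddle two buffer fills.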
+    FileStatus status = fs.getFileStatus(testFile);
+
+    long totalRead = 0;
+    int i = 0;
+    AtomicInteger bytes = new AtomicInteger();
+    for(;;){
+      ByteBuf buf = reader.readLineBuf(bytes);
+      totalRead += bytes.get();
+      if(buf == null) break;
+      String row = buf.toString(Charset.defaultCharset());
+      assertEquals(i, Integer.parseInt(row));
+      i++;
+    }
+    IOUtils.cleanup(null, reader);
+    assertEquals(status.getLen(), totalRead);
+    assertEquals(status.getLen(), reader.readBytes());
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestMergeScanner.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestMergeScanner.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestMergeScanner.java
new file mode 100644
index 0000000..a0daa7d
--- /dev/null
+++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestMergeScanner.java
@@ -0,0 +1,202 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.catalog.CatalogUtil;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
+import org.apache.tajo.catalog.statistics.TableStats;
+import org.apache.tajo.common.TajoDataTypes.Type;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.conf.TajoConf.ConfVars;
+import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.storage.fragment.FileFragment;
+import org.apache.tajo.storage.fragment.Fragment;
+import org.apache.tajo.util.CommonTestingUtil;
+import org.apache.tajo.util.KeyValueSet;
+import org.apache.tajo.util.TUtil;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+
+import static org.junit.Assert.*;
+
+@RunWith(Parameterized.class)
+public class TestMergeScanner {
+  private TajoConf conf;
+  StorageManager sm;
+  private static String TEST_PATH = "target/test-data/TestMergeScanner";
+
+  private static String TEST_MULTIPLE_FILES_AVRO_SCHEMA =
+      "{\n" +
+      "  \"type\": \"record\",\n" +
+      "  \"namespace\": \"org.apache.tajo\",\n" +
+      "  \"name\": \"testMultipleFiles\",\n" +
+      "  \"fields\": [\n" +
+      "    { \"name\": \"id\", \"type\": \"int\" },\n" +
+      "    { \"name\": \"file\", \"type\": \"string\" },\n" +
+      "    { \"name\": \"name\", \"type\": \"string\" },\n" +
+      "    { \"name\": \"age\", \"type\": \"long\" }\n" +
+      "  ]\n" +
+      "}\n";
+
+  private Path testDir;
+  private StoreType storeType;
+  private FileSystem fs;
+
+  public TestMergeScanner(StoreType storeType) {
+    this.storeType = storeType;
+  }
+
+  @Parameters
+  public static Collection<Object[]> generateParameters() {
+    return Arrays.asList(new Object[][] {
+        {StoreType.CSV},
+        {StoreType.RAW},
+        {StoreType.RCFILE},
+        {StoreType.PARQUET},
+        {StoreType.SEQUENCEFILE},
+        {StoreType.AVRO},
+        // RowFile requires byte-buffer read support, so it is omitted here.
+        //{StoreType.ROWFILE},
+    });
+  }
+
+  @Before
+  public void setup() throws Exception {
+    conf = new TajoConf();
+    conf.setVar(ConfVars.ROOT_DIR, TEST_PATH);
+    conf.setStrings("tajo.storage.projectable-scanner", "rcfile", "parquet", "avro");
+    testDir = CommonTestingUtil.getTestDir(TEST_PATH);
+    fs = testDir.getFileSystem(conf);
+    sm = StorageManager.getFileStorageManager(conf, testDir);
+  }
+
+  @Test
+  public void testMultipleFiles() throws IOException {
+    Schema schema = new Schema();
+    schema.addColumn("id", Type.INT4);
+    schema.addColumn("file", Type.TEXT);
+    schema.addColumn("name", Type.TEXT);
+    schema.addColumn("age", Type.INT8);
+
+    KeyValueSet options = new KeyValueSet();
+    TableMeta meta = CatalogUtil.newTableMeta(storeType, options);
+    meta.setOptions(CatalogUtil.newPhysicalProperties(storeType));
+    if (storeType == StoreType.AVRO) {
+      meta.putOption(StorageConstants.AVRO_SCHEMA_LITERAL,
+                     TEST_MULTIPLE_FILES_AVRO_SCHEMA);
+    }
+
+    Path table1Path = new Path(testDir, storeType + "_1.data");
+    Appender appender1 = StorageManager.getFileStorageManager(conf).getAppender(null, null, meta, schema, table1Path);
+    appender1.enableStats();
+    appender1.init();
+    int tupleNum = 10000;
+    VTuple vTuple;
+
+    for(int i = 0; i < tupleNum; i++) {
+      vTuple = new VTuple(4);
+      vTuple.put(0, DatumFactory.createInt4(i + 1));
+      vTuple.put(1, DatumFactory.createText("hyunsik"));
+      vTuple.put(2, DatumFactory.createText("jihoon"));
+      vTuple.put(3, DatumFactory.createInt8(25L));
+      appender1.addTuple(vTuple);
+    }
+    appender1.close();
+
+    TableStats stat1 = appender1.getStats();
+    if (stat1 != null) {
+      assertEquals(tupleNum, stat1.getNumRows().longValue());
+    }
+
+    Path table2Path = new Path(testDir, storeType + "_2.data");
+    Appender appender2 = StorageManager.getFileStorageManager(conf).getAppender(null, null, meta, schema, table2Path);
+    appender2.enableStats();
+    appender2.init();
+
+    for(int i = 0; i < tupleNum; i++) {
+      vTuple = new VTuple(4);
+      vTuple.put(0, DatumFactory.createInt4(i + 1));
+      vTuple.put(1, DatumFactory.createText("hyunsik"));
+      vTuple.put(2, DatumFactory.createText("jihoon"));
+      vTuple.put(3, DatumFactory.createInt8(25L));
+      appender2.addTuple(vTuple);
+    }
+    appender2.close();
+
+    TableStats stat2 = appender2.getStats();
+    if (stat2 != null) {
+      assertEquals(tupleNum, stat2.getNumRows().longValue());
+    }
+
+    FileStatus status1 = fs.getFileStatus(table1Path);
+    FileStatus status2 = fs.getFileStatus(table2Path);
+    Fragment[] fragment = new Fragment[2];
+    fragment[0] = new FileFragment("tablet1", table1Path, 0, status1.getLen());
+    fragment[1] = new FileFragment("tablet1", table2Path, 0, status2.getLen());
+
+    Schema targetSchema = new Schema();
+    targetSchema.addColumn(schema.getColumn(0));
+    targetSchema.addColumn(schema.getColumn(2));
+
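+    // The merged scan must return the rows of both fragments back to back; for
+    // projectable formats only the target columns (id, name) are materialized,
+    // and the remaining columns come back as null.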
+    Scanner scanner = new MergeScanner(conf, schema, meta, TUtil.newList(fragment), targetSchema);
+    assertEquals(isProjectableStorage(meta.getStoreType()), scanner.isProjectable());
+
+    scanner.init();
+    int totalCounts = 0;
+    Tuple tuple;
+    while ((tuple = scanner.next()) != null) {
+      totalCounts++;
+      if (isProjectableStorage(meta.getStoreType())) {
+        assertNotNull(tuple.get(0));
+        assertNull(tuple.get(1));
+        assertNotNull(tuple.get(2));
+        assertNull(tuple.get(3));
+      }
+    }
+    scanner.close();
+
+    assertEquals(tupleNum * 2, totalCounts);
+  }
+
+  private static boolean isProjectableStorage(StoreType type) {
+    switch (type) {
+      case RCFILE:
+      case PARQUET:
+      case SEQUENCEFILE:
+      case CSV:
+      case AVRO:
+        return true;
+      default:
+        return false;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestSplitProcessor.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestSplitProcessor.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestSplitProcessor.java
new file mode 100644
index 0000000..12ea551
--- /dev/null
+++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestSplitProcessor.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.util.CharsetUtil;
+import org.apache.tajo.storage.text.FieldSplitProcessor;
+import org.apache.tajo.storage.text.LineSplitProcessor;
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static io.netty.util.ReferenceCountUtil.releaseLater;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class TestSplitProcessor {
+
+  @Test
+  public void testFieldSplitProcessor() throws IOException {
+    String data = "abc||de";
+    final ByteBuf buf = releaseLater(
+        Unpooled.copiedBuffer(data, CharsetUtil.ISO_8859_1));
+
+    final int len = buf.readableBytes();
+    FieldSplitProcessor processor = new FieldSplitProcessor('|');
+
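+    // forEachByte(index, length, processor) returns the absolute index of the
+    // first delimiter byte found, or -1 if none remains; in "abc||de" the '|'
+    // bytes sit at indices 3 and 4.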
+    assertEquals(3, buf.forEachByte(0, len, processor));
+    assertEquals(4, buf.forEachByte(4, len - 4, processor));
+    assertEquals(-1, buf.forEachByte(5, len - 5, processor));
+  }
+
+  @Test
+  public void testLineSplitProcessor() throws IOException {
+    String data = "abc\r\n\n";
+    final ByteBuf buf = releaseLater(
+        Unpooled.copiedBuffer(data, CharsetUtil.ISO_8859_1));
+
+    final int len = buf.readableBytes();
+    LineSplitProcessor processor = new LineSplitProcessor();
+
+    // find CR
+    assertEquals(3, buf.forEachByte(0, len, processor));
+
+    // find CRLF
+    assertEquals(4, buf.forEachByte(4, len - 4, processor));
+    assertEquals('\n', buf.getByte(4));
+    // need to skip LF
+    assertTrue(processor.isPrevCharCR());
+
+    // find LF
+    assertEquals(5, buf.forEachByte(5, len - 5, processor)); // line length is zero
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java
new file mode 100644
index 0000000..15998f2
--- /dev/null
+++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java
@@ -0,0 +1,878 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage;
+
+import com.google.common.collect.Lists;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.tajo.QueryId;
+import org.apache.tajo.TajoIdProtos;
+import org.apache.tajo.catalog.CatalogUtil;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
+import org.apache.tajo.catalog.statistics.TableStats;
+import org.apache.tajo.common.TajoDataTypes.Type;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.datum.NullDatum;
+import org.apache.tajo.datum.ProtobufDatumFactory;
+import org.apache.tajo.storage.fragment.FileFragment;
+import org.apache.tajo.storage.rcfile.RCFile;
+import org.apache.tajo.storage.sequencefile.SequenceFileScanner;
+import org.apache.tajo.util.CommonTestingUtil;
+import org.apache.tajo.util.FileUtil;
+import org.apache.tajo.util.KeyValueSet;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+@RunWith(Parameterized.class)
+public class TestStorages {
+  private TajoConf conf;
+  private static String TEST_PATH = "target/test-data/TestStorages";
+
+  private static String TEST_PROJECTION_AVRO_SCHEMA =
+      "{\n" +
+      "  \"type\": \"record\",\n" +
+      "  \"namespace\": \"org.apache.tajo\",\n" +
+      "  \"name\": \"testProjection\",\n" +
+      "  \"fields\": [\n" +
+      "    { \"name\": \"id\", \"type\": \"int\" },\n" +
+      "    { \"name\": \"age\", \"type\": \"long\" },\n" +
+      "    { \"name\": \"score\", \"type\": \"float\" }\n" +
+      "  ]\n" +
+      "}\n";
+
+  private static String TEST_NULL_HANDLING_TYPES_AVRO_SCHEMA =
+      "{\n" +
+      "  \"type\": \"record\",\n" +
+      "  \"namespace\": \"org.apache.tajo\",\n" +
+      "  \"name\": \"testNullHandlingTypes\",\n" +
+      "  \"fields\": [\n" +
+      "    { \"name\": \"col1\", \"type\": [\"null\", \"boolean\"] },\n" +
+      "    { \"name\": \"col2\", \"type\": [\"null\", \"string\"] },\n" +
+      "    { \"name\": \"col3\", \"type\": [\"null\", \"int\"] },\n" +
+      "    { \"name\": \"col4\", \"type\": [\"null\", \"int\"] },\n" +
+      "    { \"name\": \"col5\", \"type\": [\"null\", \"long\"] },\n" +
+      "    { \"name\": \"col6\", \"type\": [\"null\", \"float\"] },\n" +
+      "    { \"name\": \"col7\", \"type\": [\"null\", \"double\"] },\n" +
+      "    { \"name\": \"col8\", \"type\": [\"null\", \"string\"] },\n" +
+      "    { \"name\": \"col9\", \"type\": [\"null\", \"bytes\"] },\n" +
+      "    { \"name\": \"col10\", \"type\": [\"null\", \"bytes\"] },\n" +
+      "    { \"name\": \"col11\", \"type\": \"null\" },\n" +
+      "    { \"name\": \"col12\", \"type\": [\"null\", \"bytes\"] }\n" +
+      "  ]\n" +
+      "}\n";
+
+  private StoreType storeType;
+  private boolean splitable;
+  private boolean statsable;
+  private boolean seekable;
+  private Path testDir;
+  private FileSystem fs;
+
+  public TestStorages(StoreType type, boolean splitable, boolean statsable, boolean seekable) throws IOException {
+    this.storeType = type;
+    this.splitable = splitable;
+    this.statsable = statsable;
+    this.seekable = seekable;
+
+    conf = new TajoConf();
+
+    if (storeType == StoreType.RCFILE) {
+      conf.setInt(RCFile.RECORD_INTERVAL_CONF_STR, 100);
+    }
+
+    testDir = CommonTestingUtil.getTestDir(TEST_PATH);
+    fs = testDir.getFileSystem(conf);
+  }
+
+  @Parameterized.Parameters
+  public static Collection<Object[]> generateParameters() {
+    return Arrays.asList(new Object[][] {
+        //type, splitable, statsable, seekable
+        {StoreType.CSV, true, true, true},
+        {StoreType.RAW, false, true, true},
+        {StoreType.RCFILE, true, true, false},
+        {StoreType.PARQUET, false, false, false},
+        {StoreType.SEQUENCEFILE, true, true, false},
+        {StoreType.AVRO, false, false, false},
+        {StoreType.TEXTFILE, true, true, false},
+        {StoreType.JSON, true, true, false},
+    });
+  }
+
+  @Test
+  public void testSplitable() throws IOException {
+    if (splitable) {
+      Schema schema = new Schema();
+      schema.addColumn("id", Type.INT4);
+      schema.addColumn("age", Type.INT8);
+
+      TableMeta meta = CatalogUtil.newTableMeta(storeType);
+      Path tablePath = new Path(testDir, "Splitable.data");
+      FileStorageManager sm = (FileStorageManager)StorageManager.getFileStorageManager(conf);
+      Appender appender = sm.getAppender(meta, schema, tablePath);
+      appender.enableStats();
+      appender.init();
+      int tupleNum = 10000;
+      VTuple vTuple;
+
+      for (int i = 0; i < tupleNum; i++) {
+        vTuple = new VTuple(2);
+        vTuple.put(0, DatumFactory.createInt4(i + 1));
+        vTuple.put(1, DatumFactory.createInt8(25L));
+        appender.addTuple(vTuple);
+      }
+      appender.close();
+      TableStats stat = appender.getStats();
+      assertEquals(tupleNum, stat.getNumRows().longValue());
+
+      FileStatus status = fs.getFileStatus(tablePath);
+      long fileLen = status.getLen();
+      long randomNum = (long) (Math.random() * fileLen) + 1;
+
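+      // Split the file at a random byte offset; the two adjacent fragments must
+      // together yield every tuple exactly once, wherever the cut lands.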
+      FileFragment[] tablets = new FileFragment[2];
+      tablets[0] = new FileFragment("Splitable", tablePath, 0, randomNum);
+      tablets[1] = new FileFragment("Splitable", tablePath, randomNum, (fileLen - randomNum));
+
+      Scanner scanner = sm.getScanner(meta, schema, tablets[0], schema);
+      assertTrue(scanner.isSplittable());
+      scanner.init();
+      int tupleCnt = 0;
+      while (scanner.next() != null) {
+        tupleCnt++;
+      }
+      scanner.close();
+
+      scanner = sm.getScanner(meta, schema, tablets[1], schema);
+      assertTrue(scanner.isSplittable());
+      scanner.init();
+      while (scanner.next() != null) {
+        tupleCnt++;
+      }
+      scanner.close();
+
+      assertEquals(tupleNum, tupleCnt);
+    }
+  }
+
+  @Test
+  public void testRCFileSplitable() throws IOException {
+    if (storeType == StoreType.RCFILE) {
+      Schema schema = new Schema();
+      schema.addColumn("id", Type.INT4);
+      schema.addColumn("age", Type.INT8);
+
+      TableMeta meta = CatalogUtil.newTableMeta(storeType);
+      Path tablePath = new Path(testDir, "Splitable.data");
+      FileStorageManager sm = (FileStorageManager)StorageManager.getFileStorageManager(conf);
+      Appender appender = sm.getAppender(meta, schema, tablePath);
+      appender.enableStats();
+      appender.init();
+      int tupleNum = 10000;
+      VTuple vTuple;
+
+      for (int i = 0; i < tupleNum; i++) {
+        vTuple = new VTuple(2);
+        vTuple.put(0, DatumFactory.createInt4(i + 1));
+        vTuple.put(1, DatumFactory.createInt8(25L));
+        appender.addTuple(vTuple);
+      }
+      appender.close();
+      TableStats stat = appender.getStats();
+      assertEquals(tupleNum, stat.getNumRows().longValue());
+
+      FileStatus status = fs.getFileStatus(tablePath);
+      long fileLen = status.getLen();
+      long splitOffset = 122; // the RCFile header size: split exactly at the header boundary
+
+      FileFragment[] tablets = new FileFragment[2];
+      tablets[0] = new FileFragment("Splitable", tablePath, 0, splitOffset);
+      tablets[1] = new FileFragment("Splitable", tablePath, splitOffset, (fileLen - splitOffset));
+
+      Scanner scanner = sm.getScanner(meta, schema, tablets[0], schema);
+      assertTrue(scanner.isSplittable());
+      scanner.init();
+      int tupleCnt = 0;
+      while (scanner.next() != null) {
+        tupleCnt++;
+      }
+      scanner.close();
+
+      scanner = sm.getScanner(meta, schema, tablets[1], schema);
+      assertTrue(scanner.isSplittable());
+      scanner.init();
+      while (scanner.next() != null) {
+        tupleCnt++;
+      }
+      scanner.close();
+
+      assertEquals(tupleNum, tupleCnt);
+    }
+  }
+
+  @Test
+  public void testProjection() throws IOException {
+    Schema schema = new Schema();
+    schema.addColumn("id", Type.INT4);
+    schema.addColumn("age", Type.INT8);
+    schema.addColumn("score", Type.FLOAT4);
+
+    TableMeta meta = CatalogUtil.newTableMeta(storeType);
+    meta.setOptions(CatalogUtil.newPhysicalProperties(storeType));
+    if (storeType == StoreType.AVRO) {
+      meta.putOption(StorageConstants.AVRO_SCHEMA_LITERAL,
+                     TEST_PROJECTION_AVRO_SCHEMA);
+    }
+
+    Path tablePath = new Path(testDir, "testProjection.data");
+    FileStorageManager sm = (FileStorageManager)StorageManager.getFileStorageManager(conf);
+    Appender appender = sm.getAppender(meta, schema, tablePath);
+    appender.init();
+    int tupleNum = 10000;
+    VTuple vTuple;
+
+    for (int i = 0; i < tupleNum; i++) {
+      vTuple = new VTuple(3);
+      vTuple.put(0, DatumFactory.createInt4(i + 1));
+      vTuple.put(1, DatumFactory.createInt8(i + 2));
+      vTuple.put(2, DatumFactory.createFloat4(i + 3));
+      appender.addTuple(vTuple);
+    }
+    appender.close();
+
+    FileStatus status = fs.getFileStatus(tablePath);
+    FileFragment fragment = new FileFragment("testReadAndWrite", tablePath, 0, status.getLen());
+
+    Schema target = new Schema();
+    target.addColumn("age", Type.INT8);
+    target.addColumn("score", Type.FLOAT4);
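+    // Scan with a narrower target schema: projectable formats should leave the
+    // unrequested leading column as null, while the projected values keep their
+    // original column positions.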
+    Scanner scanner = sm.getScanner(meta, schema, fragment, target);
+    scanner.init();
+    int tupleCnt = 0;
+    Tuple tuple;
+    while ((tuple = scanner.next()) != null) {
+      if (storeType == StoreType.RCFILE
+          || storeType == StoreType.CSV
+          || storeType == StoreType.PARQUET
+          || storeType == StoreType.SEQUENCEFILE
+          || storeType == StoreType.AVRO) {
+        assertTrue(tuple.get(0) == null);
+      }
+      assertTrue(tupleCnt + 2 == tuple.get(1).asInt8());
+      assertTrue(tupleCnt + 3 == tuple.get(2).asFloat4());
+      tupleCnt++;
+    }
+    scanner.close();
+
+    assertEquals(tupleNum, tupleCnt);
+  }
+
+  @Test
+  public void testVariousTypes() throws IOException {
+    boolean handleProtobuf = storeType != StoreType.JSON;
+
+    Schema schema = new Schema();
+    schema.addColumn("col1", Type.BOOLEAN);
+    schema.addColumn("col2", Type.CHAR, 7);
+    schema.addColumn("col3", Type.INT2);
+    schema.addColumn("col4", Type.INT4);
+    schema.addColumn("col5", Type.INT8);
+    schema.addColumn("col6", Type.FLOAT4);
+    schema.addColumn("col7", Type.FLOAT8);
+    schema.addColumn("col8", Type.TEXT);
+    schema.addColumn("col9", Type.BLOB);
+    schema.addColumn("col10", Type.INET4);
+    schema.addColumn("col11", Type.NULL_TYPE);
+    if (handleProtobuf) {
+      schema.addColumn("col12", CatalogUtil.newDataType(Type.PROTOBUF, TajoIdProtos.QueryIdProto.class.getName()));
+    }
+
+    KeyValueSet options = new KeyValueSet();
+    TableMeta meta = CatalogUtil.newTableMeta(storeType, options);
+    meta.setOptions(CatalogUtil.newPhysicalProperties(storeType));
+    if (storeType == StoreType.AVRO) {
+      String path = FileUtil.getResourcePath("dataset/testVariousTypes.avsc").toString();
+      meta.putOption(StorageConstants.AVRO_SCHEMA_URL, path);
+    }
+
+    FileStorageManager sm = (FileStorageManager)StorageManager.getFileStorageManager(conf);
+    Path tablePath = new Path(testDir, "testVariousTypes.data");
+    Appender appender = sm.getAppender(meta, schema, tablePath);
+    appender.init();
+
+    QueryId queryid = new QueryId("12345", 5);
+    ProtobufDatumFactory factory = ProtobufDatumFactory.get(TajoIdProtos.QueryIdProto.class.getName());
+
+    Tuple tuple = new VTuple(11 + (handleProtobuf ? 1 : 0));
+    tuple.put(new Datum[] {
+        DatumFactory.createBool(true),
+        DatumFactory.createChar("hyunsik"),
+        DatumFactory.createInt2((short) 17),
+        DatumFactory.createInt4(59),
+        DatumFactory.createInt8(23L),
+        DatumFactory.createFloat4(77.9f),
+        DatumFactory.createFloat8(271.9f),
+        DatumFactory.createText("hyunsik"),
+        DatumFactory.createBlob("hyunsik".getBytes()),
+        DatumFactory.createInet4("192.168.0.1"),
+        NullDatum.get()
+    });
+    if (handleProtobuf) {
+      tuple.put(11, factory.createDatum(queryid.getProto()));
+    }
+
+    appender.addTuple(tuple);
+    appender.flush();
+    appender.close();
+
+    FileStatus status = fs.getFileStatus(tablePath);
+    FileFragment fragment = new FileFragment("table", tablePath, 0, status.getLen());
+    Scanner scanner =  sm.getScanner(meta, schema, fragment);
+    scanner.init();
+
+    Tuple retrieved;
+    while ((retrieved = scanner.next()) != null) {
+      for (int i = 0; i < tuple.size(); i++) {
+        assertEquals(tuple.get(i), retrieved.get(i));
+      }
+    }
+    scanner.close();
+  }
+
+  @Test
+  public void testNullHandlingTypes() throws IOException {
+    boolean handleProtobuf = storeType != StoreType.JSON;
+
+    Schema schema = new Schema();
+    schema.addColumn("col1", Type.BOOLEAN);
+    schema.addColumn("col2", Type.CHAR, 7);
+    schema.addColumn("col3", Type.INT2);
+    schema.addColumn("col4", Type.INT4);
+    schema.addColumn("col5", Type.INT8);
+    schema.addColumn("col6", Type.FLOAT4);
+    schema.addColumn("col7", Type.FLOAT8);
+    schema.addColumn("col8", Type.TEXT);
+    schema.addColumn("col9", Type.BLOB);
+    schema.addColumn("col10", Type.INET4);
+    schema.addColumn("col11", Type.NULL_TYPE);
+
+    if (handleProtobuf) {
+      schema.addColumn("col12", CatalogUtil.newDataType(Type.PROTOBUF, TajoIdProtos.QueryIdProto.class.getName()));
+    }
+
+    KeyValueSet options = new KeyValueSet();
+    TableMeta meta = CatalogUtil.newTableMeta(storeType, options);
+    meta.setOptions(CatalogUtil.newPhysicalProperties(storeType));
+    meta.putOption(StorageConstants.TEXT_NULL, "\\\\N");
+    meta.putOption(StorageConstants.RCFILE_NULL, "\\\\N");
+    meta.putOption(StorageConstants.RCFILE_SERDE, TextSerializerDeserializer.class.getName());
+    meta.putOption(StorageConstants.SEQUENCEFILE_NULL, "\\");
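+    // Each text-based format gets an explicit NULL marker option so that its
+    // scanner can map the serialized marker back to NullDatum on read; only the
+    // option matching the current storeType takes effect.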
+    if (storeType == StoreType.AVRO) {
+      meta.putOption(StorageConstants.AVRO_SCHEMA_LITERAL,
+                     TEST_NULL_HANDLING_TYPES_AVRO_SCHEMA);
+    }
+
+    Path tablePath = new Path(testDir, "testVariousTypes.data");
+    FileStorageManager sm = (FileStorageManager)StorageManager.getFileStorageManager(conf);
+    Appender appender = sm.getAppender(meta, schema, tablePath);
+    appender.init();
+
+    QueryId queryid = new QueryId("12345", 5);
+    ProtobufDatumFactory factory = ProtobufDatumFactory.get(TajoIdProtos.QueryIdProto.class.getName());
+    int columnNum = 11 + (handleProtobuf ? 1 : 0);
+    Tuple seedTuple = new VTuple(columnNum);
+    seedTuple.put(new Datum[]{
+        DatumFactory.createBool(true),                // 0
+        DatumFactory.createChar("hyunsik"),           // 1
+        DatumFactory.createInt2((short) 17),          // 2
+        DatumFactory.createInt4(59),                  // 3
+        DatumFactory.createInt8(23L),                 // 4
+        DatumFactory.createFloat4(77.9f),             // 5
+        DatumFactory.createFloat8(271.9f),            // 6
+        DatumFactory.createText("hyunsik"),           // 7
+        DatumFactory.createBlob("hyunsik".getBytes()),// 8
+        DatumFactory.createInet4("192.168.0.1"),      // 9
+        NullDatum.get(),                              // 10
+    });
+
+    if (handleProtobuf) {
+      seedTuple.put(11, factory.createDatum(queryid.getProto()));       // 11
+    }
+
+    // Making tuples with different null column positions
+    Tuple tuple;
+    for (int i = 0; i < columnNum; i++) {
+      tuple = new VTuple(columnNum);
+      for (int j = 0; j < columnNum; j++) {
+        if (i == j) { // i'th column will have NULL value
+          tuple.put(j, NullDatum.get());
+        } else {
+          tuple.put(j, seedTuple.get(j));
+        }
+      }
+      appender.addTuple(tuple);
+    }
+    appender.flush();
+    appender.close();
+
+    FileStatus status = fs.getFileStatus(tablePath);
+    FileFragment fragment = new FileFragment("table", tablePath, 0, status.getLen());
+    Scanner scanner = StorageManager.getFileStorageManager(conf).getScanner(meta, schema, fragment);
+    scanner.init();
+
+    Tuple retrieved;
+    int i = 0;
+    while ((retrieved = scanner.next()) != null) {
+      assertEquals(columnNum, retrieved.size());
+      for (int j = 0; j < columnNum; j++) {
+        if (i == j) {
+          assertEquals(NullDatum.get(), retrieved.get(j));
+        } else {
+          assertEquals(seedTuple.get(j), retrieved.get(j));
+        }
+      }
+
+      i++;
+    }
+    scanner.close();
+  }
+
+  @Test
+  public void testRCFileTextSerializeDeserialize() throws IOException {
+    if(storeType != StoreType.RCFILE) return;
+
+    Schema schema = new Schema();
+    schema.addColumn("col1", Type.BOOLEAN);
+    schema.addColumn("col2", Type.BIT);
+    schema.addColumn("col3", Type.CHAR, 7);
+    schema.addColumn("col4", Type.INT2);
+    schema.addColumn("col5", Type.INT4);
+    schema.addColumn("col6", Type.INT8);
+    schema.addColumn("col7", Type.FLOAT4);
+    schema.addColumn("col8", Type.FLOAT8);
+    schema.addColumn("col9", Type.TEXT);
+    schema.addColumn("col10", Type.BLOB);
+    schema.addColumn("col11", Type.INET4);
+    schema.addColumn("col12", Type.NULL_TYPE);
+    schema.addColumn("col13", CatalogUtil.newDataType(Type.PROTOBUF, TajoIdProtos.QueryIdProto.class.getName()));
+
+    KeyValueSet options = new KeyValueSet();
+    TableMeta meta = CatalogUtil.newTableMeta(storeType, options);
+    meta.putOption(StorageConstants.RCFILE_SERDE, TextSerializerDeserializer.class.getName());
+
+    Path tablePath = new Path(testDir, "testVariousTypes.data");
+    FileStorageManager sm = (FileStorageManager)StorageManager.getFileStorageManager(conf);
+    Appender appender = sm.getAppender(meta, schema, tablePath);
+    appender.enableStats();
+    appender.init();
+
+    QueryId queryid = new QueryId("12345", 5);
+    ProtobufDatumFactory factory = ProtobufDatumFactory.get(TajoIdProtos.QueryIdProto.class.getName());
+
+    Tuple tuple = new VTuple(13);
+    tuple.put(new Datum[] {
+        DatumFactory.createBool(true),
+        DatumFactory.createBit((byte) 0x99),
+        DatumFactory.createChar("jinho"),
+        DatumFactory.createInt2((short) 17),
+        DatumFactory.createInt4(59),
+        DatumFactory.createInt8(23L),
+        DatumFactory.createFloat4(77.9f),
+        DatumFactory.createFloat8(271.9f),
+        DatumFactory.createText("jinho"),
+        DatumFactory.createBlob("hyunsik babo".getBytes()),
+        DatumFactory.createInet4("192.168.0.1"),
+        NullDatum.get(),
+        factory.createDatum(queryid.getProto())
+    });
+    appender.addTuple(tuple);
+    appender.flush();
+    appender.close();
+
+    FileStatus status = fs.getFileStatus(tablePath);
+    assertEquals(appender.getStats().getNumBytes().longValue(), status.getLen());
+
+    FileFragment fragment = new FileFragment("table", tablePath, 0, status.getLen());
+    Scanner scanner =  StorageManager.getFileStorageManager(conf).getScanner(meta, schema, fragment);
+    scanner.init();
+
+    Tuple retrieved;
+    while ((retrieved=scanner.next()) != null) {
+      for (int i = 0; i < tuple.size(); i++) {
+        assertEquals(tuple.get(i), retrieved.get(i));
+      }
+    }
+    scanner.close();
+    assertEquals(appender.getStats().getNumBytes().longValue(), scanner.getInputStats().getNumBytes().longValue());
+    assertEquals(appender.getStats().getNumRows().longValue(), scanner.getInputStats().getNumRows().longValue());
+  }
+
+  @Test
+  public void testRCFileBinarySerializeDeserialize() throws IOException {
+    if(storeType != StoreType.RCFILE) return;
+
+    Schema schema = new Schema();
+    schema.addColumn("col1", Type.BOOLEAN);
+    schema.addColumn("col2", Type.BIT);
+    schema.addColumn("col3", Type.CHAR, 7);
+    schema.addColumn("col4", Type.INT2);
+    schema.addColumn("col5", Type.INT4);
+    schema.addColumn("col6", Type.INT8);
+    schema.addColumn("col7", Type.FLOAT4);
+    schema.addColumn("col8", Type.FLOAT8);
+    schema.addColumn("col9", Type.TEXT);
+    schema.addColumn("col10", Type.BLOB);
+    schema.addColumn("col11", Type.INET4);
+    schema.addColumn("col12", Type.NULL_TYPE);
+    schema.addColumn("col13", CatalogUtil.newDataType(Type.PROTOBUF, TajoIdProtos.QueryIdProto.class.getName()));
+
+    KeyValueSet options = new KeyValueSet();
+    TableMeta meta = CatalogUtil.newTableMeta(storeType, options);
+    meta.putOption(StorageConstants.RCFILE_SERDE, BinarySerializerDeserializer.class.getName());
+
+    Path tablePath = new Path(testDir, "testVariousTypes.data");
+    FileStorageManager sm = (FileStorageManager)StorageManager.getFileStorageManager(conf);
+    Appender appender = sm.getAppender(meta, schema, tablePath);
+    appender.enableStats();
+    appender.init();
+
+    QueryId queryid = new QueryId("12345", 5);
+    ProtobufDatumFactory factory = ProtobufDatumFactory.get(TajoIdProtos.QueryIdProto.class.getName());
+
+    Tuple tuple = new VTuple(13);
+    tuple.put(new Datum[] {
+        DatumFactory.createBool(true),
+        DatumFactory.createBit((byte) 0x99),
+        DatumFactory.createChar("jinho"),
+        DatumFactory.createInt2((short) 17),
+        DatumFactory.createInt4(59),
+        DatumFactory.createInt8(23L),
+        DatumFactory.createFloat4(77.9f),
+        DatumFactory.createFloat8(271.9f),
+        DatumFactory.createText("jinho"),
+        DatumFactory.createBlob("hyunsik babo".getBytes()),
+        DatumFactory.createInet4("192.168.0.1"),
+        NullDatum.get(),
+        factory.createDatum(queryid.getProto())
+    });
+    appender.addTuple(tuple);
+    appender.flush();
+    appender.close();
+
+    FileStatus status = fs.getFileStatus(tablePath);
+    assertEquals(appender.getStats().getNumBytes().longValue(), status.getLen());
+
+    FileFragment fragment = new FileFragment("table", tablePath, 0, status.getLen());
+    Scanner scanner =  StorageManager.getFileStorageManager(conf).getScanner(meta, schema, fragment);
+    scanner.init();
+
+    Tuple retrieved;
+    while ((retrieved=scanner.next()) != null) {
+      for (int i = 0; i < tuple.size(); i++) {
+        assertEquals(tuple.get(i), retrieved.get(i));
+      }
+    }
+    scanner.close();
+    assertEquals(appender.getStats().getNumBytes().longValue(), scanner.getInputStats().getNumBytes().longValue());
+    assertEquals(appender.getStats().getNumRows().longValue(), scanner.getInputStats().getNumRows().longValue());
+  }
+
+  @Test
+  public void testSequenceFileTextSerializeDeserialize() throws IOException {
+    if(storeType != StoreType.SEQUENCEFILE) return;
+
+    Schema schema = new Schema();
+    schema.addColumn("col1", Type.BOOLEAN);
+    schema.addColumn("col2", Type.BIT);
+    schema.addColumn("col3", Type.CHAR, 7);
+    schema.addColumn("col4", Type.INT2);
+    schema.addColumn("col5", Type.INT4);
+    schema.addColumn("col6", Type.INT8);
+    schema.addColumn("col7", Type.FLOAT4);
+    schema.addColumn("col8", Type.FLOAT8);
+    schema.addColumn("col9", Type.TEXT);
+    schema.addColumn("col10", Type.BLOB);
+    schema.addColumn("col11", Type.INET4);
+    schema.addColumn("col12", Type.NULL_TYPE);
+    schema.addColumn("col13", CatalogUtil.newDataType(Type.PROTOBUF, TajoIdProtos.QueryIdProto.class.getName()));
+
+    KeyValueSet options = new KeyValueSet();
+    TableMeta meta = CatalogUtil.newTableMeta(storeType, options);
+    meta.putOption(StorageConstants.SEQUENCEFILE_SERDE, TextSerializerDeserializer.class.getName());
+
+    Path tablePath = new Path(testDir, "testVariousTypes.data");
+    FileStorageManager sm = (FileStorageManager)StorageManager.getFileStorageManager(conf);
+    Appender appender = sm.getAppender(meta, schema, tablePath);
+    appender.enableStats();
+    appender.init();
+
+    QueryId queryid = new QueryId("12345", 5);
+    ProtobufDatumFactory factory = ProtobufDatumFactory.get(TajoIdProtos.QueryIdProto.class.getName());
+
+    Tuple tuple = new VTuple(13);
+    tuple.put(new Datum[] {
+        DatumFactory.createBool(true),
+        DatumFactory.createBit((byte) 0x99),
+        DatumFactory.createChar("jinho"),
+        DatumFactory.createInt2((short) 17),
+        DatumFactory.createInt4(59),
+        DatumFactory.createInt8(23L),
+        DatumFactory.createFloat4(77.9f),
+        DatumFactory.createFloat8(271.9f),
+        DatumFactory.createText("jinho"),
+        DatumFactory.createBlob("hyunsik babo".getBytes()),
+        DatumFactory.createInet4("192.168.0.1"),
+        NullDatum.get(),
+        factory.createDatum(queryid.getProto())
+    });
+    appender.addTuple(tuple);
+    appender.flush();
+    appender.close();
+
+    FileStatus status = fs.getFileStatus(tablePath);
+    assertEquals(appender.getStats().getNumBytes().longValue(), status.getLen());
+
+    FileFragment fragment = new FileFragment("table", tablePath, 0, status.getLen());
+    Scanner scanner =  StorageManager.getFileStorageManager(conf).getScanner(meta, schema, fragment);
+    scanner.init();
+
+    assertTrue(scanner instanceof SequenceFileScanner);
+    Writable key = ((SequenceFileScanner) scanner).getKey();
+    assertEquals(LongWritable.class.getCanonicalName(), key.getClass().getCanonicalName());
+
+    Tuple retrieved;
+    while ((retrieved=scanner.next()) != null) {
+      for (int i = 0; i < tuple.size(); i++) {
+        assertEquals(tuple.get(i), retrieved.get(i));
+      }
+    }
+    scanner.close();
+    assertEquals(appender.getStats().getNumBytes().longValue(), scanner.getInputStats().getNumBytes().longValue());
+    assertEquals(appender.getStats().getNumRows().longValue(), scanner.getInputStats().getNumRows().longValue());
+  }
+
+  @Test
+  public void testSequenceFileBinarySerializeDeserialize() throws IOException {
+    if(storeType != StoreType.SEQUENCEFILE) return;
+
+    Schema schema = new Schema();
+    schema.addColumn("col1", Type.BOOLEAN);
+    schema.addColumn("col2", Type.BIT);
+    schema.addColumn("col3", Type.CHAR, 7);
+    schema.addColumn("col4", Type.INT2);
+    schema.addColumn("col5", Type.INT4);
+    schema.addColumn("col6", Type.INT8);
+    schema.addColumn("col7", Type.FLOAT4);
+    schema.addColumn("col8", Type.FLOAT8);
+    schema.addColumn("col9", Type.TEXT);
+    schema.addColumn("col10", Type.BLOB);
+    schema.addColumn("col11", Type.INET4);
+    schema.addColumn("col12", Type.NULL_TYPE);
+    schema.addColumn("col13", CatalogUtil.newDataType(Type.PROTOBUF, TajoIdProtos.QueryIdProto.class.getName()));
+
+    KeyValueSet options = new KeyValueSet();
+    TableMeta meta = CatalogUtil.newTableMeta(storeType, options);
+    meta.putOption(StorageConstants.SEQUENCEFILE_SERDE, BinarySerializerDeserializer.class.getName());
+
+    Path tablePath = new Path(testDir, "testVariousTypes.data");
+    FileStorageManager sm = (FileStorageManager)StorageManager.getFileStorageManager(conf);
+    Appender appender = sm.getAppender(meta, schema, tablePath);
+    appender.enableStats();
+    appender.init();
+
+    QueryId queryid = new QueryId("12345", 5);
+    ProtobufDatumFactory factory = ProtobufDatumFactory.get(TajoIdProtos.QueryIdProto.class.getName());
+
+    Tuple tuple = new VTuple(13);
+    tuple.put(new Datum[] {
+        DatumFactory.createBool(true),
+        DatumFactory.createBit((byte) 0x99),
+        DatumFactory.createChar("jinho"),
+        DatumFactory.createInt2((short) 17),
+        DatumFactory.createInt4(59),
+        DatumFactory.createInt8(23L),
+        DatumFactory.createFloat4(77.9f),
+        DatumFactory.createFloat8(271.9f),
+        DatumFactory.createText("jinho"),
+        DatumFactory.createBlob("hyunsik babo".getBytes()),
+        DatumFactory.createInet4("192.168.0.1"),
+        NullDatum.get(),
+        factory.createDatum(queryid.getProto())
+    });
+    appender.addTuple(tuple);
+    appender.flush();
+    appender.close();
+
+    FileStatus status = fs.getFileStatus(tablePath);
+    assertEquals(appender.getStats().getNumBytes().longValue(), status.getLen());
+
+    FileFragment fragment = new FileFragment("table", tablePath, 0, status.getLen());
+    Scanner scanner = StorageManager.getFileStorageManager(conf).getScanner(meta, schema, fragment);
+    scanner.init();
+
+    assertTrue(scanner instanceof SequenceFileScanner);
+    Writable key = ((SequenceFileScanner) scanner).getKey();
+    assertEquals(BytesWritable.class.getCanonicalName(), key.getClass().getCanonicalName());
+
+    Tuple retrieved;
+    while ((retrieved=scanner.next()) != null) {
+      for (int i = 0; i < tuple.size(); i++) {
+        assertEquals(tuple.get(i), retrieved.get(i));
+      }
+    }
+    scanner.close();
+    assertEquals(appender.getStats().getNumBytes().longValue(), scanner.getInputStats().getNumBytes().longValue());
+    assertEquals(appender.getStats().getNumRows().longValue(), scanner.getInputStats().getNumRows().longValue());
+  }
+
+  @Test
+  public void testTime() throws IOException {
+    if (storeType == StoreType.CSV || storeType == StoreType.RAW) {
+      Schema schema = new Schema();
+      schema.addColumn("col1", Type.DATE);
+      schema.addColumn("col2", Type.TIME);
+      schema.addColumn("col3", Type.TIMESTAMP);
+
+      KeyValueSet options = new KeyValueSet();
+      TableMeta meta = CatalogUtil.newTableMeta(storeType, options);
+
+      Path tablePath = new Path(testDir, "testTime.data");
+      FileStorageManager sm = (FileStorageManager)StorageManager.getFileStorageManager(conf);
+      Appender appender = sm.getAppender(meta, schema, tablePath);
+      appender.init();
+
+      Tuple tuple = new VTuple(3);
+      tuple.put(new Datum[]{
+          DatumFactory.createDate("1980-04-01"),
+          DatumFactory.createTime("12:34:56"),
+          DatumFactory.createTimestmpDatumWithUnixTime((int)(System.currentTimeMillis() / 1000))
+      });
+      appender.addTuple(tuple);
+      appender.flush();
+      appender.close();
+
+      FileStatus status = fs.getFileStatus(tablePath);
+      FileFragment fragment = new FileFragment("table", tablePath, 0, status.getLen());
+      Scanner scanner = StorageManager.getFileStorageManager(conf).getScanner(meta, schema, fragment);
+      scanner.init();
+
+      Tuple retrieved;
+      while ((retrieved = scanner.next()) != null) {
+        for (int i = 0; i < tuple.size(); i++) {
+          assertEquals(tuple.get(i), retrieved.get(i));
+        }
+      }
+      scanner.close();
+    }
+  }
+
+  @Test
+  public void testSeekableScanner() throws IOException {
+    if (!seekable) {
+      return;
+    }
+
+    Schema schema = new Schema();
+    schema.addColumn("id", Type.INT4);
+    schema.addColumn("age", Type.INT8);
+    schema.addColumn("comment", Type.TEXT);
+
+    TableMeta meta = CatalogUtil.newTableMeta(storeType);
+    Path tablePath = new Path(testDir, "Seekable.data");
+    FileStorageManager sm = (FileStorageManager)StorageManager.getFileStorageManager(conf);
+    FileAppender appender = (FileAppender) sm.getAppender(meta, schema, tablePath);
+    appender.enableStats();
+    appender.init();
+    int tupleNum = 100000;
+    VTuple vTuple;
+
+    List<Long> offsets = Lists.newArrayList();
+    offsets.add(0L);
+    for (int i = 0; i < tupleNum; i++) {
+      vTuple = new VTuple(3);
+      vTuple.put(0, DatumFactory.createInt4(i + 1));
+      vTuple.put(1, DatumFactory.createInt8(25L));
+      vTuple.put(2, DatumFactory.createText("test" + i));
+      appender.addTuple(vTuple);
+
+      // find a seek position
+      if (i % (tupleNum / 3) == 0) {
+        offsets.add(appender.getOffset());
+      }
+    }
+
+    // end of file
+    if (!offsets.contains(appender.getOffset())) {
+      offsets.add(appender.getOffset());
+    }
+
+    appender.close();
+    if (statsable) {
+      TableStats stat = appender.getStats();
+      assertEquals(tupleNum, stat.getNumRows().longValue());
+    }
+
+    FileStatus status = fs.getFileStatus(tablePath);
+    assertEquals(status.getLen(), appender.getOffset());
+
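+    // Re-scan the file as consecutive fragments bounded by the recorded seek
+    // offsets; the per-fragment scans must add up to the full row count and,
+    // where stats are supported, the full byte count.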
+    Scanner scanner;
+    int tupleCnt = 0;
+    long prevOffset = 0;
+    long readBytes = 0;
+    long readRows = 0;
+    for (long offset : offsets) {
+      scanner = StorageManager.getFileStorageManager(conf).getScanner(meta, schema,
+          new FileFragment("table", tablePath, prevOffset, offset - prevOffset), schema);
+      scanner.init();
+
+      while (scanner.next() != null) {
+        tupleCnt++;
+      }
+
+      scanner.close();
+      if (statsable) {
+        readBytes += scanner.getInputStats().getNumBytes();
+        readRows += scanner.getInputStats().getNumRows();
+      }
+      prevOffset = offset;
+    }
+
+    assertEquals(tupleNum, tupleCnt);
+    if (statsable) {
+      assertEquals(appender.getStats().getNumBytes().longValue(), readBytes);
+      assertEquals(appender.getStats().getNumRows().longValue(), readRows);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/avro/TestAvroUtil.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/avro/TestAvroUtil.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/avro/TestAvroUtil.java
new file mode 100644
index 0000000..7b83894
--- /dev/null
+++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/avro/TestAvroUtil.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.avro;
+
+import org.apache.avro.Schema;
+import org.apache.tajo.HttpFileServer;
+import org.apache.tajo.catalog.CatalogUtil;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.catalog.proto.CatalogProtos;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.storage.StorageConstants;
+import org.apache.tajo.util.FileUtil;
+import org.apache.tajo.util.NetUtils;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.URISyntaxException;
+import java.net.URL;
+
+import static org.junit.Assert.*;
+
+/**
+ * Tests for {@link org.apache.tajo.storage.avro.AvroUtil}.
+ */
+public class TestAvroUtil {
+  private Schema expected;
+  private URL schemaUrl;
+
+  @Before
+  public void setUp() throws Exception {
+    schemaUrl = FileUtil.getResourcePath("dataset/testVariousTypes.avsc");
+    assertNotNull(schemaUrl);
+
+    File file = new File(schemaUrl.getPath());
+    assertTrue(file.exists());
+
+    expected = new Schema.Parser().parse(file);
+  }
+
+  @Test
+  public void testGetSchema() throws IOException, URISyntaxException {
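+    // getAvroSchema should resolve the same schema from an inline literal, from
+    // a local file path, and (below) from an HTTP URL.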
+    TableMeta meta = CatalogUtil.newTableMeta(CatalogProtos.StoreType.AVRO);
+    meta.putOption(StorageConstants.AVRO_SCHEMA_LITERAL, FileUtil.readTextFile(new File(schemaUrl.getPath())));
+    Schema schema = AvroUtil.getAvroSchema(meta, new TajoConf());
+    assertEquals(expected, schema);
+
+    meta = CatalogUtil.newTableMeta(CatalogProtos.StoreType.AVRO);
+    meta.putOption(StorageConstants.AVRO_SCHEMA_URL, schemaUrl.getPath());
+    schema = AvroUtil.getAvroSchema(meta, new TajoConf());
+    assertEquals(expected, schema);
+
+    HttpFileServer server = new HttpFileServer(NetUtils.createSocketAddr("127.0.0.1:0"));
+    try {
+      server.start();
+      InetSocketAddress addr = server.getBindAddress();
+
+      String url = "http://127.0.0.1:" + addr.getPort() + schemaUrl.getPath();
+      meta = CatalogUtil.newTableMeta(CatalogProtos.StoreType.AVRO);
+      meta.putOption(StorageConstants.AVRO_SCHEMA_URL, url);
+      schema = AvroUtil.getAvroSchema(meta, new TajoConf());
+    } finally {
+      server.stop();
+    }
+    assertEquals(expected, schema);
+  }
+
+  @Test
+  public void testGetSchemaFromHttp() throws IOException, URISyntaxException {
+    HttpFileServer server = new HttpFileServer(NetUtils.createSocketAddr("127.0.0.1:0"));
+    try {
+      server.start();
+      InetSocketAddress addr = server.getBindAddress();
+
+      Schema schema = AvroUtil.getAvroSchemaFromHttp("http://127.0.0.1:" + addr.getPort() + schemaUrl.getPath());
+      assertEquals(expected, schema);
+    } finally {
+      server.stop();
+    }
+  }
+
+  @Test
+  public void testGetSchemaFromFileSystem() throws IOException, URISyntaxException {
+    Schema schema = AvroUtil.getAvroSchemaFromFileSystem(schemaUrl.toString(), new TajoConf());
+
+    assertEquals(expected, schema);
+  }
+}