Posted to commits@tajo.apache.org by hj...@apache.org on 2014/12/05 09:21:25 UTC

[21/29] tajo git commit: Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/tajo into hbase_storage

http://git-wip-us.apache.org/repos/asf/tajo/blob/940546a0/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/TextFieldSerializerDeserializer.java
----------------------------------------------------------------------
diff --cc tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/TextFieldSerializerDeserializer.java
index 9722959,0000000..95d0407
mode 100644,000000..100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/TextFieldSerializerDeserializer.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/TextFieldSerializerDeserializer.java
@@@ -1,223 -1,0 +1,223 @@@
 +/**
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.tajo.storage.text;
 +
 +import com.google.protobuf.Message;
 +import io.netty.buffer.ByteBuf;
 +import io.netty.util.CharsetUtil;
 +import org.apache.commons.codec.binary.Base64;
 +import org.apache.tajo.catalog.Column;
 +import org.apache.tajo.common.TajoDataTypes;
 +import org.apache.tajo.conf.TajoConf;
 +import org.apache.tajo.datum.*;
 +import org.apache.tajo.datum.protobuf.ProtobufJsonFormat;
 +import org.apache.tajo.storage.FieldSerializerDeserializer;
 +import org.apache.tajo.util.NumberUtil;
 +
 +import java.io.IOException;
 +import java.io.OutputStream;
 +import java.nio.charset.CharsetDecoder;
 +
 +//Compatibility with Apache Hive
 +public class TextFieldSerializerDeserializer implements FieldSerializerDeserializer {
 +  public static final byte[] trueBytes = "true".getBytes();
 +  public static final byte[] falseBytes = "false".getBytes();
-   private ProtobufJsonFormat protobufJsonFormat = ProtobufJsonFormat.getInstance();
++  private static ProtobufJsonFormat protobufJsonFormat = ProtobufJsonFormat.getInstance();
 +  private final CharsetDecoder decoder = CharsetUtil.getDecoder(CharsetUtil.UTF_8);
 +
 +  private static boolean isNull(ByteBuf val, ByteBuf nullBytes) {
 +    return !val.isReadable() || nullBytes.equals(val);
 +  }
 +
 +  private static boolean isNullText(ByteBuf val, ByteBuf nullBytes) {
 +    return val.readableBytes() > 0 && nullBytes.equals(val);
 +  }
 +
 +  @Override
 +  public int serialize(OutputStream out, Datum datum, Column col, int columnIndex, byte[] nullChars) throws IOException {
 +    byte[] bytes;
 +    int length = 0;
 +    TajoDataTypes.DataType dataType = col.getDataType();
 +
 +    if (datum == null || datum instanceof NullDatum) {
 +      switch (dataType.getType()) {
 +        case CHAR:
 +        case TEXT:
 +          length = nullChars.length;
 +          out.write(nullChars);
 +          break;
 +        default:
 +          break;
 +      }
 +      return length;
 +    }
 +
 +    switch (dataType.getType()) {
 +      case BOOLEAN:
 +        out.write(datum.asBool() ? trueBytes : falseBytes);
 +        length = trueBytes.length;
 +        break;
 +      case CHAR:
 +        byte[] pad = new byte[dataType.getLength() - datum.size()];
 +        bytes = datum.asTextBytes();
 +        out.write(bytes);
 +        out.write(pad);
 +        length = bytes.length + pad.length;
 +        break;
 +      case TEXT:
 +      case BIT:
 +      case INT2:
 +      case INT4:
 +      case INT8:
 +      case FLOAT4:
 +      case FLOAT8:
 +      case INET4:
 +      case DATE:
 +      case INTERVAL:
 +        bytes = datum.asTextBytes();
 +        length = bytes.length;
 +        out.write(bytes);
 +        break;
 +      case TIME:
 +        bytes = ((TimeDatum) datum).asChars(TajoConf.getCurrentTimeZone(), true).getBytes();
 +        length = bytes.length;
 +        out.write(bytes);
 +        break;
 +      case TIMESTAMP:
 +        bytes = ((TimestampDatum) datum).asChars(TajoConf.getCurrentTimeZone(), true).getBytes();
 +        length = bytes.length;
 +        out.write(bytes);
 +        break;
 +      case INET6:
 +      case BLOB:
 +        bytes = Base64.encodeBase64(datum.asByteArray(), false);
 +        length = bytes.length;
 +        out.write(bytes, 0, length);
 +        break;
 +      case PROTOBUF:
 +        ProtobufDatum protobuf = (ProtobufDatum) datum;
 +        byte[] protoBytes = protobufJsonFormat.printToString(protobuf.get()).getBytes();
 +        length = protoBytes.length;
 +        out.write(protoBytes, 0, protoBytes.length);
 +        break;
 +      case NULL_TYPE:
 +      default:
 +        break;
 +    }
 +    return length;
 +  }
 +
 +  @Override
 +  public Datum deserialize(ByteBuf buf, Column col, int columnIndex, ByteBuf nullChars) throws IOException {
 +    Datum datum;
 +    TajoDataTypes.Type type = col.getDataType().getType();
 +    boolean nullField;
 +    if (type == TajoDataTypes.Type.TEXT || type == TajoDataTypes.Type.CHAR) {
 +      nullField = isNullText(buf, nullChars);
 +    } else {
 +      nullField = isNull(buf, nullChars);
 +    }
 +
 +    if (nullField) {
 +      datum = NullDatum.get();
 +    } else {
 +      switch (type) {
 +        case BOOLEAN:
 +          byte bool = buf.readByte();
 +          datum = DatumFactory.createBool(bool == 't' || bool == 'T');
 +          break;
 +        case BIT:
 +          datum = DatumFactory.createBit(Byte.parseByte(
 +              decoder.decode(buf.nioBuffer(buf.readerIndex(), buf.readableBytes())).toString()));
 +          break;
 +        case CHAR:
 +          datum = DatumFactory.createChar(
 +              decoder.decode(buf.nioBuffer(buf.readerIndex(), buf.readableBytes())).toString().trim());
 +          break;
 +        case INT1:
 +        case INT2:
 +          datum = DatumFactory.createInt2((short) NumberUtil.parseInt(buf));
 +          break;
 +        case INT4:
 +          datum = DatumFactory.createInt4(NumberUtil.parseInt(buf));
 +          break;
 +        case INT8:
 +          datum = DatumFactory.createInt8(NumberUtil.parseLong(buf));
 +          break;
 +        case FLOAT4:
 +          datum = DatumFactory.createFloat4(
 +              decoder.decode(buf.nioBuffer(buf.readerIndex(), buf.readableBytes())).toString());
 +          break;
 +        case FLOAT8:
 +          datum = DatumFactory.createFloat8(NumberUtil.parseDouble(buf));
 +          break;
 +        case TEXT: {
 +          byte[] bytes = new byte[buf.readableBytes()];
 +          buf.readBytes(bytes);
 +          datum = DatumFactory.createText(bytes);
 +          break;
 +        }
 +        case DATE:
 +          datum = DatumFactory.createDate(
 +              decoder.decode(buf.nioBuffer(buf.readerIndex(), buf.readableBytes())).toString());
 +          break;
 +        case TIME:
 +          datum = DatumFactory.createTime(
 +              decoder.decode(buf.nioBuffer(buf.readerIndex(), buf.readableBytes())).toString());
 +          break;
 +        case TIMESTAMP:
 +          datum = DatumFactory.createTimestamp(
 +              decoder.decode(buf.nioBuffer(buf.readerIndex(), buf.readableBytes())).toString());
 +          break;
 +        case INTERVAL:
 +          datum = DatumFactory.createInterval(
 +              decoder.decode(buf.nioBuffer(buf.readerIndex(), buf.readableBytes())).toString());
 +          break;
 +        case PROTOBUF: {
 +          ProtobufDatumFactory factory = ProtobufDatumFactory.get(col.getDataType());
 +          Message.Builder builder = factory.newBuilder();
 +          try {
 +            byte[] bytes = new byte[buf.readableBytes()];
 +            buf.readBytes(bytes);
 +            protobufJsonFormat.merge(bytes, builder);
 +            datum = factory.createDatum(builder.build());
 +          } catch (IOException e) {
 +            e.printStackTrace();
 +            throw new RuntimeException(e);
 +          }
 +          break;
 +        }
 +        case INET4:
 +          datum = DatumFactory.createInet4(
 +              decoder.decode(buf.nioBuffer(buf.readerIndex(), buf.readableBytes())).toString());
 +          break;
 +        case BLOB: {
 +          byte[] bytes = new byte[buf.readableBytes()];
 +          buf.readBytes(bytes);
 +          datum = DatumFactory.createBlob(Base64.decodeBase64(bytes));
 +          break;
 +        }
 +        default:
 +          datum = NullDatum.get();
 +          break;
 +      }
 +    }
 +    return datum;
 +  }
 +}

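For reference, a minimal round-trip sketch of the field SerDe above (not part of this commit; the Column constructor shape and the "\\N" null marker are assumptions for illustration, while the serialize/deserialize signatures match the file as committed):

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import org.apache.tajo.catalog.Column;
import org.apache.tajo.common.TajoDataTypes;
import org.apache.tajo.datum.Datum;
import org.apache.tajo.datum.DatumFactory;

import java.io.ByteArrayOutputStream;
import java.io.IOException;

public class FieldSerDeRoundTrip {
  public static void main(String[] args) throws IOException {
    TextFieldSerializerDeserializer serde = new TextFieldSerializerDeserializer();
    // Assumed Column constructor shape; the null marker bytes are also an assumption.
    Column col = new Column("id", TajoDataTypes.Type.INT4);
    byte[] nullChars = "\\N".getBytes();

    // Serialize an INT4 datum to its text form ("42").
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    serde.serialize(out, DatumFactory.createInt4(42), col, 0, nullChars);

    // Read it back through the deserializer.
    ByteBuf line = Unpooled.wrappedBuffer(out.toByteArray());
    ByteBuf nullBuf = Unpooled.wrappedBuffer(nullChars);
    Datum back = serde.deserialize(line, col, 0, nullBuf);
    assert back.asInt4() == 42;
  }
}
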
http://git-wip-us.apache.org/repos/asf/tajo/blob/940546a0/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/TextLineDeserializer.java
----------------------------------------------------------------------
diff --cc tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/TextLineDeserializer.java
index 0000000,0000000..7ebfa79
new file mode 100644
--- /dev/null
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/TextLineDeserializer.java
@@@ -1,0 -1,0 +1,60 @@@
++/**
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements.  See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership.  The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License.  You may obtain a copy of the License at
++ *
++ *     http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++
++package org.apache.tajo.storage.text;
++
++import io.netty.buffer.ByteBuf;
++import org.apache.tajo.catalog.Schema;
++import org.apache.tajo.catalog.TableMeta;
++import org.apache.tajo.storage.Tuple;
++
++import java.io.IOException;
++
++/**
++ * Reads a text line and fills a Tuple with values
++ */
++public abstract class TextLineDeserializer {
++  protected Schema schema;
++  protected TableMeta meta;
++  protected int [] targetColumnIndexes;
++
++  public TextLineDeserializer(Schema schema, TableMeta meta, int [] targetColumnIndexes) {
++    this.schema = schema;
++    this.meta = meta;
++    this.targetColumnIndexes = targetColumnIndexes;
++  }
++
++  /**
++   * Initialize SerDe
++   */
++  public abstract void init();
++
++  /**
++   * Fills a tuple with the fields read from a given line.
++   *
++   * @param buf Read line
++   * @param output Tuple to be filled with read fields
++   * @throws java.io.IOException
++   */
++  public abstract void deserialize(final ByteBuf buf, Tuple output) throws IOException, TextLineParsingError;
++
++  /**
++   * Release external resources
++   */
++  public abstract void release();
++}

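Since the class above is abstract, a hypothetical minimal subclass makes the contract concrete (illustration only, not from this commit): it splits each line on '|' and stores every target field as TEXT, skipping the per-type parsing a real implementation would do.

import io.netty.buffer.ByteBuf;
import org.apache.tajo.catalog.Schema;
import org.apache.tajo.catalog.TableMeta;
import org.apache.tajo.datum.DatumFactory;
import org.apache.tajo.storage.Tuple;

import java.io.IOException;

public class PipeDelimitedDeserializer extends TextLineDeserializer {

  public PipeDelimitedDeserializer(Schema schema, TableMeta meta, int[] targetColumnIndexes) {
    super(schema, meta, targetColumnIndexes);
  }

  @Override
  public void init() {
    // no state to set up in this sketch
  }

  @Override
  public void deserialize(ByteBuf buf, Tuple output) throws IOException, TextLineParsingError {
    // Copy the line out of the buffer and split it on the delimiter.
    byte[] bytes = new byte[buf.readableBytes()];
    buf.readBytes(bytes);
    String[] fields = new String(bytes).split("\\|");
    // Fill only the projected columns, as raw TEXT datums.
    for (int idx : targetColumnIndexes) {
      output.put(idx, DatumFactory.createText(fields[idx]));
    }
  }

  @Override
  public void release() {
    // nothing to release
  }
}
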
http://git-wip-us.apache.org/repos/asf/tajo/blob/940546a0/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/TextLineParsingError.java
----------------------------------------------------------------------
diff --cc tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/TextLineParsingError.java
index 0000000,0000000..f0bae5e
new file mode 100644
--- /dev/null
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/TextLineParsingError.java
@@@ -1,0 -1,0 +1,31 @@@
++/**
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements.  See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership.  The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License.  You may obtain a copy of the License at
++ *
++ *     http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++
++package org.apache.tajo.storage.text;
++
++public class TextLineParsingError extends Exception {
++
++  public TextLineParsingError(Throwable t) {
++    super(t);
++  }
++
++  public TextLineParsingError(String message, Throwable t) {
++    super(t.getMessage() + ", Error line: " + message);
++  }
++
++}
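The two-argument constructor folds the offending input line into the message after the cause. A typical call site might look like this sketch, with lineBuf and lineBytes assumed in scope:

try {
  // per-field parsing of the line held in lineBuf
  datum = DatumFactory.createInt4(NumberUtil.parseInt(lineBuf));
} catch (NumberFormatException e) {
  // surface both the cause and the raw line to the caller
  throw new TextLineParsingError(new String(lineBytes), e);
}
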

http://git-wip-us.apache.org/repos/asf/tajo/blob/940546a0/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/TextLineSerDe.java
----------------------------------------------------------------------
diff --cc tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/TextLineSerDe.java
index 0000000,0000000..e81e289
new file mode 100644
--- /dev/null
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/TextLineSerDe.java
@@@ -1,0 -1,0 +1,65 @@@
++/**
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements.  See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership.  The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License.  You may obtain a copy of the License at
++ *
++ *     http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++
++package org.apache.tajo.storage.text;
++
++import io.netty.buffer.ByteBuf;
++import org.apache.commons.lang.StringEscapeUtils;
++import org.apache.commons.lang.StringUtils;
++import org.apache.tajo.catalog.Schema;
++import org.apache.tajo.catalog.TableMeta;
++import org.apache.tajo.datum.NullDatum;
++import org.apache.tajo.storage.BufferPool;
++import org.apache.tajo.storage.StorageConstants;
++
++/**
++ * Pluggable Text Line SerDe class
++ */
++public abstract class TextLineSerDe {
++
++  public TextLineSerDe() {
++  }
++
++  public abstract TextLineDeserializer createDeserializer(Schema schema, TableMeta meta, int [] targetColumnIndexes);
++
++  public abstract TextLineSerializer createSerializer(Schema schema, TableMeta meta);
++
++  public static ByteBuf getNullChars(TableMeta meta) {
++    byte[] nullCharByteArray = getNullCharsAsBytes(meta);
++
++    ByteBuf nullChars = BufferPool.directBuffer(nullCharByteArray.length, nullCharByteArray.length);
++    nullChars.writeBytes(nullCharByteArray);
++
++    return nullChars;
++  }
++
++  public static byte [] getNullCharsAsBytes(TableMeta meta) {
++    byte [] nullChars;
++
++    String nullCharacters = StringEscapeUtils.unescapeJava(meta.getOption(StorageConstants.TEXT_NULL,
++        NullDatum.DEFAULT_TEXT));
++    if (StringUtils.isEmpty(nullCharacters)) {
++      nullChars = NullDatum.get().asTextBytes();
++    } else {
++      nullChars = nullCharacters.getBytes();
++    }
++
++    return nullChars;
++  }
++
++}

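A hypothetical concrete SerDe (illustration only) wires the deserializer sketch above to a serializer counterpart, sketched after TextLineSerializer.java below:

public class PipeDelimitedSerDe extends TextLineSerDe {

  @Override
  public TextLineDeserializer createDeserializer(Schema schema, TableMeta meta, int[] targetColumnIndexes) {
    return new PipeDelimitedDeserializer(schema, meta, targetColumnIndexes);
  }

  @Override
  public TextLineSerializer createSerializer(Schema schema, TableMeta meta) {
    return new PipeDelimitedSerializer(schema, meta);
  }
}

Note that getNullChars/getNullCharsAsBytes resolve the table's TEXT_NULL option (unescaped via StringEscapeUtils) and fall back to NullDatum's default text bytes when the option is empty.
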
http://git-wip-us.apache.org/repos/asf/tajo/blob/940546a0/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/TextLineSerializer.java
----------------------------------------------------------------------
diff --cc tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/TextLineSerializer.java
index 0000000,0000000..0c2761f
new file mode 100644
--- /dev/null
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/TextLineSerializer.java
@@@ -1,0 -1,0 +1,45 @@@
++/**
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements.  See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership.  The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License.  You may obtain a copy of the License at
++ *
++ *     http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++
++package org.apache.tajo.storage.text;
++
++import org.apache.tajo.catalog.Schema;
++import org.apache.tajo.catalog.TableMeta;
++import org.apache.tajo.storage.Tuple;
++
++import java.io.IOException;
++import java.io.OutputStream;
++
++/**
++ * Writes a Tuple into a single text-formatted line
++ */
++public abstract class TextLineSerializer {
++  protected Schema schema;
++  protected TableMeta meta;
++
++  public TextLineSerializer(Schema schema, TableMeta meta) {
++    this.schema = schema;
++    this.meta = meta;
++  }
++
++  public abstract void init();
++
++  public abstract int serialize(OutputStream out, Tuple input) throws IOException;
++
++  public abstract void release();
++}

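And the hypothetical serializer counterpart for the SerDe sketch above (illustration only; it joins each datum's text form with '|', using only Tuple.get(i) and Datum.asTextBytes() as they appear in the files above):

import org.apache.tajo.catalog.Schema;
import org.apache.tajo.catalog.TableMeta;
import org.apache.tajo.storage.Tuple;

import java.io.IOException;
import java.io.OutputStream;

public class PipeDelimitedSerializer extends TextLineSerializer {

  public PipeDelimitedSerializer(Schema schema, TableMeta meta) {
    super(schema, meta);
  }

  @Override
  public void init() {
    // no state to set up in this sketch
  }

  @Override
  public int serialize(OutputStream out, Tuple input) throws IOException {
    int written = 0;
    for (int i = 0; i < input.size(); i++) {
      if (i > 0) {
        out.write('|');  // field delimiter
        written++;
      }
      byte[] field = input.get(i).asTextBytes();
      out.write(field);
      written += field.length;
    }
    return written;  // bytes written for this line, matching the abstract contract
  }

  @Override
  public void release() {
    // nothing to release
  }
}
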
http://git-wip-us.apache.org/repos/asf/tajo/blob/940546a0/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileSystems.java
----------------------------------------------------------------------
diff --cc tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileSystems.java
index 088fda9,0000000..ff7fe13
mode 100644,000000..100644
--- a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileSystems.java
+++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileSystems.java
@@@ -1,129 -1,0 +1,137 @@@
- /**
-  * Licensed to the Apache Software Foundation (ASF) under one
-  * or more contributor license agreements.  See the NOTICE file
-  * distributed with this work for additional information
-  * regarding copyright ownership.  The ASF licenses this file
-  * to you under the Apache License, Version 2.0 (the
-  * "License"); you may not use this file except in compliance
-  * with the License.  You may obtain a copy of the License at
-  *
-  *     http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing, software
-  * distributed under the License is distributed on an "AS IS" BASIS,
-  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  * See the License for the specific language governing permissions and
-  * limitations under the License.
-  */
- 
- package org.apache.tajo.storage;
- 
- import org.apache.hadoop.fs.FileStatus;
- import org.apache.hadoop.fs.FileSystem;
- import org.apache.hadoop.fs.Path;
- import org.apache.hadoop.fs.s3.S3FileSystem;
- import org.apache.hadoop.hdfs.DFSConfigKeys;
- import org.apache.tajo.catalog.CatalogUtil;
- import org.apache.tajo.catalog.Schema;
- import org.apache.tajo.catalog.TableMeta;
- import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
- import org.apache.tajo.common.TajoDataTypes.Type;
- import org.apache.tajo.conf.TajoConf;
- import org.apache.tajo.datum.Datum;
- import org.apache.tajo.datum.DatumFactory;
- import org.apache.tajo.storage.fragment.Fragment;
- import org.apache.tajo.storage.s3.InMemoryFileSystemStore;
- import org.apache.tajo.storage.s3.SmallBlockS3FileSystem;
- import org.junit.Test;
- import org.junit.runner.RunWith;
- import org.junit.runners.Parameterized;
- 
- import java.io.IOException;
- import java.net.URI;
- import java.util.Arrays;
- import java.util.Collection;
- import java.util.List;
- 
- import static org.junit.Assert.assertEquals;
- import static org.junit.Assert.assertTrue;
- 
- @RunWith(Parameterized.class)
- public class TestFileSystems {
- 
-   protected byte[] data = null;
- 
-   private static String TEST_PATH = "target/test-data/TestFileSystem";
-   private TajoConf conf = null;
-   private FileStorageManager sm = null;
-   private FileSystem fs = null;
-   Path testDir;
- 
-   public TestFileSystems(FileSystem fs) throws IOException {
-     conf = new TajoConf();
- 
-     if(fs instanceof S3FileSystem){
-       conf.set(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, "10");
-       fs.initialize(URI.create(fs.getScheme() + ":///"), conf);
-     }
-     this.fs = fs;
-     sm = (FileStorageManager)StorageManager.getFileStorageManager(conf);
-     testDir = getTestDir(this.fs, TEST_PATH);
-   }
- 
-   public Path getTestDir(FileSystem fs, String dir) throws IOException {
-     Path path = new Path(dir);
-     if(fs.exists(path))
-       fs.delete(path, true);
- 
-     fs.mkdirs(path);
- 
-     return fs.makeQualified(path);
-   }
- 
-   @Parameterized.Parameters
-   public static Collection<Object[]> generateParameters() {
-     return Arrays.asList(new Object[][] {
-         {new SmallBlockS3FileSystem(new InMemoryFileSystemStore())},
-     });
-   }
- 
-   @Test
-   public void testBlockSplit() throws IOException {
- 
-     Schema schema = new Schema();
-     schema.addColumn("id", Type.INT4);
-     schema.addColumn("age", Type.INT4);
-     schema.addColumn("name", Type.TEXT);
- 
-     TableMeta meta = CatalogUtil.newTableMeta(StoreType.CSV);
- 
-     Tuple[] tuples = new Tuple[4];
-     for (int i = 0; i < tuples.length; i++) {
-       tuples[i] = new VTuple(3);
-       tuples[i]
-           .put(new Datum[] { DatumFactory.createInt4(i),
-               DatumFactory.createInt4(i + 32),
-               DatumFactory.createText("name" + i) });
-     }
- 
-     Path path = StorageUtil.concatPath(testDir, "testGetScannerAndAppender",
-         "table.csv");
-     fs.mkdirs(path.getParent());
- 
-     Appender appender = sm.getAppender(meta, schema, path);
-     appender.init();
-     for (Tuple t : tuples) {
-       appender.addTuple(t);
-     }
-     appender.close();
-     FileStatus fileStatus = fs.getFileStatus(path);
- 
-     List<Fragment> splits = sm.getSplits("table", meta, schema, path);
-     int splitSize = (int) Math.ceil(fileStatus.getLen() / (double) fileStatus.getBlockSize());
-     assertEquals(splitSize, splits.size());
- 
-     for (Fragment fragment : splits) {
-       assertTrue(fragment.getLength() <= fileStatus.getBlockSize());
-     }
-   }
- }
++/**
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements.  See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership.  The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License.  You may obtain a copy of the License at
++ *
++ *     http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++
++package org.apache.tajo.storage;
++
++import org.apache.hadoop.fs.FileStatus;
++import org.apache.hadoop.fs.FileSystem;
++import org.apache.hadoop.fs.LocalFileSystem;
++import org.apache.hadoop.fs.Path;
++import org.apache.tajo.catalog.CatalogUtil;
++import org.apache.tajo.catalog.Schema;
++import org.apache.tajo.catalog.TableMeta;
++import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
++import org.apache.tajo.common.TajoDataTypes.Type;
++import org.apache.tajo.conf.TajoConf;
++import org.apache.tajo.datum.Datum;
++import org.apache.tajo.datum.DatumFactory;
++import org.apache.tajo.storage.fragment.Fragment;
++import org.junit.After;
++import org.junit.Before;
++import org.junit.Test;
++import org.junit.runner.RunWith;
++import org.junit.runners.Parameterized;
++
++import java.io.IOException;
++import java.net.URI;
++import java.util.Arrays;
++import java.util.Collection;
++import java.util.List;
++
++import static org.junit.Assert.assertEquals;
++import static org.junit.Assert.assertTrue;
++
++@RunWith(Parameterized.class)
++public class TestFileSystems {
++
++  private static String TEST_PATH = "target/test-data/TestFileSystem";
++  private TajoConf conf;
++  private FileStorageManager sm;
++  private FileSystem fs;
++  private Path testDir;
++
++  public TestFileSystems(FileSystem fs) throws IOException {
++    this.fs = fs;
++    this.conf = new TajoConf(fs.getConf());
++    sm = (FileStorageManager)StorageManager.getFileStorageManager(conf);
++    testDir = getTestDir(this.fs, TEST_PATH);
++  }
++
++  public Path getTestDir(FileSystem fs, String dir) throws IOException {
++    Path path = new Path(dir);
++    if(fs.exists(path))
++      fs.delete(path, true);
++
++    fs.mkdirs(path);
++
++    return fs.makeQualified(path);
++  }
++
++  @Parameterized.Parameters
++  public static Collection<Object[]> generateParameters() throws IOException {
++    return Arrays.asList(new Object[][]{
++        {FileSystem.getLocal(new TajoConf())},
++    });
++  }
++
++  @Before
++  public void setup() throws IOException {
++    if (!(fs instanceof LocalFileSystem)) {
++      conf.set("fs.local.block.size", "10");
++      fs.initialize(URI.create(fs.getScheme() + ":///"), conf);
++      fs.setConf(conf);
++    }
++  }
++
++  @After
++  public void tearDown() throws IOException {
++    if (!(fs instanceof LocalFileSystem)) {
++      fs.setConf(new TajoConf());
++    }
++  }
++
++  @Test
++  public void testBlockSplit() throws IOException {
++
++    Schema schema = new Schema();
++    schema.addColumn("id", Type.INT4);
++    schema.addColumn("age", Type.INT4);
++    schema.addColumn("name", Type.TEXT);
++
++    TableMeta meta = CatalogUtil.newTableMeta(StoreType.CSV);
++
++    Tuple[] tuples = new Tuple[4];
++    for (int i = 0; i < tuples.length; i++) {
++      tuples[i] = new VTuple(3);
++      tuples[i]
++          .put(new Datum[]{DatumFactory.createInt4(i),
++              DatumFactory.createInt4(i + 32),
++              DatumFactory.createText("name" + i)});
++    }
++
++    Path path = StorageUtil.concatPath(testDir, "testGetScannerAndAppender",
++        "table.csv");
++    fs.mkdirs(path.getParent());
++
++    Appender appender = sm.getAppender(meta, schema, path);
++    appender.init();
++    for (Tuple t : tuples) {
++      appender.addTuple(t);
++    }
++    appender.close();
++    FileStatus fileStatus = fs.getFileStatus(path);
++
++    List<Fragment> splits = sm.getSplits("table", meta, schema, path);
++    int splitSize = (int) Math.ceil(fileStatus.getLen() / (double) fileStatus.getBlockSize());
++    assertEquals(splitSize, splits.size());
++
++    for (Fragment fragment : splits) {
++      assertTrue(fragment.getLength() <= fileStatus.getBlockSize());
++    }
++  }
++}

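On the split-count assertion in testBlockSplit: with hypothetical sizes, a 44-byte table file on a filesystem whose block size has been forced to 10 bytes yields ceil(44 / 10.0) = 5 splits, and the final loop then checks that no fragment is longer than a single block (10 bytes).
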
http://git-wip-us.apache.org/repos/asf/tajo/blob/940546a0/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java
----------------------------------------------------------------------
diff --cc tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java
index 4081a80,0000000..15998f2
mode 100644,000000..100644
--- a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java
+++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java
@@@ -1,867 -1,0 +1,878 @@@
 +/**
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.tajo.storage;
 +
 +import com.google.common.collect.Lists;
 +import org.apache.hadoop.fs.FileStatus;
 +import org.apache.hadoop.fs.FileSystem;
 +import org.apache.hadoop.fs.Path;
 +import org.apache.hadoop.io.BytesWritable;
 +import org.apache.hadoop.io.LongWritable;
 +import org.apache.hadoop.io.Writable;
 +import org.apache.tajo.QueryId;
 +import org.apache.tajo.TajoIdProtos;
 +import org.apache.tajo.catalog.CatalogUtil;
 +import org.apache.tajo.catalog.Schema;
 +import org.apache.tajo.catalog.TableMeta;
 +import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
 +import org.apache.tajo.catalog.statistics.TableStats;
 +import org.apache.tajo.common.TajoDataTypes.Type;
 +import org.apache.tajo.conf.TajoConf;
 +import org.apache.tajo.datum.Datum;
 +import org.apache.tajo.datum.DatumFactory;
 +import org.apache.tajo.datum.NullDatum;
 +import org.apache.tajo.datum.ProtobufDatumFactory;
 +import org.apache.tajo.storage.fragment.FileFragment;
 +import org.apache.tajo.storage.rcfile.RCFile;
 +import org.apache.tajo.storage.sequencefile.SequenceFileScanner;
 +import org.apache.tajo.util.CommonTestingUtil;
 +import org.apache.tajo.util.FileUtil;
 +import org.apache.tajo.util.KeyValueSet;
 +import org.junit.Test;
 +import org.junit.runner.RunWith;
 +import org.junit.runners.Parameterized;
 +
 +import java.io.IOException;
 +import java.util.Arrays;
 +import java.util.Collection;
 +import java.util.List;
 +
 +import static org.junit.Assert.assertEquals;
 +import static org.junit.Assert.assertTrue;
 +
 +@RunWith(Parameterized.class)
 +public class TestStorages {
 +	private TajoConf conf;
 +	private static String TEST_PATH = "target/test-data/TestStorages";
 +
 +  private static String TEST_PROJECTION_AVRO_SCHEMA =
 +      "{\n" +
 +      "  \"type\": \"record\",\n" +
 +      "  \"namespace\": \"org.apache.tajo\",\n" +
 +      "  \"name\": \"testProjection\",\n" +
 +      "  \"fields\": [\n" +
 +      "    { \"name\": \"id\", \"type\": \"int\" },\n" +
 +      "    { \"name\": \"age\", \"type\": \"long\" },\n" +
 +      "    { \"name\": \"score\", \"type\": \"float\" }\n" +
 +      "  ]\n" +
 +      "}\n";
 +
 +  private static String TEST_NULL_HANDLING_TYPES_AVRO_SCHEMA =
 +      "{\n" +
 +      "  \"type\": \"record\",\n" +
 +      "  \"namespace\": \"org.apache.tajo\",\n" +
 +      "  \"name\": \"testNullHandlingTypes\",\n" +
 +      "  \"fields\": [\n" +
 +      "    { \"name\": \"col1\", \"type\": [\"null\", \"boolean\"] },\n" +
-       "    { \"name\": \"col2\", \"type\": [\"null\", \"int\"] },\n" +
-       "    { \"name\": \"col3\", \"type\": [\"null\", \"string\"] },\n" +
++      "    { \"name\": \"col2\", \"type\": [\"null\", \"string\"] },\n" +
++      "    { \"name\": \"col3\", \"type\": [\"null\", \"int\"] },\n" +
 +      "    { \"name\": \"col4\", \"type\": [\"null\", \"int\"] },\n" +
-       "    { \"name\": \"col5\", \"type\": [\"null\", \"int\"] },\n" +
-       "    { \"name\": \"col6\", \"type\": [\"null\", \"long\"] },\n" +
-       "    { \"name\": \"col7\", \"type\": [\"null\", \"float\"] },\n" +
-       "    { \"name\": \"col8\", \"type\": [\"null\", \"double\"] },\n" +
-       "    { \"name\": \"col9\", \"type\": [\"null\", \"string\"] },\n" +
++      "    { \"name\": \"col5\", \"type\": [\"null\", \"long\"] },\n" +
++      "    { \"name\": \"col6\", \"type\": [\"null\", \"float\"] },\n" +
++      "    { \"name\": \"col7\", \"type\": [\"null\", \"double\"] },\n" +
++      "    { \"name\": \"col8\", \"type\": [\"null\", \"string\"] },\n" +
++      "    { \"name\": \"col9\", \"type\": [\"null\", \"bytes\"] },\n" +
 +      "    { \"name\": \"col10\", \"type\": [\"null\", \"bytes\"] },\n" +
-       "    { \"name\": \"col11\", \"type\": [\"null\", \"bytes\"] },\n" +
-       "    { \"name\": \"col12\", \"type\": \"null\" },\n" +
-       "    { \"name\": \"col13\", \"type\": [\"null\", \"bytes\"] }\n" +
++      "    { \"name\": \"col11\", \"type\": \"null\" },\n" +
++      "    { \"name\": \"col12\", \"type\": [\"null\", \"bytes\"] }\n" +
 +      "  ]\n" +
 +      "}\n";
 +
 +  private StoreType storeType;
 +  private boolean splitable;
 +  private boolean statsable;
 +  private boolean seekable;
 +  private Path testDir;
 +  private FileSystem fs;
 +
 +  public TestStorages(StoreType type, boolean splitable, boolean statsable, boolean seekable) throws IOException {
 +    this.storeType = type;
 +    this.splitable = splitable;
 +    this.statsable = statsable;
 +    this.seekable = seekable;
 +
 +    conf = new TajoConf();
 +
 +    if (storeType == StoreType.RCFILE) {
 +      conf.setInt(RCFile.RECORD_INTERVAL_CONF_STR, 100);
 +    }
 +
 +    testDir = CommonTestingUtil.getTestDir(TEST_PATH);
 +    fs = testDir.getFileSystem(conf);
 +  }
 +
 +  @Parameterized.Parameters
 +  public static Collection<Object[]> generateParameters() {
 +    return Arrays.asList(new Object[][] {
 +        //type, splitable, statsable, seekable
 +        {StoreType.CSV, true, true, true},
 +        {StoreType.RAW, false, true, true},
 +        {StoreType.RCFILE, true, true, false},
 +        {StoreType.PARQUET, false, false, false},
 +        {StoreType.SEQUENCEFILE, true, true, false},
 +        {StoreType.AVRO, false, false, false},
 +        {StoreType.TEXTFILE, true, true, false},
++        {StoreType.JSON, true, true, false},
 +    });
 +  }
 +
 +	@Test
 +  public void testSplitable() throws IOException {
 +    if (splitable) {
 +      Schema schema = new Schema();
 +      schema.addColumn("id", Type.INT4);
 +      schema.addColumn("age", Type.INT8);
 +
 +      TableMeta meta = CatalogUtil.newTableMeta(storeType);
 +      Path tablePath = new Path(testDir, "Splitable.data");
 +      FileStorageManager sm = (FileStorageManager)StorageManager.getFileStorageManager(conf);
 +      Appender appender = sm.getAppender(meta, schema, tablePath);
 +      appender.enableStats();
 +      appender.init();
 +      int tupleNum = 10000;
 +      VTuple vTuple;
 +
 +      for (int i = 0; i < tupleNum; i++) {
 +        vTuple = new VTuple(2);
 +        vTuple.put(0, DatumFactory.createInt4(i + 1));
 +        vTuple.put(1, DatumFactory.createInt8(25l));
 +        appender.addTuple(vTuple);
 +      }
 +      appender.close();
 +      TableStats stat = appender.getStats();
 +      assertEquals(tupleNum, stat.getNumRows().longValue());
 +
 +      FileStatus status = fs.getFileStatus(tablePath);
 +      long fileLen = status.getLen();
 +      long randomNum = (long) (Math.random() * fileLen) + 1;
 +
 +      FileFragment[] tablets = new FileFragment[2];
 +      tablets[0] = new FileFragment("Splitable", tablePath, 0, randomNum);
 +      tablets[1] = new FileFragment("Splitable", tablePath, randomNum, (fileLen - randomNum));
 +
 +      Scanner scanner = sm.getScanner(meta, schema, tablets[0], schema);
 +      assertTrue(scanner.isSplittable());
 +      scanner.init();
 +      int tupleCnt = 0;
 +      while (scanner.next() != null) {
 +        tupleCnt++;
 +      }
 +      scanner.close();
 +
 +      scanner = sm.getScanner(meta, schema, tablets[1], schema);
 +      assertTrue(scanner.isSplittable());
 +      scanner.init();
 +      while (scanner.next() != null) {
 +        tupleCnt++;
 +      }
 +      scanner.close();
 +
 +      assertEquals(tupleNum, tupleCnt);
 +    }
 +	}
 +
 +  @Test
 +  public void testRCFileSplitable() throws IOException {
 +    if (storeType == StoreType.RCFILE) {
 +      Schema schema = new Schema();
 +      schema.addColumn("id", Type.INT4);
 +      schema.addColumn("age", Type.INT8);
 +
 +      TableMeta meta = CatalogUtil.newTableMeta(storeType);
 +      Path tablePath = new Path(testDir, "Splitable.data");
 +      FileStorageManager sm = (FileStorageManager)StorageManager.getFileStorageManager(conf);
 +      Appender appender = sm.getAppender(meta, schema, tablePath);
 +      appender.enableStats();
 +      appender.init();
 +      int tupleNum = 10000;
 +      VTuple vTuple;
 +
 +      for (int i = 0; i < tupleNum; i++) {
 +        vTuple = new VTuple(2);
 +        vTuple.put(0, DatumFactory.createInt4(i + 1));
 +        vTuple.put(1, DatumFactory.createInt8(25l));
 +        appender.addTuple(vTuple);
 +      }
 +      appender.close();
 +      TableStats stat = appender.getStats();
 +      assertEquals(tupleNum, stat.getNumRows().longValue());
 +
 +      FileStatus status = fs.getFileStatus(tablePath);
 +      long fileLen = status.getLen();
 +      long randomNum = 122; // header size
 +
 +      FileFragment[] tablets = new FileFragment[2];
 +      tablets[0] = new FileFragment("Splitable", tablePath, 0, randomNum);
 +      tablets[1] = new FileFragment("Splitable", tablePath, randomNum, (fileLen - randomNum));
 +
 +      Scanner scanner = sm.getScanner(meta, schema, tablets[0], schema);
 +      assertTrue(scanner.isSplittable());
 +      scanner.init();
 +      int tupleCnt = 0;
 +      while (scanner.next() != null) {
 +        tupleCnt++;
 +      }
 +      scanner.close();
 +
 +      scanner = sm.getScanner(meta, schema, tablets[1], schema);
 +      assertTrue(scanner.isSplittable());
 +      scanner.init();
 +      while (scanner.next() != null) {
 +        tupleCnt++;
 +      }
 +      scanner.close();
 +
 +      assertEquals(tupleNum, tupleCnt);
 +    }
 +  }
 +
 +  @Test
 +  public void testProjection() throws IOException {
 +    Schema schema = new Schema();
 +    schema.addColumn("id", Type.INT4);
 +    schema.addColumn("age", Type.INT8);
 +    schema.addColumn("score", Type.FLOAT4);
 +
 +    TableMeta meta = CatalogUtil.newTableMeta(storeType);
 +    meta.setOptions(CatalogUtil.newPhysicalProperties(storeType));
 +    if (storeType == StoreType.AVRO) {
 +      meta.putOption(StorageConstants.AVRO_SCHEMA_LITERAL,
 +                     TEST_PROJECTION_AVRO_SCHEMA);
 +    }
 +
 +    Path tablePath = new Path(testDir, "testProjection.data");
 +    FileStorageManager sm = (FileStorageManager)StorageManager.getFileStorageManager(conf);
 +    Appender appender = sm.getAppender(meta, schema, tablePath);
 +    appender.init();
 +    int tupleNum = 10000;
 +    VTuple vTuple;
 +
 +    for (int i = 0; i < tupleNum; i++) {
 +      vTuple = new VTuple(3);
 +      vTuple.put(0, DatumFactory.createInt4(i + 1));
 +      vTuple.put(1, DatumFactory.createInt8(i + 2));
 +      vTuple.put(2, DatumFactory.createFloat4(i + 3));
 +      appender.addTuple(vTuple);
 +    }
 +    appender.close();
 +
 +    FileStatus status = fs.getFileStatus(tablePath);
 +    FileFragment fragment = new FileFragment("testReadAndWrite", tablePath, 0, status.getLen());
 +
 +    Schema target = new Schema();
 +    target.addColumn("age", Type.INT8);
 +    target.addColumn("score", Type.FLOAT4);
 +    Scanner scanner = sm.getScanner(meta, schema, fragment, target);
 +    scanner.init();
 +    int tupleCnt = 0;
 +    Tuple tuple;
 +    while ((tuple = scanner.next()) != null) {
 +      if (storeType == StoreType.RCFILE
 +          || storeType == StoreType.CSV
 +          || storeType == StoreType.PARQUET
 +          || storeType == StoreType.SEQUENCEFILE
 +          || storeType == StoreType.AVRO) {
 +        assertTrue(tuple.get(0) == null);
 +      }
 +      assertTrue(tupleCnt + 2 == tuple.get(1).asInt8());
 +      assertTrue(tupleCnt + 3 == tuple.get(2).asFloat4());
 +      tupleCnt++;
 +    }
 +    scanner.close();
 +
 +    assertEquals(tupleNum, tupleCnt);
 +  }
 +
 +  @Test
 +  public void testVariousTypes() throws IOException {
++    boolean handleProtobuf = storeType != StoreType.JSON;
++
 +    Schema schema = new Schema();
 +    schema.addColumn("col1", Type.BOOLEAN);
-     schema.addColumn("col2", Type.BIT);
-     schema.addColumn("col3", Type.CHAR, 7);
-     schema.addColumn("col4", Type.INT2);
-     schema.addColumn("col5", Type.INT4);
-     schema.addColumn("col6", Type.INT8);
-     schema.addColumn("col7", Type.FLOAT4);
-     schema.addColumn("col8", Type.FLOAT8);
-     schema.addColumn("col9", Type.TEXT);
-     schema.addColumn("col10", Type.BLOB);
-     schema.addColumn("col11", Type.INET4);
-     schema.addColumn("col12", Type.NULL_TYPE);
-     schema.addColumn("col13", CatalogUtil.newDataType(Type.PROTOBUF, TajoIdProtos.QueryIdProto.class.getName()));
++    schema.addColumn("col2", Type.CHAR, 7);
++    schema.addColumn("col3", Type.INT2);
++    schema.addColumn("col4", Type.INT4);
++    schema.addColumn("col5", Type.INT8);
++    schema.addColumn("col6", Type.FLOAT4);
++    schema.addColumn("col7", Type.FLOAT8);
++    schema.addColumn("col8", Type.TEXT);
++    schema.addColumn("col9", Type.BLOB);
++    schema.addColumn("col10", Type.INET4);
++    schema.addColumn("col11", Type.NULL_TYPE);
++    if (handleProtobuf) {
++      schema.addColumn("col12", CatalogUtil.newDataType(Type.PROTOBUF, TajoIdProtos.QueryIdProto.class.getName()));
++    }
 +
 +    KeyValueSet options = new KeyValueSet();
 +    TableMeta meta = CatalogUtil.newTableMeta(storeType, options);
 +    meta.setOptions(CatalogUtil.newPhysicalProperties(storeType));
 +    if (storeType == StoreType.AVRO) {
-       String path = FileUtil.getResourcePath("testVariousTypes.avsc").toString();
++      String path = FileUtil.getResourcePath("dataset/testVariousTypes.avsc").toString();
 +      meta.putOption(StorageConstants.AVRO_SCHEMA_URL, path);
 +    }
 +
 +    FileStorageManager sm = (FileStorageManager)StorageManager.getFileStorageManager(conf);
 +    Path tablePath = new Path(testDir, "testVariousTypes.data");
 +    Appender appender = sm.getAppender(meta, schema, tablePath);
 +    appender.init();
 +
 +    QueryId queryid = new QueryId("12345", 5);
 +    ProtobufDatumFactory factory = ProtobufDatumFactory.get(TajoIdProtos.QueryIdProto.class.getName());
 +
-     Tuple tuple = new VTuple(13);
++    Tuple tuple = new VTuple(11 + (handleProtobuf ? 1 : 0));
 +    tuple.put(new Datum[] {
 +        DatumFactory.createBool(true),
-         DatumFactory.createBit((byte) 0x99),
 +        DatumFactory.createChar("hyunsik"),
 +        DatumFactory.createInt2((short) 17),
 +        DatumFactory.createInt4(59),
 +        DatumFactory.createInt8(23l),
 +        DatumFactory.createFloat4(77.9f),
 +        DatumFactory.createFloat8(271.9f),
 +        DatumFactory.createText("hyunsik"),
 +        DatumFactory.createBlob("hyunsik".getBytes()),
 +        DatumFactory.createInet4("192.168.0.1"),
-         NullDatum.get(),
-         factory.createDatum(queryid.getProto())
++        NullDatum.get()
 +    });
++    if (handleProtobuf) {
++      tuple.put(11, factory.createDatum(queryid.getProto()));
++    }
++
 +    appender.addTuple(tuple);
 +    appender.flush();
 +    appender.close();
 +
 +    FileStatus status = fs.getFileStatus(tablePath);
 +    FileFragment fragment = new FileFragment("table", tablePath, 0, status.getLen());
 +    Scanner scanner =  sm.getScanner(meta, schema, fragment);
 +    scanner.init();
 +
 +    Tuple retrieved;
 +    while ((retrieved = scanner.next()) != null) {
 +      for (int i = 0; i < tuple.size(); i++) {
 +        assertEquals(tuple.get(i), retrieved.get(i));
 +      }
 +    }
 +    scanner.close();
 +  }
 +
 +  @Test
 +  public void testNullHandlingTypes() throws IOException {
++    boolean handleProtobuf = storeType != StoreType.JSON;
++
 +    Schema schema = new Schema();
 +    schema.addColumn("col1", Type.BOOLEAN);
-     schema.addColumn("col2", Type.BIT);
-     schema.addColumn("col3", Type.CHAR, 7);
-     schema.addColumn("col4", Type.INT2);
-     schema.addColumn("col5", Type.INT4);
-     schema.addColumn("col6", Type.INT8);
-     schema.addColumn("col7", Type.FLOAT4);
-     schema.addColumn("col8", Type.FLOAT8);
-     schema.addColumn("col9", Type.TEXT);
-     schema.addColumn("col10", Type.BLOB);
-     schema.addColumn("col11", Type.INET4);
-     schema.addColumn("col12", Type.NULL_TYPE);
-     schema.addColumn("col13", CatalogUtil.newDataType(Type.PROTOBUF, TajoIdProtos.QueryIdProto.class.getName()));
++    schema.addColumn("col2", Type.CHAR, 7);
++    schema.addColumn("col3", Type.INT2);
++    schema.addColumn("col4", Type.INT4);
++    schema.addColumn("col5", Type.INT8);
++    schema.addColumn("col6", Type.FLOAT4);
++    schema.addColumn("col7", Type.FLOAT8);
++    schema.addColumn("col8", Type.TEXT);
++    schema.addColumn("col9", Type.BLOB);
++    schema.addColumn("col10", Type.INET4);
++    schema.addColumn("col11", Type.NULL_TYPE);
++
++    if (handleProtobuf) {
++      schema.addColumn("col12", CatalogUtil.newDataType(Type.PROTOBUF, TajoIdProtos.QueryIdProto.class.getName()));
++    }
 +
 +    KeyValueSet options = new KeyValueSet();
 +    TableMeta meta = CatalogUtil.newTableMeta(storeType, options);
 +    meta.setOptions(CatalogUtil.newPhysicalProperties(storeType));
 +    meta.putOption(StorageConstants.TEXT_NULL, "\\\\N");
 +    meta.putOption(StorageConstants.RCFILE_NULL, "\\\\N");
 +    meta.putOption(StorageConstants.RCFILE_SERDE, TextSerializerDeserializer.class.getName());
 +    meta.putOption(StorageConstants.SEQUENCEFILE_NULL, "\\");
 +    if (storeType == StoreType.AVRO) {
 +      meta.putOption(StorageConstants.AVRO_SCHEMA_LITERAL,
 +                     TEST_NULL_HANDLING_TYPES_AVRO_SCHEMA);
 +    }
 +
 +    Path tablePath = new Path(testDir, "testVariousTypes.data");
 +    FileStorageManager sm = (FileStorageManager)StorageManager.getFileStorageManager(conf);
 +    Appender appender = sm.getAppender(meta, schema, tablePath);
 +    appender.init();
 +
 +    QueryId queryid = new QueryId("12345", 5);
 +    ProtobufDatumFactory factory = ProtobufDatumFactory.get(TajoIdProtos.QueryIdProto.class.getName());
- 
-     Tuple seedTuple = new VTuple(13);
++    int columnNum = 11 + (handleProtobuf ? 1 : 0);
++    Tuple seedTuple = new VTuple(columnNum);
 +    seedTuple.put(new Datum[]{
 +        DatumFactory.createBool(true),                // 0
-         DatumFactory.createBit((byte) 0x99),          // 1
 +        DatumFactory.createChar("hyunsik"),           // 2
 +        DatumFactory.createInt2((short) 17),          // 3
 +        DatumFactory.createInt4(59),                  // 4
 +        DatumFactory.createInt8(23l),                 // 5
 +        DatumFactory.createFloat4(77.9f),             // 6
 +        DatumFactory.createFloat8(271.9f),            // 7
 +        DatumFactory.createText("hyunsik"),           // 8
 +        DatumFactory.createBlob("hyunsik".getBytes()),// 9
 +        DatumFactory.createInet4("192.168.0.1"),      // 10
 +        NullDatum.get(),                              // 11
-         factory.createDatum(queryid.getProto())       // 12
 +    });
 +
++    if (handleProtobuf) {
++      seedTuple.put(11, factory.createDatum(queryid.getProto()));       // 12
++    }
++
 +    // Making tuples with different null column positions
 +    Tuple tuple;
-     for (int i = 0; i < 13; i++) {
-       tuple = new VTuple(13);
-       for (int j = 0; j < 13; j++) {
++    for (int i = 0; i < columnNum; i++) {
++      tuple = new VTuple(columnNum);
++      for (int j = 0; j < columnNum; j++) {
 +        if (i == j) { // i'th column will have NULL value
 +          tuple.put(j, NullDatum.get());
 +        } else {
 +          tuple.put(j, seedTuple.get(j));
 +        }
 +      }
 +      appender.addTuple(tuple);
 +    }
 +    appender.flush();
 +    appender.close();
 +
 +    FileStatus status = fs.getFileStatus(tablePath);
 +    FileFragment fragment = new FileFragment("table", tablePath, 0, status.getLen());
 +    Scanner scanner = StorageManager.getFileStorageManager(conf).getScanner(meta, schema, fragment);
 +    scanner.init();
 +
 +    Tuple retrieved;
 +    int i = 0;
 +    while ((retrieved = scanner.next()) != null) {
-       assertEquals(13, retrieved.size());
-       for (int j = 0; j < 13; j++) {
++      assertEquals(columnNum, retrieved.size());
++      for (int j = 0; j < columnNum; j++) {
 +        if (i == j) {
 +          assertEquals(NullDatum.get(), retrieved.get(j));
 +        } else {
 +          assertEquals(seedTuple.get(j), retrieved.get(j));
 +        }
 +      }
 +
 +      i++;
 +    }
 +    scanner.close();
 +  }
 +
 +  @Test
 +  public void testRCFileTextSerializeDeserialize() throws IOException {
 +    if(storeType != StoreType.RCFILE) return;
 +
 +    Schema schema = new Schema();
 +    schema.addColumn("col1", Type.BOOLEAN);
 +    schema.addColumn("col2", Type.BIT);
 +    schema.addColumn("col3", Type.CHAR, 7);
 +    schema.addColumn("col4", Type.INT2);
 +    schema.addColumn("col5", Type.INT4);
 +    schema.addColumn("col6", Type.INT8);
 +    schema.addColumn("col7", Type.FLOAT4);
 +    schema.addColumn("col8", Type.FLOAT8);
 +    schema.addColumn("col9", Type.TEXT);
 +    schema.addColumn("col10", Type.BLOB);
 +    schema.addColumn("col11", Type.INET4);
 +    schema.addColumn("col12", Type.NULL_TYPE);
 +    schema.addColumn("col13", CatalogUtil.newDataType(Type.PROTOBUF, TajoIdProtos.QueryIdProto.class.getName()));
 +
 +    KeyValueSet options = new KeyValueSet();
 +    TableMeta meta = CatalogUtil.newTableMeta(storeType, options);
 +    meta.putOption(StorageConstants.CSVFILE_SERDE, TextSerializerDeserializer.class.getName());
 +
 +    Path tablePath = new Path(testDir, "testVariousTypes.data");
 +    FileStorageManager sm = (FileStorageManager)StorageManager.getFileStorageManager(conf);
 +    Appender appender = sm.getAppender(meta, schema, tablePath);
 +    appender.enableStats();
 +    appender.init();
 +
 +    QueryId queryid = new QueryId("12345", 5);
 +    ProtobufDatumFactory factory = ProtobufDatumFactory.get(TajoIdProtos.QueryIdProto.class.getName());
 +
 +    Tuple tuple = new VTuple(13);
 +    tuple.put(new Datum[] {
 +        DatumFactory.createBool(true),
 +        DatumFactory.createBit((byte) 0x99),
 +        DatumFactory.createChar("jinho"),
 +        DatumFactory.createInt2((short) 17),
 +        DatumFactory.createInt4(59),
 +        DatumFactory.createInt8(23l),
 +        DatumFactory.createFloat4(77.9f),
 +        DatumFactory.createFloat8(271.9f),
 +        DatumFactory.createText("jinho"),
 +        DatumFactory.createBlob("hyunsik babo".getBytes()),
 +        DatumFactory.createInet4("192.168.0.1"),
 +        NullDatum.get(),
 +        factory.createDatum(queryid.getProto())
 +    });
 +    appender.addTuple(tuple);
 +    appender.flush();
 +    appender.close();
 +
 +    FileStatus status = fs.getFileStatus(tablePath);
 +    assertEquals(appender.getStats().getNumBytes().longValue(), status.getLen());
 +
 +    FileFragment fragment = new FileFragment("table", tablePath, 0, status.getLen());
 +    Scanner scanner =  StorageManager.getFileStorageManager(conf).getScanner(meta, schema, fragment);
 +    scanner.init();
 +
 +    Tuple retrieved;
 +    while ((retrieved=scanner.next()) != null) {
 +      for (int i = 0; i < tuple.size(); i++) {
 +        assertEquals(tuple.get(i), retrieved.get(i));
 +      }
 +    }
 +    scanner.close();
 +    assertEquals(appender.getStats().getNumBytes().longValue(), scanner.getInputStats().getNumBytes().longValue());
 +    assertEquals(appender.getStats().getNumRows().longValue(), scanner.getInputStats().getNumRows().longValue());
 +  }
 +
 +  @Test
 +  public void testRCFileBinarySerializeDeserialize() throws IOException {
 +    if(storeType != StoreType.RCFILE) return;
 +
 +    Schema schema = new Schema();
 +    schema.addColumn("col1", Type.BOOLEAN);
 +    schema.addColumn("col2", Type.BIT);
 +    schema.addColumn("col3", Type.CHAR, 7);
 +    schema.addColumn("col4", Type.INT2);
 +    schema.addColumn("col5", Type.INT4);
 +    schema.addColumn("col6", Type.INT8);
 +    schema.addColumn("col7", Type.FLOAT4);
 +    schema.addColumn("col8", Type.FLOAT8);
 +    schema.addColumn("col9", Type.TEXT);
 +    schema.addColumn("col10", Type.BLOB);
 +    schema.addColumn("col11", Type.INET4);
 +    schema.addColumn("col12", Type.NULL_TYPE);
 +    schema.addColumn("col13", CatalogUtil.newDataType(Type.PROTOBUF, TajoIdProtos.QueryIdProto.class.getName()));
 +
 +    KeyValueSet options = new KeyValueSet();
 +    TableMeta meta = CatalogUtil.newTableMeta(storeType, options);
 +    meta.putOption(StorageConstants.RCFILE_SERDE, BinarySerializerDeserializer.class.getName());
 +
 +    Path tablePath = new Path(testDir, "testVariousTypes.data");
 +    FileStorageManager sm = (FileStorageManager)StorageManager.getFileStorageManager(conf);
 +    Appender appender = sm.getAppender(meta, schema, tablePath);
 +    appender.enableStats();
 +    appender.init();
 +
 +    QueryId queryid = new QueryId("12345", 5);
 +    ProtobufDatumFactory factory = ProtobufDatumFactory.get(TajoIdProtos.QueryIdProto.class.getName());
 +
 +    Tuple tuple = new VTuple(13);
 +    tuple.put(new Datum[] {
 +        DatumFactory.createBool(true),
 +        DatumFactory.createBit((byte) 0x99),
 +        DatumFactory.createChar("jinho"),
 +        DatumFactory.createInt2((short) 17),
 +        DatumFactory.createInt4(59),
 +        DatumFactory.createInt8(23l),
 +        DatumFactory.createFloat4(77.9f),
 +        DatumFactory.createFloat8(271.9f),
 +        DatumFactory.createText("jinho"),
 +        DatumFactory.createBlob("hyunsik babo".getBytes()),
 +        DatumFactory.createInet4("192.168.0.1"),
 +        NullDatum.get(),
 +        factory.createDatum(queryid.getProto())
 +    });
 +    appender.addTuple(tuple);
 +    appender.flush();
 +    appender.close();
 +
 +    FileStatus status = fs.getFileStatus(tablePath);
 +    assertEquals(appender.getStats().getNumBytes().longValue(), status.getLen());
 +
 +    FileFragment fragment = new FileFragment("table", tablePath, 0, status.getLen());
 +    Scanner scanner =  StorageManager.getFileStorageManager(conf).getScanner(meta, schema, fragment);
 +    scanner.init();
 +
 +    Tuple retrieved;
 +    while ((retrieved=scanner.next()) != null) {
 +      for (int i = 0; i < tuple.size(); i++) {
 +        assertEquals(tuple.get(i), retrieved.get(i));
 +      }
 +    }
 +    scanner.close();
 +    assertEquals(appender.getStats().getNumBytes().longValue(), scanner.getInputStats().getNumBytes().longValue());
 +    assertEquals(appender.getStats().getNumRows().longValue(), scanner.getInputStats().getNumRows().longValue());
 +  }
 +
 +  @Test
 +  public void testSequenceFileTextSerializeDeserialize() throws IOException {
 +    if(storeType != StoreType.SEQUENCEFILE) return;
 +
 +    Schema schema = new Schema();
 +    schema.addColumn("col1", Type.BOOLEAN);
 +    schema.addColumn("col2", Type.BIT);
 +    schema.addColumn("col3", Type.CHAR, 7);
 +    schema.addColumn("col4", Type.INT2);
 +    schema.addColumn("col5", Type.INT4);
 +    schema.addColumn("col6", Type.INT8);
 +    schema.addColumn("col7", Type.FLOAT4);
 +    schema.addColumn("col8", Type.FLOAT8);
 +    schema.addColumn("col9", Type.TEXT);
 +    schema.addColumn("col10", Type.BLOB);
 +    schema.addColumn("col11", Type.INET4);
 +    schema.addColumn("col12", Type.NULL_TYPE);
 +    schema.addColumn("col13", CatalogUtil.newDataType(Type.PROTOBUF, TajoIdProtos.QueryIdProto.class.getName()));
 +
 +    KeyValueSet options = new KeyValueSet();
 +    TableMeta meta = CatalogUtil.newTableMeta(storeType, options);
 +    meta.putOption(StorageConstants.SEQUENCEFILE_SERDE, TextSerializerDeserializer.class.getName());
 +
 +    Path tablePath = new Path(testDir, "testVariousTypes.data");
 +    FileStorageManager sm = (FileStorageManager)StorageManager.getFileStorageManager(conf);
 +    Appender appender = sm.getAppender(meta, schema, tablePath);
 +    appender.enableStats();
 +    appender.init();
 +
 +    QueryId queryid = new QueryId("12345", 5);
 +    ProtobufDatumFactory factory = ProtobufDatumFactory.get(TajoIdProtos.QueryIdProto.class.getName());
 +
 +    Tuple tuple = new VTuple(13);
 +    tuple.put(new Datum[] {
 +        DatumFactory.createBool(true),
 +        DatumFactory.createBit((byte) 0x99),
 +        DatumFactory.createChar("jinho"),
 +        DatumFactory.createInt2((short) 17),
 +        DatumFactory.createInt4(59),
 +        DatumFactory.createInt8(23L),
 +        DatumFactory.createFloat4(77.9f),
 +        DatumFactory.createFloat8(271.9),
 +        DatumFactory.createText("jinho"),
 +        DatumFactory.createBlob("hyunsik babo".getBytes()),
 +        DatumFactory.createInet4("192.168.0.1"),
 +        NullDatum.get(),
 +        factory.createDatum(queryid.getProto())
 +    });
 +    appender.addTuple(tuple);
 +    appender.flush();
 +    appender.close();
 +
 +    FileStatus status = fs.getFileStatus(tablePath);
 +    assertEquals(appender.getStats().getNumBytes().longValue(), status.getLen());
 +
 +    FileFragment fragment = new FileFragment("table", tablePath, 0, status.getLen());
 +    Scanner scanner = StorageManager.getFileStorageManager(conf).getScanner(meta, schema, fragment);
 +    scanner.init();
 +
 +    assertTrue(scanner instanceof SequenceFileScanner);
 +    Writable key = ((SequenceFileScanner) scanner).getKey();
 +    assertEquals(key.getClass().getCanonicalName(), LongWritable.class.getCanonicalName());
 +
 +    Tuple retrieved;
 +    while ((retrieved = scanner.next()) != null) {
 +      for (int i = 0; i < tuple.size(); i++) {
 +        assertEquals(tuple.get(i), retrieved.get(i));
 +      }
 +    }
 +    scanner.close();
 +    assertEquals(appender.getStats().getNumBytes().longValue(), scanner.getInputStats().getNumBytes().longValue());
 +    assertEquals(appender.getStats().getNumRows().longValue(), scanner.getInputStats().getNumRows().longValue());
 +  }
 +
 +  @Test
 +  public void testSequenceFileBinarySerializeDeserialize() throws IOException {
 +    if (storeType != StoreType.SEQUENCEFILE) return;
 +
 +    Schema schema = new Schema();
 +    schema.addColumn("col1", Type.BOOLEAN);
 +    schema.addColumn("col2", Type.BIT);
 +    schema.addColumn("col3", Type.CHAR, 7);
 +    schema.addColumn("col4", Type.INT2);
 +    schema.addColumn("col5", Type.INT4);
 +    schema.addColumn("col6", Type.INT8);
 +    schema.addColumn("col7", Type.FLOAT4);
 +    schema.addColumn("col8", Type.FLOAT8);
 +    schema.addColumn("col9", Type.TEXT);
 +    schema.addColumn("col10", Type.BLOB);
 +    schema.addColumn("col11", Type.INET4);
 +    schema.addColumn("col12", Type.NULL_TYPE);
 +    schema.addColumn("col13", CatalogUtil.newDataType(Type.PROTOBUF, TajoIdProtos.QueryIdProto.class.getName()));
 +
 +    KeyValueSet options = new KeyValueSet();
 +    TableMeta meta = CatalogUtil.newTableMeta(storeType, options);
 +    meta.putOption(StorageConstants.SEQUENCEFILE_SERDE, BinarySerializerDeserializer.class.getName());
 +
 +    Path tablePath = new Path(testDir, "testVariousTypes.data");
 +    FileStorageManager sm = (FileStorageManager) StorageManager.getFileStorageManager(conf);
 +    Appender appender = sm.getAppender(meta, schema, tablePath);
 +    appender.enableStats();
 +    appender.init();
 +
 +    QueryId queryid = new QueryId("12345", 5);
 +    ProtobufDatumFactory factory = ProtobufDatumFactory.get(TajoIdProtos.QueryIdProto.class.getName());
 +
 +    Tuple tuple = new VTuple(13);
 +    tuple.put(new Datum[] {
 +        DatumFactory.createBool(true),
 +        DatumFactory.createBit((byte) 0x99),
 +        DatumFactory.createChar("jinho"),
 +        DatumFactory.createInt2((short) 17),
 +        DatumFactory.createInt4(59),
 +        DatumFactory.createInt8(23L),
 +        DatumFactory.createFloat4(77.9f),
 +        DatumFactory.createFloat8(271.9),
 +        DatumFactory.createText("jinho"),
 +        DatumFactory.createBlob("hyunsik babo".getBytes()),
 +        DatumFactory.createInet4("192.168.0.1"),
 +        NullDatum.get(),
 +        factory.createDatum(queryid.getProto())
 +    });
 +    appender.addTuple(tuple);
 +    appender.flush();
 +    appender.close();
 +
 +    FileStatus status = fs.getFileStatus(tablePath);
 +    assertEquals(appender.getStats().getNumBytes().longValue(), status.getLen());
 +
 +    FileFragment fragment = new FileFragment("table", tablePath, 0, status.getLen());
 +    Scanner scanner = StorageManager.getFileStorageManager(conf).getScanner(meta, schema, fragment);
 +    scanner.init();
 +
 +    assertTrue(scanner instanceof SequenceFileScanner);
 +    Writable key = ((SequenceFileScanner) scanner).getKey();
 +    assertEquals(key.getClass().getCanonicalName(), BytesWritable.class.getCanonicalName());
 +
 +    Tuple retrieved;
 +    while ((retrieved = scanner.next()) != null) {
 +      for (int i = 0; i < tuple.size(); i++) {
 +        assertEquals(tuple.get(i), retrieved.get(i));
 +      }
 +    }
 +    scanner.close();
 +    assertEquals(appender.getStats().getNumBytes().longValue(), scanner.getInputStats().getNumBytes().longValue());
 +    assertEquals(appender.getStats().getNumRows().longValue(), scanner.getInputStats().getNumRows().longValue());
 +  }
 +
 +  @Test
 +  public void testTime() throws IOException {
 +    if (storeType == StoreType.CSV || storeType == StoreType.RAW) {
 +      Schema schema = new Schema();
 +      schema.addColumn("col1", Type.DATE);
 +      schema.addColumn("col2", Type.TIME);
 +      schema.addColumn("col3", Type.TIMESTAMP);
 +
 +      KeyValueSet options = new KeyValueSet();
 +      TableMeta meta = CatalogUtil.newTableMeta(storeType, options);
 +
 +      Path tablePath = new Path(testDir, "testTime.data");
 +      FileStorageManager sm = (FileStorageManager) StorageManager.getFileStorageManager(conf);
 +      Appender appender = sm.getAppender(meta, schema, tablePath);
 +      appender.init();
 +
 +      Tuple tuple = new VTuple(3);
 +      tuple.put(new Datum[]{
 +          DatumFactory.createDate("1980-04-01"),
 +          DatumFactory.createTime("12:34:56"),
 +          DatumFactory.createTimestmpDatumWithUnixTime((int)(System.currentTimeMillis() / 1000))
 +      });
 +      appender.addTuple(tuple);
 +      appender.flush();
 +      appender.close();
 +
 +      FileStatus status = fs.getFileStatus(tablePath);
 +      FileFragment fragment = new FileFragment("table", tablePath, 0, status.getLen());
 +      Scanner scanner = StorageManager.getFileStorageManager(conf).getScanner(meta, schema, fragment);
 +      scanner.init();
 +
 +      Tuple retrieved;
 +      while ((retrieved = scanner.next()) != null) {
 +        for (int i = 0; i < tuple.size(); i++) {
 +          assertEquals(tuple.get(i), retrieved.get(i));
 +        }
 +      }
 +      scanner.close();
 +    }
 +  }
 +
 +  @Test
 +  public void testSeekableScanner() throws IOException {
 +    if (!seekable) {
 +      return;
 +    }
 +
 +    Schema schema = new Schema();
 +    schema.addColumn("id", Type.INT4);
 +    schema.addColumn("age", Type.INT8);
 +    schema.addColumn("comment", Type.TEXT);
 +
 +    TableMeta meta = CatalogUtil.newTableMeta(storeType);
 +    Path tablePath = new Path(testDir, "Seekable.data");
 +    FileStorageManager sm = (FileStorageManager) StorageManager.getFileStorageManager(conf);
 +    FileAppender appender = (FileAppender) sm.getAppender(meta, schema, tablePath);
 +    appender.enableStats();
 +    appender.init();
 +    int tupleNum = 100000;
 +    VTuple vTuple;
 +
 +    List<Long> offsets = Lists.newArrayList();
 +    offsets.add(0L);
 +    for (int i = 0; i < tupleNum; i++) {
 +      vTuple = new VTuple(3);
 +      vTuple.put(0, DatumFactory.createInt4(i + 1));
 +      vTuple.put(1, DatumFactory.createInt8(25L));
 +      vTuple.put(2, DatumFactory.createText("test" + i));
 +      appender.addTuple(vTuple);
 +
 +      // find a seek position
 +      if (i % (tupleNum / 3) == 0) {
 +        offsets.add(appender.getOffset());
 +      }
 +    }
 +
 +    // end of file
 +    if (!offsets.contains(appender.getOffset())) {
 +      offsets.add(appender.getOffset());
 +    }
 +
 +    appender.close();
 +    if (statsable) {
 +      TableStats stat = appender.getStats();
 +      assertEquals(tupleNum, stat.getNumRows().longValue());
 +    }
 +
 +    FileStatus status = fs.getFileStatus(tablePath);
 +    assertEquals(status.getLen(), appender.getOffset());
 +
 +    Scanner scanner;
 +    int tupleCnt = 0;
 +    long prevOffset = 0;
 +    long readBytes = 0;
 +    long readRows = 0;
 +    for (long offset : offsets) {
 +      scanner = StorageManager.getFileStorageManager(conf).getScanner(meta, schema,
 +          new FileFragment("table", tablePath, prevOffset, offset - prevOffset), schema);
 +      scanner.init();
 +
 +      while (scanner.next() != null) {
 +        tupleCnt++;
 +      }
 +
 +      scanner.close();
 +      if (statsable) {
 +        readBytes += scanner.getInputStats().getNumBytes();
 +        readRows += scanner.getInputStats().getNumRows();
 +      }
 +      prevOffset = offset;
 +    }
 +
 +    assertEquals(tupleNum, tupleCnt);
 +    if (statsable) {
 +      assertEquals(appender.getStats().getNumBytes().longValue(), readBytes);
 +      assertEquals(appender.getStats().getNumRows().longValue(), readRows);
 +    }
 +  }
 +}
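
The seekable-scanner test above leans on one invariant: fragments built from successive appender offsets tile the file exactly, so scanning them back to back yields every tuple exactly once. A minimal sketch of that replay loop, reusing only calls that appear in the test (conf, meta, schema, tablePath, and the collected offsets are assumed to be set up as above):

    long prev = 0;
    int count = 0;
    for (long offset : offsets) {
      // each fragment covers [prev, offset); together the fragments tile the whole file
      Scanner scanner = StorageManager.getFileStorageManager(conf).getScanner(meta, schema,
          new FileFragment("table", tablePath, prev, offset - prev), schema);
      scanner.init();
      while (scanner.next() != null) {
        count++;                 // every tuple is seen exactly once across all fragments
      }
      scanner.close();
      prev = offset;
    }
    // count should now equal the number of tuples written by the appender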

http://git-wip-us.apache.org/repos/asf/tajo/blob/940546a0/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/avro/TestAvroUtil.java
----------------------------------------------------------------------
diff --cc tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/avro/TestAvroUtil.java
index 4f7ea1c,0000000..7b83894
mode 100644,000000..100644
--- a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/avro/TestAvroUtil.java
+++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/avro/TestAvroUtil.java
@@@ -1,106 -1,0 +1,106 @@@
 +/**
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.tajo.storage.avro;
 +
 +import org.apache.avro.Schema;
 +import org.apache.tajo.HttpFileServer;
 +import org.apache.tajo.catalog.CatalogUtil;
 +import org.apache.tajo.catalog.TableMeta;
 +import org.apache.tajo.catalog.proto.CatalogProtos;
 +import org.apache.tajo.conf.TajoConf;
 +import org.apache.tajo.storage.StorageConstants;
 +import org.apache.tajo.util.FileUtil;
 +import org.apache.tajo.util.NetUtils;
 +import org.junit.Before;
 +import org.junit.Test;
 +
 +import java.io.File;
 +import java.io.IOException;
 +import java.net.InetSocketAddress;
 +import java.net.URISyntaxException;
 +import java.net.URL;
 +
 +import static org.junit.Assert.*;
 +
 +/**
 + * Tests for {@link org.apache.tajo.storage.avro.AvroUtil}.
 + */
 +public class TestAvroUtil {
 +  private Schema expected;
 +  private URL schemaUrl;
 +
 +  @Before
 +  public void setUp() throws Exception {
-     schemaUrl = FileUtil.getResourcePath("testVariousTypes.avsc");
++    schemaUrl = FileUtil.getResourcePath("dataset/testVariousTypes.avsc");
 +    assertNotNull(schemaUrl);
 +
 +    File file = new File(schemaUrl.getPath());
 +    assertTrue(file.exists());
 +
 +    expected = new Schema.Parser().parse(file);
 +  }
 +
 +  @Test
 +  public void testGetSchema() throws IOException, URISyntaxException {
 +    TableMeta meta = CatalogUtil.newTableMeta(CatalogProtos.StoreType.AVRO);
 +    meta.putOption(StorageConstants.AVRO_SCHEMA_LITERAL, FileUtil.readTextFile(new File(schemaUrl.getPath())));
 +    Schema schema = AvroUtil.getAvroSchema(meta, new TajoConf());
 +    assertEquals(expected, schema);
 +
 +    meta = CatalogUtil.newTableMeta(CatalogProtos.StoreType.AVRO);
 +    meta.putOption(StorageConstants.AVRO_SCHEMA_URL, schemaUrl.getPath());
 +    schema = AvroUtil.getAvroSchema(meta, new TajoConf());
 +    assertEquals(expected, schema);
 +
 +    HttpFileServer server = new HttpFileServer(NetUtils.createSocketAddr("127.0.0.1:0"));
 +    try {
 +      server.start();
 +      InetSocketAddress addr = server.getBindAddress();
 +
 +      String url = "http://127.0.0.1:" + addr.getPort() + schemaUrl.getPath();
 +      meta = CatalogUtil.newTableMeta(CatalogProtos.StoreType.AVRO);
 +      meta.putOption(StorageConstants.AVRO_SCHEMA_URL, url);
 +      schema = AvroUtil.getAvroSchema(meta, new TajoConf());
 +    } finally {
 +      server.stop();
 +    }
 +    assertEquals(expected, schema);
 +  }
 +
 +  @Test
 +  public void testGetSchemaFromHttp() throws IOException, URISyntaxException {
 +    HttpFileServer server = new HttpFileServer(NetUtils.createSocketAddr("127.0.0.1:0"));
 +    try {
 +      server.start();
 +      InetSocketAddress addr = server.getBindAddress();
 +
 +      Schema schema = AvroUtil.getAvroSchemaFromHttp("http://127.0.0.1:" + addr.getPort() + schemaUrl.getPath());
 +      assertEquals(expected, schema);
 +    } finally {
 +      server.stop();
 +    }
 +  }
 +
 +  @Test
 +  public void testGetSchemaFromFileSystem() throws IOException, URISyntaxException {
 +    Schema schema = AvroUtil.getAvroSchemaFromFileSystem(schemaUrl.toString(), new TajoConf());
 +
 +    assertEquals(expected, schema);
 +  }
 +}
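
TestAvroUtil above covers the three ways a table's Avro schema can be supplied: as an inline literal, as a filesystem path, and over HTTP, all carried as TableMeta options. A condensed sketch of the first two, using only the option keys and calls from the test (the local path schema.avsc is a placeholder):

    TableMeta meta = CatalogUtil.newTableMeta(CatalogProtos.StoreType.AVRO);

    // 1. inline schema text
    meta.putOption(StorageConstants.AVRO_SCHEMA_LITERAL,
        FileUtil.readTextFile(new File("schema.avsc")));
    Schema fromLiteral = AvroUtil.getAvroSchema(meta, new TajoConf());

    // 2. schema by location; AVRO_SCHEMA_URL takes a filesystem path or an http:// URL
    meta = CatalogUtil.newTableMeta(CatalogProtos.StoreType.AVRO);
    meta.putOption(StorageConstants.AVRO_SCHEMA_URL, "schema.avsc");
    Schema fromUrl = AvroUtil.getAvroSchema(meta, new TajoConf());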

http://git-wip-us.apache.org/repos/asf/tajo/blob/940546a0/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/json/TestLineReader.java
----------------------------------------------------------------------
diff --cc tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/json/TestLineReader.java
index 0000000,0000000..bf7516f
new file mode 100644
--- /dev/null
+++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/json/TestLineReader.java
@@@ -1,0 -1,0 +1,197 @@@
++/**
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements.  See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership.  The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License.  You may obtain a copy of the License at
++ *
++ *     http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++
++package org.apache.tajo.storage.json;
++
++import io.netty.buffer.ByteBuf;
++import org.apache.hadoop.fs.FileStatus;
++import org.apache.hadoop.fs.FileSystem;
++import org.apache.hadoop.fs.Path;
++import org.apache.hadoop.io.IOUtils;
++import org.apache.hadoop.io.compress.DeflateCodec;
++import org.apache.tajo.catalog.CatalogUtil;
++import org.apache.tajo.catalog.Schema;
++import org.apache.tajo.catalog.TableMeta;
++import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
++import org.apache.tajo.common.TajoDataTypes.Type;
++import org.apache.tajo.conf.TajoConf;
++import org.apache.tajo.datum.DatumFactory;
++import org.apache.tajo.datum.NullDatum;
++import org.apache.tajo.storage.ByteBufInputChannel;
++import org.apache.tajo.storage.FileAppender;
++import org.apache.tajo.storage.StorageManager;
++import org.apache.tajo.storage.VTuple;
++import org.apache.tajo.storage.fragment.FileFragment;
++import org.apache.tajo.storage.text.ByteBufLineReader;
++import org.apache.tajo.storage.text.DelimitedLineReader;
++import org.apache.tajo.storage.text.DelimitedTextFile;
++import org.apache.tajo.util.CommonTestingUtil;
++import org.apache.tajo.util.FileUtil;
++import org.junit.Test;
++
++import java.io.File;
++import java.io.FileInputStream;
++import java.io.IOException;
++import java.util.concurrent.atomic.AtomicInteger;
++
++import static org.junit.Assert.*;
++
++public class TestLineReader {
++  private static final String TEST_PATH = "target/test-data/TestLineReader";
++
++  @Test
++  public void testByteBufLineReader() throws IOException {
++    TajoConf conf = new TajoConf();
++    Path testDir = CommonTestingUtil.getTestDir(TEST_PATH);
++    FileSystem fs = testDir.getFileSystem(conf);
++
++    Schema schema = new Schema();
++    schema.addColumn("id", Type.INT4);
++    schema.addColumn("age", Type.INT8);
++    schema.addColumn("comment", Type.TEXT);
++    schema.addColumn("comment2", Type.TEXT);
++
++    TableMeta meta = CatalogUtil.newTableMeta(StoreType.TEXTFILE);
++    Path tablePath = new Path(testDir, "line.data");
++    FileAppender appender = (FileAppender) StorageManager.getFileStorageManager(conf).getAppender(
++        null, null, meta, schema, tablePath);
++    appender.enableStats();
++    appender.init();
++    int tupleNum = 10000;
++    VTuple vTuple;
++
++    for (int i = 0; i < tupleNum; i++) {
++      vTuple = new VTuple(4);
++      vTuple.put(0, DatumFactory.createInt4(i + 1));
++      vTuple.put(1, DatumFactory.createInt8(25L));
++      vTuple.put(2, DatumFactory.createText("emiya muljomdao"));
++      vTuple.put(3, NullDatum.get());
++      appender.addTuple(vTuple);
++    }
++    appender.close();
++
++    FileStatus status = fs.getFileStatus(tablePath);
++
++    ByteBufInputChannel channel = new ByteBufInputChannel(fs.open(tablePath));
++    assertEquals(status.getLen(), channel.available());
++    ByteBufLineReader reader = new ByteBufLineReader(channel);
++    assertEquals(status.getLen(), reader.available());
++
++    long totalRead = 0;
++    int i = 0;
++    AtomicInteger bytes = new AtomicInteger();
++    for (;;) {
++      ByteBuf buf = reader.readLineBuf(bytes);
++      if (buf == null) break;
++
++      totalRead += bytes.get();
++      i++;
++    }
++    IOUtils.cleanup(null, reader, channel, fs);
++    assertEquals(tupleNum, i);
++    assertEquals(status.getLen(), totalRead);
++    assertEquals(status.getLen(), reader.readBytes());
++  }
++
++  @Test
++  public void testLineDelimitedReader() throws IOException {
++    TajoConf conf = new TajoConf();
++    Path testDir = CommonTestingUtil.getTestDir(TEST_PATH);
++    FileSystem fs = testDir.getFileSystem(conf);
++
++    Schema schema = new Schema();
++    schema.addColumn("id", Type.INT4);
++    schema.addColumn("age", Type.INT8);
++    schema.addColumn("comment", Type.TEXT);
++    schema.addColumn("comment2", Type.TEXT);
++
++    TableMeta meta = CatalogUtil.newTableMeta(StoreType.TEXTFILE);
++    meta.putOption("compression.codec", DeflateCodec.class.getCanonicalName());
++
++    Path tablePath = new Path(testDir, "line1." + DeflateCodec.class.getSimpleName());
++    FileAppender appender = (FileAppender) StorageManager.getFileStorageManager(conf).getAppender(
++        null, null, meta, schema, tablePath);
++    appender.enableStats();
++    appender.init();
++    int tupleNum = 10000;
++    VTuple vTuple;
++
++    long splitOffset = 0;
++    for (int i = 0; i < tupleNum; i++) {
++      vTuple = new VTuple(4);
++      vTuple.put(0, DatumFactory.createInt4(i + 1));
++      vTuple.put(1, DatumFactory.createInt8(25L));
++      vTuple.put(2, DatumFactory.createText("emiya muljomdao"));
++      vTuple.put(3, NullDatum.get());
++      appender.addTuple(vTuple);
++
++      if (i == (tupleNum / 2)) {
++        splitOffset = appender.getOffset();
++      }
++    }
++    String extension = ((DelimitedTextFile.DelimitedTextFileAppender) appender).getExtension();
++    appender.close();
++
++    tablePath = tablePath.suffix(extension);
++    FileFragment fragment = new FileFragment("table", tablePath, 0, splitOffset);
++    DelimitedLineReader reader = new DelimitedLineReader(conf, fragment); // a compressed file is read to EOF, regardless of the fragment length
++    assertTrue(reader.isCompressed());
++    assertFalse(reader.isReadable());
++    reader.init();
++    assertTrue(reader.isReadable());
++
++    int i = 0;
++    while (reader.isReadable()) {
++      ByteBuf buf = reader.readLine();
++      if (buf == null) break;
++      i++;
++    }
++
++    IOUtils.cleanup(null, reader, fs);
++    assertEquals(tupleNum, i);
++  }
++
++  @Test
++  public void testByteBufLineReaderWithoutTerminating() throws IOException {
++    String path = FileUtil.getResourcePath("dataset/testLineText.txt").getFile();
++    File file = new File(path);
++    String data = FileUtil.readTextFile(file);
++
++    ByteBufInputChannel channel = new ByteBufInputChannel(new FileInputStream(file));
++
++    assertEquals(file.length(), channel.available());
++    ByteBufLineReader reader = new ByteBufLineReader(channel);
++    assertEquals(file.length(), reader.available());
++
++    long totalRead = 0;
++    int i = 0;
++    AtomicInteger bytes = new AtomicInteger();
++    for (;;) {
++      ByteBuf buf = reader.readLineBuf(bytes);
++      if (buf == null) break;
++      totalRead += bytes.get();
++      i++;
++    }
++    IOUtils.cleanup(null, reader);
++    assertEquals(file.length(), totalRead);
++    assertEquals(file.length(), reader.readBytes());
++    assertEquals(data.split("\n").length, i);
++  }
++}
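
ByteBufLineReader, exercised above, returns one line per readLineBuf call and reports the consumed byte count through its AtomicInteger out-parameter, which is what lets the tests reconcile total bytes read against the file length. The core read loop, as used in both reader tests (an already-open ByteBufInputChannel channel is assumed):

    ByteBufLineReader reader = new ByteBufLineReader(channel);
    AtomicInteger bytes = new AtomicInteger();  // receives the byte length of each line read
    long totalRead = 0;
    for (;;) {
      ByteBuf buf = reader.readLineBuf(bytes);
      if (buf == null) break;                   // null signals end of input
      totalRead += bytes.get();
    }
    // totalRead should equal the source length once every line has been consumed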

http://git-wip-us.apache.org/repos/asf/tajo/blob/940546a0/tajo-storage/tajo-storage-hdfs/src/test/resources/dataset/TestJsonSerDe/testVariousType.json
----------------------------------------------------------------------
diff --cc tajo-storage/tajo-storage-hdfs/src/test/resources/dataset/TestJsonSerDe/testVariousType.json
index 0000000,0000000..8ee3408
new file mode 100644
--- /dev/null
+++ b/tajo-storage/tajo-storage-hdfs/src/test/resources/dataset/TestJsonSerDe/testVariousType.json
@@@ -1,0 -1,0 +1,1 @@@
++{"col1": "true", "col2": "hyunsik", "col3": 17, "col4": 59, "col5": 23, "col6": 77.9, "col7": 271.9, "col8": "hyunsik", "col9": "hyunsik", "col10": "192.168.0.1"}

http://git-wip-us.apache.org/repos/asf/tajo/blob/940546a0/tajo-storage/tajo-storage-hdfs/src/test/resources/dataset/testLineText.txt
----------------------------------------------------------------------
diff --cc tajo-storage/tajo-storage-hdfs/src/test/resources/dataset/testLineText.txt
index 0000000,0000000..7403c26
new file mode 100644
--- /dev/null
+++ b/tajo-storage/tajo-storage-hdfs/src/test/resources/dataset/testLineText.txt
@@@ -1,0 -1,0 +1,2 @@@
++1|25|emiya muljomdao
++2|25|emiya muljomdao

http://git-wip-us.apache.org/repos/asf/tajo/blob/940546a0/tajo-storage/tajo-storage-hdfs/src/test/resources/dataset/testVariousTypes.avsc
----------------------------------------------------------------------
diff --cc tajo-storage/tajo-storage-hdfs/src/test/resources/dataset/testVariousTypes.avsc
index 0000000,0000000..d4250a9
new file mode 100644
--- /dev/null
+++ b/tajo-storage/tajo-storage-hdfs/src/test/resources/dataset/testVariousTypes.avsc
@@@ -1,0 -1,0 +1,20 @@@
++{
++  "type": "record",
++  "namespace": "org.apache.tajo",
++  "name": "testVariousTypes",
++  "fields": [
++    { "name": "col1", "type": "boolean" },
++    { "name": "col2", "type": "string" },
++    { "name": "col3", "type": "int" },
++    { "name": "col4", "type": "int" },
++    { "name": "col5", "type": "long" },
++    { "name": "col6", "type": "float" },
++    { "name": "col7", "type": "double" },
++    { "name": "col8", "type": "string" },
++    { "name": "col9", "type": "bytes" },
++    { "name": "col10", "type": "bytes" },
++    { "name": "col11", "type": "null" },
++    { "name": "col12", "type": "bytes" }
++  ]
++}
++

http://git-wip-us.apache.org/repos/asf/tajo/blob/940546a0/tajo-storage/tajo-storage-hdfs/src/test/resources/storage-default.xml
----------------------------------------------------------------------
diff --cc tajo-storage/tajo-storage-hdfs/src/test/resources/storage-default.xml
index 6190d1a,0000000..737284b
mode 100644,000000..100644
--- a/tajo-storage/tajo-storage-hdfs/src/test/resources/storage-default.xml
+++ b/tajo-storage/tajo-storage-hdfs/src/test/resources/storage-default.xml
@@@ -1,164 -1,0 +1,178 @@@
 +<?xml version="1.0"?>
 +<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
 +
 +<!--
 +  Licensed to the Apache Software Foundation (ASF) under one
 +  or more contributor license agreements.  See the NOTICE file
 +  distributed with this work for additional information
 +  regarding copyright ownership.  The ASF licenses this file
 +  to you under the Apache License, Version 2.0 (the
 +  "License"); you may not use this file except in compliance
 +  with the License.  You may obtain a copy of the License at
 +
 +      http://www.apache.org/licenses/LICENSE-2.0
 +
 +  Unless required by applicable law or agreed to in writing, software
 +  distributed under the License is distributed on an "AS IS" BASIS,
 +  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 +  See the License for the specific language governing permissions and
 +  limitations under the License.
 +  -->
 +
 +<configuration>
 +  <property>
 +    <name>fs.s3.impl</name>
 +    <value>org.apache.tajo.storage.s3.SmallBlockS3FileSystem</value>
 +  </property>
 +
 +  <!-- Storage Manager Configuration -->
 +  <property>
 +    <name>tajo.storage.manager.hdfs.class</name>
 +    <value>org.apache.tajo.storage.FileStorageManager</value>
 +  </property>
 +  <property>
 +    <name>tajo.storage.manager.hbase.class</name>
 +    <value>org.apache.tajo.storage.hbase.HBaseStorageManager</value>
 +  </property>
 +
 +  <!--- Registered Scanner Handler -->
 +  <property>
 +    <name>tajo.storage.scanner-handler</name>
-     <value>textfile,csv,raw,rcfile,row,parquet,sequencefile,avro</value>
++    <value>textfile,csv,json,raw,rcfile,row,parquet,sequencefile,avro</value>
 +  </property>
 +
 +  <!--- Fragment Class Configurations -->
 +  <property>
 +    <name>tajo.storage.fragment.textfile.class</name>
 +    <value>org.apache.tajo.storage.fragment.FileFragment</value>
 +  </property>
 +  <property>
 +    <name>tajo.storage.fragment.csv.class</name>
 +    <value>org.apache.tajo.storage.fragment.FileFragment</value>
 +  </property>
 +  <property>
++    <name>tajo.storage.fragment.json.class</name>
++    <value>org.apache.tajo.storage.fragment.FileFragment</value>
++  </property>
++  <property>
 +    <name>tajo.storage.fragment.raw.class</name>
 +    <value>org.apache.tajo.storage.fragment.FileFragment</value>
 +  </property>
 +  <property>
 +    <name>tajo.storage.fragment.rcfile.class</name>
 +    <value>org.apache.tajo.storage.fragment.FileFragment</value>
 +  </property>
 +  <property>
 +    <name>tajo.storage.fragment.row.class</name>
 +    <value>org.apache.tajo.storage.fragment.FileFragment</value>
 +  </property>
 +  <property>
 +    <name>tajo.storage.fragment.parquet.class</name>
 +    <value>org.apache.tajo.storage.fragment.FileFragment</value>
 +  </property>
 +  <property>
 +    <name>tajo.storage.fragment.sequencefile.class</name>
 +    <value>org.apache.tajo.storage.fragment.FileFragment</value>
 +  </property>
 +  <property>
 +    <name>tajo.storage.fragment.avro.class</name>
 +    <value>org.apache.tajo.storage.fragment.FileFragment</value>
 +  </property>
 +
 +  <!--- Scanner Handler -->
 +  <property>
 +    <name>tajo.storage.scanner-handler.textfile.class</name>
 +    <value>org.apache.tajo.storage.text.DelimitedTextFile$DelimitedTextFileScanner</value>
 +  </property>
 +
 +  <property>
 +    <name>tajo.storage.scanner-handler.csv.class</name>
 +    <value>org.apache.tajo.storage.CSVFile$CSVScanner</value>
 +  </property>
 +
 +  <property>
++    <name>tajo.storage.scanner-handler.json.class</name>
++    <value>org.apache.tajo.storage.text.DelimitedTextFile$DelimitedTextFileScanner</value>
++  </property>
++
++  <property>
 +    <name>tajo.storage.scanner-handler.raw.class</name>
 +    <value>org.apache.tajo.storage.RawFile$RawFileScanner</value>
 +  </property>
 +
 +  <property>
 +    <name>tajo.storage.scanner-handler.rcfile.class</name>
 +    <value>org.apache.tajo.storage.rcfile.RCFile$RCFileScanner</value>
 +  </property>
 +
 +  <property>
 +    <name>tajo.storage.scanner-handler.rowfile.class</name>
 +    <value>org.apache.tajo.storage.RowFile$RowFileScanner</value>
 +  </property>
 +
 +  <property>
 +    <name>tajo.storage.scanner-handler.parquet.class</name>
 +    <value>org.apache.tajo.storage.parquet.ParquetScanner</value>
 +  </property>
 +
 +  <property>
 +    <name>tajo.storage.scanner-handler.sequencefile.class</name>
 +    <value>org.apache.tajo.storage.sequencefile.SequenceFileScanner</value>
 +  </property>
 +
 +  <property>
 +    <name>tajo.storage.scanner-handler.avro.class</name>
 +    <value>org.apache.tajo.storage.avro.AvroScanner</value>
 +  </property>
 +
 +  <!--- Appender Handler -->
 +  <property>
 +    <name>tajo.storage.appender-handler</name>
-     <value>textfile,csv,raw,rcfile,row,parquet,sequencefile,avro</value>
++    <value>textfile,csv,json,raw,rcfile,row,parquet,sequencefile,avro</value>
 +  </property>
 +
 +  <property>
 +    <name>tajo.storage.appender-handler.textfile.class</name>
 +    <value>org.apache.tajo.storage.text.DelimitedTextFile$DelimitedTextFileAppender</value>
 +  </property>
 +
 +  <property>
 +    <name>tajo.storage.appender-handler.csv.class</name>
 +    <value>org.apache.tajo.storage.CSVFile$CSVAppender</value>
 +  </property>
 +
 +  <property>
++    <name>tajo.storage.appender-handler.json.class</name>
++    <value>org.apache.tajo.storage.text.DelimitedTextFile$DelimitedTextFileAppender</value>
++  </property>
++
++  <property>
 +    <name>tajo.storage.appender-handler.raw.class</name>
 +    <value>org.apache.tajo.storage.RawFile$RawFileAppender</value>
 +  </property>
 +
 +  <property>
 +    <name>tajo.storage.appender-handler.rcfile.class</name>
 +    <value>org.apache.tajo.storage.rcfile.RCFile$RCFileAppender</value>
 +  </property>
 +
 +  <property>
 +    <name>tajo.storage.appender-handler.rowfile.class</name>
 +    <value>org.apache.tajo.storage.RowFile$RowFileAppender</value>
 +  </property>
 +
 +  <property>
 +    <name>tajo.storage.appender-handler.parquet.class</name>
 +    <value>org.apache.tajo.storage.parquet.ParquetAppender</value>
 +  </property>
 +
 +  <property>
 +    <name>tajo.storage.appender-handler.sequencefile.class</name>
 +    <value>org.apache.tajo.storage.sequencefile.SequenceFileAppender</value>
 +  </property>
 +
 +  <property>
 +    <name>tajo.storage.appender-handler.avro.class</name>
 +    <value>org.apache.tajo.storage.avro.AvroAppender</value>
 +  </property>
 +</configuration>
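
The json entries added above follow the registration convention used throughout this file: a format name is listed in the comma-separated scanner-handler and appender-handler properties, then bound to a fragment class, a scanner class, and an appender class through per-format keys. A hypothetical new format (called myformat here, with placeholder classes) would be wired the same way, in addition to appending myformat to both handler lists:

    <property>
      <name>tajo.storage.fragment.myformat.class</name>
      <value>org.apache.tajo.storage.fragment.FileFragment</value>
    </property>
    <property>
      <name>tajo.storage.scanner-handler.myformat.class</name>
      <value>org.example.MyFormatScanner</value>   <!-- placeholder -->
    </property>
    <property>
      <name>tajo.storage.appender-handler.myformat.class</name>
      <value>org.example.MyFormatAppender</value>  <!-- placeholder -->
    </property>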