You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hj...@apache.org on 2014/12/05 09:21:13 UTC

[09/29] tajo git commit: TAJO-1095: Implement Json file scanner.

TAJO-1095: Implement Json file scanner.

Closes #181


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/cd38dffb
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/cd38dffb
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/cd38dffb

Branch: refs/heads/hbase_storage
Commit: cd38dffb908a3959472f5ddb705db71d0e48ad89
Parents: 7d41c67
Author: Hyunsik Choi <hy...@apache.org>
Authored: Fri Nov 28 17:26:38 2014 +0900
Committer: Hyunsik Choi <hy...@apache.org>
Committed: Fri Nov 28 17:26:38 2014 +0900

----------------------------------------------------------------------
 CHANGES                                         |   2 +
 .../org/apache/tajo/catalog/CatalogUtil.java    |   9 +-
 .../org/apache/tajo/catalog/SchemaUtil.java     |   8 +
 .../src/main/proto/CatalogProtos.proto          |   1 +
 .../apache/tajo/storage/StorageConstants.java   |   2 +-
 .../java/org/apache/tajo/storage/VTuple.java    |   4 +-
 tajo-storage/pom.xml                            |   6 +
 .../tajo/storage/json/JsonLineDeserializer.java | 220 +++++++++++++++++++
 .../apache/tajo/storage/json/JsonLineSerDe.java |  37 ++++
 .../tajo/storage/json/JsonLineSerializer.java   | 134 +++++++++++
 .../tajo/storage/text/CSVLineDeserializer.java  |   4 +-
 .../apache/tajo/storage/text/CSVLineSerDe.java  |   4 -
 .../tajo/storage/text/CSVLineSerializer.java    |  15 +-
 .../tajo/storage/text/DelimitedTextFile.java    |   8 +-
 .../text/TextFieldSerializerDeserializer.java   |   2 +-
 .../tajo/storage/text/TextLineDeserializer.java |   4 +-
 .../src/main/resources/storage-default.xml      |  16 +-
 .../org/apache/tajo/storage/TestStorages.java   | 105 +++++----
 .../apache/tajo/storage/json/TestJsonSerDe.java | 101 +++++++++
 .../dataset/TestJsonSerDe/testVariousType.json  |   1 +
 .../src/test/resources/storage-default.xml      |  16 +-
 .../src/test/resources/testVariousTypes.avsc    |  19 +-
 22 files changed, 637 insertions(+), 81 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/cd38dffb/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 0d0677c..025ae88 100644
--- a/CHANGES
+++ b/CHANGES
@@ -11,6 +11,8 @@ Release 0.9.1 - unreleased
 
     TAJO-235: Support Oracle CatalogStore. (Jihun Kang via hyunsik)
 
+    TAJO-1095: Implement Json file scanner. (hyunsik)
+
   IMPROVEMENT
 
     TAJO-1165: Needs to show error messages on query_executor.jsp. (Jihun Kang via jaehwa)

http://git-wip-us.apache.org/repos/asf/tajo/blob/cd38dffb/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogUtil.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogUtil.java b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogUtil.java
index 737c9ae..f2d9b9c 100644
--- a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogUtil.java
+++ b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogUtil.java
@@ -31,9 +31,11 @@ import org.apache.tajo.common.TajoDataTypes;
 import org.apache.tajo.common.TajoDataTypes.DataType;
 import org.apache.tajo.exception.InvalidOperationException;
 import org.apache.tajo.storage.StorageConstants;
+import org.apache.tajo.unit.StorageUnit;
 import org.apache.tajo.util.KeyValueSet;
 import org.apache.tajo.util.StringUtils;
 import org.apache.tajo.util.TUtil;
+import org.mortbay.util.ajax.JSON;
 
 import java.sql.Connection;
 import java.sql.ResultSet;
@@ -278,13 +280,16 @@ public class CatalogUtil {
       return StoreType.AVRO;
     } else if (typeStr.equalsIgnoreCase(StoreType.TEXTFILE.name())) {
       return StoreType.TEXTFILE;
+    } else if (typeStr.equalsIgnoreCase(StoreType.JSON.name())) {
+      return StoreType.JSON;
     } else {
       return null;
     }
   }
 
   public static TableMeta newTableMeta(StoreType type) {
-    return new TableMeta(type, new KeyValueSet());
+    KeyValueSet defaultProperties = CatalogUtil.newPhysicalProperties(type);
+    return new TableMeta(type, defaultProperties);
   }
 
   public static TableMeta newTableMeta(StoreType type, KeyValueSet options) {
@@ -821,6 +826,8 @@ public class CatalogUtil {
     KeyValueSet options = new KeyValueSet();
     if (StoreType.CSV == type || StoreType.TEXTFILE == type) {
       options.set(StorageConstants.TEXT_DELIMITER, StorageConstants.DEFAULT_FIELD_DELIMITER);
+    } else if (StoreType.JSON == type) {
+      options.set(StorageConstants.TEXT_SERDE_CLASS, "org.apache.tajo.storage.json.JsonLineSerDe");
     } else if (StoreType.RCFILE == type) {
       options.set(StorageConstants.RCFILE_SERDE, StorageConstants.DEFAULT_BINARY_SERDE);
     } else if (StoreType.SEQUENCEFILE == type) {

http://git-wip-us.apache.org/repos/asf/tajo/blob/cd38dffb/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/SchemaUtil.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/SchemaUtil.java b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/SchemaUtil.java
index ee670ef..23ebe1b 100644
--- a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/SchemaUtil.java
+++ b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/SchemaUtil.java
@@ -100,4 +100,12 @@ public class SchemaUtil {
     }
     return types;
   }
+
+  /**
+   * Collects the simple name of every column of a schema, in column order.
+   *
+   * @param schema Schema whose columns are read by index
+   * @return An array whose i-th element is <code>getSimpleName()</code> of the i-th column
+   */
+  public static String [] toSimpleNames(Schema schema) {
+    final int columnNum = schema.size();
+    String [] simpleNames = new String[columnNum];
+    int idx = 0;
+    while (idx < columnNum) {
+      simpleNames[idx] = schema.getColumn(idx).getSimpleName();
+      idx++;
+    }
+    return simpleNames;
+  }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/cd38dffb/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto b/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto
index 99f594a..f29bc6c 100644
--- a/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto
+++ b/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto
@@ -36,6 +36,7 @@ enum StoreType {
   SEQUENCEFILE = 8;
   AVRO = 9;
   TEXTFILE = 10;
+  JSON = 11;
 }
 
 enum OrderType {

http://git-wip-us.apache.org/repos/asf/tajo/blob/cd38dffb/tajo-common/src/main/java/org/apache/tajo/storage/StorageConstants.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/storage/StorageConstants.java b/tajo-common/src/main/java/org/apache/tajo/storage/StorageConstants.java
index 3065d31..a3d8de0 100644
--- a/tajo-common/src/main/java/org/apache/tajo/storage/StorageConstants.java
+++ b/tajo-common/src/main/java/org/apache/tajo/storage/StorageConstants.java
@@ -33,7 +33,7 @@ public class StorageConstants {
   public static final String TEXT_DELIMITER = "text.delimiter";
   public static final String TEXT_NULL = "text.null";
   public static final String TEXT_SERDE_CLASS = "text.serde.class";
-  public static final String DEFAULT_TEXT_SERDE_CLASS = "org.apache.tajo.storage.text.CSVLineSerde";
+  public static final String DEFAULT_TEXT_SERDE_CLASS = "org.apache.tajo.storage.text.CSVLineSerDe";
 
   @Deprecated
   public static final String SEQUENCEFILE_DELIMITER = "sequencefile.delimiter";

http://git-wip-us.apache.org/repos/asf/tajo/blob/cd38dffb/tajo-common/src/main/java/org/apache/tajo/storage/VTuple.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/storage/VTuple.java b/tajo-common/src/main/java/org/apache/tajo/storage/VTuple.java
index 6304734..5e839b7 100644
--- a/tajo-common/src/main/java/org/apache/tajo/storage/VTuple.java
+++ b/tajo-common/src/main/java/org/apache/tajo/storage/VTuple.java
@@ -56,7 +56,7 @@ public class VTuple implements Tuple, Cloneable {
 
   @Override
   public boolean isNull(int fieldid) {
-    return values[fieldid].isNull();
+    return values[fieldid] == null || values[fieldid].isNull();
   }
 
   @Override
@@ -93,7 +93,7 @@ public class VTuple implements Tuple, Cloneable {
   }
 
   public void put(Datum [] values) {
-    System.arraycopy(values, 0, this.values, 0, size());
+    System.arraycopy(values, 0, this.values, 0, values.length);
 	}
 	
 	//////////////////////////////////////////////////////

http://git-wip-us.apache.org/repos/asf/tajo/blob/cd38dffb/tajo-storage/pom.xml
----------------------------------------------------------------------
diff --git a/tajo-storage/pom.xml b/tajo-storage/pom.xml
index c6877c4..ef26a32 100644
--- a/tajo-storage/pom.xml
+++ b/tajo-storage/pom.xml
@@ -72,6 +72,7 @@
         <configuration>
           <excludes>
             <exclude>src/test/resources/testVariousTypes.avsc</exclude>
+            <exclude>src/test/resources/dataset/TestJsonSerDe/*.json</exclude>
           </excludes>
         </configuration>
       </plugin>
@@ -313,6 +314,11 @@
       <groupId>io.netty</groupId>
       <artifactId>netty-buffer</artifactId>
     </dependency>
+    <dependency>
+      <groupId>net.minidev</groupId>
+      <artifactId>json-smart</artifactId>
+      <version>2.0</version>
+    </dependency>
   </dependencies>
 
   <profiles>

http://git-wip-us.apache.org/repos/asf/tajo/blob/cd38dffb/tajo-storage/src/main/java/org/apache/tajo/storage/json/JsonLineDeserializer.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/json/JsonLineDeserializer.java b/tajo-storage/src/main/java/org/apache/tajo/storage/json/JsonLineDeserializer.java
new file mode 100644
index 0000000..37cd9f3
--- /dev/null
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/json/JsonLineDeserializer.java
@@ -0,0 +1,220 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.json;
+
+
+import io.netty.buffer.ByteBuf;
+import net.minidev.json.JSONArray;
+import net.minidev.json.JSONObject;
+import net.minidev.json.parser.JSONParser;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.SchemaUtil;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.common.TajoDataTypes.Type;
+import org.apache.tajo.common.exception.NotImplementedException;
+import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.datum.NullDatum;
+import org.apache.tajo.datum.protobuf.ProtobufJsonFormat;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.storage.text.TextLineDeserializer;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+/**
+ * Line deserializer that parses one text line as a JSON object and fills the
+ * projected columns of a tuple from the object's fields. Fields are looked up
+ * by the simple names of the schema columns.
+ */
+public class JsonLineDeserializer extends TextLineDeserializer {
+  private JSONParser parser;
+  private Type [] types;
+  private String [] columnNames;
+
+  public JsonLineDeserializer(Schema schema, TableMeta meta, int[] targetColumnIndexes) {
+    super(schema, meta, targetColumnIndexes);
+  }
+
+  /**
+   * Caches the column types and simple column names of the schema, and creates the JSON parser.
+   */
+  @Override
+  public void init() {
+    types = SchemaUtil.toTypes(schema);
+    columnNames = SchemaUtil.toSimpleNames(schema);
+
+    parser = new JSONParser(JSONParser.MODE_JSON_SIMPLE);
+  }
+
+  /**
+   * Parses the readable bytes of a line as a JSON object and puts one datum per target column
+   * into the output tuple. A column whose field is absent from the object, or whose value is
+   * read back as null, is set to {@link NullDatum}.
+   *
+   * @param buf Read line
+   * @param output Tuple to be filled with read fields
+   * @throws java.io.IOException if the line cannot be parsed or converted; any other failure
+   *         raised while reading the line is wrapped as the cause
+   */
+  @Override
+  public void deserialize(ByteBuf buf, Tuple output) throws IOException {
+    // The parser is fed a byte array, so the readable part of the buffer is copied out first.
+    byte [] line = new byte[buf.readableBytes()];
+    buf.readBytes(line);
+
+    try {
+      JSONObject object = (JSONObject) parser.parse(line);
+
+      for (int i = 0; i < targetColumnIndexes.length; i++) {
+        int actualIdx = targetColumnIndexes[i];
+        String fieldName = columnNames[actualIdx];
+
+        // A field missing from the JSON object is treated as NULL.
+        if (!object.containsKey(fieldName)) {
+          output.put(actualIdx, NullDatum.get());
+          continue;
+        }
+
+        switch (types[actualIdx]) {
+        case BOOLEAN:
+          String boolStr = object.getAsString(fieldName);
+          if (boolStr != null) {
+            // Only the exact string "true" yields true; every other value yields false.
+            output.put(actualIdx, DatumFactory.createBool(boolStr.equals("true")));
+          } else {
+            output.put(actualIdx, NullDatum.get());
+          }
+          break;
+        case CHAR:
+          String charStr = object.getAsString(fieldName);
+          if (charStr != null) {
+            output.put(actualIdx, DatumFactory.createChar(charStr));
+          } else {
+            output.put(actualIdx, NullDatum.get());
+          }
+          break;
+        case INT1:
+        case INT2:
+          Number int2Num = object.getAsNumber(fieldName);
+          if (int2Num != null) {
+            output.put(actualIdx, DatumFactory.createInt2(int2Num.shortValue()));
+          } else {
+            output.put(actualIdx, NullDatum.get());
+          }
+          break;
+        case INT4:
+          Number int4Num = object.getAsNumber(fieldName);
+          if (int4Num != null) {
+            output.put(actualIdx, DatumFactory.createInt4(int4Num.intValue()));
+          } else {
+            output.put(actualIdx, NullDatum.get());
+          }
+          break;
+        case INT8:
+          Number int8Num = object.getAsNumber(fieldName);
+          if (int8Num != null) {
+            output.put(actualIdx, DatumFactory.createInt8(int8Num.longValue()));
+          } else {
+            output.put(actualIdx, NullDatum.get());
+          }
+          break;
+        case FLOAT4:
+          Number float4Num = object.getAsNumber(fieldName);
+          if (float4Num != null) {
+            output.put(actualIdx, DatumFactory.createFloat4(float4Num.floatValue()));
+          } else {
+            output.put(actualIdx, NullDatum.get());
+          }
+          break;
+        case FLOAT8:
+          Number float8Num = object.getAsNumber(fieldName);
+          if (float8Num != null) {
+            output.put(actualIdx, DatumFactory.createFloat8(float8Num.doubleValue()));
+          } else {
+            output.put(actualIdx, NullDatum.get());
+          }
+          break;
+        case TEXT:
+          String textStr = object.getAsString(fieldName);
+          if (textStr != null) {
+            output.put(actualIdx, DatumFactory.createText(textStr));
+          } else {
+            output.put(actualIdx, NullDatum.get());
+          }
+          break;
+        case TIMESTAMP:
+          String timestampStr = object.getAsString(fieldName);
+          if (timestampStr != null) {
+            output.put(actualIdx, DatumFactory.createTimestamp(timestampStr));
+          } else {
+            output.put(actualIdx, NullDatum.get());
+          }
+          break;
+        case TIME:
+          String timeStr = object.getAsString(fieldName);
+          if (timeStr != null) {
+            output.put(actualIdx, DatumFactory.createTime(timeStr));
+          } else {
+            output.put(actualIdx, NullDatum.get());
+          }
+          break;
+        case DATE:
+          String dateStr = object.getAsString(fieldName);
+          if (dateStr != null) {
+            output.put(actualIdx, DatumFactory.createDate(dateStr));
+          } else {
+            output.put(actualIdx, NullDatum.get());
+          }
+          break;
+        case BIT:
+        case BINARY:
+        case VARBINARY:
+        case BLOB: {
+          // Binary columns are accepted either as a JSON string or as a JSON array of numbers.
+          Object jsonObject = object.get(fieldName);
+
+          if (jsonObject == null) {
+            output.put(actualIdx, NullDatum.get());
+          } else if (jsonObject instanceof String) {
+            output.put(actualIdx, DatumFactory.createBlob((String)jsonObject));
+          } else if (jsonObject instanceof JSONArray) {
+            JSONArray jsonArray = (JSONArray) jsonObject;
+            byte[] bytes = new byte[jsonArray.size()];
+            int arrayIdx = 0;
+            for (Object element : jsonArray) {
+              // Cast to Number, not Long, so that any numeric element type is accepted.
+              bytes[arrayIdx++] = ((Number) element).byteValue();
+            }
+            // An empty array is read as NULL rather than as an empty blob.
+            if (bytes.length > 0) {
+              output.put(actualIdx, DatumFactory.createBlob(bytes));
+            } else {
+              output.put(actualIdx, NullDatum.get());
+            }
+          } else {
+            // Report the class of the offending field value, not of the enclosing row object.
+            throw new IOException("Unknown json object: " + jsonObject.getClass().getSimpleName());
+          }
+          break;
+        }
+        case INET4:
+          String inetStr = object.getAsString(fieldName);
+          if (inetStr != null) {
+            output.put(actualIdx, DatumFactory.createInet4(inetStr));
+          } else {
+            output.put(actualIdx, NullDatum.get());
+          }
+          break;
+
+        case NULL_TYPE:
+          output.put(actualIdx, NullDatum.get());
+          break;
+
+        default:
+          throw new NotImplementedException(types[actualIdx].name() + " is not supported.");
+        }
+      }
+
+    } catch (IOException e) {
+      // Already the declared exception type; rethrow it without wrapping it a second time.
+      throw e;
+    } catch (Throwable e) {
+      throw new IOException(e);
+    }
+  }
+
+  /**
+   * Nothing to release; this deserializer holds no external resources.
+   */
+  @Override
+  public void release() {
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/cd38dffb/tajo-storage/src/main/java/org/apache/tajo/storage/json/JsonLineSerDe.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/json/JsonLineSerDe.java b/tajo-storage/src/main/java/org/apache/tajo/storage/json/JsonLineSerDe.java
new file mode 100644
index 0000000..6db2c29
--- /dev/null
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/json/JsonLineSerDe.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.json;
+
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.storage.text.TextLineDeserializer;
+import org.apache.tajo.storage.text.TextLineSerDe;
+import org.apache.tajo.storage.text.TextLineSerializer;
+
+/**
+ * Line SerDe that pairs {@link JsonLineDeserializer} with {@link JsonLineSerializer}.
+ */
+public class JsonLineSerDe extends TextLineSerDe {
+  /**
+   * Creates a {@link JsonLineDeserializer} for the given schema, table meta and projection.
+   */
+  @Override
+  public TextLineDeserializer createDeserializer(Schema schema, TableMeta meta, int[] targetColumnIndexes) {
+    final TextLineDeserializer jsonDeserializer = new JsonLineDeserializer(schema, meta, targetColumnIndexes);
+    return jsonDeserializer;
+  }
+
+  /**
+   * Creates a {@link JsonLineSerializer} for the given schema and table meta.
+   */
+  @Override
+  public TextLineSerializer createSerializer(Schema schema, TableMeta meta) {
+    final TextLineSerializer jsonSerializer = new JsonLineSerializer(schema, meta);
+    return jsonSerializer;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/cd38dffb/tajo-storage/src/main/java/org/apache/tajo/storage/json/JsonLineSerializer.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/json/JsonLineSerializer.java b/tajo-storage/src/main/java/org/apache/tajo/storage/json/JsonLineSerializer.java
new file mode 100644
index 0000000..c7007d8
--- /dev/null
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/json/JsonLineSerializer.java
@@ -0,0 +1,134 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.json;
+
+
+import net.minidev.json.JSONObject;
+import org.apache.commons.lang.CharSet;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.SchemaUtil;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.common.TajoDataTypes.Type;
+import org.apache.tajo.common.exception.NotImplementedException;
+import org.apache.tajo.datum.ProtobufDatum;
+import org.apache.tajo.datum.TextDatum;
+import org.apache.tajo.datum.protobuf.ProtobufJsonFormat;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.storage.text.TextLineSerDe;
+import org.apache.tajo.storage.text.TextLineSerializer;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.charset.Charset;
+
+/**
+ * Line serializer that writes a tuple as a single JSON object whose keys are the
+ * simple names of the schema columns.
+ */
+public class JsonLineSerializer extends TextLineSerializer {
+  private static ProtobufJsonFormat protobufJsonFormat = ProtobufJsonFormat.getInstance();
+
+  private Type [] types;
+  private String [] simpleNames;
+  private int columnNum;
+
+
+  public JsonLineSerializer(Schema schema, TableMeta meta) {
+    super(schema, meta);
+  }
+
+  /**
+   * Caches the column count, the simple column names and the column types of the schema.
+   */
+  @Override
+  public void init() {
+    columnNum = schema.size();
+    simpleNames = SchemaUtil.toSimpleNames(schema);
+    types = SchemaUtil.toTypes(schema);
+  }
+
+  /**
+   * Builds a JSON object from the non-null fields of a tuple and writes its string form,
+   * encoded with {@link TextDatum#DEFAULT_CHARSET}, to the stream. Null fields are left
+   * out of the object.
+   *
+   * @param out Stream that receives the encoded JSON text
+   * @param input Tuple to be written
+   * @return The number of bytes written
+   * @throws IOException if writing to the stream fails
+   */
+  @Override
+  public int serialize(OutputStream out, Tuple input) throws IOException {
+    final JSONObject row = new JSONObject();
+
+    for (int col = 0; col < columnNum; col++) {
+      if (!input.isNull(col)) {
+        final String key = simpleNames[col];
+
+        switch (types[col]) {
+
+        case NULL_TYPE:
+          // Nothing is emitted for a NULL_TYPE column.
+          break;
+
+        case BIT:
+        case BINARY:
+        case BLOB:
+        case VARBINARY:
+          row.put(key, input.getBytes(col));
+          break;
+
+        case CHAR:
+        case TEXT:
+        case VARCHAR:
+        case INET4:
+        case TIMESTAMP:
+        case DATE:
+        case TIME:
+        case INTERVAL:
+          row.put(key, input.getText(col));
+          break;
+
+        case FLOAT8:
+          row.put(key, input.getFloat8(col));
+          break;
+
+        case FLOAT4:
+          row.put(key, input.getFloat4(col));
+          break;
+
+        case INT8:
+          row.put(key, input.getInt8(col));
+          break;
+
+        case INT4:
+          row.put(key, input.getInt4(col));
+          break;
+
+        case INT1:
+        case INT2:
+          row.put(key, input.getInt2(col));
+          break;
+
+        case BOOLEAN:
+          row.put(key, input.getBool(col));
+          break;
+
+        default:
+          throw new NotImplementedException(types[col].name() + " is not supported.");
+        }
+      }
+    }
+
+    final byte [] encoded = row.toJSONString().getBytes(TextDatum.DEFAULT_CHARSET);
+    out.write(encoded);
+    return encoded.length;
+  }
+
+  /**
+   * Nothing to release; this serializer holds no external resources.
+   */
+  @Override
+  public void release() {
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/cd38dffb/tajo-storage/src/main/java/org/apache/tajo/storage/text/CSVLineDeserializer.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/text/CSVLineDeserializer.java b/tajo-storage/src/main/java/org/apache/tajo/storage/text/CSVLineDeserializer.java
index f580da1..0e2dfb0 100644
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/text/CSVLineDeserializer.java
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/text/CSVLineDeserializer.java
@@ -48,7 +48,7 @@ public class CSVLineDeserializer extends TextLineDeserializer {
     fieldSerDer = new TextFieldSerializerDeserializer();
   }
 
-  public void deserialize(final ByteBuf lineBuf, Tuple tuple) throws IOException {
+  public void deserialize(final ByteBuf lineBuf, Tuple output) throws IOException {
     int[] projection = targetColumnIndexes;
     if (lineBuf == null || targetColumnIndexes == null || targetColumnIndexes.length == 0) {
       return;
@@ -73,7 +73,7 @@ public class CSVLineDeserializer extends TextLineDeserializer {
       if (projection.length > currentTarget && currentIndex == projection[currentTarget]) {
         lineBuf.setIndex(start, start + fieldLength);
         Datum datum = fieldSerDer.deserialize(lineBuf, schema.getColumn(currentIndex), currentIndex, nullChars);
-        tuple.put(currentIndex, datum);
+        output.put(currentIndex, datum);
         currentTarget++;
       }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/cd38dffb/tajo-storage/src/main/java/org/apache/tajo/storage/text/CSVLineSerDe.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/text/CSVLineSerDe.java b/tajo-storage/src/main/java/org/apache/tajo/storage/text/CSVLineSerDe.java
index e2686a6..2fe7f23 100644
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/text/CSVLineSerDe.java
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/text/CSVLineSerDe.java
@@ -24,10 +24,6 @@ import org.apache.tajo.catalog.TableMeta;
 import org.apache.tajo.storage.StorageConstants;
 
 public class CSVLineSerDe extends TextLineSerDe {
-
-  public CSVLineSerDe() {
-  }
-
   @Override
   public TextLineDeserializer createDeserializer(Schema schema, TableMeta meta, int[] targetColumnIndexes) {
     return new CSVLineDeserializer(schema, meta, targetColumnIndexes);

http://git-wip-us.apache.org/repos/asf/tajo/blob/cd38dffb/tajo-storage/src/main/java/org/apache/tajo/storage/text/CSVLineSerializer.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/text/CSVLineSerializer.java b/tajo-storage/src/main/java/org/apache/tajo/storage/text/CSVLineSerializer.java
index 684519c..7397000 100644
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/text/CSVLineSerializer.java
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/text/CSVLineSerializer.java
@@ -18,6 +18,7 @@
 
 package org.apache.tajo.storage.text;
 
+import org.apache.tajo.catalog.Column;
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.catalog.TableMeta;
 import org.apache.tajo.datum.Datum;
@@ -32,6 +33,7 @@ public class CSVLineSerializer extends TextLineSerializer {
 
   private byte [] nullChars;
   private char delimiter;
+  private int columnNum;
 
   public CSVLineSerializer(Schema schema, TableMeta meta) {
     super(schema, meta);
@@ -41,25 +43,26 @@ public class CSVLineSerializer extends TextLineSerializer {
   public void init() {
     nullChars = TextLineSerDe.getNullCharsAsBytes(meta);
     delimiter = CSVLineSerDe.getFieldDelimiter(meta);
+    columnNum = schema.size();
 
     serde = new TextFieldSerializerDeserializer();
   }
 
   @Override
   public int serialize(OutputStream out, Tuple input) throws IOException {
-    int rowBytes = 0;
+    int writtenBytes = 0;
 
-    for (int i = 0; i < schema.size(); i++) {
+    for (int i = 0; i < columnNum; i++) {
       Datum datum = input.get(i);
-      rowBytes += serde.serialize(out, datum, schema.getColumn(i), i, nullChars);
+      writtenBytes += serde.serialize(out, datum, schema.getColumn(i), i, nullChars);
 
-      if (schema.size() - 1 > i) {
+      if (columnNum - 1 > i) {
         out.write((byte) delimiter);
-        rowBytes += 1;
+        writtenBytes += 1;
       }
     }
 
-    return rowBytes;
+    return writtenBytes;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/tajo/blob/cd38dffb/tajo-storage/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java b/tajo-storage/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java
index d15f394..2218fae 100644
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java
@@ -79,12 +79,12 @@ public class DelimitedTextFile {
       if (serdeClassCache.containsKey(serDeClassName)) {
         serdeClass = serdeClassCache.get(serDeClassName);
       } else {
-        serdeClass = (Class<? extends TextLineSerDe>) Class.forName(CSVLineSerDe.class.getName());
+        serdeClass = (Class<? extends TextLineSerDe>) Class.forName(serDeClassName);
         serdeClassCache.put(serDeClassName, serdeClass);
       }
       lineSerder = (TextLineSerDe) ReflectionUtil.newInstance(serdeClass);
     } catch (Throwable e) {
-      throw new RuntimeException("TextLineSerde class cannot be initialized");
+      throw new RuntimeException("TextLineSerde class cannot be initialized.", e);
     }
 
     return lineSerder;
@@ -382,7 +382,9 @@ public class DelimitedTextFile {
     @Override
     public void close() throws IOException {
       try {
-        deserializer.release();
+        if (deserializer != null) {
+          deserializer.release();
+        }
 
         if (tableStats != null && reader != null) {
           tableStats.setReadBytes(reader.getReadBytes());  //Actual Processed Bytes. (decompressed bytes + overhead)

http://git-wip-us.apache.org/repos/asf/tajo/blob/cd38dffb/tajo-storage/src/main/java/org/apache/tajo/storage/text/TextFieldSerializerDeserializer.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/text/TextFieldSerializerDeserializer.java b/tajo-storage/src/main/java/org/apache/tajo/storage/text/TextFieldSerializerDeserializer.java
index 9722959..95d0407 100644
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/text/TextFieldSerializerDeserializer.java
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/text/TextFieldSerializerDeserializer.java
@@ -38,7 +38,7 @@ import java.nio.charset.CharsetDecoder;
 public class TextFieldSerializerDeserializer implements FieldSerializerDeserializer {
   public static final byte[] trueBytes = "true".getBytes();
   public static final byte[] falseBytes = "false".getBytes();
-  private ProtobufJsonFormat protobufJsonFormat = ProtobufJsonFormat.getInstance();
+  private static ProtobufJsonFormat protobufJsonFormat = ProtobufJsonFormat.getInstance();
   private final CharsetDecoder decoder = CharsetUtil.getDecoder(CharsetUtil.UTF_8);
 
   private static boolean isNull(ByteBuf val, ByteBuf nullBytes) {

http://git-wip-us.apache.org/repos/asf/tajo/blob/cd38dffb/tajo-storage/src/main/java/org/apache/tajo/storage/text/TextLineDeserializer.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/text/TextLineDeserializer.java b/tajo-storage/src/main/java/org/apache/tajo/storage/text/TextLineDeserializer.java
index 645d118..b0d3c3a 100644
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/text/TextLineDeserializer.java
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/text/TextLineDeserializer.java
@@ -48,10 +48,10 @@ public abstract class TextLineDeserializer {
    * It fills a tuple with a read fields in a given line.
    *
    * @param buf Read line
-   * @param tuple Tuple to be filled with read fields
+   * @param output Tuple to be filled with read fields
    * @throws java.io.IOException
    */
-  public abstract void deserialize(final ByteBuf buf, Tuple tuple) throws IOException;
+  public abstract void deserialize(final ByteBuf buf, Tuple output) throws IOException;
 
   /**
    * Release external resources

http://git-wip-us.apache.org/repos/asf/tajo/blob/cd38dffb/tajo-storage/src/main/resources/storage-default.xml
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/resources/storage-default.xml b/tajo-storage/src/main/resources/storage-default.xml
index 064f250..e861b7d 100644
--- a/tajo-storage/src/main/resources/storage-default.xml
+++ b/tajo-storage/src/main/resources/storage-default.xml
@@ -35,7 +35,7 @@
   <!--- Registered Scanner Handler -->
   <property>
     <name>tajo.storage.scanner-handler</name>
-    <value>textfile,csv,raw,rcfile,row,parquet,sequencefile,avro</value>
+    <value>textfile,csv,json,raw,rcfile,row,parquet,sequencefile,avro</value>
   </property>
 
   <!--- Fragment Class Configurations -->
@@ -48,6 +48,10 @@
     <value>org.apache.tajo.storage.fragment.FileFragment</value>
   </property>
   <property>
+    <name>tajo.storage.fragment.json.class</name>
+    <value>org.apache.tajo.storage.fragment.FileFragment</value>
+  </property>
+  <property>
     <name>tajo.storage.fragment.raw.class</name>
     <value>org.apache.tajo.storage.fragment.FileFragment</value>
   </property>
@@ -84,6 +88,11 @@
   </property>
 
   <property>
+    <name>tajo.storage.scanner-handler.json.class</name>
+    <value>org.apache.tajo.storage.text.DelimitedTextFile$DelimitedTextFileScanner</value>
+  </property>
+
+  <property>
     <name>tajo.storage.scanner-handler.raw.class</name>
     <value>org.apache.tajo.storage.RawFile$RawFileScanner</value>
   </property>
@@ -130,6 +139,11 @@
   </property>
 
   <property>
+    <name>tajo.storage.appender-handler.json.class</name>
+    <value>org.apache.tajo.storage.text.DelimitedTextFile$DelimitedTextFileAppender</value>
+  </property>
+
+  <property>
     <name>tajo.storage.appender-handler.raw.class</name>
     <value>org.apache.tajo.storage.RawFile$RawFileAppender</value>
   </property>

http://git-wip-us.apache.org/repos/asf/tajo/blob/cd38dffb/tajo-storage/src/test/java/org/apache/tajo/storage/TestStorages.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/test/java/org/apache/tajo/storage/TestStorages.java b/tajo-storage/src/test/java/org/apache/tajo/storage/TestStorages.java
index 6e2bc35..c581926 100644
--- a/tajo-storage/src/test/java/org/apache/tajo/storage/TestStorages.java
+++ b/tajo-storage/src/test/java/org/apache/tajo/storage/TestStorages.java
@@ -80,18 +80,17 @@ public class TestStorages {
       "  \"name\": \"testNullHandlingTypes\",\n" +
       "  \"fields\": [\n" +
       "    { \"name\": \"col1\", \"type\": [\"null\", \"boolean\"] },\n" +
-      "    { \"name\": \"col2\", \"type\": [\"null\", \"int\"] },\n" +
-      "    { \"name\": \"col3\", \"type\": [\"null\", \"string\"] },\n" +
+      "    { \"name\": \"col2\", \"type\": [\"null\", \"string\"] },\n" +
+      "    { \"name\": \"col3\", \"type\": [\"null\", \"int\"] },\n" +
       "    { \"name\": \"col4\", \"type\": [\"null\", \"int\"] },\n" +
-      "    { \"name\": \"col5\", \"type\": [\"null\", \"int\"] },\n" +
-      "    { \"name\": \"col6\", \"type\": [\"null\", \"long\"] },\n" +
-      "    { \"name\": \"col7\", \"type\": [\"null\", \"float\"] },\n" +
-      "    { \"name\": \"col8\", \"type\": [\"null\", \"double\"] },\n" +
-      "    { \"name\": \"col9\", \"type\": [\"null\", \"string\"] },\n" +
+      "    { \"name\": \"col5\", \"type\": [\"null\", \"long\"] },\n" +
+      "    { \"name\": \"col6\", \"type\": [\"null\", \"float\"] },\n" +
+      "    { \"name\": \"col7\", \"type\": [\"null\", \"double\"] },\n" +
+      "    { \"name\": \"col8\", \"type\": [\"null\", \"string\"] },\n" +
+      "    { \"name\": \"col9\", \"type\": [\"null\", \"bytes\"] },\n" +
       "    { \"name\": \"col10\", \"type\": [\"null\", \"bytes\"] },\n" +
-      "    { \"name\": \"col11\", \"type\": [\"null\", \"bytes\"] },\n" +
-      "    { \"name\": \"col12\", \"type\": \"null\" },\n" +
-      "    { \"name\": \"col13\", \"type\": [\"null\", \"bytes\"] }\n" +
+      "    { \"name\": \"col11\", \"type\": \"null\" },\n" +
+      "    { \"name\": \"col12\", \"type\": [\"null\", \"bytes\"] }\n" +
       "  ]\n" +
       "}\n";
 
@@ -129,6 +128,7 @@ public class TestStorages {
         {StoreType.SEQUENCEFILE, true, true, false},
         {StoreType.AVRO, false, false, false},
         {StoreType.TEXTFILE, true, true, false},
+        {StoreType.JSON, true, true, false},
     });
   }
 
@@ -298,20 +298,23 @@ public class TestStorages {
 
   @Test
   public void testVariousTypes() throws IOException {
+    boolean handleProtobuf = storeType != StoreType.JSON;
+
     Schema schema = new Schema();
     schema.addColumn("col1", Type.BOOLEAN);
-    schema.addColumn("col2", Type.BIT);
-    schema.addColumn("col3", Type.CHAR, 7);
-    schema.addColumn("col4", Type.INT2);
-    schema.addColumn("col5", Type.INT4);
-    schema.addColumn("col6", Type.INT8);
-    schema.addColumn("col7", Type.FLOAT4);
-    schema.addColumn("col8", Type.FLOAT8);
-    schema.addColumn("col9", Type.TEXT);
-    schema.addColumn("col10", Type.BLOB);
-    schema.addColumn("col11", Type.INET4);
-    schema.addColumn("col12", Type.NULL_TYPE);
-    schema.addColumn("col13", CatalogUtil.newDataType(Type.PROTOBUF, TajoIdProtos.QueryIdProto.class.getName()));
+    schema.addColumn("col2", Type.CHAR, 7);
+    schema.addColumn("col3", Type.INT2);
+    schema.addColumn("col4", Type.INT4);
+    schema.addColumn("col5", Type.INT8);
+    schema.addColumn("col6", Type.FLOAT4);
+    schema.addColumn("col7", Type.FLOAT8);
+    schema.addColumn("col8", Type.TEXT);
+    schema.addColumn("col9", Type.BLOB);
+    schema.addColumn("col10", Type.INET4);
+    schema.addColumn("col11", Type.NULL_TYPE);
+    if (handleProtobuf) {
+      schema.addColumn("col12", CatalogUtil.newDataType(Type.PROTOBUF, TajoIdProtos.QueryIdProto.class.getName()));
+    }
 
     KeyValueSet options = new KeyValueSet();
     TableMeta meta = CatalogUtil.newTableMeta(storeType, options);
@@ -328,10 +331,9 @@ public class TestStorages {
     QueryId queryid = new QueryId("12345", 5);
     ProtobufDatumFactory factory = ProtobufDatumFactory.get(TajoIdProtos.QueryIdProto.class.getName());
 
-    Tuple tuple = new VTuple(13);
+    Tuple tuple = new VTuple(11 + (handleProtobuf ? 1 : 0));
     tuple.put(new Datum[] {
         DatumFactory.createBool(true),
-        DatumFactory.createBit((byte) 0x99),
         DatumFactory.createChar("hyunsik"),
         DatumFactory.createInt2((short) 17),
         DatumFactory.createInt4(59),
@@ -341,9 +343,12 @@ public class TestStorages {
         DatumFactory.createText("hyunsik"),
         DatumFactory.createBlob("hyunsik".getBytes()),
         DatumFactory.createInet4("192.168.0.1"),
-        NullDatum.get(),
-        factory.createDatum(queryid.getProto())
+        NullDatum.get()
     });
+    if (handleProtobuf) {
+      tuple.put(11, factory.createDatum(queryid.getProto()));
+    }
+
     appender.addTuple(tuple);
     appender.flush();
     appender.close();
@@ -364,20 +369,24 @@ public class TestStorages {
 
   @Test
   public void testNullHandlingTypes() throws IOException {
+    boolean handleProtobuf = storeType != StoreType.JSON;
+
     Schema schema = new Schema();
     schema.addColumn("col1", Type.BOOLEAN);
-    schema.addColumn("col2", Type.BIT);
-    schema.addColumn("col3", Type.CHAR, 7);
-    schema.addColumn("col4", Type.INT2);
-    schema.addColumn("col5", Type.INT4);
-    schema.addColumn("col6", Type.INT8);
-    schema.addColumn("col7", Type.FLOAT4);
-    schema.addColumn("col8", Type.FLOAT8);
-    schema.addColumn("col9", Type.TEXT);
-    schema.addColumn("col10", Type.BLOB);
-    schema.addColumn("col11", Type.INET4);
-    schema.addColumn("col12", Type.NULL_TYPE);
-    schema.addColumn("col13", CatalogUtil.newDataType(Type.PROTOBUF, TajoIdProtos.QueryIdProto.class.getName()));
+    schema.addColumn("col2", Type.CHAR, 7);
+    schema.addColumn("col3", Type.INT2);
+    schema.addColumn("col4", Type.INT4);
+    schema.addColumn("col5", Type.INT8);
+    schema.addColumn("col6", Type.FLOAT4);
+    schema.addColumn("col7", Type.FLOAT8);
+    schema.addColumn("col8", Type.TEXT);
+    schema.addColumn("col9", Type.BLOB);
+    schema.addColumn("col10", Type.INET4);
+    schema.addColumn("col11", Type.NULL_TYPE);
+
+    if (handleProtobuf) {
+      schema.addColumn("col12", CatalogUtil.newDataType(Type.PROTOBUF, TajoIdProtos.QueryIdProto.class.getName()));
+    }
 
     KeyValueSet options = new KeyValueSet();
     TableMeta meta = CatalogUtil.newTableMeta(storeType, options);
@@ -397,11 +406,10 @@ public class TestStorages {
 
     QueryId queryid = new QueryId("12345", 5);
     ProtobufDatumFactory factory = ProtobufDatumFactory.get(TajoIdProtos.QueryIdProto.class.getName());
-
-    Tuple seedTuple = new VTuple(13);
+    int columnNum = 11 + (handleProtobuf ? 1 : 0);
+    Tuple seedTuple = new VTuple(columnNum);
     seedTuple.put(new Datum[]{
         DatumFactory.createBool(true),                // 0
-        DatumFactory.createBit((byte) 0x99),          // 1
         DatumFactory.createChar("hyunsik"),           // 2
         DatumFactory.createInt2((short) 17),          // 3
         DatumFactory.createInt4(59),                  // 4
@@ -412,14 +420,17 @@ public class TestStorages {
         DatumFactory.createBlob("hyunsik".getBytes()),// 9
         DatumFactory.createInet4("192.168.0.1"),      // 10
         NullDatum.get(),                              // 11
-        factory.createDatum(queryid.getProto())       // 12
     });
 
+    if (handleProtobuf) {
+      seedTuple.put(11, factory.createDatum(queryid.getProto()));       // 12
+    }
+
     // Making tuples with different null column positions
     Tuple tuple;
-    for (int i = 0; i < 13; i++) {
-      tuple = new VTuple(13);
-      for (int j = 0; j < 13; j++) {
+    for (int i = 0; i < columnNum; i++) {
+      tuple = new VTuple(columnNum);
+      for (int j = 0; j < columnNum; j++) {
         if (i == j) { // i'th column will have NULL value
           tuple.put(j, NullDatum.get());
         } else {
@@ -439,8 +450,8 @@ public class TestStorages {
     Tuple retrieved;
     int i = 0;
     while ((retrieved = scanner.next()) != null) {
-      assertEquals(13, retrieved.size());
-      for (int j = 0; j < 13; j++) {
+      assertEquals(columnNum, retrieved.size());
+      for (int j = 0; j < columnNum; j++) {
         if (i == j) {
           assertEquals(NullDatum.get(), retrieved.get(j));
         } else {

http://git-wip-us.apache.org/repos/asf/tajo/blob/cd38dffb/tajo-storage/src/test/java/org/apache/tajo/storage/json/TestJsonSerDe.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/test/java/org/apache/tajo/storage/json/TestJsonSerDe.java b/tajo-storage/src/test/java/org/apache/tajo/storage/json/TestJsonSerDe.java
new file mode 100644
index 0000000..038bc17
--- /dev/null
+++ b/tajo-storage/src/test/java/org/apache/tajo/storage/json/TestJsonSerDe.java
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.json;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.catalog.CatalogUtil;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.catalog.proto.CatalogProtos;
+import org.apache.tajo.common.TajoDataTypes;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.datum.NullDatum;
+import org.apache.tajo.storage.Scanner;
+import org.apache.tajo.storage.StorageManager;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.storage.VTuple;
+import org.apache.tajo.storage.fragment.FileFragment;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.net.URL;
+
+import static org.junit.Assert.*;
+
+/**
+ * Verifies that a JSON table file is scanned into a tuple matching the declared schema.
+ */
+public class TestJsonSerDe {
+  /** Schema describing the columns of the testVariousType.json fixture. */
+  private static final Schema schema = new Schema();
+
+  static {
+    schema.addColumn("col1", TajoDataTypes.Type.BOOLEAN);
+    schema.addColumn("col2", TajoDataTypes.Type.CHAR, 7);
+    schema.addColumn("col3", TajoDataTypes.Type.INT2);
+    schema.addColumn("col4", TajoDataTypes.Type.INT4);
+    schema.addColumn("col5", TajoDataTypes.Type.INT8);
+    schema.addColumn("col6", TajoDataTypes.Type.FLOAT4);
+    schema.addColumn("col7", TajoDataTypes.Type.FLOAT8);
+    schema.addColumn("col8", TajoDataTypes.Type.TEXT);
+    schema.addColumn("col9", TajoDataTypes.Type.BLOB);
+    schema.addColumn("col10", TajoDataTypes.Type.INET4);
+    schema.addColumn("col11", TajoDataTypes.Type.NULL_TYPE);
+  }
+
+  /**
+   * Builds the path of {@code suffix} relative to the classpath resource {@code path}.
+   *
+   * @param path Name of a resource looked up through the system class loader
+   * @param suffix Child path resolved against the resource URL
+   * @return Path of {@code suffix} under the located resource
+   * @throws NullPointerException if no resource named {@code path} exists
+   */
+  public static Path getResourcePath(String path, String suffix) {
+    URL resultBaseURL = ClassLoader.getSystemResource(path);
+    if (resultBaseURL == null) {
+      // getSystemResource() reports a missing resource with null; fail with a readable message.
+      throw new NullPointerException("Resource not found on the classpath: " + path);
+    }
+    return new Path(resultBaseURL.toString(), suffix);
+  }
+
+  /**
+   * Scans the one-row fixture and compares the tuple with the expected datums.
+   * The fixture has no col11 entry, so that column is expected to be NULL.
+   *
+   * @throws IOException if the fixture cannot be opened or read
+   */
+  @Test
+  public void testVarioutType() throws IOException {
+    TajoConf conf = new TajoConf();
+
+    TableMeta meta = CatalogUtil.newTableMeta(CatalogProtos.StoreType.JSON);
+    Path tablePath = new Path(getResourcePath("dataset", "TestJsonSerDe"), "testVariousType.json");
+    FileSystem fs = FileSystem.getLocal(conf);
+    FileStatus status = fs.getFileStatus(tablePath);
+    FileFragment fragment = new FileFragment("table", tablePath, 0, status.getLen());
+    Scanner scanner = StorageManager.getStorageManager(conf).getScanner(meta, schema, fragment);
+    scanner.init();
+
+    Tuple tuple;
+    try {
+      tuple = scanner.next();
+      assertNotNull(tuple);
+      assertNull(scanner.next());
+    } finally {
+      // Close the scanner even if reading or an assertion fails.
+      scanner.close();
+    }
+
+    Tuple baseTuple = new VTuple(11);
+    baseTuple.put(new Datum[] {
+        DatumFactory.createBool(true),                  // 0
+        DatumFactory.createChar("hyunsik"),             // 1
+        DatumFactory.createInt2((short) 17),            // 2
+        DatumFactory.createInt4(59),                    // 3
+        DatumFactory.createInt8(23L),                   // 4
+        DatumFactory.createFloat4(77.9f),               // 5
+        DatumFactory.createFloat8(271.9d),              // 6
+        DatumFactory.createText("hyunsik"),             // 7
+        DatumFactory.createBlob("hyunsik".getBytes()),  // 8
+        DatumFactory.createInet4("192.168.0.1"),        // 9
+        NullDatum.get(),                                // 10
+    });
+
+    assertEquals(baseTuple, tuple);
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/cd38dffb/tajo-storage/src/test/resources/dataset/TestJsonSerDe/testVariousType.json
----------------------------------------------------------------------
diff --git a/tajo-storage/src/test/resources/dataset/TestJsonSerDe/testVariousType.json b/tajo-storage/src/test/resources/dataset/TestJsonSerDe/testVariousType.json
new file mode 100644
index 0000000..8ee3408
--- /dev/null
+++ b/tajo-storage/src/test/resources/dataset/TestJsonSerDe/testVariousType.json
@@ -0,0 +1 @@
+{"col1": "true", "col2": "hyunsik", "col3": 17, "col4": 59, "col5": 23, "col6": 77.9, "col7": 271.9, "col8": "hyunsik", "col9": "hyunsik", "col10": "192.168.0.1"}

http://git-wip-us.apache.org/repos/asf/tajo/blob/cd38dffb/tajo-storage/src/test/resources/storage-default.xml
----------------------------------------------------------------------
diff --git a/tajo-storage/src/test/resources/storage-default.xml b/tajo-storage/src/test/resources/storage-default.xml
index 790d5a8..f4c81c7 100644
--- a/tajo-storage/src/test/resources/storage-default.xml
+++ b/tajo-storage/src/test/resources/storage-default.xml
@@ -28,7 +28,7 @@
   <!--- Registered Scanner Handler -->
   <property>
     <name>tajo.storage.scanner-handler</name>
-    <value>textfile,csv,raw,rcfile,row,parquet,sequencefile,avro</value>
+    <value>textfile,csv,json,raw,rcfile,row,parquet,sequencefile,avro</value>
   </property>
 
   <!--- Fragment Class Configurations -->
@@ -41,6 +41,10 @@
     <value>org.apache.tajo.storage.fragment.FileFragment</value>
   </property>
   <property>
+    <name>tajo.storage.fragment.json.class</name>
+    <value>org.apache.tajo.storage.fragment.FileFragment</value>
+  </property>
+  <property>
     <name>tajo.storage.fragment.raw.class</name>
     <value>org.apache.tajo.storage.fragment.FileFragment</value>
   </property>
@@ -77,6 +81,11 @@
   </property>
 
   <property>
+    <name>tajo.storage.scanner-handler.json.class</name>
+    <value>org.apache.tajo.storage.text.DelimitedTextFile$DelimitedTextFileScanner</value>
+  </property>
+
+  <property>
     <name>tajo.storage.scanner-handler.raw.class</name>
     <value>org.apache.tajo.storage.RawFile$RawFileScanner</value>
   </property>
@@ -123,6 +132,11 @@
   </property>
 
   <property>
+    <name>tajo.storage.appender-handler.json.class</name>
+    <value>org.apache.tajo.storage.text.DelimitedTextFile$DelimitedTextFileAppender</value>
+  </property>
+
+  <property>
     <name>tajo.storage.appender-handler.raw.class</name>
     <value>org.apache.tajo.storage.RawFile$RawFileAppender</value>
   </property>

http://git-wip-us.apache.org/repos/asf/tajo/blob/cd38dffb/tajo-storage/src/test/resources/testVariousTypes.avsc
----------------------------------------------------------------------
diff --git a/tajo-storage/src/test/resources/testVariousTypes.avsc b/tajo-storage/src/test/resources/testVariousTypes.avsc
index 611b97f..d4250a9 100644
--- a/tajo-storage/src/test/resources/testVariousTypes.avsc
+++ b/tajo-storage/src/test/resources/testVariousTypes.avsc
@@ -4,18 +4,17 @@
   "name": "testVariousTypes",
   "fields": [
     { "name": "col1", "type": "boolean" },
-    { "name": "col2", "type": "int" },
-    { "name": "col3", "type": "string" },
+    { "name": "col2", "type": "string" },
+    { "name": "col3", "type": "int" },
     { "name": "col4", "type": "int" },
-    { "name": "col5", "type": "int" },
-    { "name": "col6", "type": "long" },
-    { "name": "col7", "type": "float" },
-    { "name": "col8", "type": "double" },
-    { "name": "col9", "type": "string" },
+    { "name": "col5", "type": "long" },
+    { "name": "col6", "type": "float" },
+    { "name": "col7", "type": "double" },
+    { "name": "col8", "type": "string" },
+    { "name": "col9", "type": "bytes" },
     { "name": "col10", "type": "bytes" },
-    { "name": "col11", "type": "bytes" },
-    { "name": "col12", "type": "null" },
-    { "name": "col13", "type": "bytes" }
+    { "name": "col11", "type": "null" },
+    { "name": "col12", "type": "bytes" }
   ]
 }