You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hy...@apache.org on 2013/10/04 05:07:35 UTC

[4/4] git commit: TAJO-224: Rearrange DataType enumeration and Refactor type systems. (hyunsik)

TAJO-224: Rearrange DataType enumeration and Refactor type systems. (hyunsik)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tajo/commit/4bf5c0c3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tajo/tree/4bf5c0c3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tajo/diff/4bf5c0c3

Branch: refs/heads/master
Commit: 4bf5c0c3d51c2d12d57903ea7dc47cdfe4870290
Parents: 3e6d684
Author: Hyunsik Choi <hy...@apache.org>
Authored: Fri Oct 4 12:03:26 2013 +0900
Committer: Hyunsik Choi <hy...@apache.org>
Committed: Fri Oct 4 12:03:26 2013 +0900

----------------------------------------------------------------------
 CHANGES.txt                                     |    3 +
 LICENSE.txt                                     |   38 +-
 NOTICE.txt                                      |    6 +-
 .../org/apache/tajo/catalog/CatalogUtil.java    |   16 +-
 .../java/org/apache/tajo/catalog/Column.java    |    2 +-
 .../org/apache/tajo/catalog/FunctionDesc.java   |   12 +-
 .../java/org/apache/tajo/catalog/Schema.java    |    2 +-
 .../tajo/catalog/statistics/StatisticsUtil.java |   38 +-
 .../tajo/catalog/statistics/TableStat.java      |    4 +
 .../tajo/catalog/statistics/TupleUtil.java      |   13 +
 .../apache/tajo/catalog/TestCatalogUtil.java    |    2 +-
 .../org/apache/tajo/catalog/TestColumn.java     |    6 +-
 .../apache/tajo/catalog/TestFunctionDesc.java   |   24 +-
 .../org/apache/tajo/catalog/store/DBStore.java  |    2 +-
 .../org/apache/tajo/catalog/TestCatalog.java    |   26 +-
 .../java/org/apache/tajo/datum/ArrayDatum.java  |   95 --
 .../org/apache/tajo/datum/DatumFactory.java     |   11 -
 .../java/org/apache/tajo/datum/Int8Datum.java   |    6 +-
 .../org/apache/tajo/datum/ProtobufDatum.java    |   81 +
 .../apache/tajo/datum/ProtobufDatumFactory.java |   91 ++
 .../protobuf/AbstractCharBasedFormatter.java    |   80 +
 .../tajo/datum/protobuf/ProtobufFormatter.java  |  168 ++
 .../tajo/datum/protobuf/ProtobufJsonFormat.java | 1463 ++++++++++++++++++
 .../apache/tajo/datum/protobuf/TextUtils.java   |  459 ++++++
 tajo-common/src/main/proto/DataTypes.proto      |   92 +-
 .../tajo/datum/protobuf/TestProtobufDatum.java  |   54 +
 tajo-core/tajo-core-backend/pom.xml             |    1 +
 .../eval/AggregationFunctionCallEval.java       |    2 +-
 .../org/apache/tajo/engine/eval/BinaryEval.java |   31 +-
 .../apache/tajo/engine/eval/CaseWhenEval.java   |    6 +-
 .../org/apache/tajo/engine/eval/CastEval.java   |    6 +-
 .../org/apache/tajo/engine/eval/ConstEval.java  |   20 +-
 .../org/apache/tajo/engine/eval/EvalNode.java   |    4 +-
 .../apache/tajo/engine/eval/EvalTreeUtil.java   |    7 +-
 .../org/apache/tajo/engine/eval/FieldEval.java  |    5 +-
 .../apache/tajo/engine/eval/FunctionEval.java   |    4 +-
 .../org/apache/tajo/engine/eval/InEval.java     |    7 +-
 .../org/apache/tajo/engine/eval/IsNullEval.java |    4 +-
 .../org/apache/tajo/engine/eval/NotEval.java    |    5 +-
 .../tajo/engine/eval/PartialBinaryExpr.java     |    2 +-
 .../engine/eval/PatternMatchPredicateEval.java  |    4 +-
 .../tajo/engine/eval/RowConstantEval.java       |   28 +-
 .../tajo/engine/function/AggFunction.java       |    2 +-
 .../tajo/engine/function/builtin/AvgDouble.java |   30 +-
 .../tajo/engine/function/builtin/AvgFloat.java  |   52 +-
 .../tajo/engine/function/builtin/AvgInt.java    |   53 +-
 .../tajo/engine/function/builtin/AvgLong.java   |   30 +-
 .../tajo/engine/function/builtin/CountRows.java |    4 +-
 .../tajo/engine/function/builtin/MaxDouble.java |    4 +-
 .../tajo/engine/function/builtin/MaxFloat.java  |    4 +-
 .../tajo/engine/function/builtin/MaxInt.java    |    4 +-
 .../tajo/engine/function/builtin/MaxLong.java   |    4 +-
 .../tajo/engine/function/builtin/MinDouble.java |    4 +-
 .../tajo/engine/function/builtin/MinFloat.java  |    4 +-
 .../tajo/engine/function/builtin/MinInt.java    |    4 +-
 .../tajo/engine/function/builtin/MinLong.java   |    4 +-
 .../tajo/engine/function/builtin/MinString.java |    4 +-
 .../tajo/engine/function/builtin/SumDouble.java |    4 +-
 .../tajo/engine/function/builtin/SumFloat.java  |    4 +-
 .../tajo/engine/function/builtin/SumInt.java    |    4 +-
 .../tajo/engine/function/builtin/SumLong.java   |    4 +-
 .../tajo/engine/planner/LogicalPlanner.java     |    8 +-
 .../apache/tajo/engine/planner/PlannerUtil.java |   19 +-
 .../org/apache/tajo/engine/planner/Target.java  |   12 +-
 .../tajo/engine/planner/TargetListManager.java  |    2 +-
 .../java/org/apache/tajo/master/TajoMaster.java |  108 +-
 .../tajo/master/querymaster/SubQuery.java       |    2 +-
 .../src/main/proto/InternalTypes.proto          |   32 +
 .../apache/tajo/engine/eval/TestEvalTree.java   |   22 +-
 .../tajo/engine/eval/TestEvalTreeUtil.java      |    4 +-
 .../engine/planner/TestLogicalOptimizer.java    |    4 +-
 .../tajo/engine/planner/TestLogicalPlanner.java |    4 +-
 .../tajo/engine/planner/TestPlannerUtil.java    |   60 +-
 .../org/apache/tajo/storage/TestRowFile.java    |   15 +-
 .../java/org/apache/tajo/storage/CSVFile.java   |   54 +-
 .../java/org/apache/tajo/storage/LazyTuple.java |   27 +-
 .../java/org/apache/tajo/storage/RawFile.java   |  109 +-
 .../java/org/apache/tajo/storage/RowFile.java   |   19 -
 .../apache/tajo/storage/TableStatistics.java    |   21 +-
 .../tajo/storage/rcfile/RCFileWrapper.java      |   40 +-
 .../tajo/storage/trevni/TrevniAppender.java     |   11 +-
 .../tajo/storage/trevni/TrevniScanner.java      |   15 +-
 .../apache/tajo/storage/v2/CSVFileScanner.java  |   30 +-
 .../org/apache/tajo/storage/TestStorages.java   |   16 +-
 84 files changed, 3029 insertions(+), 758 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/4bf5c0c3/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index e802376..38c636c 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -41,6 +41,9 @@ Release 0.2.0 - unreleased
 
   IMPROVEMENTS
 
+    TAJO-224: Rearrange DataType enumeration and Refactor type systems.
+    (hyunsik)
+
     TAJO-214: System should inquire finished query history after execution 
     in web. (SeongHwa Ahn via hyunsik)
 

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/4bf5c0c3/LICENSE.txt
----------------------------------------------------------------------
diff --git a/LICENSE.txt b/LICENSE.txt
index d8287ab..a8e952b 100644
--- a/LICENSE.txt
+++ b/LICENSE.txt
@@ -369,4 +369,40 @@ BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
 LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE
 OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
-OF THE POSSIBILITY OF SUCH DAMAGE.
\ No newline at end of file
+OF THE POSSIBILITY OF SUCH DAMAGE.
+
+------------------------------------------------------------------------------
+For protobuf-java-format (http://code.google.com/p/protobuf-java-format/)
+
+Copyright 2000-2011 NeuStar, Inc. All rights reserved.
+NeuStar, the Neustar logo and related names and logos are registered
+trademarks, service marks or tradenames of NeuStar, Inc. All other
+product names, company names, marks, logos and symbols may be trademarks
+of their respective owners.
+
+Copyright (c) 2009, Orbitz World Wide
+All rights reserved.
+
+Redistribution and use in source and binary forms, with or without modification,
+are permitted provided that the following conditions are met:
+
+* Redistributions of source code must retain the above copyright notice,
+this list of conditions and the following disclaimer.
+* Redistributions in binary form must reproduce the above copyright notice,
+this list of conditions and the following disclaimer in the documentation
+and/or other materials provided with the distribution.
+* Neither the name of the Orbitz World Wide nor the names of its contributors
+may be used to endorse or promote products derived from this software
+without specific prior written permission.
+
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/4bf5c0c3/NOTICE.txt
----------------------------------------------------------------------
diff --git a/NOTICE.txt b/NOTICE.txt
index 9fdded2..4564881 100644
--- a/NOTICE.txt
+++ b/NOTICE.txt
@@ -25,4 +25,8 @@ Copyright (c) 2008, Google Inc.
 This product includes/uses SQL BNF Grammer (http://www.antlr3.org/grammar/1304304798093/SQL2003_Grammar.zip)
 Copyright (c) 2011, Mage Systems
 Portions Copyright (c)  Jonathan Leffler 2004-2009'
-Portions Copyright (c) 2003, ISO - International Organization for Standardization
\ No newline at end of file
+Portions Copyright (c) 2003, ISO - International Organization for Standardization
+
+This product includes/uses Protobuf-java-format (http://code.google.com/p/protobuf-java-format/)
+Copyright 2000-2011 NeuStar, Inc. All rights reserved.
+Copyright (c) 2009, Orbitz World Wide All rights reserved.

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/4bf5c0c3/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogUtil.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogUtil.java b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogUtil.java
index 166a015..fb896f1 100644
--- a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogUtil.java
+++ b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogUtil.java
@@ -151,11 +151,23 @@ public class CatalogUtil {
     return revisedSchema.build();
   }
 
-  public static DataType newDataTypeWithoutLen(Type type) {
+  public static DataType newDataType(Type type, String code) {
+    return newDataType(type, code, 0);
+  }
+
+  public static DataType newDataType(Type type, String code, int len) {
+    DataType.Builder builder = DataType.newBuilder();
+    builder.setType(type)
+        .setCode(code)
+        .setLength(len);
+    return builder.build();
+  }
+
+  public static DataType newSimpleDataType(Type type) {
     return DataType.newBuilder().setType(type).build();
   }
 
-  public static DataType [] newDataTypesWithoutLen(Type... types) {
+  public static DataType [] newSimpleDataTypeArray(Type... types) {
     DataType [] dataTypes = new DataType[types.length];
     for (int i = 0; i < types.length; i++) {
       dataTypes[i] = DataType.newBuilder().setType(types[i]).build();

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/4bf5c0c3/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/Column.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/Column.java b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/Column.java
index 40f1dab..436560e 100644
--- a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/Column.java
+++ b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/Column.java
@@ -49,7 +49,7 @@ public class Column implements ProtoObject<ColumnProto>, Cloneable, GsonObject {
 	}
 
   public Column(String columnName, TajoDataTypes.Type type) {
-    this(columnName, CatalogUtil.newDataTypeWithoutLen(type));
+    this(columnName, CatalogUtil.newSimpleDataType(type));
   }
 
   public Column(String columnName, TajoDataTypes.Type type, int typeLength) {

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/4bf5c0c3/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/FunctionDesc.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/FunctionDesc.java b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/FunctionDesc.java
index 73f861a..10f05dd 100644
--- a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/FunctionDesc.java
+++ b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/FunctionDesc.java
@@ -37,14 +37,14 @@ public class FunctionDesc implements ProtoObject<FunctionDescProto>, Cloneable,
   @Expose private String signature;
   @Expose private Class<? extends Function> funcClass;
   @Expose private FunctionType funcType;
-  @Expose private DataType [] returnType;
+  @Expose private DataType returnType;
   @Expose private DataType [] params;
 
   public FunctionDesc() {
   }
 
   public FunctionDesc(String signature, Class<? extends Function> clazz,
-      FunctionType funcType, DataType [] retType, DataType [] params) {
+      FunctionType funcType, DataType retType, DataType [] params) {
     this.signature = signature.toLowerCase();
     this.funcClass = clazz;
     this.funcType = funcType;
@@ -54,12 +54,12 @@ public class FunctionDesc implements ProtoObject<FunctionDescProto>, Cloneable,
   
   public FunctionDesc(FunctionDescProto proto) throws ClassNotFoundException {
     this(proto.getSignature(), proto.getClassName(), proto.getType(),
-        newNoNameSchema(proto.getReturnType()),
+        proto.getReturnType(),
         proto.getParameterTypesList().toArray(new DataType[proto.getParameterTypesCount()]));
   }
 
   public FunctionDesc(String signature, String className, FunctionType type,
-                      DataType [] retType, DataType... argTypes) throws ClassNotFoundException {
+                      DataType retType, DataType... argTypes) throws ClassNotFoundException {
     this(signature, (Class<? extends Function>) Class.forName(className), type,
         retType, argTypes);
   }
@@ -95,7 +95,7 @@ public class FunctionDesc implements ProtoObject<FunctionDescProto>, Cloneable,
     return this.params;
   }
 
-  public DataType [] getReturnType() {
+  public DataType getReturnType() {
     return this.returnType;
   }
 
@@ -135,7 +135,7 @@ public class FunctionDesc implements ProtoObject<FunctionDescProto>, Cloneable,
     builder.setSignature(this.signature);
     builder.setClassName(this.funcClass.getName());
     builder.setType(this.funcType);
-    builder.setReturnType(this.returnType[0]);
+    builder.setReturnType(this.returnType);
 
     if (this.params != null) { // repeated field
       builder.addAllParameterTypes(Arrays.asList(params));

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/4bf5c0c3/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/Schema.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/Schema.java b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/Schema.java
index 05ef6df..7c0de81 100644
--- a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/Schema.java
+++ b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/Schema.java
@@ -180,7 +180,7 @@ public class Schema implements ProtoObject<SchemaProto>, Cloneable, GsonObject {
     if (type == Type.CHAR) {
       return addColumn(name, CatalogUtil.newDataTypeWithLen(type, 1));
     }
-    return addColumn(name, CatalogUtil.newDataTypeWithoutLen(type));
+    return addColumn(name, CatalogUtil.newSimpleDataType(type));
   }
 
   public synchronized Schema addColumn(String name, Type type, int length) {

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/4bf5c0c3/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/statistics/StatisticsUtil.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/statistics/StatisticsUtil.java b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/statistics/StatisticsUtil.java
index 470e1a5..6f2fdf6 100644
--- a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/statistics/StatisticsUtil.java
+++ b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/statistics/StatisticsUtil.java
@@ -66,26 +66,24 @@ public class StatisticsUtil {
 
     for (TableStat ts : tableStats) {
       // if there is empty stats
-      if (ts.getColumnStats().size() == 0) {
-        continue;
-      }
-
-      // aggregate column stats for each table
-      for (int i = 0; i < ts.getColumnStats().size(); i++) {
-        ColumnStat cs = ts.getColumnStats().get(i);
-        if (cs == null) {
-          LOG.warn("ERROR: One of column stats is NULL (expected column: " + css[i].getColumn() + ")");
-          continue;
-        }
-        css[i].setNumDistVals(css[i].getNumDistValues() + cs.getNumDistValues());
-        css[i].setNumNulls(css[i].getNumNulls() + cs.getNumNulls());
-        if (!cs.minIsNotSet() && (css[i].minIsNotSet() ||
-            css[i].getMinValue().compareTo(cs.getMinValue()) > 0)) {
-          css[i].setMinValue(cs.getMinValue());
-        }
-        if (!cs.maxIsNotSet() && (css[i].maxIsNotSet() ||
-            css[i].getMaxValue().compareTo(cs.getMaxValue()) < 0)) {
-          css[i].setMaxValue(ts.getColumnStats().get(i).getMaxValue());
+      if (ts.getColumnStats().size() > 0) {
+        // aggregate column stats for each table
+        for (int i = 0; i < ts.getColumnStats().size(); i++) {
+          ColumnStat cs = ts.getColumnStats().get(i);
+          if (cs == null) {
+            LOG.warn("ERROR: One of column stats is NULL (expected column: " + css[i].getColumn() + ")");
+            continue;
+          }
+          css[i].setNumDistVals(css[i].getNumDistValues() + cs.getNumDistValues());
+          css[i].setNumNulls(css[i].getNumNulls() + cs.getNumNulls());
+          if (!cs.minIsNotSet() && (css[i].minIsNotSet() ||
+              css[i].getMinValue().compareTo(cs.getMinValue()) > 0)) {
+            css[i].setMinValue(cs.getMinValue());
+          }
+          if (!cs.maxIsNotSet() && (css[i].maxIsNotSet() ||
+              css[i].getMaxValue().compareTo(cs.getMaxValue()) < 0)) {
+            css[i].setMaxValue(ts.getColumnStats().get(i).getMaxValue());
+          }
         }
       }
 

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/4bf5c0c3/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/statistics/TableStat.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/statistics/TableStat.java b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/statistics/TableStat.java
index 950e9b5..e0afdff 100644
--- a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/statistics/TableStat.java
+++ b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/statistics/TableStat.java
@@ -24,6 +24,7 @@ package org.apache.tajo.catalog.statistics;
 import com.google.common.base.Objects;
 import com.google.gson.Gson;
 import com.google.gson.annotations.Expose;
+import org.apache.tajo.common.TajoDataTypes;
 import org.apache.tajo.json.GsonObject;
 import org.apache.tajo.catalog.json.CatalogGsonHelper;
 import org.apache.tajo.catalog.proto.CatalogProtos.ColumnStatProto;
@@ -69,6 +70,9 @@ public class TableStat implements ProtoObject<TableStatProto>, Cloneable, GsonOb
 
     this.columnStats = TUtil.newList();
     for (ColumnStatProto colProto : proto.getColStatList()) {
+      if (colProto.getColumn().getDataType().getType() == TajoDataTypes.Type.PROTOBUF) {
+        continue;
+      }
       columnStats.add(new ColumnStat(colProto));
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/4bf5c0c3/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/statistics/TupleUtil.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/statistics/TupleUtil.java b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/statistics/TupleUtil.java
index 5ed89b6..556b324 100644
--- a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/statistics/TupleUtil.java
+++ b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/statistics/TupleUtil.java
@@ -18,9 +18,12 @@
 
 package org.apache.tajo.catalog.statistics;
 
+import com.google.protobuf.Message;
 import org.apache.tajo.common.TajoDataTypes.DataType;
 import org.apache.tajo.datum.*;
 
+import java.io.IOException;
+
 public class TupleUtil {
   public static Datum createFromBytes(DataType type, byte [] bytes) {
     switch (type.getType()) {
@@ -44,6 +47,16 @@ public class TupleUtil {
         return new TextDatum(bytes);
       case INET4:
         return new Inet4Datum(bytes);
+      case PROTOBUF:
+        ProtobufDatumFactory factory = ProtobufDatumFactory.get(type);
+        Message.Builder builder = factory.newBuilder();
+        try {
+          builder.mergeFrom(bytes);
+          return factory.createDatum(builder.build());
+        } catch (IOException e) {
+          e.printStackTrace();
+          throw new RuntimeException(e);
+        }
       default: throw new UnsupportedOperationException(type + " is not supported yet");
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/4bf5c0c3/tajo-catalog/tajo-catalog-common/src/test/java/org/apache/tajo/catalog/TestCatalogUtil.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-common/src/test/java/org/apache/tajo/catalog/TestCatalogUtil.java b/tajo-catalog/tajo-catalog-common/src/test/java/org/apache/tajo/catalog/TestCatalogUtil.java
index d68eb50..bda98db 100644
--- a/tajo-catalog/tajo-catalog-common/src/test/java/org/apache/tajo/catalog/TestCatalogUtil.java
+++ b/tajo-catalog/tajo-catalog-common/src/test/java/org/apache/tajo/catalog/TestCatalogUtil.java
@@ -27,7 +27,7 @@ public class TestCatalogUtil {
   @Test
   public final void testGetCanonicalName() {
     String canonical = CatalogUtil.getCanonicalName("sum",
-        CatalogUtil.newDataTypesWithoutLen(Type.INT4,  Type.INT8));
+        CatalogUtil.newSimpleDataTypeArray(Type.INT4, Type.INT8));
     assertEquals("sum(INT4,INT8)", canonical);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/4bf5c0c3/tajo-catalog/tajo-catalog-common/src/test/java/org/apache/tajo/catalog/TestColumn.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-common/src/test/java/org/apache/tajo/catalog/TestColumn.java b/tajo-catalog/tajo-catalog-common/src/test/java/org/apache/tajo/catalog/TestColumn.java
index 406149e..b0b5051 100644
--- a/tajo-catalog/tajo-catalog-common/src/test/java/org/apache/tajo/catalog/TestColumn.java
+++ b/tajo-catalog/tajo-catalog-common/src/test/java/org/apache/tajo/catalog/TestColumn.java
@@ -33,9 +33,9 @@ public class TestColumn {
 	static final String FieldName2="f2";
 	static final String FieldName3="f3";	
 	
-	static final DataType Type1 = CatalogUtil.newDataTypeWithoutLen(Type.BLOB);
-	static final DataType Type2 = CatalogUtil.newDataTypeWithoutLen(Type.INT4);
-	static final DataType Type3 = CatalogUtil.newDataTypeWithoutLen(Type.INT8);
+	static final DataType Type1 = CatalogUtil.newSimpleDataType(Type.BLOB);
+	static final DataType Type2 = CatalogUtil.newSimpleDataType(Type.INT4);
+	static final DataType Type3 = CatalogUtil.newSimpleDataType(Type.INT8);
 	
 	Column field1;
 	Column field2;

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/4bf5c0c3/tajo-catalog/tajo-catalog-common/src/test/java/org/apache/tajo/catalog/TestFunctionDesc.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-common/src/test/java/org/apache/tajo/catalog/TestFunctionDesc.java b/tajo-catalog/tajo-catalog-common/src/test/java/org/apache/tajo/catalog/TestFunctionDesc.java
index 8359453..b30cbba 100644
--- a/tajo-catalog/tajo-catalog-common/src/test/java/org/apache/tajo/catalog/TestFunctionDesc.java
+++ b/tajo-catalog/tajo-catalog-common/src/test/java/org/apache/tajo/catalog/TestFunctionDesc.java
@@ -54,13 +54,13 @@ public class TestFunctionDesc {
   @Test
   public void testGetSignature() throws IOException, ClassNotFoundException {
     FunctionDesc desc = new FunctionDesc("sum", TestSum.class, FunctionType.GENERAL,
-        CatalogUtil.newDataTypesWithoutLen(Type.INT4),
-        CatalogUtil.newDataTypesWithoutLen(Type.INT4, Type.INT8));
+        CatalogUtil.newSimpleDataType(Type.INT4),
+        CatalogUtil.newSimpleDataTypeArray(Type.INT4, Type.INT8));
     assertEquals("sum", desc.getSignature());
     assertEquals(TestSum.class, desc.getFuncClass());
     assertEquals(FunctionType.GENERAL, desc.getFuncType());
-    assertEquals(Type.INT4, desc.getReturnType()[0].getType());
-    assertArrayEquals(CatalogUtil.newDataTypesWithoutLen(Type.INT4, Type.INT8),
+    assertEquals(Type.INT4, desc.getReturnType().getType());
+    assertArrayEquals(CatalogUtil.newSimpleDataTypeArray(Type.INT4, Type.INT8),
         desc.getParamTypes());
 
     CommonTestingUtil.getTestDir(TEST_PATH);
@@ -74,8 +74,8 @@ public class TestFunctionDesc {
     assertEquals("sum", newDesc.getSignature());
     assertEquals(TestSum.class, newDesc.getFuncClass());
     assertEquals(FunctionType.GENERAL, newDesc.getFuncType());
-    assertEquals(Type.INT4, newDesc.getReturnType()[0].getType());
-    assertArrayEquals(CatalogUtil.newDataTypesWithoutLen(Type.INT4, Type.INT8),
+    assertEquals(Type.INT4, newDesc.getReturnType().getType());
+    assertArrayEquals(CatalogUtil.newSimpleDataTypeArray(Type.INT4, Type.INT8),
         newDesc.getParamTypes());
 
     assertEquals(desc.getProto(), newDesc.getProto());
@@ -84,8 +84,8 @@ public class TestFunctionDesc {
   @Test
   public void testToJson() throws InternalException {
 	  FunctionDesc desc = new FunctionDesc("sum", TestSum.class, FunctionType.GENERAL,
-        CatalogUtil.newDataTypesWithoutLen(Type.INT4),
-        CatalogUtil.newDataTypesWithoutLen(Type.INT4, Type.INT8));
+        CatalogUtil.newSimpleDataType(Type.INT4),
+        CatalogUtil.newSimpleDataTypeArray(Type.INT4, Type.INT8));
 	  String json = desc.toJson();
 	  FunctionDesc fromJson = CatalogGsonHelper.fromJson(json, FunctionDesc.class);
 	  assertEquals(desc, fromJson);
@@ -95,8 +95,8 @@ public class TestFunctionDesc {
   @Test
   public void testGetProto() throws InternalException, ClassNotFoundException {
     FunctionDesc desc = new FunctionDesc("sum", TestSum.class, FunctionType.GENERAL,
-        CatalogUtil.newDataTypesWithoutLen(Type.INT4),
-        CatalogUtil.newDataTypesWithoutLen(Type.INT4, Type.INT8));
+        CatalogUtil.newSimpleDataType(Type.INT4),
+        CatalogUtil.newSimpleDataTypeArray(Type.INT4, Type.INT8));
     FunctionDescProto proto = desc.getProto();
     FunctionDesc fromProto = new FunctionDesc(proto);
     assertEquals(desc, fromProto);
@@ -106,8 +106,8 @@ public class TestFunctionDesc {
   @Test
   public void testClone() throws CloneNotSupportedException {
     FunctionDesc desc = new FunctionDesc("sum", TestSum.class, FunctionType.GENERAL,
-        CatalogUtil.newDataTypesWithoutLen(Type.INT4),
-        CatalogUtil.newDataTypesWithoutLen(Type.INT4, Type.INT8));
+        CatalogUtil.newSimpleDataType(Type.INT4),
+        CatalogUtil.newSimpleDataTypeArray(Type.INT4, Type.INT8));
     FunctionDesc cloned = (FunctionDesc)desc.clone();
     assertTrue("reference chk" , !(desc == cloned));
     assertTrue("getClass() chk", desc.getClass() == cloned.getClass());

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/4bf5c0c3/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/DBStore.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/DBStore.java b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/DBStore.java
index 64eccb3..a31a1d8 100644
--- a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/DBStore.java
+++ b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/DBStore.java
@@ -985,7 +985,7 @@ public class DBStore implements CatalogStore {
   private ColumnProto resultToColumnProto(final ResultSet res) throws SQLException {
     ColumnProto.Builder builder = ColumnProto.newBuilder();
     builder.setColumnName(res.getString("column_name"));
-    builder.setDataType(CatalogUtil.newDataTypeWithoutLen(
+    builder.setDataType(CatalogUtil.newSimpleDataType(
         getDataType(res.getString("data_type").trim())));
     return builder.build();
   }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/4bf5c0c3/tajo-catalog/tajo-catalog-server/src/test/java/org/apache/tajo/catalog/TestCatalog.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-server/src/test/java/org/apache/tajo/catalog/TestCatalog.java b/tajo-catalog/tajo-catalog-server/src/test/java/org/apache/tajo/catalog/TestCatalog.java
index 6c42439..fd1893b 100644
--- a/tajo-catalog/tajo-catalog-server/src/test/java/org/apache/tajo/catalog/TestCatalog.java
+++ b/tajo-catalog/tajo-catalog-server/src/test/java/org/apache/tajo/catalog/TestCatalog.java
@@ -166,12 +166,12 @@ public class TestCatalog {
 	public final void testRegisterFunc() throws Exception { 
 		assertFalse(catalog.containFunction("test2", FunctionType.UDF));
 		FunctionDesc meta = new FunctionDesc("test2", TestFunc1.class, FunctionType.UDF,
-        CatalogUtil.newDataTypesWithoutLen(Type.INT4),
-        CatalogUtil.newDataTypesWithoutLen(Type.INT4));
+        CatalogUtil.newSimpleDataType(Type.INT4),
+        CatalogUtil.newSimpleDataTypeArray(Type.INT4));
 
     catalog.createFunction(meta);
-		assertTrue(catalog.containFunction("test2", CatalogUtil.newDataTypesWithoutLen(Type.INT4)));
-		FunctionDesc retrived = catalog.getFunction("test2", CatalogUtil.newDataTypesWithoutLen(Type.INT4));
+		assertTrue(catalog.containFunction("test2", CatalogUtil.newSimpleDataTypeArray(Type.INT4)));
+		FunctionDesc retrived = catalog.getFunction("test2", CatalogUtil.newSimpleDataTypeArray(Type.INT4));
 
 		assertEquals(retrived.getSignature(),"test2");
 		assertEquals(retrived.getFuncClass(),TestFunc1.class);
@@ -180,20 +180,20 @@ public class TestCatalog {
 
   @Test
   public final void testDropFunction() throws Exception {
-    assertFalse(catalog.containFunction("test3", CatalogUtil.newDataTypesWithoutLen(Type.INT4)));
+    assertFalse(catalog.containFunction("test3", CatalogUtil.newSimpleDataTypeArray(Type.INT4)));
     FunctionDesc meta = new FunctionDesc("test3", TestFunc1.class, FunctionType.UDF,
-        CatalogUtil.newDataTypesWithoutLen(Type.INT4),
-        CatalogUtil.newDataTypesWithoutLen(Type.INT4 ));
+        CatalogUtil.newSimpleDataType(Type.INT4),
+        CatalogUtil.newSimpleDataTypeArray(Type.INT4));
     catalog.createFunction(meta);
-    assertTrue(catalog.containFunction("test3", CatalogUtil.newDataTypesWithoutLen(Type.INT4)));
+    assertTrue(catalog.containFunction("test3", CatalogUtil.newSimpleDataTypeArray(Type.INT4)));
     catalog.dropFunction("test3");
-    assertFalse(catalog.containFunction("test3", CatalogUtil.newDataTypesWithoutLen(Type.INT4)));
+    assertFalse(catalog.containFunction("test3", CatalogUtil.newSimpleDataTypeArray(Type.INT4)));
 
-    assertFalse(catalog.containFunction("test3", CatalogUtil.newDataTypesWithoutLen(Type.INT4, Type.BLOB)));
+    assertFalse(catalog.containFunction("test3", CatalogUtil.newSimpleDataTypeArray(Type.INT4, Type.BLOB)));
     FunctionDesc overload = new FunctionDesc("test3", TestFunc2.class, FunctionType.GENERAL,
-        CatalogUtil.newDataTypesWithoutLen(Type.INT4),
-        CatalogUtil.newDataTypesWithoutLen(Type.INT4, Type.BLOB));
+        CatalogUtil.newSimpleDataType(Type.INT4),
+        CatalogUtil.newSimpleDataTypeArray(Type.INT4, Type.BLOB));
     catalog.createFunction(overload);
-    assertTrue(catalog.containFunction("test3", CatalogUtil.newDataTypesWithoutLen(Type.INT4, Type.BLOB)));
+    assertTrue(catalog.containFunction("test3", CatalogUtil.newSimpleDataTypeArray(Type.INT4, Type.BLOB)));
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/4bf5c0c3/tajo-common/src/main/java/org/apache/tajo/datum/ArrayDatum.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/datum/ArrayDatum.java b/tajo-common/src/main/java/org/apache/tajo/datum/ArrayDatum.java
deleted file mode 100644
index 37c6dfa..0000000
--- a/tajo-common/src/main/java/org/apache/tajo/datum/ArrayDatum.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.datum;
-
-import com.google.gson.annotations.Expose;
-
-import static org.apache.tajo.common.TajoDataTypes.Type;
-
-public class ArrayDatum extends Datum {
-  @Expose private Datum [] data;
-  public ArrayDatum(Datum [] data) {
-    super(Type.ARRAY);
-    this.data = data;
-  }
-
-  public ArrayDatum(int size) {
-    super(Type.ARRAY);
-    this.data = new Datum[size];
-  }
-
-  public Datum get(int idx) {
-    return data[idx];
-  }
-
-  public Datum [] toArray() {
-    return data;
-  }
-
-  public void put(int idx, Datum datum) {
-    data[idx] = datum;
-  }
-
-  @Override
-  public int size() {
-    return 0;
-  }
-
-  @Override
-  public int compareTo(Datum datum) {
-    return 0; // TODO - to be implemented
-  }
-
-  public String toString() {
-    StringBuilder sb = new StringBuilder();
-    sb.append("[");
-    boolean first = true;
-    for (Datum field : data) {
-      if (first) {
-        first = false;
-      } else {
-        sb.append(",");
-      }
-      sb.append(field.asChars());
-    }
-    sb.append("]");
-
-    return sb.toString();
-  }
-
-  public boolean equals(Object obj) {
-    if (obj instanceof ArrayDatum) {
-      ArrayDatum other = (ArrayDatum) obj;
-      if (data.length != other.data.length) {
-        return false;
-      }
-
-
-      for (int i = 0; i < data.length; i++) {
-        if (!data[i].equals(other.data[i])) {
-          return false;
-        }
-      }
-
-      return true;
-    }
-
-    return false;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/4bf5c0c3/tajo-common/src/main/java/org/apache/tajo/datum/DatumFactory.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/datum/DatumFactory.java b/tajo-common/src/main/java/org/apache/tajo/datum/DatumFactory.java
index cc0089c..e783666 100644
--- a/tajo-common/src/main/java/org/apache/tajo/datum/DatumFactory.java
+++ b/tajo-common/src/main/java/org/apache/tajo/datum/DatumFactory.java
@@ -54,8 +54,6 @@ public class DatumFactory {
         return Inet4Datum.class;
       case ANY:
         return NullDatum.class;
-      case ARRAY:
-        return ArrayDatum.class;
       case NULL:
         return NullDatum.class;
       default:
@@ -110,19 +108,10 @@ public class DatumFactory {
   public static BooleanDatum createBool(boolean val) {
     return new BooleanDatum(val);
   }
-  /*
-  public static BoolDatum createBool(int val) {
-    return new BoolDatum(val);
-  }
-  */
   
 	public static BitDatum createBit(byte val) {
 		return new BitDatum(val);
 	}
-  /*
-  public static ByteDatum createBit(int val) {
-    return new ByteDatum(val);
-  }*/
 
   public static CharDatum createChar(char val) {
     return new CharDatum(val);

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/4bf5c0c3/tajo-common/src/main/java/org/apache/tajo/datum/Int8Datum.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/datum/Int8Datum.java b/tajo-common/src/main/java/org/apache/tajo/datum/Int8Datum.java
index 7e8bd27..985ecd6 100644
--- a/tajo-common/src/main/java/org/apache/tajo/datum/Int8Datum.java
+++ b/tajo-common/src/main/java/org/apache/tajo/datum/Int8Datum.java
@@ -27,10 +27,6 @@ import org.apache.tajo.util.NumberUtil;
 import java.nio.ByteBuffer;
 
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-
 public class Int8Datum extends Datum implements NumericDatum {
   private static final int size = 8;
   @Expose private long val;
@@ -81,7 +77,7 @@ public class Int8Datum extends Datum implements NumericDatum {
 	}
 
   @Override
-	public byte[] asByteArray() {
+	public byte [] asByteArray() {
 		ByteBuffer bb = ByteBuffer.allocate(8);
 		bb.putLong(val);
 		return bb.array();

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/4bf5c0c3/tajo-common/src/main/java/org/apache/tajo/datum/ProtobufDatum.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/datum/ProtobufDatum.java b/tajo-common/src/main/java/org/apache/tajo/datum/ProtobufDatum.java
new file mode 100644
index 0000000..46df4f0
--- /dev/null
+++ b/tajo-common/src/main/java/org/apache/tajo/datum/ProtobufDatum.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tajo.datum;
+
+import com.google.protobuf.Message;
+import org.apache.tajo.common.TajoDataTypes;
+
+public class ProtobufDatum extends Datum {
+  private Message value;
+
+  public ProtobufDatum() {
+    super(TajoDataTypes.Type.PROTOBUF);
+  }
+
+  public ProtobufDatum(Message message) {
+    super(TajoDataTypes.Type.PROTOBUF);
+    this.value = message;
+  }
+
+  public void set(Message message) {
+    this.value = message;
+  }
+
+  public Message get() {
+    return value;
+  }
+
+  @Override
+  public byte [] asByteArray() {
+    return value.toByteArray();
+  }
+
+  @Override
+  public int size() {
+    return value.getSerializedSize();
+  }
+
+  @Override
+  public int compareTo(Datum datum) {
+    if (datum.type() == TajoDataTypes.Type.PROTOBUF) {
+      return value.equals(((ProtobufDatum)datum).get()) ? 0 : -1;
+    } else {
+      return -1;
+    }
+  }
+
+  @Override
+  public int hashCode() {
+    return value.hashCode();
+  }
+
+  @Override
+  public boolean equals(Object object) {
+    if (object instanceof ProtobufDatum) {
+      ProtobufDatum another = (ProtobufDatum) object;
+      return value.equals(another.value);
+    } else {
+      return false;
+    }
+  }
+
+  @Override
+  public String toString() {
+    return value.toString();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/4bf5c0c3/tajo-common/src/main/java/org/apache/tajo/datum/ProtobufDatumFactory.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/datum/ProtobufDatumFactory.java b/tajo-common/src/main/java/org/apache/tajo/datum/ProtobufDatumFactory.java
new file mode 100644
index 0000000..0d585a4
--- /dev/null
+++ b/tajo-common/src/main/java/org/apache/tajo/datum/ProtobufDatumFactory.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.datum;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+import com.google.protobuf.GeneratedMessage;
+import com.google.protobuf.Message;
+import org.apache.tajo.common.TajoDataTypes;
+import org.apache.tajo.datum.protobuf.ProtobufJsonFormat;
+
+import java.lang.reflect.Method;
+import java.util.Map;
+
+import static org.apache.tajo.common.TajoDataTypes.DataType;
+
+public class ProtobufDatumFactory {
+  private static Map<String, ProtobufDatumFactory> factories = Maps.newHashMap();
+  private static ProtobufJsonFormat protobufFormatter = ProtobufJsonFormat.getInstance();
+  private Method getDefaultInstance;
+  Class<? extends GeneratedMessage> messageClass;
+  private Message defaultInstance;
+
+  /** Loads the generated message class {@code protobufClass} by name and keeps its default instance. */
+  private ProtobufDatumFactory(String protobufClass) {
+    try {
+      this.messageClass = (Class<? extends GeneratedMessage>) Class.forName(protobufClass);
+      this.getDefaultInstance = messageClass.getMethod("getDefaultInstance");
+      this.defaultInstance = (Message) getDefaultInstance.invoke(null);
+    } catch (Throwable t) {
+      // Report the failure once, through the exception, with the offending class name attached.
+      throw new RuntimeException("Cannot create ProtobufDatumFactory for " + protobufClass, t);
+    }
+  }
+
+  /** Returns a new builder obtained from the default instance; the caller picks the builder type T. */
+  public <T extends Message.Builder> T newBuilder() {
+    Message.Builder builder;
+    try {
+      builder = defaultInstance.newBuilderForType();
+    } catch (Throwable t) {
+      throw new RuntimeException("Cannot create a builder for " + messageClass.getName(), t);
+    }
+    return (T) builder;
+  }
+
+  public ProtobufDatum createDatum(Message.Builder builder) {
+    return createDatum(builder.build());
+  }
+
+  public ProtobufDatum createDatum(Message message) {
+    return new ProtobufDatum(message);
+  }
+
+  public static ProtobufDatumFactory get(DataType dataType) {
+    Preconditions.checkArgument(dataType.getType() == TajoDataTypes.Type.PROTOBUF,
+        "ProtobufDatumFactory only can accepts Protocol Buffer Datum Type.");
+    return get(dataType.getCode());
+  }
+
+  /** Returns the factory cached for {@code className}, creating and caching it on first request. */
+  public static ProtobufDatumFactory get(String className) {
+    ProtobufDatumFactory cached = factories.get(className);
+    if (cached != null) {
+      return cached;
+    }
+    ProtobufDatumFactory created = new ProtobufDatumFactory(className);
+    factories.put(className, created);
+    return created;
+  }
+
+  public static String toJson(Message message) {
+    return protobufFormatter.printToString(message);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/4bf5c0c3/tajo-common/src/main/java/org/apache/tajo/datum/protobuf/AbstractCharBasedFormatter.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/datum/protobuf/AbstractCharBasedFormatter.java b/tajo-common/src/main/java/org/apache/tajo/datum/protobuf/AbstractCharBasedFormatter.java
new file mode 100644
index 0000000..186014f
--- /dev/null
+++ b/tajo-common/src/main/java/org/apache/tajo/datum/protobuf/AbstractCharBasedFormatter.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.datum.protobuf;
+
+import com.google.protobuf.ExtensionRegistry;
+import com.google.protobuf.Message;
+import com.google.protobuf.Message.Builder;
+import com.google.protobuf.UnknownFieldSet;
+
+import java.io.*;
+import java.nio.charset.Charset;
+
+public abstract class AbstractCharBasedFormatter extends ProtobufFormatter {
+
+	@Override
+	public void print(Message message, OutputStream output, Charset cs)
+			throws IOException {
+		OutputStreamWriter writer = new OutputStreamWriter(output, cs);
+		print(message, writer);
+		writer.flush();
+	}
+	
+	abstract public void print(Message message, Appendable output) throws IOException;
+
+
+	@Override
+	public void print(UnknownFieldSet fields, OutputStream output, Charset cs)
+			throws IOException {
+		OutputStreamWriter writer = new OutputStreamWriter(output, cs);
+		print(fields, writer);
+		writer.flush();
+	}
+	
+	abstract public void print(UnknownFieldSet fields, Appendable output) throws IOException;
+
+	@Override
+	public void merge(InputStream input, Charset cs, 
+			ExtensionRegistry extensionRegistry, Builder builder) throws IOException {
+		InputStreamReader reader = new InputStreamReader(input, cs);
+		merge(reader, extensionRegistry, builder);
+	}
+	
+	
+	abstract public void merge(CharSequence input, ExtensionRegistry extensionRegistry,
+            Message.Builder builder) throws IOException;
+
+	/**
+     * Parse a text-format message from {@code input} and merge the contents into {@code builder}.
+     * Extensions will be recognized if they are registered in {@code extensionRegistry}.
+     */
+    public void merge(Readable input,
+    		ExtensionRegistry extensionRegistry,
+    		Message.Builder builder) throws IOException {
+        // Read the entire input to a String then parse that.
+
+        // If StreamTokenizer were not quite so crippled, or if there were a kind
+        // of Reader that could read in chunks that match some particular regex,
+        // or if we wanted to write a custom Reader to tokenize our stream, then
+        // we would not have to read to one big String. Alas, none of these is
+        // the case. Oh well.
+
+		merge(TextUtils.toStringBuilder(input), extensionRegistry, builder);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/4bf5c0c3/tajo-common/src/main/java/org/apache/tajo/datum/protobuf/ProtobufFormatter.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/datum/protobuf/ProtobufFormatter.java b/tajo-common/src/main/java/org/apache/tajo/datum/protobuf/ProtobufFormatter.java
new file mode 100644
index 0000000..b39dc37
--- /dev/null
+++ b/tajo-common/src/main/java/org/apache/tajo/datum/protobuf/ProtobufFormatter.java
@@ -0,0 +1,168 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.datum.protobuf;
+
+import com.google.protobuf.ExtensionRegistry;
+import com.google.protobuf.Message;
+import com.google.protobuf.UnknownFieldSet;
+
+import java.io.*;
+import java.nio.charset.Charset;
+
+public abstract class ProtobufFormatter {
+    private Charset defaultCharset = Charset.defaultCharset();
+
+    /**
+     * Set the default character set to use for input / output data streams
+     * @param cs the character set to use by default, when unspecified
+     */
+    public void setDefaultCharset(Charset cs) {
+        defaultCharset = cs;
+    }
+    
+    /**
+     * Get the default character set to use for input / output streams
+     * @return the character set to use by default, when unspecified
+     */
+    public Charset getDefaultCharset() {
+        return defaultCharset;
+    }
+	
+	/**
+	 * @see #print(com.google.protobuf.Message, java.io.OutputStream, java.nio.charset.Charset)
+	 * @param message the protobuf message to format
+	 * @param output the stream to write the formatted message using the default charset
+	 * @throws java.io.IOException
+	 */
+	public void print(final Message message, OutputStream output) throws IOException {
+		print(message, output, defaultCharset);
+	}
+
+	/**
+	 * Outputs a textual representation of the Protocol Message supplied into
+	 * the parameter output. (This representation is the new version of the
+	 * classic "ProtocolPrinter" output from the original Protocol Buffer system)
+	 *
+	 * @param message the protobuf message to format
+	 * @param output the stream to write the formatted message
+	 * @param cs the character set to use
+	 * @throws java.io.IOException
+	 */
+	abstract public void print(final Message message, OutputStream output, Charset cs) throws IOException;
+
+
+	/**
+	 * @see #print(com.google.protobuf.UnknownFieldSet, java.io.OutputStream, java.nio.charset.Charset)
+	 * @param fields unknown fields to format
+	 * @param output the stream to write the formatted message using the default charset
+	 * @throws java.io.IOException
+	 */
+	public void print(final UnknownFieldSet fields, OutputStream output) throws IOException {
+		print(fields, output, defaultCharset);
+	}
+
+	/**
+	 * @param fields unknown fields to format
+	 * @param output the stream to write the formatted message
+	 * @param cs the character set to use
+	 * @throws java.io.IOException
+	 */
+	abstract public void print(final UnknownFieldSet fields, OutputStream output, Charset cs) throws IOException;
+	
+
+	/**
+     * Like {@code print()}, but returns the text as a {@code String} decoded with the default charset.
+     */
+	public String printToString(final Message message) {
+		try {
+            ByteArrayOutputStream out = new ByteArrayOutputStream();
+            print(message, out, defaultCharset);
+            out.flush();
+            // Decode with the charset passed to print() above, not the platform default.
+            return out.toString(defaultCharset.name());
+        } catch (IOException e) {
+            throw new RuntimeException("Writing to a StringBuilder threw an IOException (should never happen).", e);
+        }
+	}
+	
+	/**
+     * Like {@code print()}, but returns the text as a {@code String} decoded with the default charset.
+     */
+	public String printToString(final UnknownFieldSet fields) {
+		try {
+            ByteArrayOutputStream out = new ByteArrayOutputStream();
+            print(fields, out, defaultCharset);
+            out.flush();
+            // Decode with the charset passed to print() above, not the platform default.
+            return out.toString(defaultCharset.name());
+        } catch (IOException e) {
+            throw new RuntimeException("Writing to a StringBuilder threw an IOException (should never happen).", e);
+        }
+	}
+	
+	/**
+     * Thrown when parsing an invalid text format message.
+     */
+    public static class ParseException extends IOException {
+    	private static final long serialVersionUID = 1L;
+
+		public ParseException(String message) {
+            super(message);
+        }
+    }
+	
+	
+	/**
+	 * Parse a text-format message from {@code input} and merge the contents
+	 * into {@code builder}.
+	 */
+	abstract public void merge(final InputStream input, Charset cs,
+			ExtensionRegistry extensionRegistry,
+			final Message.Builder builder) throws IOException;
+
+	
+	/**
+	 * Parse a text-format message from {@code input} and merge the contents
+	 * into {@code builder}.
+	 */
+	public void merge(final InputStream input, Charset cs, 
+			final Message.Builder builder) throws IOException {
+		
+		merge(input, cs, ExtensionRegistry.getEmptyRegistry(), builder);
+	}
+	
+	public void merge(final InputStream input, 
+			final Message.Builder builder) throws IOException {
+		
+		merge(input, defaultCharset, 
+				ExtensionRegistry.getEmptyRegistry(), builder);
+	}
+
+  public void merge(final byte [] input, final Message.Builder builder) throws IOException {
+    ByteArrayInputStream bio = new ByteArrayInputStream(input);
+    merge(bio, defaultCharset, ExtensionRegistry.getEmptyRegistry(), builder);
+    bio.close();
+  }
+	
+	public void merge(final InputStream input,
+			ExtensionRegistry extensionRegistry,
+			final Message.Builder builder) throws IOException {
+		merge(input, defaultCharset, extensionRegistry, builder);
+	}
+}