You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hy...@apache.org on 2013/10/04 05:07:33 UTC

[2/4] TAJO-224: Rearrange DataType enumeration and Refactor type systems. (hyunsik)

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/4bf5c0c3/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/EvalTreeUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/EvalTreeUtil.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/EvalTreeUtil.java
index d8a0a82..12b3709 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/EvalTreeUtil.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/EvalTreeUtil.java
@@ -25,7 +25,6 @@ import org.apache.tajo.catalog.Column;
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.common.TajoDataTypes.DataType;
 import org.apache.tajo.engine.planner.Target;
-import org.apache.tajo.engine.utils.SchemaUtil;
 import org.apache.tajo.exception.InternalException;
 
 import java.util.*;
@@ -131,13 +130,13 @@ public class EvalTreeUtil {
     for (Target target : targets) {
       schema.addColumn(
           target.hasAlias() ? target.getAlias() : target.getEvalTree().getName(),
-          getDomainByExpr(inputSchema, target.getEvalTree())[0]);
+          getDomainByExpr(inputSchema, target.getEvalTree()));
     }
     
     return schema;
   }
   
-  public static DataType[] getDomainByExpr(Schema inputSchema, EvalNode expr)
+  public static DataType getDomainByExpr(Schema inputSchema, EvalNode expr)
       throws InternalException {
     switch (expr.getType()) {
     case AND:      
@@ -158,7 +157,7 @@ public class EvalTreeUtil {
 
     case FIELD:
       FieldEval fieldEval = (FieldEval) expr;
-      return SchemaUtil.newNoNameSchema(inputSchema.getColumnByFQN(fieldEval.getName()).getDataType());
+      return inputSchema.getColumnByFQN(fieldEval.getName()).getDataType();
 
       
     default:

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/4bf5c0c3/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/FieldEval.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/FieldEval.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/FieldEval.java
index 87d6077..8c24104 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/FieldEval.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/FieldEval.java
@@ -23,7 +23,6 @@ import org.apache.tajo.catalog.Column;
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.common.TajoDataTypes.DataType;
 import org.apache.tajo.datum.Datum;
-import org.apache.tajo.engine.utils.SchemaUtil;
 import org.apache.tajo.storage.Tuple;
 
 public class FieldEval extends EvalNode implements Cloneable {
@@ -77,8 +76,8 @@ public class FieldEval extends EvalNode implements Cloneable {
   }
 
   @Override
-	public DataType[] getValueType() {
-		return SchemaUtil.newNoNameSchema(column.getDataType());
+	public DataType getValueType() {
+		return column.getDataType();
 	}
 	
   public Column getColumnRef() {

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/4bf5c0c3/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/FunctionEval.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/FunctionEval.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/FunctionEval.java
index eea2b9c..df26692 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/FunctionEval.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/FunctionEval.java
@@ -49,7 +49,7 @@ public abstract class FunctionEval extends EvalNode implements Cloneable {
     ParamType [] paramTypes = new ParamType[argEvals.length];
     for (int i = 0; i < argEvals.length; i++) {
       if (argEvals[i].getType() == EvalType.CONST) {
-        if (argEvals[i].getValueType()[0].getType() == TajoDataTypes.Type.NULL) {
+        if (argEvals[i].getValueType().getType() == TajoDataTypes.Type.NULL) {
           paramTypes[i] = ParamType.NULL;
         } else {
           paramTypes[i] = ParamType.CONSTANT;
@@ -79,7 +79,7 @@ public abstract class FunctionEval extends EvalNode implements Cloneable {
     this.argEvals = args;
   }
 	
-	public DataType [] getValueType() {
+	public DataType getValueType() {
 		return this.funcDesc.getReturnType();
 	}
 

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/4bf5c0c3/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/InEval.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/InEval.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/InEval.java
index 60dea3b..e2411e3 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/InEval.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/InEval.java
@@ -29,8 +29,7 @@ import org.apache.tajo.datum.DatumFactory;
 import org.apache.tajo.storage.Tuple;
 
 public class InEval extends BinaryEval {
-  private static final TajoDataTypes.DataType[] RES_TYPE
-      = CatalogUtil.newDataTypesWithoutLen(TajoDataTypes.Type.BOOLEAN);
+  private static final TajoDataTypes.DataType RES_TYPE = CatalogUtil.newSimpleDataType(TajoDataTypes.Type.BOOLEAN);
 
   @Expose private boolean not;
   private Integer fieldId = null;
@@ -51,7 +50,7 @@ public class InEval extends BinaryEval {
   }
 
   @Override
-  public TajoDataTypes.DataType[] getValueType() {
+  public TajoDataTypes.DataType getValueType() {
     return RES_TYPE;
   }
 
@@ -100,7 +99,7 @@ public class InEval extends BinaryEval {
   }
 
   public String toString() {
-    return leftExpr + " IN " + rightExpr;
+    return leftExpr + " IN (" + rightExpr + ")";
   }
 
   private class InEvalCtx implements EvalContext {

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/4bf5c0c3/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/IsNullEval.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/IsNullEval.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/IsNullEval.java
index 1064974..0a176fb 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/IsNullEval.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/IsNullEval.java
@@ -33,7 +33,7 @@ import org.apache.tajo.storage.Tuple;
 public class IsNullEval extends BinaryEval {
   // it's just a hack to emulate a binary expression
   private final static ConstEval DUMMY_EVAL = new ConstEval(DatumFactory.createBool(true));
-  private static final DataType [] RES_TYPE = CatalogUtil.newDataTypesWithoutLen(TajoDataTypes.Type.BOOLEAN);
+  private static final DataType RES_TYPE = CatalogUtil.newSimpleDataType(TajoDataTypes.Type.BOOLEAN);
 
   // persistent variables
   @Expose private boolean isNot;
@@ -52,7 +52,7 @@ public class IsNullEval extends BinaryEval {
   }
 
   @Override
-  public DataType[] getValueType() {
+  public DataType getValueType() {
     return RES_TYPE;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/4bf5c0c3/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/NotEval.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/NotEval.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/NotEval.java
index 9fe4679..fb96c47 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/NotEval.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/NotEval.java
@@ -30,8 +30,7 @@ import org.apache.tajo.storage.Tuple;
 
 public class NotEval extends EvalNode implements Cloneable {
   @Expose private EvalNode subExpr;
-  private static final DataType[] RES_TYPE =
-      CatalogUtil.newDataTypesWithoutLen(TajoDataTypes.Type.BOOLEAN);
+  private static final DataType RES_TYPE = CatalogUtil.newSimpleDataType(TajoDataTypes.Type.BOOLEAN);
 
   public NotEval(EvalNode subExpr) {
     super(EvalType.NOT);
@@ -51,7 +50,7 @@ public class NotEval extends EvalNode implements Cloneable {
   }
 
   @Override
-  public DataType [] getValueType() {
+  public DataType getValueType() {
     return RES_TYPE;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/4bf5c0c3/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/PartialBinaryExpr.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/PartialBinaryExpr.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/PartialBinaryExpr.java
index ab558d9..73a68cd 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/PartialBinaryExpr.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/PartialBinaryExpr.java
@@ -39,7 +39,7 @@ public class PartialBinaryExpr extends EvalNode {
   }
 
   @Override
-  public DataType[] getValueType() {
+  public DataType getValueType() {
     return null;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/4bf5c0c3/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/PatternMatchPredicateEval.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/PatternMatchPredicateEval.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/PatternMatchPredicateEval.java
index 83ee1e7..64a655c 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/PatternMatchPredicateEval.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/PatternMatchPredicateEval.java
@@ -33,7 +33,7 @@ import java.util.regex.Pattern;
 import java.util.regex.PatternSyntaxException;
 
 public abstract class PatternMatchPredicateEval extends BinaryEval {
-  private static final DataType [] RES_TYPE = CatalogUtil.newDataTypesWithoutLen(TajoDataTypes.Type.BOOLEAN);
+  private static final DataType RES_TYPE = CatalogUtil.newSimpleDataType(TajoDataTypes.Type.BOOLEAN);
 
   @Expose protected boolean not;
   @Expose protected String pattern;
@@ -60,7 +60,7 @@ public abstract class PatternMatchPredicateEval extends BinaryEval {
   abstract void compile(String pattern) throws PatternSyntaxException;
 
   @Override
-  public DataType [] getValueType() {
+  public DataType getValueType() {
     return RES_TYPE;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/4bf5c0c3/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/RowConstantEval.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/RowConstantEval.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/RowConstantEval.java
index ecda8ae..1fc6a21 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/RowConstantEval.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/RowConstantEval.java
@@ -20,17 +20,17 @@ package org.apache.tajo.engine.eval;
 
 import com.google.gson.annotations.Expose;
 import org.apache.tajo.catalog.CatalogUtil;
-import org.apache.tajo.common.TajoDataTypes;
-import org.apache.tajo.datum.ArrayDatum;
 import org.apache.tajo.datum.Datum;
 import org.apache.tajo.util.TUtil;
 
+import static org.apache.tajo.common.TajoDataTypes.DataType;
+
 public class RowConstantEval extends EvalNode {
-  @Expose ArrayDatum values;
+  @Expose Datum [] values;
 
-  public RowConstantEval(Datum[] values) {
+  public RowConstantEval(Datum [] values) {
     super(EvalType.ROW_CONSTANT);
-    this.values = new ArrayDatum(values);
+    this.values = values;
   }
 
   @Override
@@ -39,8 +39,8 @@ public class RowConstantEval extends EvalNode {
   }
 
   @Override
-  public TajoDataTypes.DataType[] getValueType() {
-    return new TajoDataTypes.DataType[] {CatalogUtil.newDataTypeWithoutLen(values.get(0).type())};
+  public DataType getValueType() {
+    return CatalogUtil.newSimpleDataType(values[0].type());
   }
 
   @Override
@@ -50,7 +50,7 @@ public class RowConstantEval extends EvalNode {
 
   @Override
   public Datum terminate(EvalContext ctx) {
-    return values;
+    return null;
   }
 
   @Override
@@ -64,19 +64,11 @@ public class RowConstantEval extends EvalNode {
   }
 
   public String toString() {
-    StringBuilder sb = new StringBuilder("(");
-    for (int i = 0; i < values.toArray().length; i++) {
-      if (i != 0) {
-        sb.append(",");
-      }
-      sb.append(values.get(i).toString());
-    }
-    sb.append(")");
-    return sb.toString();
+    return TUtil.arrayToString(values);
   }
 
   public Datum [] getValues() {
-    return values.toArray();
+    return values;
   }
 
   public void preOrder(EvalNodeVisitor visitor) {

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/4bf5c0c3/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/function/AggFunction.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/function/AggFunction.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/function/AggFunction.java
index a79df8e..ebbf6e6 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/function/AggFunction.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/function/AggFunction.java
@@ -41,7 +41,7 @@ public abstract class AggFunction<T extends Datum> extends Function<T> {
 
   public abstract Datum getPartialResult(FunctionContext ctx);
 
-  public abstract DataType [] getPartialResultType();
+  public abstract DataType getPartialResultType();
 
   public abstract T terminate(FunctionContext ctx);
 

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/4bf5c0c3/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/function/builtin/AvgDouble.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/function/builtin/AvgDouble.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/function/builtin/AvgDouble.java
index 5441726..5bae257 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/function/builtin/AvgDouble.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/function/builtin/AvgDouble.java
@@ -20,15 +20,17 @@ package org.apache.tajo.engine.function.builtin;
 
 import org.apache.tajo.catalog.CatalogUtil;
 import org.apache.tajo.catalog.Column;
-import org.apache.tajo.engine.function.AggFunction;
-import org.apache.tajo.engine.function.FunctionContext;
 import org.apache.tajo.common.TajoDataTypes.DataType;
 import org.apache.tajo.common.TajoDataTypes.Type;
-import org.apache.tajo.datum.ArrayDatum;
 import org.apache.tajo.datum.Datum;
 import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.datum.ProtobufDatum;
+import org.apache.tajo.engine.function.AggFunction;
+import org.apache.tajo.engine.function.FunctionContext;
 import org.apache.tajo.storage.Tuple;
 
+import static org.apache.tajo.InternalTypes.AvgDoubleProto;
+
 public class AvgDouble extends AggFunction {
   public AvgDouble() {
     super(new Column[] {
@@ -53,24 +55,24 @@ public class AvgDouble extends AggFunction {
   @Override
   public void merge(FunctionContext ctx, Tuple part) {
     AvgContext avgCtx = (AvgContext) ctx;
-    ArrayDatum array = (ArrayDatum) part.get(0);
-    avgCtx.sum += array.get(0).asFloat8();
-    avgCtx.count += array.get(1).asInt8();
+    ProtobufDatum datum = (ProtobufDatum) part.get(0);
+    AvgDoubleProto proto = (AvgDoubleProto) datum.get();
+    avgCtx.sum += proto.getSum();
+    avgCtx.count += proto.getCount();
   }
 
   @Override
   public Datum getPartialResult(FunctionContext ctx) {
     AvgContext avgCtx = (AvgContext) ctx;
-    ArrayDatum part = new ArrayDatum(2);
-    part.put(0, DatumFactory.createFloat8(avgCtx.sum));
-    part.put(1, DatumFactory.createInt8(avgCtx.count));
-
-    return part;
+    AvgDoubleProto.Builder builder = AvgDoubleProto.newBuilder();
+    builder.setSum(avgCtx.sum);
+    builder.setCount(avgCtx.count);
+    return new ProtobufDatum(builder.build());
   }
 
   @Override
-  public DataType[] getPartialResultType() {
-    return CatalogUtil.newDataTypesWithoutLen(Type.FLOAT8, Type.INT8);
+  public DataType getPartialResultType() {
+    return CatalogUtil.newDataType(Type.PROTOBUF, AvgDoubleProto.class.getName());
   }
 
   @Override
@@ -79,7 +81,7 @@ public class AvgDouble extends AggFunction {
     return DatumFactory.createFloat8(avgCtx.sum / avgCtx.count);
   }
 
-  private class AvgContext implements FunctionContext {
+  protected class AvgContext implements FunctionContext {
     double sum;
     long count;
   }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/4bf5c0c3/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/function/builtin/AvgFloat.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/function/builtin/AvgFloat.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/function/builtin/AvgFloat.java
index a245eab..9fc5ed9 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/function/builtin/AvgFloat.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/function/builtin/AvgFloat.java
@@ -18,28 +18,12 @@
 
 package org.apache.tajo.engine.function.builtin;
 
-import org.apache.tajo.catalog.CatalogUtil;
-import org.apache.tajo.catalog.Column;
-import org.apache.tajo.engine.function.AggFunction;
 import org.apache.tajo.engine.function.FunctionContext;
-import org.apache.tajo.common.TajoDataTypes.DataType;
-import org.apache.tajo.common.TajoDataTypes.Type;
-import org.apache.tajo.datum.ArrayDatum;
-import org.apache.tajo.datum.Datum;
-import org.apache.tajo.datum.DatumFactory;
-import org.apache.tajo.datum.Float4Datum;
 import org.apache.tajo.storage.Tuple;
 
-public class AvgFloat extends AggFunction<Float4Datum> {
+public class AvgFloat extends AvgDouble {
 
   public AvgFloat() {
-    super(new Column[] {
-        new Column("val", Type.FLOAT8)
-    });
-  }
-
-  public AvgContext newContext() {
-    return new AvgContext();
   }
 
   @Override
@@ -48,38 +32,4 @@ public class AvgFloat extends AggFunction<Float4Datum> {
     avgCtx.sum += params.get(0).asFloat4();
     avgCtx.count++;
   }
-
-  @Override
-  public void merge(FunctionContext ctx, Tuple part) {
-    AvgContext avgCtx = (AvgContext) ctx;
-    ArrayDatum array = (ArrayDatum) part.get(0);
-    avgCtx.sum += array.get(0).asFloat8();
-    avgCtx.count += array.get(1).asInt8();
-  }
-
-  @Override
-  public Datum getPartialResult(FunctionContext ctx) {
-    AvgContext avgCtx = (AvgContext) ctx;
-    ArrayDatum part = new ArrayDatum(2);
-    part.put(0, DatumFactory.createFloat8(avgCtx.sum));
-    part.put(1, DatumFactory.createInt8(avgCtx.count));
-
-    return part;
-  }
-
-  @Override
-  public DataType[] getPartialResultType() {
-    return CatalogUtil.newDataTypesWithoutLen(Type.FLOAT8, Type.INT8);
-  }
-
-  @Override
-  public Float4Datum terminate(FunctionContext ctx) {
-    AvgContext avgCtx = (AvgContext) ctx;
-    return DatumFactory.createFloat4((float) (avgCtx.sum / avgCtx.count));
-  }
-
-  private class AvgContext implements FunctionContext {
-    double sum;
-    long count;
-  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/4bf5c0c3/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/function/builtin/AvgInt.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/function/builtin/AvgInt.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/function/builtin/AvgInt.java
index c34c890..338903a 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/function/builtin/AvgInt.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/function/builtin/AvgInt.java
@@ -18,28 +18,13 @@
 
 package org.apache.tajo.engine.function.builtin;
 
-import org.apache.tajo.catalog.CatalogUtil;
-import org.apache.tajo.catalog.Column;
-import org.apache.tajo.engine.function.AggFunction;
 import org.apache.tajo.engine.function.FunctionContext;
-import org.apache.tajo.common.TajoDataTypes.DataType;
-import org.apache.tajo.common.TajoDataTypes.Type;
-import org.apache.tajo.datum.ArrayDatum;
-import org.apache.tajo.datum.Datum;
-import org.apache.tajo.datum.DatumFactory;
-import org.apache.tajo.datum.Float4Datum;
 import org.apache.tajo.storage.Tuple;
 
-public class AvgInt extends AggFunction<Float4Datum> {
+public class AvgInt extends AvgLong {
 
   public AvgInt() {
-    super(new Column[] {
-        new Column("val", Type.FLOAT8)
-    });
-  }
-
-  public AvgContext newContext() {
-    return new AvgContext();
+    super();
   }
 
   @Override
@@ -48,38 +33,4 @@ public class AvgInt extends AggFunction<Float4Datum> {
     avgCtx.sum += params.get(0).asInt4();
     avgCtx.count++;
   }
-
-  @Override
-  public void merge(FunctionContext ctx, Tuple part) {
-    AvgContext avgCtx = (AvgContext) ctx;
-    ArrayDatum array = (ArrayDatum) part.get(0);
-    avgCtx.sum += array.get(0).asInt8();
-    avgCtx.count += array.get(1).asInt8();
-  }
-
-  @Override
-  public Datum getPartialResult(FunctionContext ctx) {
-    AvgContext avgCtx = (AvgContext) ctx;
-    ArrayDatum part = new ArrayDatum(2);
-    part.put(0, DatumFactory.createInt8(avgCtx.sum));
-    part.put(1, DatumFactory.createInt8(avgCtx.count));
-
-    return part;
-  }
-
-  @Override
-  public DataType [] getPartialResultType() {
-    return CatalogUtil.newDataTypesWithoutLen(Type.INT8, Type.INT8);
-  }
-
-  @Override
-  public Float4Datum terminate(FunctionContext ctx) {
-    AvgContext avgCtx = (AvgContext) ctx;
-    return DatumFactory.createFloat4((float) avgCtx.sum / avgCtx.count);
-  }
-
-  private class AvgContext implements FunctionContext {
-    long sum;
-    long count;
-  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/4bf5c0c3/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/function/builtin/AvgLong.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/function/builtin/AvgLong.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/function/builtin/AvgLong.java
index 1dc2084..86fc709 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/function/builtin/AvgLong.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/function/builtin/AvgLong.java
@@ -20,16 +20,18 @@ package org.apache.tajo.engine.function.builtin;
 
 import org.apache.tajo.catalog.CatalogUtil;
 import org.apache.tajo.catalog.Column;
-import org.apache.tajo.engine.function.AggFunction;
-import org.apache.tajo.engine.function.FunctionContext;
 import org.apache.tajo.common.TajoDataTypes.DataType;
 import org.apache.tajo.common.TajoDataTypes.Type;
-import org.apache.tajo.datum.ArrayDatum;
 import org.apache.tajo.datum.Datum;
 import org.apache.tajo.datum.DatumFactory;
 import org.apache.tajo.datum.Float8Datum;
+import org.apache.tajo.datum.ProtobufDatum;
+import org.apache.tajo.engine.function.AggFunction;
+import org.apache.tajo.engine.function.FunctionContext;
 import org.apache.tajo.storage.Tuple;
 
+import static org.apache.tajo.InternalTypes.AvgLongProto;
+
 public class AvgLong extends AggFunction<Float8Datum> {
 
   public AvgLong() {
@@ -52,24 +54,24 @@ public class AvgLong extends AggFunction<Float8Datum> {
   @Override
   public void merge(FunctionContext ctx, Tuple part) {
     AvgContext avgCtx = (AvgContext) ctx;
-    ArrayDatum array = (ArrayDatum) part.get(0);
-    avgCtx.sum += array.get(0).asInt8();
-    avgCtx.count += array.get(1).asInt8();
+    ProtobufDatum datum = (ProtobufDatum) part.get(0);
+    AvgLongProto proto = (AvgLongProto) datum.get();
+    avgCtx.sum += proto.getSum();
+    avgCtx.count += proto.getCount();
   }
 
   @Override
   public Datum getPartialResult(FunctionContext ctx) {
     AvgContext avgCtx = (AvgContext) ctx;
-    ArrayDatum part = new ArrayDatum(2);
-    part.put(0, DatumFactory.createInt8(avgCtx.sum));
-    part.put(1, DatumFactory.createInt8(avgCtx.count));
-
-    return part;
+    AvgLongProto.Builder builder = AvgLongProto.newBuilder();
+    builder.setSum(avgCtx.sum);
+    builder.setCount(avgCtx.count);
+    return new ProtobufDatum(builder.build());
   }
 
   @Override
-  public DataType[] getPartialResultType() {
-    return CatalogUtil.newDataTypesWithoutLen(Type.INT8, Type.INT8);
+  public DataType getPartialResultType() {
+    return CatalogUtil.newDataType(Type.PROTOBUF, AvgLongProto.class.getName());
   }
 
   @Override
@@ -78,7 +80,7 @@ public class AvgLong extends AggFunction<Float8Datum> {
     return DatumFactory.createFloat8((double) avgCtx.sum / avgCtx.count);
   }
 
-  private class AvgContext implements FunctionContext {
+  protected class AvgContext implements FunctionContext {
     long sum;
     long count;
   }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/4bf5c0c3/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/function/builtin/CountRows.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/function/builtin/CountRows.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/function/builtin/CountRows.java
index 8f97ccc..722d8b7 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/function/builtin/CountRows.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/function/builtin/CountRows.java
@@ -60,8 +60,8 @@ public class CountRows extends AggFunction<Datum> {
   }
 
   @Override
-  public DataType[] getPartialResultType() {
-    return CatalogUtil.newDataTypesWithoutLen(Type.INT8);
+  public DataType getPartialResultType() {
+    return CatalogUtil.newSimpleDataType(Type.INT8);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/4bf5c0c3/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/function/builtin/MaxDouble.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/function/builtin/MaxDouble.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/function/builtin/MaxDouble.java
index 0410150..38fb14d 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/function/builtin/MaxDouble.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/function/builtin/MaxDouble.java
@@ -54,8 +54,8 @@ public class MaxDouble extends AggFunction<Float8Datum> {
   }
 
   @Override
-  public DataType[] getPartialResultType() {
-    return CatalogUtil.newDataTypesWithoutLen(Type.FLOAT8);
+  public DataType getPartialResultType() {
+    return CatalogUtil.newSimpleDataType(Type.FLOAT8);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/4bf5c0c3/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/function/builtin/MaxFloat.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/function/builtin/MaxFloat.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/function/builtin/MaxFloat.java
index d25288d..c3c29f7 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/function/builtin/MaxFloat.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/function/builtin/MaxFloat.java
@@ -52,8 +52,8 @@ public class MaxFloat extends AggFunction<Datum> {
   }
 
   @Override
-  public DataType[] getPartialResultType() {
-    return CatalogUtil.newDataTypesWithoutLen(Type.FLOAT4);
+  public DataType getPartialResultType() {
+    return CatalogUtil.newSimpleDataType(Type.FLOAT4);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/4bf5c0c3/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/function/builtin/MaxInt.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/function/builtin/MaxInt.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/function/builtin/MaxInt.java
index 75889e7..be18858 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/function/builtin/MaxInt.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/function/builtin/MaxInt.java
@@ -53,8 +53,8 @@ public class MaxInt extends AggFunction<Datum> {
   }
 
   @Override
-  public DataType [] getPartialResultType() {
-    return CatalogUtil.newDataTypesWithoutLen(Type.INT4);
+  public DataType getPartialResultType() {
+    return CatalogUtil.newSimpleDataType(Type.INT4);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/4bf5c0c3/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/function/builtin/MaxLong.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/function/builtin/MaxLong.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/function/builtin/MaxLong.java
index 24e7b07..7f82f06 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/function/builtin/MaxLong.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/function/builtin/MaxLong.java
@@ -53,8 +53,8 @@ public class MaxLong extends AggFunction<Int8Datum> {
   }
 
   @Override
-  public DataType [] getPartialResultType() {
-    return CatalogUtil.newDataTypesWithoutLen(Type.INT8);
+  public DataType getPartialResultType() {
+    return CatalogUtil.newSimpleDataType(Type.INT8);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/4bf5c0c3/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/function/builtin/MinDouble.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/function/builtin/MinDouble.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/function/builtin/MinDouble.java
index 72af4a3..fa54c8b 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/function/builtin/MinDouble.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/function/builtin/MinDouble.java
@@ -53,8 +53,8 @@ public class MinDouble extends AggFunction<Datum> {
   }
 
   @Override
-  public DataType [] getPartialResultType() {
-    return CatalogUtil.newDataTypesWithoutLen(Type.FLOAT8);
+  public DataType getPartialResultType() {
+    return CatalogUtil.newSimpleDataType(Type.FLOAT8);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/4bf5c0c3/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/function/builtin/MinFloat.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/function/builtin/MinFloat.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/function/builtin/MinFloat.java
index 1937f07..1e9def1 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/function/builtin/MinFloat.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/function/builtin/MinFloat.java
@@ -54,8 +54,8 @@ public class MinFloat extends AggFunction<Float4Datum> {
   }
 
   @Override
-  public DataType [] getPartialResultType() {
-    return CatalogUtil.newDataTypesWithoutLen(Type.FLOAT4);
+  public DataType getPartialResultType() {
+    return CatalogUtil.newSimpleDataType(Type.FLOAT4);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/4bf5c0c3/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/function/builtin/MinInt.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/function/builtin/MinInt.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/function/builtin/MinInt.java
index 13d3745..e4184e0 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/function/builtin/MinInt.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/function/builtin/MinInt.java
@@ -53,8 +53,8 @@ public class MinInt extends AggFunction<Datum> {
   }
 
   @Override
-  public DataType[] getPartialResultType() {
-    return CatalogUtil.newDataTypesWithoutLen(Type.INT4);
+  public DataType getPartialResultType() {
+    return CatalogUtil.newSimpleDataType(Type.INT4);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/4bf5c0c3/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/function/builtin/MinLong.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/function/builtin/MinLong.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/function/builtin/MinLong.java
index 8641895..6cc854f 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/function/builtin/MinLong.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/function/builtin/MinLong.java
@@ -54,8 +54,8 @@ public class MinLong extends AggFunction<Datum> {
   }
 
   @Override
-  public DataType[] getPartialResultType() {
-    return CatalogUtil.newDataTypesWithoutLen(Type.INT8);
+  public DataType getPartialResultType() {
+    return CatalogUtil.newSimpleDataType(Type.INT8);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/4bf5c0c3/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/function/builtin/MinString.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/function/builtin/MinString.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/function/builtin/MinString.java
index f771229..5364540 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/function/builtin/MinString.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/function/builtin/MinString.java
@@ -58,8 +58,8 @@ public class MinString extends AggFunction<Datum> {
   }
 
   @Override
-  public DataType [] getPartialResultType() {
-    return CatalogUtil.newDataTypesWithoutLen(Type.TEXT);
+  public DataType getPartialResultType() {
+    return CatalogUtil.newSimpleDataType(Type.TEXT);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/4bf5c0c3/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/function/builtin/SumDouble.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/function/builtin/SumDouble.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/function/builtin/SumDouble.java
index c10cf1d..09b1ab2 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/function/builtin/SumDouble.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/function/builtin/SumDouble.java
@@ -53,8 +53,8 @@ public class SumDouble extends AggFunction<Datum> {
   }
 
   @Override
-  public DataType [] getPartialResultType() {
-    return CatalogUtil.newDataTypesWithoutLen(Type.FLOAT8);
+  public DataType getPartialResultType() {
+    return CatalogUtil.newSimpleDataType(Type.FLOAT8);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/4bf5c0c3/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/function/builtin/SumFloat.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/function/builtin/SumFloat.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/function/builtin/SumFloat.java
index 40842e8..96b45ba 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/function/builtin/SumFloat.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/function/builtin/SumFloat.java
@@ -51,8 +51,8 @@ public class SumFloat extends AggFunction<Datum> {
   }
 
   @Override
-  public DataType [] getPartialResultType() {
-    return CatalogUtil.newDataTypesWithoutLen(Type.FLOAT4);
+  public DataType getPartialResultType() {
+    return CatalogUtil.newSimpleDataType(Type.FLOAT4);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/4bf5c0c3/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/function/builtin/SumInt.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/function/builtin/SumInt.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/function/builtin/SumInt.java
index 19b58b4..5797d6d 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/function/builtin/SumInt.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/function/builtin/SumInt.java
@@ -53,8 +53,8 @@ public class SumInt extends AggFunction<Datum> {
   }
 
   @Override
-  public DataType [] getPartialResultType() {
-    return CatalogUtil.newDataTypesWithoutLen(Type.INT4);
+  public DataType getPartialResultType() {
+    return CatalogUtil.newSimpleDataType(Type.INT4);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/4bf5c0c3/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/function/builtin/SumLong.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/function/builtin/SumLong.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/function/builtin/SumLong.java
index 260566e..fe33b62 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/function/builtin/SumLong.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/function/builtin/SumLong.java
@@ -53,8 +53,8 @@ public class SumLong extends AggFunction<Datum> {
   }
 
   @Override
-  public DataType [] getPartialResultType() {
-    return CatalogUtil.newDataTypesWithoutLen(Type.INT8);
+  public DataType getPartialResultType() {
+    return CatalogUtil.newSimpleDataType(Type.INT8);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/4bf5c0c3/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlanner.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlanner.java
index eeac939..044c9eb 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlanner.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlanner.java
@@ -1025,9 +1025,9 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
             FunctionType.DISTINCT_AGGREGATION : FunctionType.AGGREGATION;
         givenArgs[0] = createEvalTree(plan, block, params[0]);
         if (setFunction.getSignature().equalsIgnoreCase("count")) {
-          paramTypes[0] = CatalogUtil.newDataTypeWithoutLen(TajoDataTypes.Type.ANY);
+          paramTypes[0] = CatalogUtil.newSimpleDataType(TajoDataTypes.Type.ANY);
         } else {
-          paramTypes[0] = givenArgs[0].getValueType()[0];
+          paramTypes[0] = givenArgs[0].getValueType();
         }
 
         if (!catalog.containFunction(setFunction.getSignature(), functionType, paramTypes)) {
@@ -1055,7 +1055,7 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
 
         for (int i = 0; i < params.length; i++) {
             givenArgs[i] = createEvalTree(plan, block, params[i]);
-            paramTypes[i] = givenArgs[i].getValueType()[0];
+            paramTypes[i] = givenArgs[i].getValueType();
         }
 
         if (!catalog.containFunction(function.getSignature(), paramTypes)) {
@@ -1169,7 +1169,7 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
   static Schema getProjectedSchema(LogicalPlan plan, Target[] targets) {
     Schema projected = new Schema();
     for(Target t : targets) {
-      DataType type = t.getEvalTree().getValueType()[0];
+      DataType type = t.getEvalTree().getValueType();
       String name;
       if (t.hasAlias() || t.getEvalTree().getType() == EvalType.FIELD) {
         name = t.getCanonicalName();

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/4bf5c0c3/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PlannerUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PlannerUtil.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PlannerUtil.java
index 4449993..7bef3b9 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PlannerUtil.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PlannerUtil.java
@@ -24,12 +24,10 @@ import com.google.common.collect.ObjectArrays;
 import com.google.common.collect.Sets;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.tajo.catalog.CatalogUtil;
 import org.apache.tajo.catalog.Column;
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.catalog.SortSpec;
 import org.apache.tajo.common.TajoDataTypes.DataType;
-import org.apache.tajo.common.TajoDataTypes.Type;
 import org.apache.tajo.engine.eval.*;
 import org.apache.tajo.engine.planner.logical.*;
 import org.apache.tajo.engine.query.exception.InvalidQueryException;
@@ -158,7 +156,7 @@ public class PlannerUtil {
             }
 
             secondFunc.setArgs(new EvalNode [] {new FieldEval(
-                new Column(targetName, newTarget.getEvalTree().getValueType()[0]))});
+                new Column(targetName, newTarget.getEvalTree().getValueType()))});
           } else {
             func.setFirstPhase();
             newTarget = new Target(func);
@@ -172,12 +170,8 @@ public class PlannerUtil {
                 break;
               }
             }
-            if (func.getValueType().length > 1) { // hack for partial result
-              secondFunc.setArgs(new EvalNode[] {new FieldEval(new Column(targetName, Type.ARRAY))});
-            } else {
-              secondFunc.setArgs(new EvalNode [] {new FieldEval(
-                  new Column(targetName, newTarget.getEvalTree().getValueType()[0]))});
-            }
+            secondFunc.setArgs(new EvalNode [] {new FieldEval(
+                new Column(targetName, newTarget.getEvalTree().getValueType()))});
           }
           firstStepTargets.add(newTarget);
         }
@@ -564,12 +558,7 @@ public class PlannerUtil {
   public static Schema targetToSchema(Target[] targets) {
     Schema schema = new Schema();
     for(Target t : targets) {
-      DataType type;
-      if (t.getEvalTree().getValueType().length > 1) {
-        type = CatalogUtil.newDataTypeWithoutLen(Type.ARRAY);
-      } else {
-        type = t.getEvalTree().getValueType()[0];
-      }
+      DataType type = t.getEvalTree().getValueType();
       String name;
       if (t.hasAlias()) {
         name = t.getAlias();

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/4bf5c0c3/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/Target.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/Target.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/Target.java
index 7e2bc16..2f32372 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/Target.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/Target.java
@@ -19,13 +19,11 @@
 package org.apache.tajo.engine.planner;
 
 import com.google.gson.annotations.Expose;
-import org.apache.tajo.engine.eval.EvalType;
-import org.apache.tajo.json.GsonObject;
 import org.apache.tajo.catalog.Column;
 import org.apache.tajo.common.TajoDataTypes.DataType;
-import org.apache.tajo.common.TajoDataTypes.Type;
 import org.apache.tajo.engine.eval.EvalNode;
 import org.apache.tajo.engine.json.CoreGsonHelper;
+import org.apache.tajo.json.GsonObject;
 import org.apache.tajo.util.TUtil;
 
 /**
@@ -38,11 +36,7 @@ public class Target implements Cloneable, GsonObject {
 
   public Target(EvalNode expr) {
     this.expr = expr;
-    if (expr.getType() == EvalType.AGG_FUNCTION && expr.getValueType().length > 1) { // hack for partial result
-      this.column = new Column(expr.getName(), Type.ARRAY);
-    } else {
-      this.column = new Column(expr.getName(), expr.getValueType()[0]);
-    }
+    this.column = new Column(expr.getName(), expr.getValueType());
   }
 
   public Target(final EvalNode eval, final String alias) {
@@ -56,7 +50,7 @@ public class Target implements Cloneable, GsonObject {
 
   public final void setAlias(String alias) {
     this.alias = alias;
-    this.column = new Column(alias, expr.getValueType()[0]);
+    this.column = new Column(alias, expr.getValueType());
   }
 
   public final String getAlias() {

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/4bf5c0c3/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/TargetListManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/TargetListManager.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/TargetListManager.java
index 205b1b7..bc1ad61 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/TargetListManager.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/TargetListManager.java
@@ -159,7 +159,7 @@ public class TargetListManager {
       t.setAlias(plan.newNonameColumnName(t.getEvalTree().getName()));
       name = t.getCanonicalName();
     }
-    return new Column(name, t.getEvalTree().getValueType()[0]);
+    return new Column(name, t.getEvalTree().getValueType());
   }
 
   public boolean isAllResolved() {

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/4bf5c0c3/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMaster.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMaster.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMaster.java
index 0141379..5a91080 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMaster.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMaster.java
@@ -219,105 +219,105 @@ public class TajoMaster extends CompositeService {
 
     // Sum
     sqlFuncs.add(new FunctionDesc("sum", SumInt.class, FunctionType.AGGREGATION,
-        CatalogUtil.newDataTypesWithoutLen(Type.INT4),
-        CatalogUtil.newDataTypesWithoutLen(Type.INT4)));
+        CatalogUtil.newSimpleDataType(Type.INT4),
+        CatalogUtil.newSimpleDataTypeArray(Type.INT4)));
     sqlFuncs.add(new FunctionDesc("sum", SumLong.class, FunctionType.AGGREGATION,
-        CatalogUtil.newDataTypesWithoutLen(Type.INT8),
-        CatalogUtil.newDataTypesWithoutLen(Type.INT8)));
+        CatalogUtil.newSimpleDataType(Type.INT8),
+        CatalogUtil.newSimpleDataTypeArray(Type.INT8)));
     sqlFuncs.add(new FunctionDesc("sum", SumFloat.class, FunctionType.AGGREGATION,
-        CatalogUtil.newDataTypesWithoutLen(Type.FLOAT4),
-        CatalogUtil.newDataTypesWithoutLen(Type.FLOAT4)));
+        CatalogUtil.newSimpleDataType(Type.FLOAT4),
+        CatalogUtil.newSimpleDataTypeArray(Type.FLOAT4)));
     sqlFuncs.add(new FunctionDesc("sum", SumDouble.class, FunctionType.AGGREGATION,
-        CatalogUtil.newDataTypesWithoutLen(Type.FLOAT8),
-        CatalogUtil.newDataTypesWithoutLen(Type.FLOAT8)));
+        CatalogUtil.newSimpleDataType(Type.FLOAT8),
+        CatalogUtil.newSimpleDataTypeArray(Type.FLOAT8)));
 
     // Max
     sqlFuncs.add(new FunctionDesc("max", MaxInt.class, FunctionType.AGGREGATION,
-        CatalogUtil.newDataTypesWithoutLen(Type.INT4),
-        CatalogUtil.newDataTypesWithoutLen(Type.INT4)));
+        CatalogUtil.newSimpleDataType(Type.INT4),
+        CatalogUtil.newSimpleDataTypeArray(Type.INT4)));
     sqlFuncs.add(new FunctionDesc("max", MaxLong.class, FunctionType.AGGREGATION,
-        CatalogUtil.newDataTypesWithoutLen(Type.INT8),
-        CatalogUtil.newDataTypesWithoutLen(Type.INT8)));
+        CatalogUtil.newSimpleDataType(Type.INT8),
+        CatalogUtil.newSimpleDataTypeArray(Type.INT8)));
     sqlFuncs.add(new FunctionDesc("max", MaxFloat.class, FunctionType.AGGREGATION,
-        CatalogUtil.newDataTypesWithoutLen(Type.FLOAT4),
-        CatalogUtil.newDataTypesWithoutLen(Type.FLOAT4)));
+        CatalogUtil.newSimpleDataType(Type.FLOAT4),
+        CatalogUtil.newSimpleDataTypeArray(Type.FLOAT4)));
     sqlFuncs.add(new FunctionDesc("max", MaxDouble.class, FunctionType.AGGREGATION,
-        CatalogUtil.newDataTypesWithoutLen(Type.FLOAT8),
-        CatalogUtil.newDataTypesWithoutLen(Type.FLOAT8)));
+        CatalogUtil.newSimpleDataType(Type.FLOAT8),
+        CatalogUtil.newSimpleDataTypeArray(Type.FLOAT8)));
 
     // Min
     sqlFuncs.add(new FunctionDesc("min", MinInt.class, FunctionType.AGGREGATION,
-        CatalogUtil.newDataTypesWithoutLen(Type.INT4),
-        CatalogUtil.newDataTypesWithoutLen(Type.INT4)));
+        CatalogUtil.newSimpleDataType(Type.INT4),
+        CatalogUtil.newSimpleDataTypeArray(Type.INT4)));
     sqlFuncs.add(new FunctionDesc("min", MinLong.class, FunctionType.AGGREGATION,
-        CatalogUtil.newDataTypesWithoutLen(Type.INT8),
-        CatalogUtil.newDataTypesWithoutLen(Type.INT8)));
+        CatalogUtil.newSimpleDataType(Type.INT8),
+        CatalogUtil.newSimpleDataTypeArray(Type.INT8)));
     sqlFuncs.add(new FunctionDesc("min", MinFloat.class, FunctionType.AGGREGATION,
-        CatalogUtil.newDataTypesWithoutLen(Type.FLOAT4),
-        CatalogUtil.newDataTypesWithoutLen(Type.FLOAT4 )));
+        CatalogUtil.newSimpleDataType(Type.FLOAT4),
+        CatalogUtil.newSimpleDataTypeArray(Type.FLOAT4)));
     sqlFuncs.add(new FunctionDesc("min", MinDouble.class, FunctionType.AGGREGATION,
-        CatalogUtil.newDataTypesWithoutLen(Type.FLOAT8),
-        CatalogUtil.newDataTypesWithoutLen(Type.FLOAT8)));
+        CatalogUtil.newSimpleDataType(Type.FLOAT8),
+        CatalogUtil.newSimpleDataTypeArray(Type.FLOAT8)));
     sqlFuncs.add(new FunctionDesc("min", MinString.class, FunctionType.AGGREGATION,
-        CatalogUtil.newDataTypesWithoutLen(Type.TEXT),
-        CatalogUtil.newDataTypesWithoutLen(Type.TEXT)));
+        CatalogUtil.newSimpleDataType(Type.TEXT),
+        CatalogUtil.newSimpleDataTypeArray(Type.TEXT)));
 
     // AVG
     sqlFuncs.add(new FunctionDesc("avg", AvgInt.class, FunctionType.AGGREGATION,
-        CatalogUtil.newDataTypesWithoutLen(Type.FLOAT4),
-        CatalogUtil.newDataTypesWithoutLen(Type.INT4)));
+        CatalogUtil.newSimpleDataType(Type.FLOAT8),
+        CatalogUtil.newSimpleDataTypeArray(Type.INT4)));
     sqlFuncs.add(new FunctionDesc("avg", AvgLong.class, FunctionType.AGGREGATION,
-        CatalogUtil.newDataTypesWithoutLen(Type.FLOAT8),
-        CatalogUtil.newDataTypesWithoutLen(Type.INT8)));
+        CatalogUtil.newSimpleDataType(Type.FLOAT8),
+        CatalogUtil.newSimpleDataTypeArray(Type.INT8)));
     sqlFuncs.add(new FunctionDesc("avg", AvgFloat.class, FunctionType.AGGREGATION,
-        CatalogUtil.newDataTypesWithoutLen(Type.FLOAT4),
-        CatalogUtil.newDataTypesWithoutLen(Type.FLOAT4)));
+        CatalogUtil.newSimpleDataType(Type.FLOAT8),
+        CatalogUtil.newSimpleDataTypeArray(Type.FLOAT4)));
     sqlFuncs.add(new FunctionDesc("avg", AvgDouble.class, FunctionType.AGGREGATION,
-        CatalogUtil.newDataTypesWithoutLen(Type.FLOAT8),
-        CatalogUtil.newDataTypesWithoutLen(Type.FLOAT8)));
+        CatalogUtil.newSimpleDataType(Type.FLOAT8),
+        CatalogUtil.newSimpleDataTypeArray(Type.FLOAT8)));
 
     // Count
     sqlFuncs.add(new FunctionDesc("count", CountValue.class, FunctionType.AGGREGATION,
-        CatalogUtil.newDataTypesWithoutLen(Type.INT8),
-        CatalogUtil.newDataTypesWithoutLen(Type.ANY)));
+        CatalogUtil.newSimpleDataType(Type.INT8),
+        CatalogUtil.newSimpleDataTypeArray(Type.ANY)));
     sqlFuncs.add(new FunctionDesc("count", CountRows.class, FunctionType.AGGREGATION,
-        CatalogUtil.newDataTypesWithoutLen(Type.INT8),
-        CatalogUtil.newDataTypesWithoutLen()));
+        CatalogUtil.newSimpleDataType(Type.INT8),
+        CatalogUtil.newSimpleDataTypeArray()));
     sqlFuncs.add(new FunctionDesc("count", CountValueDistinct.class, FunctionType.DISTINCT_AGGREGATION,
-        CatalogUtil.newDataTypesWithoutLen(Type.INT8),
-        CatalogUtil.newDataTypesWithoutLen(Type.ANY)));
+        CatalogUtil.newSimpleDataType(Type.INT8),
+        CatalogUtil.newSimpleDataTypeArray(Type.ANY)));
 
     // GeoIP
     sqlFuncs.add(new FunctionDesc("in_country", InCountry.class, FunctionType.GENERAL,
-        CatalogUtil.newDataTypesWithoutLen(Type.BOOLEAN),
-        CatalogUtil.newDataTypesWithoutLen(Type.TEXT, Type.TEXT)));
+        CatalogUtil.newSimpleDataType(Type.BOOLEAN),
+        CatalogUtil.newSimpleDataTypeArray(Type.TEXT, Type.TEXT)));
     sqlFuncs.add(new FunctionDesc("country", Country.class, FunctionType.GENERAL,
-        CatalogUtil.newDataTypesWithoutLen(Type.TEXT),
-        CatalogUtil.newDataTypesWithoutLen(Type.TEXT)));
+        CatalogUtil.newSimpleDataType(Type.TEXT),
+        CatalogUtil.newSimpleDataTypeArray(Type.TEXT)));
 
     // Date
     sqlFuncs.add(new FunctionDesc("date", Date.class, FunctionType.GENERAL,
-        CatalogUtil.newDataTypesWithoutLen(Type.INT8),
-        CatalogUtil.newDataTypesWithoutLen(Type.TEXT)));
+        CatalogUtil.newSimpleDataType(Type.INT8),
+        CatalogUtil.newSimpleDataTypeArray(Type.TEXT)));
 
     // Today
     sqlFuncs.add(new FunctionDesc("today", Date.class, FunctionType.GENERAL,
-        CatalogUtil.newDataTypesWithoutLen(Type.INT8),
-        CatalogUtil.newDataTypesWithoutLen()));
+        CatalogUtil.newSimpleDataType(Type.INT8),
+        CatalogUtil.newSimpleDataTypeArray()));
 
     sqlFuncs.add(
         new FunctionDesc("random", RandomInt.class, FunctionType.GENERAL,
-            CatalogUtil.newDataTypesWithoutLen(Type.INT4),
-            CatalogUtil.newDataTypesWithoutLen(Type.INT4)));
+            CatalogUtil.newSimpleDataType(Type.INT4),
+            CatalogUtil.newSimpleDataTypeArray(Type.INT4)));
 
     sqlFuncs.add(
         new FunctionDesc("split_part", SplitPart.class, FunctionType.GENERAL,
-            CatalogUtil.newDataTypesWithoutLen(Type.TEXT),
-            CatalogUtil.newDataTypesWithoutLen(Type.TEXT, Type.TEXT, Type.INT4)));
+            CatalogUtil.newSimpleDataType(Type.TEXT),
+            CatalogUtil.newSimpleDataTypeArray(Type.TEXT, Type.TEXT, Type.INT4)));
     sqlFuncs.add(
         new FunctionDesc("regexp_replace", RegexpReplace.class, FunctionType.GENERAL,
-            CatalogUtil.newDataTypesWithoutLen(Type.TEXT),
-            CatalogUtil.newDataTypesWithoutLen(Type.TEXT, Type.TEXT, Type.TEXT)));
+            CatalogUtil.newSimpleDataType(Type.TEXT),
+            CatalogUtil.newSimpleDataTypeArray(Type.TEXT, Type.TEXT, Type.TEXT)));
 
     return sqlFuncs;
   }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/4bf5c0c3/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
index 254fd7c..e2978af 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
@@ -574,7 +574,7 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
       int mb = (int) Math.ceil((double)volume / 1048576);
       LOG.info("Table's volume is approximately " + mb + " MB");
       // determine the number of task per 64MB
-      int maxTaskNum = (int) Math.ceil((double)mb / 64);
+      int maxTaskNum = Math.max(1, (int) Math.ceil((double)mb / 64));
       LOG.info("The determined number of non-leaf tasks is " + maxTaskNum);
       return maxTaskNum;
     }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/4bf5c0c3/tajo-core/tajo-core-backend/src/main/proto/InternalTypes.proto
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/proto/InternalTypes.proto b/tajo-core/tajo-core-backend/src/main/proto/InternalTypes.proto
new file mode 100644
index 0000000..5a9afc7
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/proto/InternalTypes.proto
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+option java_package = "org.apache.tajo";
+option java_outer_classname = "InternalTypes";
+option java_generic_services = false;
+option java_generate_equals_and_hash = true;
+
+message AvgLongProto {
+  required int64 sum = 1;
+  required int64 count = 2;
+}
+
+message AvgDoubleProto {
+  required double sum = 1;
+  required int64 count = 2;
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/4bf5c0c3/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/eval/TestEvalTree.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/eval/TestEvalTree.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/eval/TestEvalTree.java
index b407f33..1fbd022 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/eval/TestEvalTree.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/eval/TestEvalTree.java
@@ -22,12 +22,12 @@ import org.apache.hadoop.fs.Path;
 import org.apache.tajo.TajoTestingCluster;
 import org.apache.tajo.algebra.Expr;
 import org.apache.tajo.catalog.*;
-import org.apache.tajo.engine.function.GeneralFunction;
 import org.apache.tajo.catalog.proto.CatalogProtos.FunctionType;
 import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
 import org.apache.tajo.common.TajoDataTypes.DataType;
 import org.apache.tajo.datum.Datum;
 import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.engine.function.GeneralFunction;
 import org.apache.tajo.engine.json.CoreGsonHelper;
 import org.apache.tajo.engine.parser.SQLAnalyzer;
 import org.apache.tajo.engine.planner.LogicalPlan;
@@ -70,8 +70,8 @@ public class TestEvalTree {
     cat.addTable(desc);
 
     FunctionDesc funcMeta = new FunctionDesc("test_sum", TestSum.class, FunctionType.GENERAL,
-        CatalogUtil.newDataTypesWithoutLen(INT4),
-        CatalogUtil.newDataTypesWithoutLen(INT4, INT4));
+        CatalogUtil.newSimpleDataType(INT4),
+        CatalogUtil.newSimpleDataTypeArray(INT4, INT4));
     cat.createFunction(funcMeta);
 
     analyzer = new SQLAnalyzer();
@@ -204,7 +204,7 @@ public class TestEvalTree {
   public void testTupleEval() throws CloneNotSupportedException {
     ConstEval e1 = new ConstEval(DatumFactory.createInt4(1));
     assertCloneEqual(e1);
-    FieldEval e2 = new FieldEval("table1.score", CatalogUtil.newDataTypeWithoutLen(INT4)); // it indicates
+    FieldEval e2 = new FieldEval("table1.score", CatalogUtil.newSimpleDataType(INT4)); // it indicates
     assertCloneEqual(e2);
 
     Schema schema1 = new Schema();
@@ -250,8 +250,8 @@ public class TestEvalTree {
     }
 
     @Override
-    public DataType [] getValueType() {
-      return CatalogUtil.newDataTypesWithoutLen(BOOLEAN);
+    public DataType getValueType() {
+      return CatalogUtil.newSimpleDataType(BOOLEAN);
     }
 
   }
@@ -283,8 +283,8 @@ public class TestEvalTree {
     }
 
     @Override
-    public DataType [] getValueType() {
-      return CatalogUtil.newDataTypesWithoutLen(BOOLEAN);
+    public DataType getValueType() {
+      return CatalogUtil.newSimpleDataType(BOOLEAN);
     }
   }
 
@@ -471,18 +471,18 @@ public class TestEvalTree {
     e1 = new ConstEval(DatumFactory.createInt4(9));
     e2 = new ConstEval(DatumFactory.createInt4(34));
     BinaryEval expr = new BinaryEval(EvalType.PLUS, e1, e2);
-    assertEquals(CatalogUtil.newDataTypeWithoutLen(INT4), expr.getValueType()[0]);
+    assertEquals(CatalogUtil.newSimpleDataType(INT4), expr.getValueType());
 
     expr = new BinaryEval(EvalType.LTH, e1, e2);
     EvalContext evalCtx = expr.newContext();
     expr.eval(evalCtx, null, null);
     assertTrue(expr.terminate(evalCtx).asBool());
-    assertEquals(CatalogUtil.newDataTypeWithoutLen(BOOLEAN), expr.getValueType()[0]);
+    assertEquals(CatalogUtil.newSimpleDataType(BOOLEAN), expr.getValueType());
 
     e1 = new ConstEval(DatumFactory.createFloat8(9.3));
     e2 = new ConstEval(DatumFactory.createFloat8(34.2));
     expr = new BinaryEval(EvalType.PLUS, e1, e2);
-    assertEquals(CatalogUtil.newDataTypeWithoutLen(FLOAT8), expr.getValueType()[0]);
+    assertEquals(CatalogUtil.newSimpleDataType(FLOAT8), expr.getValueType());
   }
   
   @Test

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/4bf5c0c3/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/eval/TestEvalTreeUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/eval/TestEvalTreeUtil.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/eval/TestEvalTreeUtil.java
index 7fcf190..a561a8c 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/eval/TestEvalTreeUtil.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/eval/TestEvalTreeUtil.java
@@ -78,8 +78,8 @@ public class TestEvalTreeUtil {
 
     FunctionDesc funcMeta = new FunctionDesc("test_sum", TestSum.class,
         FunctionType.GENERAL,
-        CatalogUtil.newDataTypesWithoutLen(TajoDataTypes.Type.INT4),
-        CatalogUtil.newDataTypesWithoutLen(TajoDataTypes.Type.INT4, TajoDataTypes.Type.INT4));
+        CatalogUtil.newSimpleDataType(TajoDataTypes.Type.INT4),
+        CatalogUtil.newSimpleDataTypeArray(TajoDataTypes.Type.INT4, TajoDataTypes.Type.INT4));
     catalog.createFunction(funcMeta);
 
     analyzer = new SQLAnalyzer();

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/4bf5c0c3/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/TestLogicalOptimizer.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/TestLogicalOptimizer.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/TestLogicalOptimizer.java
index 9ef6ab0..b30fbf2 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/TestLogicalOptimizer.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/TestLogicalOptimizer.java
@@ -82,8 +82,8 @@ public class TestLogicalOptimizer {
     catalog.addTable(score);
 
     FunctionDesc funcDesc = new FunctionDesc("sumtest", SumInt.class, FunctionType.GENERAL,
-        CatalogUtil.newDataTypesWithoutLen(Type.INT4),
-        CatalogUtil.newDataTypesWithoutLen(Type.INT4));
+        CatalogUtil.newSimpleDataType(Type.INT4),
+        CatalogUtil.newSimpleDataTypeArray(Type.INT4));
 
     catalog.createFunction(funcDesc);
     sqlAnalyzer = new SQLAnalyzer();

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/4bf5c0c3/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlanner.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlanner.java
index 06bc52f..7adf8d2 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlanner.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlanner.java
@@ -91,8 +91,8 @@ public class TestLogicalPlanner {
     catalog.addTable(score);
 
     FunctionDesc funcDesc = new FunctionDesc("sumtest", SumInt.class, FunctionType.AGGREGATION,
-        CatalogUtil.newDataTypesWithoutLen(Type.INT4),
-        CatalogUtil.newDataTypesWithoutLen(Type.INT4));
+        CatalogUtil.newSimpleDataType(Type.INT4),
+        CatalogUtil.newSimpleDataTypeArray(Type.INT4));
 
 
     // TPC-H Schema for Complex Queries

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/4bf5c0c3/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/TestPlannerUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/TestPlannerUtil.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/TestPlannerUtil.java
index 130c2f7..3fce9d2 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/TestPlannerUtil.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/TestPlannerUtil.java
@@ -55,7 +55,7 @@ public class TestPlannerUtil {
 
     Schema schema = new Schema();
     schema.addColumn("name", Type.TEXT);
-    schema.addColumn("empId", CatalogUtil.newDataTypeWithoutLen(Type.INT4));
+    schema.addColumn("empId", CatalogUtil.newSimpleDataType(Type.INT4));
     schema.addColumn("deptName", Type.TEXT);
 
     Schema schema2 = new Schema();
@@ -64,7 +64,7 @@ public class TestPlannerUtil {
 
     Schema schema3 = new Schema();
     schema3.addColumn("deptName", Type.TEXT);
-    schema3.addColumn("score", CatalogUtil.newDataTypeWithoutLen(Type.INT4));
+    schema3.addColumn("score", CatalogUtil.newSimpleDataType(Type.INT4));
 
     TableMeta meta = CatalogUtil.newTableMeta(schema, StoreType.CSV);
     TableDesc people = new TableDescImpl("employee", meta,
@@ -82,8 +82,8 @@ public class TestPlannerUtil {
     catalog.addTable(score);
 
     FunctionDesc funcDesc = new FunctionDesc("sumtest", SumInt.class, FunctionType.AGGREGATION,
-        CatalogUtil.newDataTypesWithoutLen(Type.INT4),
-        CatalogUtil.newDataTypesWithoutLen(Type.INT4));
+        CatalogUtil.newSimpleDataType(Type.INT4),
+        CatalogUtil.newSimpleDataTypeArray(Type.INT4));
 
     catalog.createFunction(funcDesc);
     analyzer = new SQLAnalyzer();
@@ -157,9 +157,9 @@ public class TestPlannerUtil {
 
   @Test
   public final void testIsJoinQual() {
-    FieldEval f1 = new FieldEval("part.p_partkey", CatalogUtil.newDataTypeWithoutLen(Type.INT4));
+    FieldEval f1 = new FieldEval("part.p_partkey", CatalogUtil.newSimpleDataType(Type.INT4));
     FieldEval f2 = new FieldEval("partsupp.ps_partkey",
-        CatalogUtil.newDataTypeWithoutLen(Type.INT4));
+        CatalogUtil.newSimpleDataType(Type.INT4));
 
 
     BinaryEval [] joinQuals = new BinaryEval[5];
@@ -190,16 +190,16 @@ public class TestPlannerUtil {
   @Test
   public final void testGetJoinKeyPairs() {
     Schema outerSchema = new Schema();
-    outerSchema.addColumn("employee.id1", CatalogUtil.newDataTypeWithoutLen(Type.INT4));
-    outerSchema.addColumn("employee.id2", CatalogUtil.newDataTypeWithoutLen(Type.INT4));
+    outerSchema.addColumn("employee.id1", CatalogUtil.newSimpleDataType(Type.INT4));
+    outerSchema.addColumn("employee.id2", CatalogUtil.newSimpleDataType(Type.INT4));
     Schema innerSchema = new Schema();
-    innerSchema.addColumn("people.fid1", CatalogUtil.newDataTypeWithoutLen(Type.INT4));
-    innerSchema.addColumn("people.fid2", CatalogUtil.newDataTypeWithoutLen(Type.INT4));
+    innerSchema.addColumn("people.fid1", CatalogUtil.newSimpleDataType(Type.INT4));
+    innerSchema.addColumn("people.fid2", CatalogUtil.newSimpleDataType(Type.INT4));
 
-    FieldEval f1 = new FieldEval("employee.id1", CatalogUtil.newDataTypeWithoutLen(Type.INT4));
-    FieldEval f2 = new FieldEval("people.fid1", CatalogUtil.newDataTypeWithoutLen(Type.INT4));
-    FieldEval f3 = new FieldEval("employee.id2", CatalogUtil.newDataTypeWithoutLen(Type.INT4));
-    FieldEval f4 = new FieldEval("people.fid2", CatalogUtil.newDataTypeWithoutLen(Type.INT4));
+    FieldEval f1 = new FieldEval("employee.id1", CatalogUtil.newSimpleDataType(Type.INT4));
+    FieldEval f2 = new FieldEval("people.fid1", CatalogUtil.newSimpleDataType(Type.INT4));
+    FieldEval f3 = new FieldEval("employee.id2", CatalogUtil.newSimpleDataType(Type.INT4));
+    FieldEval f4 = new FieldEval("people.fid2", CatalogUtil.newSimpleDataType(Type.INT4));
 
     EvalNode joinQual = new BinaryEval(EvalType.EQUAL, f1, f2);
 
@@ -236,16 +236,16 @@ public class TestPlannerUtil {
   @Test
   public final void testGetSortKeysFromJoinQual() {
     Schema outerSchema = new Schema();
-    outerSchema.addColumn("employee.id1", CatalogUtil.newDataTypeWithoutLen(Type.INT4));
-    outerSchema.addColumn("employee.id2", CatalogUtil.newDataTypeWithoutLen(Type.INT4));
+    outerSchema.addColumn("employee.id1", CatalogUtil.newSimpleDataType(Type.INT4));
+    outerSchema.addColumn("employee.id2", CatalogUtil.newSimpleDataType(Type.INT4));
     Schema innerSchema = new Schema();
-    innerSchema.addColumn("people.fid1", CatalogUtil.newDataTypeWithoutLen(Type.INT4));
-    innerSchema.addColumn("people.fid2", CatalogUtil.newDataTypeWithoutLen(Type.INT4));
+    innerSchema.addColumn("people.fid1", CatalogUtil.newSimpleDataType(Type.INT4));
+    innerSchema.addColumn("people.fid2", CatalogUtil.newSimpleDataType(Type.INT4));
 
-    FieldEval f1 = new FieldEval("employee.id1", CatalogUtil.newDataTypeWithoutLen(Type.INT4));
-    FieldEval f2 = new FieldEval("people.fid1", CatalogUtil.newDataTypeWithoutLen(Type.INT4));
-    FieldEval f3 = new FieldEval("employee.id2", CatalogUtil.newDataTypeWithoutLen(Type.INT4));
-    FieldEval f4 = new FieldEval("people.fid2", CatalogUtil.newDataTypeWithoutLen(Type.INT4));
+    FieldEval f1 = new FieldEval("employee.id1", CatalogUtil.newSimpleDataType(Type.INT4));
+    FieldEval f2 = new FieldEval("people.fid1", CatalogUtil.newSimpleDataType(Type.INT4));
+    FieldEval f3 = new FieldEval("employee.id2", CatalogUtil.newSimpleDataType(Type.INT4));
+    FieldEval f4 = new FieldEval("people.fid2", CatalogUtil.newSimpleDataType(Type.INT4));
 
     EvalNode joinQual = new BinaryEval(EvalType.EQUAL, f1, f2);
     SortSpec[][] sortSpecs = PlannerUtil.getSortKeysFromJoinQual(joinQual, outerSchema, innerSchema);
@@ -272,16 +272,16 @@ public class TestPlannerUtil {
   @Test
   public final void testComparatorsFromJoinQual() {
     Schema outerSchema = new Schema();
-    outerSchema.addColumn("employee.id1", CatalogUtil.newDataTypeWithoutLen(Type.INT4));
-    outerSchema.addColumn("employee.id2", CatalogUtil.newDataTypeWithoutLen(Type.INT4));
+    outerSchema.addColumn("employee.id1", CatalogUtil.newSimpleDataType(Type.INT4));
+    outerSchema.addColumn("employee.id2", CatalogUtil.newSimpleDataType(Type.INT4));
     Schema innerSchema = new Schema();
-    innerSchema.addColumn("people.fid1", CatalogUtil.newDataTypeWithoutLen(Type.INT4));
-    innerSchema.addColumn("people.fid2", CatalogUtil.newDataTypeWithoutLen(Type.INT4));
+    innerSchema.addColumn("people.fid1", CatalogUtil.newSimpleDataType(Type.INT4));
+    innerSchema.addColumn("people.fid2", CatalogUtil.newSimpleDataType(Type.INT4));
 
-    FieldEval f1 = new FieldEval("employee.id1", CatalogUtil.newDataTypeWithoutLen(Type.INT4));
-    FieldEval f2 = new FieldEval("people.fid1", CatalogUtil.newDataTypeWithoutLen(Type.INT4));
-    FieldEval f3 = new FieldEval("employee.id2", CatalogUtil.newDataTypeWithoutLen(Type.INT4));
-    FieldEval f4 = new FieldEval("people.fid2", CatalogUtil.newDataTypeWithoutLen(Type.INT4));
+    FieldEval f1 = new FieldEval("employee.id1", CatalogUtil.newSimpleDataType(Type.INT4));
+    FieldEval f2 = new FieldEval("people.fid1", CatalogUtil.newSimpleDataType(Type.INT4));
+    FieldEval f3 = new FieldEval("employee.id2", CatalogUtil.newSimpleDataType(Type.INT4));
+    FieldEval f4 = new FieldEval("people.fid2", CatalogUtil.newSimpleDataType(Type.INT4));
 
     EvalNode joinQual = new BinaryEval(EvalType.EQUAL, f1, f2);
     TupleComparator [] comparators = PlannerUtil.getComparatorsFromJoinQual(joinQual, outerSchema, innerSchema);

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/4bf5c0c3/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/storage/TestRowFile.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/storage/TestRowFile.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/storage/TestRowFile.java
index c070c4d..07e6804 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/storage/TestRowFile.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/storage/TestRowFile.java
@@ -86,7 +86,7 @@ public class TestRowFile {
     appender.enableStats();
     appender.init();
 
-    int tupleNum = 100000;
+    int tupleNum = 100;
     Tuple tuple;
     Datum stringDatum = DatumFactory.createText("abcdefghijklmnopqrstuvwxyz");
     Set<Integer> idSet = Sets.newHashSet();
@@ -100,15 +100,11 @@ public class TestRowFile {
       appender.addTuple(tuple);
       idSet.add(i+1);
     }
-
-    long end = System.currentTimeMillis();
     appender.close();
 
     TableStat stat = appender.getStats();
     assertEquals(tupleNum, stat.getNumRows().longValue());
 
-    System.out.println("append time: " + (end - start));
-
     FileStatus file = fs.getFileStatus(dataPath);
     TableProto proto = (TableProto) FileUtil.loadProto(
         cluster.getDefaultFileSystem(), metaPath, TableProto.getDefaultInstance());
@@ -123,18 +119,14 @@ public class TestRowFile {
       tupleCnt++;
     }
     scanner.close();
-    end = System.currentTimeMillis();
 
     assertEquals(tupleNum, tupleCnt);
-    System.out.println("scan time: " + (end - start));
 
     tupleCnt = 0;
     long fileStart = 0;
     long fileLen = file.getLen()/13;
-    System.out.println("total length: " + file.getLen());
 
     for (int i = 0; i < 13; i++) {
-      System.out.println("range: " + fileStart + ", " + fileLen);
       fragment = new Fragment("test.tbl", dataPath, meta, fileStart, fileLen);
       scanner = new RowFile.RowFileScanner(conf, meta, fragment);
       scanner.init();
@@ -144,17 +136,12 @@ public class TestRowFile {
         }
         tupleCnt++;
       }
-      System.out.println("tuple count: " + tupleCnt);
       scanner.close();
       fileStart += fileLen;
       if (i == 11) {
         fileLen = file.getLen() - fileStart;
       }
     }
-
-    for (Integer id : idSet) {
-      System.out.println("remaining id: " + id);
-    }
     assertEquals(tupleNum, tupleCnt);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/4bf5c0c3/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/CSVFile.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/CSVFile.java b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/CSVFile.java
index fb66198..3021f7d 100644
--- a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/CSVFile.java
+++ b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/CSVFile.java
@@ -33,10 +33,11 @@ import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.catalog.TableMeta;
 import org.apache.tajo.catalog.statistics.TableStat;
 import org.apache.tajo.common.TajoDataTypes;
-import org.apache.tajo.datum.ArrayDatum;
 import org.apache.tajo.datum.CharDatum;
 import org.apache.tajo.datum.Datum;
 import org.apache.tajo.datum.NullDatum;
+import org.apache.tajo.datum.ProtobufDatum;
+import org.apache.tajo.datum.protobuf.ProtobufJsonFormat;
 import org.apache.tajo.exception.UnsupportedException;
 import org.apache.tajo.storage.compress.CodecPool;
 import org.apache.tajo.storage.exception.AlreadyExistsStorageException;
@@ -67,6 +68,7 @@ public class CSVFile {
     private CompressionCodec codec;
     private Path compressedPath;
     private byte[] nullChars;
+    private ProtobufJsonFormat protobufJsonFormat = ProtobufJsonFormat.getInstance();
 
     public CSVAppender(Configuration conf, final TableMeta meta,
                        final Path path) throws IOException {
@@ -129,30 +131,31 @@ public class CSVFile {
       Datum datum;
 
       int colNum = schema.getColumnNum();
-      if(tuple instanceof LazyTuple){
+      if (tuple instanceof LazyTuple) {
         LazyTuple  lTuple = (LazyTuple)tuple;
         for (int i = 0; i < colNum; i++) {
-          col = schema.getColumn(i);
-          if (col.getDataType().getType().equals(TajoDataTypes.Type.NULL)) {
+          TajoDataTypes.DataType dataType = schema.getColumn(i).getDataType();
+          datum = tuple.get(i);
 
-          } else if (col.getDataType().getType().equals(TajoDataTypes.Type.CHAR)) {
-            datum = tuple.get(i);
-            if (datum instanceof NullDatum) {
-              outputStream.write(nullChars);
-            } else {
-              byte[] pad = new byte[col.getDataType().getLength() - datum.size()];
+          switch (dataType.getType()) {
+            case TEXT:
+              outputStream.write(datum.asTextBytes());
+              break;
+            case CHAR:
+              byte[] pad = new byte[dataType.getLength() - datum.size()];
               outputStream.write(datum.asTextBytes());
               outputStream.write(pad);
-            }
-          } else if (col.getDataType().getType().equals(TajoDataTypes.Type.TEXT)) {
-            datum = tuple.get(i);
-            if (datum instanceof NullDatum) {
+              break;
+            case NULL:
               outputStream.write(nullChars);
-            } else {
-              outputStream.write(datum.asTextBytes());
-            }
-          } else {
-            outputStream.write(lTuple.getTextBytes(i));
+              break;
+            case PROTOBUF:
+              ProtobufDatum protobufDatum = (ProtobufDatum) datum;
+              protobufJsonFormat.print(protobufDatum.get(), outputStream);
+              break;
+            default:
+              outputStream.write(lTuple.getTextBytes(i));
+              break;
           }
 
           if(colNum - 1 > i){
@@ -213,15 +216,10 @@ public class CSVFile {
                 break;
               case INET6:
                 outputStream.write(tuple.getIPv6(i).toString().getBytes());
-              case ARRAY:
-            /*
-             * sb.append("["); boolean first = true; ArrayDatum array =
-             * (ArrayDatum) tuple.get(i); for (Datum field : array.toArray()) {
-             * if (first) { first = false; } else { sb.append(delimiter); }
-             * sb.append(field.asChars()); } sb.append("]");
-             */
-                ArrayDatum array = (ArrayDatum) tuple.get(i);
-                outputStream.write(array.toJson().getBytes());
+                break;
+              case PROTOBUF:
+                ProtobufDatum protobuf = (ProtobufDatum) datum;
+                ProtobufJsonFormat.getInstance().print(protobuf.get(), outputStream);
                 break;
               default:
                 throw new UnsupportedOperationException("Cannot write such field: "

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/4bf5c0c3/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/LazyTuple.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/LazyTuple.java b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/LazyTuple.java
index e063eda..1d1d5fe 100644
--- a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/LazyTuple.java
+++ b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/LazyTuple.java
@@ -18,14 +18,16 @@
 
 package org.apache.tajo.storage;
 
+import com.google.protobuf.Message;
 import org.apache.commons.codec.binary.Base64;
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.common.TajoDataTypes;
 import org.apache.tajo.datum.*;
 import org.apache.tajo.datum.exception.InvalidCastException;
-import org.apache.tajo.storage.json.StorageGsonHelper;
+import org.apache.tajo.datum.protobuf.ProtobufJsonFormat;
 import org.apache.tajo.util.Bytes;
 
+import java.io.IOException;
 import java.net.InetAddress;
 import java.util.Arrays;
 
@@ -121,7 +123,7 @@ public class LazyTuple implements Tuple {
     else if (textBytes.length <= fieldId) {
       values[fieldId] = NullDatum.get();  // split error. (col : 3, separator: ',', row text: "a,")
     } else if (textBytes[fieldId] != null) {
-      values[fieldId] = createByTextBytes(schema.getColumn(fieldId).getDataType().getType(), textBytes[fieldId]);
+      values[fieldId] = createByTextBytes(schema.getColumn(fieldId).getDataType(), textBytes[fieldId]);
       textBytes[fieldId] = null;
     } else {
       //non-projection
@@ -301,8 +303,8 @@ public class LazyTuple implements Tuple {
     return !isNullText((val));
   }
 
-  private  Datum createByTextBytes(TajoDataTypes.Type type, byte[] val) {
-    switch (type) {
+  private  Datum createByTextBytes(TajoDataTypes.DataType type, byte [] val) {
+    switch (type.getType()) {
       case BOOLEAN:
         return isNotNull(val) ? DatumFactory.createBool(new String(val)) : NullDatum.get();
       case INT2:
@@ -325,8 +327,21 @@ public class LazyTuple implements Tuple {
         return DatumFactory.createBlob(Base64.decodeBase64(val));
       case INET4:
         return isNotNull(val) ? DatumFactory.createInet4(new String(val)) : NullDatum.get();
-      case ARRAY:
-        return isNotNull(val) ? StorageGsonHelper.getInstance().fromJson(new String(val), Datum.class) : NullDatum.get();
+      case PROTOBUF: {
+        if (isNotNull(val)) {
+          ProtobufDatumFactory factory = ProtobufDatumFactory.get(type);
+          Message.Builder builder = factory.newBuilder();
+          try {
+            ProtobufJsonFormat.getInstance().merge(val, builder);
+            return factory.createDatum(builder.build());
+          } catch (IOException e) {
+            e.printStackTrace();
+            throw new RuntimeException(e);
+          }
+        } else {
+          return NullDatum.get();
+        }
+      }
       case NULL:
         return NullDatum.get();
       default: