You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hy...@apache.org on 2013/05/09 07:38:39 UTC

[07/11] TAJO-57: Recognize Parser and Catalog Standard SQL data types. (hyunsik)

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/c1c6f83e/tajo-core/tajo-core-backend/src/main/java/tajo/engine/planner/RangePartitionAlgorithm.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/tajo/engine/planner/RangePartitionAlgorithm.java b/tajo-core/tajo-core-backend/src/main/java/tajo/engine/planner/RangePartitionAlgorithm.java
index 570e416..7d4ce4a 100644
--- a/tajo-core/tajo-core-backend/src/main/java/tajo/engine/planner/RangePartitionAlgorithm.java
+++ b/tajo-core/tajo-core-backend/src/main/java/tajo/engine/planner/RangePartitionAlgorithm.java
@@ -20,7 +20,7 @@ package tajo.engine.planner;
 
 import tajo.catalog.Column;
 import tajo.catalog.Schema;
-import tajo.catalog.proto.CatalogProtos;
+import tajo.common.TajoDataTypes.DataType;
 import tajo.datum.Datum;
 import tajo.storage.Tuple;
 import tajo.storage.TupleRange;
@@ -55,33 +55,33 @@ public abstract class RangePartitionAlgorithm {
    * @param end
    * @return
    */
-  public static BigDecimal computeCardinality(CatalogProtos.DataType dataType, Datum start, Datum end,
+  public static BigDecimal computeCardinality(DataType dataType, Datum start, Datum end,
                                               boolean inclusive) {
     BigDecimal columnCard;
 
-    switch (dataType) {
+    switch (dataType.getType()) {
       case CHAR:
         columnCard = new BigDecimal(end.asChar() - start.asChar());
         break;
-      case BYTE:
+      case BIT:
         columnCard = new BigDecimal(end.asByte() - start.asByte());
         break;
-      case SHORT:
-        columnCard = new BigDecimal(end.asShort() - start.asShort());
+      case INT2:
+        columnCard = new BigDecimal(end.asInt2() - start.asInt2());
         break;
-      case INT:
-        columnCard = new BigDecimal(end.asInt() - start.asInt());
+      case INT4:
+        columnCard = new BigDecimal(end.asInt4() - start.asInt4());
         break;
-      case LONG:
-        columnCard = new BigDecimal(end.asLong() - start.asLong());
+      case INT8:
+        columnCard = new BigDecimal(end.asInt8() - start.asInt8());
         break;
-      case FLOAT:
-        columnCard = new BigDecimal(end.asInt() - start.asInt());
+      case FLOAT4:
+        columnCard = new BigDecimal(end.asInt4() - start.asInt4());
         break;
-      case DOUBLE:
-        columnCard = new BigDecimal(end.asLong() - start.asLong());
+      case FLOAT8:
+        columnCard = new BigDecimal(end.asInt8() - start.asInt8());
         break;
-      case STRING:
+      case TEXT:
         columnCard = new BigDecimal(end.asChars().charAt(0) - start.asChars().charAt(0));
         break;
       default:

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/c1c6f83e/tajo-core/tajo-core-backend/src/main/java/tajo/engine/planner/UniformRangePartition.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/tajo/engine/planner/UniformRangePartition.java b/tajo-core/tajo-core-backend/src/main/java/tajo/engine/planner/UniformRangePartition.java
index bee8534..87dac08 100644
--- a/tajo-core/tajo-core-backend/src/main/java/tajo/engine/planner/UniformRangePartition.java
+++ b/tajo-core/tajo-core-backend/src/main/java/tajo/engine/planner/UniformRangePartition.java
@@ -112,8 +112,8 @@ public class UniformRangePartition extends RangePartitionAlgorithm {
     Column column = schema.getColumn(colId);
     BigDecimal candidate;
     boolean overflow = false;
-    switch (column.getDataType()) {
-      case BYTE: {
+    switch (column.getDataType().getType()) {
+      case BIT: {
         candidate = inc.add(new BigDecimal(last.asByte()));
         return new BigDecimal(range.getEnd().get(colId).asByte()).compareTo(candidate) < 0;
       }
@@ -121,27 +121,27 @@ public class UniformRangePartition extends RangePartitionAlgorithm {
         candidate = inc.add(new BigDecimal((int)last.asChar()));
         return new BigDecimal((int)range.getEnd().get(colId).asChar()).compareTo(candidate) < 0;
       }
-      case SHORT: {
-        candidate = inc.add(new BigDecimal(last.asShort()));
-        return new BigDecimal(range.getEnd().get(colId).asShort()).compareTo(candidate) < 0;
+      case INT2: {
+        candidate = inc.add(new BigDecimal(last.asInt2()));
+        return new BigDecimal(range.getEnd().get(colId).asInt2()).compareTo(candidate) < 0;
       }
-      case INT: {
-        candidate = inc.add(new BigDecimal(last.asInt()));
-        return new BigDecimal(range.getEnd().get(colId).asInt()).compareTo(candidate) < 0;
+      case INT4: {
+        candidate = inc.add(new BigDecimal(last.asInt4()));
+        return new BigDecimal(range.getEnd().get(colId).asInt4()).compareTo(candidate) < 0;
       }
-      case LONG: {
-        candidate = inc.add(new BigDecimal(last.asLong()));
-        return new BigDecimal(range.getEnd().get(colId).asLong()).compareTo(candidate) < 0;
+      case INT8: {
+        candidate = inc.add(new BigDecimal(last.asInt8()));
+        return new BigDecimal(range.getEnd().get(colId).asInt8()).compareTo(candidate) < 0;
       }
-      case FLOAT: {
-        candidate = inc.add(new BigDecimal(last.asFloat()));
-        return new BigDecimal(range.getEnd().get(colId).asFloat()).compareTo(candidate) < 0;
+      case FLOAT4: {
+        candidate = inc.add(new BigDecimal(last.asFloat4()));
+        return new BigDecimal(range.getEnd().get(colId).asFloat4()).compareTo(candidate) < 0;
       }
-      case DOUBLE: {
-        candidate = inc.add(new BigDecimal(last.asDouble()));
-        return new BigDecimal(range.getEnd().get(colId).asDouble()).compareTo(candidate) < 0;
+      case FLOAT8: {
+        candidate = inc.add(new BigDecimal(last.asFloat8()));
+        return new BigDecimal(range.getEnd().get(colId).asFloat8()).compareTo(candidate) < 0;
       }
-      case STRING: {
+      case TEXT: {
         candidate = inc.add(new BigDecimal((int)(last.asChars().charAt(0))));
         return new BigDecimal(range.getEnd().get(colId).asChars().charAt(0)).compareTo(candidate) < 0;
       }
@@ -152,54 +152,47 @@ public class UniformRangePartition extends RangePartitionAlgorithm {
   public long incrementAndGetReminder(int colId, Datum last, long inc) {
     Column column = schema.getColumn(colId);
     long reminder = 0;
-    switch (column.getDataType()) {
-      case BYTE: {
+    switch (column.getDataType().getType()) {
+      case BIT: {
         long candidate = last.asByte() + inc;
         byte end = range.getEnd().get(colId).asByte();
-        long longReminder = candidate - end;
-        reminder = longReminder;
+        reminder = candidate - end;
         break;
       }
       case CHAR: {
         long candidate = last.asChar() + inc;
         char end = range.getEnd().get(colId).asChar();
-        long longReminder = candidate - end;
-        reminder = longReminder;
+        reminder = candidate - end;
         break;
       }
-      case INT: {
-        int candidate = (int) (last.asInt() + inc);
-        int end = range.getEnd().get(colId).asInt();
-        int longReminder = candidate - end;
-        reminder = longReminder;
+      case INT4: {
+        int candidate = (int) (last.asInt4() + inc);
+        int end = range.getEnd().get(colId).asInt4();
+        reminder = candidate - end;
         break;
       }
-      case LONG: {
-        long candidate = last.asLong() + inc;
-        long end = range.getEnd().get(colId).asLong();
-        long longReminder = candidate - end;
-        reminder = longReminder;
+      case INT8: {
+        long candidate = last.asInt8() + inc;
+        long end = range.getEnd().get(colId).asInt8();
+        reminder = candidate - end;
         break;
       }
-      case FLOAT: {
-        float candidate = last.asFloat() + inc;
-        float end = range.getEnd().get(colId).asFloat();
-        float longReminder = candidate - end;
-        reminder = (long) longReminder;
+      case FLOAT4: {
+        float candidate = last.asFloat4() + inc;
+        float end = range.getEnd().get(colId).asFloat4();
+        reminder = (long) (candidate - end);
         break;
       }
-      case DOUBLE: {
-        double candidate = last.asDouble() + inc;
-        double end = range.getEnd().get(colId).asDouble();
-        double longReminder = candidate - end;
-        reminder = (long) Math.ceil(longReminder);
+      case FLOAT8: {
+        double candidate = last.asFloat8() + inc;
+        double end = range.getEnd().get(colId).asFloat8();
+        reminder = (long) Math.ceil(candidate - end);
         break;
       }
-      case STRING: {
+      case TEXT: {
         char candidate = ((char)(last.asChars().charAt(0) + inc));
         char end = range.getEnd().get(colId).asChars().charAt(0);
-        char charReminder = (char) (candidate - end);
-        reminder = charReminder;
+        reminder = (char) (candidate - end);
         break;
       }
     }
@@ -263,7 +256,7 @@ public class UniformRangePartition extends RangePartitionAlgorithm {
     Column column;
     for (int i = 0; i < last.size(); i++) {
       column = schema.getColumn(i);
-      switch (column.getDataType()) {
+      switch (column.getDataType().getType()) {
         case CHAR:
           if (overflowFlag[i]) {
             end.put(i, DatumFactory.createChar((char) (range.getStart().get(i).asChar() + incs[i].longValue())));
@@ -271,54 +264,61 @@ public class UniformRangePartition extends RangePartitionAlgorithm {
             end.put(i, DatumFactory.createChar((char) (last.get(i).asChar() + incs[i].longValue())));
           }
           break;
-        case BYTE:
+        case BIT:
           if (overflowFlag[i]) {
-            end.put(i, DatumFactory.createByte((byte) (range.getStart().get(i).asByte() + incs[i].longValue())));
+            end.put(i, DatumFactory.createBit(
+                (byte) (range.getStart().get(i).asByte() + incs[i].longValue())));
           } else {
-            end.put(i, DatumFactory.createByte((byte) (last.get(i).asByte() + incs[i].longValue())));
+            end.put(i, DatumFactory.createBit((byte) (last.get(i).asByte() + incs[i].longValue())));
           }
           break;
-        case SHORT:
+        case INT2:
           if (overflowFlag[i]) {
-            end.put(i, DatumFactory.createShort((short) (range.getStart().get(i).asShort() + incs[i].longValue())));
+            end.put(i, DatumFactory.createInt2(
+                (short) (range.getStart().get(i).asInt2() + incs[i].longValue())));
           } else {
-            end.put(i, DatumFactory.createShort((short) (last.get(i).asShort() + incs[i].longValue())));
+            end.put(i, DatumFactory.createInt2((short) (last.get(i).asInt2() + incs[i].longValue())));
           }
           break;
-        case INT:
+        case INT4:
           if (overflowFlag[i]) {
-            end.put(i, DatumFactory.createInt((int) (range.getStart().get(i).asInt() + incs[i].longValue())));
+            end.put(i, DatumFactory.createInt4(
+                (int) (range.getStart().get(i).asInt4() + incs[i].longValue())));
           } else {
-            end.put(i, DatumFactory.createInt((int) (last.get(i).asInt() + incs[i].longValue())));
+            end.put(i, DatumFactory.createInt4((int) (last.get(i).asInt4() + incs[i].longValue())));
           }
           break;
-        case LONG:
+        case INT8:
           if (overflowFlag[i]) {
-            end.put(i, DatumFactory.createLong(range.getStart().get(i).asInt() + incs[i].longValue()));
+            end.put(i, DatumFactory.createInt8(
+                range.getStart().get(i).asInt4() + incs[i].longValue()));
           } else {
-            end.put(i, DatumFactory.createLong(last.get(i).asLong() + incs[i].longValue()));
+            end.put(i, DatumFactory.createInt8(last.get(i).asInt8() + incs[i].longValue()));
           }
           break;
-        case FLOAT:
+        case FLOAT4:
           if (overflowFlag[i]) {
-            end.put(i, DatumFactory.createFloat(range.getStart().get(i).asFloat() + incs[i].longValue()));
+            end.put(i, DatumFactory.createFloat4(
+                range.getStart().get(i).asFloat4() + incs[i].longValue()));
           } else {
-            end.put(i, DatumFactory.createFloat(last.get(i).asFloat() + incs[i].longValue()));
+            end.put(i, DatumFactory.createFloat4(last.get(i).asFloat4() + incs[i].longValue()));
           }
           break;
-        case DOUBLE:
+        case FLOAT8:
           if (overflowFlag[i]) {
-            end.put(i, DatumFactory.createDouble(range.getStart().get(i).asDouble() + incs[i].longValue()));
+            end.put(i, DatumFactory.createFloat8(
+                range.getStart().get(i).asFloat8() + incs[i].longValue()));
           } else {
-            end.put(i, DatumFactory.createDouble(last.get(i).asDouble() + incs[i].longValue()));
+            end.put(i, DatumFactory.createFloat8(last.get(i).asFloat8() + incs[i].longValue()));
           }
           break;
-        case STRING:
+        case TEXT:
           if (overflowFlag[i]) {
-            end.put(i, DatumFactory.createString(((char)(range.getStart().get(i).asChars().charAt(0)
+            end.put(i, DatumFactory.createText(((char) (range.getStart().get(i).asChars().charAt(0)
                 + incs[i].longValue())) + ""));
           } else {
-            end.put(i, DatumFactory.createString(((char)(last.get(i).asChars().charAt(0) + incs[i].longValue())) + ""));
+            end.put(i, DatumFactory.createText(
+                ((char) (last.get(i).asChars().charAt(0) + incs[i].longValue())) + ""));
           }
           break;
         default:

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/c1c6f83e/tajo-core/tajo-core-backend/src/main/java/tajo/engine/planner/physical/ExternalSortExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/tajo/engine/planner/physical/ExternalSortExec.java b/tajo-core/tajo-core-backend/src/main/java/tajo/engine/planner/physical/ExternalSortExec.java
index 20fa0b0..62877e4 100644
--- a/tajo-core/tajo-core-backend/src/main/java/tajo/engine/planner/physical/ExternalSortExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/tajo/engine/planner/physical/ExternalSortExec.java
@@ -21,7 +21,7 @@ package tajo.engine.planner.physical;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import tajo.TaskAttemptContext;
-import tajo.catalog.TCatUtil;
+import tajo.catalog.CatalogUtil;
 import tajo.catalog.TableMeta;
 import tajo.catalog.proto.CatalogProtos.StoreType;
 import tajo.conf.TajoConf.ConfVars;
@@ -56,7 +56,7 @@ public class ExternalSortExec extends SortExec {
 
     this.sortTmpDir = new Path(context.getWorkDir(), UUID.randomUUID().toString());
     this.localFS = FileSystem.getLocal(context.getConf());
-    meta = TCatUtil.newTableMeta(inSchema, StoreType.ROWFILE);
+    meta = CatalogUtil.newTableMeta(inSchema, StoreType.ROWFILE);
   }
 
   public void init() throws IOException {
@@ -70,7 +70,7 @@ public class ExternalSortExec extends SortExec {
 
   private void sortAndStoreChunk(int chunkId, List<Tuple> tupleSlots)
       throws IOException {
-    TableMeta meta = TCatUtil.newTableMeta(inSchema, StoreType.RAW);
+    TableMeta meta = CatalogUtil.newTableMeta(inSchema, StoreType.RAW);
     Collections.sort(tupleSlots, getComparator());
     // TODO - RawFile requires the local file path.
     // So, I add the scheme 'file:/' to path. But, it should be improved.

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/c1c6f83e/tajo-core/tajo-core-backend/src/main/java/tajo/engine/planner/physical/IndexedStoreExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/tajo/engine/planner/physical/IndexedStoreExec.java b/tajo-core/tajo-core-backend/src/main/java/tajo/engine/planner/physical/IndexedStoreExec.java
index 4a693b4..d0c6a7c 100644
--- a/tajo-core/tajo-core-backend/src/main/java/tajo/engine/planner/physical/IndexedStoreExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/tajo/engine/planner/physical/IndexedStoreExec.java
@@ -67,7 +67,7 @@ public class IndexedStoreExec extends UnaryPhysicalExec {
     this.comp = new TupleComparator(keySchema, sortSpecs);
     Path storeTablePath = new Path(context.getWorkDir(), "output");
     LOG.info("Output data directory: " + storeTablePath);
-    this.meta = TCatUtil
+    this.meta = CatalogUtil
         .newTableMeta(this.outSchema, CatalogProtos.StoreType.CSV);
     FileSystem fs = new RawLocalFileSystem();
     fs.mkdirs(storeTablePath);

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/c1c6f83e/tajo-core/tajo-core-backend/src/main/java/tajo/engine/planner/physical/JoinTupleComparator.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/tajo/engine/planner/physical/JoinTupleComparator.java b/tajo-core/tajo-core-backend/src/main/java/tajo/engine/planner/physical/JoinTupleComparator.java
index 88c8485..0ed11cd 100644
--- a/tajo-core/tajo-core-backend/src/main/java/tajo/engine/planner/physical/JoinTupleComparator.java
+++ b/tajo-core/tajo-core-backend/src/main/java/tajo/engine/planner/physical/JoinTupleComparator.java
@@ -22,7 +22,7 @@ import com.google.common.base.Preconditions;
 import tajo.catalog.Schema;
 import tajo.catalog.SortSpec;
 import tajo.datum.Datum;
-import tajo.datum.DatumType;
+import tajo.datum.NullDatum;
 import tajo.storage.Tuple;
 
 import java.util.Comparator;
@@ -64,11 +64,11 @@ public class JoinTupleComparator implements Comparator<Tuple> {
       outer = outerTuple.get(outerSortKeyIds[i]);
       inner = innerTuple.get(innerSortKeyIds[i]);
 
-      if (outer.type() == DatumType.NULL || inner.type() == DatumType.NULL) {
+      if (outer instanceof NullDatum || inner instanceof NullDatum) {
         if (!outer.equals(inner)) {
-          if (outer.type() == DatumType.NULL) {
+          if (outer instanceof NullDatum) {
             compVal = 1;
-          } else if (inner.type() == DatumType.NULL) {
+          } else if (inner instanceof NullDatum) {
             compVal = -1;
           }
         } else {

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/c1c6f83e/tajo-core/tajo-core-backend/src/main/java/tajo/engine/planner/physical/PartitionedStoreExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/tajo/engine/planner/physical/PartitionedStoreExec.java b/tajo-core/tajo-core-backend/src/main/java/tajo/engine/planner/physical/PartitionedStoreExec.java
index 056d963..a42f9bb 100644
--- a/tajo-core/tajo-core-backend/src/main/java/tajo/engine/planner/physical/PartitionedStoreExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/tajo/engine/planner/physical/PartitionedStoreExec.java
@@ -26,8 +26,8 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RawLocalFileSystem;
 import tajo.TaskAttemptContext;
+import tajo.catalog.CatalogUtil;
 import tajo.catalog.Column;
-import tajo.catalog.TCatUtil;
 import tajo.catalog.TableMeta;
 import tajo.catalog.proto.CatalogProtos.StoreType;
 import tajo.catalog.statistics.StatisticsUtil;
@@ -69,7 +69,7 @@ public final class PartitionedStoreExec extends UnaryPhysicalExec {
     super(context, plan.getInSchema(), plan.getOutSchema(), child);
     Preconditions.checkArgument(plan.hasPartitionKey());
     this.plan = plan;
-    this.meta = TCatUtil.newTableMeta(this.outSchema, StoreType.CSV);
+    this.meta = CatalogUtil.newTableMeta(this.outSchema, StoreType.CSV);
     
     // about the partitions
     this.numPartitions = this.plan.getNumPartitions();

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/c1c6f83e/tajo-core/tajo-core-backend/src/main/java/tajo/engine/planner/physical/StoreTableExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/tajo/engine/planner/physical/StoreTableExec.java b/tajo-core/tajo-core-backend/src/main/java/tajo/engine/planner/physical/StoreTableExec.java
index 70a7a3c..f250438 100644
--- a/tajo-core/tajo-core-backend/src/main/java/tajo/engine/planner/physical/StoreTableExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/tajo/engine/planner/physical/StoreTableExec.java
@@ -25,7 +25,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RawLocalFileSystem;
 import tajo.TaskAttemptContext;
-import tajo.catalog.TCatUtil;
+import tajo.catalog.CatalogUtil;
 import tajo.catalog.TableMeta;
 import tajo.engine.planner.logical.StoreTableNode;
 import tajo.storage.Appender;
@@ -59,9 +59,9 @@ public class StoreTableExec extends UnaryPhysicalExec {
 
     TableMeta meta;
     if (plan.hasOptions()) {
-      meta = TCatUtil.newTableMeta(outSchema, plan.getStorageType(), plan.getOptions());
+      meta = CatalogUtil.newTableMeta(outSchema, plan.getStorageType(), plan.getOptions());
     } else {
-      meta = TCatUtil.newTableMeta(outSchema, plan.getStorageType());
+      meta = CatalogUtil.newTableMeta(outSchema, plan.getStorageType());
     }
 
     if (context.isInterQuery()) {

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/c1c6f83e/tajo-core/tajo-core-backend/src/main/java/tajo/engine/query/ResultSetImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/tajo/engine/query/ResultSetImpl.java b/tajo-core/tajo-core-backend/src/main/java/tajo/engine/query/ResultSetImpl.java
index abe78c0..89d52e0 100644
--- a/tajo-core/tajo-core-backend/src/main/java/tajo/engine/query/ResultSetImpl.java
+++ b/tajo-core/tajo-core-backend/src/main/java/tajo/engine/query/ResultSetImpl.java
@@ -28,7 +28,7 @@ import tajo.catalog.TableMeta;
 import tajo.catalog.TableMetaImpl;
 import tajo.catalog.proto.CatalogProtos.TableProto;
 import tajo.datum.Datum;
-import tajo.datum.DatumType;
+import tajo.datum.NullDatum;
 import tajo.exception.UnsupportedException;
 import tajo.storage.Fragment;
 import tajo.storage.MergeScanner;
@@ -585,7 +585,7 @@ public class ResultSetImpl implements ResultSet {
   public double getDouble(int fieldId) throws SQLException {
     Datum datum = cur.getDouble(fieldId - 1);
     handleNull(datum);
-    return datum.asDouble();
+    return datum.asFloat8();
   }
 
   /*
@@ -597,7 +597,7 @@ public class ResultSetImpl implements ResultSet {
   public double getDouble(String name) throws SQLException {
     Datum datum = cur.get(findColumn(name));
     handleNull(datum);
-    return datum.asDouble();
+    return datum.asFloat8();
   }
 
   /*
@@ -629,7 +629,7 @@ public class ResultSetImpl implements ResultSet {
   public float getFloat(int fieldId) throws SQLException {
     Datum datum = cur.get(fieldId - 1);
     handleNull(datum);
-    return datum.asFloat();
+    return datum.asFloat4();
   }
 
   /*
@@ -641,7 +641,7 @@ public class ResultSetImpl implements ResultSet {
   public float getFloat(String name) throws SQLException {
     Datum datum = cur.get(findColumn(name));
     handleNull(datum);
-    return datum.asFloat();
+    return datum.asFloat4();
   }
 
   /*
@@ -663,7 +663,7 @@ public class ResultSetImpl implements ResultSet {
   public int getInt(int fieldId) throws SQLException {
     Datum datum = cur.get(fieldId - 1);
     handleNull(datum);
-    return datum.asInt();
+    return datum.asInt4();
   }
 
   /*
@@ -675,7 +675,7 @@ public class ResultSetImpl implements ResultSet {
   public int getInt(String name) throws SQLException {
     Datum datum = cur.get(findColumn(name));
     handleNull(datum);
-    return datum.asInt();
+    return datum.asInt4();
   }
 
   /*
@@ -687,7 +687,7 @@ public class ResultSetImpl implements ResultSet {
   public long getLong(int fieldId) throws SQLException {
     Datum datum = cur.get(fieldId - 1);
     handleNull(datum);
-    return datum.asLong();
+    return datum.asInt8();
   }
 
   /*
@@ -699,7 +699,7 @@ public class ResultSetImpl implements ResultSet {
   public long getLong(String name) throws SQLException {
     Datum datum = cur.get(findColumn(name));
     handleNull(datum);
-    return datum.asLong();
+    return datum.asInt8();
   }
 
   /*
@@ -907,7 +907,7 @@ public class ResultSetImpl implements ResultSet {
   public short getShort(int fieldId) throws SQLException {
     Datum datum = cur.get(fieldId - 1);
     handleNull(datum);
-    return datum.asShort();
+    return datum.asInt2();
   }
 
   /*
@@ -919,7 +919,7 @@ public class ResultSetImpl implements ResultSet {
   public short getShort(String name) throws SQLException {
     Datum datum = cur.get(findColumn(name));
     handleNull(datum);
-    return datum.asShort();
+    return datum.asInt2();
   }
 
   /*
@@ -2195,6 +2195,6 @@ public class ResultSetImpl implements ResultSet {
   }
 
   private void handleNull(Datum d) {
-    wasNull = (d.type() == DatumType.NULL);
+    wasNull = (d instanceof NullDatum);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/c1c6f83e/tajo-core/tajo-core-backend/src/main/java/tajo/engine/query/ResultSetMetaDataImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/tajo/engine/query/ResultSetMetaDataImpl.java b/tajo-core/tajo-core-backend/src/main/java/tajo/engine/query/ResultSetMetaDataImpl.java
index ee9b7cf..baea966 100644
--- a/tajo-core/tajo-core-backend/src/main/java/tajo/engine/query/ResultSetMetaDataImpl.java
+++ b/tajo-core/tajo-core-backend/src/main/java/tajo/engine/query/ResultSetMetaDataImpl.java
@@ -22,7 +22,7 @@
 package tajo.engine.query;
 
 import tajo.catalog.TableMeta;
-import tajo.catalog.proto.CatalogProtos.DataType;
+import tajo.common.TajoDataTypes.DataType;
 import tajo.exception.UnsupportedException;
 
 import java.nio.channels.UnsupportedAddressTypeException;
@@ -109,35 +109,35 @@ public class ResultSetMetaDataImpl implements ResultSetMetaData {
   public int getColumnType(int column) throws SQLException {
     // TODO
     DataType type = meta.getSchema().getColumn(column - 1).getDataType();
-    switch (type) {
-    case BOOLEAN:
-      return Types.BOOLEAN;
-    case BIGDECIMAL:
-      return Types.DECIMAL;
-    case BIGINT:
-      return Types.BIGINT;
-    case BYTE:
-      return Types.TINYINT;
-    case BYTES:
-      return Types.VARBINARY;
-    case CHAR:
-      return Types.CHAR;
-    case DATE:
-      return Types.DATE;
-    case DOUBLE:
-      return Types.DOUBLE;
-    case FLOAT:
-      return Types.FLOAT;
-    case INT:
-      return Types.INTEGER;
-    case LONG:
-      return Types.BIGINT;
-    case SHORT:
-      return Types.SMALLINT;
-    case STRING:
-      return Types.VARCHAR;
-    default:
-      throw new UnsupportedException();
+    switch (type.getType()) {
+      case BOOLEAN:
+        return Types.BOOLEAN;
+      case INT1:
+        return Types.TINYINT;
+      case INT2:
+        return Types.SMALLINT;
+      case INT4:
+        return Types.INTEGER;
+      case INT8:
+        return Types.BIGINT;
+      case FLOAT4:
+        return Types.FLOAT;
+      case FLOAT8:
+        return Types.DOUBLE;
+      case DECIMAL:
+        return Types.DECIMAL;
+      case VARBINARY:
+        return Types.VARBINARY;
+      case CHAR:
+        return Types.CHAR;
+      case DATE:
+        return Types.DATE;
+      case VARCHAR:
+        return Types.VARCHAR;
+      case TEXT:
+        return Types.VARCHAR;
+      default:
+        throw new UnsupportedException();
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/c1c6f83e/tajo-core/tajo-core-backend/src/main/java/tajo/engine/query/exception/InvalidCastException.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/tajo/engine/query/exception/InvalidCastException.java b/tajo-core/tajo-core-backend/src/main/java/tajo/engine/query/exception/InvalidCastException.java
deleted file mode 100644
index 9732391..0000000
--- a/tajo-core/tajo-core-backend/src/main/java/tajo/engine/query/exception/InvalidCastException.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * 
- */
-package tajo.engine.query.exception;
-
-import tajo.catalog.proto.CatalogProtos.DataType;
-
-public class InvalidCastException extends InvalidQueryException {
-	private static final long serialVersionUID = -5090530469575858320L;
-
-	/**
-	 * @param message
-	 */
-	public InvalidCastException(DataType src, DataType target) {
-		super("Error: cannot cast " + src + " to " + target);
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/c1c6f83e/tajo-core/tajo-core-backend/src/main/java/tajo/engine/query/exception/InvalidEvalException.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/tajo/engine/query/exception/InvalidEvalException.java b/tajo-core/tajo-core-backend/src/main/java/tajo/engine/query/exception/InvalidEvalException.java
deleted file mode 100644
index 121f89b..0000000
--- a/tajo-core/tajo-core-backend/src/main/java/tajo/engine/query/exception/InvalidEvalException.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package tajo.engine.query.exception;
-
-public class InvalidEvalException extends InvalidQueryException {
-	private static final long serialVersionUID = -2897003028483298256L;
-
-	/**
-	 * @param message
-	 */
-	public InvalidEvalException(String message) {
-		super(message);
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/c1c6f83e/tajo-core/tajo-core-backend/src/main/java/tajo/engine/utils/SchemaUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/tajo/engine/utils/SchemaUtil.java b/tajo-core/tajo-core-backend/src/main/java/tajo/engine/utils/SchemaUtil.java
index bde92c1..894b0d4 100644
--- a/tajo-core/tajo-core-backend/src/main/java/tajo/engine/utils/SchemaUtil.java
+++ b/tajo-core/tajo-core-backend/src/main/java/tajo/engine/utils/SchemaUtil.java
@@ -20,7 +20,7 @@ package tajo.engine.utils;
 
 import tajo.catalog.Column;
 import tajo.catalog.Schema;
-import tajo.catalog.proto.CatalogProtos.DataType;
+import tajo.common.TajoDataTypes.DataType;
 import tajo.engine.parser.QueryBlock;
 
 import java.util.Collection;
@@ -56,7 +56,7 @@ public class SchemaUtil {
     for (Column outer : left.getColumns()) {
       for (Column inner : right.getColumns()) {
         if (outer.getColumnName().equals(inner.getColumnName()) &&
-            outer.getDataType() == inner.getDataType()) {
+            outer.getDataType().equals(inner.getDataType())) {
           common.addColumn(outer.getColumnName(), outer.getDataType());
         }
       }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/c1c6f83e/tajo-core/tajo-core-backend/src/main/java/tajo/engine/utils/TupleUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/tajo/engine/utils/TupleUtil.java b/tajo-core/tajo-core-backend/src/main/java/tajo/engine/utils/TupleUtil.java
index be4a595..8b95816 100644
--- a/tajo-core/tajo-core-backend/src/main/java/tajo/engine/utils/TupleUtil.java
+++ b/tajo-core/tajo-core-backend/src/main/java/tajo/engine/utils/TupleUtil.java
@@ -26,15 +26,14 @@ import org.apache.commons.logging.LogFactory;
 import tajo.catalog.Column;
 import tajo.catalog.Schema;
 import tajo.catalog.SortSpec;
-import tajo.catalog.proto.CatalogProtos;
 import tajo.catalog.statistics.ColumnStat;
-import tajo.datum.*;
+import tajo.datum.Datum;
+import tajo.datum.DatumFactory;
 import tajo.engine.planner.PlannerUtil;
 import tajo.storage.RowStoreUtil;
 import tajo.storage.Tuple;
 import tajo.storage.TupleRange;
 import tajo.storage.VTuple;
-import tajo.util.Bytes;
 import tajo.worker.dataserver.HttpUtil;
 
 import java.io.UnsupportedEncodingException;
@@ -63,29 +62,29 @@ public class TupleUtil {
     long columnCard;
     for (int i = 0; i < schema.getColumnNum(); i++) {
       col = schema.getColumn(i);
-      switch (col.getDataType()) {
+      switch (col.getDataType().getType()) {
         case CHAR:
           columnCard = end.get(i).asChar() - start.get(i).asChar();
           break;
-        case BYTE:
+        case BIT:
           columnCard = end.get(i).asByte() - start.get(i).asByte();
           break;
-        case SHORT:
-          columnCard = end.get(i).asShort() - start.get(i).asShort();
+        case INT2:
+          columnCard = end.get(i).asInt2() - start.get(i).asInt2();
           break;
-        case INT:
-          columnCard = end.get(i).asInt() - start.get(i).asInt();
+        case INT4:
+          columnCard = end.get(i).asInt4() - start.get(i).asInt4();
           break;
-        case LONG:
-          columnCard = end.get(i).asLong() - start.get(i).asLong();
+        case INT8:
+          columnCard = end.get(i).asInt8() - start.get(i).asInt8();
           break;
-        case FLOAT:
-          columnCard = end.get(i).asInt() - start.get(i).asInt();
+        case FLOAT4:
+          columnCard = end.get(i).asInt4() - start.get(i).asInt4();
           break;
-        case DOUBLE:
-          columnCard = end.get(i).asLong() - start.get(i).asLong();
+        case FLOAT8:
+          columnCard = end.get(i).asInt8() - start.get(i).asInt8();
           break;
-        case STRING:
+        case TEXT:
           columnCard = end.get(i).asChars().charAt(0) - start.get(i).asChars().charAt(0);
           break;
         default:
@@ -113,7 +112,7 @@ public class TupleUtil {
     for (int i = 0; i < schema.getColumnNum(); i++) {
       col = schema.getColumn(i);
       prevValues[i] = start.get(i);
-      switch (col.getDataType()) {
+      switch (col.getDataType().getType()) {
         case CHAR:
           int sChar = start.get(i).asChar();
           int eChar = end.get(i).asChar();
@@ -123,8 +122,8 @@ public class TupleUtil {
           } else {
             rangeChar = 1;
           }
-          term[i] = DatumFactory.createInt(rangeChar);
-        case BYTE:
+          term[i] = DatumFactory.createInt4(rangeChar);
+        case BIT:
           byte sByte = start.get(i).asByte();
           byte eByte = end.get(i).asByte();
           int rangeByte;
@@ -133,68 +132,68 @@ public class TupleUtil {
           } else {
             rangeByte = 1;
           }
-          term[i] = DatumFactory.createByte((byte)rangeByte);
+          term[i] = DatumFactory.createBit((byte) rangeByte);
           break;
 
-        case SHORT:
-          short sShort = start.get(i).asShort();
-          short eShort = end.get(i).asShort();
+        case INT2:
+          short sShort = start.get(i).asInt2();
+          short eShort = end.get(i).asInt2();
           int rangeShort;
           if ((eShort - sShort) > partNum) {
             rangeShort = (eShort - sShort) / partNum;
           } else {
             rangeShort = 1;
           }
-          term[i] = DatumFactory.createShort((short) rangeShort);
+          term[i] = DatumFactory.createInt2((short) rangeShort);
           break;
 
-        case INT:
-          int sInt = start.get(i).asInt();
-          int eInt = end.get(i).asInt();
+        case INT4:
+          int sInt = start.get(i).asInt4();
+          int eInt = end.get(i).asInt4();
           int rangeInt;
           if ((eInt - sInt) > partNum) {
             rangeInt = (eInt - sInt) / partNum;
           } else {
             rangeInt = 1;
           }
-          term[i] = DatumFactory.createInt(rangeInt);
+          term[i] = DatumFactory.createInt4(rangeInt);
           break;
 
-        case LONG:
-          long sLong = start.get(i).asLong();
-          long eLong = end.get(i).asLong();
+        case INT8:
+          long sLong = start.get(i).asInt8();
+          long eLong = end.get(i).asInt8();
           long rangeLong;
           if ((eLong - sLong) > partNum) {
             rangeLong = ((eLong - sLong) / partNum);
           } else {
             rangeLong = 1;
           }
-          term[i] = DatumFactory.createLong(rangeLong);
+          term[i] = DatumFactory.createInt8(rangeLong);
           break;
 
-        case FLOAT:
-          float sFloat = start.get(i).asFloat();
-          float eFloat = end.get(i).asFloat();
+        case FLOAT4:
+          float sFloat = start.get(i).asFloat4();
+          float eFloat = end.get(i).asFloat4();
           float rangeFloat;
           if ((eFloat - sFloat) > partNum) {
             rangeFloat = ((eFloat - sFloat) / partNum);
           } else {
             rangeFloat = 1;
           }
-          term[i] = DatumFactory.createFloat(rangeFloat);
+          term[i] = DatumFactory.createFloat4(rangeFloat);
           break;
-        case DOUBLE:
-          double sDouble = start.get(i).asDouble();
-          double eDouble = end.get(i).asDouble();
+        case FLOAT8:
+          double sDouble = start.get(i).asFloat8();
+          double eDouble = end.get(i).asFloat8();
           double rangeDouble;
           if ((eDouble - sDouble) > partNum) {
             rangeDouble = ((eDouble - sDouble) / partNum);
           } else {
             rangeDouble = 1;
           }
-          term[i] = DatumFactory.createDouble(rangeDouble);
+          term[i] = DatumFactory.createFloat8(rangeDouble);
           break;
-        case STRING:
+        case TEXT:
           char sChars = start.get(i).asChars().charAt(0);
           char eChars = end.get(i).asChars().charAt(0);
           int rangeString;
@@ -203,11 +202,11 @@ public class TupleUtil {
           } else {
             rangeString = 1;
           }
-          term[i] = DatumFactory.createString(((char)rangeString) + "");
+          term[i] = DatumFactory.createText(((char) rangeString) + "");
           break;
-        case IPv4:
+        case INET4:
           throw new UnsupportedOperationException();
-        case BYTES:
+        case BLOB:
           throw new UnsupportedOperationException();
         default:
           throw new UnsupportedOperationException();
@@ -220,7 +219,7 @@ public class TupleUtil {
       for (int i = 0; i < schema.getColumnNum(); i++) {
         col = schema.getColumn(i);
         sTuple.put(i, prevValues[i]);
-        switch (col.getDataType()) {
+        switch (col.getDataType().getType()) {
           case CHAR:
             char endChar = (char) (prevValues[i].asChar() + term[i].asChar());
             if (endChar > end.get(i).asByte()) {
@@ -230,80 +229,80 @@ public class TupleUtil {
             }
             prevValues[i] = DatumFactory.createChar(endChar);
             break;
-          case BYTE:
+          case BIT:
             byte endByte = (byte) (prevValues[i].asByte() + term[i].asByte());
             if (endByte > end.get(i).asByte()) {
               eTuple.put(i, end.get(i));
             } else {
-              eTuple.put(i, DatumFactory.createByte(endByte));
+              eTuple.put(i, DatumFactory.createBit(endByte));
             }
-            prevValues[i] = DatumFactory.createByte(endByte);
+            prevValues[i] = DatumFactory.createBit(endByte);
             break;
-          case SHORT:
-            int endShort = (short) (prevValues[i].asShort() + term[i].asShort());
-            if (endShort > end.get(i).asShort()) {
+          case INT2:
+            int endShort = (short) (prevValues[i].asInt2() + term[i].asInt2());
+            if (endShort > end.get(i).asInt2()) {
               eTuple.put(i, end.get(i));
             } else {
               // TODO - to consider overflow
-              eTuple.put(i, DatumFactory.createShort((short) endShort));
+              eTuple.put(i, DatumFactory.createInt2((short) endShort));
             }
-            prevValues[i] = DatumFactory.createShort((short) endShort);
+            prevValues[i] = DatumFactory.createInt2((short) endShort);
             break;
-          case INT:
-            int endInt = (prevValues[i].asInt() + term[i].asInt());
-            if (endInt > end.get(i).asInt()) {
+          case INT4:
+            int endInt = (prevValues[i].asInt4() + term[i].asInt4());
+            if (endInt > end.get(i).asInt4()) {
               eTuple.put(i, end.get(i));
             } else {
               // TODO - to consider overflow
-              eTuple.put(i, DatumFactory.createInt(endInt));
+              eTuple.put(i, DatumFactory.createInt4(endInt));
             }
-            prevValues[i] = DatumFactory.createInt(endInt);
+            prevValues[i] = DatumFactory.createInt4(endInt);
             break;
 
-          case LONG:
-            long endLong = (prevValues[i].asLong() + term[i].asLong());
-            if (endLong > end.get(i).asLong()) {
+          case INT8:
+            long endLong = (prevValues[i].asInt8() + term[i].asInt8());
+            if (endLong > end.get(i).asInt8()) {
               eTuple.put(i, end.get(i));
             } else {
               // TODO - to consider overflow
-              eTuple.put(i, DatumFactory.createLong(endLong));
+              eTuple.put(i, DatumFactory.createInt8(endLong));
             }
-            prevValues[i] = DatumFactory.createLong(endLong);
+            prevValues[i] = DatumFactory.createInt8(endLong);
             break;
 
-          case FLOAT:
-            float endFloat = (prevValues[i].asFloat() + term[i].asFloat());
-            if (endFloat > end.get(i).asFloat()) {
+          case FLOAT4:
+            float endFloat = (prevValues[i].asFloat4() + term[i].asFloat4());
+            if (endFloat > end.get(i).asFloat4()) {
               eTuple.put(i, end.get(i));
             } else {
               // TODO - to consider overflow
-              eTuple.put(i, DatumFactory.createFloat(endFloat));
+              eTuple.put(i, DatumFactory.createFloat4(endFloat));
             }
-            prevValues[i] = DatumFactory.createFloat(endFloat);
+            prevValues[i] = DatumFactory.createFloat4(endFloat);
             break;
-          case DOUBLE:
-            double endDouble = (prevValues[i].asDouble() + term[i].asDouble());
-            if (endDouble > end.get(i).asDouble()) {
+          case FLOAT8:
+            double endDouble = (prevValues[i].asFloat8() + term[i].asFloat8());
+            if (endDouble > end.get(i).asFloat8()) {
               eTuple.put(i, end.get(i));
             } else {
               // TODO - to consider overflow
-              eTuple.put(i, DatumFactory.createDouble(endDouble));
+              eTuple.put(i, DatumFactory.createFloat8(endDouble));
             }
-            prevValues[i] = DatumFactory.createDouble(endDouble);
+            prevValues[i] = DatumFactory.createFloat8(endDouble);
             break;
-          case STRING:
+          case TEXT:
             String endString = ((char)(prevValues[i].asChars().charAt(0) + term[i].asChars().charAt(0))) + "";
             if (endString.charAt(0) > end.get(i).asChars().charAt(0)) {
               eTuple.put(i, end.get(i));
             } else {
               // TODO - to consider overflow
-              eTuple.put(i, DatumFactory.createString(endString));
+              eTuple.put(i, DatumFactory.createText(endString));
             }
-            prevValues[i] = DatumFactory.createString(endString);
+            prevValues[i] = DatumFactory.createText(endString);
             break;
-          case IPv4:
+          case INET4:
             throw new UnsupportedOperationException();
-          case BYTES:
+          case BLOB:
             throw new UnsupportedOperationException();
           default:
             throw new UnsupportedOperationException();
@@ -386,64 +385,4 @@ public class TupleUtil {
     }
     return new TupleRange(target, startTuple, endTuple);
   }
-
-  public static Datum createFromBytes(CatalogProtos.DataType type, byte [] bytes) {
-    switch (type) {
-      case BOOLEAN:
-        return new BoolDatum(bytes);
-      case BYTE:
-        return new ByteDatum(bytes);
-      case CHAR:
-        return new CharDatum(bytes);
-      case SHORT:
-        return new ShortDatum(bytes);
-      case INT:
-        return new IntDatum(bytes);
-      case LONG:
-        return new LongDatum(bytes);
-      case FLOAT:
-        return new FloatDatum(bytes);
-      case DOUBLE:
-        return new DoubleDatum(bytes);
-      case STRING:
-        return new StringDatum(bytes);
-      case IPv4:
-        return new IPv4Datum(bytes);
-      default: throw new UnsupportedOperationException(type + " is not supported yet");
-    }
-  }
-
-  private final static byte [] TRUE_BYTES = new byte[] {(byte)1};
-  private final static byte [] FALSE_BYTES = new byte[] {(byte)0};
-
-  public static byte [] toBytes(CatalogProtos.DataType type, Datum datum) {
-    switch (type) {
-      case BOOLEAN:
-        if (datum.asBool()) {
-          return TRUE_BYTES;
-        } else {
-          return FALSE_BYTES;
-        }
-      case BYTE:
-      case CHAR:
-        return new byte[] {datum.asByte()};
-
-      case SHORT:
-        return Bytes.toBytes(datum.asShort());
-      case INT:
-        return Bytes.toBytes(datum.asInt());
-      case LONG:
-        return Bytes.toBytes(datum.asLong());
-      case FLOAT:
-        return Bytes.toBytes(datum.asFloat());
-      case DOUBLE:
-        return Bytes.toBytes(datum.asDouble());
-      case STRING:
-        return Bytes.toBytes(datum.asChars());
-      case IPv4:
-        return datum.asByteArray();
-
-      default: throw new UnsupportedOperationException(type + " is not supported yet");
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/c1c6f83e/tajo-core/tajo-core-backend/src/main/java/tajo/master/GlobalEngine.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/tajo/master/GlobalEngine.java b/tajo-core/tajo-core-backend/src/main/java/tajo/master/GlobalEngine.java
index 255e500..080dbdd 100644
--- a/tajo-core/tajo-core-backend/src/main/java/tajo/master/GlobalEngine.java
+++ b/tajo-core/tajo-core-backend/src/main/java/tajo/master/GlobalEngine.java
@@ -34,7 +34,7 @@ import org.apache.hadoop.yarn.service.AbstractService;
 import org.apache.hadoop.yarn.util.Records;
 import tajo.QueryConf;
 import tajo.QueryId;
-import tajo.catalog.TCatUtil;
+import tajo.catalog.CatalogUtil;
 import tajo.catalog.TableDesc;
 import tajo.catalog.TableMeta;
 import tajo.catalog.statistics.TableStat;
@@ -115,10 +115,10 @@ public class GlobalEngine extends AbstractService {
     CreateTableNode createTable = (CreateTableNode) root.getSubNode();
     TableMeta meta;
     if (createTable.hasOptions()) {
-      meta = TCatUtil.newTableMeta(createTable.getSchema(),
+      meta = CatalogUtil.newTableMeta(createTable.getSchema(),
           createTable.getStorageType(), createTable.getOptions());
     } else {
-      meta = TCatUtil.newTableMeta(createTable.getSchema(),
+      meta = CatalogUtil.newTableMeta(createTable.getSchema(),
           createTable.getStorageType());
     }
 
@@ -138,7 +138,7 @@ public class GlobalEngine extends AbstractService {
     meta.setStat(stat);
 
     StorageUtil.writeTableMeta(context.getConf(), createTable.getPath(), meta);
-    TableDesc desc = TCatUtil.newTableDesc(createTable.getTableName(), meta,
+    TableDesc desc = CatalogUtil.newTableDesc(createTable.getTableName(), meta,
         createTable.getPath());
     context.getCatalog().addTable(desc);
     return desc.getId();

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/c1c6f83e/tajo-core/tajo-core-backend/src/main/java/tajo/master/GlobalPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/tajo/master/GlobalPlanner.java b/tajo-core/tajo-core-backend/src/main/java/tajo/master/GlobalPlanner.java
index b9fb587..376f15d 100644
--- a/tajo-core/tajo-core-backend/src/main/java/tajo/master/GlobalPlanner.java
+++ b/tajo-core/tajo-core-backend/src/main/java/tajo/master/GlobalPlanner.java
@@ -482,7 +482,7 @@ public class GlobalPlanner {
     
     if (union.getOuterNode().getType() == ExprType.STORE) {
       outerStore = (StoreTableNode) union.getOuterNode();
-      TableMeta outerMeta = TCatUtil.newTableMeta(outerStore.getOutSchema(),
+      TableMeta outerMeta = CatalogUtil.newTableMeta(outerStore.getOutSchema(),
           StoreType.CSV);
       insertOuterScan(union, outerStore.getTableName(), outerMeta);
       prev = convertMap.get(outerStore);
@@ -498,7 +498,7 @@ public class GlobalPlanner {
     
     if (union.getInnerNode().getType() == ExprType.STORE) {
       innerStore = (StoreTableNode) union.getInnerNode();
-      TableMeta innerMeta = TCatUtil.newTableMeta(innerStore.getOutSchema(),
+      TableMeta innerMeta = CatalogUtil.newTableMeta(innerStore.getOutSchema(),
           StoreType.CSV);
       insertInnerScan(union, innerStore.getTableName(), innerMeta);
       prev = convertMap.get(innerStore);
@@ -597,7 +597,7 @@ public class GlobalPlanner {
     // outer
     if (join.getOuterNode().getType() == ExprType.STORE) {
       outerStore = (StoreTableNode) join.getOuterNode();
-      TableMeta outerMeta = TCatUtil.newTableMeta(outerStore.getOutSchema(),
+      TableMeta outerMeta = CatalogUtil.newTableMeta(outerStore.getOutSchema(),
           StoreType.CSV);
       insertOuterScan(join, outerStore.getTableName(), outerMeta);
       prev = convertMap.get(outerStore);
@@ -617,7 +617,7 @@ public class GlobalPlanner {
     // inner
     if (join.getInnerNode().getType() == ExprType.STORE) {
       innerStore = (StoreTableNode) join.getInnerNode();
-      TableMeta innerMeta = TCatUtil.newTableMeta(innerStore.getOutSchema(),
+      TableMeta innerMeta = CatalogUtil.newTableMeta(innerStore.getOutSchema(),
           StoreType.CSV);
       insertInnerScan(join, innerStore.getTableName(), innerMeta);
       prev = convertMap.get(innerStore);
@@ -654,7 +654,7 @@ public class GlobalPlanner {
     
     if (union.getOuterNode().getType() == ExprType.STORE) {
       store = (StoreTableNode) union.getOuterNode();
-      meta = TCatUtil.newTableMeta(store.getOutSchema(), StoreType.CSV);
+      meta = CatalogUtil.newTableMeta(store.getOutSchema(), StoreType.CSV);
       insertOuterScan(union, store.getTableName(), meta);
       prev = convertMap.get(store);
       if (prev != null) {
@@ -673,7 +673,7 @@ public class GlobalPlanner {
     
     if (union.getInnerNode().getType() == ExprType.STORE) {
       store = (StoreTableNode) union.getInnerNode();
-      meta = TCatUtil.newTableMeta(store.getOutSchema(), StoreType.CSV);
+      meta = CatalogUtil.newTableMeta(store.getOutSchema(), StoreType.CSV);
       insertInnerScan(union, store.getTableName(), meta);
       prev = convertMap.get(store);
       if (prev != null) {
@@ -762,7 +762,7 @@ public class GlobalPlanner {
   
   private LogicalNode insertOuterScan(BinaryNode parent, String tableId,
       TableMeta meta) throws IOException {
-    TableDesc desc = TCatUtil.newTableDesc(tableId, meta, sm.getTablePath(tableId));
+    TableDesc desc = CatalogUtil.newTableDesc(tableId, meta, sm.getTablePath(tableId));
     ScanNode scan = new ScanNode(new FromTable(desc));
     scan.setLocal(true);
     scan.setInSchema(meta.getSchema());
@@ -773,7 +773,7 @@ public class GlobalPlanner {
   
   private LogicalNode insertInnerScan(BinaryNode parent, String tableId, 
       TableMeta meta) throws IOException {
-    TableDesc desc = TCatUtil.newTableDesc(tableId, meta, sm.getTablePath(tableId));
+    TableDesc desc = CatalogUtil.newTableDesc(tableId, meta, sm.getTablePath(tableId));
     ScanNode scan = new ScanNode(new FromTable(desc));
     scan.setLocal(true);
     scan.setInSchema(meta.getSchema());

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/c1c6f83e/tajo-core/tajo-core-backend/src/main/java/tajo/master/GlobalPlannerUtils.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/tajo/master/GlobalPlannerUtils.java b/tajo-core/tajo-core-backend/src/main/java/tajo/master/GlobalPlannerUtils.java
index 1ecfa0e..089da1b 100644
--- a/tajo-core/tajo-core-backend/src/main/java/tajo/master/GlobalPlannerUtils.java
+++ b/tajo-core/tajo-core-backend/src/main/java/tajo/master/GlobalPlannerUtils.java
@@ -173,8 +173,8 @@ public class GlobalPlannerUtils {
   public static ScanNode newScanPlan(Schema inputSchema,
                                      String inputTableId,
                                      Path inputPath) {
-    TableMeta meta = TCatUtil.newTableMeta(inputSchema, StoreType.CSV);
-    TableDesc desc = TCatUtil.newTableDesc(inputTableId, meta, inputPath);
+    TableMeta meta = CatalogUtil.newTableMeta(inputSchema, StoreType.CSV);
+    TableDesc desc = CatalogUtil.newTableDesc(inputTableId, meta, inputPath);
     ScanNode newScan = new ScanNode(new QueryBlock.FromTable(desc));
     newScan.setInSchema(desc.getMeta().getSchema());
     newScan.setOutSchema(desc.getMeta().getSchema());

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/c1c6f83e/tajo-core/tajo-core-backend/src/main/java/tajo/master/Query.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/tajo/master/Query.java b/tajo-core/tajo-core-backend/src/main/java/tajo/master/Query.java
index d740cbd..98fb754 100644
--- a/tajo-core/tajo-core-backend/src/main/java/tajo/master/Query.java
+++ b/tajo-core/tajo-core-backend/src/main/java/tajo/master/Query.java
@@ -31,10 +31,7 @@ import tajo.QueryConf;
 import tajo.QueryId;
 import tajo.SubQueryId;
 import tajo.TajoProtos.QueryState;
-import tajo.catalog.TCatUtil;
-import tajo.catalog.TableDesc;
-import tajo.catalog.TableDescImpl;
-import tajo.catalog.TableMeta;
+import tajo.catalog.*;
 import tajo.catalog.proto.CatalogProtos.StoreType;
 import tajo.engine.json.GsonCreator;
 import tajo.engine.planner.global.MasterPlan;
@@ -418,7 +415,7 @@ public class Query implements EventHandler<QueryEvent> {
       if (sm.getFileSystem().exists(new Path(indexPath, ".meta"))) {
         meta = sm.getTableMeta(indexPath);
       } else {
-        meta = TCatUtil
+        meta = CatalogUtil
             .newTableMeta(execBlock.getOutputSchema(), StoreType.CSV);
       }
       String indexName = IndexUtil.getIndexName(index.getTableName(),

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/c1c6f83e/tajo-core/tajo-core-backend/src/main/java/tajo/master/Repartitioner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/tajo/master/Repartitioner.java b/tajo-core/tajo-core-backend/src/main/java/tajo/master/Repartitioner.java
index 51aff94..622c4f1 100644
--- a/tajo-core/tajo-core-backend/src/main/java/tajo/master/Repartitioner.java
+++ b/tajo-core/tajo-core-backend/src/main/java/tajo/master/Repartitioner.java
@@ -76,7 +76,7 @@ public class Repartitioner {
 
       if (scans[i].isLocal()) { // it only requires a dummy fragment.
         fragments[i] = new Fragment(scans[i].getTableId(), tablePath,
-            TCatUtil.newTableMeta(scans[i].getInSchema(), StoreType.CSV),
+            CatalogUtil.newTableMeta(scans[i].getInSchema(), StoreType.CSV),
             0, 0, null);
       } else {
         fragments[i] = subQuery.getStorageManager().getSplits(scans[i].getTableId(),
@@ -291,7 +291,7 @@ public class Repartitioner {
     TupleRange [] ranges = partitioner.partition(determinedTaskNum);
 
     Fragment dummyFragment = new Fragment(scan.getTableId(), tablePath,
-        TCatUtil.newTableMeta(scan.getInSchema(), StoreType.CSV),
+        CatalogUtil.newTableMeta(scan.getInSchema(), StoreType.CSV),
         0, 0, null);
 
     List<String> basicFetchURIs = new ArrayList<String>();
@@ -389,7 +389,7 @@ public class Repartitioner {
     }
 
     Fragment frag = new Fragment(scan.getTableId(), tablePath,
-        TCatUtil.newTableMeta(scan.getInSchema(), StoreType.CSV),
+        CatalogUtil.newTableMeta(scan.getInSchema(), StoreType.CSV),
         0, 0, null);
 
     Map<Integer, List<IntermediateEntry>> hashed = hashByKey(partitions);

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/c1c6f83e/tajo-core/tajo-core-backend/src/main/java/tajo/master/SubQuery.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/tajo/master/SubQuery.java b/tajo-core/tajo-core-backend/src/main/java/tajo/master/SubQuery.java
index 986ecaf..d965982 100644
--- a/tajo-core/tajo-core-backend/src/main/java/tajo/master/SubQuery.java
+++ b/tajo-core/tajo-core-backend/src/main/java/tajo/master/SubQuery.java
@@ -35,7 +35,7 @@ import tajo.QueryIdFactory;
 import tajo.QueryUnitId;
 import tajo.SubQueryId;
 import tajo.catalog.CatalogService;
-import tajo.catalog.TCatUtil;
+import tajo.catalog.CatalogUtil;
 import tajo.catalog.TableDesc;
 import tajo.catalog.TableMeta;
 import tajo.catalog.statistics.ColumnStat;
@@ -358,10 +358,10 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
 
   private static TableMeta toTableMeta(StoreTableNode store) {
     if (store.hasOptions()) {
-      return TCatUtil.newTableMeta(store.getOutSchema(),
+      return CatalogUtil.newTableMeta(store.getOutSchema(),
           store.getStorageType(), store.getOptions());
     } else {
-      return TCatUtil.newTableMeta(store.getOutSchema(),
+      return CatalogUtil.newTableMeta(store.getOutSchema(),
           store.getStorageType());
     }
   }
@@ -766,4 +766,4 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
 
     }
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/c1c6f83e/tajo-core/tajo-core-backend/src/main/java/tajo/master/TajoMaster.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/tajo/master/TajoMaster.java b/tajo-core/tajo-core-backend/src/main/java/tajo/master/TajoMaster.java
index 66f4446..82b1445 100644
--- a/tajo-core/tajo-core-backend/src/main/java/tajo/master/TajoMaster.java
+++ b/tajo-core/tajo-core-backend/src/main/java/tajo/master/TajoMaster.java
@@ -39,12 +39,9 @@ import org.apache.hadoop.yarn.util.RackResolver;
 import tajo.QueryId;
 import tajo.QueryIdFactory;
 import tajo.TajoConstants;
-import tajo.catalog.CatalogServer;
-import tajo.catalog.CatalogService;
-import tajo.catalog.FunctionDesc;
-import tajo.catalog.LocalCatalog;
-import tajo.catalog.proto.CatalogProtos.DataType;
+import tajo.catalog.*;
 import tajo.catalog.proto.CatalogProtos.FunctionType;
+import tajo.common.TajoDataTypes.Type;
 import tajo.conf.TajoConf;
 import tajo.conf.TajoConf.ConfVars;
 import tajo.engine.MasterWorkerProtos.TaskStatusProto;
@@ -169,93 +166,93 @@ public class TajoMaster extends CompositeService {
 
     // Sum
     sqlFuncs.add(new FunctionDesc("sum", SumInt.class, FunctionType.AGGREGATION,
-        new DataType[] {DataType.INT},
-        new DataType[] {DataType.INT}));
+        CatalogUtil.newDataTypesWithoutLen(Type.INT4),
+        CatalogUtil.newDataTypesWithoutLen(Type.INT4)));
     sqlFuncs.add(new FunctionDesc("sum", SumLong.class, FunctionType.AGGREGATION,
-        new DataType[] {DataType.LONG},
-        new DataType[] {DataType.LONG}));
+        CatalogUtil.newDataTypesWithoutLen(Type.INT8),
+        CatalogUtil.newDataTypesWithoutLen(Type.INT8)));
     sqlFuncs.add(new FunctionDesc("sum", SumFloat.class, FunctionType.AGGREGATION,
-        new DataType[] {DataType.FLOAT},
-        new DataType[] {DataType.FLOAT}));
+        CatalogUtil.newDataTypesWithoutLen(Type.FLOAT4),
+        CatalogUtil.newDataTypesWithoutLen(Type.FLOAT4)));
     sqlFuncs.add(new FunctionDesc("sum", SumDouble.class, FunctionType.AGGREGATION,
-        new DataType[] {DataType.DOUBLE},
-        new DataType[] {DataType.DOUBLE}));
+        CatalogUtil.newDataTypesWithoutLen(Type.FLOAT8),
+        CatalogUtil.newDataTypesWithoutLen(Type.FLOAT8)));
 
     // Max
     sqlFuncs.add(new FunctionDesc("max", MaxInt.class, FunctionType.AGGREGATION,
-        new DataType[] {DataType.INT},
-        new DataType[] {DataType.INT}));
+        CatalogUtil.newDataTypesWithoutLen(Type.INT4),
+        CatalogUtil.newDataTypesWithoutLen(Type.INT4)));
     sqlFuncs.add(new FunctionDesc("max", MaxLong.class, FunctionType.AGGREGATION,
-        new DataType[] {DataType.LONG},
-        new DataType[] {DataType.LONG}));
+        CatalogUtil.newDataTypesWithoutLen(Type.INT8),
+        CatalogUtil.newDataTypesWithoutLen(Type.INT8)));
     sqlFuncs.add(new FunctionDesc("max", MaxFloat.class, FunctionType.AGGREGATION,
-        new DataType[] {DataType.FLOAT},
-        new DataType[] {DataType.FLOAT}));
+        CatalogUtil.newDataTypesWithoutLen(Type.FLOAT4),
+        CatalogUtil.newDataTypesWithoutLen(Type.FLOAT4)));
     sqlFuncs.add(new FunctionDesc("max", MaxDouble.class, FunctionType.AGGREGATION,
-        new DataType[] {DataType.DOUBLE},
-        new DataType[] {DataType.DOUBLE}));
+        CatalogUtil.newDataTypesWithoutLen(Type.FLOAT8),
+        CatalogUtil.newDataTypesWithoutLen(Type.FLOAT8)));
 
     // Min
     sqlFuncs.add(new FunctionDesc("min", MinInt.class, FunctionType.AGGREGATION,
-        new DataType[] {DataType.INT},
-        new DataType[] {DataType.INT}));
+        CatalogUtil.newDataTypesWithoutLen(Type.INT4),
+        CatalogUtil.newDataTypesWithoutLen(Type.INT4)));
     sqlFuncs.add(new FunctionDesc("min", MinLong.class, FunctionType.AGGREGATION,
-        new DataType[] {DataType.LONG},
-        new DataType[] {DataType.LONG}));
+        CatalogUtil.newDataTypesWithoutLen(Type.INT8),
+        CatalogUtil.newDataTypesWithoutLen(Type.INT8)));
     sqlFuncs.add(new FunctionDesc("min", MinFloat.class, FunctionType.AGGREGATION,
-        new DataType[] {DataType.FLOAT},
-        new DataType[] {DataType.FLOAT }));
+        CatalogUtil.newDataTypesWithoutLen(Type.FLOAT4),
+        CatalogUtil.newDataTypesWithoutLen(Type.FLOAT4 )));
     sqlFuncs.add(new FunctionDesc("min", MinDouble.class, FunctionType.AGGREGATION,
-        new DataType[] {DataType.DOUBLE},
-        new DataType[] {DataType.DOUBLE}));
+        CatalogUtil.newDataTypesWithoutLen(Type.FLOAT8),
+        CatalogUtil.newDataTypesWithoutLen(Type.FLOAT8)));
     sqlFuncs.add(new FunctionDesc("min", MinString.class, FunctionType.AGGREGATION,
-        new DataType[] {DataType.STRING},
-        new DataType[] {DataType.STRING}));
+        CatalogUtil.newDataTypesWithoutLen(Type.TEXT),
+        CatalogUtil.newDataTypesWithoutLen(Type.TEXT)));
 
     // AVG
     sqlFuncs.add(new FunctionDesc("avg", AvgInt.class, FunctionType.AGGREGATION,
-        new DataType[] {DataType.FLOAT},
-        new DataType[] {DataType.INT}));
+        CatalogUtil.newDataTypesWithoutLen(Type.FLOAT4),
+        CatalogUtil.newDataTypesWithoutLen(Type.INT4)));
     sqlFuncs.add(new FunctionDesc("avg", AvgLong.class, FunctionType.AGGREGATION,
-        new DataType[] {DataType.DOUBLE},
-        new DataType[] {DataType.LONG}));
+        CatalogUtil.newDataTypesWithoutLen(Type.FLOAT8),
+        CatalogUtil.newDataTypesWithoutLen(Type.INT8)));
     sqlFuncs.add(new FunctionDesc("avg", AvgFloat.class, FunctionType.AGGREGATION,
-        new DataType[] {DataType.FLOAT},
-        new DataType[] {DataType.FLOAT}));
+        CatalogUtil.newDataTypesWithoutLen(Type.FLOAT4),
+        CatalogUtil.newDataTypesWithoutLen(Type.FLOAT4)));
     sqlFuncs.add(new FunctionDesc("avg", AvgDouble.class, FunctionType.AGGREGATION,
-        new DataType[] {DataType.DOUBLE},
-        new DataType[] {DataType.DOUBLE}));
+        CatalogUtil.newDataTypesWithoutLen(Type.FLOAT8),
+        CatalogUtil.newDataTypesWithoutLen(Type.FLOAT8)));
 
     // Count
     sqlFuncs.add(new FunctionDesc("count", CountValue.class, FunctionType.AGGREGATION,
-        new DataType[] {DataType.LONG},
-        new DataType[] {DataType.ANY}));
+        CatalogUtil.newDataTypesWithoutLen(Type.INT8),
+        CatalogUtil.newDataTypesWithoutLen(Type.ANY)));
     sqlFuncs.add(new FunctionDesc("count", CountRows.class, FunctionType.AGGREGATION,
-        new DataType[] {DataType.LONG},
-        new DataType[] {}));
+        CatalogUtil.newDataTypesWithoutLen(Type.INT8),
+        CatalogUtil.newDataTypesWithoutLen()));
 
     // GeoIP
     sqlFuncs.add(new FunctionDesc("in_country", InCountry.class, FunctionType.GENERAL,
-        new DataType[] {DataType.BOOLEAN},
-        new DataType[] {DataType.STRING, DataType.STRING}));
+        CatalogUtil.newDataTypesWithoutLen(Type.BOOLEAN),
+        CatalogUtil.newDataTypesWithoutLen(Type.TEXT, Type.TEXT)));
     sqlFuncs.add(new FunctionDesc("country", Country.class, FunctionType.GENERAL,
-        new DataType[] {DataType.STRING},
-        new DataType[] {DataType.STRING}));
+        CatalogUtil.newDataTypesWithoutLen(Type.TEXT),
+        CatalogUtil.newDataTypesWithoutLen(Type.TEXT)));
 
     // Date
     sqlFuncs.add(new FunctionDesc("date", Date.class, FunctionType.GENERAL,
-        new DataType[] {DataType.LONG},
-        new DataType[] {DataType.STRING}));
+        CatalogUtil.newDataTypesWithoutLen(Type.INT8),
+        CatalogUtil.newDataTypesWithoutLen(Type.TEXT)));
 
     // Today
     sqlFuncs.add(new FunctionDesc("today", Date.class, FunctionType.GENERAL,
-        new DataType[] {DataType.LONG},
-        new DataType[] {}));
+        CatalogUtil.newDataTypesWithoutLen(Type.INT8),
+        CatalogUtil.newDataTypesWithoutLen()));
 
     sqlFuncs.add(
         new FunctionDesc("random", RandomInt.class, FunctionType.GENERAL,
-            new DataType[]{DataType.INT},
-            new DataType[]{DataType.INT}));
+            CatalogUtil.newDataTypesWithoutLen(Type.INT4),
+            CatalogUtil.newDataTypesWithoutLen(Type.INT4)));
 
     return sqlFuncs;
   }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/c1c6f83e/tajo-core/tajo-core-backend/src/main/proto/CatalogProtos.proto
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/proto/CatalogProtos.proto b/tajo-core/tajo-core-backend/src/main/proto/CatalogProtos.proto
index dd1bf69..36b33c1 100644
--- a/tajo-core/tajo-core-backend/src/main/proto/CatalogProtos.proto
+++ b/tajo-core/tajo-core-backend/src/main/proto/CatalogProtos.proto
@@ -22,33 +22,12 @@ option optimize_for = SPEED;
 option java_generic_services = false;
 option java_generate_equals_and_hash = true;
 
-enum DataType {
-    BOOLEAN = 0;
-    BYTE = 1;
-    CHAR = 2;
-    BYTES = 3;
-    SHORT = 4;
-    INT = 5;
-    BIGINT = 6;
-    LONG = 7;
-    FLOAT = 8;
-    DOUBLE = 9;
-    BIGDECIMAL = 10;
-    STRING = 11;
-    DATE = 12;
-    IPv4 = 13;
-    IPv6 = 14;
-    NULL = 15;
-    ANY = 16;
-    ALL = 17;
-    ARRAY = 18;
-    STRING2 = 19;
-}
+import "DataTypes.proto";
 
 enum StoreType {
-	MEM = 0;
-	CSV = 1;
-	RAW = 2;
+  MEM = 0;
+  CSV = 1;
+  RAW = 2;
   RCFILE = 3;
   ROWFILE = 4;
   HCFILE = 5;
@@ -56,147 +35,147 @@ enum StoreType {
 }
 
 enum OrderType {
-    ORDER_NONE = 0;
-    ASC = 1;
-    DSC = 2;
+  ORDER_NONE = 0;
+  ASC = 1;
+  DSC = 2;
 }
 
 enum CompressType {
-    COMP_NONE = 0;
-    NULL_SUPPRESS = 1;
-    RUN_LENGTH = 2;
-    BIT_VECTOR = 3;
-    DICTIONARY = 4;
-    SNAPPY = 5;
-    LZ = 6;
+  COMP_NONE = 0;
+  NULL_SUPPRESS = 1;
+  RUN_LENGTH = 2;
+  BIT_VECTOR = 3;
+  DICTIONARY = 4;
+  SNAPPY = 5;
+  LZ = 6;
 }
 
 message ColumnMetaProto {
-    required DataType dataType = 1;
-    required bool compressed = 2;
-    required bool sorted = 3;
-    required bool contiguous = 4;
-    required StoreType storeType = 5;
-    required CompressType compType = 6;
-    required int64 startRid = 7;
-    required int32 recordNum = 8;
-    required int32 offsetToIndex = 9;
+  required DataType dataType = 1;
+  required bool compressed = 2;
+  required bool sorted = 3;
+  required bool contiguous = 4;
+  required StoreType storeType = 5;
+  required CompressType compType = 6;
+  required int64 startRid = 7;
+  required int32 recordNum = 8;
+  required int32 offsetToIndex = 9;
 }
 
 message ColumnProto {
-	required string columnName = 1;
-	required DataType dataType = 2;
+  required string columnName = 1;
+  required DataType dataType = 2;
 }
 
 message SchemaProto {
-	repeated ColumnProto fields = 1;
+  repeated ColumnProto fields = 1;
 }
 
 message KeyValueProto {
-	required string key = 1;
-	required string value = 2;
+  required string key = 1;
+  required string value = 2;
 }
 
 message KeyValueSetProto {
-	repeated KeyValueProto keyval = 1;
+  repeated KeyValueProto keyval = 1;
 }
 
 message TabletProto {
-	required string id = 1;
-	required string path = 2;
-	required int64 startOffset = 3;
-	required int64 length = 4;
-	required TableProto meta = 5;
-	optional TableStatProto stat = 6;
+  required string id = 1;
+  required string path = 2;
+  required int64 startOffset = 3;
+  required int64 length = 4;
+  required TableProto meta = 5;
+  optional TableStatProto stat = 6;
   optional bool distCached = 7 [default = false];
 }
 
 message TableProto {
-    required SchemaProto schema = 1;
-    required StoreType storeType = 2;
-    required KeyValueSetProto params = 3;
-    optional TableStatProto stat = 4;
+  required SchemaProto schema = 1;
+  required StoreType storeType = 2;
+  required KeyValueSetProto params = 3;
+  optional TableStatProto stat = 4;
 }
 
 message TableDescProto {
-	required string id = 1;
-	required string path = 2;
-	required TableProto meta = 3;
+  required string id = 1;
+  required string path = 2;
+  required TableProto meta = 3;
 }
 
 enum FunctionType {
-	GENERAL = 0;
-	AGGREGATION = 1;
+  GENERAL = 0;
+  AGGREGATION = 1;
 }
 
 message FunctionDescProto {
-	required string signature = 1;
-	required string className = 2;
-	required FunctionType type = 3;
-	repeated DataType parameterTypes = 4;
-	required DataType returnType = 5;
+  required string signature = 1;
+  required string className = 2;
+  required FunctionType type = 3;
+  repeated DataType parameterTypes = 4;
+  required DataType returnType = 5;
 }
 
 message IndexDescProto {
-    required string name = 1;
-    required string tableId = 2;
-    required ColumnProto column = 3;
-    required IndexMethod indexMethod = 4;
-    optional bool isUnique = 5 [default = false];
-    optional bool isClustered = 6 [default = false];
-    optional bool isAscending = 7 [default = false];
+  required string name = 1;
+  required string tableId = 2;
+  required ColumnProto column = 3;
+  required IndexMethod indexMethod = 4;
+  optional bool isUnique = 5 [default = false];
+  optional bool isClustered = 6 [default = false];
+  optional bool isAscending = 7 [default = false];
 }
 
 enum IndexMethod {
-    TWO_LEVEL_BIN_TREE = 0;
-    BTREE = 1;
-    HASH = 2;
-    BITMAP = 3;
+  TWO_LEVEL_BIN_TREE = 0;
+  BTREE = 1;
+  HASH = 2;
+  BITMAP = 3;
 }
 
 message GetAllTableNamesResponse {
-    repeated string tableName = 1;
+  repeated string tableName = 1;
 }
 
 message GetIndexRequest {
-    required string tableName = 1;
-    required string columnName = 2;
+  required string tableName = 1;
+  required string columnName = 2;
 }
 
 message GetFunctionsResponse {
-	repeated FunctionDescProto functionDesc = 1;
+  repeated FunctionDescProto functionDesc = 1;
 }
 
 message UnregisterFunctionRequest {
-	required string signature = 1;
-	repeated DataType parameterTypes = 2;
+  required string signature = 1;
+  repeated DataType parameterTypes = 2;
 }
 
 message GetFunctionMetaRequest {
-	required string signature = 1;
-	repeated DataType parameterTypes = 2;
+  required string signature = 1;
+  repeated DataType parameterTypes = 2;
 }
 
 message ContainFunctionRequest {
-	required string signature = 1;
-	repeated DataType parameterTypes = 2;
+  required string signature = 1;
+  repeated DataType parameterTypes = 2;
 }
 
 message TableStatProto {
-	required int64 numRows = 1;
-	required int64 numBytes = 2;
-	optional int32 numBlocks = 3;
-	optional int32 numPartitions = 4;
-	optional int64 avgRows = 5;
-	repeated ColumnStatProto colStat = 6;
+  required int64 numRows = 1;
+  required int64 numBytes = 2;
+  optional int32 numBlocks = 3;
+  optional int32 numPartitions = 4;
+  optional int64 avgRows = 5;
+  repeated ColumnStatProto colStat = 6;
 }
 
 message ColumnStatProto {
-    required ColumnProto column = 1;
-    optional int64 numDistVal = 2;
-    optional int64 numNulls = 3;
-    optional bytes minValue = 4;
-    optional bytes maxValue = 5;
+  required ColumnProto column = 1;
+  optional int64 numDistVal = 2;
+  optional int64 numNulls = 3;
+  optional bytes minValue = 4;
+  optional bytes maxValue = 5;
 }
 
 enum StatType {

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/c1c6f83e/tajo-core/tajo-core-backend/src/test/java/tajo/BackendTestingUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/tajo/BackendTestingUtil.java b/tajo-core/tajo-core-backend/src/test/java/tajo/BackendTestingUtil.java
index 1db61c2..1b2e60b 100644
--- a/tajo-core/tajo-core-backend/src/test/java/tajo/BackendTestingUtil.java
+++ b/tajo-core/tajo-core-backend/src/test/java/tajo/BackendTestingUtil.java
@@ -26,8 +26,8 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import tajo.catalog.*;
-import tajo.catalog.proto.CatalogProtos.DataType;
 import tajo.catalog.proto.CatalogProtos.StoreType;
+import tajo.common.TajoDataTypes.Type;
 import tajo.conf.TajoConf;
 import tajo.datum.DatumFactory;
 import tajo.engine.parser.QueryAnalyzer;
@@ -51,9 +51,9 @@ public class BackendTestingUtil {
 
 	static {
     mockupSchema = new Schema();
-    mockupSchema.addColumn("deptname", DataType.STRING);
-    mockupSchema.addColumn("score", DataType.INT);
-    mockupMeta = TCatUtil.newTableMeta(mockupSchema, StoreType.CSV);
+    mockupSchema.addColumn("deptname", Type.TEXT);
+    mockupSchema.addColumn("score", Type.INT4);
+    mockupMeta = CatalogUtil.newTableMeta(mockupSchema, StoreType.CSV);
 	}
 
   public static void writeTmpTable(TajoConf conf, Path path,
@@ -81,8 +81,8 @@ public class BackendTestingUtil {
     for (int i = 0; i < tupleNum; i++) {
       tuple = new VTuple(2);
       String key = "test" + (i % deptSize);
-      tuple.put(0, DatumFactory.createString(key));
-      tuple.put(1, DatumFactory.createInt(i + 1));
+      tuple.put(0, DatumFactory.createText(key));
+      tuple.put(1, DatumFactory.createInt4(i + 1));
       appender.addTuple(tuple);
     }
     appender.close();

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/c1c6f83e/tajo-core/tajo-core-backend/src/test/java/tajo/LocalTajoTestingUtility.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/tajo/LocalTajoTestingUtility.java b/tajo-core/tajo-core-backend/src/test/java/tajo/LocalTajoTestingUtility.java
index 9ce3383..405eb11 100644
--- a/tajo-core/tajo-core-backend/src/test/java/tajo/LocalTajoTestingUtility.java
+++ b/tajo-core/tajo-core-backend/src/test/java/tajo/LocalTajoTestingUtility.java
@@ -21,10 +21,7 @@ package tajo;
 import com.google.protobuf.ServiceException;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import tajo.catalog.Options;
-import tajo.catalog.Schema;
-import tajo.catalog.TCatUtil;
-import tajo.catalog.TableMeta;
+import tajo.catalog.*;
 import tajo.catalog.proto.CatalogProtos;
 import tajo.client.TajoClient;
 import tajo.conf.TajoConf;
@@ -59,7 +56,7 @@ public class LocalTajoTestingUtility {
       fs.mkdirs(dataPath);
       Path dfsPath = new Path(dataPath, localPath.getName());
       fs.copyFromLocalFile(localPath, dfsPath);
-      TableMeta meta = TCatUtil.newTableMeta(schemas[i],
+      TableMeta meta = CatalogUtil.newTableMeta(schemas[i],
           CatalogProtos.StoreType.CSV, option);
       client.createTable(names[i], tablePath, meta);
     }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/c1c6f83e/tajo-core/tajo-core-backend/src/test/java/tajo/TajoTestingCluster.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/tajo/TajoTestingCluster.java b/tajo-core/tajo-core-backend/src/test/java/tajo/TajoTestingCluster.java
index 75fde3c..8cac708 100644
--- a/tajo-core/tajo-core-backend/src/test/java/tajo/TajoTestingCluster.java
+++ b/tajo-core/tajo-core-backend/src/test/java/tajo/TajoTestingCluster.java
@@ -425,7 +425,7 @@ public class TajoTestingCluster {
       dataDir.mkdirs();
       File tableFile = new File(dataDir, tableNames[i]);
       writeLines(tableFile, tables[i]);
-      TableMeta meta = TCatUtil
+      TableMeta meta = CatalogUtil
           .newTableMeta(schemas[i], CatalogProtos.StoreType.CSV, option);
       client.createTable(tableNames[i], new Path(tableDir.getAbsolutePath()), meta);
     }
@@ -457,7 +457,7 @@ public class TajoTestingCluster {
       fs.mkdirs(dataPath);
       Path dfsPath = new Path(dataPath, localPath.getName());
       fs.copyFromLocalFile(localPath, dfsPath);
-      TableMeta meta = TCatUtil.newTableMeta(schemas[i],
+      TableMeta meta = CatalogUtil.newTableMeta(schemas[i],
           CatalogProtos.StoreType.CSV, option);
       client.createTable(names[i], tablePath, meta);
     }
@@ -492,7 +492,7 @@ public class TajoTestingCluster {
         out.write((tables[i][j]+"\n").getBytes());
       }
       out.close();
-      TableMeta meta = TCatUtil.newTableMeta(schemas[i],
+      TableMeta meta = CatalogUtil.newTableMeta(schemas[i],
           CatalogProtos.StoreType.CSV, option);
       client.createTable(names[i], tablePath, meta);
     }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/c1c6f83e/tajo-core/tajo-core-backend/src/test/java/tajo/client/TestTajoClient.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/tajo/client/TestTajoClient.java b/tajo-core/tajo-core-backend/src/test/java/tajo/client/TestTajoClient.java
index ff7a292..afc75e7 100644
--- a/tajo-core/tajo-core-backend/src/test/java/tajo/client/TestTajoClient.java
+++ b/tajo-core/tajo-core-backend/src/test/java/tajo/client/TestTajoClient.java
@@ -89,7 +89,7 @@ public class TestTajoClient {
 
     assertFalse(tajo.existTable(tableName));
     String tql =
-        "create external table " + tableName + " (deptname string, score int) "
+        "create external table " + tableName + " (deptname text, score integer) "
             + "using csv location '" + tablePath + "'";
     tajo.updateQuery(tql);
     assertTrue(tajo.existTable(tableName));
@@ -118,7 +118,7 @@ public class TestTajoClient {
 
     assertFalse(tajo.existTable(tableName));
     String tql =
-        "create external table " + tableName + " (deptname string, score int) "
+        "create external table " + tableName + " (deptname text, score int4) "
             + "using csv location 'file:///tmp/" + tableName + "'";
     tajo.executeQueryAndGetResult(tql);
     assertTrue(tajo.existTable(tableName));