You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by ji...@apache.org on 2015/06/02 04:58:59 UTC

[4/6] tajo git commit: TAJO-1450: Encapsulate Datum in Tuple.

http://git-wip-us.apache.org/repos/asf/tajo/blob/5c2aee23/tajo-core/src/main/java/org/apache/tajo/engine/function/string/Md5.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/function/string/Md5.java b/tajo-core/src/main/java/org/apache/tajo/engine/function/string/Md5.java
index f4cb7cb..209b3cc 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/function/string/Md5.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/function/string/Md5.java
@@ -52,14 +52,15 @@ public class Md5 extends GeneralFunction {
 
   @Override
   public Datum eval(Tuple params) {
-    Datum datum = params.get(0);
-    if(datum instanceof NullDatum) return NullDatum.get();
+    if (params.isBlankOrNull(0)) {
+      return NullDatum.get();
+    }
 
     try {
-        MessageDigest md = MessageDigest.getInstance("MD5");
-        return DatumFactory.createText(new String(Hex.encodeHex(md.digest(datum.asByteArray()))));
+      MessageDigest md = MessageDigest.getInstance("MD5");
+      return DatumFactory.createText(new String(Hex.encodeHex(md.digest(params.getBytes(0)))));
     } catch (NoSuchAlgorithmException e){
-        return NullDatum.get();
+      return NullDatum.get();
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/5c2aee23/tajo-core/src/main/java/org/apache/tajo/engine/function/string/OctetLength.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/function/string/OctetLength.java b/tajo-core/src/main/java/org/apache/tajo/engine/function/string/OctetLength.java
index cb5af15..6d8bffb 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/function/string/OctetLength.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/function/string/OctetLength.java
@@ -51,11 +51,10 @@ public class OctetLength  extends GeneralFunction {
 
   @Override
   public Datum eval(Tuple params) {
-    Datum datum = params.get(0);
-    if(datum instanceof NullDatum) {
+    if (params.isBlankOrNull(0)) {
       return NullDatum.get();
     }
 
-    return DatumFactory.createInt4(datum.asByteArray().length);
+    return DatumFactory.createInt4(params.getBytes(0).length);
   }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/5c2aee23/tajo-core/src/main/java/org/apache/tajo/engine/function/string/QuoteIdent.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/function/string/QuoteIdent.java b/tajo-core/src/main/java/org/apache/tajo/engine/function/string/QuoteIdent.java
index aaef10d..c7bd7b8 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/function/string/QuoteIdent.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/function/string/QuoteIdent.java
@@ -56,12 +56,10 @@ public class QuoteIdent extends GeneralFunction {
 
   @Override
   public Datum eval(Tuple params) {
-    Datum datum = params.get(0);
-
-    if(datum instanceof NullDatum) {
+    if (params.isBlankOrNull(0)) {
       return NullDatum.get();
     }
 
-    return DatumFactory.createText("\"" + datum.asChars() + "\"");
+    return DatumFactory.createText("\"" + params.getText(0) + "\"");
   }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/5c2aee23/tajo-core/src/main/java/org/apache/tajo/engine/function/string/RTrim.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/function/string/RTrim.java b/tajo-core/src/main/java/org/apache/tajo/engine/function/string/RTrim.java
index 03b9c25..ff4e4ba 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/function/string/RTrim.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/function/string/RTrim.java
@@ -66,13 +66,15 @@ public class RTrim extends GeneralFunction {
 
   @Override
   public Datum eval(Tuple params) {
-    Datum datum = params.get(0);
-    if(datum instanceof NullDatum) return NullDatum.get();
+    if (params.isBlankOrNull(0)) {
+      return NullDatum.get();
+    }
 
+    String value = params.getText(0);
     if (!hasTrimCharacters) {
-      return DatumFactory.createText(StringUtils.stripEnd(datum.asChars(), null));
+      return DatumFactory.createText(StringUtils.stripEnd(value, null));
     } else {
-      return DatumFactory.createText(StringUtils.stripEnd(datum.asChars(), params.get(1).asChars()));
+      return DatumFactory.createText(StringUtils.stripEnd(value, params.getText(1)));
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/5c2aee23/tajo-core/src/main/java/org/apache/tajo/engine/function/string/RegexpReplace.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/function/string/RegexpReplace.java b/tajo-core/src/main/java/org/apache/tajo/engine/function/string/RegexpReplace.java
index e3abeb5..6257c42 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/function/string/RegexpReplace.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/function/string/RegexpReplace.java
@@ -22,7 +22,6 @@ import com.google.gson.annotations.Expose;
 import org.apache.tajo.OverridableConf;
 import org.apache.tajo.catalog.Column;
 import org.apache.tajo.common.TajoDataTypes;
-import org.apache.tajo.datum.BooleanDatum;
 import org.apache.tajo.datum.Datum;
 import org.apache.tajo.datum.DatumFactory;
 import org.apache.tajo.datum.NullDatum;
@@ -80,39 +79,33 @@ public class RegexpReplace extends GeneralFunction {
 
   @Override
   public Datum eval(Tuple params) {
-    Datum thisValue = params.get(0);
-    Datum thisPattern = params.get(1);
-    Datum thisReplacement = params.get(2);
-    boolean nullResult = isAlwaysNull
-        || thisValue instanceof NullDatum
-        || thisReplacement instanceof NullDatum
-        || thisPattern instanceof NullDatum;
+    if (isAlwaysNull || params.isBlankOrNull(0) || params.isBlankOrNull(1) || params.isBlankOrNull(2)) {
+      return NullDatum.get();
+    }
 
-    Pattern thisCompiled;
-    if (!nullResult) {
-      if (compiled != null) {
-        thisCompiled = compiled;
-      } else {
-        thisCompiled = Pattern.compile(thisPattern.asChars());
+    String value = params.getText(0);
+    String replacement = params.getText(2);
 
-        // if a regular expression pattern is a constant,
-        // it will be reused in every call
-        if (isPatternConstant) {
-          compiled = thisCompiled;
-        }
-      }
+    Pattern thisCompiled;
+    if (compiled != null) {
+      thisCompiled = compiled;
+    } else {
+      thisCompiled = Pattern.compile(params.getText(1));
 
-      Matcher matcher = thisCompiled.matcher(thisValue.asChars());
-      String replacement = thisReplacement.asChars();
-      StringBuffer sb = new StringBuffer();
-      while (matcher.find()) {
-        matcher.appendReplacement(sb, replacement);
+      // if a regular expression pattern is a constant,
+      // it will be reused in every call
+      if (isPatternConstant) {
+        compiled = thisCompiled;
       }
-      matcher.appendTail(sb);
+    }
 
-      return DatumFactory.createText(sb.toString());
-    } else {
-      return NullDatum.get();
+    Matcher matcher = thisCompiled.matcher(value);
+    StringBuffer sb = new StringBuffer();
+    while (matcher.find()) {
+      matcher.appendReplacement(sb, replacement);
     }
+    matcher.appendTail(sb);
+
+    return DatumFactory.createText(sb.toString());
   }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/5c2aee23/tajo-core/src/main/java/org/apache/tajo/engine/function/string/Repeat.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/function/string/Repeat.java b/tajo-core/src/main/java/org/apache/tajo/engine/function/string/Repeat.java
index c02ef66..339c4ea 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/function/string/Repeat.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/function/string/Repeat.java
@@ -61,12 +61,10 @@ public class Repeat extends GeneralFunction {
 
   @Override
   public Datum eval(Tuple params) {
-    Datum datum = params.get(0);
-    if(datum instanceof NullDatum) return NullDatum.get();
+    if (params.isBlankOrNull(0) || params.isBlankOrNull(1)) {
+      return NullDatum.get();
+    }
 
-    Datum countDatum = params.get(1);
-    if(countDatum instanceof NullDatum) return NullDatum.get();
-
-    return DatumFactory.createText(repeat(datum.asChars(), countDatum.asInt4()));
+    return DatumFactory.createText(repeat(params.getText(0), params.getInt4(1)));
   }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/5c2aee23/tajo-core/src/main/java/org/apache/tajo/engine/function/string/Reverse.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/function/string/Reverse.java b/tajo-core/src/main/java/org/apache/tajo/engine/function/string/Reverse.java
index bf67294..44d2c49 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/function/string/Reverse.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/function/string/Reverse.java
@@ -50,9 +50,10 @@ public class Reverse extends GeneralFunction {
 
   @Override
   public Datum eval(Tuple params) {
-    Datum datum = params.get(0);
-    if(datum instanceof NullDatum) return NullDatum.get();
+    if (params.isBlankOrNull(0)) {
+      return NullDatum.get();
+    }
 
-    return DatumFactory.createText(new StringBuffer(datum.asChars()).reverse().toString());
+    return DatumFactory.createText(new StringBuffer(params.getText(0)).reverse().toString());
   }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/5c2aee23/tajo-core/src/main/java/org/apache/tajo/engine/function/string/Right.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/function/string/Right.java b/tajo-core/src/main/java/org/apache/tajo/engine/function/string/Right.java
index 68af423..0d0138a 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/function/string/Right.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/function/string/Right.java
@@ -64,15 +64,13 @@ public class Right extends GeneralFunction {
 
   @Override
   public Datum eval(Tuple params) {
-    Datum datum = params.get(0);
-    if(datum instanceof NullDatum) return NullDatum.get();
-
-    Datum sizeDatum = params.get(1);
-    if(sizeDatum instanceof NullDatum) return NullDatum.get();
+    if (params.isBlankOrNull(0) || params.isBlankOrNull(1)) {
+      return NullDatum.get();
+    }
 
-    String data = datum.asChars();
+    String data = params.getText(0);
     int length = data.length();
-    int size = sizeDatum.asInt4();
+    int size = params.getInt4(1);
 
     size = getSize(length, size);
     if (size == 0) {

http://git-wip-us.apache.org/repos/asf/tajo/blob/5c2aee23/tajo-core/src/main/java/org/apache/tajo/engine/function/string/Rpad.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/function/string/Rpad.java b/tajo-core/src/main/java/org/apache/tajo/engine/function/string/Rpad.java
index 1509118..e50575d 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/function/string/Rpad.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/function/string/Rpad.java
@@ -68,25 +68,26 @@ public class Rpad extends GeneralFunction {
 
   @Override
   public Datum eval(Tuple params) {
-    Datum datum = params.get(0);
-    Datum lengthDatum = params.get(1);
-
-    if(datum instanceof NullDatum) return NullDatum.get();
-    if(lengthDatum instanceof NullDatum) return NullDatum.get();
-
-    Datum fillText=NullDatum.get();
-    if(hasFillCharacters) {
-      fillText=params.get(2);
+    if (params.isBlankOrNull(0) || params.isBlankOrNull(1)) {
+      return NullDatum.get();
     }
-    else {
-      fillText=DatumFactory.createText(" ");
+
+    String fillText;
+    if (hasFillCharacters) {
+      fillText = params.getText(2);
+    } else {
+      fillText = " ";
     }
 
-    int templen = lengthDatum.asInt4() - datum.asChars().length();
+    String input = params.getText(0);
+    int expected = params.getInt4(1);
 
-    if(templen<=0)
-      return DatumFactory.createText(datum.asChars().substring(0,lengthDatum.asInt4()));
+    int templen = expected - params.size(0);
+
+    if (templen <= 0) {
+      return DatumFactory.createText(input.substring(0, expected));
+    }
 
-    return DatumFactory.createText(StringUtils.rightPad(datum.asChars(), lengthDatum.asInt4(), fillText.asChars()));
+    return DatumFactory.createText(StringUtils.rightPad(input, expected, fillText));
   }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/5c2aee23/tajo-core/src/main/java/org/apache/tajo/engine/function/string/SplitPart.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/function/string/SplitPart.java b/tajo-core/src/main/java/org/apache/tajo/engine/function/string/SplitPart.java
index 1c3be00..9604d18 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/function/string/SplitPart.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/function/string/SplitPart.java
@@ -54,15 +54,12 @@ public class SplitPart extends GeneralFunction {
 
   @Override
   public Datum eval(Tuple params) {
-    Datum text = params.get(0);
-    Datum part = params.get(2);
-
-    if (text.isNull() || part.isNull()) {
+    if (params.isBlankOrNull(0) || params.isBlankOrNull(2)) {
       return NullDatum.get();
     }
 
-    String [] split = StringUtils.splitByWholeSeparatorPreserveAllTokens(text.asChars(), params.get(1).asChars(), -1);
-    int idx = params.get(2).asInt4() - 1;
+    String [] split = StringUtils.splitByWholeSeparatorPreserveAllTokens(params.getText(0), params.getText(1), -1);
+    int idx = params.getInt4(2) - 1;
     if (split.length > idx) {
       return DatumFactory.createText(split[idx]);
     } else {

http://git-wip-us.apache.org/repos/asf/tajo/blob/5c2aee23/tajo-core/src/main/java/org/apache/tajo/engine/function/string/StrPos.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/function/string/StrPos.java b/tajo-core/src/main/java/org/apache/tajo/engine/function/string/StrPos.java
index 9c3e4f1..3ff1a3b 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/function/string/StrPos.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/function/string/StrPos.java
@@ -52,19 +52,12 @@ public class StrPos extends GeneralFunction {
 
   @Override
   public Datum eval(Tuple params) {
-    Datum valueDatum = params.get(0);
-    if(valueDatum instanceof NullDatum) {
+    if (params.isBlankOrNull(0) || params.isBlankOrNull(1)) {
       return NullDatum.get();
     }
-
-    Datum substringDatum = params.get(1);
-    if(substringDatum instanceof NullDatum) {
-      return NullDatum.get();
-    }
-
-    String value = valueDatum.asChars();
-    String substring = substringDatum.asChars();
-    if(substring.length() == 0) {
+    String value = params.getText(0);
+    String substring = params.getText(1);
+    if (substring.length() == 0) {
       return DatumFactory.createInt4(1);
     }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/5c2aee23/tajo-core/src/main/java/org/apache/tajo/engine/function/string/StrPosb.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/function/string/StrPosb.java b/tajo-core/src/main/java/org/apache/tajo/engine/function/string/StrPosb.java
index c332006..5bfcc50 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/function/string/StrPosb.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/function/string/StrPosb.java
@@ -54,19 +54,13 @@ public class StrPosb extends GeneralFunction {
 
   @Override
   public Datum eval(Tuple params) {
-    Datum valueDatum = params.get(0);
-    if(valueDatum instanceof NullDatum) {
+    if (params.isBlankOrNull(0) || params.isBlankOrNull(1)) {
       return NullDatum.get();
     }
 
-    Datum substringDatum = params.get(1);
-    if(substringDatum instanceof NullDatum) {
-      return NullDatum.get();
-    }
-
-    String value = valueDatum.asChars();
-    String substring = substringDatum.asChars();
-    if(substring.length() == 0) {
+    String value = params.getText(0);
+    String substring = params.getText(1);
+    if (substring.length() == 0) {
       return DatumFactory.createInt4(1);
     }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/5c2aee23/tajo-core/src/main/java/org/apache/tajo/engine/function/string/Substr.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/function/string/Substr.java b/tajo-core/src/main/java/org/apache/tajo/engine/function/string/Substr.java
index 7e4ddd6..4319853 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/function/string/Substr.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/function/string/Substr.java
@@ -56,39 +56,28 @@ public class Substr extends GeneralFunction {
 
   @Override
   public Datum eval(Tuple params) {
-    Datum valueDatum = params.get(0);
-    Datum fromDatum = params.get(1);
-    Datum countDatum = params.size() > 2 ? params.get(2) : null;
-
-    if(valueDatum instanceof NullDatum || fromDatum instanceof NullDatum || countDatum instanceof NullDatum) {
+    if (params.isBlankOrNull(0) || params.isBlankOrNull(1)) {
       return NullDatum.get();
     }
-
-    String value = valueDatum.asChars();
-    int from = fromDatum.asInt4();
-    int strLength = value.length();
-    int count;
-
-    if (countDatum == null) {
-      count = strLength;
-    } else {
-      count = (countDatum.asInt4() + from) - 1;
+    if (params.size() > 2 && params.isBlankOrNull(2)) {
+      return NullDatum.get();
     }
 
-    if (count > strLength) {
-      count = strLength;
-    }
+    String value = params.getText(0);
+    int start = params.getInt4(1) - 1;
+
+    int from = Math.max(0, start);
+    int length = params.size() > 2 ? params.getInt4(2) : -1;
 
-    if (from < 1) {
-      from = 0;
-    } else {
-      from --;
+    int to = value.length();
+    if (length >= 0) {
+      to = Math.min(start + length, to);
     }
 
-    if (from >= count) {
+    if (from >= to) {
       return DatumFactory.createText("");
     }
 
-    return DatumFactory.createText(value.substring(from, count));
+    return DatumFactory.createText(value.substring(from, to));
   }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/5c2aee23/tajo-core/src/main/java/org/apache/tajo/engine/function/string/ToBin.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/function/string/ToBin.java b/tajo-core/src/main/java/org/apache/tajo/engine/function/string/ToBin.java
index a9f85f3..a768e7f 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/function/string/ToBin.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/function/string/ToBin.java
@@ -54,12 +54,11 @@ public class ToBin extends GeneralFunction {
 
   @Override
   public Datum eval(Tuple params) {
-    Datum datum = params.get(0);
-    if(datum instanceof NullDatum) {
+    if (params.isBlankOrNull(0)) {
       return NullDatum.get();
     }
 
-    return  DatumFactory.createText(Long.toBinaryString(datum.asInt8()));
+    return DatumFactory.createText(Long.toBinaryString(params.getInt8(0)));
   }
 
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/5c2aee23/tajo-core/src/main/java/org/apache/tajo/engine/function/string/ToCharLong.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/function/string/ToCharLong.java b/tajo-core/src/main/java/org/apache/tajo/engine/function/string/ToCharLong.java
index 5fed940..7104835 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/function/string/ToCharLong.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/function/string/ToCharLong.java
@@ -48,8 +48,8 @@ public class ToCharLong extends GeneralFunction {
   @Override
   public Datum eval(Tuple params) {
     if (df == null) {
-      df = new DecimalFormat(params.get(1).asChars());
+      df = new DecimalFormat(params.getText(1));
     }
-    return new TextDatum(df.format(params.get(0).asInt8()));
+    return new TextDatum(df.format(params.getInt8(0)));
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/5c2aee23/tajo-core/src/main/java/org/apache/tajo/engine/function/string/ToHex.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/function/string/ToHex.java b/tajo-core/src/main/java/org/apache/tajo/engine/function/string/ToHex.java
index 2e20008..d410b9b 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/function/string/ToHex.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/function/string/ToHex.java
@@ -65,12 +65,11 @@ public class ToHex extends GeneralFunction {
 
   @Override
   public Datum eval(Tuple params) {
-    Datum datum = params.get(0);
-    if(datum instanceof NullDatum) {
+    if (params.isBlankOrNull(0)) {
       return NullDatum.get();
     }
 
-    String ret = new String(Hex.encodeHex(datum.asByteArray()));
+    String ret = new String(Hex.encodeHex(params.getBytes(0)));
     return DatumFactory.createText(trimZero(ret));
   }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/5c2aee23/tajo-core/src/main/java/org/apache/tajo/engine/function/string/Upper.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/function/string/Upper.java b/tajo-core/src/main/java/org/apache/tajo/engine/function/string/Upper.java
index fc6ff3a..7525d95 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/function/string/Upper.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/function/string/Upper.java
@@ -50,9 +50,10 @@ public class Upper extends GeneralFunction {
 
   @Override
   public Datum eval(Tuple params) {
-    Datum datum = params.get(0);
-    if(datum instanceof NullDatum) return NullDatum.get();
+    if (params.isBlankOrNull(0)) {
+      return NullDatum.get();
+    }
 
-    return DatumFactory.createText(datum.asChars().toUpperCase());
+    return DatumFactory.createText(params.getText(0).toUpperCase());
   }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/5c2aee23/tajo-core/src/main/java/org/apache/tajo/engine/function/window/FirstValue.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/function/window/FirstValue.java b/tajo-core/src/main/java/org/apache/tajo/engine/function/window/FirstValue.java
index ba3b3b6..6a667ec 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/function/window/FirstValue.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/function/window/FirstValue.java
@@ -38,10 +38,10 @@ public abstract class FirstValue extends WindowAggFunc<Datum> {
   @Override
   public void eval(FunctionContext ctx, Tuple params) {
     FirstValueContext firstValueCtx = (FirstValueContext)ctx;
-    if(firstValueCtx.isSet == false) {
+    if(!firstValueCtx.isSet) {
       firstValueCtx.isSet = true;
-      if (params.get(0).isNotNull()) {
-        firstValueCtx.first = params.get(0);
+      if (!params.isBlankOrNull(0)) {
+        firstValueCtx.first = params.asDatum(0);
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/tajo/blob/5c2aee23/tajo-core/src/main/java/org/apache/tajo/engine/function/window/Lag.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/function/window/Lag.java b/tajo-core/src/main/java/org/apache/tajo/engine/function/window/Lag.java
index 5107297..eb7e8b7 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/function/window/Lag.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/function/window/Lag.java
@@ -45,20 +45,20 @@ public abstract class Lag extends WindowAggFunc<Datum> {
       if (params.size() == 1) {
         lagNum = 1;
       } else {
-        lagNum = params.get(1).asInt4();
+        lagNum = params.getInt4(1);
       }
       lagCtx.lagBuffer = new CircularFifoBuffer(lagNum+1);
     }
 
-    if (params.get(0).isNotNull()) {
-      lagCtx.lagBuffer.add(params.get(0));
+    if (!params.isBlankOrNull(0)) {
+      lagCtx.lagBuffer.add(params.asDatum(0));
     } else {
       lagCtx.lagBuffer.add(NullDatum.get());
     }
 
     if (lagCtx.defaultDatum == null) {
      if (params.size() == 3) {
-       lagCtx.defaultDatum = params.get(2);
+       lagCtx.defaultDatum = params.asDatum(2);
      } else {
        lagCtx.defaultDatum = NullDatum.get();
      }

http://git-wip-us.apache.org/repos/asf/tajo/blob/5c2aee23/tajo-core/src/main/java/org/apache/tajo/engine/function/window/Rank.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/function/window/Rank.java b/tajo-core/src/main/java/org/apache/tajo/engine/function/window/Rank.java
index e469c83..9cb95f7 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/function/window/Rank.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/function/window/Rank.java
@@ -48,7 +48,7 @@ public final class Rank extends WindowAggFunc {
 
   public static boolean checkIfDistinctValue(RankContext context, Tuple params) {
     for (int i = 0; i < context.latest.length; i++) {
-      if (!context.latest[i].equalsTo(params.get(i)).isTrue()) {
+      if (!context.latest[i].equalsTo(params.asDatum(i)).isTrue()) {
         return true;
       }
     }

http://git-wip-us.apache.org/repos/asf/tajo/blob/5c2aee23/tajo-core/src/main/java/org/apache/tajo/engine/planner/RangePartitionAlgorithm.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/RangePartitionAlgorithm.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/RangePartitionAlgorithm.java
index 38aa928..c505b77 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/RangePartitionAlgorithm.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/RangePartitionAlgorithm.java
@@ -57,81 +57,83 @@ public abstract class RangePartitionAlgorithm {
    * It computes the value cardinality of a tuple range.
    *
    * @param dataType
-   * @param start
-   * @param end
+   * @param range
+   * @param i
    * @return
    */
-  public static BigInteger computeCardinality(DataType dataType, Datum start, Datum end,
+  public static BigInteger computeCardinality(DataType dataType, TupleRange range, int i,
                                               boolean inclusive, boolean isAscending) {
     BigInteger columnCard;
 
+    Tuple start = range.getStart();
+    Tuple end = range.getEnd();
     switch (dataType.getType()) {
       case BOOLEAN:
         columnCard = BigInteger.valueOf(2);
         break;
       case CHAR:
         if (isAscending) {
-          columnCard = BigInteger.valueOf((int)end.asChar() - (int)start.asChar());
+          columnCard = BigInteger.valueOf((int)end.getChar(i) - (int)start.getChar(i));
         } else {
-          columnCard = BigInteger.valueOf(start.asChar() - end.asChar());
+          columnCard = BigInteger.valueOf(start.getChar(i) - end.getChar(i));
         }
         break;
       case BIT:
         if (isAscending) {
-          columnCard = BigInteger.valueOf(end.asByte() - start.asByte());
+          columnCard = BigInteger.valueOf(end.getByte(i) - start.getByte(i));
         } else {
-          columnCard = BigInteger.valueOf(start.asByte() - end.asByte());
+          columnCard = BigInteger.valueOf(start.getByte(i) - end.getByte(i));
         }
         break;
       case INT2:
         if (isAscending) {
-          columnCard = BigInteger.valueOf(end.asInt2() - start.asInt2());
+          columnCard = BigInteger.valueOf(end.getInt2(i) - start.getInt2(i));
         } else {
-          columnCard = BigInteger.valueOf(start.asInt2() - end.asInt2());
+          columnCard = BigInteger.valueOf(start.getInt2(i) - end.getInt2(i));
         }
         break;
       case INT4:
         if (isAscending) {
-          columnCard = BigInteger.valueOf(end.asInt4() - start.asInt4());
+          columnCard = BigInteger.valueOf(end.getInt4(i) - start.getInt4(i));
         } else {
-          columnCard = BigInteger.valueOf(start.asInt4() - end.asInt4());
+          columnCard = BigInteger.valueOf(start.getInt4(i) - end.getInt4(i));
         }
         break;
     case INT8:
     case TIME:
     case TIMESTAMP:
         if (isAscending) {
-          columnCard = BigInteger.valueOf(end.asInt8() - start.asInt8());
+          columnCard = BigInteger.valueOf(end.getInt8(i) - start.getInt8(i));
         } else {
-          columnCard = BigInteger.valueOf(start.asInt8() - end.asInt8());
+          columnCard = BigInteger.valueOf(start.getInt8(i) - end.getInt8(i));
         }
         break;
       case FLOAT4:
         if (isAscending) {
-          columnCard = BigInteger.valueOf(end.asInt4() - start.asInt4());
+          columnCard = BigInteger.valueOf(end.getInt4(i) - start.getInt4(i));
         } else {
-          columnCard = BigInteger.valueOf(start.asInt4() - end.asInt4());
+          columnCard = BigInteger.valueOf(start.getInt4(i) - end.getInt4(i));
         }
         break;
       case FLOAT8:
         if (isAscending) {
-          columnCard = BigInteger.valueOf(end.asInt8() - start.asInt8());
+          columnCard = BigInteger.valueOf(end.getInt8(i) - start.getInt8(i));
         } else {
-          columnCard = BigInteger.valueOf(start.asInt8() - end.asInt8());
+          columnCard = BigInteger.valueOf(start.getInt8(i) - end.getInt8(i));
         }
         break;
       case TEXT: {
-        boolean isPureAscii = StringUtils.isPureAscii(start.asChars()) && StringUtils.isPureAscii(end.asChars());
+        boolean isPureAscii = StringUtils.isPureAscii(start.getText(i)) && StringUtils.isPureAscii(end.getText(i));
 
         if (isPureAscii) {
           byte[] a;
           byte[] b;
           if (isAscending) {
-            a = start.asByteArray();
-            b = end.asByteArray();
+            a = start.getBytes(i);
+            b = end.getBytes(i);
           } else {
-            b = start.asByteArray();
-            a = end.asByteArray();
+            b = start.getBytes(i);
+            a = end.getBytes(i);
           }
 
           byte [][] padded = BytesUtils.padBytes(a, b);
@@ -148,11 +150,11 @@ public abstract class RangePartitionAlgorithm {
           char [] b;
 
           if (isAscending) {
-            a = start.asUnicodeChars();
-            b = end.asUnicodeChars();
+            a = start.getUnicodeChars(i);
+            b = end.getUnicodeChars(i);
           } else {
-            b = start.asUnicodeChars();
-            a = end.asUnicodeChars();
+            b = start.getUnicodeChars(i);
+            a = end.getUnicodeChars(i);
           }
 
           BigInteger startBI = UniformRangePartition.charsToBigInteger(a);
@@ -165,16 +167,16 @@ public abstract class RangePartitionAlgorithm {
       }
       case DATE:
         if (isAscending) {
-          columnCard = BigInteger.valueOf(end.asInt4() - start.asInt4());
+          columnCard = BigInteger.valueOf(end.getInt4(i) - start.getInt4(i));
         } else {
-          columnCard = BigInteger.valueOf(start.asInt4() - end.asInt4());
+          columnCard = BigInteger.valueOf(start.getInt4(i) - end.getInt4(i));
         }
         break;
       case INET4:
         if (isAscending) {
-          columnCard = BigInteger.valueOf(end.asInt4() - start.asInt4());
+          columnCard = BigInteger.valueOf(end.getInt4(i) - start.getInt4(i));
         } else {
-          columnCard = BigInteger.valueOf(start.asInt4() - end.asInt4());
+          columnCard = BigInteger.valueOf(start.getInt4(i) - end.getInt4(i));
         }
         break;
       default:
@@ -189,13 +191,11 @@ public abstract class RangePartitionAlgorithm {
    * @return
    */
   public static BigInteger computeCardinalityForAllColumns(SortSpec[] sortSpecs, TupleRange range, boolean inclusive) {
-    Tuple start = range.getStart();
-    Tuple end = range.getEnd();
 
     BigInteger cardinality = BigInteger.ONE;
     BigInteger columnCard;
     for (int i = 0; i < sortSpecs.length; i++) {
-      columnCard = computeCardinality(sortSpecs[i].getSortKey().getDataType(), start.get(i), end.get(i), inclusive,
+      columnCard = computeCardinality(sortSpecs[i].getSortKey().getDataType(), range, i, inclusive,
           sortSpecs[i].isAscending());
 
       if (BigInteger.ZERO.compareTo(columnCard) < 0) {

http://git-wip-us.apache.org/repos/asf/tajo/blob/5c2aee23/tajo-core/src/main/java/org/apache/tajo/engine/planner/UniformRangePartition.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/UniformRangePartition.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/UniformRangePartition.java
index 551a9d0..7c26857 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/UniformRangePartition.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/UniformRangePartition.java
@@ -24,7 +24,6 @@ import com.sun.tools.javac.util.Convert;
 import org.apache.tajo.catalog.Column;
 import org.apache.tajo.catalog.SortSpec;
 import org.apache.tajo.common.TajoDataTypes;
-import org.apache.tajo.datum.Datum;
 import org.apache.tajo.datum.DatumFactory;
 import org.apache.tajo.datum.NullDatum;
 import org.apache.tajo.datum.TextDatum;
@@ -70,21 +69,18 @@ public class UniformRangePartition extends RangePartitionAlgorithm {
     // filling pure ascii flags
     isPureAscii = new boolean[sortSpecs.length];
     for (int i = 0; i < sortSpecs.length; i++) {
-      Datum startValue = entireRange.getStart().get(i);
-      Datum endValue = entireRange.getEnd().get(i);
-      isPureAscii[i] = StringUtils.isPureAscii(startValue.asChars()) && StringUtils.isPureAscii(endValue.asChars());
-      beginNulls[i] = startValue.isNull();
-      endNulls[i] = endValue.isNull();
+      String startValue = entireRange.getStart().getText(i);
+      String endValue = entireRange.getEnd().getText(i);
+      isPureAscii[i] = StringUtils.isPureAscii(startValue) && StringUtils.isPureAscii(endValue);
+      beginNulls[i] = entireRange.getStart().isBlankOrNull(i);
+      endNulls[i] = entireRange.getEnd().isBlankOrNull(i);
     }
 
     colCards = new BigInteger[sortSpecs.length];
     normalize(sortSpecs, this.mergedRange);
 
     for (int i = 0; i < sortSpecs.length; i++) {
-      Datum startValue = entireRange.getStart().get(i);
-      Datum endValue = entireRange.getEnd().get(i);
-
-      colCards[i] =  computeCardinality(sortSpecs[i].getSortKey().getDataType(), startValue, endValue,
+      colCards[i] =  computeCardinality(sortSpecs[i].getSortKey().getDataType(), entireRange, i,
           inclusive, sortSpecs[i].isAscending());
     }
 
@@ -177,13 +173,13 @@ public class UniformRangePartition extends RangePartitionAlgorithm {
         if (isPureAscii[i]) {
           byte[] startBytes;
           byte[] endBytes;
-          if (range.getStart().isNull(i)) {
+          if (range.getStart().isBlankOrNull(i)) {
             startBytes = BigInteger.ZERO.toByteArray();
           } else {
             startBytes = range.getStart().getBytes(i);
           }
 
-          if (range.getEnd().isNull(i)) {
+          if (range.getEnd().isBlankOrNull(i)) {
             endBytes = BigInteger.ZERO.toByteArray();
           } else {
             endBytes = range.getEnd().getBytes(i);
@@ -196,13 +192,13 @@ public class UniformRangePartition extends RangePartitionAlgorithm {
         } else {
           char[] startChars;
           char[] endChars;
-          if (range.getStart().isNull(i)) {
+          if (range.getStart().isBlankOrNull(i)) {
             startChars = new char[] {0};
           } else {
             startChars = range.getStart().getUnicodeChars(i);
           }
 
-          if (range.getEnd().isNull(i)) {
+          if (range.getEnd().isBlankOrNull(i)) {
             endChars = new char[] {0};
           } else {
             endChars = range.getEnd().getUnicodeChars(i);
@@ -220,12 +216,12 @@ public class UniformRangePartition extends RangePartitionAlgorithm {
   *  Check whether an overflow occurs or not.
    *
    * @param colId The column id to be checked
-   * @param last
+   * @param tuple
    * @param inc
    * @param sortSpecs
    * @return
    */
-  public boolean isOverflow(int colId, Datum last, BigInteger inc, SortSpec [] sortSpecs) {
+  public boolean isOverflow(int colId, Tuple tuple, int i, BigInteger inc, SortSpec [] sortSpecs) {
     Column column = sortSpecs[colId].getSortKey();
     BigDecimal incDecimal = new BigDecimal(inc);
     BigDecimal candidate;
@@ -234,74 +230,74 @@ public class UniformRangePartition extends RangePartitionAlgorithm {
     switch (column.getDataType().getType()) {
       case BIT: {
         if (sortSpecs[colId].isAscending()) {
-          candidate = incDecimal.add(new BigDecimal(last.asByte()));
-          return new BigDecimal(mergedRange.getEnd().get(colId).asByte()).compareTo(candidate) < 0;
+          candidate = incDecimal.add(new BigDecimal(tuple.getByte(i)));
+          return new BigDecimal(mergedRange.getEnd().getByte(colId)).compareTo(candidate) < 0;
         } else {
-          candidate = new BigDecimal(last.asByte()).subtract(incDecimal);
-          return candidate.compareTo(new BigDecimal(mergedRange.getEnd().get(colId).asByte())) < 0;
+          candidate = new BigDecimal(tuple.getByte(i)).subtract(incDecimal);
+          return candidate.compareTo(new BigDecimal(mergedRange.getEnd().getByte(colId))) < 0;
         }
       }
       case CHAR: {
         if (sortSpecs[colId].isAscending()) {
-          candidate = incDecimal.add(new BigDecimal((int)last.asChar()));
-          return new BigDecimal((int) mergedRange.getEnd().get(colId).asChar()).compareTo(candidate) < 0;
+          candidate = incDecimal.add(new BigDecimal((int)tuple.getChar(i)));
+          return new BigDecimal((int) mergedRange.getEnd().getChar(colId)).compareTo(candidate) < 0;
         } else {
-          candidate = new BigDecimal((int)last.asChar()).subtract(incDecimal);
-          return candidate.compareTo(new BigDecimal((int) mergedRange.getEnd().get(colId).asChar())) < 0;
+          candidate = new BigDecimal((int)tuple.getChar(i)).subtract(incDecimal);
+          return candidate.compareTo(new BigDecimal((int) mergedRange.getEnd().getChar(colId))) < 0;
         }
       }
       case INT2: {
         if (sortSpecs[colId].isAscending()) {
-          candidate = incDecimal.add(new BigDecimal(last.asInt2()));
-          return new BigDecimal(mergedRange.getEnd().get(colId).asInt2()).compareTo(candidate) < 0;
+          candidate = incDecimal.add(new BigDecimal(tuple.getInt2(i)));
+          return new BigDecimal(mergedRange.getEnd().getInt2(colId)).compareTo(candidate) < 0;
         } else {
-          candidate = new BigDecimal(last.asInt2()).subtract(incDecimal);
-          return candidate.compareTo(new BigDecimal(mergedRange.getEnd().get(colId).asInt2())) < 0;
+          candidate = new BigDecimal(tuple.getInt2(i)).subtract(incDecimal);
+          return candidate.compareTo(new BigDecimal(mergedRange.getEnd().getInt2(colId))) < 0;
         }
       }
       case DATE:
       case INT4: {
         if (sortSpecs[colId].isAscending()) {
-          candidate = incDecimal.add(new BigDecimal(last.asInt4()));
-          return new BigDecimal(mergedRange.getEnd().get(colId).asInt4()).compareTo(candidate) < 0;
+          candidate = incDecimal.add(new BigDecimal(tuple.getInt4(i)));
+          return new BigDecimal(mergedRange.getEnd().getInt4(colId)).compareTo(candidate) < 0;
         } else {
-          candidate = new BigDecimal(last.asInt4()).subtract(incDecimal);
-          return candidate.compareTo(new BigDecimal(mergedRange.getEnd().get(colId).asInt4())) < 0;
+          candidate = new BigDecimal(tuple.getInt4(i)).subtract(incDecimal);
+          return candidate.compareTo(new BigDecimal(mergedRange.getEnd().getInt4(colId))) < 0;
         }
       }
       case TIME:
       case TIMESTAMP:
       case INT8: {
         if (sortSpecs[colId].isAscending()) {
-          candidate = incDecimal.add(new BigDecimal(last.asInt8()));
-          return new BigDecimal(mergedRange.getEnd().get(colId).asInt8()).compareTo(candidate) < 0;
+          candidate = incDecimal.add(new BigDecimal(tuple.getInt8(i)));
+          return new BigDecimal(mergedRange.getEnd().getInt8(colId)).compareTo(candidate) < 0;
         } else {
-          candidate = new BigDecimal(last.asInt8()).subtract(incDecimal);
-          return candidate.compareTo(new BigDecimal(mergedRange.getEnd().get(colId).asInt8())) < 0;
+          candidate = new BigDecimal(tuple.getInt8(i)).subtract(incDecimal);
+          return candidate.compareTo(new BigDecimal(mergedRange.getEnd().getInt8(colId))) < 0;
         }
       }
       case FLOAT4: {
         if (sortSpecs[colId].isAscending()) {
-          candidate = incDecimal.add(new BigDecimal(last.asFloat4()));
-          return new BigDecimal(mergedRange.getEnd().get(colId).asFloat4()).compareTo(candidate) < 0;
+          candidate = incDecimal.add(new BigDecimal(tuple.getFloat4(i)));
+          return new BigDecimal(mergedRange.getEnd().getFloat4(colId)).compareTo(candidate) < 0;
         } else {
-          candidate = new BigDecimal(last.asFloat4()).subtract(incDecimal);
-          return candidate.compareTo(new BigDecimal(mergedRange.getEnd().get(colId).asFloat4())) < 0;
+          candidate = new BigDecimal(tuple.getFloat4(i)).subtract(incDecimal);
+          return candidate.compareTo(new BigDecimal(mergedRange.getEnd().getFloat4(colId))) < 0;
         }
       }
       case FLOAT8: {
         if (sortSpecs[colId].isAscending()) {
-          candidate = incDecimal.add(new BigDecimal(last.asFloat8()));
-          return new BigDecimal(mergedRange.getEnd().get(colId).asFloat8()).compareTo(candidate) < 0;
+          candidate = incDecimal.add(new BigDecimal(tuple.getFloat8(i)));
+          return new BigDecimal(mergedRange.getEnd().getFloat8(colId)).compareTo(candidate) < 0;
         } else {
-          candidate = new BigDecimal(last.asFloat8()).subtract(incDecimal);
-          return candidate.compareTo(new BigDecimal(mergedRange.getEnd().get(colId).asFloat8())) < 0;
+          candidate = new BigDecimal(tuple.getFloat8(i)).subtract(incDecimal);
+          return candidate.compareTo(new BigDecimal(mergedRange.getEnd().getFloat8(colId))) < 0;
         }
 
       }
       case TEXT: {
         if (isPureAscii[colId]) {
-          byte[] lastBytes = last.asByteArray();
+          byte[] lastBytes = tuple.getBytes(i);
           byte[] endBytes = mergedRange.getEnd().getBytes(colId);
 
           Preconditions.checkState(lastBytes.length == endBytes.length);
@@ -314,7 +310,7 @@ public class UniformRangePartition extends RangePartitionAlgorithm {
             return candidate.compareTo(new BigDecimal(new BigInteger(endBytes))) < 0;
           }
         } else {
-          char[] lastChars = last.asUnicodeChars();
+          char[] lastChars = tuple.getUnicodeChars(i);
           char[] endChars = mergedRange.getEnd().getUnicodeChars(colId);
 
           Preconditions.checkState(lastChars.length == endChars.length);
@@ -335,45 +331,45 @@ public class UniformRangePartition extends RangePartitionAlgorithm {
         int candidateIntVal;
         byte[] candidateBytesVal = new byte[4];
         if (sortSpecs[colId].isAscending()) {
-          candidateIntVal = incDecimal.intValue() + last.asInt4();
-          if (candidateIntVal - incDecimal.intValue() != last.asInt4()) {
+          candidateIntVal = incDecimal.intValue() + tuple.getInt4(i);
+          if (candidateIntVal - incDecimal.intValue() != tuple.getInt4(i)) {
             return true;
           }
           Bytes.putInt(candidateBytesVal, 0, candidateIntVal);
-          return Bytes.compareTo(mergedRange.getEnd().get(colId).asByteArray(), candidateBytesVal) < 0;
+          return Bytes.compareTo(mergedRange.getEnd().getBytes(colId), candidateBytesVal) < 0;
         } else {
-          candidateIntVal = last.asInt4() - incDecimal.intValue();
-          if (candidateIntVal + incDecimal.intValue() != last.asInt4()) {
+          candidateIntVal = tuple.getInt4(i) - incDecimal.intValue();
+          if (candidateIntVal + incDecimal.intValue() != tuple.getInt4(i)) {
             return true;
           }
           Bytes.putInt(candidateBytesVal, 0, candidateIntVal);
-          return Bytes.compareTo(candidateBytesVal, mergedRange.getEnd().get(colId).asByteArray()) < 0;
+          return Bytes.compareTo(candidateBytesVal, mergedRange.getEnd().getBytes(colId)) < 0;
         }
       }
     }
     return overflow;
   }
 
-  public long incrementAndGetReminder(int colId, Datum last, long inc) {
+  private long incrementAndGetReminder(int colId, Tuple last, long inc) {
     Column column = sortSpecs[colId].getSortKey();
     long reminder = 0;
     switch (column.getDataType().getType()) {
       case BIT: {
-        long candidate = last.asByte() + inc;
-        byte end = mergedRange.getEnd().get(colId).asByte();
+        long candidate = last.getByte(colId) + inc;
+        byte end = mergedRange.getEnd().getByte(colId);
         reminder = candidate - end;
         break;
       }
       case CHAR: {
-        long candidate = last.asChar() + inc;
-        char end = mergedRange.getEnd().get(colId).asChar();
+        long candidate = last.getChar(colId) + inc;
+        char end = mergedRange.getEnd().getChar(colId);
         reminder = candidate - end;
         break;
       }
       case DATE:
       case INT4: {
-        int candidate = (int) (last.asInt4() + inc);
-        int end = mergedRange.getEnd().get(colId).asInt4();
+        int candidate = (int) (last.getInt4(colId) + inc);
+        int end = mergedRange.getEnd().getInt4(colId);
         reminder = candidate - end;
         break;
       }
@@ -381,26 +377,26 @@ public class UniformRangePartition extends RangePartitionAlgorithm {
       case TIMESTAMP:
       case INT8:
       case INET4: {
-        long candidate = last.asInt8() + inc;
-        long end = mergedRange.getEnd().get(colId).asInt8();
+        long candidate = last.getInt8(colId) + inc;
+        long end = mergedRange.getEnd().getInt8(colId);
         reminder = candidate - end;
         break;
       }
       case FLOAT4: {
-        float candidate = last.asFloat4() + inc;
-        float end = mergedRange.getEnd().get(colId).asFloat4();
+        float candidate = last.getFloat4(colId) + inc;
+        float end = mergedRange.getEnd().getFloat4(colId);
         reminder = (long) (candidate - end);
         break;
       }
       case FLOAT8: {
-        double candidate = last.asFloat8() + inc;
-        double end = mergedRange.getEnd().get(colId).asFloat8();
+        double candidate = last.getFloat8(colId) + inc;
+        double end = mergedRange.getEnd().getFloat8(colId);
         reminder = (long) Math.ceil(candidate - end);
         break;
       }
       case TEXT: {
-        byte [] lastBytes = last.asByteArray();
-        byte [] endBytes = mergedRange.getEnd().get(colId).asByteArray();
+        byte [] lastBytes = last.getBytes(colId);
+        byte [] endBytes = mergedRange.getEnd().getBytes(colId);
 
         Preconditions.checkState(lastBytes.length == endBytes.length);
 
@@ -447,12 +443,12 @@ public class UniformRangePartition extends RangePartitionAlgorithm {
     int finalId = baseDigit;
     incs[finalId] = value;
     for (int i = finalId; i >= 0; i--) {
-      if (isOverflow(i, last.get(i), incs[i], sortSpecs)) {
+      if (isOverflow(i, last, i, incs[i], sortSpecs)) {
         if (i == 0) {
           throw new RangeOverflowException(mergedRange, last, incs[i].longValue(), sortSpecs[i].isAscending());
         }
         // increment some volume of the serialized one-dimension key space
-        long rem = incrementAndGetReminder(i, last.get(i), value.longValue());
+        long rem = incrementAndGetReminder(i, last, value.longValue());
         incs[i] = BigInteger.valueOf(rem);
         incs[i - 1] = incs[i-1].add(BigInteger.ONE);
         overflowFlag[i] = true;
@@ -470,107 +466,107 @@ public class UniformRangePartition extends RangePartitionAlgorithm {
       }
     }
 
-    Tuple end = new VTuple(sortSpecs.length);
+    VTuple end = new VTuple(sortSpecs.length);
     Column column;
     for (int i = 0; i < last.size(); i++) {
       column = sortSpecs[i].getSortKey();
       switch (column.getDataType().getType()) {
         case CHAR:
           if (overflowFlag[i]) {
-            end.put(i, DatumFactory.createChar((char) (mergedRange.getStart().get(i).asChar() + incs[i].longValue())));
+            end.put(i, DatumFactory.createChar((char) (mergedRange.getStart().getChar(i) + incs[i].longValue())));
           } else {
             if (sortSpecs[i].isAscending()) {
-              end.put(i, DatumFactory.createChar((char) (last.get(i).asChar() + incs[i].longValue())));
+              end.put(i, DatumFactory.createChar((char) (last.getChar(i) + incs[i].longValue())));
             } else {
-              end.put(i, DatumFactory.createChar((char) (last.get(i).asChar() - incs[i].longValue())));
+              end.put(i, DatumFactory.createChar((char) (last.getChar(i) - incs[i].longValue())));
             }
           }
           break;
         case BIT:
           if (overflowFlag[i]) {
             end.put(i, DatumFactory.createBit(
-                (byte) (mergedRange.getStart().get(i).asByte() + incs[i].longValue())));
+                (byte) (mergedRange.getStart().getByte(i) + incs[i].longValue())));
           } else {
             if (sortSpecs[i].isAscending()) {
-              end.put(i, DatumFactory.createBit((byte) (last.get(i).asByte() + incs[i].longValue())));
+              end.put(i, DatumFactory.createBit((byte) (last.getByte(i) + incs[i].longValue())));
             } else {
-              end.put(i, DatumFactory.createBit((byte) (last.get(i).asByte() - incs[i].longValue())));
+              end.put(i, DatumFactory.createBit((byte) (last.getByte(i) - incs[i].longValue())));
             }
           }
           break;
         case INT2:
           if (overflowFlag[i]) {
             end.put(i, DatumFactory.createInt2(
-                (short) (mergedRange.getStart().get(i).asInt2() + incs[i].longValue())));
+                (short) (mergedRange.getStart().getInt2(i) + incs[i].longValue())));
           } else {
             if (sortSpecs[i].isAscending()) {
-              end.put(i, DatumFactory.createInt2((short) (last.get(i).asInt2() + incs[i].longValue())));
+              end.put(i, DatumFactory.createInt2((short) (last.getInt2(i) + incs[i].longValue())));
             } else {
-              end.put(i, DatumFactory.createInt2((short) (last.get(i).asInt2() - incs[i].longValue())));
+              end.put(i, DatumFactory.createInt2((short) (last.getInt2(i) - incs[i].longValue())));
             }
           }
           break;
         case INT4:
           if (overflowFlag[i]) {
             end.put(i, DatumFactory.createInt4(
-                (int) (mergedRange.getStart().get(i).asInt4() + incs[i].longValue())));
+                (int) (mergedRange.getStart().getInt4(i) + incs[i].longValue())));
           } else {
             if (sortSpecs[i].isAscending()) {
-              end.put(i, DatumFactory.createInt4((int) (last.get(i).asInt4() + incs[i].longValue())));
+              end.put(i, DatumFactory.createInt4((int) (last.getInt4(i) + incs[i].longValue())));
             } else {
-              end.put(i, DatumFactory.createInt4((int) (last.get(i).asInt4() - incs[i].longValue())));
+              end.put(i, DatumFactory.createInt4((int) (last.getInt4(i) - incs[i].longValue())));
             }
           }
           break;
         case INT8:
           if (overflowFlag[i]) {
             end.put(i, DatumFactory.createInt8(
-                mergedRange.getStart().get(i).asInt8() + incs[i].longValue()));
+                mergedRange.getStart().getInt8(i) + incs[i].longValue()));
           } else {
             if (sortSpecs[i].isAscending()) {
-              end.put(i, DatumFactory.createInt8(last.get(i).asInt8() + incs[i].longValue()));
+              end.put(i, DatumFactory.createInt8(last.getInt8(i) + incs[i].longValue()));
             } else {
-              end.put(i, DatumFactory.createInt8(last.get(i).asInt8() - incs[i].longValue()));
+              end.put(i, DatumFactory.createInt8(last.getInt8(i) - incs[i].longValue()));
             }
           }
           break;
         case FLOAT4:
           if (overflowFlag[i]) {
             end.put(i, DatumFactory.createFloat4(
-                mergedRange.getStart().get(i).asFloat4() + incs[i].longValue()));
+                mergedRange.getStart().getFloat4(i) + incs[i].longValue()));
           } else {
             if (sortSpecs[i].isAscending()) {
-              end.put(i, DatumFactory.createFloat4(last.get(i).asFloat4() + incs[i].longValue()));
+              end.put(i, DatumFactory.createFloat4(last.getFloat4(i) + incs[i].longValue()));
             } else {
-              end.put(i, DatumFactory.createFloat4(last.get(i).asFloat4() - incs[i].longValue()));
+              end.put(i, DatumFactory.createFloat4(last.getFloat4(i) - incs[i].longValue()));
             }
           }
           break;
         case FLOAT8:
           if (overflowFlag[i]) {
             end.put(i, DatumFactory.createFloat8(
-                mergedRange.getStart().get(i).asFloat8() + incs[i].longValue()));
+                mergedRange.getStart().getFloat8(i) + incs[i].longValue()));
           } else {
             if (sortSpecs[i].isAscending()) {
-              end.put(i, DatumFactory.createFloat8(last.get(i).asFloat8() + incs[i].longValue()));
+              end.put(i, DatumFactory.createFloat8(last.getFloat8(i) + incs[i].longValue()));
             } else {
-              end.put(i, DatumFactory.createFloat8(last.get(i).asFloat8() - incs[i].longValue()));
+              end.put(i, DatumFactory.createFloat8(last.getFloat8(i) - incs[i].longValue()));
             }
           }
           break;
         case TEXT:
           if (overflowFlag[i]) {
-            end.put(i, DatumFactory.createText(((char) (mergedRange.getStart().get(i).asChars().charAt(0)
+            end.put(i, DatumFactory.createText(((char) (mergedRange.getStart().getText(i).charAt(0)
                 + incs[i].longValue())) + ""));
           } else {
             BigInteger lastBigInt;
-            if (last.isNull(i)) {
+            if (last.isBlankOrNull(i)) {
               lastBigInt = BigInteger.valueOf(0);
               end.put(i, DatumFactory.createText(lastBigInt.add(incs[i]).toByteArray()));
             } else {
 
               if (isPureAscii[i]) {
-                lastBigInt = new BigInteger(last.get(i).asByteArray());
+                lastBigInt = new BigInteger(last.getBytes(i));
                 if (sortSpecs[i].isAscending()) {
                   end.put(i, DatumFactory.createText(lastBigInt.add(incs[i]).toByteArray()));
                 } else {
@@ -633,52 +629,52 @@ public class UniformRangePartition extends RangePartitionAlgorithm {
           break;
         case DATE:
           if (overflowFlag[i]) {
-            end.put(i, DatumFactory.createDate((int) (mergedRange.getStart().get(i).asInt4() + incs[i].longValue())));
+            end.put(i, DatumFactory.createDate((int) (mergedRange.getStart().getInt4(i) + incs[i].longValue())));
           } else {
             if (sortSpecs[i].isAscending()) {
-              end.put(i, DatumFactory.createDate((int) (last.get(i).asInt4() + incs[i].longValue())));
+              end.put(i, DatumFactory.createDate((int) (last.getInt4(i) + incs[i].longValue())));
             } else {
-              end.put(i, DatumFactory.createDate((int) (last.get(i).asInt4() - incs[i].longValue())));
+              end.put(i, DatumFactory.createDate((int) (last.getInt4(i) - incs[i].longValue())));
             }
           }
           break;
         case TIME:
           if (overflowFlag[i]) {
-            end.put(i, DatumFactory.createTime(mergedRange.getStart().get(i).asInt8() + incs[i].longValue()));
+            end.put(i, DatumFactory.createTime(mergedRange.getStart().getInt8(i) + incs[i].longValue()));
           } else {
             if (sortSpecs[i].isAscending()) {
-              end.put(i, DatumFactory.createTime(last.get(i).asInt8() + incs[i].longValue()));
+              end.put(i, DatumFactory.createTime(last.getInt8(i) + incs[i].longValue()));
             } else {
-              end.put(i, DatumFactory.createTime(last.get(i).asInt8() - incs[i].longValue()));
+              end.put(i, DatumFactory.createTime(last.getInt8(i) - incs[i].longValue()));
             }
           }
           break;
         case TIMESTAMP:
           if (overflowFlag[i]) {
             end.put(i, DatumFactory.createTimestmpDatumWithJavaMillis(
-                mergedRange.getStart().get(i).asInt8() + incs[i].longValue()));
+                mergedRange.getStart().getInt8(i) + incs[i].longValue()));
           } else {
             if (sortSpecs[i].isAscending()) {
-              end.put(i, DatumFactory.createTimestmpDatumWithJavaMillis(last.get(i).asInt8() + incs[i].longValue()));
+              end.put(i, DatumFactory.createTimestmpDatumWithJavaMillis(last.getInt8(i) + incs[i].longValue()));
             } else {
-              end.put(i, DatumFactory.createTimestmpDatumWithJavaMillis(last.get(i).asInt8() - incs[i].longValue()));
+              end.put(i, DatumFactory.createTimestmpDatumWithJavaMillis(last.getInt8(i) - incs[i].longValue()));
             }
           }
           break;
         case INET4:
           byte[] ipBytes;
           if (overflowFlag[i]) {
-            ipBytes = mergedRange.getStart().get(i).asByteArray();
+            ipBytes = mergedRange.getStart().getBytes(i);
             assert ipBytes.length == 4;
             end.put(i, DatumFactory.createInet4(ipBytes));
           } else {
             if (sortSpecs[i].isAscending()) {
-              int lastVal = last.get(i).asInt4() + incs[i].intValue();
+              int lastVal = last.getInt4(i) + incs[i].intValue();
               ipBytes = new byte[4];
               Bytes.putInt(ipBytes, 0, lastVal);
               end.put(i, DatumFactory.createInet4(ipBytes));
             } else {
-              int lastVal = last.get(i).asInt4() - incs[i].intValue();
+              int lastVal = last.getInt4(i) - incs[i].intValue();
               ipBytes = new byte[4];
               Bytes.putInt(ipBytes, 0, lastVal);
               end.put(i, DatumFactory.createInet4(ipBytes));

http://git-wip-us.apache.org/repos/asf/tajo/blob/5c2aee23/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java
index 4612d45..bc6975a 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java
@@ -80,8 +80,7 @@ public class BSTIndexScanExec extends PhysicalExec {
   public Tuple next() throws IOException {
     if(initialize) {
       //TODO : more complicated condition
-      Tuple key = new VTuple(datum.length);
-      key.put(datum);
+      Tuple key = new VTuple(datum);
       long offset = reader.find(key);
       if (offset == -1) {
         reader.close();

http://git-wip-us.apache.org/repos/asf/tajo/blob/5c2aee23/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/CommonHashJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/CommonHashJoinExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/CommonHashJoinExec.java
index 588f0fc..a018fe1 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/CommonHashJoinExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/CommonHashJoinExec.java
@@ -122,7 +122,7 @@ public abstract class CommonHashJoinExec<T> extends CommonJoinExec {
     while (!context.isStopped() && (tuple = rightChild.next()) != null) {
       Tuple keyTuple = new VTuple(joinKeyPairs.size());
       for (int i = 0; i < rightKeyList.length; i++) {
-        keyTuple.put(i, tuple.get(rightKeyList[i]));
+        keyTuple.put(i, tuple.asDatum(rightKeyList[i]));
       }
 
       /*
@@ -154,7 +154,7 @@ public abstract class CommonHashJoinExec<T> extends CommonJoinExec {
 
   protected Tuple toKey(final Tuple outerTuple) {
     for (int i = 0; i < leftKeyList.length; i++) {
-      keyTuple.put(i, outerTuple.get(leftKeyList[i]));
+      keyTuple.put(i, outerTuple.asDatum(leftKeyList[i]));
     }
     return keyTuple;
   }

http://git-wip-us.apache.org/repos/asf/tajo/blob/5c2aee23/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ComparableVector.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ComparableVector.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ComparableVector.java
index 9940608..2d836f4 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ComparableVector.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ComparableVector.java
@@ -121,7 +121,7 @@ public class ComparableVector {
     }
 
     protected final void set(int index, Tuple tuple, int field) {
-      if (tuple.isNull(field)) {
+      if (tuple.isBlankOrNull(field)) {
         nulls.set(index);
         return;
       }
@@ -196,7 +196,7 @@ public class ComparableVector {
     public void set(Tuple tuple) {
       for (int i = 0; i < keyTypes.length; i++) {
         final int field = keyIndex[i];
-        if (tuple.isNull(field)) {
+        if (tuple.isBlankOrNull(field)) {
           keys[i] = null;
           continue;
         }
@@ -216,7 +216,7 @@ public class ComparableVector {
           case TEXT:
           case CHAR:
           case BLOB: keys[i] = tuple.getBytes(field); break;
-          case DATUM: keys[i] = tuple.get(field); break;
+          case DATUM: keys[i] = tuple.asDatum(field); break;
           default:
             throw new IllegalArgumentException();
         }
@@ -252,7 +252,7 @@ public class ComparableVector {
       for (int i = 0; i < keys.length; i++) {
         final int field = keyIndex[i];
         final boolean n1 = keys[i] == null;
-        final boolean n2 = tuple.isNull(field);
+        final boolean n2 = tuple.isBlankOrNull(field);
         if (n1 && n2) {
           continue;
         }
@@ -275,7 +275,7 @@ public class ComparableVector {
           case TEXT:
           case CHAR:
           case BLOB: if (!Arrays.equals((byte[])keys[i], tuple.getBytes(field))) return false; continue;
-          case DATUM: if (!keys[i].equals(tuple.get(field))) return false; continue;
+          case DATUM: if (!keys[i].equals(tuple.asDatum(field))) return false; continue;
         }
       }
       return true;

http://git-wip-us.apache.org/repos/asf/tajo/blob/5c2aee23/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbyFirstAggregationExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbyFirstAggregationExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbyFirstAggregationExec.java
index 94429a0..7784817 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbyFirstAggregationExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbyFirstAggregationExec.java
@@ -175,7 +175,7 @@ public class DistinctGroupbyFirstAggregationExec extends UnaryPhysicalExec {
     while(!context.isStopped() && (tuple = child.next()) != null) {
       Tuple groupingKey = new VTuple(groupingKeyIndexes.length);
       for (int i = 0; i < groupingKeyIndexes.length; i++) {
-        groupingKey.put(i, tuple.get(groupingKeyIndexes[i]));
+        groupingKey.put(i, tuple.asDatum(groupingKeyIndexes[i]));
       }
       for (int i = 0; i < distinctAggregators.length; i++) {
         distinctAggregators[i].compute(groupingKey, tuple);
@@ -360,7 +360,7 @@ public class DistinctGroupbyFirstAggregationExec extends UnaryPhysicalExec {
     public void compute(Tuple groupingKey, Tuple tuple) throws IOException {
       Tuple distinctKeyTuple = new VTuple(distinctKeyIndexes.length);
       for (int i = 0; i < distinctKeyIndexes.length; i++) {
-        distinctKeyTuple.put(i, tuple.get(distinctKeyIndexes[i]));
+        distinctKeyTuple.put(i, tuple.asDatum(distinctKeyIndexes[i]));
       }
 
       Set<Tuple> distinctEntry = distinctAggrDatas.get(groupingKey);
@@ -415,7 +415,7 @@ public class DistinctGroupbyFirstAggregationExec extends UnaryPhysicalExec {
       }
       // node sequence, groupingKeys, 1'st distinctKeys, 2'st distinctKeys, ...
       // If n'st == this.nodeSequence set with real data, otherwise set with NullDatum
-      Tuple tuple = new VTuple(resultTupleLength);
+      VTuple tuple = new VTuple(resultTupleLength);
       int tupleIndex = 0;
       tuple.put(tupleIndex++, nodeSequenceDatum);
 
@@ -423,7 +423,7 @@ public class DistinctGroupbyFirstAggregationExec extends UnaryPhysicalExec {
       Tuple groupingKeyTuple = currentGroupingTuples.getKey();
       int groupingKeyLength = groupingKeyTuple.size();
       for (int i = 0; i < groupingKeyLength; i++, tupleIndex++) {
-        tuple.put(tupleIndex, groupingKeyTuple.get(i));
+        tuple.put(tupleIndex, groupingKeyTuple.asDatum(i));
       }
 
       // merge distinctKey
@@ -432,13 +432,13 @@ public class DistinctGroupbyFirstAggregationExec extends UnaryPhysicalExec {
           Tuple distinctKeyTuple = distinctKeyIterator.next();
           int distinctKeyLength = distinctKeyTuple.size();
           for (int j = 0; j < distinctKeyLength; j++, tupleIndex++) {
-            tuple.put(tupleIndex, distinctKeyTuple.get(j));
+            tuple.put(tupleIndex, distinctKeyTuple.asDatum(j));
           }
         } else {
           Tuple dummyTuple = distinctAggregators[i].getDummyTuple();
           int dummyTupleSize = dummyTuple.size();
           for (int j = 0; j < dummyTupleSize; j++, tupleIndex++) {
-            tuple.put(tupleIndex, dummyTuple.get(j));
+            tuple.put(tupleIndex, dummyTuple.asDatum(j));
           }
         }
       }
@@ -457,7 +457,7 @@ public class DistinctGroupbyFirstAggregationExec extends UnaryPhysicalExec {
         }
         int tupleSize = nonDistinctTuple.size();
         for (int j = 0; j < tupleSize; j++, tupleIndex++) {
-          tuple.put(tupleIndex, nonDistinctTuple.get(j));
+          tuple.put(tupleIndex, nonDistinctTuple.asDatum(j));
         }
       }
       return tuple;

http://git-wip-us.apache.org/repos/asf/tajo/blob/5c2aee23/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbyHashAggregationExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbyHashAggregationExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbyHashAggregationExec.java
index 0f25d6c..c8a6588 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbyHashAggregationExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbyHashAggregationExec.java
@@ -236,10 +236,10 @@ public class DistinctGroupbyHashAggregationExec extends UnaryPhysicalExec {
               // set group key tuple
               // Because each hashAggregator has different number of tuples,
               // sometimes getting group key from each hashAggregator will be null value.
-              mergedTuple.put(mergeTupleIndex, distinctGroupingKey.get(mergeTupleIndex));
+              mergedTuple.put(mergeTupleIndex, distinctGroupingKey.asDatum(mergeTupleIndex));
             } else {
               if (tuples[i] != null) {
-                mergedTuple.put(mergeTupleIndex, tuples[i].get(j));
+                mergedTuple.put(mergeTupleIndex, tuples[i].asDatum(j));
               } else {
                 mergedTuple.put(mergeTupleIndex, NullDatum.get());
               }
@@ -388,12 +388,12 @@ public class DistinctGroupbyHashAggregationExec extends UnaryPhysicalExec {
     public void compute(Tuple tuple) throws IOException {
       Tuple outerKeyTuple = new VTuple(distinctGroupingKeyIds.length);
       for (int i = 0; i < distinctGroupingKeyIds.length; i++) {
-        outerKeyTuple.put(i, tuple.get(distinctGroupingKeyIds[i]));
+        outerKeyTuple.put(i, tuple.asDatum(distinctGroupingKeyIds[i]));
       }
 
       Tuple keyTuple = new VTuple(groupingKeyIds.length);
       for (int i = 0; i < groupingKeyIds.length; i++) {
-        keyTuple.put(i, tuple.get(groupingKeyIds[i]));
+        keyTuple.put(i, tuple.asDatum(groupingKeyIds[i]));
       }
 
       Map<Tuple, FunctionContext[]> distinctEntry = hashTable.get(outerKeyTuple);
@@ -428,7 +428,7 @@ public class DistinctGroupbyHashAggregationExec extends UnaryPhysicalExec {
         Tuple groupbyKey = entry.getKey();
         int index = 0;
         for (; index < groupbyKey.size(); index++) {
-          tuple.put(index, groupbyKey.get(index));
+          tuple.put(index, groupbyKey.asDatum(index));
         }
 
         FunctionContext[] contexts = entry.getValue();

http://git-wip-us.apache.org/repos/asf/tajo/blob/5c2aee23/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbySecondAggregationExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbySecondAggregationExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbySecondAggregationExec.java
index b394390..5a262a6 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbySecondAggregationExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbySecondAggregationExec.java
@@ -192,7 +192,7 @@ public class DistinctGroupbySecondAggregationExec extends UnaryPhysicalExec {
         throw new IOException(e.getMessage(), e);
       }
 
-      int distinctSeq = tuple.get(0).asInt2();
+      int distinctSeq = tuple.getInt2(0);
       Tuple keyTuple = getKeyTuple(distinctSeq, tuple);
 
       if (prevKeyTuple == null) {
@@ -267,12 +267,12 @@ public class DistinctGroupbySecondAggregationExec extends UnaryPhysicalExec {
     int[] columnIndexes = distinctKeyIndexes[distinctSeq];
 
     Tuple keyTuple = new VTuple(numGroupingColumns + columnIndexes.length + 1);
-    keyTuple.put(0, tuple.get(0));
+    keyTuple.put(0, tuple.asDatum(0));
     for (int i = 0; i < numGroupingColumns; i++) {
-      keyTuple.put(i + 1, tuple.get(i + 1));
+      keyTuple.put(i + 1, tuple.asDatum(i + 1));
     }
     for (int i = 0; i < columnIndexes.length; i++) {
-      keyTuple.put(i + 1 + numGroupingColumns, tuple.get(columnIndexes[i]));
+      keyTuple.put(i + 1 + numGroupingColumns, tuple.asDatum(columnIndexes[i]));
     }
 
     return keyTuple;

http://git-wip-us.apache.org/repos/asf/tajo/blob/5c2aee23/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbySortAggregationExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbySortAggregationExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbySortAggregationExec.java
index 9ff479b..c91dcca 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbySortAggregationExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbySortAggregationExec.java
@@ -117,7 +117,7 @@ public class DistinctGroupbySortAggregationExec extends PhysicalExec {
       int tupleSize = currentTuples[i].size();
       for (int j = 0; j < tupleSize; j++) {
         if (resultColumnIdIndexes[mergeTupleIndex] >= 0) {
-          mergedTuple.put(resultColumnIdIndexes[mergeTupleIndex], currentTuples[i].get(j));
+          mergedTuple.put(resultColumnIdIndexes[mergeTupleIndex], currentTuples[i].asDatum(j));
         }
         mergeTupleIndex++;
       }

http://git-wip-us.apache.org/repos/asf/tajo/blob/5c2aee23/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbyThirdAggregationExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbyThirdAggregationExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbyThirdAggregationExec.java
index 267bd90..5791230 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbyThirdAggregationExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbyThirdAggregationExec.java
@@ -156,7 +156,7 @@ public class DistinctGroupbyThirdAggregationExec extends UnaryPhysicalExec {
         }
 
         for (int i = 0; i < numGroupingColumns; i++) {
-          resultTuple.put(resultTupleIndexes[i], prevTuple.get(i + 1));
+          resultTuple.put(resultTupleIndexes[i], prevTuple.asDatum(i + 1));
         }
         for (DistinctFinalAggregator eachAggr: aggregators) {
           eachAggr.terminate(resultTuple);
@@ -171,7 +171,7 @@ public class DistinctGroupbyThirdAggregationExec extends UnaryPhysicalExec {
         throw new IOException(e.getMessage(), e);
       }
 
-      int distinctSeq = tuple.get(0).asInt2();
+      int distinctSeq = tuple.getInt2(0);
       Tuple keyTuple = getGroupingKeyTuple(tuple);
 
       // First tuple
@@ -186,7 +186,7 @@ public class DistinctGroupbyThirdAggregationExec extends UnaryPhysicalExec {
       if (!prevKeyTuple.equals(keyTuple)) {
         // new grouping key
         for (int i = 0; i < numGroupingColumns; i++) {
-          resultTuple.put(resultTupleIndexes[i], prevTuple.get(i + 1));
+          resultTuple.put(resultTupleIndexes[i], prevTuple.asDatum(i + 1));
         }
         for (DistinctFinalAggregator eachAggr: aggregators) {
           eachAggr.terminate(resultTuple);
@@ -219,7 +219,7 @@ public class DistinctGroupbyThirdAggregationExec extends UnaryPhysicalExec {
   private Tuple getGroupingKeyTuple(Tuple tuple) {
     Tuple keyTuple = new VTuple(numGroupingColumns);
     for (int i = 0; i < numGroupingColumns; i++) {
-      keyTuple.put(i, tuple.get(i + 1));
+      keyTuple.put(i, tuple.asDatum(i + 1));
     }
 
     return keyTuple;
@@ -272,7 +272,7 @@ public class DistinctGroupbyThirdAggregationExec extends UnaryPhysicalExec {
       }
 
       if (seq == 0 && nonDistinctAggr != null) {
-        if (!tuple.get(nonDistinctAggr.inTupleIndex).isNull()) {
+        if (!tuple.isBlankOrNull(nonDistinctAggr.inTupleIndex)) {
           nonDistinctAggr.merge(tuple);
         }
       }

http://git-wip-us.apache.org/repos/asf/tajo/blob/5c2aee23/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashAggregateExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashAggregateExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashAggregateExec.java
index 8ffd503..e6d1a96 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashAggregateExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashAggregateExec.java
@@ -52,7 +52,7 @@ public class HashAggregateExec extends AggregationExec {
       keyTuple = new VTuple(groupingKeyIds.length);
       // build one key tuple
       for(int i = 0; i < groupingKeyIds.length; i++) {
-        keyTuple.put(i, tuple.get(groupingKeyIds[i]));
+        keyTuple.put(i, tuple.asDatum(groupingKeyIds[i]));
       }
 
       FunctionContext [] contexts = hashTable.get(keyTuple);
@@ -98,7 +98,7 @@ public class HashAggregateExec extends AggregationExec {
 
       int tupleIdx = 0;
       for (; tupleIdx < groupingKeyNum; tupleIdx++) {
-        tuple.put(tupleIdx, keyTuple.get(tupleIdx));
+        tuple.put(tupleIdx, keyTuple.asDatum(tupleIdx));
       }
       for (int funcIdx = 0; funcIdx < aggFunctionsNum; funcIdx++, tupleIdx++) {
         tuple.put(tupleIdx, aggFunctions[funcIdx].terminate(contexts[funcIdx]));

http://git-wip-us.apache.org/repos/asf/tajo/blob/5c2aee23/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashBasedColPartitionStoreExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashBasedColPartitionStoreExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashBasedColPartitionStoreExec.java
index e94bc26..0a812ee 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashBasedColPartitionStoreExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashBasedColPartitionStoreExec.java
@@ -20,7 +20,6 @@ package org.apache.tajo.engine.planner.physical;
 
 import org.apache.tajo.catalog.statistics.StatisticsUtil;
 import org.apache.tajo.catalog.statistics.TableStats;
-import org.apache.tajo.datum.Datum;
 import org.apache.tajo.plan.logical.StoreTableNode;
 import org.apache.tajo.storage.Appender;
 import org.apache.tajo.storage.Tuple;
@@ -72,11 +71,10 @@ public class HashBasedColPartitionStoreExec extends ColPartitionStoreExec {
       sb.delete(0, sb.length());
       if (keyIds != null) {
         for(int i = 0; i < keyIds.length; i++) {
-          Datum datum = tuple.get(keyIds[i]);
           if(i > 0)
             sb.append("/");
           sb.append(keyNames[i]).append("=");
-          sb.append(StringUtils.escapePathName(datum.asChars()));
+          sb.append(StringUtils.escapePathName(tuple.getText(keyIds[i])));
         }
       }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/5c2aee23/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashJoinExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashJoinExec.java
index a4215fa..3065c15 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashJoinExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashJoinExec.java
@@ -23,7 +23,10 @@ import org.apache.tajo.storage.Tuple;
 import org.apache.tajo.worker.TaskAttemptContext;
 
 import java.io.IOException;
-import java.util.*;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
 
 public class HashJoinExec extends CommonHashJoinExec<List<Tuple>> {
 
@@ -50,6 +53,7 @@ public class HashJoinExec extends CommonHashJoinExec<List<Tuple>> {
         projector.eval(frameTuple, outTuple);
         return outTuple;
       }
+
       Tuple leftTuple = leftChild.next(); // it comes from a disk
       if (leftTuple == null || leftFiltered(leftTuple)) { // if no more tuples in left tuples on disk, a join is completed.
         finished = leftTuple == null;

http://git-wip-us.apache.org/repos/asf/tajo/blob/5c2aee23/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftOuterJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftOuterJoinExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftOuterJoinExec.java
index 8613eac..27f683b 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftOuterJoinExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftOuterJoinExec.java
@@ -25,7 +25,8 @@ import org.apache.tajo.storage.Tuple;
 import org.apache.tajo.worker.TaskAttemptContext;
 
 import java.io.IOException;
-import java.util.*;
+import java.util.Iterator;
+import java.util.List;
 
 public class HashLeftOuterJoinExec extends HashJoinExec {
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/5c2aee23/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashPartitioner.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashPartitioner.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashPartitioner.java
index 233d6ec..20a9128 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashPartitioner.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashPartitioner.java
@@ -41,7 +41,7 @@ public class HashPartitioner extends Partitioner {
 
     // build one key tuple
     for (int i = 0; i < partitionKeyIds.length; i++) {
-      keyTuple.put(i, tuple.get(partitionKeyIds[i]));
+      keyTuple.put(i, tuple.asDatum(partitionKeyIds[i]));
     }
     return (keyTuple.hashCode() & Integer.MAX_VALUE) % numPartitions;
   }

http://git-wip-us.apache.org/repos/asf/tajo/blob/5c2aee23/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/JoinTupleComparator.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/JoinTupleComparator.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/JoinTupleComparator.java
index a59f8d9..515a2bd 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/JoinTupleComparator.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/JoinTupleComparator.java
@@ -64,13 +64,13 @@ public class JoinTupleComparator implements Comparator<Tuple> {
       if (outerTuple == null) {
         outer = NullDatum.get();
       } else {
-        outer = outerTuple.get(outerSortKeyIds[i]);
+        outer = outerTuple.asDatum(outerSortKeyIds[i]);
       }
 
       if (innerTuple == null) {
         inner = NullDatum.get();
       } else {
-        inner = innerTuple.get(innerSortKeyIds[i]);
+        inner = innerTuple.asDatum(innerSortKeyIds[i]);
       }
 
       if (outer instanceof NullDatum || inner instanceof NullDatum) {

http://git-wip-us.apache.org/repos/asf/tajo/blob/5c2aee23/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RangeShuffleFileWriteExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RangeShuffleFileWriteExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RangeShuffleFileWriteExec.java
index 7bf9e66..92e625c 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RangeShuffleFileWriteExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RangeShuffleFileWriteExec.java
@@ -90,20 +90,25 @@ public class RangeShuffleFileWriteExec extends UnaryPhysicalExec {
   @Override
   public Tuple next() throws IOException {
     Tuple tuple;
-    Tuple keyTuple;
+    VTuple keyTuple;
     Tuple prevKeyTuple = null;
     long offset;
 
 
-    while(!context.isStopped() && (tuple = child.next()) != null) {
-      offset = appender.getOffset();
-      appender.addTuple(tuple);
-      keyTuple = new VTuple(keySchema.size());
-      RowStoreUtil.project(tuple, keyTuple, indexKeys);
-      if (prevKeyTuple == null || !prevKeyTuple.equals(keyTuple)) {
-        indexWriter.write(keyTuple, offset);
-        prevKeyTuple = keyTuple;
+    try {
+      while(!context.isStopped() && (tuple = child.next()) != null) {
+        offset = appender.getOffset();
+        appender.addTuple(tuple);
+        keyTuple = new VTuple(keySchema.size());
+        RowStoreUtil.project(tuple, keyTuple, indexKeys);
+        if (prevKeyTuple == null || !prevKeyTuple.equals(keyTuple)) {
+          indexWriter.write(keyTuple, offset);
+          prevKeyTuple = keyTuple;
+        }
       }
+    } catch (RuntimeException e) {
+      e.printStackTrace();
+      throw e;
     }
 
     return null;

http://git-wip-us.apache.org/repos/asf/tajo/blob/5c2aee23/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SortAggregateExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SortAggregateExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SortAggregateExec.java
index 9831d83..2feecd1 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SortAggregateExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SortAggregateExec.java
@@ -18,7 +18,6 @@
 
 package org.apache.tajo.engine.planner.physical;
 
-import org.apache.tajo.datum.NullDatum;
 import org.apache.tajo.plan.function.FunctionContext;
 import org.apache.tajo.plan.logical.GroupbyNode;
 import org.apache.tajo.storage.Tuple;
@@ -61,7 +60,7 @@ public class SortAggregateExec extends AggregationExec {
       // get a key tuple
       currentKey = new VTuple(groupingKeyIds.length);
       for(int i = 0; i < groupingKeyIds.length; i++) {
-        currentKey.put(i, tuple.get(groupingKeyIds[i]));
+        currentKey.put(i, tuple.asDatum(groupingKeyIds[i]));
       }
 
       /** Aggregation State */
@@ -72,7 +71,7 @@ public class SortAggregateExec extends AggregationExec {
 
             // Merge when aggregator doesn't receive NullDatum
             if (!(groupingKeyNum == 0 && aggFunctionsNum == tuple.size()
-                && tuple.get(i) == NullDatum.get())) {
+                && tuple.isBlankOrNull(i))) {
               aggFunctions[i].merge(contexts[i], tuple);
             }
           }
@@ -90,7 +89,7 @@ public class SortAggregateExec extends AggregationExec {
         int tupleIdx = 0;
 
         for(; tupleIdx < groupingKeyNum; tupleIdx++) {
-          outputTuple.put(tupleIdx, lastKey.get(tupleIdx));
+          outputTuple.put(tupleIdx, lastKey.asDatum(tupleIdx));
         }
         for(int aggFuncIdx = 0; aggFuncIdx < aggFunctionsNum; tupleIdx++, aggFuncIdx++) {
           outputTuple.put(tupleIdx, aggFunctions[aggFuncIdx].terminate(contexts[aggFuncIdx]));
@@ -114,7 +113,7 @@ public class SortAggregateExec extends AggregationExec {
       outputTuple = new VTuple(outSchema.size());
       int tupleIdx = 0;
       for(; tupleIdx < groupingKeyNum; tupleIdx++) {
-        outputTuple.put(tupleIdx, lastKey.get(tupleIdx));
+        outputTuple.put(tupleIdx, lastKey.asDatum(tupleIdx));
       }
       for(int aggFuncIdx = 0; aggFuncIdx < aggFunctionsNum; tupleIdx++, aggFuncIdx++) {
         outputTuple.put(tupleIdx, aggFunctions[aggFuncIdx].terminate(contexts[aggFuncIdx]));

http://git-wip-us.apache.org/repos/asf/tajo/blob/5c2aee23/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SortBasedColPartitionStoreExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SortBasedColPartitionStoreExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SortBasedColPartitionStoreExec.java
index ca90b0e..a40fc1d 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SortBasedColPartitionStoreExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SortBasedColPartitionStoreExec.java
@@ -52,7 +52,7 @@ public class SortBasedColPartitionStoreExec extends ColPartitionStoreExec {
 
   private void fillKeyTuple(Tuple inTuple, Tuple keyTuple) {
     for (int i = 0; i < keyIds.length; i++) {
-      keyTuple.put(i, inTuple.get(keyIds[i]));
+      keyTuple.put(i, inTuple.asDatum(keyIds[i]));
     }
   }
 
@@ -60,12 +60,11 @@ public class SortBasedColPartitionStoreExec extends ColPartitionStoreExec {
     StringBuilder sb = new StringBuilder();
 
     for(int i = 0; i < keyIds.length; i++) {
-      Datum datum = keyTuple.get(i);
       if(i > 0) {
         sb.append("/");
       }
       sb.append(keyNames[i]).append("=");      
-      sb.append(StringUtils.escapePathName(datum.asChars()));
+      sb.append(StringUtils.escapePathName(keyTuple.getText(i)));
     }
     return sb.toString();
   }