You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by jh...@apache.org on 2013/09/27 09:16:06 UTC

git commit: TAJO-213: NULL characters in meta of csv table should be supported. (jinho)

Updated Branches:
  refs/heads/master 33f697683 -> 3d30e2532


TAJO-213: NULL characters in meta of csv table should be supported. (jinho)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tajo/commit/3d30e253
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tajo/tree/3d30e253
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tajo/diff/3d30e253

Branch: refs/heads/master
Commit: 3d30e25321d304e15c304e2822593aa3d285712d
Parents: 33f6976
Author: jinossy <ji...@gmail.com>
Authored: Fri Sep 27 16:15:25 2013 +0900
Committer: jinossy <ji...@gmail.com>
Committed: Fri Sep 27 16:15:25 2013 +0900

----------------------------------------------------------------------
 CHANGES.txt                                     |  3 ++
 .../java/org/apache/tajo/datum/NullDatum.java   | 34 +++---------
 .../main/java/org/apache/tajo/cli/TajoCli.java  |  6 ++-
 .../tajo/engine/parser/HiveConverter.java       |  4 +-
 .../apache/tajo/engine/parser/SQLAnalyzer.java  | 23 +++++---
 .../apache/tajo/engine/query/ResultSetImpl.java |  4 +-
 .../master/querymaster/QueryInProgress.java     |  4 +-
 .../java/org/apache/tajo/worker/TajoWorker.java |  2 +-
 .../tajo/engine/query/TestNullValues.java       | 34 ++++++++++++
 .../java/org/apache/tajo/storage/CSVFile.java   | 56 +++++++++++++++-----
 .../java/org/apache/tajo/storage/LazyTuple.java | 47 ++++++++++++----
 .../org/apache/tajo/storage/TestLazyTuple.java  | 12 +++--
 12 files changed, 160 insertions(+), 69 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3d30e253/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 83efb23..b981279 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -4,6 +4,9 @@ Release 0.2.0 - unreleased
 
   NEW FEATURES
 
+    TAJO-213: NULL characters in meta of csv table should be supported.
+    (jinho)
+
     TAJO-185: Implement split_part function. (hyunsik)
 
     TAJO-193: Add string pattern matching operators. (hyunsik)

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3d30e253/tajo-common/src/main/java/org/apache/tajo/datum/NullDatum.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/datum/NullDatum.java b/tajo-common/src/main/java/org/apache/tajo/datum/NullDatum.java
index b5d3ef0..10c35a9 100644
--- a/tajo-common/src/main/java/org/apache/tajo/datum/NullDatum.java
+++ b/tajo-common/src/main/java/org/apache/tajo/datum/NullDatum.java
@@ -18,23 +18,19 @@
 
 package org.apache.tajo.datum;
 
-import org.apache.tajo.util.Bytes;
-
 import static org.apache.tajo.common.TajoDataTypes.Type;
 
 public class NullDatum extends Datum {
-  private static final NullDatum instance;
-  private static String NULL_STRING = "";
-  private static byte[] NULL_CHAR = NULL_STRING.getBytes();
-  
+  private static NullDatum instance;
+
   static {
     instance = new NullDatum();
   }
-  
+
   private NullDatum() {
     super(Type.NULL);
   }
-  
+
   public static NullDatum get() {
     return instance;
   }
@@ -66,7 +62,7 @@ public class NullDatum extends Datum {
 
   @Override
   public byte[] asByteArray() {
-    return NULL_CHAR.clone();
+    return new byte[0];
   }
 
   @Override
@@ -81,12 +77,12 @@ public class NullDatum extends Datum {
 
   @Override
   public String asChars() {
-    return NULL_STRING;
+    return "";
   }
 
   @Override
   public byte[] asTextBytes() {
-    return asByteArray();
+    return new byte[0];
   }
 
   @Override
@@ -108,20 +104,4 @@ public class NullDatum extends Datum {
   public int hashCode() {
     return 0; // one of the prime number
   }
-
-  public static boolean isNull(String val){
-    return val == null || val.length() == 0 || ((val.length() == NULL_CHAR.length) && NULL_STRING.equals(val));
-  }
-
-  public static boolean isNull(byte[] val){
-    return val == null || val.length == 0 || ((val.length == NULL_CHAR.length) && Bytes.equals(val, NULL_CHAR));
-  }
-
-  public static boolean isNotNull(String val){
-    return !isNull(val);
-  }
-
-  public static boolean isNotNull(byte[] val){
-    return !isNull(val);
-  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3d30e253/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/cli/TajoCli.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/cli/TajoCli.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/cli/TajoCli.java
index e10667f..f6d5a6e 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/cli/TajoCli.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/cli/TajoCli.java
@@ -382,7 +382,11 @@ public class TajoCli {
                 for (int i = 1; i <= numOfColumns; i++) {
                   if (i > 1) sout.print(",  ");
                   String columnValue = res.getObject(i).toString();
-                  sout.print(columnValue);
+                  if(res.wasNull()){
+                    sout.print("null");
+                  } else {
+                    sout.print(columnValue);
+                  }
                 }
                 sout.println();
                 sout.flush();

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3d30e253/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/parser/HiveConverter.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/parser/HiveConverter.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/parser/HiveConverter.java
index 2e16813..0442e77 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/parser/HiveConverter.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/parser/HiveConverter.java
@@ -1485,8 +1485,8 @@ public class HiveConverter extends HiveParserBaseVisitor<Expr>{
 
             if(ctx.tableRowFormat() != null) {
                 if(ctx.tableRowFormat().rowFormatDelimited() != null) {
-                    params.put("csvfile.delimiter"
-                            , ctx.tableRowFormat().rowFormatDelimited().tableRowFormatFieldIdentifier().getChild(3).getText().replaceAll("'", ""));
+                  String delimiter = ctx.tableRowFormat().rowFormatDelimited().tableRowFormatFieldIdentifier().getChild(3).getText().replaceAll("'", "");
+                    params.put("csvfile.delimiter", SQLAnalyzer.escapeDelimiter(delimiter));
                 }
             }
 

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3d30e253/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/parser/SQLAnalyzer.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/parser/SQLAnalyzer.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/parser/SQLAnalyzer.java
index 5b430ef..ae6d38e 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/parser/SQLAnalyzer.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/parser/SQLAnalyzer.java
@@ -1037,21 +1037,30 @@ public class SQLAnalyzer extends SQLParserBaseVisitor<Expr> {
     return params;
   }
 
-  private Map<String, String> escapeTableMeta(Map<String, String> map) {
+  public Map<String, String> escapeTableMeta(Map<String, String> map) {
     Map<String, String> params = new HashMap<String, String>();
     for (Map.Entry<String, String> entry : map.entrySet()) {
-      String value = StringEscapeUtils.unescapeJava(entry.getValue());
       if (entry.getKey().equals(CSVFile.DELIMITER)) {
-        try {
-          value = new String(new byte[]{Byte.valueOf(value).byteValue()});
-        } catch (NumberFormatException e) {
-        }
+        params.put(entry.getKey(), escapeDelimiter(entry.getValue()));
+      } else if (entry.getKey().equals(CSVFile.NULL)) {
+        params.put(entry.getKey(), StringEscapeUtils.unescapeJava(entry.getValue()));
+      } else {
+        params.put(entry.getKey(), entry.getValue());
       }
-      params.put(entry.getKey(), StringEscapeUtils.escapeJava(value));
     }
     return params;
   }
 
+  public static String escapeDelimiter(String value) {
+    try {
+      String delimiter = StringEscapeUtils.unescapeJava(value);
+      delimiter = new String(new byte[]{Byte.valueOf(delimiter).byteValue()});
+      return StringEscapeUtils.escapeJava(delimiter);
+    } catch (NumberFormatException e) {
+    }
+    return value;
+  }
+
   private static String stripQuote(String str) {
     return str.substring(1, str.length() - 1);
   }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3d30e253/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/query/ResultSetImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/query/ResultSetImpl.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/query/ResultSetImpl.java
index 639b43f..601a112 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/query/ResultSetImpl.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/query/ResultSetImpl.java
@@ -747,7 +747,7 @@ public class ResultSetImpl implements ResultSet {
   @Override
   public Object getObject(int fieldId) throws SQLException {
     Datum d = cur.get(fieldId - 1);
-    wasNull = (d == null);
+    handleNull(d);
 
     // TODO - to be changed to return Object type
     return d;
@@ -761,7 +761,7 @@ public class ResultSetImpl implements ResultSet {
   @Override
   public Object getObject(String arg0) throws SQLException {
     Datum d = cur.get(findColumn(arg0));
-    wasNull = (d == null);
+    handleNull(d);
     return d;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3d30e253/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java
index 53dfb6a..15d5b9b 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java
@@ -118,17 +118,17 @@ public class QueryInProgress extends CompositeService {
       } catch (InterruptedException e) {
         break;
       }
-      if(System.currentTimeMillis() - startTime > 60 * 1000) {
+      if(System.currentTimeMillis() - startTime > 300 * 1000) {
         LOG.warn("Failed to stop QueryMaster:" + queryId);
         break;
       }
     }
 
+    super.stop();
     if(queryMasterRpc != null) {
       //TODO release to connection pool
       queryMasterRpc.close();
     }
-    super.stop();
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3d30e253/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorker.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorker.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorker.java
index 05b5416..8b115b1 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorker.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorker.java
@@ -199,7 +199,7 @@ public class TajoWorker extends CompositeService {
       tajoMasterRpc.close();
     }
 
-    if(webServer != null) {
+    if(webServer != null && webServer.isAlive()) {
       try {
         webServer.stop();
       } catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3d30e253/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestNullValues.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestNullValues.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestNullValues.java
index 3b46a9b..4c0eec8 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestNullValues.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestNullValues.java
@@ -125,4 +125,38 @@ public class TestNullValues {
       res.close();
     }
   }
+
+  @Test
+  public final void testIsNotNull3() throws Exception {
+    String [] table = new String[] {"nulltable4"};
+    Schema schema = new Schema();
+    schema.addColumn("col1", Type.INT8);
+    schema.addColumn("col2", Type.INT8);
+    schema.addColumn("col3", Type.INT8);
+    schema.addColumn("col4", Type.INT8);
+    schema.addColumn("col5", Type.INT8);
+    schema.addColumn("col6", Type.INT8);
+    schema.addColumn("col7", Type.INT8);
+    schema.addColumn("col8", Type.INT8);
+    schema.addColumn("col9", Type.INT8);
+    schema.addColumn("col10", Type.INT8);
+    Schema [] schemas = new Schema[] {schema};
+    String [] data = {
+        "\\N,,,,672287821,",
+        ",\\N,,43578"
+    };
+    Options opts = new Options();
+    opts.put(CSVFile.DELIMITER, ",");
+    opts.put(CSVFile.NULL, "\\N");
+    ResultSet res = TajoTestingCluster
+        .run(table, schemas, opts, new String[][]{data},
+            "select * from nulltable4 where col1 is null and col2 is null and col3 is null and col5 is null and col4 = 43578");
+    try {
+      assertTrue(res.next());
+      assertEquals(43578, res.getLong(4));
+      assertFalse(res.next());
+    } finally {
+      res.close();
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3d30e253/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/CSVFile.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/CSVFile.java b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/CSVFile.java
index 2fbee1e..fb66198 100644
--- a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/CSVFile.java
+++ b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/CSVFile.java
@@ -47,6 +47,7 @@ import java.util.Arrays;
 
 public class CSVFile {
   public static final String DELIMITER = "csvfile.delimiter";
+  public static final String NULL = "csvfile.null";     //read only
   public static final String DELIMITER_DEFAULT = "|";
   public static final byte LF = '\n';
   public static int EOF = -1;
@@ -65,6 +66,7 @@ public class CSVFile {
     private CompressionCodecFactory codecFactory;
     private CompressionCodec codec;
     private Path compressedPath;
+    private byte[] nullChars;
 
     public CSVAppender(Configuration conf, final TableMeta meta,
                        final Path path) throws IOException {
@@ -73,6 +75,13 @@ public class CSVFile {
       this.meta = meta;
       this.schema = meta.getSchema();
       this.delimiter = StringEscapeUtils.unescapeJava(this.meta.getOption(DELIMITER, DELIMITER_DEFAULT)).charAt(0);
+
+      String nullCharacters = this.meta.getOption(NULL);
+      if (StringUtils.isEmpty(nullCharacters)) {
+        nullChars = NullDatum.get().asTextBytes();
+      } else {
+        nullChars = nullCharacters.getBytes();
+      }
     }
 
     @Override
@@ -126,13 +135,23 @@ public class CSVFile {
           col = schema.getColumn(i);
           if (col.getDataType().getType().equals(TajoDataTypes.Type.NULL)) {
 
-          } else if (col.getDataType().getType().equals(TajoDataTypes.Type.CHAR)){
+          } else if (col.getDataType().getType().equals(TajoDataTypes.Type.CHAR)) {
             datum = tuple.get(i);
-            byte[] pad = new byte[col.getDataType().getLength()- datum.size()];
-            outputStream.write(lTuple.getTextBytes(i));
-            outputStream.write(pad);
-          }
-          else {
+            if (datum instanceof NullDatum) {
+              outputStream.write(nullChars);
+            } else {
+              byte[] pad = new byte[col.getDataType().getLength() - datum.size()];
+              outputStream.write(datum.asTextBytes());
+              outputStream.write(pad);
+            }
+          } else if (col.getDataType().getType().equals(TajoDataTypes.Type.TEXT)) {
+            datum = tuple.get(i);
+            if (datum instanceof NullDatum) {
+              outputStream.write(nullChars);
+            } else {
+              outputStream.write(datum.asTextBytes());
+            }
+          } else {
             outputStream.write(lTuple.getTextBytes(i));
           }
 
@@ -152,7 +171,7 @@ public class CSVFile {
             stats.analyzeField(i, datum);
           }
           if (datum instanceof NullDatum) {
-            outputStream.write(NullDatum.get().asTextBytes());
+            outputStream.write(nullChars);
           } else {
             col = schema.getColumn(i);
             switch (col.getDataType().getType()) {
@@ -167,7 +186,9 @@ public class CSVFile {
                 break;
               case CHAR:
                 CharDatum charDatum = tuple.getChar(i);
+                byte[] pad = new byte[col.getDataType().getLength() - datum.size()];
                 outputStream.write(charDatum.asTextBytes());
+                outputStream.write(pad);
                 break;
               case TEXT:
                 outputStream.write(tuple.getText(i).asTextBytes());
@@ -281,9 +302,21 @@ public class CSVFile {
       if (isCompress() && !(codec instanceof SplittableCompressionCodec)) {
           splittable = false;
       }
+
+      // Buffer size, Delimiter
+      this.bufSize = DEFAULT_BUFFER_SIZE;
+      String delim  = fragment.getMeta().getOption(DELIMITER, DELIMITER_DEFAULT);
+      this.delimiter = StringEscapeUtils.unescapeJava(delim).charAt(0);
+
+      String nullCharacters = fragment.getMeta().getOption(NULL);
+      if (StringUtils.isEmpty(nullCharacters)) {
+        nullChars = NullDatum.get().asTextBytes();
+      } else {
+        nullChars = nullCharacters.getBytes();
+      }
     }
 
-    private final static int DEFAULT_BUFFER_SIZE = 256 * 1024;
+    private final static int DEFAULT_BUFFER_SIZE = 128 * 1024;
     private int bufSize;
     private char delimiter;
     private FileSystem fs;
@@ -304,14 +337,11 @@ public class CSVFile {
     private long prevTailLen = -1;
     private int[] targetColumnIndexes;
     private boolean eof = false;
+    private final byte[] nullChars;
 
     @Override
     public void init() throws IOException {
 
-      // Buffer size, Delimiter
-      this.bufSize = DEFAULT_BUFFER_SIZE;
-      String delim  = fragment.getMeta().getOption(DELIMITER, DELIMITER_DEFAULT);
-      this.delimiter = StringEscapeUtils.unescapeJava(delim).charAt(0);
       // Fragment information
       fs = fragment.getPath().getFileSystem(conf);
       fis = fs.open(fragment.getPath());
@@ -481,7 +511,7 @@ public class CSVFile {
         }
 
         byte[][] cells = Bytes.splitPreserveAllTokens(tuples[currentIdx++], delimiter, targetColumnIndexes);
-        return new LazyTuple(schema, cells, offset);
+        return new LazyTuple(schema, cells, offset, nullChars);
       } catch (Throwable t) {
         LOG.error("Tuple list length: " + (tuples != null ? tuples.length : 0), t);
         LOG.error("Tuple list current index: " + currentIdx, t);

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3d30e253/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/LazyTuple.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/LazyTuple.java b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/LazyTuple.java
index 0e50bed..e063eda 100644
--- a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/LazyTuple.java
+++ b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/LazyTuple.java
@@ -24,6 +24,7 @@ import org.apache.tajo.common.TajoDataTypes;
 import org.apache.tajo.datum.*;
 import org.apache.tajo.datum.exception.InvalidCastException;
 import org.apache.tajo.storage.json.StorageGsonHelper;
+import org.apache.tajo.util.Bytes;
 
 import java.net.InetAddress;
 import java.util.Arrays;
@@ -33,12 +34,18 @@ public class LazyTuple implements Tuple {
   private Datum[] values;
   private byte[][] textBytes;
   private Schema schema;
+  private byte[] nullBytes;
 
   public LazyTuple(Schema schema, byte[][] textBytes, long offset) {
+    this(schema, textBytes, offset, NullDatum.get().asTextBytes());
+  }
+
+  public LazyTuple(Schema schema, byte[][] textBytes, long offset, byte[] nullBytes) {
     this.schema = schema;
     this.textBytes = textBytes;
     this.values = new Datum[schema.getColumnNum()];
     this.offset = offset;
+    this.nullBytes = nullBytes;
   }
 
   public LazyTuple(LazyTuple tuple) {
@@ -47,6 +54,7 @@ public class LazyTuple implements Tuple {
     this.offset = tuple.offset;
     this.schema = tuple.schema;
     this.textBytes = tuple.textBytes.clone();
+    this.nullBytes = tuple.nullBytes;
   }
 
   @Override
@@ -276,32 +284,49 @@ public class LazyTuple implements Tuple {
     return false;
   }
 
-  private static Datum createByTextBytes(TajoDataTypes.Type type, byte[] val) {
+
+  public  boolean isNull(byte[] val){
+    return val == null || val.length == 0 || ((val.length == nullBytes.length) && Bytes.equals(val, nullBytes));
+  }
+
+  public  boolean isNullText(byte[] val){
+    return val == null || (val.length > 0 && val.length == nullBytes.length && Bytes.equals(val, nullBytes));
+  }
+
+  public boolean isNotNull(byte[] val){
+    return !isNull(val);
+  }
+
+  public boolean isNotNullText(byte[] val){
+    return !isNullText((val));
+  }
+
+  private  Datum createByTextBytes(TajoDataTypes.Type type, byte[] val) {
     switch (type) {
       case BOOLEAN:
-        return NullDatum.isNotNull(val) ? DatumFactory.createBool(new String(val)) : NullDatum.get();
+        return isNotNull(val) ? DatumFactory.createBool(new String(val)) : NullDatum.get();
       case INT2:
-        return NullDatum.isNotNull(val) ? DatumFactory.createInt2(new String(val)) : NullDatum.get();
+        return isNotNull(val) ? DatumFactory.createInt2(new String(val)) : NullDatum.get();
       case INT4:
-        return NullDatum.isNotNull(val) ? DatumFactory.createInt4(new String(val)) : NullDatum.get();
+        return isNotNull(val) ? DatumFactory.createInt4(new String(val)) : NullDatum.get();
       case INT8:
-        return NullDatum.isNotNull(val) ? DatumFactory.createInt8(new String(val)) : NullDatum.get();
+        return isNotNull(val) ? DatumFactory.createInt8(new String(val)) : NullDatum.get();
       case FLOAT4:
-        return NullDatum.isNotNull(val) ? DatumFactory.createFloat4(new String(val)) : NullDatum.get();
+        return isNotNull(val) ? DatumFactory.createFloat4(new String(val)) : NullDatum.get();
       case FLOAT8:
-        return NullDatum.isNotNull(val) ? DatumFactory.createFloat8(new String(val)) : NullDatum.get();
+        return isNotNull(val) ? DatumFactory.createFloat8(new String(val)) : NullDatum.get();
       case CHAR:
-        return DatumFactory.createChar(new String(val).trim());
+        return isNotNullText(val) ? DatumFactory.createChar(new String(val).trim()) : NullDatum.get();
       case TEXT:
-        return DatumFactory.createText(val);
+        return isNotNullText(val) ? DatumFactory.createText(val) : NullDatum.get();
       case BIT:
         return DatumFactory.createBit(Byte.parseByte(new String(val)));
       case BLOB:
         return DatumFactory.createBlob(Base64.decodeBase64(val));
       case INET4:
-        return NullDatum.isNotNull(val) ? DatumFactory.createInet4(new String(val)) : NullDatum.get();
+        return isNotNull(val) ? DatumFactory.createInet4(new String(val)) : NullDatum.get();
       case ARRAY:
-        return NullDatum.isNotNull(val) ? StorageGsonHelper.getInstance().fromJson(new String(val), Datum.class) : NullDatum.get();
+        return isNotNull(val) ? StorageGsonHelper.getInstance().fromJson(new String(val), Datum.class) : NullDatum.get();
       case NULL:
         return NullDatum.get();
       default:

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3d30e253/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/TestLazyTuple.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/TestLazyTuple.java b/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/TestLazyTuple.java
index 3ca0789..b67ea92 100644
--- a/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/TestLazyTuple.java
+++ b/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/TestLazyTuple.java
@@ -33,9 +33,12 @@ public class TestLazyTuple {
 
   Schema schema;
   byte[][] textRow;
+  byte[] nullbytes;
 
   @Before
   public void setUp() {
+    nullbytes = "\\N".getBytes();
+
     schema = new Schema();
     schema.addColumn("col1", TajoDataTypes.Type.BOOLEAN);
     schema.addColumn("col2", TajoDataTypes.Type.BIT);
@@ -48,7 +51,8 @@ public class TestLazyTuple {
     schema.addColumn("col9", TajoDataTypes.Type.TEXT);
     schema.addColumn("col10", TajoDataTypes.Type.BLOB);
     schema.addColumn("col11", TajoDataTypes.Type.INET4);
-    schema.addColumn("col12", TajoDataTypes.Type.NULL);
+    schema.addColumn("col12", TajoDataTypes.Type.INT4);
+    schema.addColumn("col13", TajoDataTypes.Type.NULL);
 
     StringBuilder sb = new StringBuilder();
     sb.append(DatumFactory.createBool(true)).append('|');
@@ -62,15 +66,15 @@ public class TestLazyTuple {
     sb.append(DatumFactory.createText("str2")).append('|');
     sb.append(DatumFactory.createBlob("jinho".getBytes())).append('|');
     sb.append(DatumFactory.createInet4("192.168.0.1")).append('|');
+    sb.append(new String(nullbytes)).append('|');
     sb.append(NullDatum.get());
-
     textRow = Bytes.splitPreserveAllTokens(sb.toString().getBytes(), '|');
   }
 
   @Test
   public void testGetDatum() {
 
-    LazyTuple t1 = new LazyTuple(schema, textRow, -1);
+    LazyTuple t1 = new LazyTuple(schema, textRow, -1, nullbytes);
     assertEquals(DatumFactory.createBool(true), t1.get(0));
     assertEquals(DatumFactory.createBit((byte) 0x99), t1.get(1));
     assertEquals(DatumFactory.createChar("str"), t1.get(2));
@@ -83,6 +87,7 @@ public class TestLazyTuple {
     assertEquals(DatumFactory.createBlob("jinho".getBytes()), t1.get(9));
     assertEquals(DatumFactory.createInet4("192.168.0.1"), t1.get(10));
     assertEquals(NullDatum.get(), t1.get(11));
+    assertEquals(NullDatum.get(), t1.get(12));
   }
 
   @Test
@@ -106,6 +111,7 @@ public class TestLazyTuple {
     assertFalse(t1.contains(9));
     assertFalse(t1.contains(10));
     assertFalse(t1.contains(11));
+    assertFalse(t1.contains(12));
   }
 
   @Test