You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hj...@apache.org on 2014/12/05 09:21:09 UTC

[05/29] tajo git commit: TAJO-1209: Pluggable line (de)serializer for DelimitedTextFile.

TAJO-1209: Pluggable line (de)serializer for DelimitedTextFile.

Closes #1209


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/72dd29c5
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/72dd29c5
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/72dd29c5

Branch: refs/heads/hbase_storage
Commit: 72dd29c520981a3ffaac2150ee7306ca41192893
Parents: 3ae44b1
Author: Hyunsik Choi <hy...@apache.org>
Authored: Thu Nov 27 19:46:58 2014 +0900
Committer: Hyunsik Choi <hy...@apache.org>
Committed: Thu Nov 27 19:46:58 2014 +0900

----------------------------------------------------------------------
 CHANGES                                         |   3 +
 .../apache/tajo/storage/StorageConstants.java   |   3 +-
 .../org/apache/tajo/util/ReflectionUtil.java    |   4 +-
 .../tajo/storage/text/CSVLineDeserializer.java  |  96 +++++++++++
 .../apache/tajo/storage/text/CSVLineSerDe.java  |  45 +++++
 .../tajo/storage/text/CSVLineSerializer.java    |  68 ++++++++
 .../tajo/storage/text/DelimitedTextFile.java    | 163 +++++++------------
 .../tajo/storage/text/TextLineDeserializer.java |  60 +++++++
 .../apache/tajo/storage/text/TextLineSerDe.java |  65 ++++++++
 .../tajo/storage/text/TextLineSerializer.java   |  45 +++++
 10 files changed, 449 insertions(+), 103 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/72dd29c5/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index f2141a6..6f38f65 100644
--- a/CHANGES
+++ b/CHANGES
@@ -67,6 +67,9 @@ Release 0.9.1 - unreleased
     TAJO-1188: Fix testcase testTimestampConstructor in TestTimestampDatum.
     (DaeMyung Kang via hyunsik)
 
+    TAJO-1209: Pluggable line (de)serializer for DelimitedTextFile.
+    (hyunsik)
+
 
   BUG FIXES
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/72dd29c5/tajo-common/src/main/java/org/apache/tajo/storage/StorageConstants.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/storage/StorageConstants.java b/tajo-common/src/main/java/org/apache/tajo/storage/StorageConstants.java
index 11ac9b7..3065d31 100644
--- a/tajo-common/src/main/java/org/apache/tajo/storage/StorageConstants.java
+++ b/tajo-common/src/main/java/org/apache/tajo/storage/StorageConstants.java
@@ -32,7 +32,8 @@ public class StorageConstants {
 
   public static final String TEXT_DELIMITER = "text.delimiter";
   public static final String TEXT_NULL = "text.null";
-  public static final String TEXTFILE_SERDE = "textfile.serde";
+  public static final String TEXT_SERDE_CLASS = "text.serde.class";
+  public static final String DEFAULT_TEXT_SERDE_CLASS = "org.apache.tajo.storage.text.CSVLineSerde";
 
   @Deprecated
   public static final String SEQUENCEFILE_DELIMITER = "sequencefile.delimiter";

http://git-wip-us.apache.org/repos/asf/tajo/blob/72dd29c5/tajo-common/src/main/java/org/apache/tajo/util/ReflectionUtil.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/util/ReflectionUtil.java b/tajo-common/src/main/java/org/apache/tajo/util/ReflectionUtil.java
index 410815f..eccc61f 100644
--- a/tajo-common/src/main/java/org/apache/tajo/util/ReflectionUtil.java
+++ b/tajo-common/src/main/java/org/apache/tajo/util/ReflectionUtil.java
@@ -32,8 +32,8 @@ public class ReflectionUtil {
   private static final Map<Class<?>, Constructor<?>> CONSTRUCTOR_CACHE =
       new ConcurrentHashMap<Class<?>, Constructor<?>>();
 
-	public static Object newInstance(Class<?> clazz) 
-			throws InstantiationException, IllegalAccessException {         
+	public static Object newInstance(Class<?> clazz)
+			throws InstantiationException, IllegalAccessException {
 		return clazz.newInstance();
 	}
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/72dd29c5/tajo-storage/src/main/java/org/apache/tajo/storage/text/CSVLineDeserializer.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/text/CSVLineDeserializer.java b/tajo-storage/src/main/java/org/apache/tajo/storage/text/CSVLineDeserializer.java
new file mode 100644
index 0000000..f580da1
--- /dev/null
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/text/CSVLineDeserializer.java
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.text;
+
+import io.netty.buffer.ByteBuf;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.storage.FieldSerializerDeserializer;
+import org.apache.tajo.storage.Tuple;
+
+import java.io.IOException;
+
+public class CSVLineDeserializer extends TextLineDeserializer {
+  private FieldSplitProcessor processor;
+  private FieldSerializerDeserializer fieldSerDer;
+  private ByteBuf nullChars;
+
+  public CSVLineDeserializer(Schema schema, TableMeta meta, int[] targetColumnIndexes) {
+    super(schema, meta, targetColumnIndexes);
+  }
+
+  @Override
+  public void init() {
+    this.processor = new FieldSplitProcessor(CSVLineSerDe.getFieldDelimiter(meta));
+
+    if (nullChars != null) {
+      nullChars.release();
+    }
+    nullChars = TextLineSerDe.getNullChars(meta);
+
+    fieldSerDer = new TextFieldSerializerDeserializer();
+  }
+
+  public void deserialize(final ByteBuf lineBuf, Tuple tuple) throws IOException {
+    int[] projection = targetColumnIndexes;
+    if (lineBuf == null || targetColumnIndexes == null || targetColumnIndexes.length == 0) {
+      return;
+    }
+
+    final int rowLength = lineBuf.readableBytes();
+    int start = 0, fieldLength = 0, end = 0;
+
+    //Projection
+    int currentTarget = 0;
+    int currentIndex = 0;
+
+    while (end != -1) {
+      end = lineBuf.forEachByte(start, rowLength - start, processor);
+
+      if (end < 0) {
+        fieldLength = rowLength - start;
+      } else {
+        fieldLength = end - start;
+      }
+
+      if (projection.length > currentTarget && currentIndex == projection[currentTarget]) {
+        lineBuf.setIndex(start, start + fieldLength);
+        Datum datum = fieldSerDer.deserialize(lineBuf, schema.getColumn(currentIndex), currentIndex, nullChars);
+        tuple.put(currentIndex, datum);
+        currentTarget++;
+      }
+
+      if (projection.length == currentTarget) {
+        break;
+      }
+
+      start = end + 1;
+      currentIndex++;
+    }
+  }
+
+  @Override
+  public void release() {
+    if (nullChars != null) {
+      nullChars.release();
+      nullChars = null;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/72dd29c5/tajo-storage/src/main/java/org/apache/tajo/storage/text/CSVLineSerDe.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/text/CSVLineSerDe.java b/tajo-storage/src/main/java/org/apache/tajo/storage/text/CSVLineSerDe.java
new file mode 100644
index 0000000..e2686a6
--- /dev/null
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/text/CSVLineSerDe.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.text;
+
+import org.apache.commons.lang.StringEscapeUtils;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.storage.StorageConstants;
+
+public class CSVLineSerDe extends TextLineSerDe {
+
+  public CSVLineSerDe() {
+  }
+
+  @Override
+  public TextLineDeserializer createDeserializer(Schema schema, TableMeta meta, int[] targetColumnIndexes) {
+    return new CSVLineDeserializer(schema, meta, targetColumnIndexes);
+  }
+
+  @Override
+  public TextLineSerializer createSerializer(Schema schema, TableMeta meta) {
+    return new CSVLineSerializer(schema, meta);
+  }
+
+  public static char getFieldDelimiter(TableMeta meta) {
+    return StringEscapeUtils.unescapeJava(meta.getOption(StorageConstants.TEXT_DELIMITER,
+        StorageConstants.DEFAULT_FIELD_DELIMITER)).charAt(0);
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/72dd29c5/tajo-storage/src/main/java/org/apache/tajo/storage/text/CSVLineSerializer.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/text/CSVLineSerializer.java b/tajo-storage/src/main/java/org/apache/tajo/storage/text/CSVLineSerializer.java
new file mode 100644
index 0000000..684519c
--- /dev/null
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/text/CSVLineSerializer.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.text;
+
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.storage.FieldSerializerDeserializer;
+import org.apache.tajo.storage.Tuple;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+public class CSVLineSerializer extends TextLineSerializer {
+  private FieldSerializerDeserializer serde;
+
+  private byte [] nullChars;
+  private char delimiter;
+
+  public CSVLineSerializer(Schema schema, TableMeta meta) {
+    super(schema, meta);
+  }
+
+  @Override
+  public void init() {
+    nullChars = TextLineSerDe.getNullCharsAsBytes(meta);
+    delimiter = CSVLineSerDe.getFieldDelimiter(meta);
+
+    serde = new TextFieldSerializerDeserializer();
+  }
+
+  @Override
+  public int serialize(OutputStream out, Tuple input) throws IOException {
+    int rowBytes = 0;
+
+    for (int i = 0; i < schema.size(); i++) {
+      Datum datum = input.get(i);
+      rowBytes += serde.serialize(out, datum, schema.getColumn(i), i, nullChars);
+
+      if (schema.size() - 1 > i) {
+        out.write((byte) delimiter);
+        rowBytes += 1;
+      }
+    }
+
+    return rowBytes;
+  }
+
+  @Override
+  public void release() {
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/72dd29c5/tajo-storage/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java b/tajo-storage/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java
index 68d89e7..d15f394 100644
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java
@@ -19,8 +19,6 @@
 package org.apache.tajo.storage.text;
 
 import io.netty.buffer.ByteBuf;
-import org.apache.commons.lang.StringEscapeUtils;
-import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -35,19 +33,20 @@ import org.apache.hadoop.io.compress.Compressor;
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.catalog.TableMeta;
 import org.apache.tajo.catalog.statistics.TableStats;
-import org.apache.tajo.datum.Datum;
-import org.apache.tajo.datum.NullDatum;
 import org.apache.tajo.storage.*;
 import org.apache.tajo.storage.compress.CodecPool;
 import org.apache.tajo.storage.exception.AlreadyExistsStorageException;
 import org.apache.tajo.storage.fragment.FileFragment;
 import org.apache.tajo.storage.rcfile.NonSyncByteArrayOutputStream;
+import org.apache.tajo.util.ReflectionUtil;
 
 import java.io.BufferedOutputStream;
 import java.io.DataOutputStream;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.util.Arrays;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 
 public class DelimitedTextFile {
 
@@ -56,15 +55,48 @@ public class DelimitedTextFile {
 
   private static final Log LOG = LogFactory.getLog(DelimitedTextFile.class);
 
+  /** it caches line serde classes. */
+  private static final Map<String, Class<? extends TextLineSerDe>> serdeClassCache =
+      new ConcurrentHashMap<String, Class<? extends TextLineSerDe>>();
+
+  /**
+   * By default, DelimitedTextFileScanner uses CSVLineSerder. If a table property 'text.serde.class' is given,
+   * it will use the specified serder class.
+   *
+   * @return TextLineSerder
+   */
+  public static TextLineSerDe getLineSerde(TableMeta meta) {
+    TextLineSerDe lineSerder;
+
+    String serDeClassName;
+
+    // if there is no given serde class, it will use CSV line serder.
+    serDeClassName = meta.getOption(StorageConstants.TEXT_SERDE_CLASS, StorageConstants.DEFAULT_TEXT_SERDE_CLASS);
+
+    try {
+      Class<? extends TextLineSerDe> serdeClass;
+
+      if (serdeClassCache.containsKey(serDeClassName)) {
+        serdeClass = serdeClassCache.get(serDeClassName);
+      } else {
+        serdeClass = (Class<? extends TextLineSerDe>) Class.forName(CSVLineSerDe.class.getName());
+        serdeClassCache.put(serDeClassName, serdeClass);
+      }
+      lineSerder = (TextLineSerDe) ReflectionUtil.newInstance(serdeClass);
+    } catch (Throwable e) {
+      throw new RuntimeException("TextLineSerde class cannot be initialized");
+    }
+
+    return lineSerder;
+  }
+
   public static class DelimitedTextFileAppender extends FileAppender {
     private final TableMeta meta;
     private final Schema schema;
-    private final int columnNum;
     private final FileSystem fs;
     private FSDataOutputStream fos;
     private DataOutputStream outputStream;
     private CompressionOutputStream deflateFilter;
-    private char delimiter;
     private TableStatistics stats = null;
     private Compressor compressor;
     private CompressionCodecFactory codecFactory;
@@ -76,7 +108,7 @@ public class DelimitedTextFile {
     private long pos = 0;
 
     private NonSyncByteArrayOutputStream os;
-    private FieldSerializerDeserializer serde;
+    private TextLineSerializer serializer;
 
     public DelimitedTextFileAppender(Configuration conf, final Schema schema, final TableMeta meta, final Path path)
         throws IOException {
@@ -84,17 +116,10 @@ public class DelimitedTextFile {
       this.fs = path.getFileSystem(conf);
       this.meta = meta;
       this.schema = schema;
-      this.delimiter = StringEscapeUtils.unescapeJava(this.meta.getOption(StorageConstants.TEXT_DELIMITER,
-          StorageConstants.DEFAULT_FIELD_DELIMITER)).charAt(0);
-      this.columnNum = schema.size();
-
-      String nullCharacters = StringEscapeUtils.unescapeJava(this.meta.getOption(StorageConstants.TEXT_NULL,
-          NullDatum.DEFAULT_TEXT));
-      if (StringUtils.isEmpty(nullCharacters)) {
-        nullChars = NullDatum.get().asTextBytes();
-      } else {
-        nullChars = nullCharacters.getBytes();
-      }
+    }
+
+    public TextLineSerDe getLineSerde() {
+      return DelimitedTextFile.getLineSerde(meta);
     }
 
     @Override
@@ -133,7 +158,8 @@ public class DelimitedTextFile {
         this.stats = new TableStatistics(this.schema);
       }
 
-      serde = new TextFieldSerializerDeserializer();
+      serializer = getLineSerde().createSerializer(schema, meta);
+      serializer.init();
 
       if (os == null) {
         os = new NonSyncByteArrayOutputStream(BUFFER_SIZE);
@@ -145,26 +171,20 @@ public class DelimitedTextFile {
       super.init();
     }
 
-
     @Override
     public void addTuple(Tuple tuple) throws IOException {
-      Datum datum;
-      int rowBytes = 0;
+      // write
+      int rowBytes = serializer.serialize(os, tuple);
 
-      for (int i = 0; i < columnNum; i++) {
-        datum = tuple.get(i);
-        rowBytes += serde.serialize(os, datum, schema.getColumn(i), i, nullChars);
-
-        if (columnNum - 1 > i) {
-          os.write((byte) delimiter);
-          rowBytes += 1;
-        }
-      }
+      // new line
       os.write(LF);
       rowBytes += 1;
 
+      // update positions
       pos += rowBytes;
       bufferedBytes += rowBytes;
+
+      // refill buffer if necessary
       if (bufferedBytes > BUFFER_SIZE) {
         flushBuffer();
       }
@@ -197,6 +217,8 @@ public class DelimitedTextFile {
     public void close() throws IOException {
 
       try {
+        serializer.release();
+
         if(outputStream != null){
           flush();
         }
@@ -241,18 +263,15 @@ public class DelimitedTextFile {
   }
 
   public static class DelimitedTextFileScanner extends FileScanner {
-
     private boolean splittable = false;
     private final long startOffset;
-    private final long endOffset;
 
+    private final long endOffset;
     private int recordCount = 0;
     private int[] targetColumnIndexes;
 
-    private ByteBuf nullChars;
-    private FieldSerializerDeserializer serde;
     private DelimitedLineReader reader;
-    private FieldSplitProcessor processor;
+    private TextLineDeserializer deserializer;
 
     public DelimitedTextFileScanner(Configuration conf, final Schema schema, final TableMeta meta,
                                     final FileFragment fragment)
@@ -265,30 +284,14 @@ public class DelimitedTextFile {
 
       startOffset = fragment.getStartKey();
       endOffset = startOffset + fragment.getEndKey();
+    }
 
-      //Delimiter
-      String delim = meta.getOption(StorageConstants.TEXT_DELIMITER, StorageConstants.DEFAULT_FIELD_DELIMITER);
-      this.processor = new FieldSplitProcessor(StringEscapeUtils.unescapeJava(delim).charAt(0));
+    public TextLineSerDe getLineSerde() {
+      return DelimitedTextFile.getLineSerde(meta);
     }
 
     @Override
     public void init() throws IOException {
-      if (nullChars != null) {
-        nullChars.release();
-      }
-
-      String nullCharacters = StringEscapeUtils.unescapeJava(meta.getOption(StorageConstants.TEXT_NULL,
-          NullDatum.DEFAULT_TEXT));
-      byte[] bytes;
-      if (StringUtils.isEmpty(nullCharacters)) {
-        bytes = NullDatum.get().asTextBytes();
-      } else {
-        bytes = nullCharacters.getBytes();
-      }
-
-      nullChars = BufferPool.directBuffer(bytes.length, bytes.length);
-      nullChars.writeBytes(bytes);
-
       if (reader != null) {
         reader.close();
       }
@@ -305,8 +308,6 @@ public class DelimitedTextFile {
         targetColumnIndexes[i] = schema.getColumnId(targets[i].getQualifiedName());
       }
 
-      serde = new TextFieldSerializerDeserializer();
-
       super.init();
       Arrays.sort(targetColumnIndexes);
       if (LOG.isDebugEnabled()) {
@@ -316,6 +317,9 @@ public class DelimitedTextFile {
       if (startOffset > 0) {
         reader.readLine();  // skip first line;
       }
+
+      deserializer = getLineSerde().createDeserializer(schema, meta, targetColumnIndexes);
+      deserializer.init();
     }
 
     public ByteBuf readLine() throws IOException {
@@ -362,7 +366,7 @@ public class DelimitedTextFile {
         }
 
         VTuple tuple = new VTuple(schema.size());
-        fillTuple(schema, tuple, buf, targetColumnIndexes);
+        deserializer.deserialize(buf, tuple);
         return tuple;
       } catch (Throwable t) {
         LOG.error("Tuple list current index: " + recordCount + " file offset:" + reader.getCompressedPosition(), t);
@@ -370,44 +374,6 @@ public class DelimitedTextFile {
       }
     }
 
-    private void fillTuple(Schema schema, Tuple dst, ByteBuf lineBuf, int[] target) throws IOException {
-      int[] projection = target;
-      if (lineBuf == null || target == null || target.length == 0) {
-        return;
-      }
-
-      final int rowLength = lineBuf.readableBytes();
-      int start = 0, fieldLength = 0, end = 0;
-
-      //Projection
-      int currentTarget = 0;
-      int currentIndex = 0;
-
-      while (end != -1) {
-        end = lineBuf.forEachByte(start, rowLength - start, processor);
-
-        if (end < 0) {
-          fieldLength = rowLength - start;
-        } else {
-          fieldLength = end - start;
-        }
-
-        if (projection.length > currentTarget && currentIndex == projection[currentTarget]) {
-          lineBuf.setIndex(start, start + fieldLength);
-          Datum datum = serde.deserialize(lineBuf, schema.getColumn(currentIndex), currentIndex, nullChars);
-          dst.put(currentIndex, datum);
-          currentTarget++;
-        }
-
-        if (projection.length == currentTarget) {
-          break;
-        }
-
-        start = end + 1;
-        currentIndex++;
-      }
-    }
-
     @Override
     public void reset() throws IOException {
       init();
@@ -416,10 +382,7 @@ public class DelimitedTextFile {
     @Override
     public void close() throws IOException {
       try {
-        if (nullChars != null) {
-          nullChars.release();
-          nullChars = null;
-        }
+        deserializer.release();
 
         if (tableStats != null && reader != null) {
           tableStats.setReadBytes(reader.getReadBytes());  //Actual Processed Bytes. (decompressed bytes + overhead)

http://git-wip-us.apache.org/repos/asf/tajo/blob/72dd29c5/tajo-storage/src/main/java/org/apache/tajo/storage/text/TextLineDeserializer.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/text/TextLineDeserializer.java b/tajo-storage/src/main/java/org/apache/tajo/storage/text/TextLineDeserializer.java
new file mode 100644
index 0000000..645d118
--- /dev/null
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/text/TextLineDeserializer.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.text;
+
+import io.netty.buffer.ByteBuf;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.storage.Tuple;
+
+import java.io.IOException;
+
+/**
+ * Reads a text line and fills a Tuple with values
+ */
+public abstract class TextLineDeserializer {
+  protected Schema schema;
+  protected TableMeta meta;
+  protected int [] targetColumnIndexes;
+
+  public TextLineDeserializer(Schema schema, TableMeta meta, int [] targetColumnIndexes) {
+    this.schema = schema;
+    this.meta = meta;
+    this.targetColumnIndexes = targetColumnIndexes;
+  }
+
+  /**
+   * Initialize SerDe
+   */
+  public abstract void init();
+
+  /**
+   * It fills a tuple with a read fields in a given line.
+   *
+   * @param buf Read line
+   * @param tuple Tuple to be filled with read fields
+   * @throws java.io.IOException
+   */
+  public abstract void deserialize(final ByteBuf buf, Tuple tuple) throws IOException;
+
+  /**
+   * Release external resources
+   */
+  public abstract void release();
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/72dd29c5/tajo-storage/src/main/java/org/apache/tajo/storage/text/TextLineSerDe.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/text/TextLineSerDe.java b/tajo-storage/src/main/java/org/apache/tajo/storage/text/TextLineSerDe.java
new file mode 100644
index 0000000..e81e289
--- /dev/null
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/text/TextLineSerDe.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.text;
+
+import io.netty.buffer.ByteBuf;
+import org.apache.commons.lang.StringEscapeUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.datum.NullDatum;
+import org.apache.tajo.storage.BufferPool;
+import org.apache.tajo.storage.StorageConstants;
+
+/**
+ * Pluggable Text Line SerDe class
+ */
+public abstract class TextLineSerDe {
+
+  public TextLineSerDe() {
+  }
+
+  public abstract TextLineDeserializer createDeserializer(Schema schema, TableMeta meta, int [] targetColumnIndexes);
+
+  public abstract TextLineSerializer createSerializer(Schema schema, TableMeta meta);
+
+  public static ByteBuf getNullChars(TableMeta meta) {
+    byte[] nullCharByteArray = getNullCharsAsBytes(meta);
+
+    ByteBuf nullChars = BufferPool.directBuffer(nullCharByteArray.length, nullCharByteArray.length);
+    nullChars.writeBytes(nullCharByteArray);
+
+    return nullChars;
+  }
+
+  public static byte [] getNullCharsAsBytes(TableMeta meta) {
+    byte [] nullChars;
+
+    String nullCharacters = StringEscapeUtils.unescapeJava(meta.getOption(StorageConstants.TEXT_NULL,
+        NullDatum.DEFAULT_TEXT));
+    if (StringUtils.isEmpty(nullCharacters)) {
+      nullChars = NullDatum.get().asTextBytes();
+    } else {
+      nullChars = nullCharacters.getBytes();
+    }
+
+    return nullChars;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/72dd29c5/tajo-storage/src/main/java/org/apache/tajo/storage/text/TextLineSerializer.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/text/TextLineSerializer.java b/tajo-storage/src/main/java/org/apache/tajo/storage/text/TextLineSerializer.java
new file mode 100644
index 0000000..0c2761f
--- /dev/null
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/text/TextLineSerializer.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.text;
+
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.storage.Tuple;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+/**
+ * Write a Tuple into single text formatted line
+ */
+public abstract class TextLineSerializer {
+  protected Schema schema;
+  protected TableMeta meta;
+
+  public TextLineSerializer(Schema schema, TableMeta meta) {
+    this.schema = schema;
+    this.meta = meta;
+  }
+
+  public abstract void init();
+
+  public abstract int serialize(OutputStream out, Tuple input) throws IOException;
+
+  public abstract void release();
+}