You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hj...@apache.org on 2014/12/05 09:21:09 UTC
[05/29] tajo git commit: TAJO-1209: Pluggable line (de)serializer for
DelimitedTextFile.
TAJO-1209: Pluggable line (de)serializer for DelimitedTextFile.
Closes #1209
Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/72dd29c5
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/72dd29c5
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/72dd29c5
Branch: refs/heads/hbase_storage
Commit: 72dd29c520981a3ffaac2150ee7306ca41192893
Parents: 3ae44b1
Author: Hyunsik Choi <hy...@apache.org>
Authored: Thu Nov 27 19:46:58 2014 +0900
Committer: Hyunsik Choi <hy...@apache.org>
Committed: Thu Nov 27 19:46:58 2014 +0900
----------------------------------------------------------------------
CHANGES | 3 +
.../apache/tajo/storage/StorageConstants.java | 3 +-
.../org/apache/tajo/util/ReflectionUtil.java | 4 +-
.../tajo/storage/text/CSVLineDeserializer.java | 96 +++++++++++
.../apache/tajo/storage/text/CSVLineSerDe.java | 45 +++++
.../tajo/storage/text/CSVLineSerializer.java | 68 ++++++++
.../tajo/storage/text/DelimitedTextFile.java | 163 +++++++------------
.../tajo/storage/text/TextLineDeserializer.java | 60 +++++++
.../apache/tajo/storage/text/TextLineSerDe.java | 65 ++++++++
.../tajo/storage/text/TextLineSerializer.java | 45 +++++
10 files changed, 449 insertions(+), 103 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tajo/blob/72dd29c5/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index f2141a6..6f38f65 100644
--- a/CHANGES
+++ b/CHANGES
@@ -67,6 +67,9 @@ Release 0.9.1 - unreleased
TAJO-1188: Fix testcase testTimestampConstructor in TestTimestampDatum.
(DaeMyung Kang via hyunsik)
+ TAJO-1209: Pluggable line (de)serializer for DelimitedTextFile.
+ (hyunsik)
+
BUG FIXES
http://git-wip-us.apache.org/repos/asf/tajo/blob/72dd29c5/tajo-common/src/main/java/org/apache/tajo/storage/StorageConstants.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/storage/StorageConstants.java b/tajo-common/src/main/java/org/apache/tajo/storage/StorageConstants.java
index 11ac9b7..3065d31 100644
--- a/tajo-common/src/main/java/org/apache/tajo/storage/StorageConstants.java
+++ b/tajo-common/src/main/java/org/apache/tajo/storage/StorageConstants.java
@@ -32,7 +32,8 @@ public class StorageConstants {
public static final String TEXT_DELIMITER = "text.delimiter";
public static final String TEXT_NULL = "text.null";
- public static final String TEXTFILE_SERDE = "textfile.serde";
+  /** Table option naming the TextLineSerDe class that DelimitedTextFile uses. */
+  public static final String TEXT_SERDE_CLASS = "text.serde.class";
+  /** Must match CSVLineSerDe's fully-qualified name exactly: Java class names are case-sensitive. */
+  public static final String DEFAULT_TEXT_SERDE_CLASS = "org.apache.tajo.storage.text.CSVLineSerDe";
@Deprecated
public static final String SEQUENCEFILE_DELIMITER = "sequencefile.delimiter";
http://git-wip-us.apache.org/repos/asf/tajo/blob/72dd29c5/tajo-common/src/main/java/org/apache/tajo/util/ReflectionUtil.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/util/ReflectionUtil.java b/tajo-common/src/main/java/org/apache/tajo/util/ReflectionUtil.java
index 410815f..eccc61f 100644
--- a/tajo-common/src/main/java/org/apache/tajo/util/ReflectionUtil.java
+++ b/tajo-common/src/main/java/org/apache/tajo/util/ReflectionUtil.java
@@ -32,8 +32,8 @@ public class ReflectionUtil {
private static final Map<Class<?>, Constructor<?>> CONSTRUCTOR_CACHE =
new ConcurrentHashMap<Class<?>, Constructor<?>>();
- public static Object newInstance(Class<?> clazz)
- throws InstantiationException, IllegalAccessException {
+ /** Creates an instance of clazz through Class#newInstance(), i.e. its accessible no-arg constructor. */
+ public static Object newInstance(Class<?> clazz)
+ throws InstantiationException, IllegalAccessException {
return clazz.newInstance();
}
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/72dd29c5/tajo-storage/src/main/java/org/apache/tajo/storage/text/CSVLineDeserializer.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/text/CSVLineDeserializer.java b/tajo-storage/src/main/java/org/apache/tajo/storage/text/CSVLineDeserializer.java
new file mode 100644
index 0000000..f580da1
--- /dev/null
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/text/CSVLineDeserializer.java
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.text;
+
+import io.netty.buffer.ByteBuf;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.storage.FieldSerializerDeserializer;
+import org.apache.tajo.storage.Tuple;
+
+import java.io.IOException;
+
+public class CSVLineDeserializer extends TextLineDeserializer {
+ private FieldSplitProcessor processor;
+ private FieldSerializerDeserializer fieldSerDer;
+ private ByteBuf nullChars;
+
+ public CSVLineDeserializer(Schema schema, TableMeta meta, int[] targetColumnIndexes) {
+ super(schema, meta, targetColumnIndexes);
+ }
+
+ @Override
+ public void init() {
+ this.processor = new FieldSplitProcessor(CSVLineSerDe.getFieldDelimiter(meta));
+
+ if (nullChars != null) {
+ nullChars.release();
+ }
+ nullChars = TextLineSerDe.getNullChars(meta);
+
+ fieldSerDer = new TextFieldSerializerDeserializer();
+ }
+
+ public void deserialize(final ByteBuf lineBuf, Tuple tuple) throws IOException {
+ int[] projection = targetColumnIndexes;
+ if (lineBuf == null || targetColumnIndexes == null || targetColumnIndexes.length == 0) {
+ return;
+ }
+
+ final int rowLength = lineBuf.readableBytes();
+ int start = 0, fieldLength = 0, end = 0;
+
+ //Projection
+ int currentTarget = 0;
+ int currentIndex = 0;
+
+ while (end != -1) {
+ end = lineBuf.forEachByte(start, rowLength - start, processor);
+
+ if (end < 0) {
+ fieldLength = rowLength - start;
+ } else {
+ fieldLength = end - start;
+ }
+
+ if (projection.length > currentTarget && currentIndex == projection[currentTarget]) {
+ lineBuf.setIndex(start, start + fieldLength);
+ Datum datum = fieldSerDer.deserialize(lineBuf, schema.getColumn(currentIndex), currentIndex, nullChars);
+ tuple.put(currentIndex, datum);
+ currentTarget++;
+ }
+
+ if (projection.length == currentTarget) {
+ break;
+ }
+
+ start = end + 1;
+ currentIndex++;
+ }
+ }
+
+ @Override
+ public void release() {
+ if (nullChars != null) {
+ nullChars.release();
+ nullChars = null;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/72dd29c5/tajo-storage/src/main/java/org/apache/tajo/storage/text/CSVLineSerDe.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/text/CSVLineSerDe.java b/tajo-storage/src/main/java/org/apache/tajo/storage/text/CSVLineSerDe.java
new file mode 100644
index 0000000..e2686a6
--- /dev/null
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/text/CSVLineSerDe.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.text;
+
+import org.apache.commons.lang.StringEscapeUtils;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.storage.StorageConstants;
+
+public class CSVLineSerDe extends TextLineSerDe {
+
+ public CSVLineSerDe() {
+ }
+
+ @Override
+ public TextLineDeserializer createDeserializer(Schema schema, TableMeta meta, int[] targetColumnIndexes) {
+ return new CSVLineDeserializer(schema, meta, targetColumnIndexes);
+ }
+
+ @Override
+ public TextLineSerializer createSerializer(Schema schema, TableMeta meta) {
+ return new CSVLineSerializer(schema, meta);
+ }
+
+ public static char getFieldDelimiter(TableMeta meta) {
+ return StringEscapeUtils.unescapeJava(meta.getOption(StorageConstants.TEXT_DELIMITER,
+ StorageConstants.DEFAULT_FIELD_DELIMITER)).charAt(0);
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/72dd29c5/tajo-storage/src/main/java/org/apache/tajo/storage/text/CSVLineSerializer.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/text/CSVLineSerializer.java b/tajo-storage/src/main/java/org/apache/tajo/storage/text/CSVLineSerializer.java
new file mode 100644
index 0000000..684519c
--- /dev/null
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/text/CSVLineSerializer.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.text;
+
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.storage.FieldSerializerDeserializer;
+import org.apache.tajo.storage.Tuple;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+/** Writes a Tuple as its fields joined by the 'text.delimiter' character. */
+public class CSVLineSerializer extends TextLineSerializer {
+  private FieldSerializerDeserializer serde;
+  private byte[] nullChars;
+  private char delimiter;
+  public CSVLineSerializer(Schema schema, TableMeta meta) {
+    super(schema, meta);
+  }
+
+  /** Resolves the null text, the field delimiter and the field serializer for this table. */
+  @Override
+  public void init() {
+    nullChars = TextLineSerDe.getNullCharsAsBytes(meta);
+    delimiter = CSVLineSerDe.getFieldDelimiter(meta);
+    serde = new TextFieldSerializerDeserializer();
+  }
+
+  /**
+   * Writes every column of {@code input} with the delimiter (cast to one byte) between columns and no
+   * line terminator; returns the number of bytes written.
+   */
+  @Override
+  public int serialize(OutputStream out, Tuple input) throws IOException {
+    final int columnNum = schema.size();
+    int written = 0;
+    for (int col = 0; col < columnNum; col++) {
+      if (col > 0) {
+        out.write((byte) delimiter); // a separator precedes every column but the first
+        written++;
+      }
+      written += serde.serialize(out, input.get(col), schema.getColumn(col), col, nullChars);
+    }
+    return written;
+  }
+
+  /** No-op: the null text is kept as a plain byte array, so nothing needs releasing. */
+  @Override
+  public void release() {}
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/72dd29c5/tajo-storage/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java b/tajo-storage/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java
index 68d89e7..d15f394 100644
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java
@@ -19,8 +19,6 @@
package org.apache.tajo.storage.text;
import io.netty.buffer.ByteBuf;
-import org.apache.commons.lang.StringEscapeUtils;
-import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -35,19 +33,20 @@ import org.apache.hadoop.io.compress.Compressor;
import org.apache.tajo.catalog.Schema;
import org.apache.tajo.catalog.TableMeta;
import org.apache.tajo.catalog.statistics.TableStats;
-import org.apache.tajo.datum.Datum;
-import org.apache.tajo.datum.NullDatum;
import org.apache.tajo.storage.*;
import org.apache.tajo.storage.compress.CodecPool;
import org.apache.tajo.storage.exception.AlreadyExistsStorageException;
import org.apache.tajo.storage.fragment.FileFragment;
import org.apache.tajo.storage.rcfile.NonSyncByteArrayOutputStream;
+import org.apache.tajo.util.ReflectionUtil;
import java.io.BufferedOutputStream;
import java.io.DataOutputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.Arrays;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
public class DelimitedTextFile {
@@ -56,15 +55,48 @@ public class DelimitedTextFile {
private static final Log LOG = LogFactory.getLog(DelimitedTextFile.class);
+  /** Caches loaded line SerDe classes, keyed by fully-qualified class name. */
+  private static final Map<String, Class<? extends TextLineSerDe>> serdeClassCache =
+      new ConcurrentHashMap<String, Class<? extends TextLineSerDe>>();
+
+  /**
+   * Creates the line SerDe of a table. If the table property 'text.serde.class' is given, that class
+   * is loaded (then cached) and instantiated through its no-arg constructor; otherwise CSVLineSerDe
+   * is used.
+   *
+   * @param meta table meta whose options may name a TextLineSerDe subclass
+   * @return a new TextLineSerDe instance
+   * @throws RuntimeException if the class cannot be loaded, does not extend TextLineSerDe, or cannot be
+   *     instantiated; the original exception is kept as the cause
+   */
+  public static TextLineSerDe getLineSerde(TableMeta meta) {
+    // Default to the real class name so the default cannot drift from the class (e.g. by a typo).
+    String serDeClassName =
+        meta.getOption(StorageConstants.TEXT_SERDE_CLASS, CSVLineSerDe.class.getName());
+
+    try {
+      Class<? extends TextLineSerDe> serdeClass = serdeClassCache.get(serDeClassName);
+      if (serdeClass == null) {
+        // Load the configured class; asSubclass fails fast if it is not a TextLineSerDe.
+        serdeClass = Class.forName(serDeClassName).asSubclass(TextLineSerDe.class);
+        // Racing callers resolve the same Class object, so a duplicate put is harmless.
+        serdeClassCache.put(serDeClassName, serdeClass);
+      }
+
+      return (TextLineSerDe) ReflectionUtil.newInstance(serdeClass);
+    } catch (Exception e) {
+      // Keep the class name and the cause so that a misconfigured table can be diagnosed.
+      throw new RuntimeException("TextLineSerde class '" + serDeClassName + "' cannot be initialized", e);
+    }
+  }
+
public static class DelimitedTextFileAppender extends FileAppender {
private final TableMeta meta;
private final Schema schema;
- private final int columnNum;
private final FileSystem fs;
private FSDataOutputStream fos;
private DataOutputStream outputStream;
private CompressionOutputStream deflateFilter;
- private char delimiter;
private TableStatistics stats = null;
private Compressor compressor;
private CompressionCodecFactory codecFactory;
@@ -76,7 +108,7 @@ public class DelimitedTextFile {
private long pos = 0;
private NonSyncByteArrayOutputStream os;
- private FieldSerializerDeserializer serde;
+ private TextLineSerializer serializer;
public DelimitedTextFileAppender(Configuration conf, final Schema schema, final TableMeta meta, final Path path)
throws IOException {
@@ -84,17 +116,10 @@ public class DelimitedTextFile {
this.fs = path.getFileSystem(conf);
this.meta = meta;
this.schema = schema;
- this.delimiter = StringEscapeUtils.unescapeJava(this.meta.getOption(StorageConstants.TEXT_DELIMITER,
- StorageConstants.DEFAULT_FIELD_DELIMITER)).charAt(0);
- this.columnNum = schema.size();
-
- String nullCharacters = StringEscapeUtils.unescapeJava(this.meta.getOption(StorageConstants.TEXT_NULL,
- NullDatum.DEFAULT_TEXT));
- if (StringUtils.isEmpty(nullCharacters)) {
- nullChars = NullDatum.get().asTextBytes();
- } else {
- nullChars = nullCharacters.getBytes();
- }
+ }
+
+ /** Creates a new line SerDe for this table's meta via DelimitedTextFile.getLineSerde(meta). */
+ public TextLineSerDe getLineSerde() {
+ return DelimitedTextFile.getLineSerde(meta);
}
@Override
@@ -133,7 +158,8 @@ public class DelimitedTextFile {
this.stats = new TableStatistics(this.schema);
}
- serde = new TextFieldSerializerDeserializer();
+ serializer = getLineSerde().createSerializer(schema, meta);
+ serializer.init();
if (os == null) {
os = new NonSyncByteArrayOutputStream(BUFFER_SIZE);
@@ -145,26 +171,20 @@ public class DelimitedTextFile {
super.init();
}
-
@Override
public void addTuple(Tuple tuple) throws IOException {
- Datum datum;
- int rowBytes = 0;
+ // write
+ int rowBytes = serializer.serialize(os, tuple);
- for (int i = 0; i < columnNum; i++) {
- datum = tuple.get(i);
- rowBytes += serde.serialize(os, datum, schema.getColumn(i), i, nullChars);
-
- if (columnNum - 1 > i) {
- os.write((byte) delimiter);
- rowBytes += 1;
- }
- }
+ // new line
os.write(LF);
rowBytes += 1;
+ // update positions
pos += rowBytes;
bufferedBytes += rowBytes;
+
+ // refill buffer if necessary
if (bufferedBytes > BUFFER_SIZE) {
flushBuffer();
}
@@ -197,6 +217,8 @@ public class DelimitedTextFile {
public void close() throws IOException {
try {
+ serializer.release();
+
if(outputStream != null){
flush();
}
@@ -241,18 +263,15 @@ public class DelimitedTextFile {
}
public static class DelimitedTextFileScanner extends FileScanner {
-
private boolean splittable = false;
private final long startOffset;
- private final long endOffset;
+ private final long endOffset;
private int recordCount = 0;
private int[] targetColumnIndexes;
- private ByteBuf nullChars;
- private FieldSerializerDeserializer serde;
private DelimitedLineReader reader;
- private FieldSplitProcessor processor;
+ private TextLineDeserializer deserializer;
public DelimitedTextFileScanner(Configuration conf, final Schema schema, final TableMeta meta,
final FileFragment fragment)
@@ -265,30 +284,14 @@ public class DelimitedTextFile {
startOffset = fragment.getStartKey();
endOffset = startOffset + fragment.getEndKey();
+ }
- //Delimiter
- String delim = meta.getOption(StorageConstants.TEXT_DELIMITER, StorageConstants.DEFAULT_FIELD_DELIMITER);
- this.processor = new FieldSplitProcessor(StringEscapeUtils.unescapeJava(delim).charAt(0));
+ /** Creates a new line SerDe for this table's meta via DelimitedTextFile.getLineSerde(meta). */
+ public TextLineSerDe getLineSerde() {
+ return DelimitedTextFile.getLineSerde(meta);
}
@Override
public void init() throws IOException {
- if (nullChars != null) {
- nullChars.release();
- }
-
- String nullCharacters = StringEscapeUtils.unescapeJava(meta.getOption(StorageConstants.TEXT_NULL,
- NullDatum.DEFAULT_TEXT));
- byte[] bytes;
- if (StringUtils.isEmpty(nullCharacters)) {
- bytes = NullDatum.get().asTextBytes();
- } else {
- bytes = nullCharacters.getBytes();
- }
-
- nullChars = BufferPool.directBuffer(bytes.length, bytes.length);
- nullChars.writeBytes(bytes);
-
if (reader != null) {
reader.close();
}
@@ -305,8 +308,6 @@ public class DelimitedTextFile {
targetColumnIndexes[i] = schema.getColumnId(targets[i].getQualifiedName());
}
- serde = new TextFieldSerializerDeserializer();
-
super.init();
Arrays.sort(targetColumnIndexes);
if (LOG.isDebugEnabled()) {
@@ -316,6 +317,9 @@ public class DelimitedTextFile {
if (startOffset > 0) {
reader.readLine(); // skip first line;
}
+
+ deserializer = getLineSerde().createDeserializer(schema, meta, targetColumnIndexes);
+ deserializer.init();
}
public ByteBuf readLine() throws IOException {
@@ -362,7 +366,7 @@ public class DelimitedTextFile {
}
VTuple tuple = new VTuple(schema.size());
- fillTuple(schema, tuple, buf, targetColumnIndexes);
+ deserializer.deserialize(buf, tuple);
return tuple;
} catch (Throwable t) {
LOG.error("Tuple list current index: " + recordCount + " file offset:" + reader.getCompressedPosition(), t);
@@ -370,44 +374,6 @@ public class DelimitedTextFile {
}
}
- private void fillTuple(Schema schema, Tuple dst, ByteBuf lineBuf, int[] target) throws IOException {
- int[] projection = target;
- if (lineBuf == null || target == null || target.length == 0) {
- return;
- }
-
- final int rowLength = lineBuf.readableBytes();
- int start = 0, fieldLength = 0, end = 0;
-
- //Projection
- int currentTarget = 0;
- int currentIndex = 0;
-
- while (end != -1) {
- end = lineBuf.forEachByte(start, rowLength - start, processor);
-
- if (end < 0) {
- fieldLength = rowLength - start;
- } else {
- fieldLength = end - start;
- }
-
- if (projection.length > currentTarget && currentIndex == projection[currentTarget]) {
- lineBuf.setIndex(start, start + fieldLength);
- Datum datum = serde.deserialize(lineBuf, schema.getColumn(currentIndex), currentIndex, nullChars);
- dst.put(currentIndex, datum);
- currentTarget++;
- }
-
- if (projection.length == currentTarget) {
- break;
- }
-
- start = end + 1;
- currentIndex++;
- }
- }
-
@Override
public void reset() throws IOException {
init();
@@ -416,10 +382,7 @@ public class DelimitedTextFile {
@Override
public void close() throws IOException {
try {
- if (nullChars != null) {
- nullChars.release();
- nullChars = null;
- }
+ deserializer.release();
if (tableStats != null && reader != null) {
tableStats.setReadBytes(reader.getReadBytes()); //Actual Processed Bytes. (decompressed bytes + overhead)
http://git-wip-us.apache.org/repos/asf/tajo/blob/72dd29c5/tajo-storage/src/main/java/org/apache/tajo/storage/text/TextLineDeserializer.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/text/TextLineDeserializer.java b/tajo-storage/src/main/java/org/apache/tajo/storage/text/TextLineDeserializer.java
new file mode 100644
index 0000000..645d118
--- /dev/null
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/text/TextLineDeserializer.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.text;
+
+import io.netty.buffer.ByteBuf;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.storage.Tuple;
+
+import java.io.IOException;
+
+/**
+ * Parses one text line into a Tuple. DelimitedTextFileScanner calls init(), then deserialize() for
+ * each line, and release() when it is closed.
+ */
+public abstract class TextLineDeserializer {
+  protected Schema schema;
+  protected TableMeta meta;
+  /** Indexes of the columns deserialize() has to fill; the other columns may be left unset. */
+  protected int[] targetColumnIndexes;
+
+  public TextLineDeserializer(Schema schema, TableMeta meta, int[] targetColumnIndexes) {
+    this.schema = schema;
+    this.meta = meta;
+    this.targetColumnIndexes = targetColumnIndexes;
+  }
+
+  /**
+   * Prepares the state deserialize() needs (for example the delimiter and null text read from meta).
+   */
+  public abstract void init();
+
+  /**
+   * Fills {@code tuple} with the target columns found in one line.
+   *
+   * @param buf the bytes of the line
+   * @param tuple the tuple to fill
+   * @throws IOException if the line cannot be deserialized
+   */
+  public abstract void deserialize(final ByteBuf buf, Tuple tuple) throws IOException;
+
+  /** Releases resources acquired by init(), such as buffers. */
+  public abstract void release();
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/72dd29c5/tajo-storage/src/main/java/org/apache/tajo/storage/text/TextLineSerDe.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/text/TextLineSerDe.java b/tajo-storage/src/main/java/org/apache/tajo/storage/text/TextLineSerDe.java
new file mode 100644
index 0000000..e81e289
--- /dev/null
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/text/TextLineSerDe.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.text;
+
+import io.netty.buffer.ByteBuf;
+import org.apache.commons.lang.StringEscapeUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.datum.NullDatum;
+import org.apache.tajo.storage.BufferPool;
+import org.apache.tajo.storage.StorageConstants;
+
+/**
+ * Pluggable factory of the line (de)serializers used by DelimitedTextFile. Subclasses need a no-arg
+ * constructor: DelimitedTextFile.getLineSerde() creates them reflectively.
+ */
+public abstract class TextLineSerDe {
+  public TextLineSerDe() {
+  }
+
+  /** Creates a deserializer that fills the given target columns of {@code schema}. */
+  public abstract TextLineDeserializer createDeserializer(Schema schema, TableMeta meta, int[] targetColumnIndexes);
+
+  /** Creates a serializer that writes rows of {@code schema}. */
+  public abstract TextLineSerializer createSerializer(Schema schema, TableMeta meta);
+
+  /**
+   * Returns the null text of {@code meta} copied into a direct buffer obtained from BufferPool.
+   * The caller owns the buffer and must release() it.
+   */
+  public static ByteBuf getNullChars(TableMeta meta) {
+    final byte[] bytes = getNullCharsAsBytes(meta);
+    final ByteBuf buffer = BufferPool.directBuffer(bytes.length, bytes.length);
+    buffer.writeBytes(bytes);
+    return buffer;
+  }
+
+  /**
+   * Returns the bytes that stand for NULL: the Java-unescaped 'text.null' option (NullDatum.DEFAULT_TEXT
+   * when unset), or NullDatum's own text bytes when that value is empty.
+   * NOTE(review): String#getBytes() uses the platform default charset.
+   */
+  public static byte [] getNullCharsAsBytes(TableMeta meta) {
+    String nullText =
+        StringEscapeUtils.unescapeJava(meta.getOption(StorageConstants.TEXT_NULL, NullDatum.DEFAULT_TEXT));
+    return StringUtils.isEmpty(nullText) ? NullDatum.get().asTextBytes() : nullText.getBytes();
+  }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/72dd29c5/tajo-storage/src/main/java/org/apache/tajo/storage/text/TextLineSerializer.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/text/TextLineSerializer.java b/tajo-storage/src/main/java/org/apache/tajo/storage/text/TextLineSerializer.java
new file mode 100644
index 0000000..0c2761f
--- /dev/null
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/text/TextLineSerializer.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.text;
+
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.storage.Tuple;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+/** Writes one Tuple as one text line; the appender writes the line terminator, not the serializer. */
+public abstract class TextLineSerializer {
+  protected Schema schema;
+  protected TableMeta meta;
+
+  public TextLineSerializer(Schema schema, TableMeta meta) {
+    this.schema = schema;
+    this.meta = meta;
+  }
+
+  /** Prepares the state serialize() needs from {@link #meta}; called before the first row is written. */
+  public abstract void init();
+
+  /** Writes {@code input} to {@code out}; returns the byte count, which the appender adds to its position. */
+  public abstract int serialize(OutputStream out, Tuple input) throws IOException;
+  /** Releases whatever init() acquired; called when the appender is closed. */
+  public abstract void release();
+}