You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by jh...@apache.org on 2015/03/18 04:25:21 UTC
tajo git commit: TAJO-1381: Support multi-bytes delimiter for Text file
Repository: tajo
Updated Branches:
refs/heads/master 286b95679 -> 82d44af32
TAJO-1381: Support multi-bytes delimiter for Text file
Closes #410
Signed-off-by: Jinho Kim <jh...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/82d44af3
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/82d44af3
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/82d44af3
Branch: refs/heads/master
Commit: 82d44af32246c63a32c049292f0a229f16e85768
Parents: 286b956
Author: navis.ryu <na...@apache.org>
Authored: Wed Mar 11 08:49:31 2015 +0900
Committer: Jinho Kim <jh...@apache.org>
Committed: Wed Mar 18 11:47:57 2015 +0900
----------------------------------------------------------------------
CHANGES | 3 ++
.../tajo/engine/query/TestSelectQuery.java | 24 +++++++++++
.../multibytes_delimiter_table3_ddl.sql | 3 ++
.../multibytes_delimiter_table4_ddl.sql | 3 ++
.../testMultiBytesDelimiter3.sql | 1 +
.../testMultiBytesDelimiter4.sql | 1 +
.../testMultiBytesDelimiter3.result | 7 +++
.../testMultiBytesDelimiter4.result | 7 +++
.../java/org/apache/tajo/storage/CSVFile.java | 11 +++--
.../tajo/storage/text/CSVLineDeserializer.java | 14 ++++--
.../apache/tajo/storage/text/CSVLineSerDe.java | 5 ++-
.../tajo/storage/text/CSVLineSerializer.java | 8 ++--
.../tajo/storage/text/DelimitedTextFile.java | 2 +-
.../tajo/storage/text/FieldSplitProcessor.java | 8 +---
.../text/MultiBytesFieldSplitProcessor.java | 45 ++++++++++++++++++++
.../tajo/storage/text/TextLineDeserializer.java | 6 +--
.../apache/tajo/storage/text/TextLineSerDe.java | 3 +-
.../apache/tajo/storage/TestSplitProcessor.java | 38 +++++++++++++++--
18 files changed, 162 insertions(+), 27 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tajo/blob/82d44af3/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 0d7222f..c3f2691 100644
--- a/CHANGES
+++ b/CHANGES
@@ -9,6 +9,9 @@ Release 0.11.0 - unreleased
IMPROVEMENT
+ TAJO-1381: Support multi-bytes delimiter for Text file.
+ (Contributed by navis, Committed by jinho)
+
TAJO-1391: RpcConnectionPool should check reference counter of connection
before close. (Contributed by navis, Committed by jihun)
http://git-wip-us.apache.org/repos/asf/tajo/blob/82d44af3/tajo-core/src/test/java/org/apache/tajo/engine/query/TestSelectQuery.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestSelectQuery.java b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestSelectQuery.java
index dd93dd1..f7b1382 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestSelectQuery.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestSelectQuery.java
@@ -659,4 +659,28 @@ public class TestSelectQuery extends QueryTestCaseBase {
executeString("DROP TABLE table2");
}
}
+
+  /**
+   * Creates table1 from multibytes_delimiter_table3_ddl.sql (a text table whose field
+   * delimiter is the two-byte string "||"), runs the test query and asserts its result set.
+   * table1 is dropped in a finally block so a failed assertion does not leak the table.
+   */
+  @Test
+  public void testMultiBytesDelimiter3() throws Exception {
+    executeDDL("multibytes_delimiter_table3_ddl.sql", "multibytes_delimiter1");
+    try {
+      ResultSet result = executeQuery();
+      assertResultSet(result);
+      cleanupQuery(result);
+    } finally {
+      executeString("DROP TABLE table1");
+    }
+  }
+
+  /**
+   * Creates table2 from multibytes_delimiter_table4_ddl.sql (a text table whose field
+   * delimiter is the Hangul letter U+314E, three bytes in UTF-8), runs the test query and
+   * asserts its result set. table2 is dropped in a finally block.
+   */
+  @Test
+  public void testMultiBytesDelimiter4() throws Exception {
+    executeDDL("multibytes_delimiter_table4_ddl.sql", "multibytes_delimiter2");
+    try {
+      ResultSet result = executeQuery();
+      assertResultSet(result);
+      cleanupQuery(result);
+    } finally {
+      executeString("DROP TABLE table2");
+    }
+  }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tajo/blob/82d44af3/tajo-core/src/test/resources/queries/TestSelectQuery/multibytes_delimiter_table3_ddl.sql
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/resources/queries/TestSelectQuery/multibytes_delimiter_table3_ddl.sql b/tajo-core/src/test/resources/queries/TestSelectQuery/multibytes_delimiter_table3_ddl.sql
new file mode 100644
index 0000000..8309d11
--- /dev/null
+++ b/tajo-core/src/test/resources/queries/TestSelectQuery/multibytes_delimiter_table3_ddl.sql
@@ -0,0 +1,3 @@
+create external table table1 (id int, name text, score float, type text) using text
+with ('text.delimiter'='||', 'text.null'='NULL') location ${table.path};
+
http://git-wip-us.apache.org/repos/asf/tajo/blob/82d44af3/tajo-core/src/test/resources/queries/TestSelectQuery/multibytes_delimiter_table4_ddl.sql
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/resources/queries/TestSelectQuery/multibytes_delimiter_table4_ddl.sql b/tajo-core/src/test/resources/queries/TestSelectQuery/multibytes_delimiter_table4_ddl.sql
new file mode 100644
index 0000000..2fb821a
--- /dev/null
+++ b/tajo-core/src/test/resources/queries/TestSelectQuery/multibytes_delimiter_table4_ddl.sql
@@ -0,0 +1,3 @@
+create external table table2 (id int, name text, score float, type text) using text
+with ('text.delimiter'='ㅎ', 'text.null'='NULL') location ${table.path};
+
http://git-wip-us.apache.org/repos/asf/tajo/blob/82d44af3/tajo-core/src/test/resources/queries/TestSelectQuery/testMultiBytesDelimiter3.sql
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/resources/queries/TestSelectQuery/testMultiBytesDelimiter3.sql b/tajo-core/src/test/resources/queries/TestSelectQuery/testMultiBytesDelimiter3.sql
new file mode 100644
index 0000000..bd6b02d
--- /dev/null
+++ b/tajo-core/src/test/resources/queries/TestSelectQuery/testMultiBytesDelimiter3.sql
@@ -0,0 +1 @@
+select * from table1;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tajo/blob/82d44af3/tajo-core/src/test/resources/queries/TestSelectQuery/testMultiBytesDelimiter4.sql
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/resources/queries/TestSelectQuery/testMultiBytesDelimiter4.sql b/tajo-core/src/test/resources/queries/TestSelectQuery/testMultiBytesDelimiter4.sql
new file mode 100644
index 0000000..66a69ec
--- /dev/null
+++ b/tajo-core/src/test/resources/queries/TestSelectQuery/testMultiBytesDelimiter4.sql
@@ -0,0 +1 @@
+select * from table2;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tajo/blob/82d44af3/tajo-core/src/test/resources/results/TestSelectQuery/testMultiBytesDelimiter3.result
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/resources/results/TestSelectQuery/testMultiBytesDelimiter3.result b/tajo-core/src/test/resources/results/TestSelectQuery/testMultiBytesDelimiter3.result
new file mode 100644
index 0000000..d8d43b1
--- /dev/null
+++ b/tajo-core/src/test/resources/results/TestSelectQuery/testMultiBytesDelimiter3.result
@@ -0,0 +1,7 @@
+id,name,score,type
+-------------------------------
+1,ooo,1.1,a
+2,ppp,2.3,
+3,qqq,null,
+4,,4.5,
+null,xxx,5.6,e
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tajo/blob/82d44af3/tajo-core/src/test/resources/results/TestSelectQuery/testMultiBytesDelimiter4.result
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/resources/results/TestSelectQuery/testMultiBytesDelimiter4.result b/tajo-core/src/test/resources/results/TestSelectQuery/testMultiBytesDelimiter4.result
new file mode 100644
index 0000000..d8d43b1
--- /dev/null
+++ b/tajo-core/src/test/resources/results/TestSelectQuery/testMultiBytesDelimiter4.result
@@ -0,0 +1,7 @@
+id,name,score,type
+-------------------------------
+1,ooo,1.1,a
+2,ppp,2.3,
+3,qqq,null,
+4,,4.5,
+null,xxx,5.6,e
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tajo/blob/82d44af3/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/CSVFile.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/CSVFile.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/CSVFile.java
index bb628b1..c1047d9 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/CSVFile.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/CSVFile.java
@@ -40,6 +40,7 @@ import org.apache.tajo.storage.compress.CodecPool;
import org.apache.tajo.storage.exception.AlreadyExistsStorageException;
import org.apache.tajo.storage.fragment.Fragment;
import org.apache.tajo.storage.rcfile.NonSyncByteArrayOutputStream;
+import org.apache.tajo.util.Bytes;
import org.apache.tajo.util.BytesUtils;
import java.io.*;
@@ -83,7 +84,8 @@ public class CSVFile {
this.meta = meta;
this.schema = schema;
this.delimiter = StringEscapeUtils.unescapeJava(
- this.meta.getOption(StorageConstants.TEXT_DELIMITER, StorageConstants.DEFAULT_FIELD_DELIMITER)).getBytes();
+ this.meta.getOption(StorageConstants.TEXT_DELIMITER, StorageConstants.DEFAULT_FIELD_DELIMITER))
+ .getBytes(Bytes.UTF8_CHARSET);
this.columnNum = schema.size();
@@ -93,7 +95,7 @@ public class CSVFile {
if (StringUtils.isEmpty(nullCharacters)) {
nullChars = NullDatum.get().asTextBytes();
} else {
- nullChars = nullCharacters.getBytes();
+ nullChars = nullCharacters.getBytes(Bytes.UTF8_CHARSET);
}
}
@@ -265,7 +267,8 @@ public class CSVFile {
//Delimiter
this.delimiter = StringEscapeUtils.unescapeJava(
meta.getOption(StorageConstants.TEXT_DELIMITER,
- meta.getOption(StorageConstants.CSVFILE_DELIMITER, StorageConstants.DEFAULT_FIELD_DELIMITER))).getBytes();
+ meta.getOption(StorageConstants.CSVFILE_DELIMITER, StorageConstants.DEFAULT_FIELD_DELIMITER)))
+ .getBytes(Bytes.UTF8_CHARSET);
String nullCharacters = StringEscapeUtils.unescapeJava(
meta.getOption(StorageConstants.TEXT_NULL,
@@ -274,7 +277,7 @@ public class CSVFile {
if (StringUtils.isEmpty(nullCharacters)) {
nullChars = NullDatum.get().asTextBytes();
} else {
- nullChars = nullCharacters.getBytes();
+ nullChars = nullCharacters.getBytes(Bytes.UTF8_CHARSET);
}
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/82d44af3/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/CSVLineDeserializer.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/CSVLineDeserializer.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/CSVLineDeserializer.java
index 1599f62..6a8c7a9 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/CSVLineDeserializer.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/CSVLineDeserializer.java
@@ -19,6 +19,7 @@
package org.apache.tajo.storage.text;
import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufProcessor;
import org.apache.tajo.catalog.Schema;
import org.apache.tajo.catalog.TableMeta;
import org.apache.tajo.datum.Datum;
@@ -28,9 +29,10 @@ import org.apache.tajo.storage.Tuple;
import java.io.IOException;
public class CSVLineDeserializer extends TextLineDeserializer {
- private FieldSplitProcessor processor;
+ private ByteBufProcessor processor;
private FieldSerializerDeserializer fieldSerDer;
private ByteBuf nullChars;
+ private int delimiterCompensation;
public CSVLineDeserializer(Schema schema, TableMeta meta, int[] targetColumnIndexes) {
super(schema, meta, targetColumnIndexes);
@@ -38,7 +40,13 @@ public class CSVLineDeserializer extends TextLineDeserializer {
@Override
public void init() {
- this.processor = new FieldSplitProcessor(CSVLineSerDe.getFieldDelimiter(meta));
+ byte[] delimiter = CSVLineSerDe.getFieldDelimiter(meta);
+ if (delimiter.length == 1) {
+ this.processor = new FieldSplitProcessor(delimiter[0]);
+ } else {
+ this.processor = new MultiBytesFieldSplitProcessor(delimiter);
+ }
+ this.delimiterCompensation = delimiter.length - 1;
if (nullChars != null) {
nullChars.release();
@@ -67,7 +75,7 @@ public class CSVLineDeserializer extends TextLineDeserializer {
if (end < 0) {
fieldLength = rowLength - start;
} else {
- fieldLength = end - start;
+ fieldLength = end - start - delimiterCompensation;
}
if (projection.length > currentTarget && currentIndex == projection[currentTarget]) {
http://git-wip-us.apache.org/repos/asf/tajo/blob/82d44af3/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/CSVLineSerDe.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/CSVLineSerDe.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/CSVLineSerDe.java
index 2fe7f23..988d5d1 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/CSVLineSerDe.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/CSVLineSerDe.java
@@ -22,6 +22,7 @@ import org.apache.commons.lang.StringEscapeUtils;
import org.apache.tajo.catalog.Schema;
import org.apache.tajo.catalog.TableMeta;
import org.apache.tajo.storage.StorageConstants;
+import org.apache.tajo.util.Bytes;
public class CSVLineSerDe extends TextLineSerDe {
@Override
@@ -34,8 +35,8 @@ public class CSVLineSerDe extends TextLineSerDe {
return new CSVLineSerializer(schema, meta);
}
- public static char getFieldDelimiter(TableMeta meta) {
+ public static byte[] getFieldDelimiter(TableMeta meta) {
return StringEscapeUtils.unescapeJava(meta.getOption(StorageConstants.TEXT_DELIMITER,
- StorageConstants.DEFAULT_FIELD_DELIMITER)).charAt(0);
+ StorageConstants.DEFAULT_FIELD_DELIMITER)).getBytes(Bytes.UTF8_CHARSET);
}
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/82d44af3/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/CSVLineSerializer.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/CSVLineSerializer.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/CSVLineSerializer.java
index 53a0ef3..9a2fe37 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/CSVLineSerializer.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/CSVLineSerializer.java
@@ -30,8 +30,8 @@ import java.io.OutputStream;
public class CSVLineSerializer extends TextLineSerializer {
private FieldSerializerDeserializer serde;
- private byte [] nullChars;
- private char delimiter;
+ private byte[] nullChars;
+ private byte[] delimiter;
private int columnNum;
public CSVLineSerializer(Schema schema, TableMeta meta) {
@@ -56,8 +56,8 @@ public class CSVLineSerializer extends TextLineSerializer {
writtenBytes += serde.serialize(out, datum, schema.getColumn(i), i, nullChars);
if (columnNum - 1 > i) {
- out.write((byte) delimiter);
- writtenBytes += 1;
+ out.write(delimiter);
+ writtenBytes += delimiter.length;
}
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/82d44af3/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java
index ebf9608..4c9234e 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java
@@ -391,7 +391,7 @@ public class DelimitedTextFile {
try {
deserializer.deserialize(buf, tuple);
- // if a line is read normaly, it exists this loop.
+ // if a line is read normally, it exits this loop.
break;
} catch (TextLineParsingError tae) {
http://git-wip-us.apache.org/repos/asf/tajo/blob/82d44af3/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/FieldSplitProcessor.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/FieldSplitProcessor.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/FieldSplitProcessor.java
index a5ac142..862b5ae 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/FieldSplitProcessor.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/FieldSplitProcessor.java
@@ -21,9 +21,9 @@ package org.apache.tajo.storage.text;
import io.netty.buffer.ByteBufProcessor;
public class FieldSplitProcessor implements ByteBufProcessor {
- private char delimiter; //the ascii separate character
+ private byte delimiter; //the ascii separate character
- public FieldSplitProcessor(char recordDelimiterByte) {
+ public FieldSplitProcessor(byte recordDelimiterByte) {
this.delimiter = recordDelimiterByte;
}
@@ -31,8 +31,4 @@ public class FieldSplitProcessor implements ByteBufProcessor {
public boolean process(byte value) throws Exception {
return delimiter != value;
}
-
- public char getDelimiter() {
- return delimiter;
- }
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/82d44af3/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/MultiBytesFieldSplitProcessor.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/MultiBytesFieldSplitProcessor.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/MultiBytesFieldSplitProcessor.java
new file mode 100644
index 0000000..b97d7c6
--- /dev/null
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/MultiBytesFieldSplitProcessor.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.text;
+
+import io.netty.buffer.ByteBufProcessor;
+
+public class MultiBytesFieldSplitProcessor implements ByteBufProcessor {
+
+ private int index;
+ private final byte[] delimiter;
+
+ public MultiBytesFieldSplitProcessor(byte[] recordDelimiterByte) {
+ this.delimiter = recordDelimiterByte;
+ }
+
+ @Override
+ public boolean process(byte value) throws Exception {
+ if (delimiter[index] != value) {
+ index = 0;
+ return true;
+ }
+ if (index != delimiter.length - 1) {
+ index++;
+ return true;
+ }
+ index = 0;
+ return false;
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/82d44af3/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/TextLineDeserializer.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/TextLineDeserializer.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/TextLineDeserializer.java
index 7ebfa79..89a7de9 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/TextLineDeserializer.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/TextLineDeserializer.java
@@ -29,9 +29,9 @@ import java.io.IOException;
* Reads a text line and fills a Tuple with values
*/
public abstract class TextLineDeserializer {
- protected Schema schema;
- protected TableMeta meta;
- protected int [] targetColumnIndexes;
+ protected final Schema schema;
+ protected final TableMeta meta;
+ protected final int[] targetColumnIndexes;
public TextLineDeserializer(Schema schema, TableMeta meta, int [] targetColumnIndexes) {
this.schema = schema;
http://git-wip-us.apache.org/repos/asf/tajo/blob/82d44af3/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/TextLineSerDe.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/TextLineSerDe.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/TextLineSerDe.java
index e81e289..1a53bb0 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/TextLineSerDe.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/TextLineSerDe.java
@@ -26,6 +26,7 @@ import org.apache.tajo.catalog.TableMeta;
import org.apache.tajo.datum.NullDatum;
import org.apache.tajo.storage.BufferPool;
import org.apache.tajo.storage.StorageConstants;
+import org.apache.tajo.util.Bytes;
/**
* Pluggable Text Line SerDe class
@@ -56,7 +57,7 @@ public abstract class TextLineSerDe {
if (StringUtils.isEmpty(nullCharacters)) {
nullChars = NullDatum.get().asTextBytes();
} else {
- nullChars = nullCharacters.getBytes();
+ nullChars = nullCharacters.getBytes(Bytes.UTF8_CHARSET);
}
return nullChars;
http://git-wip-us.apache.org/repos/asf/tajo/blob/82d44af3/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestSplitProcessor.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestSplitProcessor.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestSplitProcessor.java
index 12ea551..2174d62 100644
--- a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestSplitProcessor.java
+++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestSplitProcessor.java
@@ -19,10 +19,12 @@
package org.apache.tajo.storage;
import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufProcessor;
import io.netty.buffer.Unpooled;
import io.netty.util.CharsetUtil;
import org.apache.tajo.storage.text.FieldSplitProcessor;
import org.apache.tajo.storage.text.LineSplitProcessor;
+import org.apache.tajo.storage.text.MultiBytesFieldSplitProcessor;
import org.junit.Test;
import java.io.IOException;
@@ -35,17 +37,47 @@ public class TestSplitProcessor {
@Test
public void testFieldSplitProcessor() throws IOException {
- String data = "abc||de";
+ String data = "abc||de|";
final ByteBuf buf = releaseLater(
Unpooled.copiedBuffer(data, CharsetUtil.ISO_8859_1));
final int len = buf.readableBytes();
- FieldSplitProcessor processor = new FieldSplitProcessor('|');
+ FieldSplitProcessor processor = new FieldSplitProcessor((byte)'|');
assertEquals(3, buf.forEachByte(0, len, processor));
assertEquals(4, buf.forEachByte(4, len - 4, processor));
- assertEquals(-1, buf.forEachByte(5, len - 5, processor));
+ assertEquals(7, buf.forEachByte(5, len - 5, processor));
+ assertEquals(-1, buf.forEachByte(8, len - 8, processor));
+ }
+
+  /**
+   * Splits ISO-8859-1 data on the two-byte delimiter "||", including two adjacent delimiters
+   * (an empty field) and a trailing delimiter.
+   */
+  @Test
+  public void testMultiCharFieldSplitProcessor1() throws IOException {
+    String data = "abc||||de||";
+    final ByteBuf buf = releaseLater(
+        Unpooled.copiedBuffer(data, CharsetUtil.ISO_8859_1));
+
+    final int len = buf.readableBytes();
+    // Encode the delimiter with the same charset as the data rather than the platform default.
+    ByteBufProcessor processor =
+        new MultiBytesFieldSplitProcessor("||".getBytes(CharsetUtil.ISO_8859_1));
+
+    // forEachByte reports the index of the delimiter's last byte; the next field starts after it.
+    assertEquals(4, buf.forEachByte(0, len, processor));
+    assertEquals(6, buf.forEachByte(5, len - 5, processor));
+    assertEquals(10, buf.forEachByte(7, len - 7, processor));
+    assertEquals(-1, buf.forEachByte(11, len - 11, processor));
+  }
+
+  /**
+   * Splits UTF-8 data on a single Hangul letter (U+314E), which encodes to three bytes.
+   */
+  @Test
+  public void testMultiCharFieldSplitProcessor2() throws IOException {
+    // U+314E HANGUL LETTER HIEUH, written as an escape so the test does not depend on the
+    // source-file encoding javac is configured with.
+    String delimiter = "\u314E";
+    String data = "abc" + delimiter + delimiter + "de" + delimiter;
+    final ByteBuf buf = releaseLater(
+        Unpooled.copiedBuffer(data, CharsetUtil.UTF_8));
+
+    final int len = buf.readableBytes();
+    // Encode explicitly as UTF-8: String.getBytes() uses the platform default charset, which
+    // would not match the UTF-8 buffer on a non-UTF-8 JVM.
+    ByteBufProcessor processor =
+        new MultiBytesFieldSplitProcessor(delimiter.getBytes(CharsetUtil.UTF_8));
+    assertEquals(5, buf.forEachByte(0, len, processor));
+    assertEquals(8, buf.forEachByte(6, len - 6, processor));
+    assertEquals(13, buf.forEachByte(9, len - 9, processor));
+    assertEquals(-1, buf.forEachByte(14, len - 14, processor));
}
@Test