You are viewing a plain-text version of this content. (The canonical link to the original message was not preserved in this copy.)
Posted to commits@drill.apache.org by pa...@apache.org on 2016/06/19 00:03:43 UTC
[3/6] drill git commit: DRILL-3149: TextReader should support multibyte line delimiters
DRILL-3149: TextReader should support multibyte line delimiters
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/223507b7
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/223507b7
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/223507b7
Branch: refs/heads/master
Commit: 223507b76ff6c2227e667ae4a53f743c92edd295
Parents: f86c4fa
Author: Arina Ielchiieva <ar...@gmail.com>
Authored: Mon Apr 25 19:15:02 2016 +0300
Committer: Parth Chandra <pa...@apache.org>
Committed: Sat Jun 18 17:02:59 2016 -0700
----------------------------------------------------------------------
.../dfs/FormatPluginOptionsDescriptor.java | 5 +
.../store/easy/text/compliant/TextInput.java | 105 +++++++++----------
.../text/compliant/TextParsingSettings.java | 3 -
.../org/apache/drill/TestSelectWithOption.java | 74 +++++++++++--
4 files changed, 115 insertions(+), 72 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/drill/blob/223507b7/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatPluginOptionsDescriptor.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatPluginOptionsDescriptor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatPluginOptionsDescriptor.java
index 34a20e8..d3b2d5e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatPluginOptionsDescriptor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatPluginOptionsDescriptor.java
@@ -26,6 +26,7 @@ import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import org.apache.commons.lang3.StringEscapeUtils;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.common.logical.FormatPluginConfig;
import org.apache.drill.exec.store.dfs.WorkspaceSchemaFactory.TableInstance;
@@ -150,6 +151,10 @@ final class FormatPluginOptionsDescriptor {
// when null is passed, we leave the default defined in the config class
continue;
}
+ if (param instanceof String) {
+ // normalize Java literals, ex: \t, \n, \r
+ param = StringEscapeUtils.unescapeJava((String) param);
+ }
TableParamDef paramDef = t.sig.params.get(i);
TableParamDef expectedParamDef = this.functionParamsByName.get(paramDef.name);
if (expectedParamDef == null || expectedParamDef.type != paramDef.type) {
http://git-wip-us.apache.org/repos/asf/drill/blob/223507b7/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/TextInput.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/TextInput.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/TextInput.java
index 513476f..d8b1672 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/TextInput.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/TextInput.java
@@ -56,9 +56,7 @@ import com.univocity.parsers.common.Format;
*/
final class TextInput {
- private static final byte NULL_BYTE = (byte) '\0';
- private final byte lineSeparator1;
- private final byte lineSeparator2;
+ private final byte[] lineSeparator;
private final byte normalizedLineSeparator;
private final TextParsingSettings settings;
@@ -91,7 +89,7 @@ final class TextInput {
* Whether there was a possible partial line separator on the previous
* read so we dropped it and it should be appended to next read.
*/
- private boolean remByte = false;
+ private int remByte = -1;
/**
* The current position in the buffer.
@@ -107,13 +105,12 @@ final class TextInput {
/**
* Creates a new instance with the mandatory characters for handling newlines transparently.
- * @param lineSeparator the sequence of characters that represent a newline, as defined in {@link Format#getLineSeparator()}
- * @param normalizedLineSeparator the normalized newline character (as defined in {@link Format#getNormalizedNewline()}) that is used to replace any lineSeparator sequence found in the input.
+ * lineSeparator the sequence of characters that represent a newline, as defined in {@link Format#getLineSeparator()}
+ * normalizedLineSeparator the normalized newline character (as defined in {@link Format#getNormalizedNewline()}) that is used to replace any lineSeparator sequence found in the input.
*/
public TextInput(TextParsingSettings settings, InputStream input, DrillBuf readBuffer, long startPos, long endPos) {
- byte[] lineSeparator = settings.getNewLineDelimiter();
+ this.lineSeparator = settings.getNewLineDelimiter();
byte normalizedLineSeparator = settings.getNormalizedNewLine();
- Preconditions.checkArgument(lineSeparator != null && (lineSeparator.length == 1 || lineSeparator.length == 2), "Invalid line separator. Expected 1 to 2 characters");
Preconditions.checkArgument(input instanceof Seekable, "Text input only supports an InputStream that supports Seekable.");
boolean isCompressed = input instanceof CompressionInputStream ;
Preconditions.checkArgument(!isCompressed || startPos == 0, "Cannot use split on compressed stream.");
@@ -138,8 +135,6 @@ final class TextInput {
this.startPos = startPos;
this.endPos = endPos;
- this.lineSeparator1 = lineSeparator[0];
- this.lineSeparator2 = lineSeparator.length == 2 ? lineSeparator[1] : NULL_BYTE;
this.normalizedLineSeparator = normalizedLineSeparator;
this.buffer = readBuffer;
@@ -196,23 +191,25 @@ final class TextInput {
private final void read() throws IOException {
if(bufferReadable){
- if(remByte){
- underlyingBuffer.put(lineSeparator1);
- remByte = false;
+ if(remByte != -1){
+ for (int i = 0; i <= remByte; i++) {
+ underlyingBuffer.put(lineSeparator[i]);
+ }
+ remByte = -1;
}
length = inputFS.read(underlyingBuffer);
}else{
byte[] b = new byte[underlyingBuffer.capacity()];
- if(remByte){
- b[0] = lineSeparator1;
- length = input.read(b, 1, b.length - 1);
- remByte = false;
+ if(remByte != -1){
+ int remBytesNum = remByte + 1;
+ System.arraycopy(lineSeparator, 0, b, 0, remBytesNum);
+ length = input.read(b, remBytesNum, b.length - remBytesNum);
+ remByte = -1;
}else{
length = input.read(b);
}
-
underlyingBuffer.put(b);
}
}
@@ -251,46 +248,31 @@ final class TextInput {
* adjusts so that we can only read to the last character of the first line that crosses
* the split boundary.
*/
- private void updateLengthBasedOnConstraint(){
- // we've run over our alotted data.
- final byte lineSeparator1 = this.lineSeparator1;
- final byte lineSeparator2 = this.lineSeparator2;
-
+ private void updateLengthBasedOnConstraint() {
// find the next line separator:
final long max = bStart + length;
- for(long m = this.bStart + (endPos - streamPos); m < max; m++){
- if(PlatformDependent.getByte(m) == lineSeparator1){
- // we found a potential line break.
-
- if(lineSeparator2 == NULL_BYTE){
- // we found a line separator and don't need to consult the next byte.
- length = (int)(m - bStart) + 1; // make sure we include line separator otherwise query may fail (DRILL-4317)
- endFound = true;
- break;
- }else{
- // this is a two byte line separator.
-
- long mPlus = m+1;
- if(mPlus < max){
- // we can check next byte and see if the second lineSeparator is correct.
- if(lineSeparator2 == PlatformDependent.getByte(mPlus)){
- length = (int)(mPlus - bStart);
- endFound = true;
- break;
- }else{
- // this was a partial line break.
- continue;
- }
- }else{
- // the last character of the read was a remnant byte. We'll hold off on dealing with this byte until the next read.
- remByte = true;
- length -= 1;
- break;
+ for (long m = this.bStart + (endPos - streamPos); m < max; m++) {
+ long mTemp = m - 1;
+ for (int i = 0; i < lineSeparator.length; i++) {
+ mTemp++;
+ if (PlatformDependent.getByte(mTemp) == lineSeparator[i]) {
+ if (mTemp < max) {
+ continue;
+ } else {
+ // remnant bytes
+ // the last N characters of the read were a remnant bytes. We'll hold off on dealing with these bytes until the next read.
+ remByte = i;
+ length -= (i + 1);
+ return;
}
-
}
+ break;
}
+ // we found line delimiter
+ length = (int) (mTemp - bStart);
+ endFound = true;
+ break;
}
}
@@ -301,8 +283,6 @@ final class TextInput {
* @throws IOException
*/
public final byte nextChar() throws IOException {
- final byte lineSeparator1 = this.lineSeparator1;
- final byte lineSeparator2 = this.lineSeparator2;
if (length == -1) {
throw StreamFinishedPseudoException.INSTANCE;
@@ -326,12 +306,20 @@ final class TextInput {
bufferPtr++;
// monitor for next line.
- if (lineSeparator1 == byteChar && (lineSeparator2 == NULL_BYTE || lineSeparator2 == buffer.getByte(bufferPtr - 1))) {
- lineCount++;
+ int bufferPtrTemp = bufferPtr - 1;
+ if (byteChar == lineSeparator[0]) {
+ for (int i = 1; i < lineSeparator.length; i++, bufferPtrTemp++) {
+ if (lineSeparator[i] != buffer.getByte(bufferPtrTemp)) {
+ return byteChar;
+ }
+ }
- if (lineSeparator2 != NULL_BYTE) {
- byteChar = normalizedLineSeparator;
+ lineCount++;
+ byteChar = normalizedLineSeparator;
+ // we don't need to update buffer position if line separator is one byte long
+ if (lineSeparator.length > 1) {
+ bufferPtr += (lineSeparator.length - 1);
if (bufferPtr >= length) {
if (length != -1) {
updateBuffer();
@@ -341,6 +329,7 @@ final class TextInput {
}
}
}
+
return byteChar;
}
http://git-wip-us.apache.org/repos/asf/drill/blob/223507b7/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/TextParsingSettings.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/TextParsingSettings.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/TextParsingSettings.java
index a366c90..41bb33d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/TextParsingSettings.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/TextParsingSettings.java
@@ -20,7 +20,6 @@ package org.apache.drill.exec.store.easy.text.compliant;
import org.apache.drill.exec.store.easy.text.TextFormatPlugin.TextFormatConfig;
import com.google.common.base.Charsets;
-import com.google.common.base.Preconditions;
import com.univocity.parsers.common.TextParsingException;
public class TextParsingSettings {
@@ -51,8 +50,6 @@ public class TextParsingSettings {
this.quote = bSafe(config.getQuote(), "quote");
this.quoteEscape = bSafe(config.getEscape(), "escape");
this.newLineDelimiter = config.getLineDelimiter().getBytes(Charsets.UTF_8);
- Preconditions.checkArgument(newLineDelimiter.length == 1 || newLineDelimiter.length == 2,
- String.format("Line delimiter must be 1 or 2 bytes in length. The provided delimiter was %d bytes long.", newLineDelimiter.length));
this.delimiter = bSafe(config.getFieldDelimiter(), "fieldDelimiter");
this.comment = bSafe(config.getComment(), "comment");
this.skipFirstLine = config.isSkipFirstLine();
http://git-wip-us.apache.org/repos/asf/drill/blob/223507b7/exec/java-exec/src/test/java/org/apache/drill/TestSelectWithOption.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestSelectWithOption.java b/exec/java-exec/src/test/java/org/apache/drill/TestSelectWithOption.java
index c74480b..111313b 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/TestSelectWithOption.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/TestSelectWithOption.java
@@ -25,7 +25,6 @@ import java.io.FileWriter;
import java.io.IOException;
import org.apache.drill.exec.store.dfs.WorkspaceSchemaFactory;
-import org.junit.Ignore;
import org.junit.Test;
public class TestSelectWithOption extends BaseTestQuery {
@@ -78,19 +77,72 @@ public class TestSelectWithOption extends BaseTestQuery {
);
}
- @Test @Ignore // It does not look like lineDelimiter is working
- public void testTextLineDelimiter() throws Exception {
+ @Test
+ public void testTabFieldDelimiter() throws Exception {
+ String tableName = genCSVTable("testTabFieldDelimiter",
+ "1\ta",
+ "2\tb");
+ String fieldDelimiter = new String(new char[]{92, 116}); // represents \t
+ testWithResult(format("select columns from table(%s(type=>'TeXT', fieldDelimiter => '%s'))", tableName, fieldDelimiter),
+ listOf("1", "a"),
+ listOf("2", "b"));
+ }
+
+ @Test
+ public void testSingleTextLineDelimiter() throws Exception {
+ String tableName = genCSVTable("testSingleTextLineDelimiter",
+ "a|b|c");
+
+ testWithResult(format("select columns from table(%s(type => 'TeXT', lineDelimiter => '|'))", tableName),
+ listOf("a"),
+ listOf("b"),
+ listOf("c"));
+ }
+
+ @Test
+ // '\n' is treated as standard delimiter
+ // if user has indicated custom line delimiter but input file contains '\n', split will occur on both
+ public void testCustomTextLineDelimiterAndNewLine() throws Exception {
String tableName = genCSVTable("testTextLineDelimiter",
- "\"b\"|\"0\"",
- "\"b\"|\"1\"",
- "\"b\"|\"2\"");
+ "b|1",
+ "b|2");
testWithResult(format("select columns from table(%s(type => 'TeXT', lineDelimiter => '|'))", tableName),
- listOf("\"b\""),
- listOf("\"0\"", "\"b\""),
- listOf("\"1\"", "\"b\""),
- listOf("\"2\"")
- );
+ listOf("b"),
+ listOf("1"),
+ listOf("b"),
+ listOf("2"));
+ }
+
+ @Test
+ public void testTextLineDelimiterWithCarriageReturn() throws Exception {
+ String tableName = genCSVTable("testTextLineDelimiterWithCarriageReturn",
+ "1, a\r",
+ "2, b\r");
+ String lineDelimiter = new String(new char[]{92, 114, 92, 110}); // represents \r\n
+ testWithResult(format("select columns from table(%s(type=>'TeXT', lineDelimiter => '%s'))", tableName, lineDelimiter),
+ listOf("1, a"),
+ listOf("2, b"));
+ }
+
+ @Test
+ public void testMultiByteLineDelimiter() throws Exception {
+ String tableName = genCSVTable("testMultiByteLineDelimiter",
+ "1abc2abc3abc");
+ testWithResult(format("select columns from table(%s(type=>'TeXT', lineDelimiter => 'abc'))", tableName),
+ listOf("1"),
+ listOf("2"),
+ listOf("3"));
+ }
+
+ @Test
+ public void testDataWithPartOfMultiByteLineDelimiter() throws Exception {
+ String tableName = genCSVTable("testDataWithPartOfMultiByteLineDelimiter",
+ "ab1abc2abc3abc");
+ testWithResult(format("select columns from table(%s(type=>'TeXT', lineDelimiter => 'abc'))", tableName),
+ listOf("ab1"),
+ listOf("2"),
+ listOf("3"));
}
@Test