You are viewing a plain-text version of this content; the link to the canonical version was not preserved in this copy.
Posted to commits@drill.apache.org by pa...@apache.org on 2016/06/19 00:03:43 UTC

[3/6] drill git commit: DRILL-3149: TextReader should support multibyte line delimiters

DRILL-3149: TextReader should support multibyte line delimiters


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/223507b7
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/223507b7
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/223507b7

Branch: refs/heads/master
Commit: 223507b76ff6c2227e667ae4a53f743c92edd295
Parents: f86c4fa
Author: Arina Ielchiieva <ar...@gmail.com>
Authored: Mon Apr 25 19:15:02 2016 +0300
Committer: Parth Chandra <pa...@apache.org>
Committed: Sat Jun 18 17:02:59 2016 -0700

----------------------------------------------------------------------
 .../dfs/FormatPluginOptionsDescriptor.java      |   5 +
 .../store/easy/text/compliant/TextInput.java    | 105 +++++++++----------
 .../text/compliant/TextParsingSettings.java     |   3 -
 .../org/apache/drill/TestSelectWithOption.java  |  74 +++++++++++--
 4 files changed, 115 insertions(+), 72 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/223507b7/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatPluginOptionsDescriptor.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatPluginOptionsDescriptor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatPluginOptionsDescriptor.java
index 34a20e8..d3b2d5e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatPluginOptionsDescriptor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatPluginOptionsDescriptor.java
@@ -26,6 +26,7 @@ import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.commons.lang3.StringEscapeUtils;
 import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.common.logical.FormatPluginConfig;
 import org.apache.drill.exec.store.dfs.WorkspaceSchemaFactory.TableInstance;
@@ -150,6 +151,10 @@ final class FormatPluginOptionsDescriptor {
         // when null is passed, we leave the default defined in the config class
         continue;
       }
+      if (param instanceof String) {
+        // unescape Java escape sequences, e.g. \t, \n, \r
+        param = StringEscapeUtils.unescapeJava((String) param);
+      }
       TableParamDef paramDef = t.sig.params.get(i);
       TableParamDef expectedParamDef = this.functionParamsByName.get(paramDef.name);
       if (expectedParamDef == null || expectedParamDef.type != paramDef.type) {

http://git-wip-us.apache.org/repos/asf/drill/blob/223507b7/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/TextInput.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/TextInput.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/TextInput.java
index 513476f..d8b1672 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/TextInput.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/TextInput.java
@@ -56,9 +56,7 @@ import com.univocity.parsers.common.Format;
  */
 final class TextInput {
 
-  private static final byte NULL_BYTE = (byte) '\0';
-  private final byte lineSeparator1;
-  private final byte lineSeparator2;
+  private final byte[] lineSeparator;
   private final byte normalizedLineSeparator;
   private final TextParsingSettings settings;
 
@@ -91,7 +89,7 @@ final class TextInput {
    * Whether there was a possible partial line separator on the previous
    * read so we dropped it and it should be appended to next read.
    */
-  private boolean remByte = false;
+  private int remByte = -1;
 
   /**
    * The current position in the buffer.
@@ -107,13 +105,12 @@ final class TextInput {
 
   /**
    * Creates a new instance with the mandatory characters for handling newlines transparently.
-   * @param lineSeparator the sequence of characters that represent a newline, as defined in {@link Format#getLineSeparator()}
-   * @param normalizedLineSeparator the normalized newline character (as defined in {@link Format#getNormalizedNewline()}) that is used to replace any lineSeparator sequence found in the input.
+   * The line separator (the sequence of characters that represents a newline, as defined in {@link Format#getLineSeparator()}) is read from {@code settings},
+   * as is the normalized newline character (as defined in {@link Format#getNormalizedNewline()}) that replaces every line separator sequence found in the input.
    */
   public TextInput(TextParsingSettings settings, InputStream input, DrillBuf readBuffer, long startPos, long endPos) {
-    byte[] lineSeparator = settings.getNewLineDelimiter();
+    this.lineSeparator = settings.getNewLineDelimiter();
     byte normalizedLineSeparator = settings.getNormalizedNewLine();
-    Preconditions.checkArgument(lineSeparator != null && (lineSeparator.length == 1 || lineSeparator.length == 2), "Invalid line separator. Expected 1 to 2 characters");
     Preconditions.checkArgument(input instanceof Seekable, "Text input only supports an InputStream that supports Seekable.");
     boolean isCompressed = input instanceof CompressionInputStream ;
     Preconditions.checkArgument(!isCompressed || startPos == 0, "Cannot use split on compressed stream.");
@@ -138,8 +135,6 @@ final class TextInput {
     this.startPos = startPos;
     this.endPos = endPos;
 
-    this.lineSeparator1 = lineSeparator[0];
-    this.lineSeparator2 = lineSeparator.length == 2 ? lineSeparator[1] : NULL_BYTE;
     this.normalizedLineSeparator = normalizedLineSeparator;
 
     this.buffer = readBuffer;
@@ -196,23 +191,25 @@ final class TextInput {
   private final void read() throws IOException {
     if(bufferReadable){
 
-      if(remByte){
-        underlyingBuffer.put(lineSeparator1);
-        remByte = false;
+      if(remByte != -1){
+        for (int i = 0; i <= remByte; i++) {
+          underlyingBuffer.put(lineSeparator[i]);
+        }
+        remByte = -1;
       }
       length = inputFS.read(underlyingBuffer);
 
     }else{
 
       byte[] b = new byte[underlyingBuffer.capacity()];
-      if(remByte){
-        b[0] = lineSeparator1;
-        length = input.read(b, 1, b.length - 1);
-        remByte = false;
+      if(remByte != -1){
+        int remBytesNum = remByte + 1;
+        System.arraycopy(lineSeparator, 0, b, 0, remBytesNum);
+        length = input.read(b, remBytesNum, b.length - remBytesNum);
+        remByte = -1;
       }else{
         length = input.read(b);
       }
-
       underlyingBuffer.put(b);
     }
   }
@@ -251,46 +248,31 @@ final class TextInput {
    * adjusts so that we can only read to the last character of the first line that crosses
    * the split boundary.
    */
-  private void updateLengthBasedOnConstraint(){
-    // we've run over our alotted data.
-    final byte lineSeparator1 = this.lineSeparator1;
-    final byte lineSeparator2 = this.lineSeparator2;
-
+  private void updateLengthBasedOnConstraint() {
     // find the next line separator:
     final long max = bStart + length;
 
-    for(long m = this.bStart + (endPos - streamPos); m < max; m++){
-      if(PlatformDependent.getByte(m) == lineSeparator1){
-        // we found a potential line break.
-
-        if(lineSeparator2 == NULL_BYTE){
-          // we found a line separator and don't need to consult the next byte.
-          length = (int)(m - bStart) + 1; // make sure we include line separator otherwise query may fail (DRILL-4317)
-          endFound = true;
-          break;
-        }else{
-          // this is a two byte line separator.
-
-          long mPlus = m+1;
-          if(mPlus < max){
-            // we can check next byte and see if the second lineSeparator is correct.
-            if(lineSeparator2 == PlatformDependent.getByte(mPlus)){
-              length = (int)(mPlus - bStart);
-              endFound = true;
-              break;
-            }else{
-              // this was a partial line break.
-              continue;
-            }
-          }else{
-            // the last character of the read was a remnant byte.  We'll hold off on dealing with this byte until the next read.
-            remByte = true;
-            length -= 1;
-            break;
+    for (long m = this.bStart + (endPos - streamPos); m < max; m++) {
+      long mTemp = m - 1;
+      for (int i = 0; i < lineSeparator.length; i++) {
+        mTemp++;
+        if (PlatformDependent.getByte(mTemp) == lineSeparator[i]) {
+          if (mTemp < max) {
+            continue;
+          } else {
+            // remnant bytes
+            // the last N bytes of the read were remnant bytes (a possible partial line separator). We'll hold off on dealing with these bytes until the next read.
+            remByte = i;
+            length -= (i + 1);
+            return;
           }
-
         }
+        break;
       }
+      // we found line delimiter
+      length = (int) (mTemp - bStart);
+      endFound = true;
+      break;
     }
   }
 
@@ -301,8 +283,6 @@ final class TextInput {
    * @throws IOException
    */
   public final byte nextChar() throws IOException {
-    final byte lineSeparator1 = this.lineSeparator1;
-    final byte lineSeparator2 = this.lineSeparator2;
 
     if (length == -1) {
       throw StreamFinishedPseudoException.INSTANCE;
@@ -326,12 +306,20 @@ final class TextInput {
     bufferPtr++;
 
     // monitor for next line.
-    if (lineSeparator1 == byteChar && (lineSeparator2 == NULL_BYTE || lineSeparator2 == buffer.getByte(bufferPtr - 1))) {
-      lineCount++;
+    int bufferPtrTemp = bufferPtr - 1;
+    if (byteChar == lineSeparator[0]) {
+      for (int i = 1; i < lineSeparator.length; i++, bufferPtrTemp++) {
+        if (lineSeparator[i] !=  buffer.getByte(bufferPtrTemp)) {
+          return byteChar;
+        }
+      }
 
-      if (lineSeparator2 != NULL_BYTE) {
-        byteChar = normalizedLineSeparator;
+      lineCount++;
+      byteChar = normalizedLineSeparator;
 
+      // we don't need to update buffer position if line separator is one byte long
+      if (lineSeparator.length > 1) {
+        bufferPtr += (lineSeparator.length - 1);
         if (bufferPtr >= length) {
           if (length != -1) {
             updateBuffer();
@@ -341,6 +329,7 @@ final class TextInput {
         }
       }
     }
+
     return byteChar;
   }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/223507b7/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/TextParsingSettings.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/TextParsingSettings.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/TextParsingSettings.java
index a366c90..41bb33d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/TextParsingSettings.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/TextParsingSettings.java
@@ -20,7 +20,6 @@ package org.apache.drill.exec.store.easy.text.compliant;
 import org.apache.drill.exec.store.easy.text.TextFormatPlugin.TextFormatConfig;
 
 import com.google.common.base.Charsets;
-import com.google.common.base.Preconditions;
 import com.univocity.parsers.common.TextParsingException;
 
 public class TextParsingSettings {
@@ -51,8 +50,6 @@ public class TextParsingSettings {
     this.quote = bSafe(config.getQuote(), "quote");
     this.quoteEscape = bSafe(config.getEscape(), "escape");
     this.newLineDelimiter = config.getLineDelimiter().getBytes(Charsets.UTF_8);
-    Preconditions.checkArgument(newLineDelimiter.length == 1 || newLineDelimiter.length == 2,
-        String.format("Line delimiter must be 1 or 2 bytes in length.  The provided delimiter was %d bytes long.", newLineDelimiter.length));
     this.delimiter = bSafe(config.getFieldDelimiter(), "fieldDelimiter");
     this.comment = bSafe(config.getComment(), "comment");
     this.skipFirstLine = config.isSkipFirstLine();

http://git-wip-us.apache.org/repos/asf/drill/blob/223507b7/exec/java-exec/src/test/java/org/apache/drill/TestSelectWithOption.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestSelectWithOption.java b/exec/java-exec/src/test/java/org/apache/drill/TestSelectWithOption.java
index c74480b..111313b 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/TestSelectWithOption.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/TestSelectWithOption.java
@@ -25,7 +25,6 @@ import java.io.FileWriter;
 import java.io.IOException;
 
 import org.apache.drill.exec.store.dfs.WorkspaceSchemaFactory;
-import org.junit.Ignore;
 import org.junit.Test;
 
 public class TestSelectWithOption extends BaseTestQuery {
@@ -78,19 +77,72 @@ public class TestSelectWithOption extends BaseTestQuery {
       );
   }
 
-  @Test @Ignore // It does not look like lineDelimiter is working
-  public void testTextLineDelimiter() throws Exception {
+  @Test
+  public void testTabFieldDelimiter() throws Exception {
+    String tableName = genCSVTable("testTabFieldDelimiter",
+        "1\ta",
+        "2\tb");
+    String fieldDelimiter = new String(new char[]{92, 116}); // represents \t
+    testWithResult(format("select columns from table(%s(type=>'TeXT', fieldDelimiter => '%s'))", tableName, fieldDelimiter),
+        listOf("1", "a"),
+        listOf("2", "b"));
+  }
+
+  @Test
+  public void testSingleTextLineDelimiter() throws Exception {
+    String tableName = genCSVTable("testSingleTextLineDelimiter",
+        "a|b|c");
+
+    testWithResult(format("select columns from table(%s(type => 'TeXT', lineDelimiter => '|'))", tableName),
+        listOf("a"),
+        listOf("b"),
+        listOf("c"));
+  }
+
+  @Test
+  // '\n' is always treated as a line delimiter:
+  // if the user specifies a custom line delimiter and the input file also contains '\n', lines are split on both
+  public void testCustomTextLineDelimiterAndNewLine() throws Exception {
     String tableName = genCSVTable("testTextLineDelimiter",
-        "\"b\"|\"0\"",
-        "\"b\"|\"1\"",
-        "\"b\"|\"2\"");
+        "b|1",
+        "b|2");
 
     testWithResult(format("select columns from table(%s(type => 'TeXT', lineDelimiter => '|'))", tableName),
-        listOf("\"b\""),
-        listOf("\"0\"", "\"b\""),
-        listOf("\"1\"", "\"b\""),
-        listOf("\"2\"")
-      );
+        listOf("b"),
+        listOf("1"),
+        listOf("b"),
+        listOf("2"));
+  }
+
+  @Test
+  public void testTextLineDelimiterWithCarriageReturn() throws Exception {
+    String tableName = genCSVTable("testTextLineDelimiterWithCarriageReturn",
+        "1, a\r",
+        "2, b\r");
+    String lineDelimiter = new String(new char[]{92, 114, 92, 110}); // represents \r\n
+    testWithResult(format("select columns from table(%s(type=>'TeXT', lineDelimiter => '%s'))", tableName, lineDelimiter),
+        listOf("1, a"),
+        listOf("2, b"));
+  }
+
+  @Test
+  public void testMultiByteLineDelimiter() throws Exception {
+    String tableName = genCSVTable("testMultiByteLineDelimiter",
+        "1abc2abc3abc");
+    testWithResult(format("select columns from table(%s(type=>'TeXT', lineDelimiter => 'abc'))", tableName),
+        listOf("1"),
+        listOf("2"),
+        listOf("3"));
+  }
+
+  @Test
+  public void testDataWithPartOfMultiByteLineDelimiter() throws Exception {
+    String tableName = genCSVTable("testDataWithPartOfMultiByteLineDelimiter",
+        "ab1abc2abc3abc");
+    testWithResult(format("select columns from table(%s(type=>'TeXT', lineDelimiter => 'abc'))", tableName),
+        listOf("ab1"),
+        listOf("2"),
+        listOf("3"));
   }
 
   @Test