You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by am...@apache.org on 2017/06/25 01:32:26 UTC

[4/6] drill git commit: DRILL-5590: Bugs in CSV field matching, null columns

DRILL-5590: Bugs in CSV field matching, null columns

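Please see the problem and solution descriptions in DRILL-5590. In brief: projected column names are now matched against the CSV header names case-insensitively, and projected columns that do not appear in the header are filled with empty (zero-length) values for every record.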

Also cleaned up some dead code left over from DRILL-5498.

close #855


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/dd55b5c4
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/dd55b5c4
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/dd55b5c4

Branch: refs/heads/master
Commit: dd55b5c49c8e3207400b99ea616a032bc6172988
Parents: 33682be
Author: Paul Rogers <pr...@maprtech.com>
Authored: Thu Jun 15 22:46:56 2017 -0700
Committer: Aman Sinha <as...@maprtech.com>
Committed: Sat Jun 24 09:23:22 2017 -0700

----------------------------------------------------------------------
 .../compliant/CompliantTextRecordReader.java    | 73 ++------------------
 .../easy/text/compliant/FieldVarCharOutput.java | 73 +++++++++++---------
 .../text/compliant/RepeatedVarCharOutput.java   |  5 +-
 .../exec/store/easy/text/compliant/TestCsv.java | 22 ++++++
 4 files changed, 68 insertions(+), 105 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/dd55b5c4/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/CompliantTextRecordReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/CompliantTextRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/CompliantTextRecordReader.java
index 4a35c3b..7009584 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/CompliantTextRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/CompliantTextRecordReader.java
@@ -17,15 +17,9 @@
  */
 package org.apache.drill.exec.store.easy.text.compliant;
 
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Maps;
-import com.univocity.parsers.common.TextParsingException;
-import io.netty.buffer.DrillBuf;
-
 import java.io.IOException;
 import java.io.InputStream;
 import java.util.List;
-import java.util.Map;
 
 import javax.annotation.Nullable;
 
@@ -36,16 +30,16 @@ import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.ops.OperatorContext;
 import org.apache.drill.exec.physical.impl.OutputMutator;
-import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.store.AbstractRecordReader;
 import org.apache.drill.exec.store.dfs.DrillFileSystem;
-import org.apache.drill.exec.util.CallBack;
-import org.apache.drill.exec.vector.ValueVector;
 import org.apache.hadoop.mapred.FileSplit;
 
 import com.google.common.base.Predicate;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
-import org.apache.drill.exec.expr.TypeHelper;
+import com.univocity.parsers.common.TextParsingException;
+
+import io.netty.buffer.DrillBuf;
 
 // New text reader, complies with the RFC 4180 standard for text/csv files
 public class CompliantTextRecordReader extends AbstractRecordReader {
@@ -255,63 +249,4 @@ public class CompliantTextRecordReader extends AbstractRecordReader {
       logger.warn("Exception while closing stream.", e);
     }
   }
-
-  /**
-   * TextRecordReader during its first phase read to extract header should pass its own
-   * OutputMutator to avoid reshaping query output.
-   * This class provides OutputMutator for header extraction.
-   */
-  private class HeaderOutputMutator implements OutputMutator {
-    private final Map<String, ValueVector> fieldVectorMap = Maps.newHashMap();
-
-    @SuppressWarnings("resource")
-    @Override
-    public <T extends ValueVector> T addField(MaterializedField field, Class<T> clazz) throws SchemaChangeException {
-      ValueVector v = fieldVectorMap.get(field);
-      if (v == null || v.getClass() != clazz) {
-        // Field does not exist add it to the map
-        v = TypeHelper.getNewVector(field, oContext.getAllocator());
-        if (!clazz.isAssignableFrom(v.getClass())) {
-          throw new SchemaChangeException(String.format(
-              "Class %s was provided, expected %s.", clazz.getSimpleName(), v.getClass().getSimpleName()));
-        }
-        fieldVectorMap.put(field.getPath(), v);
-      }
-      return clazz.cast(v);
-    }
-
-    @Override
-    public void allocate(int recordCount) {
-      //do nothing for now
-    }
-
-    @Override
-    public boolean isNewSchema() {
-      return false;
-    }
-
-    @Override
-    public DrillBuf getManagedBuffer() {
-      return null;
-    }
-
-    @Override
-    public CallBack getCallBack() {
-      return null;
-    }
-
-    /**
-     * Since this OutputMutator is passed by TextRecordReader to get the header out
-     * the mutator might not get cleaned up elsewhere. TextRecordReader will call
-     * this method to clear any allocations
-     */
-    public void close() {
-      for (final ValueVector v : fieldVectorMap.values()) {
-        v.clear();
-      }
-      fieldVectorMap.clear();
-    }
-
-  }
-
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/dd55b5c4/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/FieldVarCharOutput.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/FieldVarCharOutput.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/FieldVarCharOutput.java
index 494c593..b8343d1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/FieldVarCharOutput.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/FieldVarCharOutput.java
@@ -17,8 +17,16 @@
  */
 package org.apache.drill.exec.store.easy.text.compliant;
 
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
 import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.map.CaseInsensitiveMap;
 import org.apache.drill.common.types.TypeProtos;
 import org.apache.drill.common.types.Types;
 import org.apache.drill.exec.exception.SchemaChangeException;
@@ -26,12 +34,6 @@ import org.apache.drill.exec.physical.impl.OutputMutator;
 import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.vector.VarCharVector;
 
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Arrays;
-import java.util.List;
-
 /**
  * Class is responsible for generating record batches for text file inputs. We generate
  * a record batch with a set of varchar vectors. A varchar vector contains all the field
@@ -61,8 +63,9 @@ class FieldVarCharOutput extends TextOutput {
   private boolean rowHasData= false;
   private static final int MAX_FIELD_LENGTH = 1024 * 64;
   private int recordCount = 0;
-  private int batchIndex = 0;
   private int maxField = 0;
+  private int[] nullCols;
+  private byte nullValue[] = new byte[0];
 
   /**
    * We initialize and add the varchar vector for each incoming field in this
@@ -77,6 +80,7 @@ class FieldVarCharOutput extends TextOutput {
 
     int totalFields = fieldNames.length;
     List<String> outputColumns = new ArrayList<>(Arrays.asList(fieldNames));
+    List<Integer> nullColumns = new ArrayList<>();
 
     if (isStarQuery) {
       maxField = totalFields - 1;
@@ -84,11 +88,14 @@ class FieldVarCharOutput extends TextOutput {
       Arrays.fill(selectedFields, true);
     } else {
       List<Integer> columnIds = new ArrayList<Integer>();
-      String pathStr;
-      int index;
+      Map<String, Integer> headers = CaseInsensitiveMap.newHashMap();
+      for (int i = 0; i < fieldNames.length; i++) {
+        headers.put(fieldNames[i], i);
+      }
 
       for (SchemaPath path : columns) {
-        pathStr = path.getRootSegment().getPath();
+        int index;
+        String pathStr = path.getRootSegment().getPath();
         if (pathStr.equals(COL_NAME) && path.getRootSegment().getChild() != null) {
           //TODO: support both field names and columns index along with predicate pushdown
           throw UserException
@@ -98,12 +105,15 @@ class FieldVarCharOutput extends TextOutput {
               .addContext("column index", path.getRootSegment().getChild())
               .build(logger);
         } else {
-          index = outputColumns.indexOf(pathStr);
-          if (index < 0) {
+          Integer value = headers.get(pathStr);
+          if (value == null) {
             // found col that is not a part of fieldNames, add it
             // this col might be part of some another scanner
             index = totalFields++;
             outputColumns.add(pathStr);
+            nullColumns.add(index);
+          } else {
+            index = value;
           }
         }
         columnIds.add(index);
@@ -128,6 +138,12 @@ class FieldVarCharOutput extends TextOutput {
 
     this.fieldBytes = new byte[MAX_FIELD_LENGTH];
 
+    // Keep track of the null columns to be filled in.
+
+    nullCols = new int[nullColumns.size()];
+    for (int i = 0; i < nullCols.length; i++) {
+      nullCols[i] = nullColumns.get(i);
+    }
   }
 
   /**
@@ -135,11 +151,10 @@ class FieldVarCharOutput extends TextOutput {
    */
   @Override
   public void startBatch() {
-    this.recordCount = 0;
-    this.batchIndex = 0;
-    this.currentFieldIndex= -1;
-    this.collect = true;
-    this.fieldOpen = false;
+    recordCount = 0;
+    currentFieldIndex= -1;
+    collect = true;
+    fieldOpen = false;
   }
 
   @Override
@@ -173,7 +188,7 @@ class FieldVarCharOutput extends TextOutput {
   public boolean endField() {
     fieldOpen = false;
 
-    if(collect) {
+    if (collect) {
       assert currentVector != null;
       currentVector.getMutator().setSafe(recordCount, fieldBytes, 0, currentDataPointer);
     }
@@ -192,25 +207,20 @@ class FieldVarCharOutput extends TextOutput {
 
  @Override
   public void finishRecord() {
-    if(fieldOpen){
+    if (fieldOpen){
       endField();
     }
 
+    // Fill in null (really empty) values.
+
+    for (int i = 0; i < nullCols.length; i++) {
+      vectors[nullCols[i]].getMutator().setSafe(recordCount, nullValue, 0, 0);
+    }
     recordCount++;
   }
 
-  // Sets the record count in this batch within the value vector
   @Override
-  public void finishBatch() {
-    batchIndex++;
-
-    for (int i = 0; i <= maxField; i++) {
-      if (this.vectors[i] != null) {
-        this.vectors[i].getMutator().setValueCount(batchIndex);
-      }
-    }
-
-  }
+  public void finishBatch() { }
 
   @Override
   public long getRecordCount() {
@@ -221,5 +231,4 @@ class FieldVarCharOutput extends TextOutput {
   public boolean rowHasData() {
     return this.rowHasData;
   }
-
- }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/dd55b5c4/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/RepeatedVarCharOutput.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/RepeatedVarCharOutput.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/RepeatedVarCharOutput.java
index eda2feb..156d6c2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/RepeatedVarCharOutput.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/RepeatedVarCharOutput.java
@@ -345,9 +345,6 @@ class RepeatedVarCharOutput extends TextOutput {
     return out;
   }
 
-  // Sets the record count in this batch within the value vector
   @Override
-  public void finishBatch() {
-    mutator.setValueCount(batchIndex);
-  }
+  public void finishBatch() { }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/dd55b5c4/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsv.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsv.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsv.java
index 7d38cf9..c18adc9 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsv.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsv.java
@@ -135,6 +135,28 @@ public class TestCsv extends ClusterTest {
       .verifyAndClear(actual);
   }
 
+  // Test fix for DRILL-5590
+  @Test
+  public void testCsvHeadersCaseInsensitive() throws IOException {
+    String fileName = "case2.csv";
+    buildFile(fileName, validHeaders);
+    String sql = "SELECT A, b, C FROM `dfs.data`.`" + fileName + "`";
+    RowSet actual = client.queryBuilder().sql(sql).rowSet();
+
+    BatchSchema expectedSchema = new SchemaBuilder()
+        .add("A", MinorType.VARCHAR)
+        .add("b", MinorType.VARCHAR)
+        .add("C", MinorType.VARCHAR)
+        .build();
+    assertEquals(expectedSchema, actual.batchSchema());
+
+    RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+        .add("10", "foo", "bar")
+        .build();
+    new RowSetComparison(expected)
+      .verifyAndClear(actual);
+  }
+
   private String makeStatement(String fileName) {
     return "SELECT * FROM `dfs.data`.`" + fileName + "`";
   }