You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by am...@apache.org on 2017/06/25 01:32:26 UTC
[4/6] drill git commit: DRILL-5590: Bugs in CSV field matching, null columns
DRILL-5590: Bugs in CSV field matching, null columns
Please see the problem and solution descriptions in DRILL-5590.
Also cleaned up some dead code left over from DRILL-5498.
close #855
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/dd55b5c4
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/dd55b5c4
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/dd55b5c4
Branch: refs/heads/master
Commit: dd55b5c49c8e3207400b99ea616a032bc6172988
Parents: 33682be
Author: Paul Rogers <pr...@maprtech.com>
Authored: Thu Jun 15 22:46:56 2017 -0700
Committer: Aman Sinha <as...@maprtech.com>
Committed: Sat Jun 24 09:23:22 2017 -0700
----------------------------------------------------------------------
.../compliant/CompliantTextRecordReader.java | 73 ++------------------
.../easy/text/compliant/FieldVarCharOutput.java | 73 +++++++++++---------
.../text/compliant/RepeatedVarCharOutput.java | 5 +-
.../exec/store/easy/text/compliant/TestCsv.java | 22 ++++++
4 files changed, 68 insertions(+), 105 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/drill/blob/dd55b5c4/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/CompliantTextRecordReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/CompliantTextRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/CompliantTextRecordReader.java
index 4a35c3b..7009584 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/CompliantTextRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/CompliantTextRecordReader.java
@@ -17,15 +17,9 @@
*/
package org.apache.drill.exec.store.easy.text.compliant;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Maps;
-import com.univocity.parsers.common.TextParsingException;
-import io.netty.buffer.DrillBuf;
-
import java.io.IOException;
import java.io.InputStream;
import java.util.List;
-import java.util.Map;
import javax.annotation.Nullable;
@@ -36,16 +30,16 @@ import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.ops.OperatorContext;
import org.apache.drill.exec.physical.impl.OutputMutator;
-import org.apache.drill.exec.record.MaterializedField;
import org.apache.drill.exec.store.AbstractRecordReader;
import org.apache.drill.exec.store.dfs.DrillFileSystem;
-import org.apache.drill.exec.util.CallBack;
-import org.apache.drill.exec.vector.ValueVector;
import org.apache.hadoop.mapred.FileSplit;
import com.google.common.base.Predicate;
+import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
-import org.apache.drill.exec.expr.TypeHelper;
+import com.univocity.parsers.common.TextParsingException;
+
+import io.netty.buffer.DrillBuf;
// New text reader, complies with the RFC 4180 standard for text/csv files
public class CompliantTextRecordReader extends AbstractRecordReader {
@@ -255,63 +249,4 @@ public class CompliantTextRecordReader extends AbstractRecordReader {
logger.warn("Exception while closing stream.", e);
}
}
-
- /**
- * TextRecordReader during its first phase read to extract header should pass its own
- * OutputMutator to avoid reshaping query output.
- * This class provides OutputMutator for header extraction.
- */
- private class HeaderOutputMutator implements OutputMutator {
- private final Map<String, ValueVector> fieldVectorMap = Maps.newHashMap();
-
- @SuppressWarnings("resource")
- @Override
- public <T extends ValueVector> T addField(MaterializedField field, Class<T> clazz) throws SchemaChangeException {
- ValueVector v = fieldVectorMap.get(field);
- if (v == null || v.getClass() != clazz) {
- // Field does not exist add it to the map
- v = TypeHelper.getNewVector(field, oContext.getAllocator());
- if (!clazz.isAssignableFrom(v.getClass())) {
- throw new SchemaChangeException(String.format(
- "Class %s was provided, expected %s.", clazz.getSimpleName(), v.getClass().getSimpleName()));
- }
- fieldVectorMap.put(field.getPath(), v);
- }
- return clazz.cast(v);
- }
-
- @Override
- public void allocate(int recordCount) {
- //do nothing for now
- }
-
- @Override
- public boolean isNewSchema() {
- return false;
- }
-
- @Override
- public DrillBuf getManagedBuffer() {
- return null;
- }
-
- @Override
- public CallBack getCallBack() {
- return null;
- }
-
- /**
- * Since this OutputMutator is passed by TextRecordReader to get the header out
- * the mutator might not get cleaned up elsewhere. TextRecordReader will call
- * this method to clear any allocations
- */
- public void close() {
- for (final ValueVector v : fieldVectorMap.values()) {
- v.clear();
- }
- fieldVectorMap.clear();
- }
-
- }
-
}
http://git-wip-us.apache.org/repos/asf/drill/blob/dd55b5c4/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/FieldVarCharOutput.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/FieldVarCharOutput.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/FieldVarCharOutput.java
index 494c593..b8343d1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/FieldVarCharOutput.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/FieldVarCharOutput.java
@@ -17,8 +17,16 @@
*/
package org.apache.drill.exec.store.easy.text.compliant;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.map.CaseInsensitiveMap;
import org.apache.drill.common.types.TypeProtos;
import org.apache.drill.common.types.Types;
import org.apache.drill.exec.exception.SchemaChangeException;
@@ -26,12 +34,6 @@ import org.apache.drill.exec.physical.impl.OutputMutator;
import org.apache.drill.exec.record.MaterializedField;
import org.apache.drill.exec.vector.VarCharVector;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Arrays;
-import java.util.List;
-
/**
* Class is responsible for generating record batches for text file inputs. We generate
* a record batch with a set of varchar vectors. A varchar vector contains all the field
@@ -61,8 +63,9 @@ class FieldVarCharOutput extends TextOutput {
private boolean rowHasData= false;
private static final int MAX_FIELD_LENGTH = 1024 * 64;
private int recordCount = 0;
- private int batchIndex = 0;
private int maxField = 0;
+ private int[] nullCols;
+ private byte nullValue[] = new byte[0];
/**
* We initialize and add the varchar vector for each incoming field in this
@@ -77,6 +80,7 @@ class FieldVarCharOutput extends TextOutput {
int totalFields = fieldNames.length;
List<String> outputColumns = new ArrayList<>(Arrays.asList(fieldNames));
+ List<Integer> nullColumns = new ArrayList<>();
if (isStarQuery) {
maxField = totalFields - 1;
@@ -84,11 +88,14 @@ class FieldVarCharOutput extends TextOutput {
Arrays.fill(selectedFields, true);
} else {
List<Integer> columnIds = new ArrayList<Integer>();
- String pathStr;
- int index;
+ Map<String, Integer> headers = CaseInsensitiveMap.newHashMap();
+ for (int i = 0; i < fieldNames.length; i++) {
+ headers.put(fieldNames[i], i);
+ }
for (SchemaPath path : columns) {
- pathStr = path.getRootSegment().getPath();
+ int index;
+ String pathStr = path.getRootSegment().getPath();
if (pathStr.equals(COL_NAME) && path.getRootSegment().getChild() != null) {
//TODO: support both field names and columns index along with predicate pushdown
throw UserException
@@ -98,12 +105,15 @@ class FieldVarCharOutput extends TextOutput {
.addContext("column index", path.getRootSegment().getChild())
.build(logger);
} else {
- index = outputColumns.indexOf(pathStr);
- if (index < 0) {
+ Integer value = headers.get(pathStr);
+ if (value == null) {
// found col that is not a part of fieldNames, add it
// this col might be part of some another scanner
index = totalFields++;
outputColumns.add(pathStr);
+ nullColumns.add(index);
+ } else {
+ index = value;
}
}
columnIds.add(index);
@@ -128,6 +138,12 @@ class FieldVarCharOutput extends TextOutput {
this.fieldBytes = new byte[MAX_FIELD_LENGTH];
+ // Keep track of the null columns to be filled in.
+
+ nullCols = new int[nullColumns.size()];
+ for (int i = 0; i < nullCols.length; i++) {
+ nullCols[i] = nullColumns.get(i);
+ }
}
/**
@@ -135,11 +151,10 @@ class FieldVarCharOutput extends TextOutput {
*/
@Override
public void startBatch() {
- this.recordCount = 0;
- this.batchIndex = 0;
- this.currentFieldIndex= -1;
- this.collect = true;
- this.fieldOpen = false;
+ recordCount = 0;
+ currentFieldIndex= -1;
+ collect = true;
+ fieldOpen = false;
}
@Override
@@ -173,7 +188,7 @@ class FieldVarCharOutput extends TextOutput {
public boolean endField() {
fieldOpen = false;
- if(collect) {
+ if (collect) {
assert currentVector != null;
currentVector.getMutator().setSafe(recordCount, fieldBytes, 0, currentDataPointer);
}
@@ -192,25 +207,20 @@ class FieldVarCharOutput extends TextOutput {
@Override
public void finishRecord() {
- if(fieldOpen){
+ if (fieldOpen){
endField();
}
+ // Fill in null (really empty) values.
+
+ for (int i = 0; i < nullCols.length; i++) {
+ vectors[nullCols[i]].getMutator().setSafe(recordCount, nullValue, 0, 0);
+ }
recordCount++;
}
- // Sets the record count in this batch within the value vector
@Override
- public void finishBatch() {
- batchIndex++;
-
- for (int i = 0; i <= maxField; i++) {
- if (this.vectors[i] != null) {
- this.vectors[i].getMutator().setValueCount(batchIndex);
- }
- }
-
- }
+ public void finishBatch() { }
@Override
public long getRecordCount() {
@@ -221,5 +231,4 @@ class FieldVarCharOutput extends TextOutput {
public boolean rowHasData() {
return this.rowHasData;
}
-
- }
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/dd55b5c4/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/RepeatedVarCharOutput.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/RepeatedVarCharOutput.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/RepeatedVarCharOutput.java
index eda2feb..156d6c2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/RepeatedVarCharOutput.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/RepeatedVarCharOutput.java
@@ -345,9 +345,6 @@ class RepeatedVarCharOutput extends TextOutput {
return out;
}
- // Sets the record count in this batch within the value vector
@Override
- public void finishBatch() {
- mutator.setValueCount(batchIndex);
- }
+ public void finishBatch() { }
}
http://git-wip-us.apache.org/repos/asf/drill/blob/dd55b5c4/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsv.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsv.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsv.java
index 7d38cf9..c18adc9 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsv.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsv.java
@@ -135,6 +135,28 @@ public class TestCsv extends ClusterTest {
.verifyAndClear(actual);
}
+ // Test fix for DRILL-5590
+ @Test
+ public void testCsvHeadersCaseInsensitive() throws IOException {
+ String fileName = "case2.csv";
+ buildFile(fileName, validHeaders);
+ String sql = "SELECT A, b, C FROM `dfs.data`.`" + fileName + "`";
+ RowSet actual = client.queryBuilder().sql(sql).rowSet();
+
+ BatchSchema expectedSchema = new SchemaBuilder()
+ .add("A", MinorType.VARCHAR)
+ .add("b", MinorType.VARCHAR)
+ .add("C", MinorType.VARCHAR)
+ .build();
+ assertEquals(expectedSchema, actual.batchSchema());
+
+ RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+ .add("10", "foo", "bar")
+ .build();
+ new RowSetComparison(expected)
+ .verifyAndClear(actual);
+ }
+
private String makeStatement(String fileName) {
return "SELECT * FROM `dfs.data`.`" + fileName + "`";
}