You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by lu...@apache.org on 2021/11/01 01:57:44 UTC
[drill] branch master updated: DRILL-8022: Add Provided Schema Support for Excel Reader
This is an automated email from the ASF dual-hosted git repository.
luoc pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git
The following commit(s) were added to refs/heads/master by this push:
new 128bc3d DRILL-8022: Add Provided Schema Support for Excel Reader
128bc3d is described below
commit 128bc3dfd3a716c32fa9ab57cf965fa9c470322b
Author: Charles Givre <cg...@apache.org>
AuthorDate: Sun Oct 31 19:06:22 2021 -0400
DRILL-8022: Add Provided Schema Support for Excel Reader
---
contrib/format-excel/pom.xml | 2 +-
.../drill/exec/store/excel/ExcelBatchReader.java | 127 +++++++++++++++++++--
.../drill/exec/store/excel/TestExcelFormat.java | 108 ++++++++++++++++++
.../test/resources/excel/schema_provisioning.xlsx | Bin 0 -> 9748 bytes
.../java/org/apache/drill/test/QueryTestUtil.java | 13 +++
5 files changed, 240 insertions(+), 10 deletions(-)
diff --git a/contrib/format-excel/pom.xml b/contrib/format-excel/pom.xml
index 5e831bb..91d225e 100644
--- a/contrib/format-excel/pom.xml
+++ b/contrib/format-excel/pom.xml
@@ -67,7 +67,7 @@
<dependency>
<groupId>com.github.pjfanning</groupId>
<artifactId>excel-streaming-reader</artifactId>
- <version>3.1.2</version>
+ <version>3.1.6</version>
</dependency>
</dependencies>
<build>
diff --git a/contrib/format-excel/src/main/java/org/apache/drill/exec/store/excel/ExcelBatchReader.java b/contrib/format-excel/src/main/java/org/apache/drill/exec/store/excel/ExcelBatchReader.java
index d43cdfd..132da1f 100644
--- a/contrib/format-excel/src/main/java/org/apache/drill/exec/store/excel/ExcelBatchReader.java
+++ b/contrib/format-excel/src/main/java/org/apache/drill/exec/store/excel/ExcelBatchReader.java
@@ -22,7 +22,6 @@ import com.github.pjfanning.xlsx.StreamingReader;
import com.github.pjfanning.xlsx.impl.StreamingWorkbook;
import org.apache.drill.common.exceptions.CustomErrorContext;
import org.apache.drill.common.exceptions.UserException;
-import org.apache.drill.common.types.TypeProtos;
import org.apache.drill.common.types.TypeProtos.DataMode;
import org.apache.drill.common.types.TypeProtos.MinorType;
import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework;
@@ -30,9 +29,11 @@ import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileSchem
import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.record.MaterializedField;
import org.apache.drill.exec.record.metadata.ColumnMetadata;
import org.apache.drill.exec.record.metadata.MetadataUtils;
import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
import org.apache.drill.exec.vector.accessor.ScalarWriter;
import org.apache.drill.exec.vector.accessor.TupleWriter;
import org.apache.hadoop.mapred.FileSplit;
@@ -193,7 +194,25 @@ public class ExcelBatchReader implements ManagedReader<FileSchemaNegotiator> {
ResultSetLoader loader = negotiator.build();
rowWriter = loader.writer();
openFile(negotiator);
- defineSchema();
+
+ if (negotiator.hasProvidedSchema()) {
+ TupleMetadata providedSchema = negotiator.providedSchema();
+ logger.debug("Found inline schema");
+
+ // Add Implicit columns to schema
+ SchemaBuilder builderForProvidedSchema = new SchemaBuilder();
+ builderForProvidedSchema.addAll(providedSchema);
+ TupleMetadata finalSchema = builderForProvidedSchema.build();
+ buildColumnWritersFromProvidedSchema(finalSchema);
+
+ // Add schema to file negotiator
+ logger.debug("Metadata added to provided schema.");
+ addMetadataToSchema(builderForProvidedSchema);
+ // Build column writer array
+ negotiator.tableSchema(finalSchema, true);
+ } else {
+ defineSchema(negotiator);
+ }
return true;
}
@@ -231,9 +250,34 @@ public class ExcelBatchReader implements ManagedReader<FileSchemaNegotiator> {
/**
* This function defines the schema from the header row.
*/
- private void defineSchema() {
+ private void defineSchema(FileSchemaNegotiator negotiator) {
SchemaBuilder builder = new SchemaBuilder();
getColumnHeaders(builder);
+ negotiator.tableSchema(builder.buildSchema(), false);
+ }
+
+ private void buildColumnWritersFromProvidedSchema(TupleMetadata finalSchema) {
+ // Case for empty sheet
+ if (sheet.getLastRowNum() == 0) {
+ return;
+ }
+
+ columnWriters = new ArrayList<>();
+ metadataColumnWriters = new ArrayList<>();
+ cellWriterArray = new ArrayList<>();
+ rowIterator = sheet.iterator();
+
+ // Get the number of columns.
+ // This method also advances the row reader to the location of the first row of data
+ setFirstDataRow();
+ totalColumnCount = finalSchema.size();
+ firstLine = false;
+
+ // Populate column writer array
+ for(MaterializedField field : finalSchema.toFieldList()) {
+ addColumnToArray(rowWriter, field.getName(), field.getType().getMinorType(), isMetadataField(field.getName()));
+ }
+ addMetadataWriters();
}
private void getColumnHeaders(SchemaBuilder builder) {
@@ -262,7 +306,7 @@ public class ExcelBatchReader implements ManagedReader<FileSchemaNegotiator> {
for (Cell c : currentRow) {
missingFieldName = MISSING_FIELD_NAME_HEADER + (i + 1);
- makeColumn(builder, missingFieldName, TypeProtos.MinorType.VARCHAR);
+ makeColumn(builder, missingFieldName, MinorType.VARCHAR);
excelFieldNames.add(i, missingFieldName);
i++;
}
@@ -303,7 +347,7 @@ public class ExcelBatchReader implements ManagedReader<FileSchemaNegotiator> {
// Remove leading and trailing whitespace
tempColumnName = tempColumnName.trim();
- makeColumn(builder, tempColumnName, TypeProtos.MinorType.VARCHAR);
+ makeColumn(builder, tempColumnName, MinorType.VARCHAR);
excelFieldNames.add(colPosition, tempColumnName);
break;
case FORMULA:
@@ -314,7 +358,7 @@ public class ExcelBatchReader implements ManagedReader<FileSchemaNegotiator> {
// Remove leading and trailing whitespace
tempColumnName = tempColumnName.trim();
- makeColumn(builder, tempColumnName, TypeProtos.MinorType.FLOAT8);
+ makeColumn(builder, tempColumnName, MinorType.FLOAT8);
excelFieldNames.add(colPosition, tempColumnName);
break;
}
@@ -363,6 +407,22 @@ public class ExcelBatchReader implements ManagedReader<FileSchemaNegotiator> {
}
}
+ /**
+ * This function is used to set the iterator to the first row of actual data. When a schema is provided,
+ * we can safely skip the header row, and start reading the first row of data.
+ */
+ private void setFirstDataRow() {
+ // Initialize
+ currentRow = rowIterator.next();
+ int rowNumber = readerConfig.headerRow > 0 ? sheet.getFirstRowNum() : 0;
+
+ // If the headerRow is greater than zero, advance the iterator to the first row of data
+ // This is unfortunately necessary since the streaming reader eliminated the getRow() method.
+ for (int i = 0; i <= rowNumber; i++) {
+ currentRow = rowIterator.next();
+ }
+ }
+
@Override
public boolean next() {
recordCount = 0;
@@ -431,6 +491,16 @@ public class ExcelBatchReader implements ManagedReader<FileSchemaNegotiator> {
}
}
+ private boolean isMetadataField(String fieldName) {
+ try {
+ return (IMPLICIT_STRING_COLUMN.valueOf(fieldName).getFieldName().length() > 0 ||
+ IMPLICIT_TIMESTAMP_COLUMN.valueOf(fieldName).getFieldName().length() > 0 ||
+ IMPLICIT_LIST_COLUMN.valueOf(fieldName).getFieldName().length() > 0);
+ } catch (IllegalArgumentException e) {
+ return false;
+ }
+ }
+
private void populateMetadata(StreamingWorkbook streamingWorkbook) {
CoreProperties fileMetadata = streamingWorkbook.getCoreProperties();
@@ -521,6 +591,7 @@ public class ExcelBatchReader implements ManagedReader<FileSchemaNegotiator> {
switch (type) {
// The Excel Reader only Supports Strings, Floats and Date/Times
case VARCHAR:
+ case INT:
case FLOAT8:
case DATE:
case TIMESTAMP:
@@ -541,10 +612,10 @@ public class ExcelBatchReader implements ManagedReader<FileSchemaNegotiator> {
}
}
- private void addColumnToArray(TupleWriter rowWriter, String name, TypeProtos.MinorType type, boolean isMetadata) {
+ private void addColumnToArray(TupleWriter rowWriter, String name, MinorType type, boolean isMetadata) {
int index = rowWriter.tupleSchema().index(name);
if (index == -1) {
- ColumnMetadata colSchema = MetadataUtils.newScalar(name, type, TypeProtos.DataMode.OPTIONAL);
+ ColumnMetadata colSchema = MetadataUtils.newScalar(name, type, DataMode.OPTIONAL);
if (isMetadata) {
colSchema.setBooleanProperty(ColumnMetadata.EXCLUDE_FROM_WILDCARD, true);
}
@@ -559,10 +630,14 @@ public class ExcelBatchReader implements ManagedReader<FileSchemaNegotiator> {
columnWriters.add(rowWriter.scalar(index));
if (readerConfig.allTextMode && type == MinorType.FLOAT8) {
cellWriterArray.add(new NumericStringWriter(columnWriters.get(index)));
+ } else if (readerConfig.allTextMode && type == MinorType.INT) {
+ cellWriterArray.add(new IntStringWriter(columnWriters.get(index)));
} else if (type == MinorType.VARCHAR) {
cellWriterArray.add(new StringCellWriter(columnWriters.get(index)));
- } else if (type == MinorType.FLOAT8) {
+ } else if (type == MinorType.FLOAT8 || type == MinorType.FLOAT4) {
cellWriterArray.add(new NumericCellWriter(columnWriters.get(index)));
+ } else if (type == MinorType.INT) {
+ cellWriterArray.add(new IntCellWriter(columnWriters.get(index)));
} else if (type == MinorType.TIMESTAMP) {
cellWriterArray.add(new TimestampCellWriter(columnWriters.get(index)));
}
@@ -710,6 +785,40 @@ public class ExcelBatchReader implements ManagedReader<FileSchemaNegotiator> {
}
}
+
+ public static class IntStringWriter extends ExcelBatchReader.CellWriter {
+ IntStringWriter(ScalarWriter columnWriter) {
+ super(columnWriter);
+ }
+
+ @Override
+ public void load(Cell cell) {
+ if (cell == null) {
+ columnWriter.setNull();
+ } else {
+ String fieldValue = String.valueOf(cell.getNumericCellValue());
+ columnWriter.setString(fieldValue);
+ }
+ }
+ }
+
+ public static class IntCellWriter extends ExcelBatchReader.CellWriter {
+ IntCellWriter(ScalarWriter columnWriter) {
+ super(columnWriter);
+ }
+
+ @Override
+ public void load(Cell cell) {
+ if (cell == null) {
+ columnWriter.setNull();
+ } else {
+ int fieldNumValue = (int) cell.getNumericCellValue();
+ columnWriter.setInt(fieldNumValue);
+ }
+ }
+ }
+
+
public static class TimestampCellWriter extends ExcelBatchReader.CellWriter {
TimestampCellWriter(ScalarWriter columnWriter) {
super(columnWriter);
diff --git a/contrib/format-excel/src/test/java/org/apache/drill/exec/store/excel/TestExcelFormat.java b/contrib/format-excel/src/test/java/org/apache/drill/exec/store/excel/TestExcelFormat.java
index 7a37291..3f66450 100644
--- a/contrib/format-excel/src/test/java/org/apache/drill/exec/store/excel/TestExcelFormat.java
+++ b/contrib/format-excel/src/test/java/org/apache/drill/exec/store/excel/TestExcelFormat.java
@@ -37,6 +37,7 @@ import org.junit.experimental.categories.Category;
import java.nio.file.Paths;
+import static org.apache.drill.test.QueryTestUtil.ConvertDateToLong;
import static org.apache.drill.test.QueryTestUtil.generateCompressedFile;
import static org.apache.drill.test.rowSet.RowSetUtilities.strArray;
import static org.junit.Assert.assertEquals;
@@ -72,6 +73,113 @@ public class TestExcelFormat extends ClusterTest {
}
@Test
+ public void testStarWithProvidedSchema() throws Exception {
+ String sql = "SELECT * FROM table(dfs.`excel/schema_provisioning.xlsx` " +
+ "(schema => 'inline=(`col1` INTEGER, `col2` FLOAT, `col3` VARCHAR)'" +
+ "))";
+
+ RowSet results = client.queryBuilder().sql(sql).rowSet();
+
+ TupleMetadata expectedSchema = new SchemaBuilder()
+ .addNullable("col1", MinorType.INT)
+ .addNullable("col2", MinorType.FLOAT4)
+ .addNullable("col3", MinorType.VARCHAR)
+ .buildSchema();
+
+ RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+ .addRow(1, null, null)
+ .addRow(2, 3.0, null)
+ .addRow(4, 5.0, "six")
+ .addRow(7, 8.0, "nine")
+ .build();
+
+ new RowSetComparison(expected).verifyAndClearAll(results);
+ }
+
+ @Test
+ public void testExplicitWithProvidedSchema() throws Exception {
+ String sql = "SELECT col1, col2, col3 FROM table(dfs.`excel/schema_provisioning.xlsx` " +
+ "(schema => 'inline=(`col1` INTEGER, `col2` FLOAT, `col3` VARCHAR)'" +
+ "))";
+
+ RowSet results = client.queryBuilder().sql(sql).rowSet();
+
+ TupleMetadata expectedSchema = new SchemaBuilder()
+ .addNullable("col1", MinorType.INT)
+ .addNullable("col2", MinorType.FLOAT4)
+ .addNullable("col3", MinorType.VARCHAR)
+ .buildSchema();
+
+ RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+ .addRow(1, null, null)
+ .addRow(2, 3.0, null)
+ .addRow(4, 5.0, "six")
+ .addRow(7, 8.0, "nine")
+ .build();
+
+ new RowSetComparison(expected).verifyAndClearAll(results);
+ }
+
+ @Test
+ public void testProvidedSchemaWithMetadata() throws Exception {
+ String sql =
+ "SELECT col1, col2, col3, _category, _content_status, _content_type, _creator, _description, _identifier, _keywords, _last_modified_by_user, _revision, _subject, _title, " +
+ "_created," +
+ "_last_printed, _modified " +
+ "FROM table(dfs.`excel/schema_provisioning.xlsx` (schema => 'inline=(`col1` INTEGER, `col2` FLOAT, `col3` VARCHAR)')) LIMIT 1";
+
+ RowSet results = client.queryBuilder().sql(sql).rowSet();
+ TupleMetadata expectedSchema = new SchemaBuilder()
+ .addNullable("col1", MinorType.INT)
+ .addNullable("col2", MinorType.FLOAT4)
+ .addNullable("col3", MinorType.VARCHAR)
+ .addNullable("_category", MinorType.VARCHAR)
+ .addNullable("_content_status", MinorType.VARCHAR)
+ .addNullable("_content_type", MinorType.VARCHAR)
+ .addNullable("_creator", MinorType.VARCHAR)
+ .addNullable("_description", MinorType.VARCHAR)
+ .addNullable("_identifier", MinorType.VARCHAR)
+ .addNullable("_keywords", MinorType.VARCHAR)
+ .addNullable("_last_modified_by_user", MinorType.VARCHAR)
+ .addNullable("_revision", MinorType.VARCHAR)
+ .addNullable("_subject", MinorType.VARCHAR)
+ .addNullable("_title", MinorType.VARCHAR)
+ .addNullable("_created", MinorType.TIMESTAMP)
+ .addNullable("_last_printed", MinorType.TIMESTAMP)
+ .addNullable("_modified", MinorType.TIMESTAMP)
+ .buildSchema();
+
+ RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+ .addRow(1, null, null, null, null, null, "Microsoft Office User", null, null, null, "Microsoft Office User",
+ null, null, null, ConvertDateToLong("2021-10-27T11:35:00Z"), null, ConvertDateToLong("2021-10-28T13:25:51Z"))
+ .build();
+
+ new RowSetComparison(expected).verifyAndClearAll(results);
+ }
+
+ @Test
+ public void testProvidedSchemaWithNonDefaultSheet() throws Exception {
+ String sql = "SELECT col1, col2, col3 FROM table(dfs.`excel/schema_provisioning.xlsx` " +
+ "(type => 'excel', sheetName => 'SecondSheet', schema => 'inline=(`col1` INTEGER, `col2` FLOAT, `col3` VARCHAR)'" +
+ "))";
+ RowSet results = client.queryBuilder().sql(sql).rowSet();
+
+ TupleMetadata expectedSchema = new SchemaBuilder()
+ .addNullable("col1", MinorType.INT)
+ .addNullable("col2", MinorType.FLOAT4)
+ .addNullable("col3", MinorType.VARCHAR)
+ .buildSchema();
+
+ RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+ .addRow(1, 4.0, "Seven")
+ .addRow(2, 5.0, "Eight")
+ .addRow(3, 6.0, "Nine")
+ .build();
+
+ new RowSetComparison(expected).verifyAndClearAll(results);
+ }
+
+ @Test
public void testExplicitAllQuery() throws RpcException {
String sql = "SELECT id, first_name, last_name, email, gender, birthdate, balance, order_count, average_order FROM cp.`excel/test_data.xlsx`";
diff --git a/contrib/format-excel/src/test/resources/excel/schema_provisioning.xlsx b/contrib/format-excel/src/test/resources/excel/schema_provisioning.xlsx
new file mode 100644
index 0000000..55eb884
Binary files /dev/null and b/contrib/format-excel/src/test/resources/excel/schema_provisioning.xlsx differ
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/QueryTestUtil.java b/exec/java-exec/src/test/java/org/apache/drill/test/QueryTestUtil.java
index 86ec9ca..2e75d56 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/QueryTestUtil.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/QueryTestUtil.java
@@ -23,6 +23,7 @@ import java.io.InputStream;
import java.io.OutputStream;
import java.net.BindException;
import java.net.ServerSocket;
+import java.time.Instant;
import java.util.List;
import java.util.Properties;
import java.util.regex.Matcher;
@@ -352,4 +353,16 @@ public class QueryTestUtil {
IOUtils.copyBytes(inputStream, outputStream, fs.getConf(), false);
}
}
+
+ /**
+ * When writing Drill unit tests, Drill will output strings for dates. However,
+ * these strings must be converted into timestamps (long) for use in unit tests. This method
+ * provides a convenient way to do so.
+ * @param dateString An input date string from Drill output
+ * @return The datestring in epoch/millis.
+ */
+ public static long ConvertDateToLong(String dateString) {
+ Instant instant = Instant.parse(dateString);
+ return instant.toEpochMilli();
+ }
}