Posted to commits@drill.apache.org by lu...@apache.org on 2021/11/01 01:57:44 UTC

[drill] branch master updated: DRILL-8022: Add Provided Schema Support for Excel Reader

This is an automated email from the ASF dual-hosted git repository.

luoc pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git


The following commit(s) were added to refs/heads/master by this push:
     new 128bc3d  DRILL-8022: Add Provided Schema Support for Excel Reader
128bc3d is described below

commit 128bc3dfd3a716c32fa9ab57cf965fa9c470322b
Author: Charles Givre <cg...@apache.org>
AuthorDate: Sun Oct 31 19:06:22 2021 -0400

    DRILL-8022: Add Provided Schema Support for Excel Reader
---
 contrib/format-excel/pom.xml                       |   2 +-
 .../drill/exec/store/excel/ExcelBatchReader.java   | 127 +++++++++++++++++++--
 .../drill/exec/store/excel/TestExcelFormat.java    | 108 ++++++++++++++++++
 .../test/resources/excel/schema_provisioning.xlsx  | Bin 0 -> 9748 bytes
 .../java/org/apache/drill/test/QueryTestUtil.java  |  13 +++
 5 files changed, 240 insertions(+), 10 deletions(-)
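
For reference, the feature added here is used through Drill's table function
syntax. The query below is adapted from the tests added in this commit
(schema_provisioning.xlsx is a test resource; in the inline schema, INTEGER
maps to Drill's INT type and FLOAT to FLOAT4):

    SELECT *
    FROM table(dfs.`excel/schema_provisioning.xlsx`
        (schema => 'inline=(`col1` INTEGER, `col2` FLOAT, `col3` VARCHAR)'))

Without a provided schema, the reader builds its schema from the header row as
VARCHAR or FLOAT8 columns only, so the provided schema is what enables INT
columns here.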

diff --git a/contrib/format-excel/pom.xml b/contrib/format-excel/pom.xml
index 5e831bb..91d225e 100644
--- a/contrib/format-excel/pom.xml
+++ b/contrib/format-excel/pom.xml
@@ -67,7 +67,7 @@
     <dependency>
       <groupId>com.github.pjfanning</groupId>
       <artifactId>excel-streaming-reader</artifactId>
-      <version>3.1.2</version>
+      <version>3.1.6</version>
     </dependency>
   </dependencies>
   <build>
diff --git a/contrib/format-excel/src/main/java/org/apache/drill/exec/store/excel/ExcelBatchReader.java b/contrib/format-excel/src/main/java/org/apache/drill/exec/store/excel/ExcelBatchReader.java
index d43cdfd..132da1f 100644
--- a/contrib/format-excel/src/main/java/org/apache/drill/exec/store/excel/ExcelBatchReader.java
+++ b/contrib/format-excel/src/main/java/org/apache/drill/exec/store/excel/ExcelBatchReader.java
@@ -22,7 +22,6 @@ import com.github.pjfanning.xlsx.StreamingReader;
 import com.github.pjfanning.xlsx.impl.StreamingWorkbook;
 import org.apache.drill.common.exceptions.CustomErrorContext;
 import org.apache.drill.common.exceptions.UserException;
-import org.apache.drill.common.types.TypeProtos;
 import org.apache.drill.common.types.TypeProtos.DataMode;
 import org.apache.drill.common.types.TypeProtos.MinorType;
 import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework;
@@ -30,9 +29,11 @@ import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileSchem
 import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
 import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
 import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.record.metadata.ColumnMetadata;
 import org.apache.drill.exec.record.metadata.MetadataUtils;
 import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
 import org.apache.drill.exec.vector.accessor.ScalarWriter;
 import org.apache.drill.exec.vector.accessor.TupleWriter;
 import org.apache.hadoop.mapred.FileSplit;
@@ -193,7 +194,25 @@ public class ExcelBatchReader implements ManagedReader<FileSchemaNegotiator> {
     ResultSetLoader loader = negotiator.build();
     rowWriter = loader.writer();
     openFile(negotiator);
-    defineSchema();
+
+    if (negotiator.hasProvidedSchema()) {
+      TupleMetadata providedSchema = negotiator.providedSchema();
+      logger.debug("Found inline schema");
+
+      // Copy the provided schema so implicit metadata columns can be appended
+      SchemaBuilder builderForProvidedSchema = new SchemaBuilder();
+      builderForProvidedSchema.addAll(providedSchema);
+      TupleMetadata finalSchema = builderForProvidedSchema.build();
+
+      // Build the column writer array from the provided schema
+      buildColumnWritersFromProvidedSchema(finalSchema);
+
+      // Add the implicit metadata columns to the schema
+      addMetadataToSchema(builderForProvidedSchema);
+      logger.debug("Metadata added to provided schema.");
+
+      // Pass the table schema to the file negotiator
+      negotiator.tableSchema(finalSchema, true);
+    } else {
+      defineSchema(negotiator);
+    }
     return true;
   }
 
@@ -231,9 +250,34 @@ public class ExcelBatchReader implements ManagedReader<FileSchemaNegotiator> {
   /**
    * This function defines the schema from the header row.
    */
-  private void defineSchema() {
+  private void defineSchema(FileSchemaNegotiator negotiator) {
     SchemaBuilder builder = new SchemaBuilder();
     getColumnHeaders(builder);
+    negotiator.tableSchema(builder.buildSchema(), false);
+  }
+
+  private void buildColumnWritersFromProvidedSchema(TupleMetadata finalSchema) {
+    // Case for empty sheet
+    if (sheet.getLastRowNum() == 0) {
+      return;
+    }
+
+    columnWriters = new ArrayList<>();
+    metadataColumnWriters = new ArrayList<>();
+    cellWriterArray = new ArrayList<>();
+    rowIterator = sheet.iterator();
+
+    // Advance the row iterator to the first row of data.
+    // The column count comes from the provided schema rather than the header row.
+    setFirstDataRow();
+    totalColumnCount = finalSchema.size();
+    firstLine = false;
+
+    // Populate column writer array
+    for (MaterializedField field : finalSchema.toFieldList()) {
+      addColumnToArray(rowWriter, field.getName(), field.getType().getMinorType(), isMetadataField(field.getName()));
+    }
+    addMetadataWriters();
   }
 
   private void getColumnHeaders(SchemaBuilder builder) {
@@ -262,7 +306,7 @@ public class ExcelBatchReader implements ManagedReader<FileSchemaNegotiator> {
 
       for (Cell c : currentRow) {
         missingFieldName = MISSING_FIELD_NAME_HEADER + (i + 1);
-        makeColumn(builder, missingFieldName, TypeProtos.MinorType.VARCHAR);
+        makeColumn(builder, missingFieldName, MinorType.VARCHAR);
         excelFieldNames.add(i, missingFieldName);
         i++;
       }
@@ -303,7 +347,7 @@ public class ExcelBatchReader implements ManagedReader<FileSchemaNegotiator> {
 
             // Remove leading and trailing whitespace
             tempColumnName = tempColumnName.trim();
-            makeColumn(builder, tempColumnName, TypeProtos.MinorType.VARCHAR);
+            makeColumn(builder, tempColumnName, MinorType.VARCHAR);
             excelFieldNames.add(colPosition, tempColumnName);
             break;
           case FORMULA:
@@ -314,7 +358,7 @@ public class ExcelBatchReader implements ManagedReader<FileSchemaNegotiator> {
 
             // Remove leading and trailing whitespace
             tempColumnName = tempColumnName.trim();
-            makeColumn(builder, tempColumnName, TypeProtos.MinorType.FLOAT8);
+            makeColumn(builder, tempColumnName, MinorType.FLOAT8);
             excelFieldNames.add(colPosition, tempColumnName);
             break;
         }
@@ -363,6 +407,22 @@ public class ExcelBatchReader implements ManagedReader<FileSchemaNegotiator> {
     }
   }
 
+  /**
+   * Sets the row iterator to the first row of actual data. When a schema is
+   * provided, the header row can safely be skipped so reading begins at the first data row.
+   */
+  private void setFirstDataRow() {
+    // Initialize
+    currentRow = rowIterator.next();
+    int rowNumber = readerConfig.headerRow > 0 ? sheet.getFirstRowNum() : 0;
+
+    // If the headerRow is greater than zero, advance the iterator to the first row of data
+    // This is unfortunately necessary since the streaming reader eliminated the getRow() method.
+    for (int i = 0; i <= rowNumber; i++) {
+      currentRow = rowIterator.next();
+    }
+  }
+
   @Override
   public boolean next() {
     recordCount = 0;
@@ -431,6 +491,16 @@ public class ExcelBatchReader implements ManagedReader<FileSchemaNegotiator> {
     }
   }
 
+  private boolean isMetadataField(String fieldName) {
+    // valueOf() throws IllegalArgumentException for non-member names, so each
+    // enum is checked separately; a single OR expression would wrongly return
+    // false as soon as the first lookup throws.
+    try {
+      return IMPLICIT_STRING_COLUMN.valueOf(fieldName).getFieldName().length() > 0;
+    } catch (IllegalArgumentException e) { /* not a string metadata column */ }
+    try {
+      return IMPLICIT_TIMESTAMP_COLUMN.valueOf(fieldName).getFieldName().length() > 0;
+    } catch (IllegalArgumentException e) { /* not a timestamp metadata column */ }
+    try {
+      return IMPLICIT_LIST_COLUMN.valueOf(fieldName).getFieldName().length() > 0;
+    } catch (IllegalArgumentException e) {
+      return false;
+    }
+  }
+
   private void populateMetadata(StreamingWorkbook streamingWorkbook) {
 
     CoreProperties fileMetadata = streamingWorkbook.getCoreProperties();
@@ -521,6 +591,7 @@ public class ExcelBatchReader implements ManagedReader<FileSchemaNegotiator> {
     switch (type) {
       // The Excel Reader only Supports Strings, Floats and Date/Times
       case VARCHAR:
+      case INT:
       case FLOAT8:
       case DATE:
       case TIMESTAMP:
@@ -541,10 +612,10 @@ public class ExcelBatchReader implements ManagedReader<FileSchemaNegotiator> {
     }
   }
 
-  private void addColumnToArray(TupleWriter rowWriter, String name, TypeProtos.MinorType type, boolean isMetadata) {
+  private void addColumnToArray(TupleWriter rowWriter, String name, MinorType type, boolean isMetadata) {
     int index = rowWriter.tupleSchema().index(name);
     if (index == -1) {
-      ColumnMetadata colSchema = MetadataUtils.newScalar(name, type, TypeProtos.DataMode.OPTIONAL);
+      ColumnMetadata colSchema = MetadataUtils.newScalar(name, type, DataMode.OPTIONAL);
       if (isMetadata) {
         colSchema.setBooleanProperty(ColumnMetadata.EXCLUDE_FROM_WILDCARD, true);
       }
@@ -559,10 +630,14 @@ public class ExcelBatchReader implements ManagedReader<FileSchemaNegotiator> {
       columnWriters.add(rowWriter.scalar(index));
       if (readerConfig.allTextMode && type == MinorType.FLOAT8) {
         cellWriterArray.add(new NumericStringWriter(columnWriters.get(index)));
+      } else if (readerConfig.allTextMode && type == MinorType.INT) {
+        cellWriterArray.add(new IntStringWriter(columnWriters.get(index)));
       } else if (type == MinorType.VARCHAR) {
         cellWriterArray.add(new StringCellWriter(columnWriters.get(index)));
-      } else if (type == MinorType.FLOAT8) {
+      } else if (type == MinorType.FLOAT8 || type == MinorType.FLOAT4) {
         cellWriterArray.add(new NumericCellWriter(columnWriters.get(index)));
+      } else if (type == MinorType.INT) {
+        cellWriterArray.add(new IntCellWriter(columnWriters.get(index)));
       } else if (type == MinorType.TIMESTAMP) {
         cellWriterArray.add(new TimestampCellWriter(columnWriters.get(index)));
       }
@@ -710,6 +785,40 @@ public class ExcelBatchReader implements ManagedReader<FileSchemaNegotiator> {
     }
   }
 
+  public static class IntStringWriter extends ExcelBatchReader.CellWriter {
+    IntStringWriter(ScalarWriter columnWriter) {
+      super(columnWriter);
+    }
+
+    @Override
+    public void load(Cell cell) {
+      if (cell == null) {
+        columnWriter.setNull();
+      } else {
+        // Cast to int first so the text form is "7" rather than "7.0"
+        String fieldValue = String.valueOf((int) cell.getNumericCellValue());
+        columnWriter.setString(fieldValue);
+      }
+    }
+  }
+
+  public static class IntCellWriter extends ExcelBatchReader.CellWriter {
+    IntCellWriter(ScalarWriter columnWriter) {
+      super(columnWriter);
+    }
+
+    @Override
+    public void load(Cell cell) {
+      if (cell == null) {
+        columnWriter.setNull();
+      } else {
+        int fieldNumValue = (int) cell.getNumericCellValue();
+        columnWriter.setInt(fieldNumValue);
+      }
+    }
+  }
+
   public static class TimestampCellWriter extends ExcelBatchReader.CellWriter {
     TimestampCellWriter(ScalarWriter columnWriter) {
       super(columnWriter);
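
A side note on the IntCellWriter added above: the POI Cell API exposes numeric
cells only as a double (getNumericCellValue()), so the int conversion is a
narrowing cast. A minimal standalone sketch of that behavior (plain Java, no
POI or Drill types):

    public class IntCastSketch {
      public static void main(String[] args) {
        double cellValue = 7.0;               // what getNumericCellValue() returns for "7"
        System.out.println((int) cellValue);  // 7
        System.out.println((int) 7.9);        // 7: truncated toward zero, not rounded
      }
    }

If rounding were wanted instead, Math.round() would be the usual alternative;
truncation matches the cast the commit uses.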
diff --git a/contrib/format-excel/src/test/java/org/apache/drill/exec/store/excel/TestExcelFormat.java b/contrib/format-excel/src/test/java/org/apache/drill/exec/store/excel/TestExcelFormat.java
index 7a37291..3f66450 100644
--- a/contrib/format-excel/src/test/java/org/apache/drill/exec/store/excel/TestExcelFormat.java
+++ b/contrib/format-excel/src/test/java/org/apache/drill/exec/store/excel/TestExcelFormat.java
@@ -37,6 +37,7 @@ import org.junit.experimental.categories.Category;
 
 import java.nio.file.Paths;
 
+import static org.apache.drill.test.QueryTestUtil.ConvertDateToLong;
 import static org.apache.drill.test.QueryTestUtil.generateCompressedFile;
 import static org.apache.drill.test.rowSet.RowSetUtilities.strArray;
 import static org.junit.Assert.assertEquals;
@@ -72,6 +73,113 @@ public class TestExcelFormat extends ClusterTest {
   }
 
   @Test
+  public void testStarWithProvidedSchema() throws Exception {
+    String sql = "SELECT * FROM table(dfs.`excel/schema_provisioning.xlsx` " +
+      "(schema => 'inline=(`col1` INTEGER, `col2` FLOAT, `col3` VARCHAR)'" +
+      "))";
+
+    RowSet results = client.queryBuilder().sql(sql).rowSet();
+
+    TupleMetadata expectedSchema = new SchemaBuilder()
+      .addNullable("col1", MinorType.INT)
+      .addNullable("col2", MinorType.FLOAT4)
+      .addNullable("col3", MinorType.VARCHAR)
+      .buildSchema();
+
+    RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+      .addRow(1, null, null)
+      .addRow(2, 3.0, null)
+      .addRow(4, 5.0, "six")
+      .addRow(7, 8.0, "nine")
+      .build();
+
+    new RowSetComparison(expected).verifyAndClearAll(results);
+  }
+
+  @Test
+  public void testExplicitWithProvidedSchema() throws Exception {
+    String sql = "SELECT col1, col2, col3 FROM table(dfs.`excel/schema_provisioning.xlsx` " +
+      "(schema => 'inline=(`col1` INTEGER, `col2` FLOAT, `col3` VARCHAR)'" +
+      "))";
+
+    RowSet results = client.queryBuilder().sql(sql).rowSet();
+
+    TupleMetadata expectedSchema = new SchemaBuilder()
+      .addNullable("col1", MinorType.INT)
+      .addNullable("col2", MinorType.FLOAT4)
+      .addNullable("col3", MinorType.VARCHAR)
+      .buildSchema();
+
+    RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+      .addRow(1, null, null)
+      .addRow(2, 3.0, null)
+      .addRow(4, 5.0, "six")
+      .addRow(7, 8.0, "nine")
+      .build();
+
+    new RowSetComparison(expected).verifyAndClearAll(results);
+  }
+
+  @Test
+  public void testProvidedSchemaWithMetadata() throws Exception {
+    String sql =
+      "SELECT col1, col2, col3, _category, _content_status, _content_type, _creator, _description, _identifier, _keywords, _last_modified_by_user, _revision, _subject, _title, " +
+        "_created," +
+        "_last_printed, _modified " +
+        "FROM table(dfs.`excel/schema_provisioning.xlsx` (schema => 'inline=(`col1` INTEGER, `col2` FLOAT, `col3` VARCHAR)')) LIMIT 1";
+
+    RowSet results = client.queryBuilder().sql(sql).rowSet();
+    TupleMetadata expectedSchema = new SchemaBuilder()
+      .addNullable("col1", MinorType.INT)
+      .addNullable("col2", MinorType.FLOAT4)
+      .addNullable("col3", MinorType.VARCHAR)
+      .addNullable("_category", MinorType.VARCHAR)
+      .addNullable("_content_status", MinorType.VARCHAR)
+      .addNullable("_content_type", MinorType.VARCHAR)
+      .addNullable("_creator", MinorType.VARCHAR)
+      .addNullable("_description", MinorType.VARCHAR)
+      .addNullable("_identifier", MinorType.VARCHAR)
+      .addNullable("_keywords", MinorType.VARCHAR)
+      .addNullable("_last_modified_by_user", MinorType.VARCHAR)
+      .addNullable("_revision", MinorType.VARCHAR)
+      .addNullable("_subject", MinorType.VARCHAR)
+      .addNullable("_title", MinorType.VARCHAR)
+      .addNullable("_created", MinorType.TIMESTAMP)
+      .addNullable("_last_printed", MinorType.TIMESTAMP)
+      .addNullable("_modified", MinorType.TIMESTAMP)
+      .buildSchema();
+
+    RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+      .addRow(1, null, null, null, null, null, "Microsoft Office User", null, null, null, "Microsoft Office User",
+        null, null, null, ConvertDateToLong("2021-10-27T11:35:00Z"), null, ConvertDateToLong("2021-10-28T13:25:51Z"))
+      .build();
+
+    new RowSetComparison(expected).verifyAndClearAll(results);
+  }
+
+  @Test
+  public void testProvidedSchemaWithNonDefaultSheet() throws Exception {
+    String sql = "SELECT col1, col2, col3 FROM table(dfs.`excel/schema_provisioning.xlsx` " +
+      "(type => 'excel', sheetName => 'SecondSheet', schema => 'inline=(`col1` INTEGER, `col2` FLOAT, `col3` VARCHAR)'" +
+      "))";
+    RowSet results = client.queryBuilder().sql(sql).rowSet();
+
+    TupleMetadata expectedSchema = new SchemaBuilder()
+      .addNullable("col1", MinorType.INT)
+      .addNullable("col2", MinorType.FLOAT4)
+      .addNullable("col3", MinorType.VARCHAR)
+      .buildSchema();
+
+    RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+      .addRow(1, 4.0, "Seven")
+      .addRow(2, 5.0, "Eight")
+      .addRow(3, 6.0, "Nine")
+      .build();
+
+    new RowSetComparison(expected).verifyAndClearAll(results);
+  }
+
+  @Test
   public void testExplicitAllQuery() throws RpcException {
     String sql = "SELECT id, first_name, last_name, email, gender, birthdate, balance, order_count, average_order FROM cp.`excel/test_data.xlsx`";
 
diff --git a/contrib/format-excel/src/test/resources/excel/schema_provisioning.xlsx b/contrib/format-excel/src/test/resources/excel/schema_provisioning.xlsx
new file mode 100644
index 0000000..55eb884
Binary files /dev/null and b/contrib/format-excel/src/test/resources/excel/schema_provisioning.xlsx differ
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/QueryTestUtil.java b/exec/java-exec/src/test/java/org/apache/drill/test/QueryTestUtil.java
index 86ec9ca..2e75d56 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/QueryTestUtil.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/QueryTestUtil.java
@@ -23,6 +23,7 @@ import java.io.InputStream;
 import java.io.OutputStream;
 import java.net.BindException;
 import java.net.ServerSocket;
+import java.time.Instant;
 import java.util.List;
 import java.util.Properties;
 import java.util.regex.Matcher;
@@ -352,4 +353,16 @@ public class QueryTestUtil {
       IOUtils.copyBytes(inputStream, outputStream, fs.getConf(), false);
     }
   }
+
+  /**
+   * Drill outputs dates as strings in query results, but unit tests need those
+   * values as epoch timestamps (long). This method provides a convenient way to
+   * convert them.
+   * @param dateString an ISO-8601 instant string, as output by Drill
+   * @return the date as epoch milliseconds
+   */
+  public static long ConvertDateToLong(String dateString) {
+    Instant instant = Instant.parse(dateString);
+    return instant.toEpochMilli();
+  }
 }
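
For example, a quick standalone check of the conversion ConvertDateToLong()
performs (the timestamp below is the one used in testProvidedSchemaWithMetadata;
Instant.parse() expects an ISO-8601 instant string such as this):

    import java.time.Instant;

    public class ConvertDateSketch {
      public static void main(String[] args) {
        long millis = Instant.parse("2021-10-27T11:35:00Z").toEpochMilli();
        System.out.println(millis);  // 1635334500000
      }
    }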