You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@drill.apache.org by GitBox <gi...@apache.org> on 2022/04/13 05:41:11 UTC

[GitHub] [drill] paul-rogers commented on a diff in pull request #2515: DRILL-8188: Convert HDF5 format to EVF2

paul-rogers commented on code in PR #2515:
URL: https://github.com/apache/drill/pull/2515#discussion_r849083600


##########
contrib/format-hdf5/src/main/java/org/apache/drill/exec/store/hdf5/HDF5BatchReader.java:
##########
@@ -171,107 +168,109 @@ public HDF5ReaderConfig(HDF5FormatPlugin plugin, HDF5FormatConfig formatConfig)
     }
   }
 
-  public HDF5BatchReader(HDF5ReaderConfig readerConfig, int maxRecords) {
-    this.readerConfig = readerConfig;
-    this.maxRecords = maxRecords;
+  public HDF5BatchReader(HDF5ReaderConfig config, EasySubScan scan, FileSchemaNegotiator negotiator) {
+    errorContext = negotiator.parentErrorContext();
+    file = negotiator.file();
+    readerConfig = config;
     dataWriters = new ArrayList<>();
-    this.showMetadataPreview = readerConfig.formatConfig.showPreview();
-  }
+    showMetadataPreview = readerConfig.formatConfig.showPreview();
 
-  @Override
-  public boolean open(FileSchemaNegotiator negotiator) {
-    split = negotiator.split();
-    errorContext = negotiator.parentErrorContext();
     // Since the HDF file reader uses a stream to actually read the file, the file name from the
     // module is incorrect.
-    fileName = split.getPath().getName();
-    try {
-      openFile(negotiator);
-    } catch (IOException e) {
-      throw UserException
-        .dataReadError(e)
-        .addContext("Failed to close input file: %s", split.getPath())
-        .addContext(errorContext)
-        .build(logger);
-    }
+    fileName = file.split().getPath().getName();
 
-    ResultSetLoader loader;
-    if (readerConfig.defaultPath == null) {
-      // Get file metadata
-      List<HDF5DrillMetadata> metadata = getFileMetadata(hdfFile, new ArrayList<>());
-      metadataIterator = metadata.iterator();
-
-      // Schema for Metadata query
-      SchemaBuilder builder = new SchemaBuilder()
-        .addNullable(PATH_COLUMN_NAME, MinorType.VARCHAR)
-        .addNullable(DATA_TYPE_COLUMN_NAME, MinorType.VARCHAR)
-        .addNullable(FILE_NAME_COLUMN_NAME, MinorType.VARCHAR)
-        .addNullable(DATA_SIZE_COLUMN_NAME, MinorType.BIGINT)
-        .addNullable(IS_LINK_COLUMN_NAME, MinorType.BIT)
-        .addNullable(ELEMENT_COUNT_NAME, MinorType.BIGINT)
-        .addNullable(DATASET_DATA_TYPE_NAME, MinorType.VARCHAR)
-        .addNullable(DIMENSIONS_FIELD_NAME, MinorType.VARCHAR);
-
-      negotiator.tableSchema(builder.buildSchema(), false);
-
-      loader = negotiator.build();
-      dimensions = new int[0];
-      rowWriter = loader.writer();
-
-    } else {
-      // This is the case when the default path is specified. Since the user is explicitly asking for a dataset
-      // Drill can obtain the schema by getting the datatypes below and ultimately mapping that schema to columns
-      Dataset dataSet = hdfFile.getDatasetByPath(readerConfig.defaultPath);
-      dimensions = dataSet.getDimensions();
-
-      loader = negotiator.build();
-      rowWriter = loader.writer();
-      writerSpec = new WriterSpec(rowWriter, negotiator.providedSchema(),
-          negotiator.parentErrorContext());
-      if (dimensions.length <= 1) {
-        buildSchemaFor1DimensionalDataset(dataSet);
-      } else if (dimensions.length == 2) {
-        buildSchemaFor2DimensionalDataset(dataSet);
-      } else {
-        // Case for datasets of greater than 2D
-        // These are automatically flattened
-        buildSchemaFor2DimensionalDataset(dataSet);
+    { // Opens an HDF5 file

Review Comment:
   Is the nested block necessary? It is handy when we want to reuse variable names (as sometimes occurs in tests), but is it needed here?



##########
contrib/format-hdf5/src/main/java/org/apache/drill/exec/store/hdf5/HDF5BatchReader.java:
##########
@@ -410,8 +390,22 @@ public boolean next() {
     return true;
   }
 
+  @Override
+  public void close() {
+    AutoCloseables.closeSilently(hdfFile, reader);
+    /*
+     * The current implementation of the HDF5 reader creates a temp file which needs to be removed
+     * when the batch reader is closed. A possible future functionality might be to used to.

Review Comment:
   seems an incomplete comment: "used to" do what?



##########
contrib/format-hdf5/src/main/java/org/apache/drill/exec/store/hdf5/HDF5BatchReader.java:
##########
@@ -410,8 +390,22 @@ public boolean next() {
     return true;
   }
 
+  @Override
+  public void close() {
+    AutoCloseables.closeSilently(hdfFile, reader);
+    /*
+     * The current implementation of the HDF5 reader creates a temp file which needs to be removed
+     * when the batch reader is closed. A possible future functionality might be to used to.
+     */
+    boolean result = hdfFile.getFile().delete();

Review Comment:
   There was a comment above about reading from a byte array. We can't read from the original file? Are we reading from a byte array or a temp file?
   
   If a temp file, where is the temp file located? In the system temp dir, or in Drill's configured temp dir?



##########
contrib/format-hdf5/src/main/java/org/apache/drill/exec/store/hdf5/HDF5BatchReader.java:
##########
@@ -171,107 +168,109 @@ public HDF5ReaderConfig(HDF5FormatPlugin plugin, HDF5FormatConfig formatConfig)
     }
   }
 
-  public HDF5BatchReader(HDF5ReaderConfig readerConfig, int maxRecords) {
-    this.readerConfig = readerConfig;
-    this.maxRecords = maxRecords;
+  public HDF5BatchReader(HDF5ReaderConfig config, EasySubScan scan, FileSchemaNegotiator negotiator) {
+    errorContext = negotiator.parentErrorContext();
+    file = negotiator.file();
+    readerConfig = config;
     dataWriters = new ArrayList<>();
-    this.showMetadataPreview = readerConfig.formatConfig.showPreview();
-  }
+    showMetadataPreview = readerConfig.formatConfig.showPreview();
 
-  @Override
-  public boolean open(FileSchemaNegotiator negotiator) {
-    split = negotiator.split();
-    errorContext = negotiator.parentErrorContext();
     // Since the HDF file reader uses a stream to actually read the file, the file name from the
     // module is incorrect.
-    fileName = split.getPath().getName();
-    try {
-      openFile(negotiator);
-    } catch (IOException e) {
-      throw UserException
-        .dataReadError(e)
-        .addContext("Failed to close input file: %s", split.getPath())
-        .addContext(errorContext)
-        .build(logger);
-    }
+    fileName = file.split().getPath().getName();
 
-    ResultSetLoader loader;
-    if (readerConfig.defaultPath == null) {
-      // Get file metadata
-      List<HDF5DrillMetadata> metadata = getFileMetadata(hdfFile, new ArrayList<>());
-      metadataIterator = metadata.iterator();
-
-      // Schema for Metadata query
-      SchemaBuilder builder = new SchemaBuilder()
-        .addNullable(PATH_COLUMN_NAME, MinorType.VARCHAR)
-        .addNullable(DATA_TYPE_COLUMN_NAME, MinorType.VARCHAR)
-        .addNullable(FILE_NAME_COLUMN_NAME, MinorType.VARCHAR)
-        .addNullable(DATA_SIZE_COLUMN_NAME, MinorType.BIGINT)
-        .addNullable(IS_LINK_COLUMN_NAME, MinorType.BIT)
-        .addNullable(ELEMENT_COUNT_NAME, MinorType.BIGINT)
-        .addNullable(DATASET_DATA_TYPE_NAME, MinorType.VARCHAR)
-        .addNullable(DIMENSIONS_FIELD_NAME, MinorType.VARCHAR);
-
-      negotiator.tableSchema(builder.buildSchema(), false);
-
-      loader = negotiator.build();
-      dimensions = new int[0];
-      rowWriter = loader.writer();
-
-    } else {
-      // This is the case when the default path is specified. Since the user is explicitly asking for a dataset
-      // Drill can obtain the schema by getting the datatypes below and ultimately mapping that schema to columns
-      Dataset dataSet = hdfFile.getDatasetByPath(readerConfig.defaultPath);
-      dimensions = dataSet.getDimensions();
-
-      loader = negotiator.build();
-      rowWriter = loader.writer();
-      writerSpec = new WriterSpec(rowWriter, negotiator.providedSchema(),
-          negotiator.parentErrorContext());
-      if (dimensions.length <= 1) {
-        buildSchemaFor1DimensionalDataset(dataSet);
-      } else if (dimensions.length == 2) {
-        buildSchemaFor2DimensionalDataset(dataSet);
-      } else {
-        // Case for datasets of greater than 2D
-        // These are automatically flattened
-        buildSchemaFor2DimensionalDataset(dataSet);
+    { // Opens an HDF5 file
+      InputStream in = null;
+      try {

Review Comment:
   Can we use try-with-resources here?



##########
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/v3/schema/ScanSchemaResolver.java:
##########
@@ -189,7 +189,7 @@ private void insertColumn(ColumnMetadata col) {
     switch (mode) {
       case FIRST_READER_SCHEMA:
       case READER_SCHEMA:
-        if (schema.projectionType() != ProjectionType.ALL) {
+        if (schema.projectionType() != ProjectionType.ALL && !col.isArray()) {

Review Comment:
   Why is an array column special here? I think this is trying to say that, if the array is not projected, it should not have been created. There are dummy structures used instead. This fix suggests that there is a bug somewhere other than here.



##########
contrib/format-hdf5/src/main/java/org/apache/drill/exec/store/hdf5/HDF5BatchReader.java:
##########
@@ -171,107 +168,109 @@ public HDF5ReaderConfig(HDF5FormatPlugin plugin, HDF5FormatConfig formatConfig)
     }
   }
 
-  public HDF5BatchReader(HDF5ReaderConfig readerConfig, int maxRecords) {
-    this.readerConfig = readerConfig;
-    this.maxRecords = maxRecords;
+  public HDF5BatchReader(HDF5ReaderConfig config, EasySubScan scan, FileSchemaNegotiator negotiator) {
+    errorContext = negotiator.parentErrorContext();
+    file = negotiator.file();
+    readerConfig = config;
     dataWriters = new ArrayList<>();
-    this.showMetadataPreview = readerConfig.formatConfig.showPreview();
-  }
+    showMetadataPreview = readerConfig.formatConfig.showPreview();
 
-  @Override
-  public boolean open(FileSchemaNegotiator negotiator) {
-    split = negotiator.split();
-    errorContext = negotiator.parentErrorContext();
     // Since the HDF file reader uses a stream to actually read the file, the file name from the
     // module is incorrect.
-    fileName = split.getPath().getName();
-    try {
-      openFile(negotiator);
-    } catch (IOException e) {
-      throw UserException
-        .dataReadError(e)
-        .addContext("Failed to close input file: %s", split.getPath())
-        .addContext(errorContext)
-        .build(logger);
-    }
+    fileName = file.split().getPath().getName();
 
-    ResultSetLoader loader;
-    if (readerConfig.defaultPath == null) {
-      // Get file metadata
-      List<HDF5DrillMetadata> metadata = getFileMetadata(hdfFile, new ArrayList<>());
-      metadataIterator = metadata.iterator();
-
-      // Schema for Metadata query
-      SchemaBuilder builder = new SchemaBuilder()
-        .addNullable(PATH_COLUMN_NAME, MinorType.VARCHAR)
-        .addNullable(DATA_TYPE_COLUMN_NAME, MinorType.VARCHAR)
-        .addNullable(FILE_NAME_COLUMN_NAME, MinorType.VARCHAR)
-        .addNullable(DATA_SIZE_COLUMN_NAME, MinorType.BIGINT)
-        .addNullable(IS_LINK_COLUMN_NAME, MinorType.BIT)
-        .addNullable(ELEMENT_COUNT_NAME, MinorType.BIGINT)
-        .addNullable(DATASET_DATA_TYPE_NAME, MinorType.VARCHAR)
-        .addNullable(DIMENSIONS_FIELD_NAME, MinorType.VARCHAR);
-
-      negotiator.tableSchema(builder.buildSchema(), false);
-
-      loader = negotiator.build();
-      dimensions = new int[0];
-      rowWriter = loader.writer();
-
-    } else {
-      // This is the case when the default path is specified. Since the user is explicitly asking for a dataset
-      // Drill can obtain the schema by getting the datatypes below and ultimately mapping that schema to columns
-      Dataset dataSet = hdfFile.getDatasetByPath(readerConfig.defaultPath);
-      dimensions = dataSet.getDimensions();
-
-      loader = negotiator.build();
-      rowWriter = loader.writer();
-      writerSpec = new WriterSpec(rowWriter, negotiator.providedSchema(),
-          negotiator.parentErrorContext());
-      if (dimensions.length <= 1) {
-        buildSchemaFor1DimensionalDataset(dataSet);
-      } else if (dimensions.length == 2) {
-        buildSchemaFor2DimensionalDataset(dataSet);
-      } else {
-        // Case for datasets of greater than 2D
-        // These are automatically flattened
-        buildSchemaFor2DimensionalDataset(dataSet);
+    { // Opens an HDF5 file
+      InputStream in = null;
+      try {
+        /*
+         * As a possible future improvement, the jhdf reader has the ability to read hdf5 files from
+         * a byte array or byte buffer. This implementation is better in that it does not require creating
+         * a temporary file which must be deleted later.  However, it could result in memory issues in the
+         * event of large files.
+         */
+        in = file.fileSystem().openPossiblyCompressedStream(file.split().getPath());
+        hdfFile = HdfFile.fromInputStream(in);
+        reader = new BufferedReader(new InputStreamReader(in));

Review Comment:
   How is the same input stream being used by two consumers?



##########
contrib/format-hdf5/src/main/java/org/apache/drill/exec/store/hdf5/HDF5BatchReader.java:
##########
@@ -171,107 +168,109 @@ public HDF5ReaderConfig(HDF5FormatPlugin plugin, HDF5FormatConfig formatConfig)
     }
   }
 
-  public HDF5BatchReader(HDF5ReaderConfig readerConfig, int maxRecords) {
-    this.readerConfig = readerConfig;
-    this.maxRecords = maxRecords;
+  public HDF5BatchReader(HDF5ReaderConfig config, EasySubScan scan, FileSchemaNegotiator negotiator) {
+    errorContext = negotiator.parentErrorContext();
+    file = negotiator.file();
+    readerConfig = config;
     dataWriters = new ArrayList<>();
-    this.showMetadataPreview = readerConfig.formatConfig.showPreview();
-  }
+    showMetadataPreview = readerConfig.formatConfig.showPreview();
 
-  @Override
-  public boolean open(FileSchemaNegotiator negotiator) {
-    split = negotiator.split();
-    errorContext = negotiator.parentErrorContext();
     // Since the HDF file reader uses a stream to actually read the file, the file name from the
     // module is incorrect.
-    fileName = split.getPath().getName();
-    try {
-      openFile(negotiator);
-    } catch (IOException e) {
-      throw UserException
-        .dataReadError(e)
-        .addContext("Failed to close input file: %s", split.getPath())
-        .addContext(errorContext)
-        .build(logger);
-    }
+    fileName = file.split().getPath().getName();
 
-    ResultSetLoader loader;
-    if (readerConfig.defaultPath == null) {
-      // Get file metadata
-      List<HDF5DrillMetadata> metadata = getFileMetadata(hdfFile, new ArrayList<>());
-      metadataIterator = metadata.iterator();
-
-      // Schema for Metadata query
-      SchemaBuilder builder = new SchemaBuilder()
-        .addNullable(PATH_COLUMN_NAME, MinorType.VARCHAR)
-        .addNullable(DATA_TYPE_COLUMN_NAME, MinorType.VARCHAR)
-        .addNullable(FILE_NAME_COLUMN_NAME, MinorType.VARCHAR)
-        .addNullable(DATA_SIZE_COLUMN_NAME, MinorType.BIGINT)
-        .addNullable(IS_LINK_COLUMN_NAME, MinorType.BIT)
-        .addNullable(ELEMENT_COUNT_NAME, MinorType.BIGINT)
-        .addNullable(DATASET_DATA_TYPE_NAME, MinorType.VARCHAR)
-        .addNullable(DIMENSIONS_FIELD_NAME, MinorType.VARCHAR);
-
-      negotiator.tableSchema(builder.buildSchema(), false);
-
-      loader = negotiator.build();
-      dimensions = new int[0];
-      rowWriter = loader.writer();
-
-    } else {
-      // This is the case when the default path is specified. Since the user is explicitly asking for a dataset
-      // Drill can obtain the schema by getting the datatypes below and ultimately mapping that schema to columns
-      Dataset dataSet = hdfFile.getDatasetByPath(readerConfig.defaultPath);
-      dimensions = dataSet.getDimensions();
-
-      loader = negotiator.build();
-      rowWriter = loader.writer();
-      writerSpec = new WriterSpec(rowWriter, negotiator.providedSchema(),
-          negotiator.parentErrorContext());
-      if (dimensions.length <= 1) {
-        buildSchemaFor1DimensionalDataset(dataSet);
-      } else if (dimensions.length == 2) {
-        buildSchemaFor2DimensionalDataset(dataSet);
-      } else {
-        // Case for datasets of greater than 2D
-        // These are automatically flattened
-        buildSchemaFor2DimensionalDataset(dataSet);
+    { // Opens an HDF5 file
+      InputStream in = null;
+      try {
+        /*
+         * As a possible future improvement, the jhdf reader has the ability to read hdf5 files from
+         * a byte array or byte buffer. This implementation is better in that it does not require creating
+         * a temporary file which must be deleted later.  However, it could result in memory issues in the
+         * event of large files.
+         */
+        in = file.fileSystem().openPossiblyCompressedStream(file.split().getPath());
+        hdfFile = HdfFile.fromInputStream(in);
+        reader = new BufferedReader(new InputStreamReader(in));
+      } catch (IOException e) {
+        throw UserException
+          .dataReadError(e)
+          .message("Failed to open input file: %s", file.split().getPath())
+          .addContext(errorContext)
+          .build(logger);
+      } finally {
+        AutoCloseables.closeSilently(in);
       }
     }
-    if (readerConfig.defaultPath == null) {
-      pathWriter = rowWriter.scalar(PATH_COLUMN_NAME);
-      dataTypeWriter = rowWriter.scalar(DATA_TYPE_COLUMN_NAME);
-      fileNameWriter = rowWriter.scalar(FILE_NAME_COLUMN_NAME);
-      dataSizeWriter = rowWriter.scalar(DATA_SIZE_COLUMN_NAME);
-      linkWriter = rowWriter.scalar(IS_LINK_COLUMN_NAME);
-      elementCountWriter = rowWriter.scalar(ELEMENT_COUNT_NAME);
-      datasetTypeWriter = rowWriter.scalar(DATASET_DATA_TYPE_NAME);
-      dimensionsWriter = rowWriter.scalar(DIMENSIONS_FIELD_NAME);
-    }
-    return true;
-  }
-
-  /**
-   * This function is called when the default path is set and the data set is a single dimension.
-   * This function will create an array of one dataWriter of the
-   * correct datatype
-   * @param dataset The HDF5 dataset
-   */
-  private void buildSchemaFor1DimensionalDataset(Dataset dataset) {
-    MinorType currentDataType = HDF5Utils.getDataType(dataset.getDataType());
 
-    // Case for null or unknown data types:
-    if (currentDataType == null) {
-      logger.warn("Couldn't add {}", dataset.getJavaType().getName());
-      return;
+    { // Build the schema and initial the writer
+      ResultSetLoader loader;
+      if (readerConfig.defaultPath == null) {
+        // Get file metadata
+        List<HDF5DrillMetadata> metadata = getFileMetadata(hdfFile, new ArrayList<>());
+        metadataIterator = metadata.iterator();
+
+        // Schema for Metadata query
+        SchemaBuilder builder = new SchemaBuilder()
+          .addNullable(PATH_COLUMN_NAME, MinorType.VARCHAR)
+          .addNullable(DATA_TYPE_COLUMN_NAME, MinorType.VARCHAR)
+          .addNullable(FILE_NAME_COLUMN_NAME, MinorType.VARCHAR)
+          .addNullable(DATA_SIZE_COLUMN_NAME, MinorType.BIGINT)
+          .addNullable(IS_LINK_COLUMN_NAME, MinorType.BIT)
+          .addNullable(ELEMENT_COUNT_NAME, MinorType.BIGINT)
+          .addNullable(DATASET_DATA_TYPE_NAME, MinorType.VARCHAR)
+          .addNullable(DIMENSIONS_FIELD_NAME, MinorType.VARCHAR);
+
+        negotiator.tableSchema(builder.buildSchema(), false);
+
+        loader = negotiator.build();
+        dimensions = new int[0];

Review Comment:
   `dimensions = null`?



##########
exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/TestScanOuputSchema.java:
##########
@@ -327,4 +375,50 @@ public void testStrictProvidedSchemaWithWildcardAndSpecialCols() {
     assertFalse(scan.next());
     scanFixture.close();
   }
+
+  @Test
+  public void testProvidedSchemaWithListArray() {
+    TupleMetadata providedSchema = new SchemaBuilder()
+        .add("a", MinorType.INT)
+        .buildSchema();
+
+    BaseScanFixtureBuilder builder = new BaseScanFixtureBuilder(fixture);
+    builder.addReader(negotiator -> new MockSimpleReader(negotiator, true));
+    builder.builder.providedSchema(providedSchema);
+    builder.setProjection(new String[] { "a", "b", "c" });
+    builder.builder.nullType(Types.optional(MinorType.VARCHAR));
+    ScanFixture scanFixture = builder.build();
+    ScanOperatorExec scan = scanFixture.scanOp;
+
+    TupleMetadata expectedSchema = new SchemaBuilder()
+        .add("a", MinorType.INT)
+        .add("b", MinorType.VARCHAR)
+        .add("c", MinorType.VARCHAR)
+        .addRepeatedList("int_list")
+          .addArray(MinorType.INT)
+          .resumeSchema()
+        .addRepeatedList("long_list")

Review Comment:
   Thanks for testing this. The two messiest vector types in Drill are the repeated Map and repeated list. Repeated list has many, many problems. It isn't even well defined in SQL since it's types can change from row to row.
   
   I wonder, why do we need a repeated list for this reader? Because we want a 2D array? What would a Drill user do with a 2D array?



##########
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/v3/schema/MutableTupleSchema.java:
##########
@@ -173,7 +173,7 @@ public ColumnHandle insert(int posn, ColumnMetadata col) {
   }
 
   public ColumnHandle insert(ColumnMetadata col) {
-    return insert(insertPoint++, col);
+    return insert(insertPoint == -1 ? size() : insertPoint++, col);

Review Comment:
   What was the bug here? I'm wondering if there is something else broken, and the above is a work around for that other bug.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@drill.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org