You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by cg...@apache.org on 2022/04/04 16:21:33 UTC

[drill] branch master updated: DRILL-8174: Convert Avro format to EVF2 (#2511)

This is an automated email from the ASF dual-hosted git repository.

cgivre pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git


The following commit(s) were added to refs/heads/master by this push:
     new 8e1ab3ad4a DRILL-8174: Convert Avro format to EVF2 (#2511)
8e1ab3ad4a is described below

commit 8e1ab3ad4a9e26de86f029a16f56608dab5f1876
Author: luoc <lu...@apache.org>
AuthorDate: Tue Apr 5 00:21:25 2022 +0800

    DRILL-8174: Convert Avro format to EVF2 (#2511)
    
    * DRILL-8174: Convert Avro format to EVF2
    
    * Addressed review comments
---
 .../impl/scan/v3/schema/DynamicSchemaFilter.java   |   2 +-
 .../drill/exec/store/avro/AvroBatchReader.java     | 162 ++++++++++-----------
 .../drill/exec/store/avro/AvroFormatPlugin.java    |  32 ++--
 3 files changed, 98 insertions(+), 98 deletions(-)

diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/v3/schema/DynamicSchemaFilter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/v3/schema/DynamicSchemaFilter.java
index 513a1020f3..687176534f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/v3/schema/DynamicSchemaFilter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/v3/schema/DynamicSchemaFilter.java
@@ -139,7 +139,7 @@ public abstract class DynamicSchemaFilter implements ProjectionFilter {
   protected ProjResult fromSchema(ColumnMetadata schemaCol,
       ColumnMetadata probeCol) {
     SchemaUtils.verifyConsistency(schemaCol, probeCol, source, errorContext);
-    if (schemaCol.isMap()) {
+    if (schemaCol.isMap() || schemaCol.isDict()) {
       return new ProjResult(true, schemaCol, mapProjection(schemaCol));
     } else {
       return new ProjResult(true, schemaCol);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroBatchReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroBatchReader.java
index ca587aed50..2075b7422e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroBatchReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroBatchReader.java
@@ -17,19 +17,27 @@
  */
 package org.apache.drill.exec.store.avro;
 
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+
 import org.apache.avro.Schema;
 import org.apache.avro.file.DataFileReader;
 import org.apache.avro.generic.GenericDatumReader;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.mapred.FsInput;
+import org.apache.drill.common.AutoCloseables;
+import org.apache.drill.common.PlanStringBuilder;
+import org.apache.drill.common.exceptions.CustomErrorContext;
 import org.apache.drill.common.exceptions.UserException;
-import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework;
-import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
 import org.apache.drill.exec.physical.impl.scan.v3.FixedReceiver;
+import org.apache.drill.exec.physical.impl.scan.v3.ManagedReader;
+import org.apache.drill.exec.physical.impl.scan.v3.file.FileDescrip;
+import org.apache.drill.exec.physical.impl.scan.v3.file.FileSchemaNegotiator;
 import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
 import org.apache.drill.exec.physical.resultSet.RowSetLoader;
 import org.apache.drill.exec.record.ColumnConverter;
 import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.store.dfs.easy.EasySubScan;
 import org.apache.drill.exec.util.ImpersonationUtil;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -38,59 +46,52 @@ import org.apache.hadoop.security.UserGroupInformation;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
-import java.security.PrivilegedExceptionAction;
-
-public class AvroBatchReader implements ManagedReader<FileScanFramework.FileSchemaNegotiator> {
+public class AvroBatchReader implements ManagedReader {
   private static final Logger logger = LoggerFactory.getLogger(AvroBatchReader.class);
 
-  private Path filePath;
-  private long endPosition;
-  private DataFileReader<GenericRecord> reader;
-  private ResultSetLoader loader;
-  private ColumnConverter converter;
+  private final Path filePath;
+  private final long endPosition;
+  private final DataFileReader<GenericRecord> reader;
+  private final RowSetLoader loader;
+  private final ColumnConverter converter;
+  private final CustomErrorContext errorContext;
   // re-use container instance
   private GenericRecord record;
-  private final int maxRecords;
-
-  public AvroBatchReader(int maxRecords) {
-    this.maxRecords = maxRecords;
-  }
 
-  @Override
-  public boolean open(FileScanFramework.FileSchemaNegotiator negotiator) {
-    FileSplit split = negotiator.split();
-    filePath = split.getPath();
-
-    // Avro files are splittable, define reading start / end positions
-    long startPosition = split.getStart();
-    endPosition = startPosition + split.getLength();
+  public AvroBatchReader(AvroFormatConfig config, EasySubScan scan, FileSchemaNegotiator negotiator) {
+    errorContext = negotiator.parentErrorContext();
+    FileDescrip file = negotiator.file();
+    filePath = file.split().getPath();
 
+    // Avro files are splittable, define reading start / end positions.
+    long startPosition = file.split().getStart();
+    endPosition = startPosition + file.split().getLength();
     logger.debug("Processing Avro file: {}, start position: {}, end position: {}",
-      filePath, startPosition, endPosition);
-
-    reader = prepareReader(split, negotiator.fileSystem(),
-      negotiator.userName(), negotiator.context().getFragmentContext().getQueryUserName());
+        filePath, startPosition, endPosition);
 
+    reader = prepareReader(file.split(), file.fileSystem(),
+        negotiator.userName(), negotiator.context().getFragmentContext().getQueryUserName());
     logger.debug("Avro file schema: {}", reader.getSchema());
+
     TupleMetadata readerSchema = AvroSchemaUtil.convert(reader.getSchema());
     logger.debug("Avro file converted schema: {}", readerSchema);
+
     TupleMetadata providedSchema = negotiator.providedSchema();
     TupleMetadata tableSchema = FixedReceiver.Builder.mergeSchemas(providedSchema, readerSchema);
     logger.debug("Avro file table schema: {}", tableSchema);
+
     negotiator.tableSchema(tableSchema, true);
-    loader = negotiator.build();
-    AvroColumnConverterFactory factory = new AvroColumnConverterFactory(providedSchema);
-    converter = factory.getRootConverter(providedSchema, readerSchema, loader.writer());
+    ResultSetLoader setLoader = negotiator.build();
+    loader = setLoader.writer();
 
-    return true;
+    AvroColumnConverterFactory factory = new AvroColumnConverterFactory(providedSchema);
+    converter = factory.getRootConverter(providedSchema, readerSchema, loader);
   }
 
   @Override
   public boolean next() {
-    RowSetLoader rowWriter = loader.writer();
-    while (!rowWriter.isFull()) {
-      if (!nextLine(rowWriter)) {
+    while (!loader.isFull()) {
+      if (!nextLine(loader)) {
         return false;
       }
     }
@@ -99,17 +100,7 @@ public class AvroBatchReader implements ManagedReader<FileScanFramework.FileSche
 
   @Override
   public void close() {
-    if (reader == null) {
-      return;
-    }
-
-    try {
-      reader.close();
-    } catch (IOException e) {
-      logger.warn("Error closing Avro reader: {}", e.getMessage(), e);
-    } finally {
-      reader = null;
-    }
+    AutoCloseables.closeSilently(reader);
   }
 
   @Override
@@ -122,60 +113,38 @@ public class AvroBatchReader implements ManagedReader<FileScanFramework.FileSche
     } catch (IOException e) {
       logger.trace("Unable to obtain Avro reader position: {}", e.getMessage(), e);
     }
-    return "AvroBatchReader[File=" + filePath
-      + ", Position=" + currentPosition
-      + "]";
+    return new PlanStringBuilder(this)
+        .unquotedField("File", filePath.toString())
+        .unquotedField("Position", String.valueOf(currentPosition))
+        .toString();
   }
 
   /**
-   * Initialized Avro data reader based on given file system and file path.
-   * Moves reader to the sync point from where to start reading the data.
-   *
-   * @param fileSplit file split
-   * @param fs file system
-   * @param opUserName name of the user whom to impersonate while reading the data
-   * @param queryUserName name of the user who issues the query
-   * @return Avro file reader
+   * Process one row of records.
+   * @param rowWriter the row writer used to load the record into the result set
+   * @return true if one row is processed, false if the EOF is reached.
    */
-  private DataFileReader<GenericRecord> prepareReader(FileSplit fileSplit, FileSystem fs, String opUserName, String queryUserName) {
-    try {
-      UserGroupInformation ugi = ImpersonationUtil.createProxyUgi(opUserName, queryUserName);
-      DataFileReader<GenericRecord> reader = ugi.doAs((PrivilegedExceptionAction<DataFileReader<GenericRecord>>) () ->
-        new DataFileReader<>(new FsInput(fileSplit.getPath(), fs.getConf()), new GenericDatumReader<GenericRecord>()));
-
-      // move to sync point from where to read the file
-      reader.sync(fileSplit.getStart());
-      return reader;
-    } catch (IOException | InterruptedException e) {
-      throw UserException.dataReadError(e)
-        .message("Error preparing Avro reader")
-        .addContext(String.format("Reader: %s", this))
-        .build(logger);
-    }
-  }
-
   private boolean nextLine(RowSetLoader rowWriter) {
-    if (rowWriter.limitReached(maxRecords)) {
-      return false;
-    }
-
     try {
       if (!reader.hasNext() || reader.pastSync(endPosition)) {
         return false;
       }
       record = reader.next(record);
     } catch (IOException e) {
-      throw UserException.dataReadError(e)
-        .addContext(String.format("Reader %s", this))
+      throw UserException
+        .dataReadError(e)
+        .addContext(e.getMessage())
+        .addContext(errorContext)
         .build(logger);
     }
 
     Schema schema = record.getSchema();
 
     if (Schema.Type.RECORD != schema.getType()) {
-      throw UserException.dataReadError()
+      throw UserException
+        .dataReadError()
         .message("Root object must be record type. Found: %s", schema.getType())
-        .addContext(String.format("Reader %s", this))
+        .addContext(errorContext)
         .build(logger);
     }
 
@@ -185,4 +154,33 @@ public class AvroBatchReader implements ManagedReader<FileScanFramework.FileSche
 
     return true;
   }
+
+  /**
+   * Initializes Avro data reader based on given file system and file path.
+   * Moves reader to the sync point from where to start reading the data.
+   *
+   * @param fileSplit A section of an input file.
+   * @param fs A fairly generic filesystem.
+   * @param opUserName name of the user whom to impersonate while reading the data.
+   * @param queryUserName name of the user who issues the query.
+   * @return Avro file reader
+   */
+  private DataFileReader<GenericRecord> prepareReader(FileSplit fileSplit, FileSystem fs, String opUserName, String queryUserName) {
+    try {
+      UserGroupInformation ugi = ImpersonationUtil.createProxyUgi(opUserName, queryUserName);
+      DataFileReader<GenericRecord> reader = ugi.doAs((PrivilegedExceptionAction<DataFileReader<GenericRecord>>) () ->
+        new DataFileReader<>(new FsInput(fileSplit.getPath(), fs.getConf()), new GenericDatumReader<GenericRecord>()));
+
+      // Move to sync point from where to read the file.
+      reader.sync(fileSplit.getStart());
+      return reader;
+    } catch (IOException | InterruptedException e) {
+      throw UserException
+        .dataReadError(e)
+        .message("Error preparing Avro reader")
+        .addContext(e.getMessage())
+        .addContext(errorContext)
+        .build(logger);
+    }
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroFormatPlugin.java
index 5235b00792..0505a5c37a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroFormatPlugin.java
@@ -20,10 +20,12 @@ package org.apache.drill.exec.store.avro;
 import org.apache.drill.common.logical.StoragePluginConfig;
 import org.apache.drill.common.types.TypeProtos;
 import org.apache.drill.common.types.Types;
-import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework;
-import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
+import org.apache.drill.exec.physical.impl.scan.v3.ManagedReader;
+import org.apache.drill.exec.physical.impl.scan.v3.ManagedReader.EarlyEofException;
+import org.apache.drill.exec.physical.impl.scan.v3.file.FileReaderFactory;
+import org.apache.drill.exec.physical.impl.scan.v3.file.FileScanLifecycleBuilder;
+import org.apache.drill.exec.physical.impl.scan.v3.file.FileSchemaNegotiator;
 import org.apache.drill.exec.server.DrillbitContext;
-import org.apache.drill.exec.server.options.OptionManager;
 import org.apache.drill.exec.store.dfs.easy.EasyFormatPlugin;
 import org.apache.drill.exec.store.dfs.easy.EasySubScan;
 import org.apache.hadoop.conf.Configuration;
@@ -53,30 +55,30 @@ public class AvroFormatPlugin extends EasyFormatPlugin<AvroFormatConfig> {
         .extensions(formatConfig.getExtensions())
         .fsConf(fsConf)
         .defaultName(DEFAULT_NAME)
-        .scanVersion(ScanFrameworkVersion.EVF_V1)
+        .scanVersion(ScanFrameworkVersion.EVF_V2)
         .supportsLimitPushdown(true)
         .build();
   }
 
   @Override
-  protected FileScanFramework.FileScanBuilder frameworkBuilder(OptionManager options, EasySubScan scan) {
-    FileScanFramework.FileScanBuilder builder = new FileScanFramework.FileScanBuilder();
-    builder.setReaderFactory(new AvroReaderFactory(scan.getMaxRecords()));
-    initScanBuilder(builder, scan);
+  protected void configureScan(FileScanLifecycleBuilder builder, EasySubScan scan) {
     builder.nullType(Types.optional(TypeProtos.MinorType.VARCHAR));
-    return builder;
+    builder.readerFactory(new AvroReaderFactory(formatConfig, scan));
   }
 
-  private static class AvroReaderFactory extends FileScanFramework.FileReaderFactory {
+  private static class AvroReaderFactory extends FileReaderFactory {
 
-    private final int maxRecords;
-    public AvroReaderFactory(int maxRecords) {
-      this.maxRecords = maxRecords;
+    private final AvroFormatConfig config;
+    private final EasySubScan scan;
+
+    public AvroReaderFactory(AvroFormatConfig config, EasySubScan scan) {
+      this.config = config;
+      this.scan = scan;
     }
 
     @Override
-    public ManagedReader<? extends FileScanFramework.FileSchemaNegotiator> newReader() {
-      return new AvroBatchReader(maxRecords);
+    public ManagedReader newReader(FileSchemaNegotiator negotiator) throws EarlyEofException {
+      return new AvroBatchReader(config, scan, negotiator);
     }
   }
 }