You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by cg...@apache.org on 2022/09/22 15:28:47 UTC

[drill] branch master updated: DRILL-8312: Convert Format Plugins to EVF V2 (#2656)

This is an automated email from the ASF dual-hosted git repository.

cgivre pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git


The following commit(s) were added to refs/heads/master by this push:
     new 6f7cc790dc DRILL-8312: Convert Format Plugins to EVF V2 (#2656)
6f7cc790dc is described below

commit 6f7cc790dc8d61501cd3265394221eec2496992d
Author: Charles S. Givre <cg...@apache.org>
AuthorDate: Thu Sep 22 11:28:39 2022 -0400

    DRILL-8312: Convert Format Plugins to EVF V2 (#2656)
---
 .../drill/exec/store/sas/SasBatchReader.java       | 50 +++++++---------------
 .../drill/exec/store/sas/SasFormatPlugin.java      | 36 ++++------------
 2 files changed, 25 insertions(+), 61 deletions(-)

diff --git a/contrib/format-sas/src/main/java/org/apache/drill/exec/store/sas/SasBatchReader.java b/contrib/format-sas/src/main/java/org/apache/drill/exec/store/sas/SasBatchReader.java
index 0305f8b68c..80b8fe2e7c 100644
--- a/contrib/format-sas/src/main/java/org/apache/drill/exec/store/sas/SasBatchReader.java
+++ b/contrib/format-sas/src/main/java/org/apache/drill/exec/store/sas/SasBatchReader.java
@@ -29,8 +29,10 @@ import org.apache.drill.common.exceptions.CustomErrorContext;
 import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.common.types.TypeProtos.DataMode;
 import org.apache.drill.common.types.TypeProtos.MinorType;
-import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework;
-import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
+
+import org.apache.drill.exec.physical.impl.scan.v3.ManagedReader;
+import org.apache.drill.exec.physical.impl.scan.v3.file.FileDescrip;
+import org.apache.drill.exec.physical.impl.scan.v3.file.FileSchemaNegotiator;
 import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
 import org.apache.drill.exec.physical.resultSet.RowSetLoader;
 import org.apache.drill.exec.record.MaterializedField;
@@ -39,7 +41,6 @@ import org.apache.drill.exec.record.metadata.MetadataUtils;
 import org.apache.drill.exec.record.metadata.SchemaBuilder;
 import org.apache.drill.exec.record.metadata.TupleMetadata;
 import org.apache.drill.exec.vector.accessor.ScalarWriter;
-import org.apache.hadoop.mapred.FileSplit;
 import org.apache.parquet.Strings;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -54,18 +55,15 @@ import java.util.ArrayList;
 import java.util.Date;
 import java.util.List;
 
-public class SasBatchReader implements ManagedReader<FileScanFramework.FileSchemaNegotiator> {
+public class SasBatchReader implements ManagedReader {
   private static final Logger logger = LoggerFactory.getLogger(SasBatchReader.class);
-  private final int maxRecords;
   private final List<SasColumnWriter> writerList;
-  private FileSplit split;
+  private final FileDescrip file;
   private InputStream fsStream;
   private SasFileReader sasFileReader;
-  private CustomErrorContext errorContext;
-  private RowSetLoader rowWriter;
+  private final CustomErrorContext errorContext;
+  private final RowSetLoader rowWriter;
   private Object[] firstRow;
-
-
   private String compressionMethod;
   private String fileLabel;
   private String fileType;
@@ -113,26 +111,15 @@ public class SasBatchReader implements ManagedReader<FileScanFramework.FileSchem
     }
   }
 
-  public static class SasReaderConfig {
-    protected final SasFormatPlugin plugin;
-    public SasReaderConfig(SasFormatPlugin plugin) {
-      this.plugin = plugin;
-    }
-  }
-
-  public SasBatchReader(int maxRecords) {
-    this.maxRecords = maxRecords;
+  public SasBatchReader(FileSchemaNegotiator negotiator) {
     writerList = new ArrayList<>();
-  }
 
-  @Override
-  public boolean open(FileScanFramework.FileSchemaNegotiator negotiator) {
-    split = negotiator.split();
+    file = negotiator.file();
     errorContext = negotiator.parentErrorContext();
-    openFile(negotiator);
+    openFile();
 
     TupleMetadata schema;
-    if (negotiator.hasProvidedSchema()) {
+    if (negotiator.providedSchema() != null) {
       schema = negotiator.providedSchema();
     } else {
       schema = buildSchema();
@@ -143,19 +130,17 @@ public class SasBatchReader implements ManagedReader<FileScanFramework.FileSchem
     ResultSetLoader loader = negotiator.build();
     rowWriter = loader.writer();
     buildWriterList(schema);
-
-    return true;
   }
 
-  private void openFile(FileScanFramework.FileSchemaNegotiator negotiator) {
+  private void openFile() {
     try {
-      fsStream = negotiator.fileSystem().openPossiblyCompressedStream(split.getPath());
+      fsStream = file.fileSystem().openPossiblyCompressedStream(file.split().getPath());
       sasFileReader = new SasFileReaderImpl(fsStream);
       firstRow = sasFileReader.readNext();
     } catch (IOException e) {
       throw UserException
         .dataReadError(e)
-        .message("Unable to open SAS File %s", split.getPath())
+        .message("Unable to open SAS File %s", file.split().getPath())
         .addContext(e.getMessage())
         .addContext(errorContext)
         .build(logger);
@@ -170,7 +155,7 @@ public class SasBatchReader implements ManagedReader<FileScanFramework.FileSchem
       String columnType = column.getType().getSimpleName();
       ColumnFormat columnFormat = column.getFormat();
       try {
-        MinorType type = null;
+        MinorType type;
         if (DateTimeConstants.TIME_FORMAT_STRINGS.contains(columnFormat.getName())) {
           type = MinorType.TIME;
         } else if (DateTimeConstants.DATE_FORMAT_STRINGS.containsKey(columnFormat.getName())) {
@@ -276,9 +261,6 @@ public class SasBatchReader implements ManagedReader<FileScanFramework.FileSchem
   }
 
   private boolean processNextRow() {
-    if (rowWriter.limitReached(maxRecords)) {
-      return false;
-    }
     Object[] row;
     try {
       // Process first row
diff --git a/contrib/format-sas/src/main/java/org/apache/drill/exec/store/sas/SasFormatPlugin.java b/contrib/format-sas/src/main/java/org/apache/drill/exec/store/sas/SasFormatPlugin.java
index b5a135d482..07b753be4a 100644
--- a/contrib/format-sas/src/main/java/org/apache/drill/exec/store/sas/SasFormatPlugin.java
+++ b/contrib/format-sas/src/main/java/org/apache/drill/exec/store/sas/SasFormatPlugin.java
@@ -21,16 +21,13 @@ package org.apache.drill.exec.store.sas;
 import org.apache.drill.common.logical.StoragePluginConfig;
 import org.apache.drill.common.types.TypeProtos;
 import org.apache.drill.common.types.Types;
-import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileReaderFactory;
-import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileScanBuilder;
-import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileSchemaNegotiator;
-
-import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
+import org.apache.drill.exec.physical.impl.scan.v3.file.FileReaderFactory;
+import org.apache.drill.exec.physical.impl.scan.v3.file.FileScanLifecycleBuilder;
+import org.apache.drill.exec.physical.impl.scan.v3.file.FileSchemaNegotiator;
+import org.apache.drill.exec.physical.impl.scan.v3.ManagedReader;
 import org.apache.drill.exec.server.DrillbitContext;
-import org.apache.drill.exec.server.options.OptionSet;
 import org.apache.drill.exec.store.dfs.easy.EasyFormatPlugin;
 import org.apache.drill.exec.store.dfs.easy.EasySubScan;
-import org.apache.drill.exec.store.dfs.easy.EasyFormatPlugin.ScanFrameworkVersion;
 import org.apache.hadoop.conf.Configuration;
 
 
@@ -40,15 +37,9 @@ public class SasFormatPlugin extends EasyFormatPlugin<SasFormatConfig> {
 
   private static class SasReaderFactory extends FileReaderFactory {
 
-    private final int maxRecords;
-
-    public SasReaderFactory(int maxRecords) {
-      this.maxRecords = maxRecords;
-    }
-
     @Override
-    public ManagedReader<? extends FileSchemaNegotiator> newReader() {
-      return new SasBatchReader(maxRecords);
+    public ManagedReader newReader(FileSchemaNegotiator negotiator) {
+      return new SasBatchReader(negotiator);
     }
   }
 
@@ -69,23 +60,14 @@ public class SasFormatPlugin extends EasyFormatPlugin<SasFormatConfig> {
       .extensions(pluginConfig.getExtensions())
       .fsConf(fsConf)
       .defaultName(DEFAULT_NAME)
-      .scanVersion(ScanFrameworkVersion.EVF_V1)
+      .scanVersion(ScanFrameworkVersion.EVF_V2)
       .supportsLimitPushdown(true)
       .build();
   }
 
   @Override
-  public ManagedReader<? extends FileSchemaNegotiator> newBatchReader(EasySubScan scan, OptionSet options)  {
-    return new SasBatchReader(scan.getMaxRecords());
-  }
-
-  @Override
-  protected FileScanBuilder frameworkBuilder(EasySubScan scan, OptionSet options) {
-    FileScanBuilder builder = new FileScanBuilder();
-    builder.setReaderFactory(new SasReaderFactory(scan.getMaxRecords()));
-
-    initScanBuilder(builder, scan);
+  protected void configureScan(FileScanLifecycleBuilder builder, EasySubScan scan) {
     builder.nullType(Types.optional(TypeProtos.MinorType.VARCHAR));
-    return builder;
+    builder.readerFactory(new SasReaderFactory());
   }
 }