Posted to commits@drill.apache.org by ar...@apache.org on 2019/06/07 10:38:51 UTC

[drill] 02/05: DRILL-7261: Simplify Easy framework config for new scan

This is an automated email from the ASF dual-hosted git repository.

arina pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git

commit 1bf7f1525374c9e414f89d5a17056e860a125510
Author: Paul Rogers <pa...@yahoo.com>
AuthorDate: Sat May 25 20:40:22 2019 -0700

    DRILL-7261: Simplify Easy framework config for new scan
    
    Most format plugins are created using the Easy format plugin. A recent
    change added support for the "row set" scan framework. After converting
    the text and log reader plugins, it became clear that the setup code
    could be made simpler.
    
    * Add the user name to the "file scan" framework.
    * Pass the file system, split and user name to the batch reader via
      the "schema negotiator" rather than via the constructor.
    * Create the traditional "scan batch" scan or the new row-set scan via
      functions instead of classes.
    * Add Easy config option and method to choose the kind of scan
      framework.
    * Add Easy config options for newer features, such as whether the
      plugin supports statistics (see the config sketch below).
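    
    As a rough sketch (not part of this commit; the field names are taken
    from the EasyFormatConfig changes in the diff below, the values are
    illustrative), a plugin might opt in like this:
    
        EasyFormatConfig config = new EasyFormatConfig();
        config.defaultName = "myplugin";   // plugin config name (illustrative)
        config.useEnhancedScan = true;     // choose the new row-set scan framework
        config.supportsStatistics = false; // new statistics capability flag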
    
    Simplified reader creation
    
    * The batch reader can be created just by overriding a method (see the
      sketch after this list).
    * A default error context is provided if the plugin does not provide
      one.
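    
    A rough illustration of that pattern (assumed class names and method
    bodies; only ManagedReader, FileSchemaNegotiator, EasySubScan,
    OptionManager and the fileSystem()/userName() accessors come from the
    code in the diff below):
    
        // In the plugin: one override replaces the old ScanBatchCreator.
        @Override
        public ManagedReader<? extends FileSchemaNegotiator> newBatchReader(
            EasySubScan scan, OptionManager options) {
          return new MyBatchReader();  // hypothetical reader class
        }
    
        // In the reader: the file system and user name now arrive via the
        // schema negotiator rather than the constructor.
        private static class MyBatchReader implements ManagedReader<FileSchemaNegotiator> {
          @Override
          public boolean open(FileSchemaNegotiator negotiator) {
            DrillFileSystem dfs = negotiator.fileSystem();
            String user = negotiator.userName();
            // ... open the file and create the result set loader here ...
            return true;
          }
          @Override
          public boolean next() { return false; }  // no rows in this sketch
          @Override
          public void close() { }
        }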
    
    Tested by running all unit tests for the CSV reader, which is based on
    the new framework, and by testing the converted log reader (that reader
    is not part of this commit).
    
    closes #1796
---
 .../physical/impl/scan/file/FileScanFramework.java |   8 +-
 .../impl/scan/framework/ManagedScanFramework.java  |   7 +
 .../impl/scan/framework/SchemaNegotiator.java      |   7 +
 .../impl/scan/framework/SchemaNegotiatorImpl.java  |   5 +
 .../exec/store/dfs/easy/EasyFormatPlugin.java      | 408 ++++++++++-----------
 .../exec/store/easy/text/TextFormatPlugin.java     | 142 +++----
 .../easy/text/compliant/TestCsvWithHeaders.java    |   4 +-
 7 files changed, 282 insertions(+), 299 deletions(-)

diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/file/FileScanFramework.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/file/FileScanFramework.java
index f69aa40..761f68b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/file/FileScanFramework.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/file/FileScanFramework.java
@@ -62,7 +62,8 @@ import org.apache.hadoop.mapred.FileSplit;
 
 public class FileScanFramework extends ManagedScanFramework {
 
-  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FileScanFramework.class);
+  private static final org.slf4j.Logger logger =
+      org.slf4j.LoggerFactory.getLogger(FileScanFramework.class);
 
   /**
    * The file schema negotiator adds no behavior at present, but is
@@ -80,7 +81,6 @@ public class FileScanFramework extends ManagedScanFramework {
     /**
      * Gives the Drill file system for this operator.
      */
-
     DrillFileSystem fileSystem();
 
     /**
@@ -186,6 +186,10 @@ public class FileScanFramework extends ManagedScanFramework {
       return newReader();
     }
 
+    public CustomErrorContext errorContext() {
+      return fileFramework == null ? null : fileFramework.errorContext();
+    }
+
     public abstract ManagedReader<? extends FileSchemaNegotiator> newReader();
   }
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/framework/ManagedScanFramework.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/framework/ManagedScanFramework.java
index 52203da..a7a46fe 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/framework/ManagedScanFramework.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/framework/ManagedScanFramework.java
@@ -135,10 +135,17 @@ public class ManagedScanFramework implements ScanOperatorEvents {
 
   public static class ScanFrameworkBuilder extends ScanOrchestratorBuilder {
     protected ReaderFactory readerFactory;
+    protected String userName;
 
     public void setReaderFactory(ReaderFactory readerFactory) {
       this.readerFactory = readerFactory;
     }
+
+    public ReaderFactory readerFactory() { return readerFactory; }
+
+    public void setUserName(String userName) {
+      this.userName = userName;
+    }
   }
 
   // Inputs
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/framework/SchemaNegotiator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/framework/SchemaNegotiator.java
index bc303ec..74ea512 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/framework/SchemaNegotiator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/framework/SchemaNegotiator.java
@@ -64,8 +64,15 @@ public interface SchemaNegotiator {
    * Specify an advanced error context which allows the reader to
    * fill in custom context values.
    */
+
   void setErrorContext(CustomErrorContext context);
 
+  /*
+   * The name of the user running the query.
+   */
+
+  String userName();
+
   /**
    * Specify the table schema if this is an early-schema reader. Need
    * not be called for a late-schema readers. The schema provided here,
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/framework/SchemaNegotiatorImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/framework/SchemaNegotiatorImpl.java
index ab70734..15008c1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/framework/SchemaNegotiatorImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/framework/SchemaNegotiatorImpl.java
@@ -100,6 +100,11 @@ public class SchemaNegotiatorImpl implements SchemaNegotiator {
     batchSize = maxRecordsPerBatch;
   }
 
+  @Override
+  public String userName() {
+    return framework.builder.userName;
+  }
+
   /**
    * Callback from the schema negotiator to build the schema from information from
    * both the table and scan operator. Returns the result set loader to be used
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
index 74439a4..a170122 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
@@ -23,9 +23,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
-import org.apache.drill.exec.physical.base.MetadataProviderManager;
-import org.apache.drill.shaded.guava.com.google.common.base.Functions;
-import org.apache.drill.shaded.guava.com.google.common.collect.Maps;
+import org.apache.drill.common.exceptions.CustomErrorContext;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.common.expression.SchemaPath;
@@ -35,17 +33,21 @@ import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.ops.OperatorContext;
 import org.apache.drill.exec.physical.base.AbstractGroupScan;
 import org.apache.drill.exec.physical.base.AbstractWriter;
+import org.apache.drill.exec.physical.base.MetadataProviderManager;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.physical.base.ScanStats;
 import org.apache.drill.exec.physical.base.ScanStats.GroupScanProperty;
 import org.apache.drill.exec.physical.impl.ScanBatch;
-import org.apache.drill.exec.physical.impl.WriterRecordBatch;
 import org.apache.drill.exec.physical.impl.StatisticsWriterRecordBatch;
+import org.apache.drill.exec.physical.impl.WriterRecordBatch;
 import org.apache.drill.exec.physical.impl.protocol.OperatorRecordBatch;
 import org.apache.drill.exec.physical.impl.scan.ScanOperatorExec;
 import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework;
 import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileReaderFactory;
 import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileScanBuilder;
+import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileSchemaNegotiator;
+import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
+import org.apache.drill.exec.planner.common.DrillStatsTable.TableStatistics;
 import org.apache.drill.exec.planner.physical.PlannerSettings;
 import org.apache.drill.exec.record.CloseableRecordBatch;
 import org.apache.drill.exec.record.RecordBatch;
@@ -62,11 +64,13 @@ import org.apache.drill.exec.store.dfs.FileSelection;
 import org.apache.drill.exec.store.dfs.FormatMatcher;
 import org.apache.drill.exec.store.dfs.FormatPlugin;
 import org.apache.drill.exec.store.schedule.CompleteFileWork;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-
+import org.apache.drill.shaded.guava.com.google.common.base.Functions;
 import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableSet;
 import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
+import org.apache.drill.shaded.guava.com.google.common.collect.Maps;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 
 /**
  * Base class for various file readers.
@@ -80,6 +84,8 @@ import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
 
 public abstract class EasyFormatPlugin<T extends FormatPluginConfig> implements FormatPlugin {
 
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(EasyFormatPlugin.class);
+
   /**
    * Defines the static, programmer-defined options for this plugin. These
    * options are attributes of how the plugin works. The plugin config,
@@ -104,214 +110,43 @@ public abstract class EasyFormatPlugin<T extends FormatPluginConfig> implements
 
     public boolean supportsProjectPushdown;
     public boolean supportsAutoPartitioning;
+    public boolean supportsStatistics;
     public int readerOperatorType = -1;
     public int writerOperatorType = -1;
-  }
 
-  /**
-   * Creates the scan batch to use with the plugin. Drill supports the "classic"
-   * style of scan batch and readers, along with the newer size-aware,
-   * component-based version. The implementation of this class assembles the
-   * readers and scan batch operator as needed for each version.
-   */
+    // Choose whether to use the "traditional" or "enhanced" reader
+    // structure. Can also be selected at runtime by overriding
+    // useEnhancedScan().
 
-  public interface ScanBatchCreator {
-    CloseableRecordBatch buildScan(
-        final FragmentContext context, EasySubScan scan)
-            throws ExecutionSetupException;
+    public boolean useEnhancedScan;
   }
 
   /**
-   * Use the original scanner based on the {@link RecordReader} interface.
-   * Requires that the storage plugin roll its own solutions for null columns.
-   * Is not able to limit vector or batch sizes. Retained or backward
-   * compatibility with "classic" format plugins which have not yet been
-   * upgraded to use the new framework.
+   * Builds the readers for the V3 text scan operator.
    */
-
-  public static class ClassicScanBatchCreator implements ScanBatchCreator {
+  private static class EasyReaderFactory extends FileReaderFactory {
 
     private final EasyFormatPlugin<? extends FormatPluginConfig> plugin;
+    private final EasySubScan scan;
+    private final FragmentContext context;
 
-    public ClassicScanBatchCreator(EasyFormatPlugin<? extends FormatPluginConfig> plugin) {
+    public EasyReaderFactory(EasyFormatPlugin<? extends FormatPluginConfig> plugin,
+        final EasySubScan scan, FragmentContext context) {
       this.plugin = plugin;
+      this.scan = scan;
+      this.context = context;
     }
 
     @Override
-    public CloseableRecordBatch buildScan(
-        final FragmentContext context, EasySubScan scan) throws ExecutionSetupException {
-      final ColumnExplorer columnExplorer = new ColumnExplorer(context.getOptions(), scan.getColumns());
-
-      if (! columnExplorer.isStarQuery()) {
-        scan = new EasySubScan(scan.getUserName(), scan.getWorkUnits(), scan.getFormatPlugin(),
-            columnExplorer.getTableColumns(), scan.getSelectionRoot(), scan.getPartitionDepth(), scan.getSchema());
-        scan.setOperatorId(scan.getOperatorId());
-      }
-
-      final OperatorContext oContext = context.newOperatorContext(scan);
-      final DrillFileSystem dfs;
+    public ManagedReader<? extends FileSchemaNegotiator> newReader() {
       try {
-        dfs = oContext.newFileSystem(plugin.easyConfig().fsConf);
-      } catch (final IOException e) {
-        throw new ExecutionSetupException(String.format("Failed to create FileSystem: %s", e.getMessage()), e);
-      }
-
-      final List<RecordReader> readers = new LinkedList<>();
-      final List<Map<String, String>> implicitColumns = Lists.newArrayList();
-      Map<String, String> mapWithMaxColumns = Maps.newLinkedHashMap();
-      final boolean supportsFileImplicitColumns = scan.getSelectionRoot() != null;
-      for (final FileWork work : scan.getWorkUnits()) {
-        final RecordReader recordReader = getRecordReader(
-            plugin, context, dfs, work, scan.getColumns(), scan.getUserName());
-        readers.add(recordReader);
-        final List<String> partitionValues = ColumnExplorer.listPartitionValues(
-            work.getPath(), scan.getSelectionRoot(), false);
-        final Map<String, String> implicitValues = columnExplorer.populateImplicitColumns(
-            work.getPath(), partitionValues, supportsFileImplicitColumns);
-        implicitColumns.add(implicitValues);
-        if (implicitValues.size() > mapWithMaxColumns.size()) {
-          mapWithMaxColumns = implicitValues;
-        }
-      }
-
-      // all readers should have the same number of implicit columns, add missing ones with value null
-      final Map<String, String> diff = Maps.transformValues(mapWithMaxColumns, Functions.constant((String) null));
-      for (final Map<String, String> map : implicitColumns) {
-        map.putAll(Maps.difference(map, diff).entriesOnlyOnRight());
+        return plugin.newBatchReader(scan, context.getOptions());
+      } catch (ExecutionSetupException e) {
+        throw UserException.validationError(e)
+          .addContext("Reason", "Failed to create a batch reader")
+          .addContext(errorContext())
+          .build(logger);
       }
-
-      return new ScanBatch(context, oContext, readers, implicitColumns);
-    }
-
-    /**
-     * Create a record reader given a file system, a file description and other
-     * information. For backward compatibility, calls the plugin method by
-     * default.
-     *
-     * @param plugin
-     *          the plugin creating the scan
-     * @param context
-     *          fragment context for the fragment running the scan
-     * @param dfs
-     *          Drill's distributed file system facade
-     * @param fileWork
-     *          description of the file to scan
-     * @param columns
-     *          list of columns to project
-     * @param userName
-     *          the name of the user performing the scan
-     * @return a scan operator
-     * @throws ExecutionSetupException
-     *           if anything goes wrong
-     */
-
-    public RecordReader getRecordReader(EasyFormatPlugin<? extends FormatPluginConfig> plugin,
-        FragmentContext context, DrillFileSystem dfs, FileWork fileWork,
-        List<SchemaPath> columns, String userName) throws ExecutionSetupException {
-      return plugin.getRecordReader(context, dfs, fileWork, columns, userName);
-    }
-  }
-
-  /**
-   * Revised scanner based on the revised {@link org.apache.drill.exec.physical.rowSet.ResultSetLoader}
-   * and {@link org.apache.drill.exec.physical.impl.scan.RowBatchReader} classes.
-   * Handles most projection tasks automatically. Able to limit
-   * vector and batch sizes. Use this for new format plugins.
-   */
-
-  public abstract static class ScanFrameworkCreator
-      implements ScanBatchCreator {
-
-    protected EasyFormatPlugin<? extends FormatPluginConfig> plugin;
-
-    public ScanFrameworkCreator(EasyFormatPlugin<? extends FormatPluginConfig> plugin) {
-      this.plugin = plugin;
-    }
-
-    /**
-     * Builds the revised {@link FileBatchReader}-based scan batch.
-     *
-     * @param context
-     * @param scan
-     * @return
-     * @throws ExecutionSetupException
-     */
-
-    @Override
-    public CloseableRecordBatch buildScan(
-        final FragmentContext context,
-        final EasySubScan scan) throws ExecutionSetupException {
-
-      // Assemble the scan operator and its wrapper.
-
-      try {
-        final FileScanBuilder builder = frameworkBuilder(scan);
-        builder.setProjection(scan.getColumns());
-        builder.setFiles(scan.getWorkUnits());
-        builder.setConfig(plugin.easyConfig().fsConf);
-
-        // The text readers use required Varchar columns to represent null columns.
-
-        builder.allowRequiredNullColumns(true);
-        final Path selectionRoot = scan.getSelectionRoot();
-        if (selectionRoot != null) {
-          builder.metadataOptions().setSelectionRoot(selectionRoot);
-          builder.metadataOptions().setPartitionDepth(scan.getPartitionDepth());
-        }
-        FileScanFramework framework = builder.buildFileFramework();
-        return new OperatorRecordBatch(
-            context, scan,
-            new ScanOperatorExec(
-                framework));
-      } catch (final UserException e) {
-        // Rethrow user exceptions directly
-        throw e;
-      } catch (final Throwable e) {
-        // Wrap all others
-        throw new ExecutionSetupException(e);
-      }
-    }
-
-    /**
-     * Create the plugin-specific framework that manages the scan. The framework
-     * creates batch readers one by one for each file or block. It defines semantic
-     * rules for projection. It handles "early" or "late" schema readers. A typical
-     * framework builds on standardized frameworks for files in general or text
-     * files in particular.
-     *
-     * @param scan the physical operation definition for the scan operation. Contains
-     * one or more files to read. (The Easy format plugin works only for files.)
-     * @return the scan framework which orchestrates the scan operation across
-     * potentially many files
-     * @throws ExecutionSetupException for all setup failures
-     */
-    protected abstract FileScanBuilder frameworkBuilder(
-        EasySubScan scan) throws ExecutionSetupException;
-  }
-
-  /**
-   * Generic framework creator for files that just use the basic file
-   * support: metadata, etc. Specialized use cases (special "columns"
-   * column, say) will require a specialized implementation.
-   */
-
-  public abstract static class FileScanFrameworkCreator extends ScanFrameworkCreator {
-
-    private final FileReaderFactory readerCreator;
-
-    public FileScanFrameworkCreator(EasyFormatPlugin<? extends FormatPluginConfig> plugin,
-        FileReaderFactory readerCreator) {
-      super(plugin);
-      this.readerCreator = readerCreator;
-    }
-
-    @Override
-    protected FileScanBuilder frameworkBuilder(
-        EasySubScan scan) throws ExecutionSetupException {
-
-      FileScanBuilder builder = new FileScanBuilder();
-      builder.setReaderFactory(readerCreator);
-      return builder;
     }
   }
 
@@ -428,26 +263,173 @@ public abstract class EasyFormatPlugin<T extends FormatPluginConfig> implements
 
   protected CloseableRecordBatch getReaderBatch(final FragmentContext context,
       final EasySubScan scan) throws ExecutionSetupException {
-    return scanBatchCreator(context.getOptions()).buildScan(context, scan);
+    if (useEnhancedScan(context.getOptions())) {
+      return buildScan(context, scan);
+    } else {
+      return buildScanBatch(context, scan);
+    }
+  }
+
+  /**
+   * Choose whether to use the enhanced scan based on the row set and scan
+   * framework, or the "traditional" ad-hoc structure based on ScanBatch.
+   * Normally set as a config option. Override this method if you want to
+   * make the choice based on a system/session option.
+   *
+   * @return true to use the enhanced scan framework, false for the
+   * traditional scan-batch framework
+   */
+
+  protected boolean useEnhancedScan(OptionManager options) {
+    return easyConfig.useEnhancedScan;
+  }
+
+  /**
+   * Use the original scanner based on the {@link RecordReader} interface.
+   * Requires that the storage plugin roll its own solutions for null columns.
+   * Is not able to limit vector or batch sizes. Retained or backward
+   * compatibility with "classic" format plugins which have not yet been
+   * upgraded to use the new framework.
+   */
+
+  private CloseableRecordBatch buildScanBatch(FragmentContext context,
+      EasySubScan scan) throws ExecutionSetupException {
+    final ColumnExplorer columnExplorer =
+        new ColumnExplorer(context.getOptions(), scan.getColumns());
+
+    if (! columnExplorer.isStarQuery()) {
+      scan = new EasySubScan(scan.getUserName(), scan.getWorkUnits(), scan.getFormatPlugin(),
+          columnExplorer.getTableColumns(), scan.getSelectionRoot(),
+          scan.getPartitionDepth(), scan.getSchema());
+      scan.setOperatorId(scan.getOperatorId());
+    }
+
+    final OperatorContext oContext = context.newOperatorContext(scan);
+    final DrillFileSystem dfs;
+    try {
+      dfs = oContext.newFileSystem(easyConfig().fsConf);
+    } catch (final IOException e) {
+      throw new ExecutionSetupException(String.format("Failed to create FileSystem: %s", e.getMessage()), e);
+    }
+
+    final List<RecordReader> readers = new LinkedList<>();
+    final List<Map<String, String>> implicitColumns = Lists.newArrayList();
+    Map<String, String> mapWithMaxColumns = Maps.newLinkedHashMap();
+    final boolean supportsFileImplicitColumns = scan.getSelectionRoot() != null;
+    for (final FileWork work : scan.getWorkUnits()) {
+      final RecordReader recordReader = getRecordReader(
+          context, dfs, work, scan.getColumns(), scan.getUserName());
+      readers.add(recordReader);
+      final List<String> partitionValues = ColumnExplorer.listPartitionValues(
+          work.getPath(), scan.getSelectionRoot(), false);
+      final Map<String, String> implicitValues = columnExplorer.populateImplicitColumns(
+          work.getPath(), partitionValues, supportsFileImplicitColumns);
+      implicitColumns.add(implicitValues);
+      if (implicitValues.size() > mapWithMaxColumns.size()) {
+        mapWithMaxColumns = implicitValues;
+      }
+    }
+
+    // all readers should have the same number of implicit columns, add missing ones with value null
+    final Map<String, String> diff = Maps.transformValues(mapWithMaxColumns, Functions.constant((String) null));
+    for (final Map<String, String> map : implicitColumns) {
+      map.putAll(Maps.difference(map, diff).entriesOnlyOnRight());
+    }
+
+    return new ScanBatch(context, oContext, readers, implicitColumns);
   }
 
   /**
-   * Create the scan batch creator. Needed only when using the revised scan batch. In that
-   * case, override the <tt>readerIterator()</tt> method on the custom scan batch
-   * creator implementation.
+   * Revised scanner based on the revised {@link org.apache.drill.exec.physical.rowSet.ResultSetLoader}
+   * and {@link org.apache.drill.exec.physical.impl.scan.RowBatchReader} classes.
+   * Handles most projection tasks automatically. Able to limit
+   * vector and batch sizes. Use this for new format plugins.
+   */
+
+  private CloseableRecordBatch buildScan(FragmentContext context, EasySubScan scan) throws ExecutionSetupException {
+
+    // Assemble the scan operator and its wrapper.
+
+    try {
+      final FileScanBuilder builder = frameworkBuilder(context.getOptions(), scan);
+      builder.setProjection(scan.getColumns());
+      builder.setFiles(scan.getWorkUnits());
+      builder.setConfig(easyConfig().fsConf);
+      builder.setUserName(scan.getUserName());
+
+      // Pass along the output schema, if any
+
+      builder.setOutputSchema(scan.getSchema());
+      final Path selectionRoot = scan.getSelectionRoot();
+      if (selectionRoot != null) {
+        builder.metadataOptions().setSelectionRoot(selectionRoot);
+        builder.metadataOptions().setPartitionDepth(scan.getPartitionDepth());
+      }
+
+      // Add batch reader, if none specified
+
+      if (builder.readerFactory() == null) {
+        builder.setReaderFactory(new EasyReaderFactory(this, scan, context));
+      }
+
+      // Add error context, if none is specified
+
+      if (builder.errorContext() == null) {
+        builder.setContext(
+            new CustomErrorContext() {
+              @Override
+              public void addContext(UserException.Builder builder) {
+                builder.addContext("Format plugin:",
+                    EasyFormatPlugin.this.getClass().getSimpleName());
+                builder.addContext("Plugin config name:", getName());
+              }
+            });
+      }
+
+      FileScanFramework framework = builder.buildFileFramework();
+      return new OperatorRecordBatch(context, scan,
+          new ScanOperatorExec(framework));
+    } catch (final UserException e) {
+      // Rethrow user exceptions directly
+      throw e;
+    } catch (final Throwable e) {
+      // Wrap all others
+      throw new ExecutionSetupException(e);
+    }
+  }
+
+  public ManagedReader<? extends FileSchemaNegotiator> newBatchReader(
+      EasySubScan scan, OptionManager options) throws ExecutionSetupException {
+    throw new ExecutionSetupException("Must implement newBatchReader() if using the enhanced framework.");
+  }
+
+  /**
+   * Create the plugin-specific framework that manages the scan. The framework
+   * creates batch readers one by one for each file or block. It defines semantic
+   * rules for projection. It handles "early" or "late" schema readers. A typical
+   * framework builds on standardized frameworks for files in general or text
+   * files in particular.
    *
-   * @return the strategy for creating the scan batch for this plugin
+   * @param scan the physical operation definition for the scan operation. Contains
+   * one or more files to read. (The Easy format plugin works only for files.)
+   * @return the scan framework which orchestrates the scan operation across
+   * potentially many files
+   * @throws ExecutionSetupException for all setup failures
    */
 
-  protected ScanBatchCreator scanBatchCreator(OptionManager options) {
-    return new ClassicScanBatchCreator(this);
+  protected FileScanBuilder frameworkBuilder(
+      OptionManager options, EasySubScan scan) throws ExecutionSetupException {
+    throw new ExecutionSetupException("Must implement frameworkBuilder() if using the enhanced framework.");
   }
 
   public boolean isStatisticsRecordWriter(FragmentContext context, EasyWriter writer) {
     return false;
   }
 
-  public abstract RecordWriter getRecordWriter(FragmentContext context, EasyWriter writer) throws IOException;
+  public RecordWriter getRecordWriter(FragmentContext context,
+                                      EasyWriter writer) throws IOException {
+    throw new UnsupportedOperationException("unimplemented");
+  }
 
   public StatisticsRecordWriter getStatisticsRecordWriter(FragmentContext context, EasyWriter writer) throws IOException
   {
@@ -519,4 +501,18 @@ public abstract class EasyFormatPlugin<T extends FormatPluginConfig> implements
 
   public int getReaderOperatorType() { return easyConfig.readerOperatorType; }
   public int getWriterOperatorType() { return easyConfig.writerOperatorType; }
+
+  @Override
+  public boolean supportsStatistics() { return easyConfig.supportsStatistics; }
+
+  @Override
+  public TableStatistics readStatistics(FileSystem fs, Path statsTablePath) throws IOException {
+    throw new UnsupportedOperationException("unimplemented");
+  }
+
+  @Override
+  public void writeStatistics(TableStatistics statistics, FileSystem fs,
+      Path statsTablePath) throws IOException {
+    throw new UnsupportedOperationException("unimplemented");
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java
index f764c38..3d122ed 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java
@@ -28,9 +28,8 @@ import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.logical.FormatPluginConfig;
 import org.apache.drill.common.logical.StoragePluginConfig;
-import org.apache.drill.common.types.TypeProtos.DataMode;
-import org.apache.drill.common.types.TypeProtos.MajorType;
 import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.common.types.Types;
 import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.physical.base.AbstractGroupScan;
@@ -42,7 +41,6 @@ import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileReade
 import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileScanBuilder;
 import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileSchemaNegotiator;
 import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
-import org.apache.drill.exec.planner.common.DrillStatsTable.TableStatistics;
 import org.apache.drill.exec.planner.physical.PlannerSettings;
 import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
 import org.apache.drill.exec.proto.UserBitShared.CoreOperatorType;
@@ -67,7 +65,6 @@ import org.apache.drill.exec.store.text.DrillTextRecordWriter;
 import org.apache.drill.exec.vector.accessor.convert.AbstractConvertFromString;
 import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.FileSplit;
 
@@ -199,70 +196,6 @@ public class TextFormatPlugin extends EasyFormatPlugin<TextFormatPlugin.TextForm
     }
   }
 
-  /**
-   * Builds the V3 text scan operator.
-   */
-  private static class TextScanBatchCreator extends ScanFrameworkCreator {
-
-    private final TextFormatPlugin textPlugin;
-
-    public TextScanBatchCreator(TextFormatPlugin plugin) {
-      super(plugin);
-      textPlugin = plugin;
-    }
-
-    @Override
-    protected FileScanBuilder frameworkBuilder(
-        EasySubScan scan) throws ExecutionSetupException {
-      ColumnsScanBuilder builder = new ColumnsScanBuilder();
-      builder.setReaderFactory(new ColumnsReaderFactory(textPlugin));
-
-      // Provide custom error context
-      builder.setContext(
-          new CustomErrorContext() {
-            @Override
-            public void addContext(UserException.Builder builder) {
-              builder.addContext("Format plugin:", PLUGIN_NAME);
-              builder.addContext("Plugin config name:", textPlugin.getName());
-              builder.addContext("Extract headers:",
-                  Boolean.toString(textPlugin.getConfig().isHeaderExtractionEnabled()));
-              builder.addContext("Skip headers:",
-                  Boolean.toString(textPlugin.getConfig().isSkipFirstLine()));
-            }
-          });
-
-      // If this format has no headers, or wants to skip them,
-      // then we must use the columns column to hold the data.
-
-      builder.requireColumnsArray(
-          ! textPlugin.getConfig().isHeaderExtractionEnabled());
-
-      // Text files handle nulls in an unusual way. Missing columns
-      // are set to required Varchar and filled with blanks. Yes, this
-      // means that the SQL statement or code cannot differentiate missing
-      // columns from empty columns, but that is how CSV and other text
-      // files have been defined within Drill.
-
-      builder.setNullType(
-          MajorType.newBuilder()
-            .setMinorType(MinorType.VARCHAR)
-            .setMode(DataMode.REQUIRED)
-            .build());
-
-      // Pass along the output schema, if any
-
-      builder.setOutputSchema(scan.getSchema());
-
-      // CSV maps blank columns to nulls (for nullable non-string columns),
-      // or to the default value (for non-nullable non-string columns.)
-
-      builder.setConversionProperty(AbstractConvertFromString.BLANK_ACTION_PROP,
-          AbstractConvertFromString.BLANK_AS_NULL);
-
-      return builder;
-    }
-  }
-
   public TextFormatPlugin(String name, DrillbitContext context, Configuration fsConf, StoragePluginConfig storageConfig) {
      this(name, context, fsConf, storageConfig, new TextFormatConfig());
   }
@@ -284,6 +217,9 @@ public class TextFormatPlugin extends EasyFormatPlugin<TextFormatPlugin.TextForm
     config.defaultName = PLUGIN_NAME;
     config.readerOperatorType = CoreOperatorType.TEXT_SUB_SCAN_VALUE;
     config.writerOperatorType = CoreOperatorType.TEXT_WRITER_VALUE;
+
+    // Uncomment this, and remove useEnhancedScan(), when v2 is retired
+    //config.useEnhancedScan = true;
     return config;
   }
 
@@ -304,16 +240,12 @@ public class TextFormatPlugin extends EasyFormatPlugin<TextFormatPlugin.TextForm
   }
 
   @Override
-  protected ScanBatchCreator scanBatchCreator(OptionManager options) {
+  protected boolean useEnhancedScan(OptionManager options) {
     // Create the "legacy", "V2" reader or the new "V3" version based on
     // the result set loader. This code should be temporary: the two
     // readers provide identical functionality for the user; only the
     // internals differ.
-    if (options.getBoolean(ExecConstants.ENABLE_V3_TEXT_READER_KEY)) {
-      return new TextScanBatchCreator(this);
-    } else {
-      return new ClassicScanBatchCreator(this);
-    }
+    return options.getBoolean(ExecConstants.ENABLE_V3_TEXT_READER_KEY);
   }
 
   // TODO: Remove this once the V2 reader is removed.
@@ -337,6 +269,53 @@ public class TextFormatPlugin extends EasyFormatPlugin<TextFormatPlugin.TextForm
   }
 
   @Override
+  protected FileScanBuilder frameworkBuilder(
+      OptionManager options, EasySubScan scan) throws ExecutionSetupException {
+    ColumnsScanBuilder builder = new ColumnsScanBuilder();
+    builder.setReaderFactory(new ColumnsReaderFactory(this));
+
+    // If this format has no headers, or wants to skip them,
+    // then we must use the columns column to hold the data.
+
+    builder.requireColumnsArray(
+        ! getConfig().isHeaderExtractionEnabled());
+
+    // Text files handle nulls in an unusual way. Missing columns
+    // are set to required Varchar and filled with blanks. Yes, this
+    // means that the SQL statement or code cannot differentiate missing
+    // columns from empty columns, but that is how CSV and other text
+    // files have been defined within Drill.
+
+    builder.setNullType(Types.required(MinorType.VARCHAR));
+
+    // CSV maps blank columns to nulls (for nullable non-string columns),
+    // or to the default value (for non-nullable non-string columns.)
+
+    builder.setConversionProperty(AbstractConvertFromString.BLANK_ACTION_PROP,
+        AbstractConvertFromString.BLANK_AS_NULL);
+
+    // The text readers use required Varchar columns to represent null columns.
+
+    builder.allowRequiredNullColumns(true);
+
+    // Provide custom error context
+    builder.setContext(
+        new CustomErrorContext() {
+          @Override
+          public void addContext(UserException.Builder builder) {
+            builder.addContext("Format plugin:", PLUGIN_NAME);
+            builder.addContext("Plugin config name:", getName());
+            builder.addContext("Extract headers:",
+                Boolean.toString(getConfig().isHeaderExtractionEnabled()));
+            builder.addContext("Skip first line:",
+                Boolean.toString(getConfig().isSkipFirstLine()));
+          }
+        });
+
+    return builder;
+  }
+
+  @Override
   public RecordWriter getRecordWriter(final FragmentContext context, final EasyWriter writer) throws IOException {
     final Map<String, String> options = new HashMap<>();
 
@@ -355,21 +334,6 @@ public class TextFormatPlugin extends EasyFormatPlugin<TextFormatPlugin.TextForm
   }
 
   @Override
-  public boolean supportsStatistics() {
-    return false;
-  }
-
-  @Override
-  public TableStatistics readStatistics(FileSystem fs, Path statsTablePath) {
-    throw new UnsupportedOperationException("unimplemented");
-  }
-
-  @Override
-  public void writeStatistics(TableStatistics statistics, FileSystem fs, Path statsTablePath) {
-    throw new UnsupportedOperationException("unimplemented");
-  }
-
-  @Override
   protected ScanStats getScanStats(final PlannerSettings settings, final EasyGroupScan scan) {
     long data = 0;
     for (final CompleteFileWork work : scan.getWorkIterable()) {
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsvWithHeaders.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsvWithHeaders.java
index 645af30..7abbf3d 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsvWithHeaders.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsvWithHeaders.java
@@ -926,7 +926,7 @@ public class TestCsvWithHeaders extends BaseCsvTest {
       assertTrue(e.getMessage().contains("Format plugin: text"));
       assertTrue(e.getMessage().contains("Plugin config name: csv"));
       assertTrue(e.getMessage().contains("Extract headers: true"));
-      assertTrue(e.getMessage().contains("Skip headers: false"));
+      assertTrue(e.getMessage().contains("Skip first line: false"));
     } catch (Exception e) {
       fail();
     } finally {
@@ -974,7 +974,7 @@ public class TestCsvWithHeaders extends BaseCsvTest {
       assertTrue(e.getMessage().contains("Format plugin: text"));
       assertTrue(e.getMessage().contains("Plugin config name: csv"));
       assertTrue(e.getMessage().contains("Extract headers: true"));
-      assertTrue(e.getMessage().contains("Skip headers: false"));
+      assertTrue(e.getMessage().contains("Skip first line: false"));
     } catch (Exception e) {
       fail();
     } finally {