You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by ar...@apache.org on 2019/03/19 18:29:43 UTC

[drill] 01/04: DRILL-7095: Expose table schema (TupleMetadata) to physical operator (EasySubScan)

This is an automated email from the ASF dual-hosted git repository.

arina pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git

commit df0091273ef8d24c88c606b8e2622c221658b4ef
Author: Arina Ielchiieva <ar...@gmail.com>
AuthorDate: Thu Mar 14 18:46:37 2019 +0200

    DRILL-7095: Expose table schema (TupleMetadata) to physical operator (EasySubScan)
    
    1. Added the system / session option store.table.use_schema_file to control whether the schema file can be used during query execution. False by default.
    2. Added methods to the StoragePlugin interface that allow creating a Group Scan with a provided table schema.
    3. EasyGroupScan and EasySubScan now contain the table schema and are able to serialize / deserialize it along with the other scan properties.
    4. DrillTable, which is the main entry point for schema provisioning, has a method to store the schema and later uses it to create the physical scan.
    5. WorkspaceSchema, when returning a DrillTable instance, gets the table schema from the table root if it is available and store.table.use_schema_file is set to true.
    
    This PR is the next step of the Schema Provisioning project, which currently exposes the schema only to the text reader.
    
    closes #1696
---
 .../java/org/apache/drill/exec/ExecConstants.java  |  4 +++
 .../drill/exec/planner/logical/DrillTable.java     |  8 ++++-
 .../exec/server/options/SystemOptionManager.java   |  1 +
 .../drill/exec/store/AbstractStoragePlugin.java    | 12 ++++++-
 .../org/apache/drill/exec/store/StoragePlugin.java | 24 +++++++++++++
 .../drill/exec/store/dfs/FileSystemPlugin.java     | 14 +++++---
 .../apache/drill/exec/store/dfs/FormatPlugin.java  | 17 ++++++---
 .../exec/store/dfs/WorkspaceSchemaFactory.java     | 15 +++++++-
 .../exec/store/dfs/easy/EasyFormatPlugin.java      |  9 +++--
 .../drill/exec/store/dfs/easy/EasyGroupScan.java   | 41 +++++++++++++---------
 .../drill/exec/store/dfs/easy/EasySubScan.java     | 32 ++++++++++-------
 .../sequencefile/SequenceFileFormatPlugin.java     |  2 +-
 .../exec/store/easy/text/TextFormatPlugin.java     | 21 +++++------
 .../exec/store/parquet/ParquetFormatPlugin.java    |  2 +-
 .../java-exec/src/main/resources/drill-module.conf |  1 +
 .../test/java/org/apache/drill/PlanningBase.java   |  2 ++
 16 files changed, 147 insertions(+), 58 deletions(-)

diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
index 25ac2c9..7917b7a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
@@ -450,6 +450,10 @@ public final class ExecConstants {
   public static final BooleanValidator JSON_READER_ESCAPE_ANY_CHAR_VALIDATOR = new BooleanValidator(JSON_READER_ESCAPE_ANY_CHAR,
     new OptionDescription("Enables the JSON record reader in Drill to escape any character. Default is false. (Drill 1.16+)"));
 
+  public static final String STORE_TABLE_USE_SCHEMA_FILE = "store.table.use_schema_file";
+  public static final BooleanValidator STORE_TABLE_USE_SCHEMA_FILE_VALIDATOR = new BooleanValidator(STORE_TABLE_USE_SCHEMA_FILE,
+    new OptionDescription("Controls if schema file stored in table root directory will be used during query execution. (Drill 1.16+)"));
+
   /**
    * The column label (for directory levels) in results when querying files in a directory
    * E.g.  labels: dir0   dir1<pre>
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillTable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillTable.java
index 4463d18..f6f73ca 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillTable.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillTable.java
@@ -33,6 +33,7 @@ import org.apache.drill.common.logical.StoragePluginConfig;
 import org.apache.drill.exec.planner.common.DrillStatsTable;
 import org.apache.drill.exec.physical.base.SchemalessScan;
 import org.apache.drill.exec.physical.base.GroupScan;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
 import org.apache.drill.exec.server.options.SessionOptionManager;
 import org.apache.drill.exec.store.StoragePlugin;
 import org.apache.drill.exec.store.dfs.FileSelection;
@@ -50,6 +51,7 @@ public abstract class DrillTable implements Table {
   private SessionOptionManager options;
   // Stores the statistics(rowcount, NDV etc.) associated with the table
   private DrillStatsTable statsTable;
+  private TupleMetadata schema;
 
   /**
    * Creates a DrillTable instance for a @{code TableType#Table} table.
@@ -94,6 +96,10 @@ public abstract class DrillTable implements Table {
     this.options = options;
   }
 
+  public void setSchema(TupleMetadata schema) {
+    this.schema = schema;
+  }
+
   public void setGroupScan(GroupScan scan) {
     this.scan = scan;
   }
@@ -103,7 +109,7 @@ public abstract class DrillTable implements Table {
       if (selection instanceof FileSelection && ((FileSelection) selection).isEmptyDirectory()) {
         this.scan = new SchemalessScan(userName, ((FileSelection) selection).getSelectionRoot());
       } else {
-        this.scan = plugin.getPhysicalScan(userName, new JSONOptions(selection), options);
+        this.scan = plugin.getPhysicalScan(userName, new JSONOptions(selection), options, schema);
       }
     }
     return scan;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
index e201e26..ab2978f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
@@ -182,6 +182,7 @@ public class SystemOptionManager extends BaseOptionManager implements AutoClosea
       new OptionDefinition(ExecConstants.JSON_WRITER_NAN_INF_NUMBERS_VALIDATOR),
       new OptionDefinition(ExecConstants.JSON_READER_NAN_INF_NUMBERS_VALIDATOR),
       new OptionDefinition(ExecConstants.JSON_READER_ESCAPE_ANY_CHAR_VALIDATOR),
+      new OptionDefinition(ExecConstants.STORE_TABLE_USE_SCHEMA_FILE_VALIDATOR),
       new OptionDefinition(ExecConstants.ENABLE_UNION_TYPE),
       new OptionDefinition(ExecConstants.TEXT_ESTIMATED_ROW_SIZE),
       new OptionDefinition(ExecConstants.JSON_EXTENDED_TYPES),
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractStoragePlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractStoragePlugin.java
index 175152a..fc27b75 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractStoragePlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractStoragePlugin.java
@@ -29,6 +29,7 @@ import org.apache.drill.exec.ops.OptimizerRulesContext;
 import org.apache.drill.exec.physical.base.AbstractGroupScan;
 import org.apache.drill.exec.planner.PlannerPhase;
 
+import org.apache.drill.exec.record.metadata.TupleMetadata;
 import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableSet;
 import org.apache.drill.exec.server.DrillbitContext;
 import org.apache.drill.exec.server.options.SessionOptionManager;
@@ -106,13 +107,17 @@ public abstract class AbstractStoragePlugin implements StoragePlugin {
     }
   }
 
-
   @Override
   public AbstractGroupScan getPhysicalScan(String userName, JSONOptions selection, SessionOptionManager options) throws IOException {
     return getPhysicalScan(userName, selection);
   }
 
   @Override
+  public AbstractGroupScan getPhysicalScan(String userName, JSONOptions selection, SessionOptionManager options, TupleMetadata schema) throws IOException {
+    return getPhysicalScan(userName, selection, options);
+  }
+
+  @Override
   public AbstractGroupScan getPhysicalScan(String userName, JSONOptions selection) throws IOException {
     return getPhysicalScan(userName, selection, AbstractGroupScan.ALL_COLUMNS);
   }
@@ -123,6 +128,11 @@ public abstract class AbstractStoragePlugin implements StoragePlugin {
   }
 
   @Override
+  public AbstractGroupScan getPhysicalScan(String userName, JSONOptions selection, List<SchemaPath> columns, SessionOptionManager options, TupleMetadata schema) throws IOException {
+    return getPhysicalScan(userName, selection, columns, options);
+  }
+
+  @Override
   public AbstractGroupScan getPhysicalScan(String userName, JSONOptions selection, List<SchemaPath> columns) throws IOException {
     throw new UnsupportedOperationException();
   }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePlugin.java
index 4e6a7c2..eae2413 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePlugin.java
@@ -28,6 +28,7 @@ import org.apache.drill.common.logical.FormatPluginConfig;
 import org.apache.drill.common.logical.StoragePluginConfig;
 import org.apache.drill.exec.ops.OptimizerRulesContext;
 import org.apache.drill.exec.physical.base.AbstractGroupScan;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
 import org.apache.drill.exec.server.options.SessionOptionManager;
 import org.apache.drill.exec.store.dfs.FormatPlugin;
 
@@ -76,6 +77,17 @@ public interface StoragePlugin extends SchemaFactory, AutoCloseable {
    *
    * @param userName User whom to impersonate when when reading the contents as part of Scan.
    * @param selection The configured storage engine specific selection.
+   * @param options (optional) session options
+   * @param schema (optional) table schema
+   * @return The physical scan operator for the particular GroupScan (read) node.
+   */
+  AbstractGroupScan getPhysicalScan(String userName, JSONOptions selection, SessionOptionManager options, TupleMetadata schema) throws IOException;
+
+  /**
+   * Get the physical scan operator for the particular GroupScan (read) node.
+   *
+   * @param userName User whom to impersonate when reading the contents as part of Scan.
+   * @param selection The configured storage engine specific selection.
    * @param columns (optional) The list of column names to scan from the data source.
    * @return The physical scan operator for the particular GroupScan (read) node.
   */
@@ -93,6 +105,18 @@ public interface StoragePlugin extends SchemaFactory, AutoCloseable {
   AbstractGroupScan getPhysicalScan(String userName, JSONOptions selection, List<SchemaPath> columns, SessionOptionManager options) throws IOException;
 
   /**
+   * Get the physical scan operator for the particular GroupScan (read) node.
+   *
+   * @param userName User whom to impersonate when reading the contents as part of Scan.
+   * @param selection The configured storage engine specific selection.
+   * @param columns (optional) The list of column names to scan from the data source.
+   * @param options (optional) session options
+   * @param schema (optional) table schema
+   * @return The physical scan operator for the particular GroupScan (read) node.
+   */
+  AbstractGroupScan getPhysicalScan(String userName, JSONOptions selection, List<SchemaPath> columns, SessionOptionManager options, TupleMetadata schema) throws IOException;
+
+  /**
    * Method returns a Jackson serializable object that extends a StoragePluginConfig.
    *
    * @return an extension of StoragePluginConfig
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemPlugin.java
index 5656c55..ab26e8f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemPlugin.java
@@ -35,6 +35,7 @@ import org.apache.drill.common.logical.FormatPluginConfig;
 import org.apache.drill.common.logical.StoragePluginConfig;
 import org.apache.drill.exec.ops.OptimizerRulesContext;
 import org.apache.drill.exec.physical.base.AbstractGroupScan;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
 import org.apache.drill.exec.server.DrillbitContext;
 import org.apache.drill.exec.server.options.SessionOptionManager;
 import org.apache.drill.exec.store.AbstractStoragePlugin;
@@ -163,19 +164,24 @@ public class FileSystemPlugin extends AbstractStoragePlugin {
 
   @Override
   public AbstractGroupScan getPhysicalScan(String userName, JSONOptions selection, SessionOptionManager options) throws IOException {
-    return getPhysicalScan(userName, selection, AbstractGroupScan.ALL_COLUMNS, options);
+    return getPhysicalScan(userName, selection, AbstractGroupScan.ALL_COLUMNS, options, null);
+  }
+
+  @Override
+  public AbstractGroupScan getPhysicalScan(String userName, JSONOptions selection, SessionOptionManager options, TupleMetadata schema) throws IOException {
+    return getPhysicalScan(userName, selection, AbstractGroupScan.ALL_COLUMNS, options, schema);
   }
 
   @Override
   public AbstractGroupScan getPhysicalScan(String userName, JSONOptions selection, List<SchemaPath> columns) throws IOException {
-    return getPhysicalScan(userName, selection, columns, null);
+    return getPhysicalScan(userName, selection, columns, null, null);
   }
 
   @Override
-  public AbstractGroupScan getPhysicalScan(String userName, JSONOptions selection, List<SchemaPath> columns, SessionOptionManager options) throws IOException {
+  public AbstractGroupScan getPhysicalScan(String userName, JSONOptions selection, List<SchemaPath> columns, SessionOptionManager options, TupleMetadata schema) throws IOException {
     FormatSelection formatSelection = selection.getWith(lpPersistance, FormatSelection.class);
     FormatPlugin plugin = getFormatPlugin(formatSelection.getFormat());
-    return plugin.getGroupScan(userName, formatSelection.getSelection(), columns, options);
+    return plugin.getGroupScan(userName, formatSelection.getSelection(), columns, options, schema);
   }
 
   @Override
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatPlugin.java
index 254cddb..f9200c6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatPlugin.java
@@ -28,6 +28,7 @@ import org.apache.drill.exec.physical.base.AbstractGroupScan;
 import org.apache.drill.exec.physical.base.AbstractWriter;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.planner.common.DrillStatsTable.TableStatistics;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
 import org.apache.drill.exec.server.DrillbitContext;
 import org.apache.drill.exec.server.options.OptionManager;
 import org.apache.drill.exec.store.StoragePluginOptimizerRule;
@@ -52,7 +53,7 @@ public interface FormatPlugin {
 
   FormatMatcher getMatcher();
 
-  public AbstractWriter getWriter(PhysicalOperator child, String location, List<String> partitionColumns) throws IOException;
+  AbstractWriter getWriter(PhysicalOperator child, String location, List<String> partitionColumns) throws IOException;
 
   Set<StoragePluginOptimizerRule> getOptimizerRules();
 
@@ -62,11 +63,19 @@ public interface FormatPlugin {
     return getGroupScan(userName, selection, columns);
   }
 
-  public boolean supportsStatistics();
+  default AbstractGroupScan getGroupScan(String userName, FileSelection selection, List<SchemaPath> columns, TupleMetadata schema) throws IOException {
+    return getGroupScan(userName, selection, columns);
+  }
+
+  default AbstractGroupScan getGroupScan(String userName, FileSelection selection, List<SchemaPath> columns, OptionManager options, TupleMetadata schema) throws IOException {
+    return getGroupScan(userName, selection, columns, options);
+  }
+
+  boolean supportsStatistics();
 
-  public TableStatistics readStatistics(FileSystem fs, Path statsTablePath) throws IOException;
+  TableStatistics readStatistics(FileSystem fs, Path statsTablePath) throws IOException;
 
-  public void writeStatistics(TableStatistics statistics, FileSystem fs, Path statsTablePath) throws IOException;
+  void writeStatistics(TableStatistics statistics, FileSystem fs, Path statsTablePath) throws IOException;
 
   FormatPluginConfig getConfig();
   StoragePluginConfig getStorageConfig();
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceSchemaFactory.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceSchemaFactory.java
index 2c629ff..b6b29e7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceSchemaFactory.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceSchemaFactory.java
@@ -63,6 +63,7 @@ import org.apache.drill.exec.planner.logical.DrillViewTable;
 import org.apache.drill.exec.planner.logical.DynamicDrillTable;
 import org.apache.drill.exec.planner.logical.FileSystemCreateTableEntry;
 import org.apache.drill.exec.planner.sql.ExpandingConcurrentMap;
+import org.apache.drill.exec.record.metadata.schema.FsMetastoreSchemaProvider;
 import org.apache.drill.exec.store.AbstractSchema;
 import org.apache.drill.exec.store.PartitionNotFoundException;
 import org.apache.drill.exec.store.SchemaConfig;
@@ -109,7 +110,7 @@ public class WorkspaceSchemaFactory {
       WorkspaceConfig config,
       List<FormatMatcher> formatMatchers,
       LogicalPlanPersistence logicalPlanPersistence,
-      ScanResult scanResult) throws ExecutionSetupException, IOException {
+      ScanResult scanResult) throws ExecutionSetupException {
     this.logicalPlanPersistence = logicalPlanPersistence;
     this.fsConf = plugin.getFsConf();
     this.plugin = plugin;
@@ -572,9 +573,21 @@ public class WorkspaceSchemaFactory {
       }
       final DrillTable table = tables.get(tableKey);
       setMetadataTable(table, tableName);
+      setSchema(table, tableName);
       return table;
     }
 
+    private void setSchema(DrillTable table, String tableName) {
+      if (schemaConfig.getOption(ExecConstants.STORE_TABLE_USE_SCHEMA_FILE).bool_val) {
+        try {
+          FsMetastoreSchemaProvider schemaProvider = new FsMetastoreSchemaProvider(this, tableName);
+          table.setSchema(schemaProvider.read().getSchema());
+        } catch (IOException e) {
+          logger.debug("Unable to deserialize schema from schema file for table: " + tableName, e);
+        }
+      }
+    }
+
     private void setMetadataTable(final DrillTable table, final String tableName) {
       if (table == null) {
         return;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
index dc1f053..a1d1a88 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
@@ -143,7 +143,7 @@ public abstract class EasyFormatPlugin<T extends FormatPluginConfig> implements
 
       if (! columnExplorer.isStarQuery()) {
         scan = new EasySubScan(scan.getUserName(), scan.getWorkUnits(), scan.getFormatPlugin(),
-            columnExplorer.getTableColumns(), scan.getSelectionRoot(), scan.getPartitionDepth());
+            columnExplorer.getTableColumns(), scan.getSelectionRoot(), scan.getPartitionDepth(), scan.getSchema());
         scan.setOperatorId(scan.getOperatorId());
       }
 
@@ -296,8 +296,7 @@ public abstract class EasyFormatPlugin<T extends FormatPluginConfig> implements
     }
 
     @Override
-    protected FileScanFramework buildFramework(
-        EasySubScan scan) throws ExecutionSetupException {
+    protected FileScanFramework buildFramework(EasySubScan scan) {
 
       final FileScanFramework framework = new FileScanFramework(
               scan.getColumns(),
@@ -471,14 +470,14 @@ public abstract class EasyFormatPlugin<T extends FormatPluginConfig> implements
   }
 
   @Override
-  public AbstractWriter getWriter(PhysicalOperator child, String location, List<String> partitionColumns) throws IOException {
+  public AbstractWriter getWriter(PhysicalOperator child, String location, List<String> partitionColumns) {
     return new EasyWriter(child, location, partitionColumns, this);
   }
 
   @Override
   public AbstractGroupScan getGroupScan(String userName, FileSelection selection, List<SchemaPath> columns)
       throws IOException {
-    return new EasyGroupScan(userName, selection, this, columns, selection.selectionRoot);
+    return new EasyGroupScan(userName, selection, this, columns, selection.selectionRoot, null);
   }
 
   @Override
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java
index 6a6243c..611a9e6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java
@@ -32,6 +32,8 @@ import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.physical.base.ScanStats;
 import org.apache.drill.exec.planner.physical.PlannerSettings;
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.record.metadata.TupleSchema;
 import org.apache.drill.exec.store.ColumnExplorer;
 import org.apache.drill.exec.store.StoragePluginRegistry;
 import org.apache.drill.exec.store.dfs.DrillFileSystem;
@@ -69,6 +71,7 @@ public class EasyGroupScan extends AbstractFileGroupScan {
   private List<CompleteFileWork> chunks;
   private List<EndpointAffinity> endpointAffinities;
   private Path selectionRoot;
+  private final TupleMetadata schema;
 
   @JsonCreator
   public EasyGroupScan(
@@ -78,18 +81,15 @@ public class EasyGroupScan extends AbstractFileGroupScan {
       @JsonProperty("format") FormatPluginConfig formatConfig,
       @JacksonInject StoragePluginRegistry engineRegistry,
       @JsonProperty("columns") List<SchemaPath> columns,
-      @JsonProperty("selectionRoot") Path selectionRoot
+      @JsonProperty("selectionRoot") Path selectionRoot,
+      @JsonProperty("schema") TupleSchema schema
       ) throws IOException, ExecutionSetupException {
         this(ImpersonationUtil.resolveUserName(userName),
             FileSelection.create(null, files, selectionRoot),
             (EasyFormatPlugin<?>)engineRegistry.getFormatPlugin(storageConfig, formatConfig),
             columns,
-            selectionRoot);
-  }
-
-  public EasyGroupScan(String userName, FileSelection selection, EasyFormatPlugin<?> formatPlugin, Path selectionRoot)
-      throws IOException {
-    this(userName, selection, formatPlugin, ALL_COLUMNS, selectionRoot);
+            selectionRoot,
+            schema);
   }
 
   public EasyGroupScan(
@@ -97,13 +97,15 @@ public class EasyGroupScan extends AbstractFileGroupScan {
       FileSelection selection,
       EasyFormatPlugin<?> formatPlugin,
       List<SchemaPath> columns,
-      Path selectionRoot
+      Path selectionRoot,
+      TupleMetadata schema
       ) throws IOException{
     super(userName);
     this.selection = Preconditions.checkNotNull(selection);
     this.formatPlugin = Preconditions.checkNotNull(formatPlugin, "Unable to load format plugin for provided format config.");
     this.columns = columns == null ? ALL_COLUMNS : columns;
     this.selectionRoot = selectionRoot;
+    this.schema = schema;
     initFromSelection(selection, formatPlugin);
   }
 
@@ -113,9 +115,10 @@ public class EasyGroupScan extends AbstractFileGroupScan {
       EasyFormatPlugin<?> formatPlugin,
       List<SchemaPath> columns,
       Path selectionRoot,
-      int minWidth
+      int minWidth,
+      TupleMetadata schema
       ) throws IOException{
-    this(userName, selection, formatPlugin, columns, selectionRoot);
+    this(userName, selection, formatPlugin, columns, selectionRoot, schema);
 
     // Set the minimum width of this reader. Primarily used for testing
     // to force parallelism even for small test files.
@@ -138,6 +141,7 @@ public class EasyGroupScan extends AbstractFileGroupScan {
     minWidth = that.minWidth;
     mappings = that.mappings;
     partitionDepth = that.partitionDepth;
+    schema = that.schema;
   }
 
   @JsonIgnore
@@ -202,7 +206,7 @@ public class EasyGroupScan extends AbstractFileGroupScan {
   }
 
   @Override
-  public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) throws ExecutionSetupException {
+  public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) {
     assert children == null || children.isEmpty();
     return new EasyGroupScan(this);
   }
@@ -210,7 +214,7 @@ public class EasyGroupScan extends AbstractFileGroupScan {
   @Override
   public List<EndpointAffinity> getOperatorAffinity() {
     if (endpointAffinities == null) {
-        logger.debug("chunks: {}", chunks.size());
+        logger.debug("Chunks size: {}", chunks.size());
         endpointAffinities = AffinityCreator.getAffinityMap(chunks);
     }
     return endpointAffinities;
@@ -244,7 +248,7 @@ public class EasyGroupScan extends AbstractFileGroupScan {
         String.format("MinorFragmentId %d has no read entries assigned", minorFragmentId));
 
     EasySubScan subScan = new EasySubScan(getUserName(), convert(filesForMinor), formatPlugin,
-        columns, selectionRoot, partitionDepth);
+        columns, selectionRoot, partitionDepth, schema);
     subScan.setOperatorId(this.getOperatorId());
     return subScan;
   }
@@ -269,8 +273,8 @@ public class EasyGroupScan extends AbstractFileGroupScan {
 
   @Override
   public String toString() {
-    final String pattern = "EasyGroupScan [selectionRoot=%s, numFiles=%s, columns=%s, files=%s]";
-    return String.format(pattern, selectionRoot, getFiles().size(), columns, getFiles());
+    String pattern = "EasyGroupScan [selectionRoot=%s, numFiles=%s, columns=%s, files=%s, schema=%s]";
+    return String.format(pattern, selectionRoot, getFiles().size(), columns, getFiles(), schema);
   }
 
   @Override
@@ -281,7 +285,7 @@ public class EasyGroupScan extends AbstractFileGroupScan {
   @Override
   public GroupScan clone(List<SchemaPath> columns) {
     if (!formatPlugin.supportsPushDown()) {
-      throw new IllegalStateException(String.format("%s doesn't support pushdown.", this.getClass().getSimpleName()));
+      throw new IllegalStateException(String.format("%s doesn't support push down.", this.getClass().getSimpleName()));
     }
     EasyGroupScan newScan = new EasyGroupScan(this);
     newScan.columns = columns;
@@ -302,4 +306,9 @@ public class EasyGroupScan extends AbstractFileGroupScan {
   public boolean canPushdownProjects(List<SchemaPath> columns) {
     return formatPlugin.supportsPushDown();
   }
+
+  @JsonProperty
+  public TupleMetadata getSchema() {
+    return schema;
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasySubScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasySubScan.java
index c51c7ac..e307660 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasySubScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasySubScan.java
@@ -24,6 +24,8 @@ import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.logical.FormatPluginConfig;
 import org.apache.drill.common.logical.StoragePluginConfig;
 import org.apache.drill.exec.physical.base.AbstractSubScan;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.record.metadata.TupleSchema;
 import org.apache.drill.exec.store.StoragePluginRegistry;
 import org.apache.drill.exec.store.schedule.CompleteFileWork.FileWorkImpl;
 
@@ -36,26 +38,27 @@ import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
 import org.apache.hadoop.fs.Path;
 
 @JsonTypeName("fs-sub-scan")
-public class EasySubScan extends AbstractSubScan{
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(EasySubScan.class);
+public class EasySubScan extends AbstractSubScan {
 
   private final List<FileWorkImpl> files;
   private final EasyFormatPlugin<?> formatPlugin;
   private final List<SchemaPath> columns;
   private final Path selectionRoot;
   private final int partitionDepth;
+  private final TupleMetadata schema;
 
   @JsonCreator
   public EasySubScan(
-      @JsonProperty("userName") String userName,
-      @JsonProperty("files") List<FileWorkImpl> files,
-      @JsonProperty("storage") StoragePluginConfig storageConfig,
-      @JsonProperty("format") FormatPluginConfig formatConfig,
-      @JacksonInject StoragePluginRegistry engineRegistry,
-      @JsonProperty("columns") List<SchemaPath> columns,
-      @JsonProperty("selectionRoot") Path selectionRoot,
-      @JsonProperty("partitionDepth") int partitionDepth
-      ) throws ExecutionSetupException {
+    @JsonProperty("userName") String userName,
+    @JsonProperty("files") List<FileWorkImpl> files,
+    @JsonProperty("storage") StoragePluginConfig storageConfig,
+    @JsonProperty("format") FormatPluginConfig formatConfig,
+    @JacksonInject StoragePluginRegistry engineRegistry,
+    @JsonProperty("columns") List<SchemaPath> columns,
+    @JsonProperty("selectionRoot") Path selectionRoot,
+    @JsonProperty("partitionDepth") int partitionDepth,
+    @JsonProperty("schema") TupleSchema schema
+    ) throws ExecutionSetupException {
     super(userName);
     this.formatPlugin = (EasyFormatPlugin<?>) engineRegistry.getFormatPlugin(storageConfig, formatConfig);
     Preconditions.checkNotNull(this.formatPlugin);
@@ -63,16 +66,18 @@ public class EasySubScan extends AbstractSubScan{
     this.columns = columns;
     this.selectionRoot = selectionRoot;
     this.partitionDepth = partitionDepth;
+    this.schema = schema;
   }
 
   public EasySubScan(String userName, List<FileWorkImpl> files, EasyFormatPlugin<?> plugin,
-      List<SchemaPath> columns, Path selectionRoot, int partitionDepth) {
+      List<SchemaPath> columns, Path selectionRoot, int partitionDepth, TupleMetadata schema) {
     super(userName);
     this.formatPlugin = plugin;
     this.files = files;
     this.columns = columns;
     this.selectionRoot = selectionRoot;
     this.partitionDepth = partitionDepth;
+    this.schema = schema;
   }
 
   @JsonProperty
@@ -96,6 +101,9 @@ public class EasySubScan extends AbstractSubScan{
   @JsonProperty("columns")
   public List<SchemaPath> getColumns() { return columns; }
 
+  @JsonProperty("schema")
+  public TupleMetadata getSchema() { return schema; }
+
   @Override
   public int getOperatorType() { return formatPlugin.getReaderOperatorType(); }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/sequencefile/SequenceFileFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/sequencefile/SequenceFileFormatPlugin.java
index ec4bb12..701c5c5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/sequencefile/SequenceFileFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/sequencefile/SequenceFileFormatPlugin.java
@@ -63,7 +63,7 @@ public class SequenceFileFormatPlugin extends EasyFormatPlugin<SequenceFileForma
   @Override
   public AbstractGroupScan getGroupScan(String userName, FileSelection selection, List<SchemaPath> columns)
     throws IOException {
-    return new EasyGroupScan(userName, selection, this, columns, selection.selectionRoot);
+    return new EasyGroupScan(userName, selection, this, columns, selection.selectionRoot, null);
   }
 
   @Override
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java
index 05ed1b5..c57576a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java
@@ -22,7 +22,6 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.logical.FormatPluginConfig;
 import org.apache.drill.common.logical.StoragePluginConfig;
@@ -42,6 +41,7 @@ import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
 import org.apache.drill.exec.planner.physical.PlannerSettings;
 import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
 import org.apache.drill.exec.proto.UserBitShared.CoreOperatorType;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
 import org.apache.drill.exec.server.DrillbitContext;
 import org.apache.drill.exec.server.options.OptionManager;
 import org.apache.drill.exec.store.RecordReader;
@@ -189,8 +189,7 @@ public class TextFormatPlugin extends EasyFormatPlugin<TextFormatPlugin.TextForm
     }
 
     @Override
-    protected ColumnsScanFramework buildFramework(
-        EasySubScan scan) throws ExecutionSetupException {
+    protected ColumnsScanFramework buildFramework(EasySubScan scan) {
       ColumnsScanFramework framework = new ColumnsScanFramework(
               scan.getColumns(),
               scan.getWorkUnits(),
@@ -244,19 +243,19 @@ public class TextFormatPlugin extends EasyFormatPlugin<TextFormatPlugin.TextForm
   }
 
   @Override
-  public AbstractGroupScan getGroupScan(String userName, FileSelection selection, List<SchemaPath> columns)
+  public AbstractGroupScan getGroupScan(String userName, FileSelection selection, List<SchemaPath> columns, TupleMetadata schema)
       throws IOException {
-    return new EasyGroupScan(userName, selection, this, columns, selection.selectionRoot);
+    return new EasyGroupScan(userName, selection, this, columns, selection.selectionRoot, schema);
   }
 
   @Override
   public AbstractGroupScan getGroupScan(String userName, FileSelection selection,
-      List<SchemaPath> columns, OptionManager options) throws IOException {
+      List<SchemaPath> columns, OptionManager options, TupleMetadata schema) throws IOException {
     return new EasyGroupScan(userName, selection, this, columns,
         selection.selectionRoot,
         // Some paths provide a null option manager. In that case, default to a
         // min width of 1; just like the base class.
-        options == null ? 1 : (int) options.getLong(ExecConstants.MIN_READER_WIDTH_KEY));
+        options == null ? 1 : (int) options.getLong(ExecConstants.MIN_READER_WIDTH_KEY), schema);
   }
 
   @Override
@@ -311,9 +310,7 @@ public class TextFormatPlugin extends EasyFormatPlugin<TextFormatPlugin.TextForm
   }
 
   @Override
-  public ManagedReader<ColumnsSchemaNegotiator> makeBatchReader(
-      DrillFileSystem dfs,
-      FileSplit split) throws ExecutionSetupException {
+  public ManagedReader<ColumnsSchemaNegotiator> makeBatchReader(DrillFileSystem dfs, FileSplit split) {
     TextParsingSettingsV3 settings = new TextParsingSettingsV3();
     settings.set(getConfig());
     return new CompliantTextBatchReader(split, dfs, settings);
@@ -324,12 +321,12 @@ public class TextFormatPlugin extends EasyFormatPlugin<TextFormatPlugin.TextForm
   }
 
   @Override
-  public TableStatistics readStatistics(FileSystem fs, Path statsTablePath) throws IOException {
+  public TableStatistics readStatistics(FileSystem fs, Path statsTablePath) {
     throw new UnsupportedOperationException("unimplemented");
   }
 
   @Override
-  public void writeStatistics(TableStatistics statistics, FileSystem fs, Path statsTablePath) throws IOException {
+  public void writeStatistics(TableStatistics statistics, FileSystem fs, Path statsTablePath) {
     throw new UnsupportedOperationException("unimplemented");
   }
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java
index 17a9506..aaff673 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java
@@ -181,7 +181,7 @@ public class ParquetFormatPlugin implements FormatPlugin {
 
   @Override
   public AbstractFileGroupScan getGroupScan(String userName, FileSelection selection, List<SchemaPath> columns) throws IOException {
-    return getGroupScan(userName, selection, columns, null);
+    return getGroupScan(userName, selection, columns, (OptionManager) null);
   }
 
   @Override
diff --git a/exec/java-exec/src/main/resources/drill-module.conf b/exec/java-exec/src/main/resources/drill-module.conf
index 5443eea..83839c6 100644
--- a/exec/java-exec/src/main/resources/drill-module.conf
+++ b/exec/java-exec/src/main/resources/drill-module.conf
@@ -646,6 +646,7 @@ drill.exec.options: {
     # Using common operators batch configuration unless the Parquet specific
     # configuration is used
     store.parquet.flat.batch.memory_size: 0,
+    store.table.use_schema_file: false,
     store.partition.hash_distribute: false,
     store.text.estimated_row_size_bytes: 100.0,
     store.kafka.all_text_mode: false,
diff --git a/exec/java-exec/src/test/java/org/apache/drill/PlanningBase.java b/exec/java-exec/src/test/java/org/apache/drill/PlanningBase.java
index b468503..d9e9bc0 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/PlanningBase.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/PlanningBase.java
@@ -61,6 +61,7 @@ import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList;
 import org.apache.drill.shaded.guava.com.google.common.io.Resources;
 import org.mockito.Matchers;
 
+import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
@@ -134,6 +135,7 @@ public class PlanningBase extends ExecTest {
         eq(TypeProtos.MinorType.VARDECIMAL),
         Matchers.<Function<DrillBuf, ValueHolder>>any()))
       .thenReturn(ValueHolderHelper.getVarDecimalHolder(allocator.buffer(4), "0.01"));
+    when(context.getOption(anyString())).thenCallRealMethod();
 
 
     for (final String sql : sqlStrings) {