You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by ar...@apache.org on 2019/07/08 10:43:00 UTC

[drill] branch master updated: DRILL-7306: Disable schema-only batch for new scan framework

This is an automated email from the ASF dual-hosted git repository.

arina pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git


The following commit(s) were added to refs/heads/master by this push:
     new 66e8dd9  DRILL-7306: Disable schema-only batch for new scan framework
66e8dd9 is described below

commit 66e8dd9f05ea068e27b272ed1b787004213c29e6
Author: Paul Rogers <pa...@yahoo.com>
AuthorDate: Sun Jun 23 22:10:54 2019 -0700

    DRILL-7306: Disable schema-only batch for new scan framework
    
    The EVF framework is set up to return a "fast schema" empty batch
    with only schema as its first batch because, when the code was
    written, it seemed that's how we wanted operators to work. However,
    DRILL-7305 notes that many operators cannot handle empty batches.
    
    Since the empty-batch bugs show that Drill does not, in fact,
    provide a "fast schema" batch, this ticket asks to disable the
    feature in the new scan framework. The feature is disabled with
    a config option; it can be re-enabled if ever it is needed.
    
    SQL differentiates between two subtle cases, and both are
    supported by this change.
    
    1. Empty results: the query found a schema, but no rows
       are returned. If no reader returns any rows, but at
       least one reader provides a schema, then the scan
       returns an empty batch with the schema.
    2. Null results: the query found no schema or rows. No
       schema is returned. If no reader returns rows or
       schema, then the scan returns no batch: it instead
       immediately returns a DONE status.
    
    For CSV, an empty file with headers returns the null result set
    (because we don't know the schema.) An empty CSV file without headers
    returns an empty result set because we do know the schema: it will
    always be the columns array.
    
    Old tests validate the original schema-batch mode; new tests
    were added to validate the no-schema-batch mode.
---
 .../exec/physical/impl/filter/FilterTemplate2.java |   6 +-
 .../physical/impl/protocol/OperatorDriver.java     |  57 +++++++++--
 .../impl/protocol/OperatorRecordBatch.java         |   5 +-
 .../exec/physical/impl/protocol/SchemaTracker.java |   6 +-
 .../impl/protocol/VectorContainerAccessor.java     |  30 ++++--
 .../drill/exec/physical/impl/scan/ReaderState.java |  64 ++++++++----
 .../exec/physical/impl/scan/ScanOperatorExec.java  | 109 +++++++++++++++++++--
 .../impl/scan/columns/ColumnsScanFramework.java    |   3 +-
 .../physical/impl/scan/file/FileScanFramework.java |   4 +-
 .../impl/scan/framework/ManagedScanFramework.java  |   5 +
 .../impl/scan/project/ScanSchemaOrchestrator.java  |  64 +++++++++++-
 .../physical/impl/scan/project/SchemaSmoother.java |   3 +-
 .../apache/drill/exec/record/VectorContainer.java  |  36 ++++---
 .../exec/store/dfs/easy/EasyFormatPlugin.java      |  76 +++++++-------
 .../exec/store/easy/text/TextFormatPlugin.java     |   9 +-
 .../easy/text/reader/CompliantTextBatchReader.java |   2 +-
 .../apache/drill/TestSchemaWithTableFunction.java  |  11 +--
 .../org/apache/drill/exec/TestEmptyInputSql.java   |   6 +-
 .../impl/protocol/TestOperatorRecordBatch.java     |  24 ++---
 .../impl/scan/BaseScanOperatorExecTest.java        |  30 +++---
 .../exec/physical/impl/scan/ScanTestUtils.java     |  27 +++--
 .../exec/physical/impl/scan/TestColumnsArray.java  |   6 +-
 .../impl/scan/TestColumnsArrayFramework.java       |   6 +-
 .../physical/impl/scan/TestFileScanFramework.java  |   6 +-
 .../physical/impl/scan/TestScanOperExecBasics.java |  11 ++-
 .../impl/scan/TestScanOperExecEarlySchema.java     |  52 +++++++++-
 .../impl/scan/TestScanOperExecLateSchema.java      |  44 +++++++++
 .../impl/scan/TestScanOperExecSmoothing.java       |  21 ++++
 .../impl/scan/TestScanOrchestratorEarlySchema.java |  27 ++---
 .../impl/scan/TestScanOrchestratorLateSchema.java  |   5 +-
 .../impl/scan/TestScanOrchestratorMetadata.java    |  11 ++-
 .../impl/scan/project/TestSchemaSmoothing.java     |   3 +-
 .../store/easy/text/compliant/BaseCsvTest.java     |  11 +++
 .../store/easy/text/compliant/TestCsvHeader.java   |  10 +-
 .../easy/text/compliant/TestCsvWithHeaders.java    |  66 ++++++++-----
 .../easy/text/compliant/TestCsvWithSchema.java     |   2 +-
 .../easy/text/compliant/TestCsvWithoutHeaders.java |  43 ++++++--
 .../easy/text/compliant/TestPartitionRace.java     |  35 ++++---
 .../apache/drill/exec/store/log/TestLogReader.java |   4 -
 .../java/org/apache/drill/test/QueryBuilder.java   |  35 +++++--
 .../apache/drill/test/rowSet/RowSetBuilder.java    |  10 +-
 .../apache/drill/test/rowSet/RowSetUtilities.java  |   2 +-
 .../drill/exec/record/MaterializedField.java       |   4 +-
 .../drill/exec/record/metadata/ColumnBuilder.java  |  17 +++-
 .../record/metadata/PrimitiveColumnMetadata.java   |  18 +++-
 45 files changed, 764 insertions(+), 262 deletions(-)

diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterTemplate2.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterTemplate2.java
index 483a9ed..d09e23e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterTemplate2.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterTemplate2.java
@@ -19,8 +19,8 @@ package org.apache.drill.exec.physical.impl.filter;
 
 import javax.inject.Named;
 
-import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.exception.OutOfMemoryException;
+import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
 import org.apache.drill.exec.record.RecordBatch;
@@ -102,8 +102,8 @@ public abstract class FilterTemplate2 implements Filterer {
 
   private void filterBatchNoSV(int recordCount) throws SchemaChangeException {
     int svIndex = 0;
-    for(int i = 0; i < recordCount; i++){
-      if(doEval(i, 0)){
+    for (int i = 0; i < recordCount; i++) {
+      if (doEval(i, 0)) {
         outgoingSelectionVector.setIndex(svIndex, (char)i);
         svIndex++;
       }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/protocol/OperatorDriver.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/protocol/OperatorDriver.java
index 9e6190c..03e6d4e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/protocol/OperatorDriver.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/protocol/OperatorDriver.java
@@ -26,6 +26,19 @@ import org.apache.drill.exec.record.RecordBatch.IterOutcome;
  * between the iterator protocol and the operator executable protocol.
  * Implemented as a separate class in anticipation of eventually
  * changing the record batch (iterator) protocol.
+ *
+ * <h4>Schema-Only Batch</h4>
+ *
+ * The scan operator is designed to provide an initial, empty, schema-only
+ * batch. At the time that this code was written, it was (mis-?) understood
+ * that Drill used a "fast schema" path that provided a schema-only batch
+ * as the first batch. However, it turns out that most operators fail when
+ * presented with an empty batch: many do not properly set the offset
+ * vector for variable-width vectors to an initial 0 position, causing all
+ * kinds of issues.
+ * <p>
+ * To work around this issue, the code defaults to *not* providing the
+ * schema batch.
  */
 
 public class OperatorDriver {
@@ -38,16 +51,13 @@ public class OperatorDriver {
     START,
 
     /**
-     * The first call to next() has been made and schema (only)
-     * was returned. On the subsequent call to next(), return any
-     * data that might have accompanied that first batch.
+     * Attempting to start the operator.
      */
 
-    SCHEMA,
+    STARTING,
 
     /**
-     * The second call to next() has been made and there is more
-     * data to deliver on subsequent calls.
+     * Read from readers.
      */
 
     RUN,
@@ -93,11 +103,13 @@ public class OperatorDriver {
   private final OperatorExec operatorExec;
   private final BatchAccessor batchAccessor;
   private int schemaVersion;
+  private final boolean enableSchemaBatch;
 
-  public OperatorDriver(OperatorContext opContext, OperatorExec opExec) {
+  public OperatorDriver(OperatorContext opContext, OperatorExec opExec, boolean enableSchemaBatch) {
     this.opContext = opContext;
     this.operatorExec = opExec;
     batchAccessor = operatorExec.batchAccessor();
+    this.enableSchemaBatch = enableSchemaBatch;
   }
 
   /**
@@ -156,9 +168,17 @@ public class OperatorDriver {
    */
 
   private IterOutcome start() {
-    state = State.SCHEMA;
+    state = State.STARTING;
+    schemaVersion = -1;
+    if (!enableSchemaBatch) {
+      return doNext();
+    }
     if (operatorExec.buildSchema()) {
       schemaVersion = batchAccessor.schemaVersion();
+
+      // Report schema change.
+
+      batchAccessor.getOutgoingContainer().schemaChanged();
       state = State.RUN;
       return IterOutcome.OK_NEW_SCHEMA;
     } else {
@@ -178,10 +198,23 @@ public class OperatorDriver {
       return IterOutcome.NONE;
     }
     int newVersion = batchAccessor.schemaVersion();
-    if (newVersion != schemaVersion) {
+    boolean schemaChanged = newVersion != schemaVersion;
+
+    // Mark the container's schema as changed when the current
+    // schema differs from the one seen the last time through
+    // this method. That is, we take "schema changed" to mean
+    // "schema changed since the last call to next." The result
+    // hides trivial changes within this operator.
+
+    if (schemaChanged) {
+      batchAccessor.getOutgoingContainer().schemaChanged();
+    }
+    if (state == State.STARTING || schemaChanged) {
       schemaVersion = newVersion;
+      state = State.RUN;
       return IterOutcome.OK_NEW_SCHEMA;
     }
+    state = State.RUN;
     return IterOutcome.OK;
   }
 
@@ -189,11 +222,15 @@ public class OperatorDriver {
    * Implement a cancellation, and ignore any exception that is
    * thrown. We're already in trouble here, no need to keep track
    * of additional things that go wrong.
+   * <p>
+   * Cancellation is done only if the operator is doing work.
+   * The request is not propagated if either the operator never
+   * started, or is already finished.
    */
 
   private void cancelSilently() {
     try {
-      if (state == State.SCHEMA || state == State.RUN) {
+      if (state == State.STARTING || state == State.RUN) {
         operatorExec.cancel();
       }
     } catch (Throwable t) {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/protocol/OperatorRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/protocol/OperatorRecordBatch.java
index e0beab1..639a25a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/protocol/OperatorRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/protocol/OperatorRecordBatch.java
@@ -57,7 +57,8 @@ public class OperatorRecordBatch implements CloseableRecordBatch {
   private final BatchAccessor batchAccessor;
   private IterOutcome lastOutcome;
 
-  public OperatorRecordBatch(FragmentContext context, PhysicalOperator config, OperatorExec opExec) {
+  public OperatorRecordBatch(FragmentContext context, PhysicalOperator config,
+      OperatorExec opExec, boolean enableSchemaBatch) {
     OperatorContext opContext = context.newOperatorContext(config);
     opContext.getStats().startProcessing();
 
@@ -66,7 +67,7 @@ public class OperatorRecordBatch implements CloseableRecordBatch {
 
     try {
       opExec.bind(opContext);
-      driver = new OperatorDriver(opContext, opExec);
+      driver = new OperatorDriver(opContext, opExec, enableSchemaBatch);
       batchAccessor = opExec.batchAccessor();
     } catch (UserException e) {
       opContext.close();
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/protocol/SchemaTracker.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/protocol/SchemaTracker.java
index cd7c296..0494498 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/protocol/SchemaTracker.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/protocol/SchemaTracker.java
@@ -51,6 +51,9 @@ import org.apache.drill.exec.vector.ValueVector;
  * vector is just as serious as a change in schema. Hence, operators
  * try to use the same vectors for their entire lives. That is the change
  * tracked here.
+ * <p>
+ * Schema versions start at 1. A schema version of 0 means that no
+ * output batch was ever presented.
  */
 
 // TODO: Does not handle SV4 situations
@@ -62,7 +65,6 @@ public class SchemaTracker {
   private List<ValueVector> currentVectors = new ArrayList<>();
 
   public void trackSchema(VectorContainer newBatch) {
-
     if (! isSameSchema(newBatch)) {
       schemaVersion++;
       captureSchema(newBatch);
@@ -95,4 +97,4 @@ public class SchemaTracker {
 
   public int schemaVersion() { return schemaVersion; }
   public BatchSchema schema() { return currentSchema; }
-}
\ No newline at end of file
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/protocol/VectorContainerAccessor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/protocol/VectorContainerAccessor.java
index e2d78d7..e97c374 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/protocol/VectorContainerAccessor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/protocol/VectorContainerAccessor.java
@@ -57,10 +57,26 @@ public class VectorContainerAccessor implements BatchAccessor {
 
   private VectorContainer container;
   private SchemaTracker schemaTracker = new SchemaTracker();
+  private int batchCount;
 
   /**
-   * Set the vector container. Done initially, and any time the schema of
-   * the container may have changed. May be called with the same container
+   * Define a schema that does not necessarily contain any data.
+   * Call this to declare a schema when there are no results to
+   * report.
+   */
+
+  public void setSchema(VectorContainer container) {
+    this.container = container;
+    if (container != null) {
+      schemaTracker.trackSchema(container);
+    }
+  }
+
+  /**
+   * Define an output batch. Called each time a new batch is sent
+   * downstream. Checks if the schema of this batch is the same as
+   * that of any previous batch, and updates the schema version if
+   * the schema changes. May be called with the same container
    * as the previous call, or a different one. A schema change occurs
    * unless the vectors are identical across the two containers.
    *
@@ -68,13 +84,13 @@ public class VectorContainerAccessor implements BatchAccessor {
    * downstream
    */
 
-  public void setContainer(VectorContainer container) {
-    this.container = container;
-    if (container != null) {
-      schemaTracker.trackSchema(container);
-    }
+  public void addBatch(VectorContainer container) {
+    setSchema(container);
+    batchCount++;
   }
 
+  public int batchCount() { return batchCount; }
+
   @Override
   public BatchSchema getSchema() {
     return container == null ? null : container.getSchema();
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/ReaderState.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/ReaderState.java
index 600a9d4..496f326 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/ReaderState.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/ReaderState.java
@@ -131,10 +131,13 @@ class ReaderState {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ReaderState.class);
 
   private enum State {
+
     /**
      * Initial state before opening the reader.
      */
+
     START,
+
     /**
      * The scan operator is obligated to provide a "fast schema", without data,
      * before the first row of data. "Early schema" readers (those that provide
@@ -152,7 +155,9 @@ class ReaderState {
      * the next call to {@link ReaderState#next()} will return this look-ahead
      * batch rather than reading a new one.
      */
+
     LOOK_AHEAD,
+
     /**
      * As above, but the reader hit EOF during the read of the look-ahead batch.
      * The {@link ReaderState#next()} checks if the lookahead batch has any
@@ -166,26 +171,33 @@ class ReaderState {
      * row in the result set loader. That look-ahead is handled by the
      * (shim) reader which this class manages.
      */
+
     LOOK_AHEAD_WITH_EOF,
+
     /**
      * Normal state: the reader has supplied data but not yet reported EOF.
      */
+
     ACTIVE,
+
     /**
      * The reader has reported EOF. No look-ahead batch is active. The
      * reader's next() method will no longer be called.
      */
+
     EOF,
+
     /**
      * The reader is closed: no further operations are allowed.
      */
-    CLOSED };
+
+    CLOSED
+  };
 
   final ScanOperatorExec scanOp;
   private final RowBatchReader reader;
   private State state = State.START;
   private VectorContainer lookahead;
-  private int schemaVersion = -1;
 
   public ReaderState(ScanOperatorExec scanOp, RowBatchReader reader) {
     this.scanOp = scanOp;
@@ -302,8 +314,7 @@ class ReaderState {
       // Bind the output container to the output of the scan operator.
       // This returns an empty batch with the schema filled in.
 
-      scanOp.containerAccessor.setContainer(reader.output());
-      schemaVersion = reader.schemaVersion();
+      scanOp.containerAccessor.setSchema(reader.output());
       return true;
     }
 
@@ -313,13 +324,13 @@ class ReaderState {
       return false;
     }
     VectorContainer container = reader.output();
-    schemaVersion = reader.schemaVersion();
     if (container.getRecordCount() == 0) {
       return true;
     }
 
     // The reader returned actual data. Just forward the schema
-    // in a dummy container, saving the data for next time.
+    // in the operator's container, saving the data for next time
+    // in a dummy container.
 
     assert lookahead == null;
     lookahead = new VectorContainer(scanOp.context.getAllocator(), scanOp.containerAccessor.getSchema());
@@ -347,7 +358,11 @@ class ReaderState {
       lookahead.exchange(scanOp.containerAccessor.getOutgoingContainer());
       assert lookahead.getRecordCount() == 0;
       lookahead = null;
-      state = state == State.LOOK_AHEAD_WITH_EOF ? State.EOF : State.ACTIVE;
+      if (state == State.LOOK_AHEAD_WITH_EOF) {
+        state = State.EOF;
+      } else {
+        state = State.ACTIVE;
+      }
       return true;
 
     case ACTIVE:
@@ -395,9 +410,6 @@ class ReaderState {
     boolean more;
     try {
       more = reader.next();
-      if (! more) {
-        state = State.EOF;
-      }
     } catch (UserException e) {
       throw e;
     } catch (InvalidConversionError e) {
@@ -417,8 +429,30 @@ class ReaderState {
     }
 
     VectorContainer output = reader.output();
-    if (! more && output.getRecordCount() == 0) {
-      return false;
+    if (! more) {
+      state = State.EOF;
+
+      // The reader can indicate EOF (it can't return any more rows)
+      // while returning a non-empty final batch. This is the typical
+      // case with files: the reader read some records and then hit
+      // EOF. This avoids the need for the reader to keep an EOF state.
+
+      if (output.getRecordCount() == 0) {
+
+        // No results, possibly from the first batch.
+        // If the scan has no schema, but this (possibly empty) reader
+        // does have a schema, then pass along this empty batch
+        // as a candidate empty result set of the entire scan.
+
+        if (scanOp.containerAccessor.schemaVersion() == 0 &&
+            reader.schemaVersion() > 0) {
+          scanOp.containerAccessor.setSchema(output);
+        }
+        return false;
+      }
+
+      // EOF (the reader can provide no more batches), but
+      // the reader did provide rows in this batch. Fall through.
     }
 
     // Late schema readers may change their schema between batches.
@@ -427,11 +461,7 @@ class ReaderState {
     // a reader that starts with a schema, but later changes it, has
     // morphed from an early- to late-schema reader.)
 
-    int newVersion = reader.schemaVersion();
-    if (newVersion > schemaVersion) {
-      scanOp.containerAccessor.setContainer(output);
-      schemaVersion = newVersion;
-    }
+    scanOp.containerAccessor.addBatch(output);
     return true;
   }
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/ScanOperatorExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/ScanOperatorExec.java
index ff90bfe..701c82b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/ScanOperatorExec.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/ScanOperatorExec.java
@@ -20,10 +20,10 @@ package org.apache.drill.exec.physical.impl.scan;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.physical.impl.ScanBatch;
 import org.apache.drill.exec.physical.impl.protocol.BatchAccessor;
 import org.apache.drill.exec.physical.impl.protocol.OperatorExec;
 import org.apache.drill.exec.physical.impl.protocol.VectorContainerAccessor;
-
 import org.apache.drill.shaded.guava.com.google.common.annotations.VisibleForTesting;
 
 /**
@@ -67,6 +67,7 @@ import org.apache.drill.shaded.guava.com.google.common.annotations.VisibleForTes
  * {#link ResultSetLoader} to write values into value vectors.
  *
  * <h4>Schema Versions</h4>
+ *
  * Readers may change schemas from time to time. To track such changes,
  * this implementation tracks a batch schema version, maintained by comparing
  * one schema with the next.
@@ -80,23 +81,94 @@ import org.apache.drill.shaded.guava.com.google.common.annotations.VisibleForTes
  * each increasing its internal version number as work proceeds. But, at the
  * end of each batch, the schemas may (and, in fact, should) be identical,
  * which is the schema version downstream operators care about.
+ *
+ * <h4>Empty Files and/or Empty Schemas</h4>
+ *
+ * A corner case occurs if the input is empty, such as a CSV file
+ * that contains no data. The general rule is the following:
+ *
+ * <ul>
+ * <li>If the reader is "early schema" (the schema is defined at
+ * open time), then the result will be a single empty batch with
+ * the schema defined. Example: a CSV file without headers; in this case,
+ * we know the schema is always the single `columns` array.</li>
+ * <li>If the reader is "late schema" (the schema is defined while the
+ * data is read), then no batch is returned because there is no schema.
+ * Example: a JSON file. It is not helpful to return a single batch
+ * with no columns; such a batch will simply conflict with some other
+ * non-empty-schema batch. It turns out that other DBs handle this
+ * case gracefully: a query of the form<br><pre><tt>
+ * SELECT * FROM VALUES()</tt></pre><br>
+ * Will produce an empty result: no schema, no data.</li>
+ * <li>The hybrid case: the reader could provide an early schema,
+ * but cannot do so. That is, the early schema contains no columns.
+ * We treat this case identically to the late schema case. Example: a
+ * CSV file with headers in which the header line is empty.</li>
+ * </ul>
  */
 
 public class ScanOperatorExec implements OperatorExec {
 
-  private enum State { START, READER, END, FAILED, CLOSED }
+  private enum State {
+
+    /**
+     * The scan has been started, but next() has not yet been
+     * called.
+     */
+
+    START,
+
+    /**
+     * A reader is active and has more batches to deliver.
+     */
+
+    READER,
+
+    /**
+     * All readers are completed, none returned any data, but
+     * the final reader did provide a schema. An empty batch
+     * was returned from next(). The next call to next() will
+     * be the last.
+     */
+
+    EMPTY,
+
+    /**
+     * All readers are complete; no more batches to deliver.
+     * close() is not yet called.
+     */
+
+    END,
+
+    /**
+     * A fatal error occurred during the START or READER
+     * states. No further calls to next() allowed. Waiting
+     * for the call to close().
+     */
+
+    FAILED,
+
+    /**
+     * Scan operator is closed. All resources and state are
+     * released. No further calls of any kind are allowed.
+     */
+    CLOSED
+  }
 
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ScanOperatorExec.class);
 
   private final ScanOperatorEvents factory;
+  private final boolean allowEmptyResult;
   protected final VectorContainerAccessor containerAccessor = new VectorContainerAccessor();
   private State state = State.START;
   protected OperatorContext context;
   private int readerCount;
   private ReaderState readerState;
 
-  public ScanOperatorExec(ScanOperatorEvents factory) {
+  public ScanOperatorExec(ScanOperatorEvents factory,
+      boolean allowEmptyResult) {
     this.factory = factory;
+    this.allowEmptyResult = allowEmptyResult;
   }
 
   @Override
@@ -151,6 +223,7 @@ public class ScanOperatorExec implements OperatorExec {
       switch (state) {
 
       case READER:
+      case START: // Occurs if no schema batch
         // Read another batch from the list of row readers. Keeps opening,
         // reading from, and closing readers as needed to locate a batch, or
         // until all readers are exhausted. Terminates when a batch is read,
@@ -159,6 +232,10 @@ public class ScanOperatorExec implements OperatorExec {
         nextAction(false);
         return state != State.END;
 
+      case EMPTY:
+        state = State.END;
+        return false;
+
       case END:
         return false;
 
@@ -171,14 +248,14 @@ public class ScanOperatorExec implements OperatorExec {
     }
   }
 
-  private void nextAction(boolean schema) {
+  private void nextAction(boolean readSchema) {
     for (;;) {
 
       // If have a reader, read a batch
 
       if (readerState != null) {
         boolean hasData;
-        if (schema) {
+        if (readSchema) {
           hasData = readerState.buildSchema();
         } else {
           hasData = readerState.next();
@@ -192,7 +269,7 @@ public class ScanOperatorExec implements OperatorExec {
       // Another reader available?
 
       if (! nextReader()) {
-        state = State.END;
+        finalizeResults();
         return;
       }
       state = State.READER;
@@ -206,6 +283,25 @@ public class ScanOperatorExec implements OperatorExec {
   }
 
   /**
+   * The last reader is done. Check for the special case that no reader
+   * returned any rows, but some reader provided a schema. In this case,
+   * we can return an empty result set with a schema. Otherwise, we have
+   * to return a null result set: no schema, no data. For the Volcano
+   * iterator protocol, this means no return of OK_NEW_SCHEMA, just
+   * an immediate return of NONE.
+   */
+
+  private void finalizeResults() {
+    if (allowEmptyResult &&
+        containerAccessor.batchCount() == 0 &&
+        containerAccessor.schemaVersion() > 0) {
+      state = State.EMPTY;
+    } else {
+      state = State.END;
+    }
+  }
+
+  /**
    * Open the next available reader, if any, preparing both the
    * reader and row set mutator.
    * @return true if another reader is active, false if no more
@@ -218,7 +314,6 @@ public class ScanOperatorExec implements OperatorExec {
 
     final RowBatchReader reader = factory.nextReader();
     if (reader == null) {
-      containerAccessor.setContainer(null);
       return false;
     }
     readerCount++;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/columns/ColumnsScanFramework.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/columns/ColumnsScanFramework.java
index 4dc9467..6d22dd3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/columns/ColumnsScanFramework.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/columns/ColumnsScanFramework.java
@@ -17,6 +17,7 @@
  */
 package org.apache.drill.exec.physical.impl.scan.columns;
 
+import org.apache.drill.exec.physical.impl.scan.ScanOperatorEvents;
 import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework;
 import org.apache.drill.exec.physical.impl.scan.framework.SchemaNegotiatorImpl;
 
@@ -43,7 +44,7 @@ public class ColumnsScanFramework extends FileScanFramework {
     }
 
     @Override
-    public FileScanFramework buildFileFramework() {
+    public ScanOperatorEvents buildEvents() {
       return new ColumnsScanFramework(this);
     }
   }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/file/FileScanFramework.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/file/FileScanFramework.java
index 761f68b..8c6cd22 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/file/FileScanFramework.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/file/FileScanFramework.java
@@ -26,6 +26,7 @@ import org.apache.drill.common.exceptions.ChildErrorContext;
 import org.apache.drill.common.exceptions.CustomErrorContext;
 import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.common.exceptions.UserException.Builder;
+import org.apache.drill.exec.physical.impl.scan.ScanOperatorEvents;
 import org.apache.drill.exec.physical.impl.scan.file.FileMetadataManager.FileMetadataOptions;
 import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
 import org.apache.drill.exec.physical.impl.scan.framework.ManagedScanFramework;
@@ -156,7 +157,8 @@ public class FileScanFramework extends ManagedScanFramework {
 
     public FileMetadataOptions metadataOptions() { return metadataOptions; }
 
-    public FileScanFramework buildFileFramework() {
+    @Override
+    public ScanOperatorEvents buildEvents() {
       return new FileScanFramework(this);
     }
   }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/framework/ManagedScanFramework.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/framework/ManagedScanFramework.java
index a7a46fe..712d21c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/framework/ManagedScanFramework.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/framework/ManagedScanFramework.java
@@ -146,6 +146,11 @@ public class ManagedScanFramework implements ScanOperatorEvents {
     public void setUserName(String userName) {
       this.userName = userName;
     }
+
+    @Override
+    public ScanOperatorEvents buildEvents() {
+      return new ManagedScanFramework(this);
+    }
   }
 
   // Inputs
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ScanSchemaOrchestrator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ScanSchemaOrchestrator.java
index 12469ec..8698bff 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ScanSchemaOrchestrator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ScanSchemaOrchestrator.java
@@ -24,6 +24,12 @@ import org.apache.drill.common.exceptions.CustomErrorContext;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.types.TypeProtos.MajorType;
 import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.impl.protocol.OperatorDriver;
+import org.apache.drill.exec.physical.impl.protocol.OperatorRecordBatch;
+import org.apache.drill.exec.physical.impl.scan.ScanOperatorEvents;
+import org.apache.drill.exec.physical.impl.scan.ScanOperatorExec;
 import org.apache.drill.exec.physical.impl.scan.project.ReaderLevelProjection.ReaderProjectionResolver;
 import org.apache.drill.exec.physical.impl.scan.project.ScanLevelProjection.ScanProjectionParser;
 import org.apache.drill.exec.physical.impl.scan.project.projSet.TypeConverter;
@@ -31,6 +37,7 @@ import org.apache.drill.exec.physical.rowSet.impl.ResultVectorCacheImpl;
 import org.apache.drill.exec.record.VectorContainer;
 import org.apache.drill.exec.record.metadata.TupleMetadata;
 import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.shaded.guava.com.google.common.annotations.VisibleForTesting;
 
 /**
  * Performs projection of a record reader, along with a set of static
@@ -152,7 +159,7 @@ public class ScanSchemaOrchestrator {
   public static final int DEFAULT_BATCH_BYTE_COUNT = ValueVector.MAX_BUFFER_SIZE;
   public static final int MAX_BATCH_ROW_COUNT = ValueVector.MAX_ROW_COUNT;
 
-  public static class ScanOrchestratorBuilder {
+  public abstract static class ScanOrchestratorBuilder {
 
     private MajorType nullType;
     private MetadataManager metadataManager;
@@ -166,8 +173,43 @@ public class ScanSchemaOrchestrator {
     private TypeConverter.Builder typeConverterBuilder = TypeConverter.builder();
 
     /**
+     * Option that controls whether the scan operator starts with an empty
+     * schema-only batch (the so-called "fast schema" that Drill once tried
+     * to provide) or starts with a non-empty data batch (which appears to
+     * be the standard since the "Empty Batches" project some time back.)
+     * See more details in {@link OperatorDriver} Javadoc.
+     * <p>
+     * Defaults to <tt>false</tt>, meaning to <i>not</i> provide the empty
+     * schema batch. DRILL-7305 explains that many operators fail when
+     * presented with an empty batch, so do not enable this feature until
+     * those issues are fixed. Of course, do enable the feature if you want
+     * to track down the DRILL-7305 bugs.
+     */
+
+    private boolean enableSchemaBatch;
+
+    /**
+     * Option to disable empty results. An empty result occurs if no
+     * reader has any data, but at least one reader can provide a schema.
+     * In this case, the scan can return a single, empty batch, with
+     * an associated schema. This is the correct SQL result for an
+     * empty query. However, if this result triggers empty-batch bugs
+     * in other operators, we can, instead, disable this feature and
+     * return a null result set: no schema, no batch, just a "fast NONE",
+     * an immediate return of NONE from the Volcano iterator.
+     * <p>
+     * Setting this option to true is not desirable: it means that the user
+     * gets no schema for queries that should be able to return one. So,
+     * set this option only if we cannot find or fix empty-batch
+     * bugs.
+     */
+
+    public boolean disableEmptyResults;
+
+    /**
      * Context for error messages.
      */
+
     private CustomErrorContext errorContext;
 
     /**
@@ -242,6 +284,14 @@ public class ScanSchemaOrchestrator {
       this.projection = projection;
     }
 
+    public void enableSchemaBatch(boolean option) {
+      enableSchemaBatch = option;
+    }
+
+    public void disableEmptyResults(boolean option) {
+      disableEmptyResults = option;
+    }
+
     public TypeConverter.Builder typeConverterBuilder() {
       return typeConverterBuilder;
     }
@@ -253,6 +303,18 @@ public class ScanSchemaOrchestrator {
     public CustomErrorContext errorContext() {
       return errorContext;
     }
+
+    @VisibleForTesting
+    public ScanOperatorExec buildScan() {
+      return new ScanOperatorExec(buildEvents(),
+          ! disableEmptyResults);
+    }
+
+    public OperatorRecordBatch buildScanOperator(FragmentContext fragContext, PhysicalOperator pop) {
+      return new OperatorRecordBatch(fragContext, pop, buildScan(), enableSchemaBatch);
+    }
+
+    public abstract ScanOperatorEvents buildEvents();
   }
 
   public static class ScanSchemaOptions {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/SchemaSmoother.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/SchemaSmoother.java
index 6582f88..77df2c3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/SchemaSmoother.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/SchemaSmoother.java
@@ -87,8 +87,7 @@ public class SchemaSmoother {
   }
 
   public ReaderLevelProjection resolve(
-      TupleMetadata tableSchema,
-      ResolvedTuple outputTuple) {
+      TupleMetadata tableSchema, ResolvedTuple outputTuple) {
 
     // If a prior schema exists, try resolving the new table using the
     // prior schema. If this works, use the projection. Else, start
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java
index 4a90184..d7f77bf 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java
@@ -41,11 +41,11 @@ public class VectorContainer implements VectorAccessible {
   private final BufferAllocator allocator;
   protected final List<VectorWrapper<?>> wrappers = Lists.newArrayList();
   private BatchSchema schema;
-
-  private int recordCount = 0;
-  private boolean initialized = false;
+  private int recordCount;
+  private boolean initialized;
   // private BufferAllocator allocator;
-  private boolean schemaChanged = true; // Schema has changed since last built. Must rebuild schema
+  // Schema has changed since last built. Must rebuild schema
+  private boolean schemaChanged = true;
 
   public VectorContainer() {
     allocator = null;
@@ -93,8 +93,18 @@ public class VectorContainer implements VectorAccessible {
 
   public BufferAllocator getAllocator() { return allocator; }
 
-  public boolean isSchemaChanged() {
-    return schemaChanged;
+  public boolean isSchemaChanged() { return schemaChanged; }
+
+  /**
+   * Indicate the schema changed. Normally set by mutating this container.
+   * If schemas are built externally, call this if the schema contained
+   * here is different than the one provided in a previous batch. (Some
+   * operators don't trust OK_NEW_SCHEMA, and use the schema changed
+   * flag for the "real" truth.)
+   */
+
+  public void schemaChanged() {
+    schemaChanged = true;
   }
 
   public void addHyperList(List<ValueVector> vectors) {
@@ -237,7 +247,7 @@ public class VectorContainer implements VectorAccessible {
     }
 
   public TypedFieldId add(ValueVector vv) {
-    schemaChanged = true;
+    schemaChanged();
     schema = null;
     int i = wrappers.size();
     wrappers.add(SimpleVectorWrapper.create(vv));
@@ -255,7 +265,7 @@ public class VectorContainer implements VectorAccessible {
 
   public void add(ValueVector[] hyperVector, boolean releasable) {
     assert hyperVector.length != 0;
-    schemaChanged = true;
+    schemaChanged();
     schema = null;
     Class<?> clazz = hyperVector[0].getClass();
     ValueVector[] c = (ValueVector[]) Array.newInstance(clazz, hyperVector.length);
@@ -266,7 +276,7 @@ public class VectorContainer implements VectorAccessible {
 
   public void remove(ValueVector v) {
     schema = null;
-    schemaChanged = true;
+    schemaChanged();
     for (Iterator<VectorWrapper<?>> iter = wrappers.iterator(); iter.hasNext();) {
       VectorWrapper<?> w = iter.next();
       if (!w.isHyper() && v == w.getValueVector()) {
@@ -280,7 +290,7 @@ public class VectorContainer implements VectorAccessible {
 
   private void replace(ValueVector old, ValueVector newVector) {
     schema = null;
-    schemaChanged = true;
+    schemaChanged();
     int i = 0;
     for (VectorWrapper<?> w : wrappers){
       if (!w.isHyper() && old == w.getValueVector()) {
@@ -355,8 +365,8 @@ public class VectorContainer implements VectorAccessible {
     for (VectorWrapper<?> v : wrappers) {
       bldr.addField(v.getField());
     }
-    this.schema = bldr.build();
-    this.schemaChanged = false;
+    schema = bldr.build();
+    schemaChanged = false;
   }
 
   @Override
@@ -500,7 +510,7 @@ public class VectorContainer implements VectorAccessible {
     String separator = "";
     sb.append("[");
 
-    for (VectorWrapper vectorWrapper: wrappers) {
+    for (VectorWrapper<?> vectorWrapper: wrappers) {
       sb.append(separator);
       separator = ", ";
       final String columnName = vectorWrapper.getField().getName();
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
index ba47f40..e15519c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
@@ -39,9 +39,6 @@ import org.apache.drill.exec.physical.base.ScanStats.GroupScanProperty;
 import org.apache.drill.exec.physical.impl.ScanBatch;
 import org.apache.drill.exec.physical.impl.StatisticsWriterRecordBatch;
 import org.apache.drill.exec.physical.impl.WriterRecordBatch;
-import org.apache.drill.exec.physical.impl.protocol.OperatorRecordBatch;
-import org.apache.drill.exec.physical.impl.scan.ScanOperatorExec;
-import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework;
 import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileReaderFactory;
 import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileScanBuilder;
 import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileSchemaNegotiator;
@@ -348,47 +345,15 @@ public abstract class EasyFormatPlugin<T extends FormatPluginConfig> implements
 
   private CloseableRecordBatch buildScan(FragmentContext context, EasySubScan scan) throws ExecutionSetupException {
 
-    // Assemble the scan operator and its wrapper.
-
     try {
       final FileScanBuilder builder = frameworkBuilder(context.getOptions(), scan);
-      builder.setProjection(scan.getColumns());
-      builder.setFiles(scan.getWorkUnits());
-      builder.setConfig(easyConfig().fsConf);
-      builder.setUserName(scan.getUserName());
-
-      // Pass along the output schema, if any
-
-      builder.typeConverterBuilder().providedSchema(scan.getSchema());
-      final Path selectionRoot = scan.getSelectionRoot();
-      if (selectionRoot != null) {
-        builder.metadataOptions().setSelectionRoot(selectionRoot);
-        builder.metadataOptions().setPartitionDepth(scan.getPartitionDepth());
-      }
 
       // Add batch reader, if none specified
 
       if (builder.readerFactory() == null) {
         builder.setReaderFactory(new EasyReaderFactory(this, scan, context));
       }
-
-      // Add error context, if none is specified
-
-      if (builder.errorContext() == null) {
-        builder.setContext(
-            new CustomErrorContext() {
-              @Override
-              public void addContext(UserException.Builder builder) {
-                builder.addContext("Format plugin:",
-                    EasyFormatPlugin.this.getClass().getSimpleName());
-                builder.addContext("Plugin config name:", getName());
-              }
-            });
-      }
-
-      FileScanFramework framework = builder.buildFileFramework();
-      return new OperatorRecordBatch(context, scan,
-          new ScanOperatorExec(framework));
+      return builder.buildScanOperator(context, scan);
     } catch (final UserException e) {
       // Rethrow user exceptions directly
       throw e;
@@ -398,6 +363,45 @@ public abstract class EasyFormatPlugin<T extends FormatPluginConfig> implements
     }
   }
 
+  /**
+   * Initialize the scan framework builder with standard options.
+   * Call this from the plugin-specific
+   * {@link #frameworkBuilder(OptionManager, EasySubScan)} method.
+   * The plugin can then customize/revise options as needed.
+   *
+   * @param builder the scan framework builder you create in the
+   * {@link #frameworkBuilder(OptionManager, EasySubScan)} method
+   * @param scan the physical scan operator definition passed to
+   * the {@link #frameworkBuilder(OptionManager, EasySubScan)} method
+   */
+
+  protected void initScanBuilder(FileScanBuilder builder, EasySubScan scan) {
+    builder.setProjection(scan.getColumns());
+    builder.setFiles(scan.getWorkUnits());
+    builder.setConfig(easyConfig().fsConf);
+    builder.setUserName(scan.getUserName());
+
+    // Pass along the output schema, if any
+
+    builder.typeConverterBuilder().providedSchema(scan.getSchema());
+    final Path selectionRoot = scan.getSelectionRoot();
+    if (selectionRoot != null) {
+      builder.metadataOptions().setSelectionRoot(selectionRoot);
+      builder.metadataOptions().setPartitionDepth(scan.getPartitionDepth());
+    }
+
+    builder.setContext(
+        new CustomErrorContext() {
+          @Override
+          public void addContext(UserException.Builder builder) {
+            builder.addContext("Format plugin:", easyConfig.defaultName);
+            builder.addContext("Format plugin:",
+                EasyFormatPlugin.this.getClass().getSimpleName());
+            builder.addContext("Plugin config name:", getName());
+          }
+        });
+  }
+
   public ManagedReader<? extends FileSchemaNegotiator> newBatchReader(
       EasySubScan scan, OptionManager options) throws ExecutionSetupException {
     throw new ExecutionSetupException("Must implement newBatchReader() if using the enhanced framework.");
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java
index 400ea19..243d58f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java
@@ -22,7 +22,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-import org.apache.drill.common.exceptions.CustomErrorContext;
+import org.apache.drill.common.exceptions.ChildErrorContext;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.common.expression.SchemaPath;
@@ -264,6 +264,7 @@ public class TextFormatPlugin extends EasyFormatPlugin<TextFormatPlugin.TextForm
   protected FileScanBuilder frameworkBuilder(
       OptionManager options, EasySubScan scan) throws ExecutionSetupException {
     ColumnsScanBuilder builder = new ColumnsScanBuilder();
+    initScanBuilder(builder, scan);
     TextParsingSettings settings =
         new TextParsingSettings(getConfig(), scan, options);
     builder.setReaderFactory(new ColumnsReaderFactory(settings));
@@ -293,12 +294,12 @@ public class TextFormatPlugin extends EasyFormatPlugin<TextFormatPlugin.TextForm
     builder.allowRequiredNullColumns(true);
 
     // Provide custom error context
+
     builder.setContext(
-        new CustomErrorContext() {
+        new ChildErrorContext(builder.errorContext()) {
           @Override
           public void addContext(UserException.Builder builder) {
-            builder.addContext("Format plugin:", PLUGIN_NAME);
-            builder.addContext("Plugin config name:", getName());
+            super.addContext(builder);
             builder.addContext("Extract headers:",
                 Boolean.toString(getConfig().isHeaderExtractionEnabled()));
             builder.addContext("Skip first line:",
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/reader/CompliantTextBatchReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/reader/CompliantTextBatchReader.java
index 877d963..3b7ffb6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/reader/CompliantTextBatchReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/reader/CompliantTextBatchReader.java
@@ -230,7 +230,7 @@ public class CompliantTextBatchReader implements ManagedReader<ColumnsSchemaNego
       // Return false on the batch that hits EOF. The scan operator
       // knows to process any rows in this final batch.
 
-      return more && writer.rowCount() > 0;
+      return more;
     } catch (IOException | TextParsingException e) {
       if (e.getCause() != null  && e.getCause() instanceof UserException) {
         throw (UserException) e.getCause();
diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestSchemaWithTableFunction.java b/exec/java-exec/src/test/java/org/apache/drill/TestSchemaWithTableFunction.java
index 6f89b9d..f40a84e 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/TestSchemaWithTableFunction.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/TestSchemaWithTableFunction.java
@@ -23,7 +23,6 @@ import org.apache.drill.exec.util.StoragePluginTestUtils;
 import org.apache.drill.test.ClusterFixture;
 import org.apache.drill.test.ClusterFixtureBuilder;
 import org.apache.drill.test.ClusterTest;
-import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Rule;
 import org.junit.Test;
@@ -51,12 +50,6 @@ public class TestSchemaWithTableFunction extends ClusterTest {
     dirTestWatcher.copyResourceToRoot(Paths.get(DATA_PATH));
     ClusterFixtureBuilder builder = ClusterFixture.builder(dirTestWatcher);
     startCluster(builder);
-    client.alterSession(ExecConstants.ENABLE_V3_TEXT_READER_KEY, true);
-  }
-
-  @AfterClass
-  public static void cleanUp() {
-    client.resetSession(ExecConstants.ENABLE_V3_TEXT_READER_KEY);
   }
 
   @Test
@@ -72,7 +65,7 @@ public class TestSchemaWithTableFunction extends ClusterTest {
       .go();
 
     String plan = queryBuilder().sql(query, table).explainText();
-    assertTrue(plan.contains("schema=[TupleSchema [PrimitiveColumnMetadata [`Year` (INT(0, 0):OPTIONAL)]]]"));
+    assertTrue(plan.contains("schema=[TupleSchema [PrimitiveColumnMetadata [`Year` (INT:OPTIONAL)]]]"));
   }
 
   @Test
@@ -160,7 +153,7 @@ public class TestSchemaWithTableFunction extends ClusterTest {
         .go();
 
       String plan = queryBuilder().sql(query, table).explainText();
-      assertTrue(plan.contains("schema=[TupleSchema [PrimitiveColumnMetadata [`id` (INT(0, 0):OPTIONAL)]]]"));
+      assertTrue(plan.contains("schema=[TupleSchema [PrimitiveColumnMetadata [`id` (INT:OPTIONAL)]]]"));
     } finally {
       client.resetSession(ExecConstants.OUTPUT_FORMAT_OPTION);
       run("drop table if exists %s", table);
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/TestEmptyInputSql.java b/exec/java-exec/src/test/java/org/apache/drill/exec/TestEmptyInputSql.java
index 8460152..58e2e80 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/TestEmptyInputSql.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/TestEmptyInputSql.java
@@ -49,8 +49,9 @@ public class TestEmptyInputSql extends BaseTestQuery {
   }
 
   /**
-   * Test with query against an empty file. Select clause has regular column reference, and an expression.
-   *
+   * Test with query against an empty file. Select clause has regular column
+   * reference, and an expression.
+   * <p>
    * regular column "key" is assigned with nullable-int
    * expression "key + 100" is materialized with nullable-int as output type.
    */
@@ -112,7 +113,6 @@ public class TestEmptyInputSql extends BaseTestQuery {
         .schemaBaseLine(expectedSchema)
         .build()
         .run();
-
   }
 
   @Test
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/protocol/TestOperatorRecordBatch.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/protocol/TestOperatorRecordBatch.java
index d9d27c4..ed18a2b 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/protocol/TestOperatorRecordBatch.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/protocol/TestOperatorRecordBatch.java
@@ -88,7 +88,7 @@ public class TestOperatorRecordBatch extends SubOperatorTest {
 
     public MockOperatorExec(VectorContainer container) {
       batchAccessor = new VectorContainerAccessor();
-      batchAccessor.setContainer(container);
+      batchAccessor.addBatch(container);
     }
 
     public MockOperatorExec(VectorContainerAccessor accessor) {
@@ -120,7 +120,7 @@ public class TestOperatorRecordBatch extends SubOperatorTest {
         newSchema.schemaBuilder()
             .add("b", MinorType.VARCHAR);
         VectorContainer newContainer = new VectorContainer(fixture.allocator(), newSchema.build());
-        batchAccessor.setContainer(newContainer);
+        batchAccessor.addBatch(newContainer);
       }
       return true;
     }
@@ -148,7 +148,7 @@ public class TestOperatorRecordBatch extends SubOperatorTest {
   private OperatorRecordBatch makeOpBatch(MockOperatorExec opExec) {
     // Dummy operator definition
     PhysicalOperator popConfig = new Limit(null, 0, 100);
-    return new OperatorRecordBatch(fixture.getFragmentContext(), popConfig, opExec);
+    return new OperatorRecordBatch(fixture.getFragmentContext(), popConfig, opExec, true);
   }
 
   /**
@@ -444,7 +444,7 @@ public class TestOperatorRecordBatch extends SubOperatorTest {
     // Changing data does not trigger schema change
 
     container.zeroVectors();
-    opExec.batchAccessor.setContainer(container);
+    opExec.batchAccessor.addBatch(container);
     assertEquals(schemaVersion, opExec.batchAccessor().schemaVersion());
 
     // Different container, same vectors, does not trigger a change
@@ -454,10 +454,10 @@ public class TestOperatorRecordBatch extends SubOperatorTest {
       c2.add(vw.getValueVector());
     }
     c2.buildSchema(SelectionVectorMode.NONE);
-    opExec.batchAccessor.setContainer(c2);
+    opExec.batchAccessor.addBatch(c2);
     assertEquals(schemaVersion, opExec.batchAccessor().schemaVersion());
 
-    opExec.batchAccessor.setContainer(container);
+    opExec.batchAccessor.addBatch(container);
     assertEquals(schemaVersion, opExec.batchAccessor().schemaVersion());
 
     // Replacing a vector with another of the same type does trigger
@@ -469,13 +469,13 @@ public class TestOperatorRecordBatch extends SubOperatorTest {
             container.getValueVector(1).getValueVector().getField(),
             fixture.allocator(), null));
     c3.buildSchema(SelectionVectorMode.NONE);
-    opExec.batchAccessor.setContainer(c3);
+    opExec.batchAccessor.addBatch(c3);
     assertEquals(schemaVersion + 1, opExec.batchAccessor().schemaVersion());
     schemaVersion = opExec.batchAccessor().schemaVersion();
 
     // No change if same schema again
 
-    opExec.batchAccessor.setContainer(c3);
+    opExec.batchAccessor.addBatch(c3);
     assertEquals(schemaVersion, opExec.batchAccessor().schemaVersion());
 
     // Adding a vector triggers a change
@@ -483,13 +483,13 @@ public class TestOperatorRecordBatch extends SubOperatorTest {
     MaterializedField c = SchemaBuilder.columnSchema("c", MinorType.INT, DataMode.OPTIONAL);
     c3.add(TypeHelper.getNewVector(c, fixture.allocator(), null));
     c3.buildSchema(SelectionVectorMode.NONE);
-    opExec.batchAccessor.setContainer(c3);
+    opExec.batchAccessor.addBatch(c3);
     assertEquals(schemaVersion + 1, opExec.batchAccessor().schemaVersion());
     schemaVersion = opExec.batchAccessor().schemaVersion();
 
     // No change if same schema again
 
-    opExec.batchAccessor.setContainer(c3);
+    opExec.batchAccessor.addBatch(c3);
     assertEquals(schemaVersion, opExec.batchAccessor().schemaVersion());
 
     // Removing a vector triggers a change
@@ -497,7 +497,7 @@ public class TestOperatorRecordBatch extends SubOperatorTest {
     c3.remove(c3.getValueVector(2).getValueVector());
     c3.buildSchema(SelectionVectorMode.NONE);
     assertEquals(2, c3.getNumberOfColumns());
-    opExec.batchAccessor.setContainer(c3);
+    opExec.batchAccessor.addBatch(c3);
     assertEquals(schemaVersion + 1, opExec.batchAccessor().schemaVersion());
     schemaVersion = opExec.batchAccessor().schemaVersion();
 
@@ -525,7 +525,7 @@ public class TestOperatorRecordBatch extends SubOperatorTest {
         .build();
 
     ContainerAndSv2Accessor accessor = new ContainerAndSv2Accessor();
-    accessor.setContainer(rs.container());
+    accessor.addBatch(rs.container());
     accessor.setSelectionVector(rs.getSv2());
 
     MockOperatorExec opExec = new MockOperatorExec(accessor);
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/BaseScanOperatorExecTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/BaseScanOperatorExecTest.java
index 0e21092..8910907 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/BaseScanOperatorExecTest.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/BaseScanOperatorExecTest.java
@@ -21,12 +21,11 @@ import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.drill.common.types.TypeProtos.MinorType;
-import org.apache.drill.exec.physical.impl.scan.framework.ManagedScanFramework;
-import org.apache.drill.exec.physical.impl.scan.framework.ManagedScanFramework.ScanFrameworkBuilder;
 import org.apache.drill.exec.physical.impl.scan.ScanTestUtils.ScanFixture;
 import org.apache.drill.exec.physical.impl.scan.ScanTestUtils.ScanFixtureBuilder;
 import org.apache.drill.exec.physical.impl.scan.framework.BasicScanFactory;
 import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
+import org.apache.drill.exec.physical.impl.scan.framework.ManagedScanFramework.ScanFrameworkBuilder;
 import org.apache.drill.exec.physical.impl.scan.framework.SchemaNegotiator;
 import org.apache.drill.exec.physical.rowSet.ResultSetLoader;
 import org.apache.drill.exec.physical.rowSet.RowSetLoader;
@@ -122,16 +121,19 @@ public class BaseScanOperatorExecTest extends SubOperatorTest {
     }
   }
 
+  protected TupleMetadata expectedSchema() {
+    return new SchemaBuilder()
+        .add("a", MinorType.INT)
+        .addNullable("b", MinorType.VARCHAR, 10)
+        .buildSchema();
+  }
+
   protected SingleRowSet makeExpected() {
     return makeExpected(0);
   }
 
   protected SingleRowSet makeExpected(int offset) {
-    TupleMetadata expectedSchema = new SchemaBuilder()
-        .add("a", MinorType.INT)
-        .addNullable("b", MinorType.VARCHAR, 10)
-        .buildSchema();
-    SingleRowSet expected = fixture.rowSetBuilder(expectedSchema)
+    SingleRowSet expected = fixture.rowSetBuilder(expectedSchema())
         .addRow(offset + 10, "fred")
         .addRow(offset + 20, "wilma")
         .build();
@@ -160,20 +162,24 @@ public class BaseScanOperatorExecTest extends SubOperatorTest {
     }
 
     @Override
-    protected ManagedScanFramework newFramework() {
+    public ScanFixture build() {
       builder.setReaderFactory(new BasicScanFactory(readers.iterator()));
-      return new ManagedScanFramework(builder);
+      return super.build();
     }
   }
 
   @SafeVarargs
-  public static ScanFixture simpleFixture(ManagedReader<? extends SchemaNegotiator>...readers) {
+  public static BaseScanFixtureBuilder simpleBuilder(ManagedReader<? extends SchemaNegotiator>...readers) {
     BaseScanFixtureBuilder builder = new BaseScanFixtureBuilder();
     builder.projectAll();
     for (ManagedReader<? extends SchemaNegotiator> reader : readers) {
       builder.addReader(reader);
     }
-    return builder.build();
+    return builder;
   }
 
-}
\ No newline at end of file
+  @SafeVarargs
+  public static ScanFixture simpleFixture(ManagedReader<? extends SchemaNegotiator>...readers) {
+    return simpleBuilder(readers).build();
+  }
+}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/ScanTestUtils.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/ScanTestUtils.java
index 1e99b79..8655637 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/ScanTestUtils.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/ScanTestUtils.java
@@ -19,8 +19,8 @@ package org.apache.drill.exec.physical.impl.scan;
 
 import java.util.List;
 
-import org.apache.drill.common.types.TypeProtos.MinorType;
 import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.types.TypeProtos.MinorType;
 import org.apache.drill.common.types.Types;
 import org.apache.drill.exec.ops.OperatorContext;
 import org.apache.drill.exec.physical.base.AbstractSubScan;
@@ -28,21 +28,20 @@ import org.apache.drill.exec.physical.base.Scan;
 import org.apache.drill.exec.physical.impl.scan.file.FileMetadataColumnDefn;
 import org.apache.drill.exec.physical.impl.scan.file.FileMetadataManager;
 import org.apache.drill.exec.physical.impl.scan.file.PartitionColumn;
-import org.apache.drill.exec.physical.impl.scan.framework.ManagedScanFramework;
 import org.apache.drill.exec.physical.impl.scan.framework.ManagedScanFramework.ScanFrameworkBuilder;
+import org.apache.drill.exec.physical.impl.scan.project.ReaderLevelProjection.ReaderProjectionResolver;
 import org.apache.drill.exec.physical.impl.scan.project.ResolvedColumn;
 import org.apache.drill.exec.physical.impl.scan.project.ResolvedTuple;
 import org.apache.drill.exec.physical.impl.scan.project.ScanLevelProjection.ScanProjectionParser;
-import org.apache.drill.exec.physical.impl.scan.project.ReaderLevelProjection.ReaderProjectionResolver;
+import org.apache.drill.exec.physical.impl.scan.project.ScanSchemaOrchestrator.ScanOrchestratorBuilder;
 import org.apache.drill.exec.physical.rowSet.impl.RowSetTestUtils;
 import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.record.metadata.ColumnMetadata;
 import org.apache.drill.exec.record.metadata.TupleMetadata;
 import org.apache.drill.exec.record.metadata.TupleSchema;
 import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList;
-import org.apache.drill.test.OperatorFixture;
-
 import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
+import org.apache.drill.test.OperatorFixture;
 
 public class ScanTestUtils {
 
@@ -54,10 +53,11 @@ public class ScanTestUtils {
   public static final String SUFFIX_COL = "suffix";
   public static final String PARTITION_COL = "dir";
 
-
   public static abstract class ScanFixtureBuilder {
 
     public final OperatorFixture opFixture;
+    // All tests are designed to use the schema batch
+    public boolean enableSchemaBatch = true;
 
     public ScanFixtureBuilder(OperatorFixture opFixture) {
       this.opFixture = opFixture;
@@ -81,11 +81,9 @@ public class ScanTestUtils {
       builder().setProjection(projection);
     }
 
-    protected abstract ManagedScanFramework newFramework();
-
     public ScanFixture build() {
-      ManagedScanFramework framework = newFramework();
-      ScanOperatorExec scanOp = new ScanOperatorExec(framework);
+      builder().enableSchemaBatch(enableSchemaBatch);
+      ScanOperatorExec scanOp = builder().buildScan();
       Scan scanConfig = new AbstractSubScan("bob") {
 
         @Override
@@ -118,6 +116,15 @@ public class ScanTestUtils {
     }
   }
 
+  public static class MockScanBuilder extends ScanOrchestratorBuilder {
+
+    @Override
+    public ScanOperatorEvents buildEvents() {
+      throw new IllegalStateException("Not used in this test.");
+    }
+
+  }
+
   /**
    * Type-safe way to define a list of parsers.
    * @param parsers as a varArgs list convenient for testing
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestColumnsArray.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestColumnsArray.java
index 7c9dd87..babe3ab 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestColumnsArray.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestColumnsArray.java
@@ -25,7 +25,9 @@ import java.util.List;
 import org.apache.drill.categories.RowSetTests;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.physical.impl.scan.ScanTestUtils.MockScanBuilder;
 import org.apache.drill.exec.physical.impl.scan.columns.ColumnsArrayManager;
+import org.apache.drill.exec.physical.impl.scan.columns.ColumnsScanFramework.ColumnsScanBuilder;
 import org.apache.drill.exec.physical.impl.scan.file.FileMetadataManager;
 import org.apache.drill.exec.physical.impl.scan.file.FileMetadataManager.FileMetadataOptions;
 import org.apache.drill.exec.physical.impl.scan.project.ReaderSchemaOrchestrator;
@@ -82,7 +84,7 @@ public class TestColumnsArray extends SubOperatorTest {
 
     // Configure the schema orchestrator
 
-    ScanOrchestratorBuilder builder = new ScanOrchestratorBuilder();
+    ScanOrchestratorBuilder builder = new MockScanBuilder();
     builder.withMetadata(metadataManager);
     builder.addParser(colsManager.projectionParser());
     builder.addResolver(colsManager.resolver());
@@ -259,7 +261,7 @@ public class TestColumnsArray extends SubOperatorTest {
 
     // Configure the schema orchestrator
 
-    ScanOrchestratorBuilder builder = new ScanOrchestratorBuilder();
+    ScanOrchestratorBuilder builder = new ColumnsScanBuilder();
     builder.addParser(colsManager.projectionParser());
     builder.addResolver(colsManager.resolver());
     builder.setProjection(cols);
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestColumnsArrayFramework.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestColumnsArrayFramework.java
index 00f169f..6643b3b 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestColumnsArrayFramework.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestColumnsArrayFramework.java
@@ -34,13 +34,11 @@ import org.apache.drill.exec.physical.impl.scan.ScanTestUtils.ScanFixture;
 import org.apache.drill.exec.physical.impl.scan.ScanTestUtils.ScanFixtureBuilder;
 import org.apache.drill.exec.physical.impl.scan.TestFileScanFramework.DummyFileWork;
 import org.apache.drill.exec.physical.impl.scan.columns.ColumnsArrayManager;
-import org.apache.drill.exec.physical.impl.scan.columns.ColumnsScanFramework;
 import org.apache.drill.exec.physical.impl.scan.columns.ColumnsScanFramework.ColumnsScanBuilder;
 import org.apache.drill.exec.physical.impl.scan.columns.ColumnsSchemaNegotiator;
 import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileReaderFactory;
 import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileSchemaNegotiator;
 import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
-import org.apache.drill.exec.physical.impl.scan.framework.ManagedScanFramework;
 import org.apache.drill.exec.physical.impl.scan.framework.ManagedScanFramework.ScanFrameworkBuilder;
 import org.apache.drill.exec.physical.rowSet.impl.RowSetTestUtils;
 import org.apache.drill.exec.record.metadata.SchemaBuilder;
@@ -95,7 +93,7 @@ public class TestColumnsArrayFramework extends SubOperatorTest {
     }
 
     @Override
-    protected ManagedScanFramework newFramework() {
+    public ScanFixture build() {
 
       // Bass-ackward construction of the list of files from
       // a set of text fixture readers. Normal implementations
@@ -109,7 +107,7 @@ public class TestColumnsArrayFramework extends SubOperatorTest {
       builder.setConfig(new Configuration());
       builder.setFiles(blocks);
       builder.setReaderFactory(new MockFileReaderFactory(readers));
-      return new ColumnsScanFramework(builder);
+      return super.build();
     }
   }
 
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestFileScanFramework.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestFileScanFramework.java
index 4ade742..5c527e3 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestFileScanFramework.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestFileScanFramework.java
@@ -30,12 +30,10 @@ import org.apache.drill.common.types.TypeProtos.DataMode;
 import org.apache.drill.common.types.TypeProtos.MinorType;
 import org.apache.drill.exec.physical.impl.scan.ScanTestUtils.ScanFixture;
 import org.apache.drill.exec.physical.impl.scan.ScanTestUtils.ScanFixtureBuilder;
-import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework;
 import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileReaderFactory;
 import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileScanBuilder;
 import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileSchemaNegotiator;
 import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
-import org.apache.drill.exec.physical.impl.scan.framework.ManagedScanFramework;
 import org.apache.drill.exec.physical.impl.scan.framework.ManagedScanFramework.ScanFrameworkBuilder;
 import org.apache.drill.exec.physical.rowSet.ResultSetLoader;
 import org.apache.drill.exec.physical.rowSet.RowSetLoader;
@@ -141,7 +139,7 @@ public class TestFileScanFramework extends SubOperatorTest {
     }
 
     @Override
-    protected ManagedScanFramework newFramework() {
+    public ScanFixture build() {
 
       // Bass-ackward construction of the list of files from
       // a set of text fixture readers. Normal implementations
@@ -155,7 +153,7 @@ public class TestFileScanFramework extends SubOperatorTest {
       builder.setConfig(new Configuration());
       builder.setFiles(blocks);
       builder.setReaderFactory(new MockFileReaderFactory(readers));
-      return new FileScanFramework(builder);
+      return super.build();
     }
   }
 
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestScanOperExecBasics.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestScanOperExecBasics.java
index dd07d5a..32a5ec1 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestScanOperExecBasics.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestScanOperExecBasics.java
@@ -28,6 +28,7 @@ import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.exec.physical.impl.scan.ScanTestUtils.ScanFixture;
 import org.apache.drill.exec.physical.impl.scan.framework.SchemaNegotiator;
+import org.apache.drill.exec.record.VectorContainer;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
@@ -373,6 +374,9 @@ public class TestScanOperExecBasics extends BaseScanOperatorExecTest {
 
   /**
    * Test multiple readers, all EOF on first batch.
+   * The scan will return one empty batch to declare the
+   * early schema. Results in an empty (rather than null)
+   * result set.
    */
 
   @Test
@@ -388,10 +392,15 @@ public class TestScanOperExecBasics extends BaseScanOperatorExecTest {
     // EOF
 
     assertTrue(scan.buildSchema());
-    assertFalse(scan.next());
+    assertTrue(scan.next());
+    VectorContainer container = scan.batchAccessor().getOutgoingContainer();
+    assertEquals(0, container.getRecordCount());
+    assertEquals(2, container.getNumberOfColumns());
+
     assertTrue(reader1.closeCalled);
     assertTrue(reader2.closeCalled);
     assertEquals(0, scan.batchAccessor().getRowCount());
+    assertFalse(scan.next());
 
     scanFixture.close();
   }
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestScanOperExecEarlySchema.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestScanOperExecEarlySchema.java
index a7a4f64..5dd47c0 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestScanOperExecEarlySchema.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestScanOperExecEarlySchema.java
@@ -24,8 +24,10 @@ import static org.junit.Assert.assertTrue;
 import org.apache.drill.categories.RowSetTests;
 import org.apache.drill.exec.physical.impl.scan.ScanTestUtils.ScanFixture;
 import org.apache.drill.exec.physical.impl.scan.framework.SchemaNegotiator;
-import org.apache.drill.test.rowSet.RowSetComparison;
 import org.apache.drill.test.rowSet.RowSet.SingleRowSet;
+import org.apache.drill.test.rowSet.RowSetBuilder;
+import org.apache.drill.test.rowSet.RowSetComparison;
+import org.apache.drill.test.rowSet.RowSetUtilities;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
@@ -97,6 +99,44 @@ public class TestScanOperExecEarlySchema extends BaseScanOperatorExecTest {
     scan.close();
   }
 
+  @Test
+  public void testEarlySchemaLifecycleNoSchemaBatch() {
+
+    // Create a mock reader, return one batch with data.
+
+    MockEarlySchemaReader reader = new MockEarlySchemaReader();
+    reader.batchLimit = 1;
+
+    // Create the scan operator
+
+    BaseScanFixtureBuilder builder = simpleBuilder(reader);
+    builder.enableSchemaBatch = false;
+    ScanFixture scanFixture = builder.build();
+    ScanOperatorExec scan = scanFixture.scanOp;
+
+    SingleRowSet expected = makeExpected();
+    RowSetComparison verifier = new RowSetComparison(expected);
+
+    // First batch: return with data.
+
+    assertTrue(scan.next());
+    verifier.verifyAndClearAll(fixture.wrap(scan.batchAccessor().getOutgoingContainer()));
+
+    // EOF
+
+    assertFalse(scan.next());
+    assertEquals(0, scan.batchAccessor().getRowCount());
+
+    // Next again: no-op
+
+    assertFalse(scan.next());
+    scanFixture.close();
+
+    // Close again: no-op
+
+    scan.close();
+  }
+
   private static class MockEarlySchemaReader3 extends MockEarlySchemaReader {
 
     @Override
@@ -182,12 +222,18 @@ public class TestScanOperExecEarlySchema extends BaseScanOperatorExecTest {
     ScanOperatorExec scan = scanFixture.scanOp;
     assertTrue(scan.buildSchema());
 
-    // EOF
+    // EOF. Returns a single empty batch with early schema
+    // in order to provide an empty result set.
 
-    assertFalse(scan.next());
+    assertTrue(scan.next());
     assertTrue(reader.closeCalled);
     assertEquals(0, scan.batchAccessor().getRowCount());
 
+    RowSetUtilities.verify(
+        RowSetBuilder.emptyBatch(fixture.allocator(), expectedSchema()),
+        fixture.wrap(scan.batchAccessor().getOutgoingContainer()));
+
+    assertFalse(scan.next());
     scanFixture.close();
   }
 
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestScanOperExecLateSchema.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestScanOperExecLateSchema.java
index ccacdae..3852f53 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestScanOperExecLateSchema.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestScanOperExecLateSchema.java
@@ -24,6 +24,7 @@ import static org.junit.Assert.assertTrue;
 import org.apache.drill.categories.RowSetTests;
 import org.apache.drill.common.types.TypeProtos.DataMode;
 import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.physical.impl.scan.BaseScanOperatorExecTest.BaseScanFixtureBuilder;
 import org.apache.drill.exec.physical.impl.scan.ScanTestUtils.ScanFixture;
 import org.apache.drill.exec.physical.impl.scan.framework.SchemaNegotiator;
 import org.apache.drill.exec.physical.rowSet.RowSetLoader;
@@ -141,6 +142,49 @@ public class TestScanOperExecLateSchema extends BaseScanOperatorExecTest {
     scanFixture.close();
   }
 
+  @Test
+  public void testLateSchemaLifecycleNoSchemaBatch() {
+
+    // Create a mock reader, return two batches, both with data.
+
+    MockLateSchemaReader reader = new MockLateSchemaReader();
+    reader.batchLimit = 2;
+    reader.returnDataOnFirst = true;
+
+    // Create the scan operator
+
+    BaseScanFixtureBuilder builder = simpleBuilder(reader);
+    builder.enableSchemaBatch = false;
+    ScanFixture scanFixture = builder.build();
+    ScanOperatorExec scan = scanFixture.scanOp;
+
+    // Standard startup
+
+    assertFalse(reader.openCalled);
+
+    // Create the expected result.
+
+    // First batch with data.
+
+    assertTrue(scan.next());
+    RowSetUtilities.verify(makeExpected(0),
+        fixture.wrap(scan.batchAccessor().getOutgoingContainer()));
+
+    // Second batch.
+
+    assertTrue(scan.next());
+    RowSetUtilities.verify(makeExpected(20),
+        fixture.wrap(scan.batchAccessor().getOutgoingContainer()));
+
+    // EOF
+
+    assertFalse(scan.next());
+    assertTrue(reader.closeCalled);
+    assertEquals(0, scan.batchAccessor().getRowCount());
+
+    scanFixture.close();
+  }
+
   /**
    * Test the case that a late scan operator is closed before
    * the first reader is opened.
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestScanOperExecSmoothing.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestScanOperExecSmoothing.java
index 523d826..b0432db 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestScanOperExecSmoothing.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestScanOperExecSmoothing.java
@@ -23,6 +23,7 @@ import static org.junit.Assert.assertTrue;
 
 import org.apache.drill.categories.RowSetTests;
 import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.physical.impl.scan.BaseScanOperatorExecTest.BaseScanFixtureBuilder;
 import org.apache.drill.exec.physical.impl.scan.ScanTestUtils.ScanFixture;
 import org.apache.drill.exec.physical.impl.scan.framework.SchemaNegotiator;
 import org.apache.drill.exec.physical.rowSet.RowSetLoader;
@@ -125,6 +126,26 @@ public class TestScanOperExecSmoothing extends BaseScanOperatorExecTest {
     assertEquals(1, scan.batchAccessor().schemaVersion());
     scan.batchAccessor().release();
 
+    readSchemaChangeBatches(scanFixture, reader2);
+  }
+
+  @Test
+  public void testSchemaChangeNoSchemaBatch() {
+    MockEarlySchemaReader reader1 = new MockEarlySchemaReader();
+    reader1.batchLimit = 2;
+    MockEarlySchemaReader reader2 = new MockEarlySchemaReader2();
+    reader2.batchLimit = 2;
+
+    BaseScanFixtureBuilder builder = simpleBuilder(reader1, reader2);
+    builder.enableSchemaBatch = false;
+    ScanFixture scanFixture = builder.build();
+
+    readSchemaChangeBatches(scanFixture, reader2);
+  }
+
+  private void readSchemaChangeBatches(ScanFixture scanFixture, MockEarlySchemaReader reader2) {
+    ScanOperatorExec scan = scanFixture.scanOp;
+
     // First batch
 
     assertTrue(scan.next());
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestScanOrchestratorEarlySchema.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestScanOrchestratorEarlySchema.java
index 3f2b5f1..4473bbb 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestScanOrchestratorEarlySchema.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestScanOrchestratorEarlySchema.java
@@ -27,6 +27,7 @@ import org.apache.drill.common.types.TypeProtos.DataMode;
 import org.apache.drill.common.types.TypeProtos.MajorType;
 import org.apache.drill.common.types.TypeProtos.MinorType;
 import org.apache.drill.exec.physical.impl.protocol.SchemaTracker;
+import org.apache.drill.exec.physical.impl.scan.ScanTestUtils.MockScanBuilder;
 import org.apache.drill.exec.physical.impl.scan.project.ReaderSchemaOrchestrator;
 import org.apache.drill.exec.physical.impl.scan.project.ScanSchemaOrchestrator;
 import org.apache.drill.exec.physical.impl.scan.project.ScanSchemaOrchestrator.ScanOrchestratorBuilder;
@@ -62,7 +63,7 @@ public class TestScanOrchestratorEarlySchema extends SubOperatorTest {
 
   @Test
   public void testEarlySchemaWildcard() {
-    ScanOrchestratorBuilder builder = new ScanOrchestratorBuilder();
+    ScanOrchestratorBuilder builder = new MockScanBuilder();
 
     // SELECT * ...
 
@@ -153,7 +154,7 @@ public class TestScanOrchestratorEarlySchema extends SubOperatorTest {
 
   @Test
   public void testEarlySchemaSelectAll() {
-    ScanOrchestratorBuilder builder = new ScanOrchestratorBuilder();
+    ScanOrchestratorBuilder builder = new MockScanBuilder();
 
     // SELECT a, b ...
 
@@ -205,7 +206,7 @@ public class TestScanOrchestratorEarlySchema extends SubOperatorTest {
 
   @Test
   public void testEarlySchemaSelectAllReorder() {
-    ScanOrchestratorBuilder builder = new ScanOrchestratorBuilder();
+    ScanOrchestratorBuilder builder = new MockScanBuilder();
 
     // SELECT b, a ...
 
@@ -260,7 +261,7 @@ public class TestScanOrchestratorEarlySchema extends SubOperatorTest {
 
   @Test
   public void testEarlySchemaSelectExtra() {
-    ScanOrchestratorBuilder builder = new ScanOrchestratorBuilder();
+    ScanOrchestratorBuilder builder = new MockScanBuilder();
 
     // SELECT a, b, c ...
 
@@ -316,7 +317,7 @@ public class TestScanOrchestratorEarlySchema extends SubOperatorTest {
 
   @Test
   public void testEarlySchemaSelectExtraCustomType() {
-    ScanOrchestratorBuilder builder = new ScanOrchestratorBuilder();
+    ScanOrchestratorBuilder builder = new MockScanBuilder();
 
     // Null columns of type VARCHAR
 
@@ -379,7 +380,7 @@ public class TestScanOrchestratorEarlySchema extends SubOperatorTest {
 
   @Test
   public void testEarlySchemaSelectSubset() {
-    ScanOrchestratorBuilder builder = new ScanOrchestratorBuilder();
+    ScanOrchestratorBuilder builder = new MockScanBuilder();
 
     // SELECT a ...
 
@@ -437,7 +438,7 @@ public class TestScanOrchestratorEarlySchema extends SubOperatorTest {
 
   @Test
   public void testEarlySchemaSelectNone() {
-    ScanOrchestratorBuilder builder = new ScanOrchestratorBuilder();
+    ScanOrchestratorBuilder builder = new MockScanBuilder();
 
     // SELECT ...
     // (Like SELECT COUNT(*) ...
@@ -519,7 +520,7 @@ public class TestScanOrchestratorEarlySchema extends SubOperatorTest {
 
   @Test
   public void testEmptySchema() {
-    ScanOrchestratorBuilder builder = new ScanOrchestratorBuilder();
+    ScanOrchestratorBuilder builder = new MockScanBuilder();
 
     // SELECT * ...
 
@@ -566,7 +567,7 @@ public class TestScanOrchestratorEarlySchema extends SubOperatorTest {
 
   @Test
   public void testEmptySchemaExtra() {
-    ScanOrchestratorBuilder builder = new ScanOrchestratorBuilder();
+    ScanOrchestratorBuilder builder = new MockScanBuilder();
 
     // SELECT * ...
 
@@ -623,7 +624,7 @@ public class TestScanOrchestratorEarlySchema extends SubOperatorTest {
 
   @Test
   public void testTypeSmoothingExplicit() {
-    ScanOrchestratorBuilder builder = new ScanOrchestratorBuilder();
+    ScanOrchestratorBuilder builder = new MockScanBuilder();
     TupleMetadata table1Schema = new SchemaBuilder()
         .add("A", MinorType.BIGINT)
         .addNullable("B", MinorType.VARCHAR)
@@ -736,7 +737,7 @@ public class TestScanOrchestratorEarlySchema extends SubOperatorTest {
 
   @Test
   public void testTypeSmoothing() {
-    ScanOrchestratorBuilder builder = new ScanOrchestratorBuilder();
+    ScanOrchestratorBuilder builder = new MockScanBuilder();
 
     // SELECT a, b ...
 
@@ -838,7 +839,7 @@ public class TestScanOrchestratorEarlySchema extends SubOperatorTest {
 
   @Test
   public void testModeSmoothing() {
-    ScanOrchestratorBuilder builder = new ScanOrchestratorBuilder();
+    ScanOrchestratorBuilder builder = new MockScanBuilder();
     builder.enableSchemaSmoothing(true);
     builder.setProjection(RowSetTestUtils.projectList("a"));
     ScanSchemaOrchestrator scanner = new ScanSchemaOrchestrator(fixture.allocator(), builder);
@@ -959,7 +960,7 @@ public class TestScanOrchestratorEarlySchema extends SubOperatorTest {
   @Test
   public void testColumnReordering() {
 
-    ScanOrchestratorBuilder builder = new ScanOrchestratorBuilder();
+    ScanOrchestratorBuilder builder = new MockScanBuilder();
     builder.enableSchemaSmoothing(true);
     builder.setProjection(RowSetTestUtils.projectList("a", "b", "c"));
     ScanSchemaOrchestrator scanner = new ScanSchemaOrchestrator(fixture.allocator(), builder);
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestScanOrchestratorLateSchema.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestScanOrchestratorLateSchema.java
index c2262d2..70496bc 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestScanOrchestratorLateSchema.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestScanOrchestratorLateSchema.java
@@ -22,6 +22,7 @@ import static org.junit.Assert.assertFalse;
 import org.apache.drill.categories.RowSetTests;
 import org.apache.drill.common.types.TypeProtos.DataMode;
 import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.physical.impl.scan.ScanTestUtils.MockScanBuilder;
 import org.apache.drill.exec.physical.impl.scan.project.ReaderSchemaOrchestrator;
 import org.apache.drill.exec.physical.impl.scan.project.ScanSchemaOrchestrator;
 import org.apache.drill.exec.physical.impl.scan.project.ScanSchemaOrchestrator.ScanOrchestratorBuilder;
@@ -55,7 +56,7 @@ public class TestScanOrchestratorLateSchema extends SubOperatorTest {
 
   @Test
   public void testLateSchemaWildcard() {
-    ScanOrchestratorBuilder builder = new ScanOrchestratorBuilder();
+    ScanOrchestratorBuilder builder = new MockScanBuilder();
 
     // SELECT * ...
 
@@ -111,7 +112,7 @@ public class TestScanOrchestratorLateSchema extends SubOperatorTest {
 
   @Test
   public void testLateSchemaSelectDisjoint() {
-    ScanOrchestratorBuilder builder = new ScanOrchestratorBuilder();
+    ScanOrchestratorBuilder builder = new MockScanBuilder();
 
     // SELECT a, c ...
 
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestScanOrchestratorMetadata.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestScanOrchestratorMetadata.java
index cebe055..037e4b6 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestScanOrchestratorMetadata.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestScanOrchestratorMetadata.java
@@ -28,6 +28,7 @@ import org.apache.drill.common.types.TypeProtos.DataMode;
 import org.apache.drill.common.types.TypeProtos.MajorType;
 import org.apache.drill.common.types.TypeProtos.MinorType;
 import org.apache.drill.exec.physical.impl.protocol.SchemaTracker;
+import org.apache.drill.exec.physical.impl.scan.ScanTestUtils.MockScanBuilder;
 import org.apache.drill.exec.physical.impl.scan.file.FileMetadataManager;
 import org.apache.drill.exec.physical.impl.scan.file.FileMetadataManager.FileMetadataOptions;
 import org.apache.drill.exec.physical.impl.scan.project.ReaderSchemaOrchestrator;
@@ -76,7 +77,7 @@ public class TestScanOrchestratorMetadata extends SubOperatorTest {
         fixture.getOptionManager(),
         standardOptions(filePath));
 
-    ScanOrchestratorBuilder builder = new ScanOrchestratorBuilder();
+    ScanOrchestratorBuilder builder = new MockScanBuilder();
     builder.withMetadata(metadataManager);
 
     // SELECT *, filename, suffix ...
@@ -135,7 +136,7 @@ public class TestScanOrchestratorMetadata extends SubOperatorTest {
 
   @Test
   public void testSelectNone() {
-    ScanOrchestratorBuilder builder = new ScanOrchestratorBuilder();
+    ScanOrchestratorBuilder builder = new MockScanBuilder();
     Path filePath = new Path("hdfs:///w/x/y/z.csv");
     FileMetadataManager metadataManager = new FileMetadataManager(
         fixture.getOptionManager(),
@@ -203,7 +204,7 @@ public class TestScanOrchestratorMetadata extends SubOperatorTest {
         .setMode(DataMode.OPTIONAL)
         .build();
 
-    ScanOrchestratorBuilder builder = new ScanOrchestratorBuilder();
+    ScanOrchestratorBuilder builder = new MockScanBuilder();
     builder.setNullType(nullType);
     Path filePath = new Path("hdfs:///w/x/y/z.csv");
     FileMetadataManager metadataManager = new FileMetadataManager(
@@ -280,7 +281,7 @@ public class TestScanOrchestratorMetadata extends SubOperatorTest {
 
   @Test
   public void testMixture() {
-    ScanOrchestratorBuilder builder = new ScanOrchestratorBuilder();
+    ScanOrchestratorBuilder builder = new MockScanBuilder();
     Path filePath = new Path("hdfs:///w/x/y/z.csv");
     FileMetadataManager metadataManager = new FileMetadataManager(
         fixture.getOptionManager(),
@@ -344,7 +345,7 @@ public class TestScanOrchestratorMetadata extends SubOperatorTest {
 
   @Test
   public void testMetadataMulti() {
-    ScanOrchestratorBuilder builder = new ScanOrchestratorBuilder();
+    ScanOrchestratorBuilder builder = new MockScanBuilder();
     Path filePathA = new Path("hdfs:///w/x/y/a.csv");
     Path filePathB = new Path("hdfs:///w/x/b.csv");
     FileMetadataManager metadataManager = new FileMetadataManager(
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/project/TestSchemaSmoothing.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/project/TestSchemaSmoothing.java
index 22d7431..9da5cc8 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/project/TestSchemaSmoothing.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/project/TestSchemaSmoothing.java
@@ -28,6 +28,7 @@ import org.apache.drill.categories.RowSetTests;
 import org.apache.drill.common.types.TypeProtos.MinorType;
 import org.apache.drill.exec.physical.impl.protocol.SchemaTracker;
 import org.apache.drill.exec.physical.impl.scan.ScanTestUtils;
+import org.apache.drill.exec.physical.impl.scan.ScanTestUtils.MockScanBuilder;
 import org.apache.drill.exec.physical.impl.scan.file.FileMetadataColumn;
 import org.apache.drill.exec.physical.impl.scan.file.FileMetadataManager;
 import org.apache.drill.exec.physical.impl.scan.file.FileMetadataManager.FileMetadataOptions;
@@ -816,7 +817,7 @@ public class TestSchemaSmoothing extends SubOperatorTest {
 
   @Test
   public void testWildcardSmoothing() {
-    ScanOrchestratorBuilder builder = new ScanOrchestratorBuilder();
+    ScanOrchestratorBuilder builder = new MockScanBuilder();
     builder.enableSchemaSmoothing(true);
     builder.setProjection(RowSetTestUtils.projectAll());
     final ScanSchemaOrchestrator projector = new ScanSchemaOrchestrator(fixture.allocator(), builder);
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/BaseCsvTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/BaseCsvTest.java
index 2071546..6079026 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/BaseCsvTest.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/BaseCsvTest.java
@@ -35,6 +35,17 @@ public class BaseCsvTest extends ClusterTest {
   protected static final String NESTED_DIR = "nested";
   protected static final String ROOT_FILE = "first.csv";
   protected static final String NESTED_FILE = "second.csv";
+  protected static final String EMPTY_FILE = "empty.csv";
+
+  /**
+   * The scan operator can return an empty schema batch as
+   * the first batch. But, this broke multiple operators that
+   * do not handle this case. So, it is turned off for now.
+   * Tests that verified the empty batch use this flag to
+   * disable that checking.
+   */
+
+  protected static boolean SCHEMA_BATCH_ENABLED = false;
 
   protected static String validHeaders[] = {
       "a,b,c",
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsvHeader.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsvHeader.java
index d190a63..c8580d4 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsvHeader.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsvHeader.java
@@ -17,19 +17,21 @@
  */
 package org.apache.drill.exec.store.easy.text.compliant;
 
-import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
-import org.apache.drill.test.BaseTestQuery;
-import org.apache.drill.test.TestBuilder;
-
 import java.io.BufferedOutputStream;
 import java.io.File;
 import java.io.FileOutputStream;
 import java.nio.file.Paths;
 import java.util.List;
 
+import org.apache.drill.categories.RowSetTests;
+import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
+import org.apache.drill.test.BaseTestQuery;
+import org.apache.drill.test.TestBuilder;
 import org.junit.Before;
 import org.junit.Test;
+import org.junit.experimental.categories.Category;
 
+@Category(RowSetTests.class)
 public class TestCsvHeader extends BaseTestQuery{
 
   private static final String ROOT = "store/text/data/cars.csvh";
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsvWithHeaders.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsvWithHeaders.java
index 0b0c181..8a2e8b0 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsvWithHeaders.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsvWithHeaders.java
@@ -51,13 +51,9 @@ import org.junit.experimental.categories.Category;
  * seems to be a bug in the Project operator.</li>
  * </ul>
  *
- * The tests all demonstrate that the row set scan framework
- * delivers a first empty batch from each scan. I (Paul) had understood
- * that we had an "fast schema" path as the result of the "empty batch"
- * project. However, the V2 reader did not provide the schema-only
- * first batch. So, not sure if doing so is a feature, or a bug because
- * things changed. Easy enough to change if we choose to. If so, the
- * tests here would remove the test for that schema-only batch.
+ * The tests assume that the "early schema" mechanism is disabled: that
+ * the first batch either contains data, or that the first batch is empty
+ * only if there is no data at all to be read.
  *
  * @see {@link TestHeaderBuilder}
  */
@@ -100,8 +96,21 @@ public class TestCsvWithHeaders extends BaseCsvTest {
     buildFile(COLUMNS_FILE_NAME, columnsCol);
   }
 
-  private static final String EMPTY_FILE = "empty.csv";
-
+  /**
+   * An empty file read with headers enabled is invalid: there is no header
+   * line and so there is no schema. It is probably not helpful to return a
+   * batch with an empty schema; doing so would simply conflict with the
+   * schema of a non-empty file. Also, there is no reason to throw an
+   * error; this is not a problem serious enough to fail the query. Instead,
+   * we elect to simply return no results at all: no schema and no data.
+   * <p>
+   * Prior research revealed that most DB engines can handle a null
+   * empty result set: no schema, no rows. For example:
+   * <br><tt>SELECT * FROM VALUES ();</tt><br>
+   * The implementation tested here follows that pattern.
+   *
+   * @see TestCsvWithoutHeaders#testEmptyFile()
+   */
   @Test
   public void testEmptyFile() throws IOException {
     buildFile(EMPTY_FILE, new String[] {});
@@ -354,12 +363,15 @@ public class TestCsvWithHeaders extends BaseCsvTest {
         .addNullable("dir0", MinorType.VARCHAR)
         .buildSchema();
 
-    // First batch is empty; just carries the schema.
+    RowSet rowSet;
+    if (SCHEMA_BATCH_ENABLED) {
+      // First batch is empty; just carries the schema.
 
-    assertTrue(iter.hasNext());
-    RowSet rowSet = iter.next();
-    assertEquals(0, rowSet.rowCount());
-    rowSet.clear();
+      assertTrue(iter.hasNext());
+      rowSet = iter.next();
+      assertEquals(0, rowSet.rowCount());
+      rowSet.clear();
+    }
 
     // Read the other two batches.
 
@@ -409,12 +421,15 @@ public class TestCsvWithHeaders extends BaseCsvTest {
         .addNullable("dir10", MinorType.VARCHAR)
         .buildSchema();
 
-    // First batch is empty; just carries the schema.
+    RowSet rowSet;
+    if (SCHEMA_BATCH_ENABLED) {
+      // First batch is empty; just carries the schema.
 
-    assertTrue(iter.hasNext());
-    RowSet rowSet = iter.next();
-    RowSetUtilities.verify(new RowSetBuilder(client.allocator(), expectedSchema).build(),
-        rowSet);
+      assertTrue(iter.hasNext());
+      rowSet = iter.next();
+      RowSetUtilities.verify(new RowSetBuilder(client.allocator(), expectedSchema).build(),
+          rowSet);
+    }
 
     // Read the two batches.
 
@@ -461,12 +476,15 @@ public class TestCsvWithHeaders extends BaseCsvTest {
         .addNullable("dir1", MinorType.VARCHAR)
         .buildSchema();
 
-    // First batch is empty; just carries the schema.
+    RowSet rowSet;
+    if (SCHEMA_BATCH_ENABLED) {
+      // First batch is empty; just carries the schema.
 
-    assertTrue(iter.hasNext());
-    RowSet rowSet = iter.next();
-    RowSetUtilities.verify(new RowSetBuilder(client.allocator(), expectedSchema).build(),
-        rowSet);
+      assertTrue(iter.hasNext());
+      rowSet = iter.next();
+      RowSetUtilities.verify(new RowSetBuilder(client.allocator(), expectedSchema).build(),
+          rowSet);
+    }
 
     // Read the two batches.
 
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsvWithSchema.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsvWithSchema.java
index 9a76c1f..8102148 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsvWithSchema.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsvWithSchema.java
@@ -288,7 +288,7 @@ public class TestCsvWithSchema extends BaseCsvTest {
             sawSchema = true;
           }
         }
-        assertTrue(sawSchema);
+        assertTrue(!SCHEMA_BATCH_ENABLED || sawSchema);
         assertTrue(sawFile1);
         assertTrue(sawFile2);
       }
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsvWithoutHeaders.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsvWithoutHeaders.java
index 8918b39..a021bec 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsvWithoutHeaders.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsvWithoutHeaders.java
@@ -29,6 +29,7 @@ import java.util.Iterator;
 import org.apache.drill.categories.RowSetTests;
 import org.apache.drill.common.exceptions.UserRemoteException;
 import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.TestEmptyInputSql;
 import org.apache.drill.exec.record.metadata.SchemaBuilder;
 import org.apache.drill.exec.record.metadata.TupleMetadata;
 import org.apache.drill.exec.vector.accessor.ArrayReader;
@@ -107,6 +108,28 @@ public class TestCsvWithoutHeaders extends BaseCsvTest {
     RowSetUtilities.verify(expected, actual);
   }
 
+  /**
+   * An empty no-headers file has a valid schema: it will always
+   * be {@code columns}. The scan operator can return a single, empty
+   * batch with that schema to represent the empty file.
+   *
+   * @see TestEmptyInputSql#testQueryEmptyCsv
+   */
+  @Test
+  public void testEmptyFile() throws IOException {
+    buildFile(EMPTY_FILE, new String[] {});
+    String sql = "SELECT * FROM `dfs.data`.`%s`";
+    RowSet actual = client.queryBuilder().sql(sql, EMPTY_FILE).rowSet();
+
+    TupleMetadata expectedSchema = new SchemaBuilder()
+        .addArray("columns", MinorType.VARCHAR)
+        .buildSchema();
+
+    RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+        .build();
+    RowSetUtilities.verify(expected, actual);
+  }
+
   @Test
   public void testColumns() throws IOException {
     String sql = "SELECT columns FROM `dfs.data`.`%s`";
@@ -193,14 +216,15 @@ public class TestCsvWithoutHeaders extends BaseCsvTest {
   }
 
   /**
-   * Test partition expansion in V3.
+   * Test partition expansion.
    * <p>
    * V3, as in V2 before Drill 1.12, puts partition columns after
    * data columns (so that data columns don't shift positions if
    * files are nested to another level.)
    */
+
   @Test
-  public void testPartitionExpansionV3() throws IOException {
+  public void testPartitionExpansion() throws IOException {
     String sql = "SELECT * FROM `dfs.data`.`%s`";
     Iterator<DirectRowSet> iter = client.queryBuilder().sql(sql, PART_DIR).rowSetIterator();
 
@@ -209,14 +233,17 @@ public class TestCsvWithoutHeaders extends BaseCsvTest {
         .addNullable("dir0", MinorType.VARCHAR)
         .buildSchema();
 
-    // First batch is empty; just carries the schema.
+    RowSet rowSet;
+    if (SCHEMA_BATCH_ENABLED) {
+      // First batch is empty; just carries the schema.
 
-    assertTrue(iter.hasNext());
-    RowSet rowSet = iter.next();
-    assertEquals(0, rowSet.rowCount());
-    rowSet.clear();
+      assertTrue(iter.hasNext());
+      rowSet = iter.next();
+      assertEquals(0, rowSet.rowCount());
+      rowSet.clear();
+    }
 
-    // Read the other two batches.
+    // Read the two data batches.
 
     for (int i = 0; i < 2; i++) {
       assertTrue(iter.hasNext());
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestPartitionRace.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestPartitionRace.java
index b1a2c94..7e17d6e 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestPartitionRace.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestPartitionRace.java
@@ -25,6 +25,7 @@ import java.io.File;
 import java.io.IOException;
 import java.util.Iterator;
 
+import org.apache.drill.categories.RowSetTests;
 import org.apache.drill.common.types.TypeProtos.MinorType;
 import org.apache.drill.exec.record.metadata.SchemaBuilder;
 import org.apache.drill.exec.record.metadata.TupleMetadata;
@@ -35,6 +36,7 @@ import org.apache.drill.test.rowSet.RowSetReader;
 import org.apache.drill.test.rowSet.RowSetUtilities;
 import org.junit.BeforeClass;
 import org.junit.Test;
+import org.junit.experimental.categories.Category;
 
 /**
  * Demonstrates a race condition inherent in the way that partition
@@ -52,6 +54,7 @@ import org.junit.Test;
  * current "V3" version. The tests here verify this behavior.
  */
 
+@Category(RowSetTests.class)
 public class TestPartitionRace extends BaseCsvTest {
 
   @BeforeClass
@@ -84,15 +87,16 @@ public class TestPartitionRace extends BaseCsvTest {
         .addNullable("dir0", MinorType.VARCHAR)
         .buildSchema();
 
-    // Loop to run the query 10 times to verify no race
-
-    // First batch is empty; just carries the schema.
-
     Iterator<DirectRowSet> iter = client.queryBuilder().sql(sql, PART_DIR).rowSetIterator();
-    assertTrue(iter.hasNext());
-    RowSet rowSet = iter.next();
-    assertEquals(0, rowSet.rowCount());
-    rowSet.clear();
+    RowSet rowSet;
+    if (SCHEMA_BATCH_ENABLED) {
+      // First batch is empty; just carries the schema.
+
+      assertTrue(iter.hasNext());
+      rowSet = iter.next();
+      assertEquals(0, rowSet.rowCount());
+      rowSet.clear();
+    }
 
     // Read the two batches.
 
@@ -147,13 +151,16 @@ public class TestPartitionRace extends BaseCsvTest {
       boolean sawNestedFirst = false;
       for (int i = 0; i < 10; i++) {
 
-        // First batch is empty; just carries the schema.
-
         Iterator<DirectRowSet> iter = client.queryBuilder().sql(sql, PART_DIR).rowSetIterator();
-        assertTrue(iter.hasNext());
-        RowSet rowSet = iter.next();
-        assertEquals(0, rowSet.rowCount());
-        rowSet.clear();
+        RowSet rowSet;
+        if (SCHEMA_BATCH_ENABLED) {
+          // First batch is empty; just carries the schema.
+
+          assertTrue(iter.hasNext());
+          rowSet = iter.next();
+          assertEquals(0, rowSet.rowCount());
+          rowSet.clear();
+        }
 
         // Read the two batches.
 
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/log/TestLogReader.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/log/TestLogReader.java
index ffb7c12..909e3de 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/log/TestLogReader.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/log/TestLogReader.java
@@ -21,7 +21,6 @@ import static org.junit.Assert.assertEquals;
 
 import java.util.List;
 
-import org.apache.drill.categories.RowSetTests;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.types.TypeProtos.MinorType;
 import org.apache.drill.exec.record.metadata.SchemaBuilder;
@@ -40,10 +39,7 @@ import org.apache.drill.test.rowSet.RowSetUtilities;
 import org.junit.BeforeClass;
 import org.junit.ClassRule;
 import org.junit.Test;
-import org.junit.experimental.categories.Category;
 
-// Log reader now hosted on the row set framework
-@Category(RowSetTests.class)
 public class TestLogReader extends ClusterTest {
 
   public static final String DATE_ONLY_PATTERN = "(\\d\\d\\d\\d)-(\\d\\d)-(\\d\\d) .*";
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/QueryBuilder.java b/exec/java-exec/src/test/java/org/apache/drill/test/QueryBuilder.java
index 795cdae..4e6fdb3 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/QueryBuilder.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/QueryBuilder.java
@@ -29,6 +29,7 @@ import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
+import org.apache.drill.PlanTestBase;
 import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.common.exceptions.UserRemoteException;
 import org.apache.drill.common.expression.SchemaPath;
@@ -52,13 +53,12 @@ import org.apache.drill.exec.util.VectorUtil;
 import org.apache.drill.exec.vector.NullableVarCharVector;
 import org.apache.drill.exec.vector.ValueVector;
 import org.apache.drill.exec.vector.accessor.ScalarReader;
+import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
 import org.apache.drill.test.BufferingQueryEventListener.QueryEvent;
 import org.apache.drill.test.ClientFixture.StatementParser;
 import org.apache.drill.test.rowSet.DirectRowSet;
 import org.apache.drill.test.rowSet.RowSet;
 import org.apache.drill.test.rowSet.RowSetReader;
-
-import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
 import org.joda.time.Period;
 
 /**
@@ -355,11 +355,17 @@ public class QueryBuilder {
   public DirectRowSet rowSet() throws RpcException {
 
     // Ignore all but the first non-empty batch.
+    // If every batch is empty, return the last (empty) batch.
 
-    QueryDataBatch dataBatch = null;
+    QueryDataBatch resultBatch = null;
     for (QueryDataBatch batch : results()) {
-      if (dataBatch == null  &&  batch.getHeader().getRowCount() != 0) {
-        dataBatch = batch;
+      if (resultBatch == null) {
+        resultBatch = batch;
+      } else if (resultBatch.getHeader().getRowCount() == 0) {
+        resultBatch.release();
+        resultBatch = batch;
+      } else if (batch.getHeader().getRowCount() > 0) {
+        throw new IllegalStateException("rowSet() returns a single batch, but this query returned multiple batches. Consider rowSetIterator() instead.");
       } else {
         batch.release();
       }
@@ -367,7 +373,7 @@ public class QueryBuilder {
 
     // No results?
 
-    if (dataBatch == null) {
+    if (resultBatch == null) {
       return null;
     }
 
@@ -375,10 +381,23 @@ public class QueryBuilder {
 
     final RecordBatchLoader loader = new RecordBatchLoader(client.allocator());
     try {
-      loader.load(dataBatch.getHeader().getDef(), dataBatch.getData());
-      dataBatch.release();
+      loader.load(resultBatch.getHeader().getDef(), resultBatch.getData());
+      resultBatch.release();
       VectorContainer container = loader.getContainer();
       container.setRecordCount(loader.getRecordCount());
+
+      // Null results? Drill will return a single batch with no rows
+      // and no columns even if the scan (or other) operator returns
+      // no batches at all. For ease of testing, simply map this null
+      // result set to a null output row set that says "nothing at all
+      // was returned." Note that this is different from an empty result
+      // set, which has a schema but no rows.
+
+      if (container.getRecordCount() == 0 && container.getNumberOfColumns() == 0) {
+        container.clear();
+        return null;
+      }
+
       return DirectRowSet.fromContainer(container);
     } catch (SchemaChangeException e) {
       throw new IllegalStateException(e);
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetBuilder.java b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetBuilder.java
index 91eb7d0..502ce97 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetBuilder.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetBuilder.java
@@ -17,16 +17,16 @@
  */
 package org.apache.drill.test.rowSet;
 
-import org.apache.drill.shaded.guava.com.google.common.collect.Sets;
+import java.util.Set;
+
 import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.record.BatchSchema;
 import org.apache.drill.exec.record.metadata.MetadataUtils;
 import org.apache.drill.exec.record.metadata.TupleMetadata;
 import org.apache.drill.exec.vector.accessor.convert.ColumnConversionFactory;
+import org.apache.drill.shaded.guava.com.google.common.collect.Sets;
 import org.apache.drill.test.rowSet.RowSet.SingleRowSet;
 
-import java.util.Set;
-
 /**
  * Fluent builder to quickly build up an row set (record batch)
  * programmatically. Starting with an {@link org.apache.drill.test.OperatorFixture}:
@@ -75,6 +75,10 @@ public final class RowSetBuilder {
     writer = rowSet.writer(capacity, conversionFactory);
   }
 
+  public static RowSet emptyBatch(BufferAllocator allocator, TupleMetadata schema) {
+    return new RowSetBuilder(allocator, schema).build();
+  }
+
   public RowSetWriter writer() { return writer; }
 
   /**
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetUtilities.java b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetUtilities.java
index 7fe606f..c2bfa54 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetUtilities.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetUtilities.java
@@ -26,6 +26,7 @@ import org.apache.drill.common.types.TypeProtos.MajorType;
 import org.apache.drill.common.types.TypeProtos.MinorType;
 import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.record.selection.SelectionVector2;
+import org.apache.drill.exec.vector.VectorOverflowException;
 import org.apache.drill.exec.vector.accessor.ScalarWriter;
 import org.apache.drill.exec.vector.accessor.ValueType;
 import org.bouncycastle.util.Arrays;
@@ -264,5 +265,4 @@ public class RowSetUtilities {
   public static BigDecimal dec(String value) {
     return new BigDecimal(value);
   }
-
 }
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/record/MaterializedField.java b/exec/vector/src/main/java/org/apache/drill/exec/record/MaterializedField.java
index 0101a3a..4be33f2 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/record/MaterializedField.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/record/MaterializedField.java
@@ -398,10 +398,10 @@ public class MaterializedField {
       .append("` (")
       .append(type.getMinorType().name());
 
-    if (type.hasPrecision()) {
+    if (type.hasPrecision() && (type.getPrecision() > 0 || Types.isDecimalType(type))) {
       builder.append("(");
       builder.append(type.getPrecision());
-      if (type.hasScale()) {
+      if (type.hasScale() && type.getScale() > 0) {
         builder.append(", ");
         builder.append(type.getScale());
       }
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/ColumnBuilder.java b/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/ColumnBuilder.java
index d23eeb4..2e17411 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/ColumnBuilder.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/ColumnBuilder.java
@@ -50,13 +50,24 @@ public class ColumnBuilder {
   }
 
   public ColumnBuilder setPrecision(int precision) {
-    typeBuilder.setPrecision(precision);
+
+    // Set the precision only if non-zero. Some (naive) code in Drill
+    // checks if precision is set as a way to determine if the precision
+    // is non-zero. The correct pattern is to check if the precision is
+    // non-zero. This unnecessary check exists simply to avoid breaking
+    // that incorrect code.
+
+    if (precision != 0) {
+      typeBuilder.setPrecision(precision);
+    }
     return this;
   }
 
   public ColumnBuilder setPrecisionAndScale(int precision, int scale) {
-    typeBuilder.setPrecision(precision);
-    typeBuilder.setScale(scale);
+    if (precision != 0) {
+      typeBuilder.setPrecision(precision);
+      typeBuilder.setScale(scale);
+    }
     return this;
   }
 
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/PrimitiveColumnMetadata.java b/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/PrimitiveColumnMetadata.java
index de8b668..8e72e95 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/PrimitiveColumnMetadata.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/PrimitiveColumnMetadata.java
@@ -17,6 +17,8 @@
  */
 package org.apache.drill.exec.record.metadata;
 
+import java.math.BigDecimal;
+
 import org.apache.drill.common.types.BooleanType;
 import org.apache.drill.common.types.TypeProtos.DataMode;
 import org.apache.drill.common.types.TypeProtos.MajorType;
@@ -31,8 +33,6 @@ import org.joda.time.format.DateTimeFormat;
 import org.joda.time.format.DateTimeFormatter;
 import org.joda.time.format.ISODateTimeFormat;
 
-import java.math.BigDecimal;
-
 /**
  * Primitive (non-map) column. Describes non-nullable, nullable and array types
  * (which differ only in mode, but not in metadata structure.)
@@ -166,6 +166,20 @@ public class PrimitiveColumnMetadata extends AbstractColumnMetadata {
 
   @Override
   public MajorType majorType() {
+
+    // Set the precision for all types. Some (naive) code in Drill
+    // checks if precision is set as a way to determine if the precision
+    // is non-zero. (DRILL-7308)
+    //
+    // If we try to set the precision only if non-zero, then other code
+    // fails, such as the TPC-H SF1 customer table test in the Functional
+    // test suite.
+    //
+    // So, the protocol is: if a type might use precision (DECIMAL, VARCHAR),
+    // the precision should be set, even if zero. Code that wants to know if
+    // the precision is zero should check for the zero value, it should NOT
+    // check if the precision is set or not.
+
     return MajorType.newBuilder()
         .setMinorType(type)
         .setMode(mode)