Posted to commits@drill.apache.org by ar...@apache.org on 2019/07/15 10:27:33 UTC

[drill] branch master updated: DRILL-6951: Merge row set based mock data source

This is an automated email from the ASF dual-hosted git repository.

arina pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git


The following commit(s) were added to refs/heads/master by this push:
     new 3599dfd  DRILL-6951: Merge row set based mock data source
3599dfd is described below

commit 3599dfd2690023db6a7af66512b3e44759eb34a1
Author: Paul Rogers <pa...@yahoo.com>
AuthorDate: Tue Jun 18 19:26:16 2019 -0700

    DRILL-6951: Merge row set based mock data source
    
    The mock data source is used in several tests to generate a large volume
    of sample data, such as when testing spilling. The mock data source also
    lets us try new plugin features in a very simple context. During the
    development of the row set framework, the mock data source was converted
    to use the new framework to verify functionality. This commit upgrades
    the mock data source with that work.
    
    This work changes none of the functionality. It does, however, improve
    memory usage. Batches are limited, by default, to 10 MB in size. The row
    set framework minimizes internal fragmentation in the largest vector.
    (Previously, internal fragmentation averaged 25% but could be as high as
    50%.)
    
    As it turns out, the hash aggregate tests depended on the internal
    fragmentation: without it, the hash agg no longer spilled for the same
    row count. Adjusted the generated row counts (for example, doubling the
    default query's rows from 1.2 million to 2.4 million) to recreate a data
    volume that causes spilling.
    
    One test in particular always failed due to assertions in the hash agg
    code. These appear to be genuine bugs and are described in DRILL-7301.
    After multiple failed attempts to get the test to work, it was disabled
    until DRILL-7301 is fixed.
    
    Added a new unit test to sanity check the mock data source. (No
    dedicated test existed for this functionality before; it was exercised
    only indirectly via other unit tests.)
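    
    For reference, mock table names encode the generated row count and
    column names encode their types: the suffix _i denotes INT, _d a
    double (FLOAT8), _b a BIT, and _sNN a VARCHAR of length NN. The new
    tests below exercise queries such as:
    
        SELECT dept_i, name_s17 FROM `mock`.`employee_100`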
---
 .../impl/scan/framework/BasicScanFactory.java      |   4 +-
 .../{BooleanGen.java => AbstractFieldGen.java}     |  23 +-
 .../apache/drill/exec/store/mock/BooleanGen.java   |  21 +-
 .../apache/drill/exec/store/mock/ColumnDef.java    |  20 +-
 .../org/apache/drill/exec/store/mock/DateGen.java  |  21 +-
 .../apache/drill/exec/store/mock/DoubleGen.java    |  21 +-
 .../exec/store/mock/ExtendedMockBatchReader.java   | 148 ++++++++++
 .../exec/store/mock/ExtendedMockRecordReader.java  | 158 -----------
 .../org/apache/drill/exec/store/mock/FieldGen.java |   6 +-
 .../org/apache/drill/exec/store/mock/IntGen.java   |  21 +-
 .../exec/store/mock/MockScanBatchCreator.java      |  72 ++++-
 .../apache/drill/exec/store/mock/MockStorePOP.java |   2 -
 .../apache/drill/exec/store/mock/MockTableDef.java |   4 +-
 .../org/apache/drill/exec/store/mock/MoneyGen.java |  22 +-
 .../apache/drill/exec/store/mock/StringGen.java    |  16 +-
 .../drill/exec/store/mock/VaryingStringGen.java    |  14 +-
 .../exec/fn/interp/ExpressionInterpreterTest.java  |   9 +-
 .../exec/physical/impl/agg/TestHashAggrSpill.java  | 129 +++++----
 .../impl/scan/BaseScanOperatorExecTest.java        |  23 +-
 .../drill/exec/store/mock/TestMockPlugin.java      | 148 ++++++++++
 .../drill/exec/store/mock/TestMockRowReader.java   | 304 +++++++++++++++++++++
 .../drill/exec/record/metadata/SchemaBuilder.java  |   7 +
 22 files changed, 821 insertions(+), 372 deletions(-)

diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/framework/BasicScanFactory.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/framework/BasicScanFactory.java
index d9e8847..81e7dad 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/framework/BasicScanFactory.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/framework/BasicScanFactory.java
@@ -35,9 +35,9 @@ import org.apache.drill.exec.physical.impl.scan.framework.ManagedScanFramework.R
 
 public class BasicScanFactory implements ReaderFactory {
 
-  private final Iterator<ManagedReader<? extends SchemaNegotiator>> iterator;
+  private final Iterator<ManagedReader<SchemaNegotiator>> iterator;
 
-  public BasicScanFactory(Iterator<ManagedReader<? extends SchemaNegotiator>> iterator) {
+  public BasicScanFactory(Iterator<ManagedReader<SchemaNegotiator>> iterator) {
     this.iterator = iterator;
   }
 
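
Note: the tightened signature lets callers collect readers in a plain
List<ManagedReader<SchemaNegotiator>> and hand over its iterator without
unchecked casts. A short usage sketch, mirroring MockScanBatchCreator
later in this diff ('entry' stands in for a MockScanEntry):

    List<ManagedReader<SchemaNegotiator>> readers = new LinkedList<>();
    readers.add(new ExtendedMockBatchReader(entry));
    BasicScanFactory factory = new BasicScanFactory(readers.iterator());
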
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/BooleanGen.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/AbstractFieldGen.java
similarity index 67%
copy from exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/BooleanGen.java
copy to exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/AbstractFieldGen.java
index dd84f4d..c4eb2ab 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/BooleanGen.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/AbstractFieldGen.java
@@ -19,24 +19,17 @@ package org.apache.drill.exec.store.mock;
 
 import java.util.Random;
 
-import org.apache.drill.exec.vector.BitVector;
-import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.vector.accessor.ScalarWriter;
 
-public class BooleanGen implements FieldGen {
+public abstract class AbstractFieldGen implements FieldGen {
 
-  private Random rand = new Random();
+  protected ColumnDef colDef;
+  protected ScalarWriter colWriter;
+  protected final Random rand = new Random();
 
   @Override
-  public void setup(ColumnDef colDef) { }
-
-  public int value() {
-    return rand.nextBoolean() ? 1 : 0;
+  public void setup(ColumnDef colDef, ScalarWriter colLoader) {
+    this.colDef = colDef;
+    this.colWriter = colLoader;
   }
-
-  @Override
-  public void setValue(ValueVector v, int index ) {
-    BitVector vector = (BitVector) v;
-    vector.getMutator().set(index, value());
-  }
-
 }
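
With this base class, a generator no longer receives a ValueVector; it
writes through the ScalarWriter bound in setup(). A minimal sketch of a
custom generator (a hypothetical class, not part of this commit,
following the same pattern as BooleanGen below):

    package org.apache.drill.exec.store.mock;

    /** Example: uniform random percentages in [0, 100). */
    public class PercentGen extends AbstractFieldGen {

      @Override
      public void setValue() {
        // colWriter and rand are inherited from AbstractFieldGen.
        colWriter.setInt(rand.nextInt(100));
      }
    }
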
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/BooleanGen.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/BooleanGen.java
index dd84f4d..48b3220 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/BooleanGen.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/BooleanGen.java
@@ -17,26 +17,11 @@
  */
 package org.apache.drill.exec.store.mock;
 
-import java.util.Random;
-
-import org.apache.drill.exec.vector.BitVector;
-import org.apache.drill.exec.vector.ValueVector;
-
-public class BooleanGen implements FieldGen {
-
-  private Random rand = new Random();
-
-  @Override
-  public void setup(ColumnDef colDef) { }
-
-  public int value() {
-    return rand.nextBoolean() ? 1 : 0;
-  }
+public class BooleanGen extends AbstractFieldGen {
 
   @Override
-  public void setValue(ValueVector v, int index ) {
-    BitVector vector = (BitVector) v;
-    vector.getMutator().set(index, value());
+  public void setValue() {
+    colWriter.setInt(rand.nextBoolean() ? 1 : 0);
   }
 
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/ColumnDef.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/ColumnDef.java
index 1a7dc30..abdac95 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/ColumnDef.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/ColumnDef.java
@@ -17,6 +17,7 @@
  */
 package org.apache.drill.exec.store.mock;
 
+import org.apache.drill.common.types.TypeProtos.DataMode;
 import org.apache.drill.common.types.TypeProtos.MinorType;
 import org.apache.drill.exec.expr.TypeHelper;
 import org.apache.drill.exec.store.mock.MockTableDef.MockColumn;
@@ -33,15 +34,28 @@ public class ColumnDef {
   public String name;
   public int width;
   public FieldGen generator;
+  public boolean nullable;
+  public int nullablePercent;
 
   public ColumnDef(MockColumn mockCol) {
     this.mockCol = mockCol;
     name = mockCol.getName();
-    if (mockCol.getMinorType() == MinorType.VARCHAR && mockCol.getPrecision() > 0) {
-      width = mockCol.getPrecision();
+    if (mockCol.getMinorType() == MinorType.VARCHAR &&
+        mockCol.getWidth() > 0) {
+      width = mockCol.getWidth();
     } else {
       width = TypeHelper.getSize(mockCol.getMajorType());
     }
+    nullable = mockCol.getMode() == DataMode.OPTIONAL;
+    if (nullable) {
+      nullablePercent = 25;
+      if (mockCol.properties != null) {
+        Object value = mockCol.properties.get("nulls");
+        if (value instanceof Integer) {
+          nullablePercent = (Integer) value;
+        }
+      }
+    }
     makeGenerator();
   }
 
@@ -68,7 +82,6 @@ public class ColumnDef {
           | IllegalAccessException | ClassCastException e) {
         throw new IllegalArgumentException("Generator " + genName + " is undefined for mock field " + name);
       }
-      generator.setup(this);
       return;
     }
 
@@ -167,7 +180,6 @@ public class ColumnDef {
     if (generator == null) {
       throw new IllegalArgumentException("No default column generator for column " + name + " of type " + minorType);
     }
-    generator.setup(this);
   }
 
   public ColumnDef(MockColumn mockCol, int rep) {
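
The new nullable metadata drives null injection in the row set based
reader below: an OPTIONAL column is written as NULL with probability
nullablePercent / 100, defaulting to 25 and overridable with an integer
"nulls" column property. A self-contained sketch of that per-row
decision (plain Java mirroring the reader's loop, not commit code):

    import java.util.Random;

    public class NullInjectionSketch {
      public static void main(String[] args) {
        Random rand = new Random();
        int nullablePercent = 25; // default when no "nulls" property is set
        int rows = 100_000;
        int nulls = 0;
        for (int i = 0; i < rows; i++) {
          if (rand.nextInt(100) < nullablePercent) {
            nulls++; // the reader calls writer.scalar(j).setNull() here
          }
        }
        System.out.println("null fraction: " + (double) nulls / rows); // ~0.25
      }
    }
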
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/DateGen.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/DateGen.java
index 100d427..cc5b63a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/DateGen.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/DateGen.java
@@ -19,10 +19,6 @@ package org.apache.drill.exec.store.mock;
 
 import java.text.SimpleDateFormat;
 import java.util.Date;
-import java.util.Random;
-
-import org.apache.drill.exec.vector.ValueVector;
-import org.apache.drill.exec.vector.VarCharVector;
 
 /**
  * Very simple date value generator that produces ISO dates
@@ -37,12 +33,11 @@ import org.apache.drill.exec.vector.VarCharVector;
  * new Java 8 classes because Drill prefers to build with Java 7.
  */
 
-public class DateGen implements FieldGen {
+public class DateGen extends AbstractFieldGen {
 
   private final int ONE_DAY = 24 * 60 * 60 * 1000;
   private final int ONE_YEAR = ONE_DAY * 365;
 
-  private final Random rand = new Random();
   private long baseTime;
   private SimpleDateFormat fmt;
 
@@ -53,17 +48,9 @@ public class DateGen implements FieldGen {
   }
 
   @Override
-  public void setup(ColumnDef colDef) { }
-
-  private long value() {
-    return baseTime + rand.nextInt(365) * ONE_DAY;
-  }
-
-  @Override
-  public void setValue(ValueVector v, int index) {
-    VarCharVector vector = (VarCharVector) v;
-    long randTime = baseTime + value();
+  public void setValue() {
+    long randTime = baseTime + rand.nextInt(365) * ONE_DAY;
     String str = fmt.format(new Date(randTime));
-    vector.getMutator().setSafe(index, str.getBytes());
+    colWriter.setString(str);
   }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/DoubleGen.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/DoubleGen.java
index e28a394..3704ffe 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/DoubleGen.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/DoubleGen.java
@@ -17,32 +17,17 @@
  */
 package org.apache.drill.exec.store.mock;
 
-import java.util.Random;
-
-import org.apache.drill.exec.vector.Float8Vector;
-import org.apache.drill.exec.vector.ValueVector;
-
 /**
  * Generates random field values uniformly distributed over
  * the range +-1 million, with any number of digits past
  * the decimal point.
  */
 
-public class DoubleGen implements FieldGen {
-
-  private final Random rand = new Random();
-
-  @Override
-  public void setup(ColumnDef colDef) { }
-
-  private double value() {
-    return rand.nextDouble() * 2_000_000 - 1_000_000;
-  }
+public class DoubleGen extends AbstractFieldGen {
 
   @Override
-  public void setValue(ValueVector v, int index) {
-    Float8Vector vector = (Float8Vector) v;
-    vector.getMutator().set(index, value());
+  public void setValue() {
+    colWriter.setDouble(rand.nextDouble() * 2_000_000 - 1_000_000);
   }
 
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/ExtendedMockBatchReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/ExtendedMockBatchReader.java
new file mode 100644
index 0000000..10f4c23
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/ExtendedMockBatchReader.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.mock;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Random;
+import java.util.Set;
+
+import org.apache.drill.exec.physical.impl.ScanBatch;
+import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
+import org.apache.drill.exec.physical.impl.scan.framework.SchemaNegotiator;
+import org.apache.drill.exec.physical.rowSet.ResultSetLoader;
+import org.apache.drill.exec.physical.rowSet.RowSetLoader;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.record.metadata.TupleSchema;
+import org.apache.drill.exec.store.mock.MockTableDef.MockColumn;
+import org.apache.drill.exec.store.mock.MockTableDef.MockScanEntry;
+
+/**
+ * Extended form of the mock record reader that uses generator class
+ * instances to create the mock values. This is a work in progress.
+ * Generators exist for a few simple required types. One also exists
+ * to generate strings that contain dates.
+ * <p>
+ * The definition is provided inside the sub scan used to create the
+ * {@link ScanBatch} used to create this record reader.
+ */
+
+public class ExtendedMockBatchReader implements ManagedReader<SchemaNegotiator> {
+
+  private final MockScanEntry config;
+  private final ColumnDef fields[];
+  private ResultSetLoader loader;
+  private RowSetLoader writer;
+
+  public ExtendedMockBatchReader(MockScanEntry config) {
+    this.config = config;
+    fields = buildColumnDefs();
+  }
+
+  private ColumnDef[] buildColumnDefs() {
+    final List<ColumnDef> defs = new ArrayList<>();
+
+    // Look for duplicate names. Bad things happen when the same name
+    // appears twice. We must do this here because some tests create
+    // a physical plan directly, meaning that this is the first
+    // opportunity to review the column definitions.
+
+    final Set<String> names = new HashSet<>();
+    final MockColumn cols[] = config.getTypes();
+    for (int i = 0; i < cols.length; i++) {
+      final MockTableDef.MockColumn col = cols[i];
+      if (names.contains(col.name)) {
+        throw new IllegalArgumentException("Duplicate column name: " + col.name);
+      }
+      names.add(col.name);
+      final int repeat = Math.max(1, col.getRepeatCount());
+      if (repeat == 1) {
+        defs.add(new ColumnDef(col));
+      } else {
+        for (int j = 0; j < repeat; j++) {
+          defs.add(new ColumnDef(col, j+1));
+        }
+      }
+    }
+    final ColumnDef[] defArray = new ColumnDef[defs.size()];
+    defs.toArray(defArray);
+    return defArray;
+  }
+
+  @Override
+  public boolean open(SchemaNegotiator schemaNegotiator) {
+    final TupleMetadata schema = new TupleSchema();
+    for (int i = 0; i < fields.length; i++) {
+      final ColumnDef col = fields[i];
+      final MaterializedField field = MaterializedField.create(col.getName(),
+                                          col.getConfig().getMajorType());
+      schema.add(field);
+    }
+    schemaNegotiator.setTableSchema(schema, true);
+
+    // Set the batch size. Ideally, we'd leave that to the framework based
+    // on the bytes per batch. But, several legacy tests depend on a known,
+    // fixed batch size of 10K, so encode that until we can change those
+    // tests. If the operator definition specifies a size, use that.
+
+    // TODO: Defer batch size to framework, update tests accordingly.
+
+    final int batchSize = config.getBatchSize();
+    if (batchSize > 0) {
+      schemaNegotiator.setBatchSize(batchSize);
+    }
+
+    loader = schemaNegotiator.build();
+    writer = loader.writer();
+    for (int i = 0; i < fields.length; i++) {
+      fields[i].generator.setup(fields[i], writer.scalar(i));
+    }
+    return true;
+  }
+
+  @Override
+  public boolean next() {
+    final int rowCount = config.getRecords() - loader.totalRowCount();
+    if (rowCount <= 0) {
+      return false;
+    }
+
+    final Random rand = new Random();
+    for (int i = 0; i < rowCount; i++) {
+      if (writer.isFull()) {
+        break;
+      }
+      writer.start();
+      for (int j = 0; j < fields.length; j++) {
+        if (fields[j].nullable && rand.nextInt(100) < fields[j].nullablePercent) {
+          writer.scalar(j).setNull();
+        } else {
+          fields[j].generator.setValue();
+        }
+      }
+      writer.save();
+    }
+
+    return true;
+  }
+
+  @Override
+  public void close() { }
+}
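
ExtendedMockBatchReader follows the ManagedReader contract used by the
row set framework: open() negotiates the schema and builds the
ResultSetLoader, next() fills one batch and reports whether rows remain,
and close() releases resources. A hypothetical driver loop, shown only
to illustrate the call sequence (the real loop lives inside the scan
framework, which supplies 'entry' and 'negotiator'):

    static void drive(MockTableDef.MockScanEntry entry,
                      SchemaNegotiator negotiator) {
      ExtendedMockBatchReader reader = new ExtendedMockBatchReader(entry);
      if (reader.open(negotiator)) { // defines schema, builds the loader
        while (reader.next()) {
          // the framework harvests the completed batch here
        }
      }
      reader.close();
    }
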
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/ExtendedMockRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/ExtendedMockRecordReader.java
deleted file mode 100644
index 8116b7f..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/ExtendedMockRecordReader.java
+++ /dev/null
@@ -1,158 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.store.mock;
-
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.drill.common.exceptions.ExecutionSetupException;
-import org.apache.drill.common.types.TypeProtos.MajorType;
-import org.apache.drill.exec.exception.OutOfMemoryException;
-import org.apache.drill.exec.exception.SchemaChangeException;
-import org.apache.drill.exec.expr.TypeHelper;
-import org.apache.drill.exec.ops.OperatorContext;
-import org.apache.drill.exec.physical.impl.OutputMutator;
-import org.apache.drill.exec.record.MaterializedField;
-import org.apache.drill.exec.store.AbstractRecordReader;
-import org.apache.drill.exec.store.mock.MockTableDef.MockColumn;
-import org.apache.drill.exec.store.mock.MockTableDef.MockScanEntry;
-import org.apache.drill.exec.vector.AllocationHelper;
-import org.apache.drill.exec.vector.ValueVector;
-
-/**
- * Extended form of the mock record reader that uses generator class
- * instances to create the mock values. This is a work in progress.
- * Generators exist for a few simple required types. One also exists
- * to generate strings that contain dates.
- * <p>
- * The definition is provided inside the sub scan used to create the
- * {@link ScanBatch} used to create this record reader.
- */
-
-public class ExtendedMockRecordReader extends AbstractRecordReader {
-
-  private ValueVector[] valueVectors;
-  private int batchRecordCount;
-  private int recordsRead;
-
-  private final MockScanEntry config;
-  private final ColumnDef fields[];
-
-  public ExtendedMockRecordReader(MockScanEntry config) {
-    this.config = config;
-
-    fields = buildColumnDefs();
-  }
-
-  private ColumnDef[] buildColumnDefs() {
-    List<ColumnDef> defs = new ArrayList<>();
-
-    // Look for duplicate names. Bad things happen when the same name
-    // appears twice. We must do this here because some tests create
-    // a physical plan directly, meaning that this is the first
-    // opportunity to review the column definitions.
-
-    Set<String> names = new HashSet<>();
-    MockColumn cols[] = config.getTypes();
-    for (int i = 0; i < cols.length; i++) {
-      MockTableDef.MockColumn col = cols[i];
-      if (names.contains(col.name)) {
-        throw new IllegalArgumentException("Duplicate column name: " + col.name);
-      }
-      names.add(col.name);
-      int repeat = Math.min(1, col.getRepeatCount());
-      if (repeat == 1) {
-        defs.add(new ColumnDef(col));
-      } else {
-        for (int j = 0; j < repeat; j++) {
-          defs.add(new ColumnDef(col, j+1));
-        }
-      }
-    }
-    ColumnDef[] defArray = new ColumnDef[defs.size()];
-    defs.toArray(defArray);
-    return defArray;
-  }
-
-  private int getEstimatedRecordSize() {
-    int size = 0;
-    for (int i = 0; i < fields.length; i++) {
-      size += fields[i].width;
-    }
-    return size;
-  }
-
-  @Override
-  public void setup(OperatorContext context, OutputMutator output) throws ExecutionSetupException {
-    try {
-      final int estimateRowSize = getEstimatedRecordSize();
-      valueVectors = new ValueVector[fields.length];
-      int batchSize = config.getBatchSize();
-      if (batchSize == 0) {
-        batchSize = 10 * 1024 * 1024;
-      }
-      batchRecordCount = Math.max(1, batchSize / estimateRowSize);
-      batchRecordCount = Math.min(batchRecordCount, Character.MAX_VALUE);
-
-      for (int i = 0; i < fields.length; i++) {
-        final ColumnDef col = fields[i];
-        final MajorType type = col.getConfig().getMajorType();
-        final MaterializedField field = MaterializedField.create(col.getName(), type);
-        final Class<? extends ValueVector> vvClass = TypeHelper.getValueVectorClass(field.getType().getMinorType(), field.getDataMode());
-        valueVectors[i] = output.addField(field, vvClass);
-      }
-    } catch (SchemaChangeException e) {
-      throw new ExecutionSetupException("Failure while setting up fields", e);
-    }
-  }
-
-  @Override
-  public int next() {
-    if (recordsRead >= this.config.getRecords()) {
-      return 0;
-    }
-
-    final int recordSetSize = Math.min(batchRecordCount, this.config.getRecords() - recordsRead);
-    recordsRead += recordSetSize;
-    for (int i = 0; i < recordSetSize; i++) {
-      int j = 0;
-      for (final ValueVector v : valueVectors) {
-        fields[j++].generator.setValue(v, i);
-      }
-    }
-
-    return recordSetSize;
-  }
-
-  @Override
-  public void allocate(Map<String, ValueVector> vectorMap) throws OutOfMemoryException {
-    try {
-      for (final ValueVector v : vectorMap.values()) {
-        AllocationHelper.allocate(v, Character.MAX_VALUE, 50, 10);
-      }
-    } catch (NullPointerException e) {
-      throw new OutOfMemoryException();
-    }
-  }
-
-  @Override
-  public void close() { }
-}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/FieldGen.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/FieldGen.java
index b51077f..813b235 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/FieldGen.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/FieldGen.java
@@ -17,7 +17,7 @@
  */
 package org.apache.drill.exec.store.mock;
 
-import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.vector.accessor.ScalarWriter;
 
 /**
  * Interface which all mock column data generators must
@@ -32,6 +32,6 @@ import org.apache.drill.exec.vector.ValueVector;
  * of such casts.)
  */
 public interface FieldGen {
-  void setup(ColumnDef colDef);
-  void setValue(ValueVector v, int index);
+  void setup(ColumnDef colDef, ScalarWriter colLoader);
+  void setValue();
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/IntGen.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/IntGen.java
index be00541..6d5730e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/IntGen.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/IntGen.java
@@ -17,31 +17,16 @@
  */
 package org.apache.drill.exec.store.mock;
 
-import java.util.Random;
-
-import org.apache.drill.exec.vector.IntVector;
-import org.apache.drill.exec.vector.ValueVector;
-
 /**
  * Generates integer values uniformly randomly distributed over
  * the entire 32-bit integer range from
  * {@link Integer.MIN_VALUE} to {@link Integer.MAX_VALUE}.
  */
 
-public class IntGen implements FieldGen {
-
-  private final Random rand = new Random();
-
-  @Override
-  public void setup(ColumnDef colDef) { }
-
-  private int value() {
-    return rand.nextInt();
-  }
+public class IntGen extends AbstractFieldGen {
 
   @Override
-  public void setValue(ValueVector v, int index) {
-    IntVector vector = (IntVector) v;
-    vector.getMutator().set(index, value());
+  public void setValue() {
+    colWriter.setInt(rand.nextInt());
   }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockScanBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockScanBatchCreator.java
index 2ed81c2..cfd3a89 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockScanBatchCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockScanBatchCreator.java
@@ -21,29 +21,83 @@ import java.util.LinkedList;
 import java.util.List;
 
 import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.exec.ops.ExecutorFragmentContext;
+import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.physical.impl.BatchCreator;
 import org.apache.drill.exec.physical.impl.ScanBatch;
+import org.apache.drill.exec.physical.impl.protocol.OperatorRecordBatch;
+import org.apache.drill.exec.physical.impl.scan.ScanOperatorExec;
+import org.apache.drill.exec.physical.impl.scan.framework.BasicScanFactory;
+import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
+import org.apache.drill.exec.physical.impl.scan.framework.ManagedScanFramework;
+import org.apache.drill.exec.physical.impl.scan.framework.ManagedScanFramework.ScanFrameworkBuilder;
+import org.apache.drill.exec.physical.impl.scan.framework.SchemaNegotiator;
+import org.apache.drill.exec.record.CloseableRecordBatch;
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.store.RecordReader;
-
 import org.apache.drill.exec.store.mock.MockTableDef.MockScanEntry;
-
 import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
 
 public class MockScanBatchCreator implements BatchCreator<MockSubScanPOP> {
+
   @Override
-  public ScanBatch getBatch(ExecutorFragmentContext context, MockSubScanPOP config, List<RecordBatch> children)
+  public CloseableRecordBatch getBatch(ExecutorFragmentContext context, MockSubScanPOP config, List<RecordBatch> children)
       throws ExecutionSetupException {
     Preconditions.checkArgument(children.isEmpty());
     final List<MockScanEntry> entries = config.getReadEntries();
+    MockScanEntry first = entries.get(0);
+    if (first.isExtended()) {
+      // Extended mode: use the revised, size-aware scan operator
+
+      return extendedMockScan(context, config, entries);
+    } else {
+      return legacyMockScan(context, config, entries);
+    }
+  }
+
+  private CloseableRecordBatch extendedMockScan(FragmentContext context,
+      MockSubScanPOP config, List<MockScanEntry> entries) {
+    List<SchemaPath> projList = new LinkedList<>();
+    projList.add(SchemaPath.STAR_COLUMN);
+
+    // Create batch readers up front. Handy when we know there are
+    // only one or two; else use an iterator and create them on the fly.
+
+    final List<ManagedReader<SchemaNegotiator>> readers = new LinkedList<>();
+    for (final MockTableDef.MockScanEntry e : entries) {
+      readers.add(new ExtendedMockBatchReader(e));
+    }
+
+    // Limit the batch size to 10 MB, or whatever the operator definition
+    // specified.
+
+    int batchSizeBytes = 10 * 1024 * 1024;
+    MockTableDef.MockScanEntry first = entries.get(0);
+    if (first.getBatchSize() > 0) {
+      batchSizeBytes = first.getBatchSize();
+    }
+
+    // Set the scan to allow the maximum row count, allowing
+    // each reader to adjust the batch size smaller if desired.
+
+    ScanFrameworkBuilder builder = new ScanFrameworkBuilder();
+    builder.setBatchByteLimit(batchSizeBytes);
+    builder.setProjection(projList);
+    builder.setReaderFactory(new BasicScanFactory(readers.iterator()));
+    ManagedScanFramework framework = new ManagedScanFramework(builder);
+
+    return new OperatorRecordBatch(
+         context, config,
+         new ScanOperatorExec(framework, false), false);
+  }
+
+  private CloseableRecordBatch legacyMockScan(FragmentContext context,
+      MockSubScanPOP config,
+      List<MockScanEntry> entries) throws ExecutionSetupException {
     final List<RecordReader> readers = new LinkedList<>();
-    for(final MockTableDef.MockScanEntry e : entries) {
-      if ( e.isExtended( ) ) {
-        readers.add(new ExtendedMockRecordReader(e));
-      } else {
-        readers.add(new MockRecordReader(context, e));
-      }
+    for (final MockTableDef.MockScanEntry e : entries) {
+      readers.add(new MockRecordReader(context, e));
     }
     return new ScanBatch(config, context, readers);
   }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockStorePOP.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockStorePOP.java
index 9fee5c7..a723aca 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockStorePOP.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockStorePOP.java
@@ -68,6 +68,4 @@ public class MockStorePOP extends AbstractStore {
   public int getOperatorType() {
     throw new UnsupportedOperationException();
   }
-
-
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockTableDef.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockTableDef.java
index 81f92b1..1b4af74 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockTableDef.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockTableDef.java
@@ -40,6 +40,7 @@ import com.fasterxml.jackson.annotation.JsonTypeName;
 
 @JsonTypeName("mock-table")
 public class MockTableDef {
+
   /**
    * Describes one simulated file (or block) within the logical file scan
    * described by this group scan. Each block can have a distinct schema to test
@@ -171,7 +172,8 @@ public class MockTableDef {
         b.setPrecision(precision);
       }
       if (width != null) {
-        b.setWidth(width);
+        //b.setWidth(width); // Legacy
+        b.setPrecision(width); // Since DRILL-5419
       }
       if (scale != null) {
         b.setScale(scale);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MoneyGen.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MoneyGen.java
index d4e2379..bb046e3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MoneyGen.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MoneyGen.java
@@ -17,11 +17,6 @@
  */
 package org.apache.drill.exec.store.mock;
 
-import java.util.Random;
-
-import org.apache.drill.exec.vector.Float8Vector;
-import org.apache.drill.exec.vector.ValueVector;
-
 /**
  * Generates a mock money field as a double over the range 0
  * to 1 million. Values include cents. That is the value
@@ -29,20 +24,11 @@ import org.apache.drill.exec.vector.ValueVector;
  * 999,999.99.
  */
 
-public class MoneyGen implements FieldGen {
-
-  private final Random rand = new Random();
-
-  @Override
-  public void setup(ColumnDef colDef) { }
-
-  private double value() {
-    return Math.ceil(rand.nextDouble() * 1_000_000 * 100) / 100;
-  }
+public class MoneyGen extends AbstractFieldGen {
 
   @Override
-  public void setValue(ValueVector v, int index) {
-    Float8Vector vector = (Float8Vector) v;
-    vector.getMutator().set(index, value());
+  public void setValue() {
+    double value = Math.ceil(rand.nextDouble() * 1_000_000 * 100) / 100;
+    colWriter.setDouble(value);
   }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/StringGen.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/StringGen.java
index 72be10f..eb9302d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/StringGen.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/StringGen.java
@@ -17,10 +17,7 @@
  */
 package org.apache.drill.exec.store.mock;
 
-import java.util.Random;
-
-import org.apache.drill.exec.vector.ValueVector;
-import org.apache.drill.exec.vector.VarCharVector;
+import org.apache.drill.exec.vector.accessor.ScalarWriter;
 
 /**
  * Generates a mock string field of the given length. Fields are composed
@@ -29,13 +26,13 @@ import org.apache.drill.exec.vector.VarCharVector;
  * DDDD, MMMM, AAAA, RRRR, ...
  */
 
-public class StringGen implements FieldGen {
+public class StringGen extends AbstractFieldGen {
 
-  private final Random rand = new Random();
   private int length;
 
   @Override
-  public void setup(ColumnDef colDef) {
+  public void setup(ColumnDef colDef, ScalarWriter colLoader) {
+    super.setup(colDef, colLoader);
     length = colDef.width;
   }
 
@@ -49,8 +46,7 @@ public class StringGen implements FieldGen {
   }
 
   @Override
-  public void setValue(ValueVector v, int index) {
-    VarCharVector vector = (VarCharVector) v;
-    vector.getMutator().setSafe(index, value().getBytes());
+  public void setValue() {
+    colWriter.setString(value());
   }
 }
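
Per the class comment, each generated value is a single upper-case
letter repeated to the column width. A hypothetical stand-in for the
(unchanged) value() method, shown only to make the pattern concrete:

    // One random letter A-Z, repeated 'length' times (e.g. "DDDD").
    private String makeValue() {
      char letter = (char) ('A' + rand.nextInt(26));
      StringBuilder buf = new StringBuilder(length);
      for (int i = 0; i < length; i++) {
        buf.append(letter);
      }
      return buf.toString();
    }
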
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/VaryingStringGen.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/VaryingStringGen.java
index bf0dec7..0691388 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/VaryingStringGen.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/VaryingStringGen.java
@@ -20,10 +20,10 @@ package org.apache.drill.exec.store.mock;
 import java.util.Map;
 import java.util.Random;
 
-import org.apache.drill.exec.vector.ValueVector;
-import org.apache.drill.exec.vector.VarCharVector;
+import org.apache.drill.exec.vector.accessor.ScalarWriter;
 
-public class VaryingStringGen implements FieldGen {
+
+public class VaryingStringGen extends AbstractFieldGen {
 
   private Random rand = new Random();
   private int length;
@@ -32,7 +32,8 @@ public class VaryingStringGen implements FieldGen {
   private int valueCount;
 
   @Override
-  public void setup(ColumnDef colDef) {
+  public void setup(ColumnDef colDef, ScalarWriter colLoader) {
+    super.setup(colDef, colLoader);
     length = colDef.width;
     Map<String,Object> props = colDef.mockCol.properties;
     span = 1000;
@@ -63,8 +64,7 @@ public class VaryingStringGen implements FieldGen {
   }
 
   @Override
-  public void setValue(ValueVector v, int index) {
-    VarCharVector vector = (VarCharVector) v;
-    vector.getMutator().setSafe(index, value().getBytes());
+  public void setValue() {
+    colWriter.setString(value());
   }
 }
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/fn/interp/ExpressionInterpreterTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/fn/interp/ExpressionInterpreterTest.java
index c03a469..66e12f9 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/fn/interp/ExpressionInterpreterTest.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/fn/interp/ExpressionInterpreterTest.java
@@ -39,10 +39,10 @@ import org.apache.drill.exec.expr.TypeHelper;
 import org.apache.drill.exec.expr.fn.interpreter.InterpreterEvaluator;
 import org.apache.drill.exec.expr.holders.TimeStampHolder;
 import org.apache.drill.exec.ops.FragmentContextImpl;
-import org.apache.drill.exec.physical.impl.ScanBatch;
 import org.apache.drill.exec.pop.PopUnitTestBase;
 import org.apache.drill.exec.proto.BitControl;
 import org.apache.drill.exec.proto.BitControl.QueryContextInformation;
+import org.apache.drill.exec.record.CloseableRecordBatch;
 import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.server.Drillbit;
@@ -51,12 +51,11 @@ import org.apache.drill.exec.store.mock.MockScanBatchCreator;
 import org.apache.drill.exec.store.mock.MockSubScanPOP;
 import org.apache.drill.exec.store.mock.MockTableDef;
 import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
 import org.joda.time.DateTimeZone;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
-import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
-
 @Category({SlowTest.class, SqlTest.class})
 public class ExpressionInterpreterTest  extends PopUnitTestBase {
   private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ExpressionInterpreterTest.class);
@@ -173,7 +172,7 @@ public class ExpressionInterpreterTest  extends PopUnitTestBase {
     final MockTableDef.MockScanEntry entry = new MockTableDef.MockScanEntry(10, false, 0, 1, columns);
     final MockSubScanPOP scanPOP = new MockSubScanPOP("testTable", false, java.util.Collections.singletonList(entry));
 
-    final ScanBatch batch = createMockScanBatch(bit1, scanPOP, planFragment);
+    final CloseableRecordBatch batch = createMockScanBatch(bit1, scanPOP, planFragment);
 
     batch.next();
 
@@ -192,7 +191,7 @@ public class ExpressionInterpreterTest  extends PopUnitTestBase {
     bit1.close();
   }
 
-  private ScanBatch createMockScanBatch(Drillbit bit, MockSubScanPOP scanPOP, BitControl.PlanFragment planFragment) {
+  private CloseableRecordBatch createMockScanBatch(Drillbit bit, MockSubScanPOP scanPOP, BitControl.PlanFragment planFragment) {
     final List<RecordBatch> children = Lists.newArrayList();
     final MockScanBatchCreator creator = new MockScanBatchCreator();
 
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/agg/TestHashAggrSpill.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/agg/TestHashAggrSpill.java
index aac93ea..376ac03 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/agg/TestHashAggrSpill.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/agg/TestHashAggrSpill.java
@@ -17,7 +17,15 @@
  */
 package org.apache.drill.exec.physical.impl.agg;
 
+import static junit.framework.TestCase.fail;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.util.List;
+
 import org.apache.drill.categories.OperatorTest;
+import org.apache.drill.categories.SlowTest;
 import org.apache.drill.common.exceptions.UserRemoteException;
 import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.physical.impl.aggregate.HashAggTemplate;
@@ -30,74 +38,73 @@ import org.apache.drill.test.ClusterFixtureBuilder;
 import org.apache.drill.test.DrillTest;
 import org.apache.drill.test.ProfileParser;
 import org.apache.drill.test.QueryBuilder;
-import org.apache.drill.categories.SlowTest;
+import org.junit.Ignore;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
-import java.util.List;
-
-import static junit.framework.TestCase.fail;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
 /**
  * Test spilling for the Hash Aggr operator (using the mock reader)
  */
 @Category({SlowTest.class, OperatorTest.class})
 public class TestHashAggrSpill extends DrillTest {
 
+  // Matches the "2400K" in the table name.
+  public static final int DEFAULT_ROW_COUNT = 2_400_000;
+
   @Rule
   public final BaseDirTestWatcher dirTestWatcher = new BaseDirTestWatcher();
 
-    /**
-     *  A template for Hash Aggr spilling tests
-     *
-     * @throws Exception
-     */
-    private void testSpill(long maxMem, long numPartitions, long minBatches, int maxParallel, boolean fallback, boolean predict,
-                           String sql, long expectedRows, int cycle, int fromPart, int toPart) throws Exception {
-        ClusterFixtureBuilder builder = ClusterFixture.builder(dirTestWatcher)
-          .sessionOption(ExecConstants.HASHAGG_MAX_MEMORY_KEY,maxMem)
-          .sessionOption(ExecConstants.HASHAGG_NUM_PARTITIONS_KEY,numPartitions)
-          .sessionOption(ExecConstants.HASHAGG_MIN_BATCHES_PER_PARTITION_KEY,minBatches)
-          .configProperty(ExecConstants.SYS_STORE_PROVIDER_LOCAL_ENABLE_WRITE, false)
-          .sessionOption(PlannerSettings.FORCE_2PHASE_AGGR_KEY,true)
-          .sessionOption(ExecConstants.HASHAGG_FALLBACK_ENABLED_KEY, fallback)
-          .sessionOption(ExecConstants.HASHAGG_USE_MEMORY_PREDICTION_KEY,predict)
-          .maxParallelization(maxParallel)
-          .saveProfiles();
-        String sqlStr = sql != null ? sql :  // if null then use this default query
-          "SELECT empid_s17, dept_i, branch_i, AVG(salary_i) FROM `mock`.`employee_1200K` GROUP BY empid_s17, dept_i, branch_i";
-
-        try (ClusterFixture cluster = builder.build();
-             ClientFixture client = cluster.clientFixture()) {
-          runAndDump(client, sqlStr, expectedRows, cycle, fromPart, toPart);
-        }
-    }
-    /**
-     * Test "normal" spilling: Only 2 (or 3) partitions (out of 4) would require spilling
-     * ("normal spill" means spill-cycle = 1 )
-     *
-     * @throws Exception
-     */
-    @Test
-    public void testSimpleHashAggrSpill() throws Exception {
-        testSpill(68_000_000, 16, 2, 2, false, true, null,
-          1_200_000, 1,2, 3
-          );
-    }
-    /**
-     * Test with "needed memory" prediction turned off
-     * (i.e., do exercise code paths that catch OOMs from the Hash Table and recover)
-     *
-     * @throws Exception
-     */
-    @Test
-    public void testNoPredictHashAggrSpill() throws Exception {
-      testSpill(58_000_000, 16, 2, 2, false, false /* no prediction */, null,
-        1_200_000, 1, 1, 1);
+  /**
+   *  A template for Hash Aggr spilling tests
+   *
+   * @throws Exception
+   */
+  private void testSpill(long maxMem, long numPartitions, long minBatches, int maxParallel, boolean fallback, boolean predict,
+                         String sql, long expectedRows, int cycle, int fromPart, int toPart) throws Exception {
+    ClusterFixtureBuilder builder = ClusterFixture.builder(dirTestWatcher)
+      .sessionOption(ExecConstants.HASHAGG_MAX_MEMORY_KEY,maxMem)
+      .sessionOption(ExecConstants.HASHAGG_NUM_PARTITIONS_KEY,numPartitions)
+      .sessionOption(ExecConstants.HASHAGG_MIN_BATCHES_PER_PARTITION_KEY,minBatches)
+      .configProperty(ExecConstants.SYS_STORE_PROVIDER_LOCAL_ENABLE_WRITE, false)
+      .sessionOption(PlannerSettings.FORCE_2PHASE_AGGR_KEY,true)
+      .sessionOption(ExecConstants.HASHAGG_FALLBACK_ENABLED_KEY, fallback)
+      .sessionOption(ExecConstants.HASHAGG_USE_MEMORY_PREDICTION_KEY,predict)
+      .maxParallelization(maxParallel)
+      .saveProfiles();
+    String sqlStr = sql != null ? sql :  // if null then use this default query
+      "SELECT empid_s17, dept_i, branch_i, AVG(salary_i) FROM `mock`.`employee_2400K` GROUP BY empid_s17, dept_i, branch_i";
+
+    try (ClusterFixture cluster = builder.build();
+         ClientFixture client = cluster.clientFixture()) {
+      runAndDump(client, sqlStr, expectedRows, cycle, fromPart, toPart);
     }
+  }
+
+  /**
+   * Test "normal" spilling: Only 2 (or 3) partitions (out of 4) would require spilling
+   * ("normal spill" means spill-cycle = 1 )
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testSimpleHashAggrSpill() throws Exception {
+    testSpill(68_000_000, 16, 2, 2, false, true, null,
+        DEFAULT_ROW_COUNT, 1,2, 3);
+  }
+
+  /**
+   * Test with "needed memory" prediction turned off
+   * (i.e., exercise code paths that catch OOMs from the Hash Table and recover)
+   *
+   * @throws Exception
+   */
+  @Test
+  @Ignore("DRILL-7301")
+  public void testNoPredictHashAggrSpill() throws Exception {
+    testSpill(135_000_000, 16, 2, 2, false, false /* no prediction */, null,
+        DEFAULT_ROW_COUNT, 1, 1, 1);
+  }
 
   private void runAndDump(ClientFixture client, String sql, long expectedRows, long spillCycle, long fromSpilledPartitions, long toSpilledPartitions) throws Exception {
     QueryBuilder.QuerySummary summary = client.queryBuilder().sql(sql).run();
@@ -108,13 +115,14 @@ public class TestHashAggrSpill extends DrillTest {
     ProfileParser profile = client.parseProfile(summary.queryIdString());
     List<ProfileParser.OperatorProfile> ops = profile.getOpsOfType(UserBitShared.CoreOperatorType.HASH_AGGREGATE_VALUE);
 
-    assertTrue(!ops.isEmpty());
+    assertFalse(ops.isEmpty());
     // check for the first op only
     ProfileParser.OperatorProfile hag0 = ops.get(0);
     long opCycle = hag0.getMetric(HashAggTemplate.Metric.SPILL_CYCLE.ordinal());
     assertEquals(spillCycle, opCycle);
     long op_spilled_partitions = hag0.getMetric(HashAggTemplate.Metric.SPILLED_PARTITIONS.ordinal());
-    assertTrue(op_spilled_partitions >= fromSpilledPartitions && op_spilled_partitions <= toSpilledPartitions);
+    assertTrue(op_spilled_partitions >= fromSpilledPartitions);
+    assertTrue(op_spilled_partitions <= toSpilledPartitions);
   }
 
   /**
@@ -125,8 +133,9 @@ public class TestHashAggrSpill extends DrillTest {
   @Test
   public void testHashAggrSecondaryTertiarySpill() throws Exception {
 
-    testSpill(58_000_000, 16, 3, 1, false, true, "SELECT empid_s44, dept_i, branch_i, AVG(salary_i) FROM `mock`.`employee_1100K` GROUP BY empid_s44, dept_i, branch_i",
-      1_100_000, 3, 2, 2);
+    testSpill(58_000_000, 16, 3, 1, false, true,
+        "SELECT empid_s44, dept_i, branch_i, AVG(salary_i) FROM `mock`.`employee_1100K` GROUP BY empid_s44, dept_i, branch_i",
+        1_100_000, 3, 2, 2);
   }
 
   /**
@@ -139,7 +148,7 @@ public class TestHashAggrSpill extends DrillTest {
 
     try {
       testSpill(34_000_000, 4, 5, 2, false /* no fallback */, true, null,
-        1_200_000, 0 /* no spill due to fallback to pre-1.11 */, 0, 0);
+          DEFAULT_ROW_COUNT, 0 /* no spill due to fallback to pre-1.11 */, 0, 0);
       fail(); // in case the above test did not throw
     } catch (Exception ex) {
       assertTrue(ex instanceof UserRemoteException);
@@ -157,6 +166,6 @@ public class TestHashAggrSpill extends DrillTest {
   @Test
   public void testHashAggrSuccessWithFallbackEnabled() throws Exception {
     testSpill(34_000_000, 4, 5, 2, true /* do fallback */, true, null,
-      1_200_000, 0 /* no spill due to fallback to pre-1.11 */, 0, 0);
+        DEFAULT_ROW_COUNT, 0 /* no spill due to fallback to pre-1.11 */, 0, 0);
   }
 }
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/BaseScanOperatorExecTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/BaseScanOperatorExecTest.java
index 8910907..1ec75c2 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/BaseScanOperatorExecTest.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/BaseScanOperatorExecTest.java
@@ -148,7 +148,7 @@ public class BaseScanOperatorExecTest extends SubOperatorTest {
   public static class BaseScanFixtureBuilder extends ScanFixtureBuilder {
 
     public ScanFrameworkBuilder builder = new ScanFrameworkBuilder();
-    public final List<ManagedReader<? extends SchemaNegotiator>> readers = new ArrayList<>();
+    public final List<ManagedReader<SchemaNegotiator>> readers = new ArrayList<>();
 
     public BaseScanFixtureBuilder() {
       super(fixture);
@@ -157,10 +157,21 @@ public class BaseScanOperatorExecTest extends SubOperatorTest {
     @Override
     public ScanFrameworkBuilder builder() { return builder; }
 
-    public void addReader(ManagedReader<? extends SchemaNegotiator> reader) {
+    public void addReader(ManagedReader<SchemaNegotiator> reader) {
       readers.add(reader);
     }
 
+    public void addReaders(List<ManagedReader<SchemaNegotiator>> readers) {
+      this.readers.addAll(readers);
+    }
+
+    @SuppressWarnings("unchecked")
+    public void addReaders(ManagedReader<SchemaNegotiator>...readers) {
+      for (ManagedReader<SchemaNegotiator> reader : readers) {
+        addReader(reader);
+      }
+    }
+
     @Override
     public ScanFixture build() {
       builder.setReaderFactory(new BasicScanFactory(readers.iterator()));
@@ -169,17 +180,15 @@ public class BaseScanOperatorExecTest extends SubOperatorTest {
   }
 
   @SafeVarargs
-  public static BaseScanFixtureBuilder simpleBuilder(ManagedReader<? extends SchemaNegotiator>...readers) {
+  public static BaseScanFixtureBuilder simpleBuilder(ManagedReader<SchemaNegotiator>...readers) {
     BaseScanFixtureBuilder builder = new BaseScanFixtureBuilder();
     builder.projectAll();
-    for (ManagedReader<? extends SchemaNegotiator> reader : readers) {
-      builder.addReader(reader);
-    }
+    builder.addReaders(readers);
     return builder;
   }
 
   @SafeVarargs
-  public static ScanFixture simpleFixture(ManagedReader<? extends SchemaNegotiator>...readers) {
+  public static ScanFixture simpleFixture(ManagedReader<SchemaNegotiator>...readers) {
     return simpleBuilder(readers).build();
   }
 }
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/mock/TestMockPlugin.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/mock/TestMockPlugin.java
new file mode 100644
index 0000000..b8e7a33
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/mock/TestMockPlugin.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.mock;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import org.apache.drill.categories.RowSetTests;
+import org.apache.drill.common.types.TypeProtos.DataMode;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.record.metadata.ColumnMetadata;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.rpc.RpcException;
+import org.apache.drill.test.ClusterFixture;
+import org.apache.drill.test.ClusterTest;
+import org.apache.drill.test.QueryBuilder.QuerySummary;
+import org.apache.drill.test.QueryRowSetIterator;
+import org.apache.drill.test.rowSet.RowSet;
+import org.apache.drill.test.rowSet.RowSetReader;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Test the mock data source. See the package info file in the
+ * mock data source for details.
+ * <p>
+ * This is mostly just a sanity test: details are added, and
+ * tested, where needed in unit tests.
+ */
+
+@Category(RowSetTests.class)
+public class TestMockPlugin extends ClusterTest {
+
+  @BeforeClass
+  public static void setup() throws Exception {
+    startCluster(
+        ClusterFixture.builder(dirTestWatcher));
+  }
+
+  @Test
+  public void testRowLimit() throws RpcException {
+    String sql = "SELECT dept_i FROM `mock`.`employee_100`";
+    RowSet result = client.queryBuilder().sql(sql).rowSet();
+    TupleMetadata schema = result.schema();
+    assertEquals(1, schema.size());
+    ColumnMetadata col = schema.metadata(0);
+    assertEquals("dept_i", col.name());
+    assertEquals(MinorType.INT, col.type());
+    assertEquals(DataMode.REQUIRED, col.mode());
+    assertEquals(100, result.rowCount());
+    result.clear();
+  }
+
+  @Test
+  public void testVarChar() throws RpcException {
+    String sql = "SELECT name_s17 FROM `mock`.`employee_100`";
+    RowSet result = client.queryBuilder().sql(sql).rowSet();
+    TupleMetadata schema = result.schema();
+    assertEquals(1, schema.size());
+    ColumnMetadata col = schema.metadata(0);
+    assertEquals("name_s17", col.name());
+    assertEquals(MinorType.VARCHAR, col.type());
+    assertEquals(DataMode.REQUIRED, col.mode());
+    assertEquals(100, result.rowCount());
+
+    RowSetReader reader = result.reader();
+    while (reader.next()) {
+      assertEquals(17, reader.scalar(0).getString().length());
+    }
+    result.clear();
+  }
+
+  @Test
+  public void testDouble() throws RpcException {
+    String sql = "SELECT balance_d FROM `mock`.`employee_100`";
+    RowSet result = client.queryBuilder().sql(sql).rowSet();
+    TupleMetadata schema = result.schema();
+    assertEquals(1, schema.size());
+    ColumnMetadata col = schema.metadata(0);
+    assertEquals("balance_d", col.name());
+    assertEquals(MinorType.FLOAT8, col.type());
+    assertEquals(DataMode.REQUIRED, col.mode());
+    assertEquals(100, result.rowCount());
+    result.clear();
+  }
+
+  @Test
+  public void testBoolean() throws RpcException {
+    String sql = "SELECT active_b FROM `mock`.`employee_100`";
+    RowSet result = client.queryBuilder().sql(sql).rowSet();
+    TupleMetadata schema = result.schema();
+    assertEquals(1, schema.size());
+    ColumnMetadata col = schema.metadata(0);
+    assertEquals("active_b", col.name());
+    assertEquals(MinorType.BIT, col.type());
+    assertEquals(DataMode.REQUIRED, col.mode());
+    assertEquals(100, result.rowCount());
+    result.clear();
+  }
+
+  /**
+   * By default, the mock reader limits batch size to 10 MB.
+   */
+  @Test
+  public void testSizeLimit() throws RpcException {
+    String sql = "SELECT comments_s20000 FROM `mock`.`employee_1K`";
+    QueryRowSetIterator iter = client.queryBuilder().sql(sql).rowSetIterator();
+
+    assertTrue(iter.hasNext());
+    RowSet result = iter.next();
+    TupleMetadata schema = result.schema();
+    assertEquals(1, schema.size());
+    ColumnMetadata col = schema.metadata(0);
+    assertEquals("comments_s20000", col.name());
+    assertEquals(MinorType.VARCHAR, col.type());
+    assertEquals(DataMode.REQUIRED, col.mode());
+    assertTrue(result.rowCount() <= 10 * 1024 * 1024 / 20_000);
+    result.clear();
+    while (iter.hasNext()) {
+      iter.next().clear();
+    }
+  }
+
+
+  @Test
+  public void testExtendedSqlMultiBatch() throws Exception {
+    String sql = "SELECT id_i, name_s10 FROM `mock`.`employees_10K`";
+
+    QuerySummary results = client.queryBuilder().sql(sql).run();
+    assertEquals(10_000, results.recordCount());
+  }
+}
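
(For the size-limit test above, the cap works out to 10 * 1024 * 1024 /
20,000 = 10,485,760 / 20,000 = 524 rows per batch, since each
comments_s20000 value is 20,000 bytes wide.)
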
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/mock/TestMockRowReader.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/mock/TestMockRowReader.java
new file mode 100644
index 0000000..c7dd6b1
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/mock/TestMockRowReader.java
@@ -0,0 +1,304 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.mock;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.drill.categories.RowSetTests;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.types.TypeProtos.DataMode;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.physical.impl.protocol.BatchAccessor;
+import org.apache.drill.exec.physical.impl.scan.BaseScanOperatorExecTest.BaseScanFixtureBuilder;
+import org.apache.drill.exec.physical.impl.scan.ScanOperatorExec;
+import org.apache.drill.exec.physical.impl.scan.ScanTestUtils.ScanFixture;
+import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
+import org.apache.drill.exec.physical.impl.scan.framework.SchemaNegotiator;
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.test.SubOperatorTest;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Tests the mock data source directly by wrapping it in a mock
+ * scan operator, without the rest of Drill.
+ */
+
+@Category(RowSetTests.class)
+public class TestMockRowReader extends SubOperatorTest {
+
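+  // Builds a scan fixture around the given readers with a wildcard ("*")
+  // projection. Note that the sub-scan config parameter is currently
+  // unused by the fixture itself.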
+  private static ScanFixture buildScan(MockSubScanPOP config, List<ManagedReader<SchemaNegotiator>> readers) {
+    BaseScanFixtureBuilder builder = new BaseScanFixtureBuilder();
+    List<SchemaPath> projList = new ArrayList<>();
+    projList.add(SchemaPath.STAR_COLUMN);
+    builder.setProjection(projList);
+    builder.addReaders(readers);
+    return builder.build();
+  }
+
+  /**
+   * Test the most basic case: required integers and strings.
+   */
+
+  @Test
+  public void testBasics() {
+    int rowCount = 10;
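+
+    // The MockColumn constructor is positional. As used in these tests,
+    // the 4th argument is the data width (VARCHAR length), the 8th a
+    // repeat count, and the 9th a generator-properties map; arguments
+    // not needed by a test are passed as null.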
+    MockTableDef.MockColumn[] cols = new MockTableDef.MockColumn[] {
+        new MockTableDef.MockColumn("a", MinorType.INT, DataMode.REQUIRED, null, null, null, null, null, null ),
+        new MockTableDef.MockColumn("b", MinorType.VARCHAR, DataMode.REQUIRED, 10, null, null, null, null, null )
+    };
+    MockTableDef.MockScanEntry entry = new MockTableDef.MockScanEntry(rowCount, true, null, null, cols);
+    MockSubScanPOP config = new MockSubScanPOP("dummy", true, Collections.singletonList(entry));
+
+    ManagedReader<SchemaNegotiator> reader = new ExtendedMockBatchReader(entry);
+    List<ManagedReader<SchemaNegotiator>> readers = Collections.singletonList(reader);
+
+    // Create the scan operator.
+
+    ScanFixture mockBatch = buildScan(config, readers);
+    ScanOperatorExec scan = mockBatch.scanOp;
+
+    // First batch: build schema. The scan operator helps here by
+    // returning an empty first batch that carries only the schema.
+
+    assertTrue(scan.buildSchema());
+    TupleMetadata expectedSchema = new SchemaBuilder()
+        .add("a", MinorType.INT)
+        .add("b", MinorType.VARCHAR, 10) // Width is reflected in meta-data
+        .buildSchema();
+    BatchSchema expectedBatchSchema = new BatchSchema(SelectionVectorMode.NONE, expectedSchema.toFieldList());
+    assertTrue(expectedBatchSchema.isEquivalent(scan.batchAccessor().getSchema()));
+    assertEquals(0, scan.batchAccessor().getRowCount());
+    scan.batchAccessor().release();
+
+    // Next call, return with data.
+
+    assertTrue(scan.next());
+    assertTrue(expectedBatchSchema.isEquivalent(scan.batchAccessor().getSchema()));
+    assertEquals(rowCount, scan.batchAccessor().getRowCount());
+    scan.batchAccessor().release();
+
+    // EOF
+
+    assertFalse(scan.next());
+    mockBatch.close();
+  }
+
+  /**
+   * Verify that the mock reader can generate nullable (optional) columns,
+   * including filling values with nulls at some percentage, 25% by
+   * default.
+   */
+
+  @Test
+  public void testOptional() {
+    int rowCount = 10;
+    Map<String,Object> props = new HashMap<>();
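+    // The "nulls" property sets the percentage of values generated as
+    // null: 25% by default, raised to 50% for this test.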
+    props.put("nulls", 50);
+    MockTableDef.MockColumn[] cols = new MockTableDef.MockColumn[] {
+        new MockTableDef.MockColumn("a", MinorType.INT, DataMode.OPTIONAL, null, null, null, null, null, null ),
+        new MockTableDef.MockColumn("b", MinorType.VARCHAR, DataMode.OPTIONAL, 10, null, null, null, null, props )
+    };
+    MockTableDef.MockScanEntry entry = new MockTableDef.MockScanEntry(rowCount, true, null, null, cols);
+    MockSubScanPOP config = new MockSubScanPOP("dummy", true, Collections.singletonList(entry));
+    ManagedReader<SchemaNegotiator> reader = new ExtendedMockBatchReader(entry);
+    List<ManagedReader<SchemaNegotiator>> readers = Collections.singletonList(reader);
+
+    // Create the scan operator.
+
+    ScanFixture mockBatch = buildScan(config, readers);
+    ScanOperatorExec scan = mockBatch.scanOp;
+
+    // First batch: build schema. The scan operator helps here by
+    // returning an empty first batch that carries only the schema.
+
+    assertTrue(scan.buildSchema());
+    TupleMetadata expectedSchema = new SchemaBuilder()
+        .addNullable("a", MinorType.INT)
+        .addNullable("b", MinorType.VARCHAR, 10)
+        .build();
+    BatchSchema expectedBatchSchema = new BatchSchema(SelectionVectorMode.NONE, expectedSchema.toFieldList());
+    assertTrue(expectedBatchSchema.isEquivalent(scan.batchAccessor().getSchema()));
+    assertEquals(0, scan.batchAccessor().getRowCount());
+
+    // Next call, return with data.
+
+    assertTrue(scan.next());
+    assertTrue(expectedBatchSchema.isEquivalent(scan.batchAccessor().getSchema()));
+    assertEquals(rowCount, scan.batchAccessor().getRowCount());
+    scan.batchAccessor().release();
+
+    // EOF
+
+    assertFalse(scan.next());
+    mockBatch.close();
+  }
+
+  /**
+   * Test a repeated column.
+   */
+
+  @Test
+  public void testColumnRepeat() {
+    int rowCount = 10;
+    MockTableDef.MockColumn[] cols = new MockTableDef.MockColumn[] {
+        new MockTableDef.MockColumn("a", MinorType.INT, DataMode.REQUIRED, null, null, null, null, 3, null ),
+        new MockTableDef.MockColumn("b", MinorType.VARCHAR, DataMode.REQUIRED, 10, null, null, null, null, null )
+    };
+    MockTableDef.MockScanEntry entry = new MockTableDef.MockScanEntry(rowCount, true, null, null, cols);
+    MockSubScanPOP config = new MockSubScanPOP("dummy", true, Collections.singletonList(entry));
+
+    ManagedReader<SchemaNegotiator> reader = new ExtendedMockBatchReader(entry);
+    List<ManagedReader<SchemaNegotiator>> readers = Collections.singletonList(reader);
+
+    // Create the scan operator.
+
+    ScanFixture mockBatch = buildScan(config, readers);
+    ScanOperatorExec scan = mockBatch.scanOp;
+
+    // First batch: build schema. The scan operator helps here by
+    // returning an empty first batch that carries only the schema.
+
+    assertTrue(scan.buildSchema());
+    TupleMetadata expectedSchema = new SchemaBuilder()
+        .add("a1", MinorType.INT)
+        .add("a2", MinorType.INT)
+        .add("a3", MinorType.INT)
+        .add("b", MinorType.VARCHAR, 10)
+        .build();
+    BatchSchema expectedBatchSchema = new BatchSchema(SelectionVectorMode.NONE, expectedSchema.toFieldList());
+    assertTrue(expectedBatchSchema.isEquivalent(scan.batchAccessor().getSchema()));
+    assertEquals(0, scan.batchAccessor().getRowCount());
+
+    // Next call, return with data.
+
+    assertTrue(scan.next());
+    assertTrue(expectedBatchSchema.isEquivalent(scan.batchAccessor().getSchema()));
+    assertEquals(rowCount, scan.batchAccessor().getRowCount());
+    scan.batchAccessor().release();
+
+    // EOF
+
+    assertFalse(scan.next());
+    mockBatch.close();
+  }
+
+  /**
+   * Verify limit on individual batch size (limiting row count per batch).
+   */
+
+  @Test
+  public void testBatchSize() {
+    int rowCount = 20;
+    int batchSize = 10;
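+
+    // With 20 rows and a 10-row batch limit, the scan should deliver
+    // exactly two full batches before EOF.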
+    MockTableDef.MockColumn[] cols = new MockTableDef.MockColumn[] {
+        new MockTableDef.MockColumn("a", MinorType.INT, DataMode.REQUIRED, null, null, null, null, null, null ),
+        new MockTableDef.MockColumn("b", MinorType.VARCHAR, DataMode.REQUIRED, 10, null, null, null, null, null )
+    };
+    MockTableDef.MockScanEntry entry = new MockTableDef.MockScanEntry(rowCount, true, batchSize, null, cols);
+    MockSubScanPOP config = new MockSubScanPOP("dummy", true, Collections.singletonList(entry));
+
+    ManagedReader<SchemaNegotiator> reader = new ExtendedMockBatchReader(entry);
+    List<ManagedReader<SchemaNegotiator>> readers = Collections.singletonList(reader);
+
+    // Create the scan operator.
+
+    ScanFixture mockBatch = buildScan(config, readers);
+    ScanOperatorExec scan = mockBatch.scanOp;
+
+    // First batch: build schema. The scan operator helps here by
+    // returning an empty first batch that carries only the schema.
+
+    assertTrue(scan.buildSchema());
+    assertEquals(0, scan.batchAccessor().getRowCount());
+
+    // Next call, return with data, limited by batch size.
+
+    assertTrue(scan.next());
+    assertEquals(batchSize, scan.batchAccessor().getRowCount());
+    scan.batchAccessor().release();
+
+    assertTrue(scan.next());
+    assertEquals(batchSize, scan.batchAccessor().getRowCount());
+    scan.batchAccessor().release();
+
+    // EOF
+
+    assertFalse(scan.next());
+    mockBatch.close();
+  }
+
+  /**
+   * Test a mock varchar column large enough to cause vector overflow.
+   */
+
+  @Test
+  public void testOverflow() {
+    int rowCount = ValueVector.MAX_ROW_COUNT;
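+
+    // MAX_ROW_COUNT rows of 1,000-byte strings amount to far more data
+    // than one batch can hold, so the VARCHAR vector must overflow and
+    // the rows must arrive spread across several batches (verified below).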
+    MockTableDef.MockColumn[] cols = new MockTableDef.MockColumn[] {
+        new MockTableDef.MockColumn("a", MinorType.INT, DataMode.REQUIRED, null, null, null, null, null, null ),
+        new MockTableDef.MockColumn("b", MinorType.VARCHAR, DataMode.REQUIRED, 1000, null, null, null, null, null )
+    };
+    MockTableDef.MockScanEntry entry = new MockTableDef.MockScanEntry(rowCount, true, null, null, cols);
+    MockSubScanPOP config = new MockSubScanPOP("dummy", true, Collections.singletonList(entry));
+
+    ManagedReader<SchemaNegotiator> reader = new ExtendedMockBatchReader(entry);
+    List<ManagedReader<SchemaNegotiator>> readers = Collections.singletonList(reader);
+
+    // Create the scan operator.
+
+    ScanFixture mockBatch = buildScan(config, readers);
+    ScanOperatorExec scan = mockBatch.scanOp;
+
+    // First batch: build schema. The scan operator helps here by
+    // returning an empty first batch that carries only the schema.
+
+    assertTrue(scan.buildSchema());
+    assertEquals(0, scan.batchAccessor().getRowCount());
+
+    // Subsequent calls return data, split across batches by vector overflow.
+
+    int totalRowCount = 0;
+    int batchCount = 0;
+    while (scan.next()) {
+      assertTrue(scan.batchAccessor().getRowCount() < ValueVector.MAX_ROW_COUNT);
+      BatchAccessor batchAccessor = scan.batchAccessor();
+      totalRowCount += batchAccessor.getRowCount();
+      batchCount++;
+      batchAccessor.release();
+    }
+
+    assertEquals(ValueVector.MAX_ROW_COUNT, totalRowCount);
+    assertTrue(batchCount > 1);
+
+    mockBatch.close();
+  }
+}
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/SchemaBuilder.java b/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/SchemaBuilder.java
index 5d9fcba..96eb863 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/SchemaBuilder.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/SchemaBuilder.java
@@ -194,7 +194,14 @@ public class SchemaBuilder implements SchemaContainer {
     return tupleBuilder.addRepeatedList(this, name);
   }
 
+  // buildSchema() is retained for backward compatibility: build()
+  // formerly produced a BatchSchema, so buildSchema() was the way to
+  // obtain a TupleMetadata. The two methods are now equivalent.
+
   public TupleMetadata buildSchema() {
+    return build();
+  }
+
+  public TupleMetadata build() {
     return tupleBuilder.schema();
   }
 }
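
For reference, a minimal sketch of the resulting builder API, assuming
only the fluent calls already shown in the tests above:

    // build() is now the canonical way to obtain a TupleMetadata;
    // buildSchema() remains as an equivalent alias that delegates to it.
    TupleMetadata schema = new SchemaBuilder()
        .add("a", MinorType.INT)
        .addNullable("b", MinorType.VARCHAR, 10)
        .build();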