You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by ar...@apache.org on 2018/08/27 08:25:37 UTC

[drill] 06/06: DRILL-6461: Added basic data correctness tests for hash agg, and improved operator unit testing framework.

This is an automated email from the ASF dual-hosted git repository.

arina pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git

commit d8f9fb6a5cf22a01fa3f48bd40e7dbeb3cb6e4e4
Author: Timothy Farkas <ti...@apache.org>
AuthorDate: Mon Jun 4 10:45:12 2018 -0700

    DRILL-6461: Added basic data correctness tests for hash agg, and improved operator unit testing framework.
    
    git closes #1344
---
 .../drill/exec/physical/impl/TopN/TopNBatch.java   |  12 -
 .../physical/impl/common/HashTableTemplate.java    |  12 +-
 .../physical/impl/project/ProjectRecordBatch.java  |   2 +
 .../physical/impl/svremover/AbstractCopier.java    |  26 +-
 .../physical/impl/svremover/AbstractSV2Copier.java |   4 +-
 .../physical/impl/svremover/AbstractSV4Copier.java |   4 +-
 .../drill/exec/physical/impl/svremover/Copier.java |   4 +-
 .../physical/impl/svremover/GenericCopier.java     |   7 +-
 .../physical/impl/svremover/StraightCopier.java    |   3 +-
 .../apache/drill/exec/record/RecordBatchSizer.java |  17 ++
 .../physical/impl/BaseTestOpBatchEmitOutcome.java  |   2 +-
 .../drill/exec/physical/impl/MockRecordBatch.java  | 196 ++++++++++---
 .../physical/impl/agg/TestAggWithAnyValue.java     |   6 +-
 .../exec/physical/impl/agg/TestHashAggBatch.java   | 212 ++++++++++++++
 .../physical/impl/common/HashPartitionTest.java    |  70 +++--
 .../exec/physical/impl/join/TestHashJoinJPPD.java  |   6 +-
 .../exec/physical/impl/join/TestHashJoinSpill.java |   8 +-
 .../physical/impl/limit/TestLimitOperator.java     |  16 +-
 .../impl/svremover/AbstractGenericCopierTest.java  |  90 +++---
 .../physical/impl/svremover/GenericCopierTest.java |   4 +-
 .../impl/svremover/GenericSV2BatchCopierTest.java  |   4 +-
 .../impl/svremover/GenericSV2CopierTest.java       |   4 +-
 .../impl/svremover/GenericSV4CopierTest.java       |   9 +-
 .../physical/unit/BasicPhysicalOpUnitTest.java     |  30 +-
 .../exec/physical/unit/MiniPlanUnitTestBase.java   |   3 +-
 .../exec/physical/unit/TestOutputBatchSize.java    |  80 +++---
 .../columnreaders/TestBatchSizingMemoryUtil.java   |   2 +-
 .../drill/test/LegacyOperatorTestBuilder.java      | 178 ++++++++++++
 .../org/apache/drill/test/OperatorTestBuilder.java | 314 +++++++++++++++++++++
 .../apache/drill/test/OperatorTestBuilderTest.java | 157 +++++++++++
 .../unit => test}/PhysicalOpUnitTestBase.java      | 169 ++---------
 .../org/apache/drill/test/rowSet/RowSetBatch.java  | 111 --------
 .../apache/drill/test/rowSet/RowSetComparison.java | 151 +++++++---
 .../drill/test/rowSet/TestRowSetComparison.java    | 211 ++++++++++++++
 .../main/codegen/templates/FixedValueVectors.java  |   2 +-
 .../codegen/templates/NullableValueVectors.java    |   2 +-
 36 files changed, 1616 insertions(+), 512 deletions(-)

diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java
index 2763f59..2e343b6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java
@@ -137,23 +137,11 @@ public class TopNBatch extends AbstractRecordBatch<TopN> {
 
   @Override
   public void buildSchema() throws SchemaChangeException {
-    VectorContainer c = new VectorContainer(oContext);
     IterOutcome outcome = next(incoming);
     switch (outcome) {
       case OK:
       case OK_NEW_SCHEMA:
         for (VectorWrapper<?> w : incoming) {
-          // TODO: Not sure why the special handling for AbstractContainerVector is needed since creation of child
-          // vectors is taken care correctly if the field is retrieved from incoming vector and passed to it rather than
-          // creating a new Field instance just based on name and type.
-          @SuppressWarnings("resource")
-          ValueVector v = c.addOrGet(w.getField());
-          if (v instanceof AbstractContainerVector) {
-            w.getValueVector().makeTransferPair(v);
-            v.clear();
-          }
-        }
-        for (VectorWrapper<?> w : c) {
           @SuppressWarnings("resource")
           ValueVector v = container.addOrGet(w.getField());
           if (v instanceof AbstractContainerVector) {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java
index 3c418b9..958a0b6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java
@@ -545,6 +545,15 @@ public abstract class HashTableTemplate implements HashTable {
 
   @Override
   public void clear() {
+    clear(true);
+  }
+
+  private void clear(boolean close) {
+    if (close) {
+      // If we are closing, we need to clear the htContainerOrig as well.
+      htContainerOrig.clear();
+    }
+
     if (batchHolders != null) {
       for (BatchHolder bh : batchHolders) {
         bh.clear();
@@ -842,7 +851,7 @@ public abstract class HashTableTemplate implements HashTable {
    *
    */
   public void reset() {
-    this.clear(); // Clear all current batch holders and hash table (i.e. free their memory)
+    this.clear(false); // Clear all current batch holders and hash table (i.e. free their memory)
 
     freeIndex = 0; // all batch holders are gone
     // reallocate batch holders, and the hash table to the original size
@@ -852,6 +861,7 @@ public abstract class HashTableTemplate implements HashTable {
     totalIndexSize = 0;
     startIndices = allocMetadataVector(originalTableSize, EMPTY_SLOT);
   }
+
   public void updateIncoming(VectorContainer newIncoming, RecordBatch newIncomingProbe) {
     incomingBuild = newIncoming;
     incomingProbe = newIncomingProbe;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
index dd93325..b459e1c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
@@ -329,6 +329,8 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
       m.setValueCount(count);
     }
 
+    container.setRecordCount(count);
+
     if (complexWriters == null) {
       return;
     }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/AbstractCopier.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/AbstractCopier.java
index 47ec1cb..a463519 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/AbstractCopier.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/AbstractCopier.java
@@ -19,7 +19,7 @@ package org.apache.drill.exec.physical.impl.svremover;
 
 import org.apache.drill.common.types.TypeProtos;
 import org.apache.drill.common.types.Types;
-import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.VectorAccessible;
 import org.apache.drill.exec.record.VectorContainer;
 import org.apache.drill.exec.record.VectorWrapper;
 import org.apache.drill.exec.vector.AllocationHelper;
@@ -30,7 +30,7 @@ public abstract class AbstractCopier implements Copier {
   protected VectorContainer outgoing;
 
   @Override
-  public void setup(RecordBatch incoming, VectorContainer outgoing) {
+  public void setup(VectorAccessible incoming, VectorContainer outgoing) {
     this.outgoing = outgoing;
 
     final int count = outgoing.getNumberOfColumns();
@@ -43,15 +43,7 @@ public abstract class AbstractCopier implements Copier {
 
   @Override
   public int copyRecords(int index, int recordCount) {
-    for(VectorWrapper<?> out : outgoing){
-      TypeProtos.MajorType type = out.getField().getType();
-      if (!Types.isFixedWidthType(type) || Types.isRepeated(type)) {
-        out.getValueVector().allocateNew();
-      } else {
-        AllocationHelper.allocate(out.getValueVector(), recordCount, 1);
-      }
-    }
-
+    allocateOutgoing(outgoing, recordCount);
     return insertRecords(0, index, recordCount);
   }
 
@@ -91,4 +83,16 @@ public abstract class AbstractCopier implements Copier {
   public abstract void copyEntryIndirect(int inIndex, int outIndex);
 
   public abstract void copyEntry(int inIndex, int outIndex);
+
+  public static void allocateOutgoing(VectorContainer outgoing, int recordCount) {
+    for(VectorWrapper<?> out : outgoing) {
+      TypeProtos.MajorType type = out.getField().getType();
+
+      if (!Types.isFixedWidthType(type) || Types.isRepeated(type)) {
+        out.getValueVector().allocateNew();
+      } else {
+        AllocationHelper.allocate(out.getValueVector(), recordCount, 1);
+      }
+    }
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/AbstractSV2Copier.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/AbstractSV2Copier.java
index 68a0889..d273fd3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/AbstractSV2Copier.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/AbstractSV2Copier.java
@@ -17,8 +17,8 @@
  */
 package org.apache.drill.exec.physical.impl.svremover;
 
-import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.record.TransferPair;
+import org.apache.drill.exec.record.VectorAccessible;
 import org.apache.drill.exec.record.VectorContainer;
 import org.apache.drill.exec.record.VectorWrapper;
 import org.apache.drill.exec.record.selection.SelectionVector2;
@@ -33,7 +33,7 @@ public abstract class AbstractSV2Copier extends AbstractCopier {
   protected List<TransferPair> transferPairs = new ArrayList<>();
 
   @Override
-  public void setup(RecordBatch incoming, VectorContainer outgoing) {
+  public void setup(VectorAccessible incoming, VectorContainer outgoing) {
     super.setup(incoming, outgoing);
     this.sv2 = incoming.getSelectionVector2();
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/AbstractSV4Copier.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/AbstractSV4Copier.java
index 56e2586..970f970 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/AbstractSV4Copier.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/AbstractSV4Copier.java
@@ -17,7 +17,7 @@
  */
 package org.apache.drill.exec.physical.impl.svremover;
 
-import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.VectorAccessible;
 import org.apache.drill.exec.record.VectorContainer;
 import org.apache.drill.exec.record.VectorWrapper;
 import org.apache.drill.exec.record.selection.SelectionVector4;
@@ -30,7 +30,7 @@ public abstract class AbstractSV4Copier extends AbstractCopier {
   private SelectionVector4 sv4;
 
   @Override
-  public void setup(RecordBatch incoming, VectorContainer outgoing) {
+  public void setup(VectorAccessible incoming, VectorContainer outgoing) {
     super.setup(incoming, outgoing);
     this.sv4 = incoming.getSelectionVector4();
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/Copier.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/Copier.java
index 92dea70..f8934e5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/Copier.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/Copier.java
@@ -17,11 +17,11 @@
  */
 package org.apache.drill.exec.physical.impl.svremover;
 
-import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.record.VectorAccessible;
 
 public interface Copier {
-  void setup(RecordBatch incoming, VectorContainer outgoing);
+  void setup(VectorAccessible incoming, VectorContainer outgoing);
   int copyRecords(int index, int recordCount);
   int appendRecord(int index);
   int appendRecords(int index, int recordCount);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/GenericCopier.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/GenericCopier.java
index 72516e0..f64a11e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/GenericCopier.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/GenericCopier.java
@@ -17,11 +17,13 @@
  */
 package org.apache.drill.exec.physical.impl.svremover;
 
-import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.VectorAccessible;
 import org.apache.drill.exec.record.VectorContainer;
 import org.apache.drill.exec.record.VectorWrapper;
 import org.apache.drill.exec.vector.ValueVector;
 
+import static org.apache.drill.exec.physical.impl.svremover.AbstractCopier.allocateOutgoing;
+
 public class GenericCopier implements Copier {
   private ValueVector[] vvOut;
   private ValueVector[] vvIn;
@@ -29,7 +31,7 @@ public class GenericCopier implements Copier {
   private VectorContainer outgoing;
 
   @Override
-  public void setup(RecordBatch incoming, VectorContainer outgoing) {
+  public void setup(VectorAccessible incoming, VectorContainer outgoing) {
     this.outgoing = outgoing;
 
     final int count = outgoing.getNumberOfColumns();
@@ -53,6 +55,7 @@ public class GenericCopier implements Copier {
 
   @Override
   public int copyRecords(int index, int recordCount) {
+    allocateOutgoing(outgoing, recordCount);
     return insertRecords(0, index, recordCount);
   }
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/StraightCopier.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/StraightCopier.java
index 33f2a96..cecfc5a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/StraightCopier.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/StraightCopier.java
@@ -20,6 +20,7 @@ package org.apache.drill.exec.physical.impl.svremover;
 import com.google.common.collect.Lists;
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.record.TransferPair;
+import org.apache.drill.exec.record.VectorAccessible;
 import org.apache.drill.exec.record.VectorContainer;
 import org.apache.drill.exec.record.VectorWrapper;
 import org.apache.drill.exec.vector.SchemaChangeCallBack;
@@ -39,7 +40,7 @@ public class StraightCopier implements Copier {
     }
 
     @Override
-    public void setup(RecordBatch incoming, VectorContainer outgoing) {
+    public void setup(VectorAccessible incoming, VectorContainer outgoing) {
       for(VectorWrapper<?> vv : incoming){
         TransferPair tp = vv.getValueVector().makeTransferPair(outputContainer.addOrGet(vv.getField(), callBack));
         pairs.add(tp);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchSizer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchSizer.java
index 83287ee..dac80a5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchSizer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchSizer.java
@@ -17,6 +17,8 @@
  */
 package org.apache.drill.exec.record;
 
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Set;
 import java.util.Map;
 
@@ -634,6 +636,11 @@ public class RecordBatchSizer {
   private Map<String, ColumnSize> columnSizes = CaseInsensitiveMap.newHashMap();
 
   /**
+   * This field is used by the convenience method {@link #columnsList()}.
+   */
+  private List<ColumnSize> columnSizesList = new ArrayList<>();
+
+  /**
    * Number of records (rows) in the batch.
    */
   private int rowCount;
@@ -715,6 +722,8 @@ public class RecordBatchSizer {
     for (VectorWrapper<?> vw : va) {
       ColumnSize colSize = measureColumn(vw.getValueVector(), "");
       columnSizes.put(vw.getField().getName(), colSize);
+      columnSizesList.add(colSize);
+      stdRowWidth += colSize.getStdDataSizePerEntry();
       netBatchSize += colSize.getTotalNetSize();
       maxSize = Math.max(maxSize, colSize.getTotalDataSize());
       if (colSize.metadata.isNullable()) {
@@ -885,6 +894,14 @@ public class RecordBatchSizer {
   public Map<String, ColumnSize> columns() { return columnSizes; }
 
   /**
+   * This is a convenience method to get the sizes of columns in the same order that the corresponding value vectors
+   * are stored within a {@link org.apache.drill.exec.record.VectorAccessible}.
+   * @return The sizes of columns in the same order that the corresponding value vectors are stored within a
+   * {@link org.apache.drill.exec.record.VectorAccessible}.
+   */
+  public List<ColumnSize> columnsList() { return columnSizesList; }
+
+  /**
    * Compute the "real" width of the row, taking into account each varchar column size
    * (historically capped at 50, and rounded up to power of 2 to match drill buf allocation)
    * and null marking columns.
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/BaseTestOpBatchEmitOutcome.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/BaseTestOpBatchEmitOutcome.java
index 4eaca2b..620a61c 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/BaseTestOpBatchEmitOutcome.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/BaseTestOpBatchEmitOutcome.java
@@ -19,7 +19,7 @@ package org.apache.drill.exec.physical.impl;
 
 import org.apache.drill.common.types.TypeProtos;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
-import org.apache.drill.exec.physical.unit.PhysicalOpUnitTestBase;
+import org.apache.drill.test.PhysicalOpUnitTestBase;
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.record.VectorContainer;
 import org.apache.drill.exec.record.metadata.TupleMetadata;
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/MockRecordBatch.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/MockRecordBatch.java
index ed7af4c..f3ec7b0 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/MockRecordBatch.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/MockRecordBatch.java
@@ -17,6 +17,7 @@
  */
 package org.apache.drill.exec.physical.impl;
 
+import com.google.common.base.Preconditions;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.ops.FragmentContext;
@@ -29,9 +30,17 @@ import org.apache.drill.exec.record.VectorWrapper;
 import org.apache.drill.exec.record.WritableBatch;
 import org.apache.drill.exec.record.selection.SelectionVector2;
 import org.apache.drill.exec.record.selection.SelectionVector4;
+import org.apache.drill.test.rowSet.DirectRowSet;
+import org.apache.drill.test.rowSet.IndirectRowSet;
+import org.apache.drill.test.rowSet.RowSet;
 
+import javax.annotation.Nullable;
+import javax.validation.constraints.NotNull;
+import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
 
 public class MockRecordBatch implements CloseableRecordBatch {
   private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MockRecordBatch.class);
@@ -39,41 +48,83 @@ public class MockRecordBatch implements CloseableRecordBatch {
   // These resources are owned by this RecordBatch
   protected VectorContainer container;
   protected SelectionVector2 sv2;
+  protected SelectionVector4 sv4;
   private int currentContainerIndex;
   private int currentOutcomeIndex;
   private boolean isDone;
   private boolean limitWithUnnest;
 
   // All the below resources are owned by caller
-  private final List<VectorContainer> allTestContainers;
-  private List<SelectionVector2> allTestContainersSv2;
+  private final List<RowSet> rowSets;
   private final List<IterOutcome> allOutcomes;
   private final FragmentContext context;
   protected final OperatorContext oContext;
   protected final BufferAllocator allocator;
 
-  public MockRecordBatch(FragmentContext context, OperatorContext oContext,
-                         List<VectorContainer> testContainers, List<IterOutcome> iterOutcomes,
-                         BatchSchema schema) {
+  private MockRecordBatch(@NotNull final FragmentContext context,
+                          @Nullable final OperatorContext oContext,
+                          @NotNull final List<RowSet> testRowSets,
+                          @NotNull final List<IterOutcome> iterOutcomes,
+                          @NotNull final BatchSchema schema,
+                          final boolean dummy) {
+    Preconditions.checkNotNull(testRowSets);
+    Preconditions.checkNotNull(iterOutcomes);
+    Preconditions.checkNotNull(schema);
+
     this.context = context;
     this.oContext = oContext;
-    this.allocator = oContext.getAllocator();
-    this.allTestContainers = testContainers;
+    this.rowSets = testRowSets;
+    this.allocator = context.getAllocator();
     this.container = new VectorContainer(allocator, schema);
     this.allOutcomes = iterOutcomes;
     this.currentContainerIndex = 0;
     this.currentOutcomeIndex = 0;
     this.isDone = false;
-    this.allTestContainersSv2 = null;
-    this.sv2 = null;
   }
 
-  public MockRecordBatch(FragmentContext context, OperatorContext oContext,
-                         List<VectorContainer> testContainers, List<IterOutcome> iterOutcomes,
-                         List<SelectionVector2> testContainersSv2, BatchSchema schema) {
-    this(context, oContext, testContainers, iterOutcomes, schema);
-    allTestContainersSv2 = testContainersSv2;
-    sv2 = (allTestContainersSv2 != null && allTestContainersSv2.size() > 0) ? new SelectionVector2(allocator) : null;
+  @Deprecated
+  public MockRecordBatch(@Nullable final FragmentContext context,
+                         @Nullable final OperatorContext oContext,
+                         @NotNull final List<VectorContainer> testContainers,
+                         @NotNull final List<IterOutcome> iterOutcomes,
+                         final BatchSchema schema) {
+    this(context,
+         oContext,
+         testContainers.stream().
+           map(container -> DirectRowSet.fromContainer(container)).
+           collect(Collectors.toList()),
+         iterOutcomes,
+         schema,
+         true);
+  }
+
+  @Deprecated
+  public MockRecordBatch(@Nullable final FragmentContext context,
+                         @Nullable final OperatorContext oContext,
+                         @NotNull final List<VectorContainer> testContainers,
+                         @NotNull final List<IterOutcome> iterOutcomes,
+                         @NotNull final List<SelectionVector2> selectionVector2s,
+                         final BatchSchema schema) {
+    this(context,
+      oContext,
+      new Supplier<List<RowSet>>() {
+        @Override
+        public List<RowSet> get() {
+          List<RowSet> rowSets = new ArrayList<>();
+
+          for (int index = 0; index < testContainers.size(); index++) {
+            if (index >= selectionVector2s.size()) {
+              rowSets.add(IndirectRowSet.fromContainer(testContainers.get(index)));
+            } else {
+              rowSets.add(IndirectRowSet.fromSv2(testContainers.get(index), selectionVector2s.get(index)));
+            }
+          }
+          return rowSets;
+        }
+      }.get(),
+      iterOutcomes,
+      schema,
+      true);
   }
 
   @Override
@@ -94,7 +145,7 @@ public class MockRecordBatch implements CloseableRecordBatch {
 
   @Override
   public SelectionVector4 getSelectionVector4() {
-    return null;
+    return sv4;
   }
 
   @Override
@@ -146,10 +197,11 @@ public class MockRecordBatch implements CloseableRecordBatch {
       return IterOutcome.NONE;
     }
 
-    IterOutcome currentOutcome = IterOutcome.OK;
+    IterOutcome currentOutcome;
 
-    if (currentContainerIndex < allTestContainers.size()) {
-      final VectorContainer input = allTestContainers.get(currentContainerIndex);
+    if (currentContainerIndex < rowSets.size()) {
+      final RowSet rowSet = rowSets.get(currentContainerIndex);
+      final VectorContainer input = rowSet.container();
       final int recordCount = input.getRecordCount();
       // We need to do this since the downstream operator expects vector reference to be same
       // after first next call in cases when schema is not changed
@@ -158,19 +210,34 @@ public class MockRecordBatch implements CloseableRecordBatch {
         container.clear();
         container = new VectorContainer(allocator, inputSchema);
       }
-      container.transferIn(input);
-      container.setRecordCount(recordCount);
-
-      // Transfer the sv2 as well
-      final SelectionVector2 inputSv2 =
-        (allTestContainersSv2 != null && allTestContainersSv2.size() > 0)
-          ? allTestContainersSv2.get(currentContainerIndex) : null;
-      if (inputSv2 != null) {
-        sv2.allocateNewSafe(inputSv2.getCount());
-        for (int i=0; i<inputSv2.getCount(); ++i) {
-          sv2.setIndex(i, inputSv2.getIndex(i));
-        }
-        sv2.setRecordCount(inputSv2.getCount());
+
+      switch (rowSet.indirectionType()) {
+        case NONE:
+        case TWO_BYTE:
+          container.transferIn(input);
+          container.setRecordCount(recordCount);
+          final SelectionVector2 inputSv2 = ((RowSet.SingleRowSet) rowSet).getSv2();
+
+          if (sv2 != null) {
+            // Operators assume that new values for an Sv2 are transferred in.
+            sv2.allocateNewSafe(inputSv2.getCount());
+            for (int i=0; i<inputSv2.getCount(); ++i) {
+              sv2.setIndex(i, inputSv2.getIndex(i));
+            }
+            sv2.setRecordCount(inputSv2.getCount());
+          } else {
+            sv2 = inputSv2;
+          }
+
+          break;
+        case FOUR_BYTE:
+          // TODO find a clean way to transfer in for this case.
+          container.clear();
+          container = input;
+          sv4 = ((RowSet.HyperRowSet) rowSet).getSv4();
+          break;
+        default:
+          throw new UnsupportedOperationException();
       }
     }
 
@@ -222,4 +289,69 @@ public class MockRecordBatch implements CloseableRecordBatch {
   public void useUnnestKillHandlingForLimit(boolean limitWithUnnest) {
     this.limitWithUnnest = limitWithUnnest;
   }
+
+  public static class Builder {
+    private final List<RowSet> rowSets = new ArrayList<>();
+    private final List<IterOutcome> iterOutcomes = new ArrayList<>();
+
+    private BatchSchema batchSchema;
+    private OperatorContext oContext;
+
+    public Builder() {
+    }
+
+    private Builder sendData(final RowSet rowSet, final IterOutcome outcome) {
+      Preconditions.checkState(batchSchema == null);
+      rowSets.add(rowSet);
+      iterOutcomes.add(outcome);
+      return this;
+    }
+
+    public Builder sendData(final RowSet rowSet) {
+      final IterOutcome outcome = rowSets.isEmpty()? IterOutcome.OK_NEW_SCHEMA: IterOutcome.OK;
+      return sendData(rowSet, outcome);
+    }
+
+    public Builder sendDataWithNewSchema(final RowSet rowSet) {
+      return sendData(rowSet, IterOutcome.OK_NEW_SCHEMA);
+    }
+
+    public Builder sendDataAndEmit(final RowSet rowSet) {
+      return sendData(rowSet, IterOutcome.EMIT);
+    }
+
+    public Builder terminateWithError(IterOutcome errorOutcome) {
+      Preconditions.checkArgument(errorOutcome != IterOutcome.STOP);
+      Preconditions.checkArgument(errorOutcome != IterOutcome.OUT_OF_MEMORY);
+
+      iterOutcomes.add(errorOutcome);
+      return this;
+    }
+
+    public Builder setSchema(final BatchSchema batchSchema) {
+      Preconditions.checkState(!rowSets.isEmpty());
+      this.batchSchema = Preconditions.checkNotNull(batchSchema);
+      return this;
+    }
+
+    public Builder withOperatorContext(final OperatorContext oContext) {
+      this.oContext = Preconditions.checkNotNull(oContext);
+      return this;
+    }
+
+    public MockRecordBatch build(final FragmentContext context) {
+      BatchSchema tempSchema = batchSchema;
+
+      if (tempSchema == null && !rowSets.isEmpty()) {
+        tempSchema = rowSets.get(0).batchSchema();
+      }
+
+      return new MockRecordBatch(context,
+        oContext,
+        rowSets,
+        iterOutcomes,
+        tempSchema,
+        true);
+    }
+  }
 }
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/agg/TestAggWithAnyValue.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/agg/TestAggWithAnyValue.java
index 37c0b52..9909bca 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/agg/TestAggWithAnyValue.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/agg/TestAggWithAnyValue.java
@@ -20,7 +20,7 @@ package org.apache.drill.exec.physical.impl.agg;
 
 import com.google.common.collect.Lists;
 import org.apache.drill.exec.physical.config.StreamingAggregate;
-import org.apache.drill.exec.physical.unit.PhysicalOpUnitTestBase;
+import org.apache.drill.test.PhysicalOpUnitTestBase;
 import org.apache.drill.exec.util.JsonStringArrayList;
 import org.apache.drill.test.BaseTestQuery;
 import org.apache.drill.categories.OperatorTest;
@@ -59,7 +59,7 @@ public class TestAggWithAnyValue {
               "{ \"age\": {\"min\":43, \"max\":80}, \"city\": \"Palo Alto\", \"de\": \"987654321987654321987654321.13987654321\"," +
               " \"a\": [{\"b\":70, \"c\":85}, {\"b\":90, \"c\":145}], \"m\": [{\"n\": [7, 8, 9]}], \"f\": [{\"g\": {\"h\": [{\"k\": 50}, {\"k\": 60}]}}]," +
               "\"p\": {\"q\": [33, 34, 35]}" + "}]");
-      opTestBuilder()
+      legacyOpTestBuilder()
           .physicalOperator(aggConf)
           .inputDataStreamJson(inputJsonBatches)
           .baselineColumns("age", "any_a")
@@ -146,4 +146,4 @@ public class TestAggWithAnyValue {
           .go();
     }
   }
-}
\ No newline at end of file
+}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/agg/TestHashAggBatch.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/agg/TestHashAggBatch.java
new file mode 100644
index 0000000..2c6976c
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/agg/TestHashAggBatch.java
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.agg;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import org.apache.drill.common.expression.ExpressionPosition;
+import org.apache.drill.common.expression.FieldReference;
+import org.apache.drill.common.expression.FunctionCall;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.logical.data.NamedExpression;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.exec.physical.config.HashAggregate;
+import org.apache.drill.exec.physical.impl.MockRecordBatch;
+import org.apache.drill.exec.planner.physical.AggPrelBase;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.test.PhysicalOpUnitTestBase;
+import org.apache.drill.test.rowSet.RowSet;
+import org.apache.drill.test.rowSet.RowSetBuilder;
+import org.apache.drill.test.rowSet.schema.SchemaBuilder;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.drill.exec.ExecConstants.HASHAGG_NUM_PARTITIONS_KEY;
+
+public class TestHashAggBatch extends PhysicalOpUnitTestBase {
+  public static final String FIRST_NAME_COL = "firstname";
+  public static final String LAST_NAME_COL = "lastname";
+  public static final String STUFF_COL = "stuff";
+  public static final String TOTAL_STUFF_COL = "totalstuff";
+
+  public static final List<String> FIRST_NAMES = ImmutableList.of(
+    "Strawberry",
+    "Banana",
+    "Mango",
+    "Grape");
+
+  public static final List<String> LAST_NAMES = ImmutableList.of(
+    "Red",
+    "Green",
+    "Blue",
+    "Purple");
+
+  public static final TupleMetadata INT_OUTPUT_SCHEMA = new SchemaBuilder() // schema of the expected result rows
+    .add(FIRST_NAME_COL, TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.REQUIRED)
+    .add(LAST_NAME_COL, TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.REQUIRED)
+    .add(TOTAL_STUFF_COL, TypeProtos.MinorType.BIGINT, TypeProtos.DataMode.OPTIONAL) // the sum column
+    .buildSchema();
+
+  // Runs before every test; limits hash agg to 1 partition. TODO remove this in order to test multiple partitions
+  @Before
+  public void setupSimpleSingleBatchSumTestPhase1of2() {
+    operatorFixture.getOptionManager().setLocalOption(HASHAGG_NUM_PARTITIONS_KEY, 1);
+  }
+
+  @Test
+  public void simpleSingleBatchSumTestPhase1of2() throws Exception {
+    batchSumTest(100, Integer.MAX_VALUE, AggPrelBase.OperatorPhase.PHASE_1of2); // cap never reached: one input batch
+  }
+
+  @Test
+  public void simpleMultiBatchSumTestPhase1of2() throws Exception {
+    batchSumTest(100, 100, AggPrelBase.OperatorPhase.PHASE_1of2); // input split into 100-row batches
+  }
+
+  @Test
+  public void simpleSingleBatchSumTestPhase1of1() throws Exception {
+    batchSumTest(100, Integer.MAX_VALUE, AggPrelBase.OperatorPhase.PHASE_1of1); // cap never reached: one input batch
+  }
+
+  @Test
+  public void simpleMultiBatchSumTestPhase1of1() throws Exception {
+    batchSumTest(100, 100, AggPrelBase.OperatorPhase.PHASE_1of1); // input split into 100-row batches
+  }
+
+  @Test
+  public void simpleSingleBatchSumTestPhase2of2() throws Exception {
+    batchSumTest(100, Integer.MAX_VALUE, AggPrelBase.OperatorPhase.PHASE_2of2); // cap never reached: one input batch
+  }
+
+  @Test
+  public void simpleMultiBatchSumTestPhase2of2() throws Exception {
+    batchSumTest(100, 100, AggPrelBase.OperatorPhase.PHASE_2of2); // input split into 100-row batches
+  }
+
+  // Runs a hash agg of the given phase over generated input batches and checks the per-group sums.
+  private void batchSumTest(int totalCount, int maxInputBatchSize, AggPrelBase.OperatorPhase phase) throws Exception {
+    final HashAggregate hashAgg = createHashAggPhysicalOperator(phase);
+    // totalCount is forwarded as the number of rows generated for each (first name, last name) group.
+    final List<RowSet> inputs = buildInputRowSets(TypeProtos.MinorType.INT, TypeProtos.DataMode.REQUIRED,
+      totalCount, maxInputBatchSize);
+    final MockRecordBatch.Builder upstreamBuilder = new MockRecordBatch.Builder();
+    for (final RowSet input : inputs) {
+      upstreamBuilder.sendData(input);
+    }
+    final MockRecordBatch upstream = upstreamBuilder.build(fragContext);
+    final RowSet expected = buildIntExpectedRowSet(totalCount);
+
+    opTestBuilder().physicalOperator(hashAgg)
+      .combineOutputBatches().unordered()
+      .addUpstreamBatch(upstream)
+      .addExpectedResult(expected)
+      .go();
+  }
+
+  // Hash agg config for the given phase: group by (firstname, lastname) and output totalstuff = sum(stuff).
+  private HashAggregate createHashAggPhysicalOperator(AggPrelBase.OperatorPhase phase) {
+    final NamedExpression firstNameKey =
+      new NamedExpression(SchemaPath.getSimplePath(FIRST_NAME_COL), new FieldReference(FIRST_NAME_COL));
+    final NamedExpression lastNameKey =
+      new NamedExpression(SchemaPath.getSimplePath(LAST_NAME_COL), new FieldReference(LAST_NAME_COL));
+    final List<NamedExpression> keyExpressions = Lists.newArrayList(firstNameKey, lastNameKey);
+
+    final FunctionCall sumOfStuff = new FunctionCall(
+      "sum",
+      ImmutableList.of(SchemaPath.getSimplePath(STUFF_COL)),
+      new ExpressionPosition(null, 0));
+    final List<NamedExpression> aggExpressions = Lists.newArrayList(
+      new NamedExpression(sumOfStuff, new FieldReference(TOTAL_STUFF_COL)));
+
+    // NOTE(review): null and 0.0f look like the child operator and cardinality - confirm against HashAggregate.
+    return new HashAggregate(null, phase, keyExpressions, aggExpressions, 0.0f);
+  }
+
+  // Input schema: the two REQUIRED VARCHAR key columns, then the stuff column with the requested type and mode.
+  private TupleMetadata buildInputSchema(TypeProtos.MinorType minorType, TypeProtos.DataMode dataMode) {
+    final TypeProtos.MinorType keyType = TypeProtos.MinorType.VARCHAR;
+    final TypeProtos.DataMode keyMode = TypeProtos.DataMode.REQUIRED;
+    return new SchemaBuilder().add(FIRST_NAME_COL, keyType, keyMode).add(LAST_NAME_COL, keyType, keyMode)
+      .add(STUFF_COL, minorType, dataMode).buildSchema();
+  }
+
+  /**
+   * Generates the input row sets: for each (first name, last name) pair, in order, emits dataCount rows whose
+   * stuff value is index * multiplier (index = 1..dataCount; multiplier = 1-based ordinal of the pair).
+   * Rows are packed into row sets of at most maxBatchSize rows; only the final row set may be smaller.
+   */
+  private List<RowSet> buildInputRowSets(final TypeProtos.MinorType minorType,
+                                  final TypeProtos.DataMode dataMode,
+                                  final int dataCount,
+                                  final int maxBatchSize) {
+    Preconditions.checkArgument(dataCount > 0);
+    Preconditions.checkArgument(maxBatchSize > 0);
+
+    final List<RowSet> batches = new ArrayList<>();
+    RowSetBuilder builder = null;
+    int rowsInBatch = 0;
+    int multiplier = 1;
+
+    for (final String firstName : FIRST_NAMES) {
+      for (final String lastName : LAST_NAMES) {
+        for (int index = 1; index <= dataCount; index++) {
+          if (rowsInBatch == 0) {
+            // Starting a new batch: a fresh builder (and schema) is created lazily, exactly when a row needs it.
+            builder = new RowSetBuilder(operatorFixture.allocator(), buildInputSchema(minorType, dataMode));
+          }
+
+          builder.addRow(firstName, lastName, index * multiplier);
+          rowsInBatch++;
+
+          if (rowsInBatch == maxBatchSize) {
+            batches.add(builder.build());
+            rowsInBatch = 0;
+          }
+        }
+        multiplier++;
+      }
+    }
+
+    // Flush the trailing partial batch, if any.
+    if (rowsInBatch != 0) {
+      batches.add(builder.build());
+    }
+    return batches;
+  }
+
+  // Expected output: one row per (first name, last name) group with the sum of that group's generated values.
+  private RowSet buildIntExpectedRowSet(final int dataCount) {
+    final RowSetBuilder expectedRowSetBuilder = new RowSetBuilder(operatorFixture.allocator(), INT_OUTPUT_SCHEMA);
+    // 1 + 2 + ... + dataCount, widened to long first so the product cannot overflow int for large counts.
+    final long sumOfIndexes = ((long) dataCount * (dataCount + 1L)) / 2;
+    long multiplier = 1;
+
+    for (final String firstName : FIRST_NAMES) {
+      for (final String lastName : LAST_NAMES) {
+        // Inputs for this group are index * multiplier, so their sum is sumOfIndexes * multiplier.
+        expectedRowSetBuilder.addRow(firstName, lastName, sumOfIndexes * multiplier);
+        multiplier++;
+      }
+    }
+    return expectedRowSetBuilder.build();
+  }
+}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/common/HashPartitionTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/common/HashPartitionTest.java
index 48fd856..bbe57fa 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/common/HashPartitionTest.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/common/HashPartitionTest.java
@@ -32,6 +32,7 @@ import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.ops.OperatorContext;
 import org.apache.drill.exec.physical.config.HashJoinPOP;
+import org.apache.drill.exec.physical.impl.MockRecordBatch;
 import org.apache.drill.exec.physical.impl.aggregate.SpilledRecordbatch;
 import org.apache.drill.exec.physical.impl.join.HashJoinMemoryCalculator;
 import org.apache.drill.exec.physical.impl.join.HashJoinMemoryCalculatorImpl;
@@ -41,13 +42,13 @@ import org.apache.drill.exec.planner.logical.DrillJoinRel;
 import org.apache.drill.exec.proto.ExecProtos;
 import org.apache.drill.exec.proto.UserBitShared;
 import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.CloseableRecordBatch;
 import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.test.BaseDirTestWatcher;
 import org.apache.drill.test.OperatorFixture;
 import org.apache.drill.test.rowSet.DirectRowSet;
 import org.apache.drill.test.rowSet.RowSet;
-import org.apache.drill.test.rowSet.RowSetBatch;
 import org.apache.drill.test.rowSet.RowSetBuilder;
 import org.apache.drill.test.rowSet.RowSetComparison;
 import org.junit.Assert;
@@ -68,27 +69,31 @@ public class HashPartitionTest {
       private RowSet probeRowSet;
 
       @Override
-      public RecordBatch createBuildBatch(BatchSchema schema, BufferAllocator allocator) {
-        buildRowSet = new RowSetBuilder(allocator, schema)
+      public CloseableRecordBatch createBuildBatch(BatchSchema schema, FragmentContext context) {
+        buildRowSet = new RowSetBuilder(context.getAllocator(), schema)
           .addRow(1, "green")
           .addRow(3, "red")
           .addRow(2, "blue")
           .build();
-        return new RowSetBatch(buildRowSet);
+        return new MockRecordBatch.Builder().
+          sendData(buildRowSet).
+          build(context);
       }
 
       @Override
-      public void createResultBuildBatch(BatchSchema schema, BufferAllocator allocator) {
+      public void createResultBuildBatch(BatchSchema schema, FragmentContext context) {
       }
 
       @Override
-      public RecordBatch createProbeBatch(BatchSchema schema, BufferAllocator allocator) {
-        probeRowSet = new RowSetBuilder(allocator, schema)
+      public CloseableRecordBatch createProbeBatch(BatchSchema schema, FragmentContext context) {
+        probeRowSet = new RowSetBuilder(context.getAllocator(), schema)
           .addRow(.5, "yellow")
           .addRow(1.5, "blue")
           .addRow(2.5, "black")
           .build();
-        return new RowSetBatch(probeRowSet);
+        return new MockRecordBatch.Builder().
+          sendData(probeRowSet).
+          build(context);
       }
 
       @Override
@@ -114,9 +119,9 @@ public class HashPartitionTest {
 
         final HashJoinMemoryCalculator.BuildSidePartitioning noopCalc = new HashJoinMemoryCalculatorImpl.NoopBuildSidePartitioningImpl();
 
-        hashPartition.appendInnerRow(buildRowSet.container(), 0, 10, noopCalc);
-        hashPartition.appendInnerRow(buildRowSet.container(), 1, 11, noopCalc);
-        hashPartition.appendInnerRow(buildRowSet.container(), 2, 12, noopCalc);
+        hashPartition.appendInnerRow(buildBatch.getContainer(), 0, 10, noopCalc);
+        hashPartition.appendInnerRow(buildBatch.getContainer(), 1, 11, noopCalc);
+        hashPartition.appendInnerRow(buildBatch.getContainer(), 2, 12, noopCalc);
         hashPartition.completeAnInnerBatch(false, false);
         hashPartition.buildContainersHashTableAndHelper();
 
@@ -155,22 +160,24 @@ public class HashPartitionTest {
       private RowSet actualBuildRowSet;
 
       @Override
-      public RecordBatch createBuildBatch(BatchSchema schema, BufferAllocator allocator) {
-        buildRowSet = new RowSetBuilder(allocator, schema)
+      public CloseableRecordBatch createBuildBatch(BatchSchema schema, FragmentContext context) {
+        buildRowSet = new RowSetBuilder(context.getAllocator(), schema)
           .addRow(1, "green")
           .addRow(3, "red")
           .addRow(2, "blue")
           .build();
-        return new RowSetBatch(buildRowSet);
+        return new MockRecordBatch.Builder().
+          sendData(buildRowSet).
+          build(context);
       }
 
       @Override
-      public void createResultBuildBatch(BatchSchema schema, BufferAllocator allocator) {
+      public void createResultBuildBatch(BatchSchema schema, FragmentContext context) {
         final BatchSchema newSchema = BatchSchema.newBuilder()
           .addFields(schema)
           .addField(MaterializedField.create(HashPartition.HASH_VALUE_COLUMN_NAME, HashPartition.HVtype))
           .build();
-        actualBuildRowSet = new RowSetBuilder(allocator, newSchema)
+        actualBuildRowSet = new RowSetBuilder(context.getAllocator(), newSchema)
           .addRow(1, "green", 10)
           .addRow(3, "red", 11)
           .addRow(2, "blue", 12)
@@ -178,13 +185,15 @@ public class HashPartitionTest {
       }
 
       @Override
-      public RecordBatch createProbeBatch(BatchSchema schema, BufferAllocator allocator) {
-        probeRowSet = new RowSetBuilder(allocator, schema)
+      public CloseableRecordBatch createProbeBatch(BatchSchema schema, FragmentContext context) {
+        probeRowSet = new RowSetBuilder(context.getAllocator(), schema)
           .addRow(.5, "yellow")
           .addRow(1.5, "blue")
           .addRow(2.5, "black")
           .build();
-        return new RowSetBatch(probeRowSet);
+        return new MockRecordBatch.Builder().
+          sendData(probeRowSet).
+          build(context);
       }
 
       @Override
@@ -210,9 +219,9 @@ public class HashPartitionTest {
 
         final HashJoinMemoryCalculator.BuildSidePartitioning noopCalc = new HashJoinMemoryCalculatorImpl.NoopBuildSidePartitioningImpl();
 
-        hashPartition.appendInnerRow(buildRowSet.container(), 0, 10, noopCalc);
-        hashPartition.appendInnerRow(buildRowSet.container(), 1, 11, noopCalc);
-        hashPartition.appendInnerRow(buildRowSet.container(), 2, 12, noopCalc);
+        hashPartition.appendInnerRow(buildBatch.getContainer(), 0, 10, noopCalc);
+        hashPartition.appendInnerRow(buildBatch.getContainer(), 1, 11, noopCalc);
+        hashPartition.appendInnerRow(buildBatch.getContainer(), 2, 12, noopCalc);
         hashPartition.completeAnInnerBatch(false, false);
         hashPartition.spillThisPartition();
         final String spillFile = hashPartition.getSpillFile();
@@ -260,15 +269,17 @@ public class HashPartitionTest {
         MaterializedField buildColB = MaterializedField.create("buildColB", Types.required(TypeProtos.MinorType.VARCHAR));
         List<MaterializedField> buildCols = Lists.newArrayList(buildColA, buildColB);
         final BatchSchema buildSchema = new BatchSchema(BatchSchema.SelectionVectorMode.NONE, buildCols);
-        final RecordBatch buildBatch = testCase.createBuildBatch(buildSchema, allocator);
-        testCase.createResultBuildBatch(buildSchema, allocator);
+        final CloseableRecordBatch buildBatch = testCase.createBuildBatch(buildSchema, operatorContext.getFragmentContext());
+        buildBatch.next();
+        testCase.createResultBuildBatch(buildSchema, operatorContext.getFragmentContext());
 
         // Create probe batch
         MaterializedField probeColA = MaterializedField.create("probeColA", Types.required(TypeProtos.MinorType.FLOAT4));
         MaterializedField probeColB = MaterializedField.create("probeColB", Types.required(TypeProtos.MinorType.VARCHAR));
         List<MaterializedField> probeCols = Lists.newArrayList(probeColA, probeColB);
         final BatchSchema probeSchema = new BatchSchema(BatchSchema.SelectionVectorMode.NONE, probeCols);
-        final RecordBatch probeBatch = testCase.createProbeBatch(probeSchema, allocator);
+        final CloseableRecordBatch probeBatch = testCase.createProbeBatch(probeSchema, operatorContext.getFragmentContext());
+        probeBatch.next();
 
         final LogicalExpression buildColExpression = SchemaPath.getSimplePath(buildColB.getName());
         final LogicalExpression probeColExpression = SchemaPath.getSimplePath(probeColB.getName());
@@ -285,14 +296,17 @@ public class HashPartitionTest {
         baseHashTable.updateIncoming(buildBatch, probeBatch);
 
         testCase.run(spillSet, buildSchema, probeSchema, buildBatch, probeBatch, baseHashTable, context, operatorContext);
+
+        buildBatch.close();
+        probeBatch.close();
       }
     }
   }
 
   interface HashPartitionTestCase {
-    RecordBatch createBuildBatch(BatchSchema schema, BufferAllocator allocator);
-    void createResultBuildBatch(BatchSchema schema, BufferAllocator allocator);
-    RecordBatch createProbeBatch(BatchSchema schema, BufferAllocator allocator);
+    CloseableRecordBatch createBuildBatch(BatchSchema schema, FragmentContext context);
+    void createResultBuildBatch(BatchSchema schema, FragmentContext context);
+    CloseableRecordBatch createProbeBatch(BatchSchema schema, FragmentContext context);
 
     void run(SpillSet spillSet,
              BatchSchema buildSchema,
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoinJPPD.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoinJPPD.java
index aae566b..d3d1487 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoinJPPD.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoinJPPD.java
@@ -22,10 +22,10 @@ import org.apache.calcite.rel.core.JoinRelType;
 import org.apache.drill.categories.OperatorTest;
 import org.apache.drill.categories.SlowTest;
 import org.apache.drill.exec.physical.config.HashJoinPOP;
-import org.apache.drill.exec.physical.unit.PhysicalOpUnitTestBase;
 import org.apache.drill.exec.work.filter.BloomFilter;
 import org.apache.drill.exec.work.filter.BloomFilterDef;
 import org.apache.drill.exec.work.filter.RuntimeFilterDef;
+import org.apache.drill.test.PhysicalOpUnitTestBase;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
@@ -58,7 +58,7 @@ public class TestHashJoinJPPD extends PhysicalOpUnitTestBase {
     for ( int cnt = 1; cnt <= numRows; cnt++ ) {
       leftTable.add("[{\"lft\": " + cnt + ", \"a\" : \"a string\"}]");
     }
-    opTestBuilder()
+    legacyOpTestBuilder()
       .physicalOperator(joinConf)
       .inputDataStreamsJson(Lists.newArrayList(leftTable,rightTable))
       .baselineColumns("lft", "a", "b", "rgt")
@@ -91,7 +91,7 @@ public class TestHashJoinJPPD extends PhysicalOpUnitTestBase {
     for ( int cnt = 1; cnt <= numRows; cnt++ ) {
       leftTable.add("[{\"lft\": " + cnt + ", \"a\" : \"a string\"}]");
     }
-    opTestBuilder()
+    legacyOpTestBuilder()
       .physicalOperator(joinConf)
       .inputDataStreamsJson(Lists.newArrayList(leftTable,rightTable))
       .baselineColumns("lft", "a", "b", "rgt")
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoinSpill.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoinSpill.java
index 0c08611..67d214f 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoinSpill.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoinSpill.java
@@ -23,7 +23,7 @@ import org.apache.drill.categories.OperatorTest;
 import org.apache.drill.categories.SlowTest;
 
 import org.apache.drill.exec.physical.config.HashJoinPOP;
-import org.apache.drill.exec.physical.unit.PhysicalOpUnitTestBase;
+import org.apache.drill.test.PhysicalOpUnitTestBase;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
@@ -52,7 +52,7 @@ public class TestHashJoinSpill extends PhysicalOpUnitTestBase {
       rightTable.add("[{\"rgt\": " + cnt + ", \"b\" : \"a string\"}]");
     }
 
-    opTestBuilder()
+    legacyOpTestBuilder()
       .physicalOperator(joinConf)
       .inputDataStreamsJson(Lists.newArrayList(leftTable,rightTable))
       .baselineColumns("lft", "a", "b", "rgt")
@@ -79,7 +79,7 @@ public class TestHashJoinSpill extends PhysicalOpUnitTestBase {
       rightTable.add("[{\"rgt\": " + cnt + ", \"b\" : \"a string\"}]");
     }
 
-    opTestBuilder()
+    legacyOpTestBuilder()
       .physicalOperator(joinConf)
       .inputDataStreamsJson(Lists.newArrayList(leftTable,rightTable))
       .baselineColumns("lft", "a", "b", "rgt")
@@ -110,7 +110,7 @@ public class TestHashJoinSpill extends PhysicalOpUnitTestBase {
       // rightTable.add("[{\"rgt\": " + cnt + ", \"b\" : \"a string\"}]");
     }
 
-    opTestBuilder()
+    legacyOpTestBuilder()
       .physicalOperator(joinConf)
       .inputDataStreamsJson(Lists.newArrayList(leftTable,rightTable))
       .baselineColumns("lft", "a", "b", "rgt")
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/limit/TestLimitOperator.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/limit/TestLimitOperator.java
index 7225edc..3dbd1cc 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/limit/TestLimitOperator.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/limit/TestLimitOperator.java
@@ -19,7 +19,7 @@ package org.apache.drill.exec.physical.impl.limit;
 
 import com.google.common.collect.Lists;
 import org.apache.drill.exec.physical.config.Limit;
-import org.apache.drill.exec.physical.unit.PhysicalOpUnitTestBase;
+import org.apache.drill.test.PhysicalOpUnitTestBase;
 import org.apache.drill.test.BaseDirTestWatcher;
 import org.apache.drill.test.ClientFixture;
 import org.apache.drill.test.ClusterFixture;
@@ -55,7 +55,7 @@ public class TestLimitOperator extends PhysicalOpUnitTestBase {
     List<String> inputJsonBatches = Lists.newArrayList(
       "[{\"a\": 5, \"b\" : 1 }]",
       "[{\"a\": 5, \"b\" : 5},{\"a\": 3, \"b\" : 8}]");
-    opTestBuilder()
+    legacyOpTestBuilder()
       .physicalOperator(limitConf)
       .inputDataStreamJson(inputJsonBatches)
       .baselineColumns("a", "b")
@@ -71,7 +71,7 @@ public class TestLimitOperator extends PhysicalOpUnitTestBase {
     List<String> inputJsonBatches = Lists.newArrayList(
       "[{\"a\": 5, \"b\" : 1 }]",
       "[{\"a\": 5, \"b\" : 5},{\"a\": 3, \"b\" : 8}]");
-    opTestBuilder()
+    legacyOpTestBuilder()
       .physicalOperator(limitConf)
       .inputDataStreamJson(inputJsonBatches)
       .baselineColumns("a", "b")
@@ -85,7 +85,7 @@ public class TestLimitOperator extends PhysicalOpUnitTestBase {
     List<String> inputJsonBatches = Lists.newArrayList(
       "[{\"a\": 5, \"b\" : 1 }]",
       "[{\"a\": 5, \"b\" : 5},{\"a\": 3, \"b\" : 8}]");
-    opTestBuilder()
+    legacyOpTestBuilder()
       .physicalOperator(limitConf)
       .inputDataStreamJson(inputJsonBatches)
       .baselineColumns("a", "b")
@@ -99,7 +99,7 @@ public class TestLimitOperator extends PhysicalOpUnitTestBase {
     List<String> inputJsonBatches = Lists.newArrayList(
       "[{\"a\": 5, \"b\" : 1 }]",
       "[{\"a\": 5, \"b\" : 5},{\"a\": 3, \"b\" : 8}]");
-    opTestBuilder()
+    legacyOpTestBuilder()
       .physicalOperator(limitConf)
       .inputDataStreamJson(inputJsonBatches)
       .baselineColumns("a", "b")
@@ -114,7 +114,7 @@ public class TestLimitOperator extends PhysicalOpUnitTestBase {
     List<String> inputJsonBatches = Lists.newArrayList(
       "[{\"a\": 5, \"b\" : 1 }]",
       "[{\"a\": 5, \"b\" : 5},{\"a\": 3, \"b\" : 8}]");
-    opTestBuilder()
+    legacyOpTestBuilder()
       .physicalOperator(limitConf)
       .inputDataStreamJson(inputJsonBatches)
       .baselineColumns("a", "b")
@@ -130,7 +130,7 @@ public class TestLimitOperator extends PhysicalOpUnitTestBase {
     List<String> inputJsonBatches = Lists.newArrayList(
       "[{\"a\": 5, \"b\" : 1 }]",
       "[{\"a\": 5, \"b\" : 5},{\"a\": 3, \"b\" : 8}]");
-    opTestBuilder()
+    legacyOpTestBuilder()
       .physicalOperator(limitConf)
       .inputDataStreamJson(inputJsonBatches)
       .baselineColumns("a", "b")
@@ -144,7 +144,7 @@ public class TestLimitOperator extends PhysicalOpUnitTestBase {
     List<String> inputJsonBatches = Lists.newArrayList(
       "[{\"a\": 5, \"b\" : 1 }]",
       "[{\"a\": 5, \"b\" : 5},{\"a\": 3, \"b\" : 8}]");
-    opTestBuilder()
+    legacyOpTestBuilder()
       .physicalOperator(limitConf)
       .inputDataStreamJson(inputJsonBatches)
       .baselineColumns("a", "b")
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/svremover/AbstractGenericCopierTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/svremover/AbstractGenericCopierTest.java
index 7d444b4..292af20 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/svremover/AbstractGenericCopierTest.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/svremover/AbstractGenericCopierTest.java
@@ -20,77 +20,99 @@ package org.apache.drill.exec.physical.impl.svremover;
 import org.apache.drill.common.types.TypeProtos;
 import org.apache.drill.common.types.Types;
 import org.apache.drill.exec.exception.SchemaChangeException;
-import org.apache.drill.exec.memory.RootAllocator;
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.physical.impl.MockRecordBatch;
 import org.apache.drill.exec.record.BatchSchema;
 import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.record.VectorContainer;
-import org.apache.drill.exec.record.metadata.TupleMetadata;
 import org.apache.drill.exec.vector.SchemaChangeCallBack;
+import org.apache.drill.test.BaseDirTestWatcher;
+import org.apache.drill.test.OperatorFixture;
+import org.apache.drill.test.rowSet.DirectRowSet;
 import org.apache.drill.test.rowSet.RowSet;
-import org.apache.drill.test.rowSet.RowSetBatch;
 import org.apache.drill.test.rowSet.RowSetBuilder;
 import org.apache.drill.test.rowSet.RowSetComparison;
 import org.apache.drill.test.rowSet.schema.SchemaBuilder;
+import org.junit.Rule;
 import org.junit.Test;
 
 public abstract class AbstractGenericCopierTest {
+  @Rule
+  public final BaseDirTestWatcher baseDirTestWatcher = new BaseDirTestWatcher();
+
   @Test
-  public void testCopyRecords() throws SchemaChangeException {
-    try (RootAllocator allocator = new RootAllocator(10_000_000)) {
-      final TupleMetadata batchSchema = createTestSchema(BatchSchema.SelectionVectorMode.NONE);
+  public void testCopyRecords() throws Exception {
+    try (OperatorFixture operatorFixture = new OperatorFixture.Builder(baseDirTestWatcher).build()) {
+      final BufferAllocator allocator = operatorFixture.allocator();
+      final BatchSchema batchSchema = createTestSchema(BatchSchema.SelectionVectorMode.NONE);
       final RowSet srcRowSet = createSrcRowSet(allocator);
-      final RowSet destRowSet = new RowSetBuilder(allocator, batchSchema).build();
-      final VectorContainer destContainer = destRowSet.container();
-      final Copier copier = createCopier(new RowSetBatch(srcRowSet), destContainer, null);
+      final VectorContainer destContainer = new VectorContainer(allocator, batchSchema);
+
+      destContainer.setRecordCount(0);
       final RowSet expectedRowSet = createExpectedRowset(allocator);
 
-      copier.copyRecords(0, 3);
+      MockRecordBatch mockRecordBatch = null;
 
       try {
-        new RowSetComparison(expectedRowSet).verify(destRowSet);
+        mockRecordBatch = new MockRecordBatch.Builder().
+          sendData(srcRowSet).
+          build(operatorFixture.getFragmentContext());
+        mockRecordBatch.next();
+        final Copier copier = createCopier(mockRecordBatch, destContainer, null);
+        copier.copyRecords(0, 3);
+
+        new RowSetComparison(expectedRowSet).verify(DirectRowSet.fromContainer(destContainer));
       } finally {
-        srcRowSet.clear();
-
-        if (srcRowSet instanceof RowSet.HyperRowSet) {
-          ((RowSet.HyperRowSet)srcRowSet).getSv4().clear();
+        if (mockRecordBatch != null) {
+          mockRecordBatch.close();
         }
 
-        destRowSet.clear();
+        srcRowSet.clear();
+        destContainer.clear();
         expectedRowSet.clear();
       }
     }
   }
 
   @Test
-  public void testAppendRecords() throws SchemaChangeException {
-    try (RootAllocator allocator = new RootAllocator(10_000_000)) {
-      final TupleMetadata batchSchema = createTestSchema(BatchSchema.SelectionVectorMode.NONE);
+  public void testAppendRecords() throws Exception {
+    try (OperatorFixture operatorFixture = new OperatorFixture.Builder(baseDirTestWatcher).build()) {
+      final BufferAllocator allocator = operatorFixture.allocator();
+      final BatchSchema batchSchema = createTestSchema(BatchSchema.SelectionVectorMode.NONE);
       final RowSet srcRowSet = createSrcRowSet(allocator);
-      final RowSet destRowSet = new RowSetBuilder(allocator, batchSchema).build();
-      final VectorContainer destContainer = destRowSet.container();
-      final Copier copier = createCopier(new RowSetBatch(srcRowSet), destContainer, null);
+      final VectorContainer destContainer = new VectorContainer(allocator, batchSchema);
+
+      AbstractCopier.allocateOutgoing(destContainer, 3);
+
+      destContainer.setRecordCount(0);
       final RowSet expectedRowSet = createExpectedRowset(allocator);
 
-      copier.appendRecord(0);
-      copier.appendRecords(1, 2);
+      MockRecordBatch mockRecordBatch = null;
 
       try {
-        new RowSetComparison(expectedRowSet).verify(destRowSet);
+        mockRecordBatch = new MockRecordBatch.Builder().
+          sendData(srcRowSet).
+          build(operatorFixture.getFragmentContext());
+        mockRecordBatch.next();
+        final Copier copier = createCopier(mockRecordBatch, destContainer, null);
+        copier.appendRecord(0);
+        copier.appendRecords(1, 2);
+
+        new RowSetComparison(expectedRowSet).verify(DirectRowSet.fromContainer(destContainer));
       } finally {
-        srcRowSet.clear();
-
-        if (srcRowSet instanceof RowSet.HyperRowSet) {
-          ((RowSet.HyperRowSet)srcRowSet).getSv4().clear();
+        if (mockRecordBatch != null) {
+          mockRecordBatch.close();
         }
 
-        destRowSet.clear();
+        srcRowSet.clear();
+        destContainer.clear();
         expectedRowSet.clear();
       }
     }
   }
 
-  public abstract RowSet createSrcRowSet(RootAllocator allocator) throws SchemaChangeException;
+  public abstract RowSet createSrcRowSet(BufferAllocator allocator) throws SchemaChangeException;
 
   public Copier createCopier(RecordBatch incoming, VectorContainer outputContainer,
                                       SchemaChangeCallBack callback) {
@@ -117,7 +139,7 @@ public abstract class AbstractGenericCopierTest {
     return new Object[]{106, "black", new float[]{.75f}, new String[]{"4a"}};
   }
 
-  private RowSet createExpectedRowset(RootAllocator allocator) {
+  public RowSet createExpectedRowset(BufferAllocator allocator) {
     return new RowSetBuilder(allocator, createTestSchema(BatchSchema.SelectionVectorMode.NONE))
       .addRow(row1())
       .addRow(row2())
@@ -125,7 +147,7 @@ public abstract class AbstractGenericCopierTest {
       .build();
   }
 
-  protected TupleMetadata createTestSchema(BatchSchema.SelectionVectorMode mode) {
+  protected BatchSchema createTestSchema(BatchSchema.SelectionVectorMode mode) {
     MaterializedField colA = MaterializedField.create("colA", Types.required(TypeProtos.MinorType.INT));
     MaterializedField colB = MaterializedField.create("colB", Types.required(TypeProtos.MinorType.VARCHAR));
     MaterializedField colC = MaterializedField.create("colC", Types.repeated(TypeProtos.MinorType.FLOAT4));
@@ -136,6 +158,6 @@ public abstract class AbstractGenericCopierTest {
       .add(colC)
       .add(colD)
       .withSVMode(mode)
-      .buildSchema();
+      .build();
   }
 }
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/svremover/GenericCopierTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/svremover/GenericCopierTest.java
index d6c38e7..2636490 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/svremover/GenericCopierTest.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/svremover/GenericCopierTest.java
@@ -17,7 +17,7 @@
  */
 package org.apache.drill.exec.physical.impl.svremover;
 
-import org.apache.drill.exec.memory.RootAllocator;
+import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.record.BatchSchema;
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.record.VectorContainer;
@@ -27,7 +27,7 @@ import org.apache.drill.test.rowSet.RowSetBuilder;
 
 public class GenericCopierTest extends AbstractGenericCopierTest {
   @Override
-  public RowSet createSrcRowSet(RootAllocator allocator) {
+  public RowSet createSrcRowSet(BufferAllocator allocator) {
     return new RowSetBuilder(allocator, createTestSchema(BatchSchema.SelectionVectorMode.NONE))
       .addRow(row1())
       .addRow(row2())
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/svremover/GenericSV2BatchCopierTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/svremover/GenericSV2BatchCopierTest.java
index 748e0d0..2fec0e5 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/svremover/GenericSV2BatchCopierTest.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/svremover/GenericSV2BatchCopierTest.java
@@ -17,7 +17,7 @@
  */
 package org.apache.drill.exec.physical.impl.svremover;
 
-import org.apache.drill.exec.memory.RootAllocator;
+import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.record.BatchSchema;
 import org.apache.drill.test.rowSet.RowSet;
 import org.apache.drill.test.rowSet.RowSetBuilder;
@@ -29,7 +29,7 @@ import org.apache.drill.test.rowSet.RowSetBuilder;
 public class GenericSV2BatchCopierTest extends AbstractGenericCopierTest {
 
   @Override
-  public RowSet createSrcRowSet(RootAllocator allocator) {
+  public RowSet createSrcRowSet(BufferAllocator allocator) {
     return new RowSetBuilder(allocator, createTestSchema(BatchSchema.SelectionVectorMode.TWO_BYTE))
       .addSelection(true, row1())
       .addRow(row2())
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/svremover/GenericSV2CopierTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/svremover/GenericSV2CopierTest.java
index b2f0e51..42182e9 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/svremover/GenericSV2CopierTest.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/svremover/GenericSV2CopierTest.java
@@ -17,7 +17,7 @@
  */
 package org.apache.drill.exec.physical.impl.svremover;
 
-import org.apache.drill.exec.memory.RootAllocator;
+import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.record.BatchSchema;
 import org.apache.drill.test.rowSet.RowSet;
 import org.apache.drill.test.rowSet.RowSetBuilder;
@@ -25,7 +25,7 @@ import org.apache.drill.test.rowSet.RowSetBuilder;
 public class GenericSV2CopierTest extends AbstractGenericCopierTest {
 
   @Override
-  public RowSet createSrcRowSet(RootAllocator allocator) {
+  public RowSet createSrcRowSet(BufferAllocator allocator) {
     return new RowSetBuilder(allocator, createTestSchema(BatchSchema.SelectionVectorMode.TWO_BYTE))
       .addRow(row1())
       .addSelection(false, row4())
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/svremover/GenericSV4CopierTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/svremover/GenericSV4CopierTest.java
index a5f5bb7..46edab7 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/svremover/GenericSV4CopierTest.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/svremover/GenericSV4CopierTest.java
@@ -19,11 +19,10 @@ package org.apache.drill.exec.physical.impl.svremover;
 
 import io.netty.buffer.DrillBuf;
 import org.apache.drill.exec.exception.SchemaChangeException;
-import org.apache.drill.exec.memory.RootAllocator;
+import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.record.BatchSchema;
 import org.apache.drill.exec.record.ExpandableHyperContainer;
 import org.apache.drill.exec.record.VectorContainer;
-import org.apache.drill.exec.record.metadata.TupleMetadata;
 import org.apache.drill.exec.record.selection.SelectionVector4;
 import org.apache.drill.test.rowSet.HyperRowSetImpl;
 import org.apache.drill.test.rowSet.RowSet;
@@ -32,8 +31,8 @@ import org.apache.drill.test.rowSet.RowSetBuilder;
 public class GenericSV4CopierTest extends AbstractGenericCopierTest {
 
   @Override
-  public RowSet createSrcRowSet(RootAllocator allocator) throws SchemaChangeException {
-    final TupleMetadata batchSchema = createTestSchema(BatchSchema.SelectionVectorMode.NONE);
+  public RowSet createSrcRowSet(BufferAllocator allocator) throws SchemaChangeException {
+    final BatchSchema batchSchema = createTestSchema(BatchSchema.SelectionVectorMode.NONE);
     final DrillBuf drillBuf = allocator.buffer(4 * 3);
     final SelectionVector4 sv4 = new SelectionVector4(drillBuf, 3, Character.MAX_VALUE);
 
@@ -52,10 +51,12 @@ public class GenericSV4CopierTest extends AbstractGenericCopierTest {
 
     final ExpandableHyperContainer hyperContainer = new ExpandableHyperContainer(batch1);
     hyperContainer.addBatch(batch2);
+    hyperContainer.setRecordCount(5);
 
     sv4.set(0, 0, 0);
     sv4.set(1, 1, 0);
     sv4.set(2, 1, 2);
+    sv4.setCount(3);
 
     return new HyperRowSetImpl(hyperContainer, sv4);
   }
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/BasicPhysicalOpUnitTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/BasicPhysicalOpUnitTest.java
index 0a8cd88..363a939 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/BasicPhysicalOpUnitTest.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/BasicPhysicalOpUnitTest.java
@@ -38,6 +38,8 @@ import org.apache.drill.exec.physical.config.StreamingAggregate;
 import org.apache.drill.exec.physical.config.TopN;
 import org.apache.drill.exec.physical.config.FlattenPOP;
 import org.apache.drill.exec.planner.physical.AggPrelBase;
+import org.apache.drill.test.LegacyOperatorTestBuilder;
+import org.apache.drill.test.PhysicalOpUnitTestBase;
 import org.junit.Ignore;
 import org.junit.Test;
 
@@ -51,7 +53,7 @@ public class BasicPhysicalOpUnitTest extends PhysicalOpUnitTestBase {
     List<String> jsonBatches = Lists.newArrayList(
         "[{\"x\": 5 },{\"x\": 10 }]",
         "[{\"x\": 20 },{\"x\": 30 },{\"x\": 40 }]");
-    opTestBuilder()
+    legacyOpTestBuilder()
         .physicalOperator(projectConf)
         .inputDataStreamJson(jsonBatches)
         .baselineColumns("x")
@@ -69,7 +71,7 @@ public class BasicPhysicalOpUnitTest extends PhysicalOpUnitTestBase {
     List<String> jsonBatches = Lists.newArrayList(
         "[{\"json_col\": \"{ \\\"a\\\" : 1 }\"}]",
         "[{\"json_col\": \"{ \\\"a\\\" : 5 }\"}]");
-    opTestBuilder()
+    legacyOpTestBuilder()
         .physicalOperator(projectConf)
         .inputDataStreamJson(jsonBatches)
         .baselineColumns("complex_col")
@@ -91,7 +93,7 @@ public class BasicPhysicalOpUnitTest extends PhysicalOpUnitTestBase {
     List<String> rightJsonBatches = Lists.newArrayList(
         "[{\"x1\": 5, \"a2\" : \"asdf\"}]",
         "[{\"x1\": 6, \"a2\" : \"qwerty\"},{\"x1\": 5, \"a2\" : \"12345\"}]");
-    opTestBuilder()
+    legacyOpTestBuilder()
         .physicalOperator(joinConf)
         .inputDataStreamsJson(Lists.newArrayList(leftJsonBatches, rightJsonBatches))
         .baselineColumns("x", "a", "a2", "x1")
@@ -116,7 +118,7 @@ public class BasicPhysicalOpUnitTest extends PhysicalOpUnitTestBase {
     List<String> rightJsonBatches = Lists.newArrayList(
         "[{\"x1\": 5, \"a2\" : \"asdf\"}]",
         "[{\"x1\": 5, \"a2\" : \"12345\"}, {\"x1\": 6, \"a2\" : \"qwerty\"}]");
-    opTestBuilder()
+    legacyOpTestBuilder()
         .physicalOperator(joinConf)
         .inputDataStreamsJson(Lists.newArrayList(leftJsonBatches, rightJsonBatches))
         .baselineColumns("x", "a", "a2", "x1")
@@ -135,7 +137,7 @@ public class BasicPhysicalOpUnitTest extends PhysicalOpUnitTestBase {
     List<String> inputJsonBatches = Lists.newArrayList(
         "[{\"a\": 5, \"b\" : 1 }]",
         "[{\"a\": 5, \"b\" : 5},{\"a\": 3, \"b\" : 8}]");
-    opTestBuilder()
+    legacyOpTestBuilder()
         .physicalOperator(aggConf)
         .inputDataStreamJson(inputJsonBatches)
         .baselineColumns("b_sum", "a")
@@ -150,7 +152,7 @@ public class BasicPhysicalOpUnitTest extends PhysicalOpUnitTestBase {
     List<String> inputJsonBatches = Lists.newArrayList(
         "[{\"a\": 5, \"b\" : 1 }]",
         "[{\"a\": 5, \"b\" : 5},{\"a\": 3, \"b\" : 8}]");
-    opTestBuilder()
+    legacyOpTestBuilder()
         .physicalOperator(aggConf)
         .inputDataStreamJson(inputJsonBatches)
         .baselineColumns("b_sum", "a")
@@ -165,7 +167,7 @@ public class BasicPhysicalOpUnitTest extends PhysicalOpUnitTestBase {
     List<String> inputJsonBatches = Lists.newArrayList(
         "[{\"a\": {\"b\" : 1 }}]",
         "[{\"a\": {\"b\" : 5}},{\"a\": {\"b\" : 8}}]");
-    opTestBuilder()
+    legacyOpTestBuilder()
         .physicalOperator(complexToJson)
         .inputDataStreamJson(inputJsonBatches)
         .baselineColumns("a")
@@ -182,7 +184,7 @@ public class BasicPhysicalOpUnitTest extends PhysicalOpUnitTestBase {
         "[{\"a\": 5, \"b\" : 1 }]",
         "[{\"a\": 5, \"b\" : 5},{\"a\": 3, \"b\" : 8}]",
         "[{\"a\": 40, \"b\" : 3},{\"a\": 13, \"b\" : 100}]");
-    opTestBuilder()
+    legacyOpTestBuilder()
         .physicalOperator(filterConf)
         .inputDataStreamJson(inputJsonBatches)
         .baselineColumns("a", "b")
@@ -206,7 +208,7 @@ public class BasicPhysicalOpUnitTest extends PhysicalOpUnitTestBase {
       inputJsonBatches.add(batchString.toString());
     }
 
-    OperatorTestBuilder opTestBuilder = opTestBuilder()
+    LegacyOperatorTestBuilder opTestBuilder = legacyOpTestBuilder()
             .physicalOperator(flatten)
             .inputDataStreamJson(inputJsonBatches)
             .baselineColumns("a", "b")
@@ -225,7 +227,7 @@ public class BasicPhysicalOpUnitTest extends PhysicalOpUnitTestBase {
         "[{\"a\": 5, \"b\" : 1 }]",
         "[{\"a\": 5, \"b\" : 5},{\"a\": 3, \"b\" : 8}]",
         "[{\"a\": 40, \"b\" : 3},{\"a\": 13, \"b\" : 100}]");
-    opTestBuilder()
+    legacyOpTestBuilder()
         .physicalOperator(sortConf)
         .maxAllocation(15_000_000L)
         .inputDataStreamJson(inputJsonBatches)
@@ -253,8 +255,8 @@ public class BasicPhysicalOpUnitTest extends PhysicalOpUnitTestBase {
       inputJsonBatches.add(batchString.toString());
     }
 
-    OperatorTestBuilder opTestBuilder =
-        opTestBuilder()
+    LegacyOperatorTestBuilder opTestBuilder =
+        legacyOpTestBuilder()
             .initReservation(initReservation)
             .maxAllocation(maxAllocation)
             .physicalOperator(sortConf)
@@ -312,7 +314,7 @@ public class BasicPhysicalOpUnitTest extends PhysicalOpUnitTestBase {
         "[{\"a\": 5, \"b\" : 1 }]",
         "[{\"a\": 5, \"b\" : 5},{\"a\": 3, \"b\" : 8}]",
         "[{\"a\": 40, \"b\" : 3},{\"a\": 13, \"b\" : 100}]");
-    opTestBuilder()
+    legacyOpTestBuilder()
         .physicalOperator(sortConf)
         .inputDataStreamJson(inputJsonBatches)
         .baselineColumns("a", "b")
@@ -336,7 +338,7 @@ public class BasicPhysicalOpUnitTest extends PhysicalOpUnitTestBase {
     List<String> rightJsonBatches = Lists.newArrayList(
         "[{\"x\": 5, \"a\" : \"asdf\"}]",
         "[{\"x\": 5, \"a\" : \"12345\"}, {\"x\": 6, \"a\" : \"qwerty\"}]");
-    opTestBuilder()
+    legacyOpTestBuilder()
         .physicalOperator(mergeConf)
         .inputDataStreamsJson(Lists.newArrayList(leftJsonBatches, rightJsonBatches))
         .baselineColumns("x", "a")
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/MiniPlanUnitTestBase.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/MiniPlanUnitTestBase.java
index 79f260f..2f05bba 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/MiniPlanUnitTestBase.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/MiniPlanUnitTestBase.java
@@ -32,6 +32,7 @@ import org.apache.drill.exec.store.dfs.DrillFileSystem;
 import org.apache.drill.exec.store.parquet.ParquetDirectByteBufferAllocator;
 import org.apache.drill.exec.store.parquet.ParquetReaderUtility;
 import org.apache.drill.exec.store.parquet.columnreaders.ParquetRecordReader;
+import org.apache.drill.test.PhysicalOpUnitTestBase;
 import org.apache.hadoop.fs.Path;
 import org.apache.parquet.hadoop.CodecFactory;
 import org.apache.parquet.hadoop.ParquetFileReader;
@@ -171,7 +172,7 @@ public class MiniPlanUnitTestBase extends PhysicalOpUnitTestBase {
   }
 
   /**
-   * Similar to {@link OperatorTestBuilder}, build a physical operator (RecordBatch) and specify its input record batches.
+   * Similar to {@link LegacyOperatorTestBuilder}, build a physical operator (RecordBatch) and specify its input record batches.
    * The input record batch could be a non-scan operator by calling {@link PopBuilder#addInputAsChild},
    * or a scan operator by calling {@link PopBuilder#addJsonScanAsChild()} if it's SCAN operator.
    *
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/TestOutputBatchSize.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/TestOutputBatchSize.java
index 84a4fbc..f632455 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/TestOutputBatchSize.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/TestOutputBatchSize.java
@@ -49,6 +49,8 @@ import org.apache.drill.exec.vector.UInt4Vector;
 import org.apache.drill.exec.vector.ValueVector;
 import org.apache.drill.exec.vector.complex.RepeatedListVector;
 import org.apache.drill.exec.vector.complex.RepeatedValueVector;
+import org.apache.drill.test.LegacyOperatorTestBuilder;
+import org.apache.drill.test.PhysicalOpUnitTestBase;
 import org.junit.Test;
 
 import java.util.ArrayList;
@@ -121,7 +123,7 @@ public class TestOutputBatchSize extends PhysicalOpUnitTestBase {
 
     fragContext.getOptions().setLocalOption("drill.exec.memory.operator.output_batch_size", totalSize / 2);
 
-    OperatorTestBuilder opTestBuilder = opTestBuilder()
+    LegacyOperatorTestBuilder opTestBuilder = legacyOpTestBuilder()
             .physicalOperator(projectConf)
             .inputDataStreamJson(inputJsonBatches)
             .baselineColumns(baselineColumns)
@@ -197,7 +199,7 @@ public class TestOutputBatchSize extends PhysicalOpUnitTestBase {
 
       fragContext.getOptions().setLocalOption("drill.exec.memory.operator.output_batch_size", memoryLimit);
 
-      OperatorTestBuilder opTestBuilder = opTestBuilder()
+      LegacyOperatorTestBuilder opTestBuilder = legacyOpTestBuilder()
               .physicalOperator(projectConf)
               .inputDataStreamJson(inputJsonBatches)
               .baselineColumns(baselineColumns)
@@ -276,7 +278,7 @@ public class TestOutputBatchSize extends PhysicalOpUnitTestBase {
     fragContext.getOptions().setLocalOption("drill.exec.memory.operator.output_batch_size", totalSize / 2);
 
 
-    OperatorTestBuilder opTestBuilder = opTestBuilder()
+    LegacyOperatorTestBuilder opTestBuilder = legacyOpTestBuilder()
             .physicalOperator(projectConf)
             .inputDataStreamJson(inputJsonBatches)
             .baselineColumns(baselineColumns)
@@ -347,7 +349,7 @@ public class TestOutputBatchSize extends PhysicalOpUnitTestBase {
 
     fragContext.getOptions().setLocalOption("drill.exec.memory.operator.output_batch_size", totalSize / 2);
 
-    OperatorTestBuilder opTestBuilder = opTestBuilder()
+    LegacyOperatorTestBuilder opTestBuilder = legacyOpTestBuilder()
             .physicalOperator(projectConf)
             .inputDataStreamJson(inputJsonBatches)
             .baselineColumns(baselineColumns)
@@ -418,7 +420,7 @@ public class TestOutputBatchSize extends PhysicalOpUnitTestBase {
 
     fragContext.getOptions().setLocalOption("drill.exec.memory.operator.output_batch_size", totalSize / 2);
 
-    OperatorTestBuilder opTestBuilder = opTestBuilder()
+    LegacyOperatorTestBuilder opTestBuilder = legacyOpTestBuilder()
             .physicalOperator(projectConf)
             .inputDataStreamJson(inputJsonBatches)
             .baselineColumns(baselineColumns)
@@ -481,7 +483,7 @@ public class TestOutputBatchSize extends PhysicalOpUnitTestBase {
     // We will get approximately get 2 batches and max of 4.
     fragContext.getOptions().setLocalOption("drill.exec.memory.operator.output_batch_size", totalSize / 2);
 
-    OperatorTestBuilder opTestBuilder = opTestBuilder()
+    LegacyOperatorTestBuilder opTestBuilder = legacyOpTestBuilder()
       .physicalOperator(flatten)
       .inputDataStreamJson(inputJsonBatches)
       .baselineColumns("a", "b", "c")
@@ -549,7 +551,7 @@ public class TestOutputBatchSize extends PhysicalOpUnitTestBase {
     // We will get approximately get 2 batches and max of 4.
     fragContext.getOptions().setLocalOption("drill.exec.memory.operator.output_batch_size", totalSize / 2);
 
-    OperatorTestBuilder opTestBuilder = opTestBuilder()
+    LegacyOperatorTestBuilder opTestBuilder = legacyOpTestBuilder()
       .physicalOperator(flatten)
       .inputDataStreamJson(inputJsonBatches)
       .baselineColumns("a", "b", "c")
@@ -611,7 +613,7 @@ public class TestOutputBatchSize extends PhysicalOpUnitTestBase {
     // We will get approximately get 2 batches and max of 4.
     fragContext.getOptions().setLocalOption("drill.exec.memory.operator.output_batch_size", totalSize / 2);
 
-    OperatorTestBuilder opTestBuilder = opTestBuilder()
+    LegacyOperatorTestBuilder opTestBuilder = legacyOpTestBuilder()
       .physicalOperator(flatten)
       .inputDataStreamJson(inputJsonBatches)
       .baselineColumns("a", "b", "c")
@@ -672,7 +674,7 @@ public class TestOutputBatchSize extends PhysicalOpUnitTestBase {
     // We will get approximately get 2 batches and max of 4.
     fragContext.getOptions().setLocalOption("drill.exec.memory.operator.output_batch_size", totalSize / 2);
 
-    OperatorTestBuilder opTestBuilder = opTestBuilder()
+    LegacyOperatorTestBuilder opTestBuilder = legacyOpTestBuilder()
       .physicalOperator(flatten)
       .inputDataStreamJson(inputJsonBatches)
       .baselineColumns("a", "b", "c")
@@ -760,7 +762,7 @@ public class TestOutputBatchSize extends PhysicalOpUnitTestBase {
     // We will get approximately get 2 batches and max of 4.
     fragContext.getOptions().setLocalOption("drill.exec.memory.operator.output_batch_size", totalSize / 2);
 
-    OperatorTestBuilder opTestBuilder = opTestBuilder()
+    LegacyOperatorTestBuilder opTestBuilder = legacyOpTestBuilder()
       .physicalOperator(flatten)
       .inputDataStreamJson(inputJsonBatches)
       .baselineColumns("a", "b", "c")
@@ -869,7 +871,7 @@ public class TestOutputBatchSize extends PhysicalOpUnitTestBase {
     // We will get approximately get 2 batches and max of 4.
     fragContext.getOptions().setLocalOption("drill.exec.memory.operator.output_batch_size", totalSize / 2);
 
-    OperatorTestBuilder opTestBuilder = opTestBuilder()
+    LegacyOperatorTestBuilder opTestBuilder = legacyOpTestBuilder()
       .physicalOperator(flatten)
       .inputDataStreamJson(inputJsonBatches)
       .baselineColumns("a", "b", "c")
@@ -967,7 +969,7 @@ public class TestOutputBatchSize extends PhysicalOpUnitTestBase {
     // We will get approximately get 2 batches and max of 4.
     fragContext.getOptions().setLocalOption("drill.exec.memory.operator.output_batch_size", totalSize / 2);
 
-    OperatorTestBuilder opTestBuilder = opTestBuilder()
+    LegacyOperatorTestBuilder opTestBuilder = legacyOpTestBuilder()
       .physicalOperator(flatten)
       .inputDataStreamJson(inputJsonBatches)
       .baselineColumns("a", "b", "c")
@@ -1065,7 +1067,7 @@ public class TestOutputBatchSize extends PhysicalOpUnitTestBase {
     fragContext.getOptions().setLocalOption("drill.exec.memory.operator.output_batch_size", totalSize / 2);
 
     // Here we expect 16 batches because each batch will be limited by upper limit of 65535 records.
-    OperatorTestBuilder opTestBuilder = opTestBuilder()
+    LegacyOperatorTestBuilder opTestBuilder = legacyOpTestBuilder()
       .physicalOperator(flatten)
       .inputDataStreamJson(inputJsonBatches)
       .baselineColumns("a", "c")
@@ -1125,7 +1127,7 @@ public class TestOutputBatchSize extends PhysicalOpUnitTestBase {
     // Here we expect 10 batches because each batch will be bounded by lower limit of at least 1 record.
     // do not check the output batch size as it will be more than configured value of 1024, so we get
     // at least one record out.
-    OperatorTestBuilder opTestBuilder = opTestBuilder()
+    LegacyOperatorTestBuilder opTestBuilder = legacyOpTestBuilder()
       .physicalOperator(flatten)
       .inputDataStreamJson(inputJsonBatches)
       .baselineColumns("a", "c")
@@ -1172,7 +1174,7 @@ public class TestOutputBatchSize extends PhysicalOpUnitTestBase {
     batchString.append("]");
     inputJsonBatches.add(batchString.toString());
 
-    OperatorTestBuilder opTestBuilder = opTestBuilder()
+    LegacyOperatorTestBuilder opTestBuilder = legacyOpTestBuilder()
       .physicalOperator(flatten)
       .inputDataStreamJson(inputJsonBatches)
       .baselineColumns("a", "b")
@@ -1233,7 +1235,7 @@ public class TestOutputBatchSize extends PhysicalOpUnitTestBase {
     // We will get approximately get 2 batches and max of 4.
     fragContext.getOptions().setLocalOption("drill.exec.memory.operator.output_batch_size", totalSize / 2);
 
-    OperatorTestBuilder opTestBuilder = opTestBuilder()
+    LegacyOperatorTestBuilder opTestBuilder = legacyOpTestBuilder()
       .physicalOperator(flatten)
       .inputDataStreamJson(inputJsonBatches)
       .baselineColumns("a", "b", "c")
@@ -1302,7 +1304,7 @@ public class TestOutputBatchSize extends PhysicalOpUnitTestBase {
     // We will get approximately 4 batches because of fragmentation factor of 2 accounted for in merge join.
     fragContext.getOptions().setLocalOption("drill.exec.memory.operator.output_batch_size", totalSize/2);
 
-    OperatorTestBuilder opTestBuilder = opTestBuilder()
+    LegacyOperatorTestBuilder opTestBuilder = legacyOpTestBuilder()
       .physicalOperator(mergeJoin)
       .baselineColumns("a1", "b1", "c1", "a2", "b2", "c2")
       .expectedNumBatches(4)  // verify number of batches
@@ -1372,7 +1374,7 @@ public class TestOutputBatchSize extends PhysicalOpUnitTestBase {
     // We should get 1 batch.
     fragContext.getOptions().setLocalOption("drill.exec.memory.operator.output_batch_size", totalSize*2);
 
-    OperatorTestBuilder opTestBuilder = opTestBuilder()
+    LegacyOperatorTestBuilder opTestBuilder = legacyOpTestBuilder()
       .physicalOperator(mergeJoin)
       .baselineColumns("a1", "b1", "c1", "a2", "b2", "c2")
       .expectedNumBatches(1)  // verify number of batches
@@ -1426,7 +1428,7 @@ public class TestOutputBatchSize extends PhysicalOpUnitTestBase {
     // "a1" : 5,  "c1" : 3, "a2":6,  "c2": 3
 
     // expect two batches, batch limited by 65535 records
-    OperatorTestBuilder opTestBuilder = opTestBuilder()
+    LegacyOperatorTestBuilder opTestBuilder = legacyOpTestBuilder()
       .physicalOperator(mergeJoin)
       .baselineColumns("a1", "c1", "a2", "c2")
       .expectedNumBatches(2)  // verify number of batches
@@ -1481,7 +1483,7 @@ public class TestOutputBatchSize extends PhysicalOpUnitTestBase {
     // set very low value of output batch size so we can do only one row per batch.
     fragContext.getOptions().setLocalOption("drill.exec.memory.operator.output_batch_size", 128);
 
-    OperatorTestBuilder opTestBuilder = opTestBuilder()
+    LegacyOperatorTestBuilder opTestBuilder = legacyOpTestBuilder()
       .physicalOperator(mergeJoin)
       .baselineColumns("a1", "b1", "c1", "a2", "b2", "c2")
       .expectedNumBatches(10)  // verify number of batches
@@ -1549,7 +1551,7 @@ public class TestOutputBatchSize extends PhysicalOpUnitTestBase {
     // We should get 2 batches, one for the left and one for the right.
     fragContext.getOptions().setLocalOption("drill.exec.memory.operator.output_batch_size", totalSize*2);
 
-    OperatorTestBuilder opTestBuilder = opTestBuilder()
+    LegacyOperatorTestBuilder opTestBuilder = legacyOpTestBuilder()
       .physicalOperator(unionAll)
       .baselineColumns("a1", "b1", "c1")
       .expectedNumBatches(2)  // verify number of batches
@@ -1620,7 +1622,7 @@ public class TestOutputBatchSize extends PhysicalOpUnitTestBase {
     // We should get 4 batches, 2 for the left and 2 for the right.
     fragContext.getOptions().setLocalOption("drill.exec.memory.operator.output_batch_size", totalSize/2);
 
-    OperatorTestBuilder opTestBuilder = opTestBuilder()
+    LegacyOperatorTestBuilder opTestBuilder = legacyOpTestBuilder()
       .physicalOperator(unionAll)
       .baselineColumns("a1", "b1", "c1")
       .expectedNumBatches(4)  // verify number of batches
@@ -1692,7 +1694,7 @@ public class TestOutputBatchSize extends PhysicalOpUnitTestBase {
     // We should get 22 batches for 22 rows.
     fragContext.getOptions().setLocalOption("drill.exec.memory.operator.output_batch_size", 128);
 
-    OperatorTestBuilder opTestBuilder = opTestBuilder()
+    LegacyOperatorTestBuilder opTestBuilder = legacyOpTestBuilder()
       .physicalOperator(unionAll)
       .baselineColumns("a1","b1", "c1")
       .expectedNumBatches(22)  // verify number of batches
@@ -1764,7 +1766,7 @@ public class TestOutputBatchSize extends PhysicalOpUnitTestBase {
     // We will get approximately 4 batches because of fragmentation factor of 2 accounted for in merge join.
     fragContext.getOptions().setLocalOption("drill.exec.memory.operator.output_batch_size", totalSize/2);
 
-    OperatorTestBuilder opTestBuilder = opTestBuilder()
+    LegacyOperatorTestBuilder opTestBuilder = legacyOpTestBuilder()
       .physicalOperator(hashJoin)
       .baselineColumns("a1", "b1", "c1", "a2", "b2", "c2")
       .expectedNumBatches(4)  // verify number of batches
@@ -1834,7 +1836,7 @@ public class TestOutputBatchSize extends PhysicalOpUnitTestBase {
     // We should get 1 batch.
     fragContext.getOptions().setLocalOption("drill.exec.memory.operator.output_batch_size", totalSize*2);
 
-    OperatorTestBuilder opTestBuilder = opTestBuilder()
+    LegacyOperatorTestBuilder opTestBuilder = legacyOpTestBuilder()
       .physicalOperator(hashJoin)
       .baselineColumns("a1", "b1", "c1", "a2", "b2", "c2")
       .expectedNumBatches(1)  // verify number of batches
@@ -1888,7 +1890,7 @@ public class TestOutputBatchSize extends PhysicalOpUnitTestBase {
     // "a1" : 5,  "c1" : 3, "a2":6,  "c2": 3
 
     // expect two batches, batch limited by 65535 records
-    OperatorTestBuilder opTestBuilder = opTestBuilder()
+    LegacyOperatorTestBuilder opTestBuilder = legacyOpTestBuilder()
       .physicalOperator(hashJoin)
       .baselineColumns("a1", "c1", "a2", "c2")
       .expectedNumBatches(2)  // verify number of batches
@@ -1943,7 +1945,7 @@ public class TestOutputBatchSize extends PhysicalOpUnitTestBase {
     // set very low value of output batch size so we can do only one row per batch.
     fragContext.getOptions().setLocalOption("drill.exec.memory.operator.output_batch_size", 128);
 
-    OperatorTestBuilder opTestBuilder = opTestBuilder()
+    LegacyOperatorTestBuilder opTestBuilder = legacyOpTestBuilder()
       .physicalOperator(hashJoin)
       .baselineColumns("a1", "b1", "c1", "a2", "b2", "c2")
       .expectedNumBatches(10)  // verify number of batches
@@ -2011,7 +2013,7 @@ public class TestOutputBatchSize extends PhysicalOpUnitTestBase {
     // We will get approximately 4 batches because of fragmentation factor of 2 accounted for in merge join.
     fragContext.getOptions().setLocalOption("drill.exec.memory.operator.output_batch_size", totalSize/2);
 
-    OperatorTestBuilder opTestBuilder = opTestBuilder()
+    LegacyOperatorTestBuilder opTestBuilder = legacyOpTestBuilder()
       .physicalOperator(hashJoin)
       .baselineColumns("a1", "b1", "c1", "a2", "b2", "c2")
       .expectedNumBatches(4)  // verify number of batches
@@ -2080,7 +2082,7 @@ public class TestOutputBatchSize extends PhysicalOpUnitTestBase {
     // We will get approximately 4 batches because of fragmentation factor of 2 accounted for in merge join.
     fragContext.getOptions().setLocalOption("drill.exec.memory.operator.output_batch_size", totalSize/2);
 
-    OperatorTestBuilder opTestBuilder = opTestBuilder()
+    LegacyOperatorTestBuilder opTestBuilder = legacyOpTestBuilder()
       .physicalOperator(hashJoin)
       .baselineColumns("a1", "b1", "c1", "a2", "b2", "c2")
       .expectedNumBatches(4)  // verify number of batches
@@ -2102,7 +2104,7 @@ public class TestOutputBatchSize extends PhysicalOpUnitTestBase {
        "[{\"a\": 5, \"b\" : 1 }]",
          "[{\"a\": 5, \"b\" : 5},{\"a\": 3, \"b\" : 8}]");
 
-    opTestBuilder()
+    legacyOpTestBuilder()
       .physicalOperator(aggConf)
       .inputDataStreamJson(inputJsonBatches)
       .baselineColumns("b_sum", "a")
@@ -2157,7 +2159,7 @@ public class TestOutputBatchSize extends PhysicalOpUnitTestBase {
     // We will get approximately get 2 batches and max of 4.
     fragContext.getOptions().setLocalOption("drill.exec.memory.operator.output_batch_size", totalSize / 2);
 
-    OperatorTestBuilder opTestBuilder = opTestBuilder()
+    LegacyOperatorTestBuilder opTestBuilder = legacyOpTestBuilder()
       .physicalOperator(hashAgg)
       .inputDataStreamJson(inputJsonBatches)
       .baselineColumns("a", "b_sum")
@@ -2218,7 +2220,7 @@ public class TestOutputBatchSize extends PhysicalOpUnitTestBase {
     // We will get approximately get 2 batches and max of 4.
     fragContext.getOptions().setLocalOption("drill.exec.memory.operator.output_batch_size", totalSize / 2);
 
-    OperatorTestBuilder opTestBuilder = opTestBuilder()
+    LegacyOperatorTestBuilder opTestBuilder = legacyOpTestBuilder()
       .physicalOperator(hashAgg)
       .inputDataStreamJson(inputJsonBatches)
       .baselineColumns("a", "b_avg")
@@ -2278,7 +2280,7 @@ public class TestOutputBatchSize extends PhysicalOpUnitTestBase {
     // We will get approximately get 2 batches and max of 4.
     fragContext.getOptions().setLocalOption("drill.exec.memory.operator.output_batch_size", totalSize / 2);
 
-    OperatorTestBuilder opTestBuilder = opTestBuilder()
+    LegacyOperatorTestBuilder opTestBuilder = legacyOpTestBuilder()
       .physicalOperator(hashAgg)
       .inputDataStreamJson(inputJsonBatches)
       .baselineColumns("a", "b_max")
@@ -2350,7 +2352,7 @@ public class TestOutputBatchSize extends PhysicalOpUnitTestBase {
     // We will get approximately 4 batches.
     fragContext.getOptions().setLocalOption("drill.exec.memory.operator.output_batch_size", totalSize/2);
 
-    OperatorTestBuilder opTestBuilder = opTestBuilder()
+    LegacyOperatorTestBuilder opTestBuilder = legacyOpTestBuilder()
             .physicalOperator(nestedLoopJoin)
             .baselineColumns("a1", "b1", "c1", "a2", "b2", "c2")
             .expectedNumBatches(4)  // verify number of batches
@@ -2424,7 +2426,7 @@ public class TestOutputBatchSize extends PhysicalOpUnitTestBase {
     // We should get 1 batch.
     fragContext.getOptions().setLocalOption("drill.exec.memory.operator.output_batch_size", totalSize*2);
 
-    OperatorTestBuilder opTestBuilder = opTestBuilder()
+    LegacyOperatorTestBuilder opTestBuilder = legacyOpTestBuilder()
             .physicalOperator(nestedLoopJoin)
             .baselineColumns("a1", "b1", "c1", "a2", "b2", "c2")
             .expectedNumBatches(1)  // verify number of batches
@@ -2482,7 +2484,7 @@ public class TestOutputBatchSize extends PhysicalOpUnitTestBase {
 
     // we expect n(n+1)/2 number of records i.e. (500 * 501)/2 = 125250
     // expect two batches, batch limited by 65535 records
-    OperatorTestBuilder opTestBuilder = opTestBuilder()
+    LegacyOperatorTestBuilder opTestBuilder = legacyOpTestBuilder()
             .physicalOperator(nestedLoopJoin)
             .baselineColumns("a1", "c1", "a2", "c2")
             .expectedNumBatches(2)  // verify number of batches
@@ -2542,7 +2544,7 @@ public class TestOutputBatchSize extends PhysicalOpUnitTestBase {
     // set very low value of output batch size so we can do only one row per batch.
     fragContext.getOptions().setLocalOption("drill.exec.memory.operator.output_batch_size", 128);
 
-    OperatorTestBuilder opTestBuilder = opTestBuilder()
+    LegacyOperatorTestBuilder opTestBuilder = legacyOpTestBuilder()
             .physicalOperator(nestedLoopJoin)
             .baselineColumns("a1", "b1", "c1", "a2", "b2", "c2")
             .expectedNumBatches(10)  // verify number of batches
@@ -2612,7 +2614,7 @@ public class TestOutputBatchSize extends PhysicalOpUnitTestBase {
     // We will get approximately 4 batches.
     fragContext.getOptions().setLocalOption("drill.exec.memory.operator.output_batch_size", totalSize/2);
 
-    OperatorTestBuilder opTestBuilder = opTestBuilder()
+    LegacyOperatorTestBuilder opTestBuilder = legacyOpTestBuilder()
             .physicalOperator(nestedLoopJoin)
             .baselineColumns("a1", "b1", "c1", "a2", "b2", "c2")
             .expectedNumBatches(4)  // verify number of batches
@@ -2949,4 +2951,4 @@ public class TestOutputBatchSize extends PhysicalOpUnitTestBase {
     }
   }
 
-}
\ No newline at end of file
+}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/columnreaders/TestBatchSizingMemoryUtil.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/columnreaders/TestBatchSizingMemoryUtil.java
index 9f4e026..4e0a739 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/columnreaders/TestBatchSizingMemoryUtil.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/columnreaders/TestBatchSizingMemoryUtil.java
@@ -19,12 +19,12 @@ package org.apache.drill.exec.store.parquet.columnreaders;
 
 import java.math.BigDecimal;
 import org.apache.drill.common.types.TypeProtos;
-import org.apache.drill.exec.physical.unit.PhysicalOpUnitTestBase;
 import org.apache.drill.exec.record.VectorContainer;
 import org.apache.drill.exec.record.metadata.TupleMetadata;
 import org.apache.drill.exec.store.parquet.columnreaders.batchsizing.BatchSizingMemoryUtil;
 import org.apache.drill.exec.store.parquet.columnreaders.batchsizing.BatchSizingMemoryUtil.ColumnMemoryUsageInfo;
 import org.apache.drill.exec.store.parquet.columnreaders.batchsizing.RecordBatchSizerManager.ColumnMemoryQuota;
+import org.apache.drill.test.PhysicalOpUnitTestBase;
 import org.apache.drill.test.rowSet.RowSet;
 import org.apache.drill.test.rowSet.RowSetBuilder;
 import org.apache.drill.test.rowSet.schema.SchemaBuilder;
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/LegacyOperatorTestBuilder.java b/exec/java-exec/src/test/java/org/apache/drill/test/LegacyOperatorTestBuilder.java
new file mode 100644
index 0000000..dacb061
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/LegacyOperatorTestBuilder.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.test;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.physical.base.AbstractBase;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.impl.BatchCreator;
+import org.apache.drill.exec.physical.impl.ScanBatch;
+import org.apache.drill.exec.record.RecordBatch;
+
+import java.io.UnsupportedEncodingException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+/**
+ * @deprecated Use {@link OperatorTestBuilder} instead.
+ */
+@Deprecated
+public class LegacyOperatorTestBuilder {
+
+  private PhysicalOpUnitTestBase physicalOpUnitTestBase;
+
+  private PhysicalOperator popConfig;
+  private String[] baselineColumns;
+  private List<Map<String, Object>> baselineRecords;
+  private List<List<String>> inputStreamsJSON;
+  private long initReservation = AbstractBase.INIT_ALLOCATION;
+  private long maxAllocation = AbstractBase.MAX_ALLOCATION;
+  private boolean expectNoRows;
+  private Long expectedBatchSize;
+  private Integer expectedNumBatches;
+  private Integer expectedTotalRows;
+
+  public LegacyOperatorTestBuilder(PhysicalOpUnitTestBase physicalOpUnitTestBase) {
+    this.physicalOpUnitTestBase = physicalOpUnitTestBase;
+  }
+
+  /**
+   * Creates the operator for popConfig over the configured JSON inputs, gathers its output through
+   * DrillTestWrapper and compares it with the baseline. The baseline comparison is skipped when
+   * expectedTotalRows is set. Every exception is rethrown wrapped in a {@link RuntimeException}.
+   */
+  @SuppressWarnings({"unchecked", "resource"})
+  public void go() {
+    try {
+      physicalOpUnitTestBase.mockOpContext(popConfig, initReservation, maxAllocation);
+      final BatchCreator<PhysicalOperator> creator =
+          (BatchCreator<PhysicalOperator>) physicalOpUnitTestBase.opCreatorReg.getOperatorCreator(popConfig.getClass());
+      final List<RecordBatch> inputs = Lists.newArrayList();
+      if (inputStreamsJSON != null) {
+        for (List<String> jsonBatches : inputStreamsJSON) {
+          inputs.add(new ScanBatch(popConfig, physicalOpUnitTestBase.fragContext,
+              physicalOpUnitTestBase.getReaderListForJsonBatches(jsonBatches, physicalOpUnitTestBase.fragContext)));
+        }
+      }
+
+      final RecordBatch operatorUnderTest = creator.getBatch(physicalOpUnitTestBase.fragContext, popConfig, inputs);
+      final Map<String, List<Object>> actual = DrillTestWrapper.addToCombinedVectorResults(
+          new PhysicalOpUnitTestBase.BatchIterator(operatorUnderTest), expectedBatchSize, expectedNumBatches, expectedTotalRows);
+      if (expectedTotalRows != null) {
+        return; // when checking total rows, don't compare actual results
+      }
+
+      final Map<String, List<Object>> expected;
+      if (!expectNoRows) {
+        expected = DrillTestWrapper.translateRecordListToHeapVectors(baselineRecords);
+      } else {
+        expected = new TreeMap<>();
+        for (String column : baselineColumns) {
+          expected.put(column, new ArrayList<>());
+        }
+      }
+      DrillTestWrapper.compareMergedVectors(expected, actual);
+    } catch (ExecutionSetupException | UnsupportedEncodingException e) {
+      throw new RuntimeException(e);
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  public LegacyOperatorTestBuilder physicalOperator(PhysicalOperator batch) {
+    this.popConfig = batch;
+    return this;
+  }
+
+  public LegacyOperatorTestBuilder initReservation(long initReservation) {
+    this.initReservation = initReservation;
+    return this;
+  }
+
+  public LegacyOperatorTestBuilder maxAllocation(long maxAllocation) {
+    this.maxAllocation = maxAllocation;
+    return this;
+  }
+
+  public LegacyOperatorTestBuilder inputDataStreamJson(List<String> jsonBatches) {
+    this.inputStreamsJSON = new ArrayList<>();
+    this.inputStreamsJSON.add(jsonBatches);
+    return this;
+  }
+
+  public LegacyOperatorTestBuilder inputDataStreamsJson(List<List<String>> childStreams) {
+    this.inputStreamsJSON = childStreams;
+    return this;
+  }
+
+  /** Sets the expected output columns. Each name must parse as a {@link SchemaPath} and is stored as
+   *  returned by its toExpr(); the array passed in by the caller is copied, never modified. */
+  public LegacyOperatorTestBuilder baselineColumns(String... columns) {
+    final String[] normalized = columns.clone();
+    for (int i = 0; i < normalized.length; i++) {
+      final LogicalExpression ex = physicalOpUnitTestBase.parseExpr(normalized[i]);
+      Preconditions.checkState(ex instanceof SchemaPath, "Schema path is not a valid format.");
+      normalized[i] = ((SchemaPath) ex).toExpr();
+    }
+    this.baselineColumns = normalized;
+    return this;
+  }
+
+  /** Adds one expected row; values pair positionally with the names set earlier through baselineColumns. */
+  public LegacyOperatorTestBuilder baselineValues(Object... baselineValues) {
+    Preconditions.checkState(baselineColumns != null, "baselineColumns must be called before baselineValues.");
+    Preconditions.checkArgument(baselineValues.length == baselineColumns.length,
+        "Must supply the same number of baseline values as columns.");
+    if (baselineRecords == null) {
+      baselineRecords = new ArrayList<>();
+    }
+    final Map<String, Object> row = new HashMap<>();
+    for (int i = 0; i < baselineColumns.length; i++) {
+      row.put(baselineColumns[i], baselineValues[i]);
+    }
+    baselineRecords.add(row);
+    return this;
+  }
+
+  public LegacyOperatorTestBuilder expectZeroRows() {
+    this.expectNoRows = true;
+    return this;
+  }
+
+  public LegacyOperatorTestBuilder expectedNumBatches(Integer expectedNumBatches) {
+    this.expectedNumBatches = expectedNumBatches;
+    return this;
+  }
+
+  public LegacyOperatorTestBuilder expectedBatchSize(Long batchSize) {
+    this.expectedBatchSize = batchSize;
+    return this;
+  }
+
+  public LegacyOperatorTestBuilder expectedTotalRows(Integer expectedTotalRows) {
+    this.expectedTotalRows = expectedTotalRows;
+    return this;
+  }
+}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/OperatorTestBuilder.java b/exec/java-exec/src/test/java/org/apache/drill/test/OperatorTestBuilder.java
new file mode 100644
index 0000000..746422c
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/OperatorTestBuilder.java
@@ -0,0 +1,314 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.test;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.lang3.mutable.MutableInt;
+import org.apache.drill.exec.physical.base.AbstractBase;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.impl.BatchCreator;
+import org.apache.drill.exec.physical.impl.MockRecordBatch;
+import org.apache.drill.exec.physical.impl.svremover.Copier;
+import org.apache.drill.exec.physical.impl.svremover.GenericCopier;
+import org.apache.drill.exec.record.CloseableRecordBatch;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.RecordBatchSizer;
+import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.vector.FixedWidthVector;
+import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.vector.VariableWidthVector;
+import org.apache.drill.test.rowSet.DirectRowSet;
+import org.apache.drill.test.rowSet.RowSet;
+import org.apache.drill.test.rowSet.RowSetComparison;
+import org.junit.Assert;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+public class OperatorTestBuilder {
+
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(OperatorTestBuilder.class);
+
+  private final List<RowSet> expectedResults = new ArrayList<>();
+  private final List<MockRecordBatch> upstreamBatches = new ArrayList<>();
+  private PhysicalOpUnitTestBase physicalOpUnitTestBase;
+
+  private PhysicalOperator physicalOperator;
+  private long initReservation = AbstractBase.INIT_ALLOCATION;
+  private long maxAllocation = AbstractBase.MAX_ALLOCATION;
+  private Optional<Integer> expectedNumBatchesOpt = Optional.empty();
+  private Optional<Integer> expectedTotalRowsOpt = Optional.empty();
+  private boolean combineOutputBatches;
+  private boolean unordered;
+
+  public OperatorTestBuilder(PhysicalOpUnitTestBase physicalOpUnitTestBase) {
+    this.physicalOpUnitTestBase = physicalOpUnitTestBase;
+  }
+
+  /**
+   * Runs the configured operator over the upstream batches and checks its output against the
+   * expectations set on this builder. The operator, the upstream batches and every expected and
+   * actual row set are released afterwards, whether or not the checks passed.
+   *
+   * @throws Exception any failure raised while creating, running or closing the operator
+   */
+  @SuppressWarnings({"unchecked", "resource"})
+  public void go() throws Exception {
+    final List<RowSet> actualResults = new ArrayList<>();
+    CloseableRecordBatch testOperator = null;
+
+    try {
+      validate();
+      int expectedNumBatches = expectedNumBatchesOpt.orElse(expectedResults.size());
+      physicalOpUnitTestBase.mockOpContext(physicalOperator, initReservation, maxAllocation);
+
+      final BatchCreator<PhysicalOperator> opCreator = (BatchCreator<PhysicalOperator>) physicalOpUnitTestBase.opCreatorReg.getOperatorCreator(physicalOperator.getClass());
+      testOperator = opCreator.getBatch(physicalOpUnitTestBase.fragContext, physicalOperator, (List)upstreamBatches);
+
+      batchIterator: for (int batchIndex = 0;; batchIndex++) {
+        final RecordBatch.IterOutcome outcome = testOperator.next();
+
+        switch (outcome) {
+          case NONE:
+            if (!combineOutputBatches) {
+              Assert.assertEquals(expectedNumBatches, batchIndex);
+            }
+            // We are done iterating over batches. Now we need to compare them.
+            break batchIterator;
+          case OK_NEW_SCHEMA:
+            boolean skip = true;
+            try {
+              skip = testOperator.getContainer().getRecordCount() == 0;
+            } catch (IllegalStateException e) {
+              // We should skip this batch in this case. It means no data was included with the okay schema
+            } finally {
+              if (skip) {
+                batchIndex--; // an empty schema-only batch must not count towards the batch total
+                break;
+              }
+            }
+          case OK: // also reached by fall-through from OK_NEW_SCHEMA when that batch carried rows
+            if (!combineOutputBatches && batchIndex >= expectedNumBatches) {
+              testOperator.getContainer().clear();
+              Assert.fail("More batches received than expected.");
+            } else {
+              final boolean hasSelectionVector = testOperator.getSchema().getSelectionVectorMode().hasSelectionVector;
+              final VectorContainer container = testOperator.getContainer();
+              if (hasSelectionVector) {
+                throw new UnsupportedOperationException("Implement DRILL-6698");
+              } else {
+                actualResults.add(DirectRowSet.fromContainer(container));
+              }
+              break;
+            }
+          default:
+            throw new UnsupportedOperationException("Can't handle this yet");
+        }
+      }
+
+      int actualTotalRows = actualResults.stream()
+        .mapToInt(RowSet::rowCount)
+        .reduce(Integer::sum)
+        .orElse(0);
+
+      if (expectedResults.isEmpty()) {
+        Assert.assertEquals((int) expectedTotalRowsOpt.orElse(0), actualTotalRows);
+        // We are done, we don't have any expected result to compare
+        return;
+      }
+
+      if (combineOutputBatches) {
+        // Fail with a readable message instead of an IndexOutOfBoundsException from get(0) below.
+        Assert.assertFalse("The operator produced no batches to combine.", actualResults.isEmpty());
+        final RowSet expectedBatch = expectedResults.get(0);
+        final RowSet actualBatch = DirectRowSet.fromSchema(
+          physicalOpUnitTestBase.operatorFixture.allocator, actualResults.get(0).container().getSchema());
+        final VectorContainer actualBatchContainer = actualBatch.container();
+        actualBatchContainer.setRecordCount(0);
+
+        final int numColumns = expectedBatch.schema().size();
+        List<MutableInt> totalBytesPerColumn = new ArrayList<>();
+
+        for (int columnIndex = 0; columnIndex < numColumns; columnIndex++) {
+          totalBytesPerColumn.add(new MutableInt());
+        }
+
+        // Get column sizes for each result batch
+        final List<List<RecordBatchSizer.ColumnSize>> columnSizesPerBatch = actualResults.stream().map(rowSet -> {
+          switch (rowSet.indirectionType()) {
+            case NONE:
+              return new RecordBatchSizer(rowSet.container()).columnsList();
+            default:
+              throw new UnsupportedOperationException("Implement DRILL-6698");
+          }
+        }).collect(Collectors.toList());
+
+        // Get total bytes per column
+        for (List<RecordBatchSizer.ColumnSize> columnSizes: columnSizesPerBatch) {
+          for (int columnIndex = 0; columnIndex < numColumns; columnIndex++) {
+            final MutableInt totalBytes = totalBytesPerColumn.get(columnIndex);
+            final RecordBatchSizer.ColumnSize columnSize = columnSizes.get(columnIndex);
+            totalBytes.add(columnSize.getTotalDataSize());
+          }
+        }
+
+        // Allocate each combined vector up front from the total row count (and byte total for variable width)
+        for (int columnIndex = 0; columnIndex < numColumns; columnIndex++) {
+          final ValueVector valueVector = actualBatchContainer
+            .getValueVector(columnIndex)
+            .getValueVector();
+          if (valueVector instanceof FixedWidthVector) {
+            ((FixedWidthVector) valueVector).allocateNew(actualTotalRows);
+          } else if (valueVector instanceof VariableWidthVector) {
+            final MutableInt totalBytes = totalBytesPerColumn.get(columnIndex);
+            ((VariableWidthVector) valueVector).allocateNew(totalBytes.getValue(), actualTotalRows);
+          } else {
+            throw new UnsupportedOperationException();
+          }
+        }
+
+        try {
+          int currentIndex = 0;
+          for (RowSet actualRowSet: actualResults) {
+            final Copier copier;
+            final VectorContainer rowSetContainer = actualRowSet.container();
+            rowSetContainer.setRecordCount(actualRowSet.rowCount());
+
+            switch (actualRowSet.indirectionType()) {
+              case NONE:
+                copier = new GenericCopier();
+                break;
+              default:
+                throw new UnsupportedOperationException("Implement DRILL-6698");
+            }
+
+            copier.setup(rowSetContainer, actualBatchContainer);
+            // NOTE(review): confirm whether appendRecords expects a source or a destination index here.
+            copier.appendRecords(currentIndex, actualRowSet.rowCount());
+            currentIndex += actualRowSet.rowCount();
+          }
+          // Compare once, after every batch has been appended: expectedBatch describes the complete output.
+          verify(expectedBatch, actualBatch);
+        } finally {
+          actualBatch.clear();
+        }
+      } else {
+        // Compare expected and actual results
+        for (int batchIndex = 0; batchIndex < expectedNumBatches; batchIndex++) {
+          final RowSet expectedBatch = expectedResults.get(batchIndex);
+          final RowSet actualBatch = actualResults.get(batchIndex);
+          verify(expectedBatch, actualBatch);
+        }
+      }
+    } finally {
+      // free resources
+      if (testOperator != null) {
+        testOperator.close();
+      }
+
+      actualResults.forEach(rowSet -> rowSet.clear());
+      expectedResults.forEach(rowSet -> rowSet.clear());
+
+      upstreamBatches.forEach(rowSetBatch -> {
+        try {
+          rowSetBatch.close();
+        } catch (Exception e) {
+          logger.error("Error while closing RowSetBatch", e);
+        }
+      });
+    }
+  }
+
+  private void verify(final RowSet expectedBatch, final RowSet actualBatch) {
+    if (unordered) {
+      new RowSetComparison(expectedBatch).unorderedVerify(actualBatch);
+    } else {
+      new RowSetComparison(expectedBatch).verify(actualBatch);
+    }
+  }
+
+  /**
+   * Rejects inconsistent expectations: expected results cannot be mixed with expected batch or row counts.
+   */
+  private void validate() {
+    if (combineOutputBatches) { // a batch count is never accepted here; without results a row total is required
+      Preconditions.checkArgument(expectedResults.isEmpty() || expectedResults.size() == 1,
+        "The number of expected result batches needs to be zero or one when combining output batches");
+      Preconditions.checkArgument((expectedResults.isEmpty() && (!expectedNumBatchesOpt.isPresent() && expectedTotalRowsOpt.isPresent())) ||
+          (!expectedResults.isEmpty() && (!expectedNumBatchesOpt.isPresent() && !expectedTotalRowsOpt.isPresent())),
+        "When defining expectedResults, you cannot define expectedNumBatches or expectedTotalRows and vice versa");
+    } else { // without expected results at least one of the two counts must be given
+      Preconditions.checkArgument((expectedResults.isEmpty() && (expectedNumBatchesOpt.isPresent() || expectedTotalRowsOpt.isPresent())) ||
+          (!expectedResults.isEmpty() && (!expectedNumBatchesOpt.isPresent() && !expectedTotalRowsOpt.isPresent())),
+        "When defining expectedResults, you cannot define expectedNumBatches or expectedTotalRows and vice versa");
+    }
+  }
+
+  public OperatorTestBuilder physicalOperator(PhysicalOperator batch) {
+    this.physicalOperator = batch;
+    return this;
+  }
+
+  public OperatorTestBuilder initReservation(long initReservation) {
+    this.initReservation = initReservation;
+    return this;
+  }
+
+  public OperatorTestBuilder maxAllocation(long maxAllocation) {
+    this.maxAllocation = maxAllocation;
+    return this;
+  }
+
+  public OperatorTestBuilder expectedNumBatches(int expectedNumBatches) {
+    this.expectedNumBatchesOpt = Optional.of(expectedNumBatches);
+    return this;
+  }
+
+  public OperatorTestBuilder expectedTotalRows(int expectedTotalRows) {
+    this.expectedTotalRowsOpt = Optional.of(expectedTotalRows);
+    return this;
+  }
+
+  /**
+   * Combines all the batches output by the operator into a single batch for comparison.
+   * @return This {@link OperatorTestBuilder}.
+   */
+  public OperatorTestBuilder combineOutputBatches() {
+    combineOutputBatches = true;
+    return this;
+  }
+
+  public OperatorTestBuilder unordered() {
+    unordered = true;
+    return this;
+  }
+
+  public OperatorTestBuilder addUpstreamBatch(final MockRecordBatch mockRecordBatch) {
+    Preconditions.checkNotNull(mockRecordBatch);
+    upstreamBatches.add(mockRecordBatch);
+    return this;
+  }
+
+  public OperatorTestBuilder addExpectedResult(final RowSet rowSet) {
+    Preconditions.checkNotNull(rowSet);
+    expectedResults.add(rowSet);
+    return this;
+  }
+}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/OperatorTestBuilderTest.java b/exec/java-exec/src/test/java/org/apache/drill/test/OperatorTestBuilderTest.java
new file mode 100644
index 0000000..be3cdc2
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/OperatorTestBuilderTest.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.test;
+
+import com.google.common.collect.Lists;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.FieldReference;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.logical.data.NamedExpression;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.exec.physical.base.AbstractBase;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.PhysicalVisitor;
+import org.apache.drill.exec.physical.config.Project;
+import org.apache.drill.exec.physical.impl.MockRecordBatch;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.test.rowSet.RowSet;
+import org.apache.drill.test.rowSet.RowSetBuilder;
+import org.apache.drill.test.rowSet.schema.SchemaBuilder;
+import org.junit.ComparisonFailure;
+import org.junit.Test;
+
+import java.util.Iterator;
+import java.util.List;
+
+public class OperatorTestBuilderTest extends PhysicalOpUnitTestBase {
+  public static final String FIRST_NAME_COL = "firstname";
+  public static final String LAST_NAME_COL = "lastname";
+
+  @Test
+  public void noCombineUnorderedTestPass() throws Exception {
+    executeTest(buildInputData(), true, false);
+  }
+
+  @Test(expected = AssertionError.class)
+  public void noCombineUnorderedTestFail() throws Exception {
+    executeTest(buildIncorrectData(), true, false);
+  }
+
+  @Test
+  public void noCombineOrderedTestPass() throws Exception {
+    executeTest(buildInputData(), false, false);
+  }
+
+  @Test(expected = ComparisonFailure.class)
+  public void noCombineOrderedTestFail() throws Exception {
+    executeTest(buildIncorrectData(), false, false);
+  }
+
+  @Test
+  public void combineUnorderedTestPass() throws Exception {
+    executeTest(buildInputData(), true, true);
+  }
+
+  @Test(expected = AssertionError.class)
+  public void combineUnorderedTestFail() throws Exception {
+    executeTest(buildIncorrectData(), true, true);
+  }
+
+  @Test
+  public void combineOrderedTestPass() throws Exception {
+    executeTest(buildInputData(), false, true);
+  }
+
+  @Test(expected = ComparisonFailure.class)
+  public void combineOrderedTestFail() throws Exception {
+    executeTest(buildIncorrectData(), false, true);
+  }
+
+  private Project createProjectPhysicalOperator() {
+    final List<NamedExpression> exprs = Lists.newArrayList(
+      new NamedExpression(SchemaPath.getSimplePath(FIRST_NAME_COL), new FieldReference(FIRST_NAME_COL)),
+      new NamedExpression(SchemaPath.getSimplePath(LAST_NAME_COL), new FieldReference(LAST_NAME_COL)));
+
+    return new Project(exprs, new MockPhysicalOperator(), true);
+  }
+
+  private static TupleMetadata buildSchema() {
+    return new SchemaBuilder()
+      .add(FIRST_NAME_COL, TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.REQUIRED)
+      .add(LAST_NAME_COL, TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.REQUIRED)
+      .buildSchema();
+  }
+
+  private RowSet buildInputData() {
+    return new RowSetBuilder(operatorFixture.allocator(), buildSchema()).
+      addRow("billy", "bob").
+      addRow("bobby", "fillet").
+      build();
+  }
+
+  private RowSet buildIncorrectData() {
+    return new RowSetBuilder(operatorFixture.allocator(), buildSchema()).
+      addRow("billy", "bob").
+      addRow("bambam", "fofof").
+      build();
+  }
+
+  /**
+   * Sends buildInputData() through the Project operator built by createProjectPhysicalOperator() and
+   * lets the builder compare the output with expectedRowSet, in combined and/or unordered mode on request.
+   */
+  private void executeTest(final RowSet expectedRowSet, boolean unordered, boolean combine) throws Exception {
+    final MockRecordBatch upstream = new MockRecordBatch.Builder()
+      .sendData(buildInputData())
+      .build(fragContext);
+    final OperatorTestBuilder builder = opTestBuilder()
+      .physicalOperator(createProjectPhysicalOperator())
+      .addUpstreamBatch(upstream)
+      .addExpectedResult(expectedRowSet);
+
+    if (combine) {
+      builder.combineOutputBatches();
+    }
+    if (unordered) {
+      builder.unordered();
+    }
+    builder.go();
+  }
+
+  public static class MockPhysicalOperator extends AbstractBase {
+    @Override
+    public <T, X, E extends Throwable> T accept(PhysicalVisitor<T, X, E> physicalVisitor, X value) throws E {
+      return null;
+    }
+
+    @Override
+    public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) throws ExecutionSetupException {
+      return null;
+    }
+
+    @Override
+    public int getOperatorType() {
+      return 0;
+    }
+
+    @Override
+    public Iterator<PhysicalOperator> iterator() {
+      return null;
+    }
+  }
+}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/PhysicalOpUnitTestBase.java b/exec/java-exec/src/test/java/org/apache/drill/test/PhysicalOpUnitTestBase.java
similarity index 71%
rename from exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/PhysicalOpUnitTestBase.java
rename to exec/java-exec/src/test/java/org/apache/drill/test/PhysicalOpUnitTestBase.java
index b2412e5..1c4779c 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/PhysicalOpUnitTestBase.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/PhysicalOpUnitTestBase.java
@@ -15,12 +15,10 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.drill.exec.physical.unit;
+package org.apache.drill.test;
 
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
 import org.apache.calcite.rel.RelFieldCollation;
 import org.apache.drill.exec.coord.ClusterCoordinator;
 import org.apache.drill.exec.memory.BufferAllocator;
@@ -40,9 +38,7 @@ import org.apache.drill.exec.server.QueryProfileStoreContext;
 import org.apache.drill.exec.store.dfs.DrillFileSystem;
 import org.apache.drill.exec.store.easy.json.JSONRecordReader;
 import org.apache.drill.exec.work.batch.IncomingBuffers;
-import org.apache.drill.test.BaseDirTestWatcher;
 import org.apache.drill.exec.work.filter.RuntimeFilterWritable;
-import org.apache.drill.test.DrillTestWrapper;
 import org.apache.drill.common.config.DrillConfig;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.expression.LogicalExpression;
@@ -55,27 +51,22 @@ import org.apache.drill.common.scanner.persistence.ScanResult;
 import org.apache.drill.exec.ExecTest;
 import org.apache.drill.exec.physical.base.AbstractBase;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
-import org.apache.drill.exec.physical.impl.BatchCreator;
 import org.apache.drill.exec.physical.impl.OperatorCreatorRegistry;
-import org.apache.drill.exec.physical.impl.ScanBatch;
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.record.VectorAccessible;
 import org.apache.drill.exec.server.DrillbitContext;
 import org.apache.drill.exec.store.RecordReader;
 import org.apache.drill.exec.rpc.NamedThreadFactory;
-import org.apache.drill.test.OperatorFixture;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Rule;
 import org.mockito.Mockito;
 
 import java.io.IOException;
-import java.io.UnsupportedEncodingException;
 
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
@@ -83,7 +74,6 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.TreeMap;
 
 public class PhysicalOpUnitTestBase extends ExecTest {
   protected MockExecutorFragmentContext fragContext;
@@ -96,9 +86,9 @@ public class PhysicalOpUnitTestBase extends ExecTest {
   @Rule
   public final BaseDirTestWatcher dirTestWatcher = new BaseDirTestWatcher();
 
-  private final DrillConfig drillConf = DrillConfig.create();
-  private final ScanResult classpathScan = ClassPathScanner.fromPrescan(drillConf);
-  private final OperatorCreatorRegistry opCreatorReg = new OperatorCreatorRegistry(classpathScan);
+  protected final DrillConfig drillConf = DrillConfig.create();
+  protected final ScanResult classpathScan = ClassPathScanner.fromPrescan(drillConf);
+  protected final OperatorCreatorRegistry opCreatorReg = new OperatorCreatorRegistry(classpathScan);
 
   @Before
   public void setup() throws Exception {
@@ -186,141 +176,22 @@ public class PhysicalOpUnitTestBase extends ExecTest {
     }
   }
 
-  protected OperatorTestBuilder opTestBuilder() {
-    return new OperatorTestBuilder();
+  /**
+   * Gets a {@link LegacyOperatorTestBuilder}.
+   * @deprecated Use {@link #opTestBuilder()} instead.
+   * @return A {@link LegacyOperatorTestBuilder}.
+   */
+  @Deprecated
+  protected LegacyOperatorTestBuilder legacyOpTestBuilder() {
+    return new LegacyOperatorTestBuilder(this);
   }
 
-  protected class OperatorTestBuilder {
-
-    private PhysicalOperator popConfig;
-    private String[] baselineColumns;
-    private List<Map<String, Object>> baselineRecords;
-    private List<List<String>> inputStreamsJSON;
-    private long initReservation = AbstractBase.INIT_ALLOCATION;
-    private long maxAllocation = AbstractBase.MAX_ALLOCATION;
-    private boolean checkBatchMemory;
-    private boolean expectNoRows;
-    private Long expectedBatchSize;
-    private Integer expectedNumBatches;
-    private Integer expectedTotalRows;
-
-    @SuppressWarnings({"unchecked", "resource"})
-    public void go() {
-      BatchCreator<PhysicalOperator> opCreator;
-      RecordBatch testOperator;
-      try {
-        mockOpContext(popConfig, initReservation, maxAllocation);
-
-        opCreator = (BatchCreator<PhysicalOperator>) opCreatorReg.getOperatorCreator(popConfig.getClass());
-        List<RecordBatch> incomingStreams = Lists.newArrayList();
-        if (inputStreamsJSON != null) {
-          for (List<String> batchesJson : inputStreamsJSON) {
-            incomingStreams.add(new ScanBatch(popConfig, fragContext,
-                getReaderListForJsonBatches(batchesJson, fragContext)));
-          }
-        }
-
-        testOperator = opCreator.getBatch(fragContext, popConfig, incomingStreams);
-
-        Map<String, List<Object>> actualSuperVectors = DrillTestWrapper.addToCombinedVectorResults(new BatchIterator(testOperator), expectedBatchSize, expectedNumBatches, expectedTotalRows);
-        if ( expectedTotalRows != null ) { return; } // when checking total rows, don't compare actual results
-
-        Map<String, List<Object>> expectedSuperVectors;
-
-        if (expectNoRows) {
-          expectedSuperVectors = new TreeMap<>();
-          for (String column : baselineColumns) {
-            expectedSuperVectors.put(column, new ArrayList<>());
-          }
-        } else {
-          expectedSuperVectors = DrillTestWrapper.translateRecordListToHeapVectors(baselineRecords);
-        }
-
-        DrillTestWrapper.compareMergedVectors(expectedSuperVectors, actualSuperVectors);
-
-      } catch (ExecutionSetupException e) {
-        throw new RuntimeException(e);
-      } catch (UnsupportedEncodingException e) {
-        throw new RuntimeException(e);
-      } catch (Exception e) {
-        throw new RuntimeException(e);
-      }
-    }
-
-    public OperatorTestBuilder physicalOperator(PhysicalOperator batch) {
-      this.popConfig = batch;
-      return this;
-    }
-
-    public OperatorTestBuilder initReservation(long initReservation) {
-      this.initReservation = initReservation;
-      return this;
-    }
-
-    public OperatorTestBuilder maxAllocation(long maxAllocation) {
-      this.maxAllocation = maxAllocation;
-      return this;
-    }
-
-    public OperatorTestBuilder inputDataStreamJson(List<String> jsonBatches) {
-      this.inputStreamsJSON = new ArrayList<>();
-      this.inputStreamsJSON.add(jsonBatches);
-      return this;
-    }
-
-    public OperatorTestBuilder inputDataStreamsJson(List<List<String>> childStreams) {
-      this.inputStreamsJSON = childStreams;
-      return this;
-    }
-
-    public OperatorTestBuilder baselineColumns(String... columns) {
-      for (int i = 0; i < columns.length; i++) {
-        LogicalExpression ex = parseExpr(columns[i]);
-        if (ex instanceof SchemaPath) {
-          columns[i] = ((SchemaPath)ex).toExpr();
-        } else {
-          throw new IllegalStateException("Schema path is not a valid format.");
-        }
-      }
-      this.baselineColumns = columns;
-      return this;
-    }
-
-    public OperatorTestBuilder baselineValues(Object... baselineValues) {
-      if (baselineRecords == null) {
-        baselineRecords = new ArrayList<>();
-      }
-      Map<String, Object> ret = new HashMap<>();
-      int i = 0;
-      Preconditions.checkArgument(baselineValues.length == baselineColumns.length,
-          "Must supply the same number of baseline values as columns.");
-      for (String s : baselineColumns) {
-        ret.put(s, baselineValues[i]);
-        i++;
-      }
-      this.baselineRecords.add(ret);
-      return this;
-    }
-
-    public OperatorTestBuilder expectZeroRows() {
-      this.expectNoRows = true;
-      return this;
-    }
-
-    public OperatorTestBuilder expectedNumBatches(Integer expectedNumBatches) {
-      this.expectedNumBatches = expectedNumBatches;
-      return this;
-    }
-
-    public OperatorTestBuilder expectedBatchSize(Long batchSize) {
-      this.expectedBatchSize = batchSize;
-      return this;
-    }
-
-    public OperatorTestBuilder expectedTotalRows(Integer expectedTotalRows) {
-      this.expectedTotalRows = expectedTotalRows;
-      return this;
-    }
+  /**
+   * Gets an {@link OperatorTestBuilder}.
+   * @return An {@link OperatorTestBuilder}.
+   */
+  protected OperatorTestBuilder opTestBuilder() {
+    return new OperatorTestBuilder(this);
   }
 
   /**
@@ -446,11 +317,11 @@ public class PhysicalOpUnitTestBase extends ExecTest {
   /**
    * <h2>Note</h2>
    * <p>
-   *   The {@link MockPhysicalOperator} should only be used in {@link PhysicalOpUnitTestBase} because {@link PhysicalOpUnitTestBase}
+   *   The {@link MockPhysicalOperator} should only be used in {@link PhysicalOpUnitTestBase} and its subclasses because {@link PhysicalOpUnitTestBase}
    *   needs a dummy {@link MockPhysicalOperator} to be passed to Scanners.
    * </p>
    */
-  protected static class MockPhysicalOperator extends AbstractBase
+  public static class MockPhysicalOperator extends AbstractBase
   {
     @Override
     public <T, X, E extends Throwable> T accept(PhysicalVisitor<T, X, E> physicalVisitor, X value) throws E {
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetBatch.java b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetBatch.java
deleted file mode 100644
index a3cd918..0000000
--- a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetBatch.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.test.rowSet;
-
-import com.google.common.base.Preconditions;
-import org.apache.drill.common.expression.SchemaPath;
-import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.record.BatchSchema;
-import org.apache.drill.exec.record.RecordBatch;
-import org.apache.drill.exec.record.TypedFieldId;
-import org.apache.drill.exec.record.VectorContainer;
-import org.apache.drill.exec.record.VectorWrapper;
-import org.apache.drill.exec.record.WritableBatch;
-import org.apache.drill.exec.record.selection.SelectionVector2;
-import org.apache.drill.exec.record.selection.SelectionVector4;
-
-import java.util.Iterator;
-
-public class RowSetBatch implements RecordBatch {
-  private final RowSet rowSet;
-
-  public RowSetBatch(final RowSet rowSet) {
-    this.rowSet = Preconditions.checkNotNull(rowSet);
-  }
-
-  @Override
-  public FragmentContext getContext() {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public BatchSchema getSchema() {
-    return rowSet.batchSchema();
-  }
-
-  @Override
-  public int getRecordCount() {
-    return rowSet.container().getRecordCount();
-  }
-
-  @Override
-  public SelectionVector2 getSelectionVector2() {
-    if (rowSet instanceof IndirectRowSet) {
-      return ((IndirectRowSet)rowSet).getSv2();
-    }
-
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public SelectionVector4 getSelectionVector4() {
-    if (rowSet instanceof RowSet.HyperRowSet) {
-      return ((RowSet.HyperRowSet)rowSet).getSv4();
-    }
-
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public void kill(boolean sendUpstream) {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public VectorContainer getOutgoingContainer() {
-    return rowSet.container();
-  }
-
-  @Override
-  public TypedFieldId getValueVectorId(SchemaPath path) {
-    return rowSet.container().getValueVectorId(path);
-  }
-
-  @Override
-  public VectorWrapper<?> getValueAccessorById(Class<?> clazz, int... ids) {
-    return rowSet.container().getValueAccessorById(clazz, ids);
-  }
-
-  @Override
-  public IterOutcome next() {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public WritableBatch getWritableBatch() {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public Iterator<VectorWrapper<?>> iterator() {
-    return rowSet.container().iterator();
-  }
-
-  @Override
-  public VectorContainer getContainer() { return rowSet.container(); }
-}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetComparison.java b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetComparison.java
index e098e33..b1cb058 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetComparison.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetComparison.java
@@ -20,13 +20,22 @@ package org.apache.drill.test.rowSet;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
+import java.math.BigDecimal;
+import java.math.MathContext;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
 import java.util.Comparator;
+import java.util.List;
 
+import com.google.common.base.Optional;
+import com.google.common.collect.HashMultiset;
+import com.google.common.collect.Multiset;
 import org.apache.drill.exec.vector.accessor.ArrayReader;
 import org.apache.drill.exec.vector.accessor.ObjectReader;
 import org.apache.drill.exec.vector.accessor.ScalarReader;
 import org.apache.drill.exec.vector.accessor.TupleReader;
 import org.bouncycastle.util.Arrays;
+import org.junit.Assert;
 
 /**
  * For testing, compare the contents of two row sets (record batches)
@@ -58,10 +67,10 @@ public class RowSetComparison {
    */
   private boolean mask[];
   /**
-   * Floats and doubles do not compare exactly. This delta is used
-   * by JUnit for such comparisons.
+   * Floats and doubles do not compare exactly. This MathContext is used
+   * to construct BigDecimals of the desired precision.
    */
-  private double delta = 0.001;
+  private MathContext scale = new MathContext(3);
   /**
    * Tests can skip the first n rows.
    */
@@ -106,14 +115,15 @@ public class RowSetComparison {
   }
 
   /**
-   * Specify the delta value to use when comparing float or
+   * Specify the precision to use when comparing float or
    * double values.
    *
-   * @param delta the delta to use in float and double comparisons
+   * @param scale the precision (number of significant digits) to use for comparing floats and doubles.
+   *              See {@link MathContext#getPrecision()} for a definition of precision.
    * @return this builder
    */
-  public RowSetComparison withDelta(double delta) {
-    this.delta = delta;
+  public RowSetComparison withScale(int scale) {
+    this.scale = new MathContext(scale);
     return this;
   }
 
@@ -142,28 +152,93 @@ public class RowSetComparison {
     return this;
   }
 
+  private void compareSchemasAndCounts(RowSet actual) {
+    assertTrue("Schemas don't match.\n" +
+      "Expected: " + expected.schema().toString() +
+      "\nActual: " + actual.schema(), expected.schema().isEquivalent(actual.schema()));
+    int testLength = getTestLength();
+    int dataLength = offset + testLength;
+    assertTrue("Missing expected rows", expected.rowCount() >= dataLength);
+    assertTrue("Missing actual rows", actual.rowCount() >= dataLength);
+  }
+
+  private int getTestLength() {
+    return span > -1 ? span : expected.rowCount() - offset;
+  }
+
+  public void unorderedVerify(RowSet actual) {
+    compareSchemasAndCounts(actual);
+
+    int testLength = getTestLength();
+    RowSetReader er = expected.reader();
+    RowSetReader ar = actual.reader();
+
+    for (int i = 0; i < offset; i++) {
+      er.next();
+      ar.next();
+    }
+
+    final Multiset<List<Object>> expectedSet = HashMultiset.create();
+    final Multiset<List<Object>> actualSet = HashMultiset.create();
+
+    for (int rowCounter = 0; rowCounter < testLength; rowCounter++) {
+      er.next();
+      ar.next();
+
+      expectedSet.add(buildRow(er));
+      actualSet.add(buildRow(ar));
+    }
+
+    Assert.assertEquals(expectedSet, actualSet);
+  }
+
+  /**
+   * Convenience method to verify the actual results ignoring row order, then free memory
+   * for both the expected and actual result sets.
+   * @param actual the actual results to verify
+   */
+  public void unorderedVerifyAndClearAll(RowSet actual) {
+    try {
+      unorderedVerify(actual);
+    } finally {
+      expected.clear();
+      actual.clear();
+    }
+  }
+
+  private List<Object> buildRow(RowSetReader reader) {
+    final List<Object> row = new ArrayList<>();
+
+    for (int i = 0; i < mask.length; i++) {
+      if (!mask[i]) {
+        continue;
+      }
+
+      final ScalarReader scalarReader = reader.column(i).scalar();
+      final Object value = getScalar(scalarReader);
+      row.add(value);
+    }
+
+    return row;
+  }
+
   /**
    * Verify the actual rows using the rules defined in this builder
    * @param actual the actual results to verify
    */
 
   public void verify(RowSet actual) {
-    assertTrue("Schemas don't match.\n" +
-        "Expected: " + expected.schema().toString() +
-        "\nActual: " + actual.schema(), expected.schema().isEquivalent(actual.schema()));
-    int testLength = expected.rowCount() - offset;
-    if (span > -1) {
-      testLength = span;
-    }
-    int dataLength = offset + testLength;
-    assertTrue("Missing expected rows", expected.rowCount() >= dataLength);
-    assertTrue("Missing actual rows", actual.rowCount() >= dataLength);
+    compareSchemasAndCounts(actual);
+    int testLength = getTestLength();
+
     RowSetReader er = expected.reader();
     RowSetReader ar = actual.reader();
+
     for (int i = 0; i < offset; i++) {
       er.next();
       ar.next();
     }
+
     for (int i = 0; i < testLength; i++) {
       er.next();
       ar.next();
@@ -245,34 +320,32 @@ public class RowSetComparison {
     if (! ec.isNull()) {
       assertTrue(label + " - column is null", ! ac.isNull());
     }
+
     switch (ec.valueType()) {
-    case BYTES: {
+      case BYTES:
         byte expected[] = ac.getBytes();
         byte actual[] = ac.getBytes();
         assertEquals(label + " - byte lengths differ", expected.length, actual.length);
         assertTrue(label, Arrays.areEqual(expected, actual));
         break;
-     }
-     case DOUBLE:
-       assertEquals(label, ec.getDouble(), ac.getDouble(), delta);
-       break;
-     case INTEGER:
-       assertEquals(label, ec.getInt(), ac.getInt());
-       break;
-     case LONG:
-       assertEquals(label, ec.getLong(), ac.getLong());
-       break;
-     case STRING:
-       assertEquals(label, ec.getString(), ac.getString());
-        break;
-     case DECIMAL:
-       assertEquals(label, ec.getDecimal(), ac.getDecimal());
-       break;
-     case PERIOD:
-       assertEquals(label, ec.getPeriod(), ac.getPeriod());
-       break;
-     default:
-        throw new IllegalStateException( "Unexpected type: " + ec.valueType());
+      default:
+        assertEquals(label, getScalar(ec), getScalar(ac));
+    }
+  }
+
+  private Object getScalar(final ScalarReader scalarReader) {
+    if (scalarReader.isNull()) {
+      return Optional.absent();
+    }
+
+    switch (scalarReader.valueType()) {
+      case BYTES:
+        return ByteBuffer.wrap(scalarReader.getBytes());
+      case DOUBLE: {
+        return new BigDecimal(scalarReader.getDouble(), this.scale).stripTrailingZeros();
+      }
+      default:
+        return scalarReader.getObject();
     }
   }
 
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/TestRowSetComparison.java b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/TestRowSetComparison.java
new file mode 100644
index 0000000..b8bdb12
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/TestRowSetComparison.java
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.test.rowSet;
+
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.memory.RootAllocator;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.test.rowSet.schema.SchemaBuilder;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestRowSetComparison {
+  private BufferAllocator allocator;
+
+  @Before
+  public void setup() {
+    allocator = new RootAllocator(Long.MAX_VALUE);
+  }
+
+  @Test
+  public void simpleUnorderedComparisonMatchTest() {
+    final TupleMetadata schema = new SchemaBuilder()
+      .add("a", TypeProtos.MinorType.INT, TypeProtos.DataMode.REQUIRED)
+      .add("b", TypeProtos.MinorType.INT, TypeProtos.DataMode.REQUIRED)
+      .buildSchema();
+
+    final RowSet expected = new RowSetBuilder(allocator, schema)
+      .addRow(1, 1)
+      .addRow(1, 1)
+      .addRow(1, 2)
+      .addRow(2, 1)
+      .build();
+
+    final RowSet actual = new RowSetBuilder(allocator, schema)
+      .addRow(1, 1)
+      .addRow(1, 2)
+      .addRow(2, 1)
+      .addRow(1, 1)
+      .build();
+
+    try {
+      new RowSetComparison(expected).unorderedVerify(actual);
+    } finally {
+      expected.clear();
+      actual.clear();
+    }
+  }
+
+  @Test
+  public void simpleDoubleUnorderedComparisonMatchTest() {
+    final TupleMetadata schema = new SchemaBuilder()
+      .add("a", TypeProtos.MinorType.FLOAT4, TypeProtos.DataMode.REQUIRED)
+      .add("b", TypeProtos.MinorType.FLOAT8, TypeProtos.DataMode.REQUIRED)
+      .buildSchema();
+
+    final RowSet expected = new RowSetBuilder(allocator, schema)
+      .addRow(1.0f, 1.0)
+      .addRow(1.0f, 1.0)
+      .addRow(1.0f, 1.01)
+      .addRow(1.01f, 1.0)
+      .build();
+
+    final RowSet actual = new RowSetBuilder(allocator, schema)
+      .addRow(1.004f, .9996)
+      .addRow(1.0f, 1.008)
+      .addRow(1.008f, 1.0)
+      .addRow(.9996f, 1.004)
+      .build();
+
+    try {
+      new RowSetComparison(expected).unorderedVerify(actual);
+    } finally {
+      expected.clear();
+      actual.clear();
+    }
+  }
+
+  @Test
+  public void simpleVarcharUnorderedComparisonMatchTest() {
+    final TupleMetadata schema = new SchemaBuilder()
+      .add("a", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.REQUIRED)
+      .buildSchema();
+
+    final RowSet expected = new RowSetBuilder(allocator, schema)
+      .addRow("aaa")
+      .addRow("bbb")
+      .addRow("ccc")
+      .addRow("bbb")
+      .build();
+
+    final RowSet actual = new RowSetBuilder(allocator, schema)
+      .addRow("ccc")
+      .addRow("aaa")
+      .addRow("bbb")
+      .addRow("bbb")
+      .build();
+
+    try {
+      new RowSetComparison(expected).unorderedVerify(actual);
+    } finally {
+      expected.clear();
+      actual.clear();
+    }
+  }
+
+  @Test(expected = AssertionError.class)
+  public void simpleUnorderedComparisonNoMatchTest() {
+    final TupleMetadata schema = new SchemaBuilder()
+      .add("a", TypeProtos.MinorType.INT, TypeProtos.DataMode.REQUIRED)
+      .add("b", TypeProtos.MinorType.INT, TypeProtos.DataMode.REQUIRED)
+      .buildSchema();
+
+    final RowSet expected = new RowSetBuilder(allocator, schema)
+      .addRow(1, 1)
+      .addRow(3, 2)
+      .addRow(2, 4)
+      .build();
+
+    final RowSet actual = new RowSetBuilder(allocator, schema)
+      .addRow(1, 1)
+      .addRow(2, 1)
+      .addRow(1, 1)
+      .build();
+
+    try {
+      new RowSetComparison(expected).unorderedVerify(actual);
+    } finally {
+      expected.clear();
+      actual.clear();
+    }
+  }
+
+  @Test(expected = AssertionError.class)
+  public void simpleDoubleUnorderedComparisonNoMatchTest() {
+    final TupleMetadata schema = new SchemaBuilder()
+      .add("a", TypeProtos.MinorType.FLOAT4, TypeProtos.DataMode.REQUIRED)
+      .add("b", TypeProtos.MinorType.FLOAT8, TypeProtos.DataMode.REQUIRED)
+      .buildSchema();
+
+    final RowSet expected = new RowSetBuilder(allocator, schema)
+      .addRow(1.0f, 1.0)
+      .addRow(1.0f, 1.0)
+      .addRow(1.0f, 1.01)
+      .addRow(1.01f, 1.0)
+      .build();
+
+    final RowSet actual = new RowSetBuilder(allocator, schema)
+      .addRow(1.009f, .9996)
+      .addRow(1.0f, 1.004)
+      .addRow(1.008f, 1.0)
+      .addRow(.9994f, 1.004)
+      .build();
+
+    try {
+      new RowSetComparison(expected).unorderedVerify(actual);
+    } finally {
+      expected.clear();
+      actual.clear();
+    }
+  }
+
+  @Test(expected = AssertionError.class)
+  public void simpleVarcharUnorderedComparisonNoMatchTest() {
+    final TupleMetadata schema = new SchemaBuilder()
+      .add("a", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.REQUIRED)
+      .buildSchema();
+
+    final RowSet expected = new RowSetBuilder(allocator, schema)
+      .addRow("red")
+      .addRow("bbb")
+      .addRow("ccc")
+      .addRow("bbb")
+      .build();
+
+    final RowSet actual = new RowSetBuilder(allocator, schema)
+      .addRow("ccc")
+      .addRow("aaa")
+      .addRow("blue")
+      .addRow("bbb")
+      .build();
+
+    try {
+      new RowSetComparison(expected).unorderedVerify(actual);
+    } finally {
+      expected.clear();
+      actual.clear();
+    }
+  }
+
+  @After
+  public void teardown() {
+    allocator.close();
+  }
+}
diff --git a/exec/vector/src/main/codegen/templates/FixedValueVectors.java b/exec/vector/src/main/codegen/templates/FixedValueVectors.java
index eb413b0..a98aa66 100644
--- a/exec/vector/src/main/codegen/templates/FixedValueVectors.java
+++ b/exec/vector/src/main/codegen/templates/FixedValueVectors.java
@@ -358,7 +358,7 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements F
 
   @Override
   public void copyEntry(int toIndex, ValueVector from, int fromIndex) {
-    ((${minor.class}Vector) from).data.getBytes(fromIndex * VALUE_WIDTH, data, toIndex * VALUE_WIDTH, VALUE_WIDTH);
+    copyFromSafe(fromIndex, toIndex, (${minor.class}Vector) from);
   }
 
   public void decrementAllocationMonitor() {
diff --git a/exec/vector/src/main/codegen/templates/NullableValueVectors.java b/exec/vector/src/main/codegen/templates/NullableValueVectors.java
index ff066fb..f82c718 100644
--- a/exec/vector/src/main/codegen/templates/NullableValueVectors.java
+++ b/exec/vector/src/main/codegen/templates/NullableValueVectors.java
@@ -433,7 +433,7 @@ public final class ${className} extends BaseDataValueVector implements <#if type
 
     // Handle the case of not-nullable copied into a nullable
     if (from instanceof ${minor.class}Vector) {
-      bits.getMutator().set(toIndex,1);
+      bits.getMutator().setSafe(toIndex,1);
       values.copyFromSafe(fromIndex,toIndex,(${minor.class}Vector)from);
       return;
     }