Posted to dev@drill.apache.org by GitBox <gi...@apache.org> on 2018/08/27 08:25:55 UTC

[GitHub] asfgit closed pull request #1344: DRILL-6461: Added basic data correctness tests for hash agg, and improved operator unit testing framework.

URL: https://github.com/apache/drill/pull/1344

This is a pull request merged from a forked repository. Because GitHub hides the original diff of a foreign (forked) pull request once it is merged, the diff is reproduced below for the sake of provenance:

diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java
index 2763f5903c3..2e343b657e2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java
@@ -137,23 +137,11 @@ public void close() {
 
   @Override
   public void buildSchema() throws SchemaChangeException {
-    VectorContainer c = new VectorContainer(oContext);
     IterOutcome outcome = next(incoming);
     switch (outcome) {
       case OK:
       case OK_NEW_SCHEMA:
         for (VectorWrapper<?> w : incoming) {
-          // TODO: Not sure why the special handling for AbstractContainerVector is needed since creation of child
-          // vectors is taken care correctly if the field is retrieved from incoming vector and passed to it rather than
-          // creating a new Field instance just based on name and type.
-          @SuppressWarnings("resource")
-          ValueVector v = c.addOrGet(w.getField());
-          if (v instanceof AbstractContainerVector) {
-            w.getValueVector().makeTransferPair(v);
-            v.clear();
-          }
-        }
-        for (VectorWrapper<?> w : c) {
           @SuppressWarnings("resource")
           ValueVector v = container.addOrGet(w.getField());
           if (v instanceof AbstractContainerVector) {
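The hunk above removes a throwaway VectorContainer that buildSchema() previously populated and then copied into the real output container. Reconstructed from the post-image lines, the simplified loop adds each incoming field directly to the operator's own container (a sketch of the result, not the full method):

    for (VectorWrapper<?> w : incoming) {
      @SuppressWarnings("resource")
      ValueVector v = container.addOrGet(w.getField());
      if (v instanceof AbstractContainerVector) {
        // Container vectors need their child vectors created via a transfer pair.
        w.getValueVector().makeTransferPair(v);
        v.clear();
      }
    }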
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java
index 3c418b99c5d..958a0b6985d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java
@@ -545,6 +545,15 @@ public boolean isEmpty() {
 
   @Override
   public void clear() {
+    clear(true);
+  }
+
+  private void clear(boolean close) {
+    if (close) {
+      // If we are closing, we need to clear the htContainerOrig as well.
+      htContainerOrig.clear();
+    }
+
     if (batchHolders != null) {
       for (BatchHolder bh : batchHolders) {
         bh.clear();
@@ -842,7 +851,7 @@ public void enlargeEmptyHashTableIfNeeded(int newNum) {
    *
    */
   public void reset() {
-    this.clear(); // Clear all current batch holders and hash table (i.e. free their memory)
+    this.clear(false); // Clear all current batch holders and hash table (i.e. free their memory)
 
     freeIndex = 0; // all batch holders are gone
     // reallocate batch holders, and the hash table to the original size
@@ -852,6 +861,7 @@ public void reset() {
     totalIndexSize = 0;
     startIndices = allocMetadataVector(originalTableSize, EMPTY_SLOT);
   }
+
   public void updateIncoming(VectorContainer newIncoming, RecordBatch newIncomingProbe) {
     incomingBuild = newIncoming;
     incomingProbe = newIncomingProbe;
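The clear(boolean) split above separates the close path from the reset path: htContainerOrig holds the original (empty) hash table container that reset() later reuses to rebuild the table, so only a real close may free it. The resulting contract, as a comment sketch:

    hashTable.reset();  // internally clear(false): frees batch holders, keeps htContainerOrig for rebuild
    hashTable.clear();  // clear(true): also frees htContainerOrig; only safe once the table is done for good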
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
index dd933250a2b..b459e1c4de7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
@@ -329,6 +329,8 @@ private void setValueCount(final int count) {
       m.setValueCount(count);
     }
 
+    container.setRecordCount(count);
+
     if (complexWriters == null) {
       return;
     }
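This one-liner keeps the container-level record count in sync with the per-vector value counts set by the surrounding loop; without it, container.getRecordCount() can lag behind the vectors. A minimal sketch of the invariant (the vector list name is a stand-in):

    for (final ValueVector m : vectors) {  // 'vectors' stands in for the batch's vector list
      m.setValueCount(count);
    }
    container.setRecordCount(count);       // container count must match the vectors' value counts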
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/AbstractCopier.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/AbstractCopier.java
index 47ec1cb14b9..a4635193002 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/AbstractCopier.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/AbstractCopier.java
@@ -19,7 +19,7 @@
 
 import org.apache.drill.common.types.TypeProtos;
 import org.apache.drill.common.types.Types;
-import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.VectorAccessible;
 import org.apache.drill.exec.record.VectorContainer;
 import org.apache.drill.exec.record.VectorWrapper;
 import org.apache.drill.exec.vector.AllocationHelper;
@@ -30,7 +30,7 @@
   protected VectorContainer outgoing;
 
   @Override
-  public void setup(RecordBatch incoming, VectorContainer outgoing) {
+  public void setup(VectorAccessible incoming, VectorContainer outgoing) {
     this.outgoing = outgoing;
 
     final int count = outgoing.getNumberOfColumns();
@@ -43,15 +43,7 @@ public void setup(RecordBatch incoming, VectorContainer outgoing) {
 
   @Override
   public int copyRecords(int index, int recordCount) {
-    for(VectorWrapper<?> out : outgoing){
-      TypeProtos.MajorType type = out.getField().getType();
-      if (!Types.isFixedWidthType(type) || Types.isRepeated(type)) {
-        out.getValueVector().allocateNew();
-      } else {
-        AllocationHelper.allocate(out.getValueVector(), recordCount, 1);
-      }
-    }
-
+    allocateOutgoing(outgoing, recordCount);
     return insertRecords(0, index, recordCount);
   }
 
@@ -91,4 +83,16 @@ protected void updateCounts(int numRecords) {
   public abstract void copyEntryIndirect(int inIndex, int outIndex);
 
   public abstract void copyEntry(int inIndex, int outIndex);
+
+  public static void allocateOutgoing(VectorContainer outgoing, int recordCount) {
+    for(VectorWrapper<?> out : outgoing) {
+      TypeProtos.MajorType type = out.getField().getType();
+
+      if (!Types.isFixedWidthType(type) || Types.isRepeated(type)) {
+        out.getValueVector().allocateNew();
+      } else {
+        AllocationHelper.allocate(out.getValueVector(), recordCount, 1);
+      }
+    }
+  }
 }
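The allocation loop is hoisted into the static allocateOutgoing() helper so GenericCopier (below) can share it instead of duplicating the logic. The rule it encodes: fixed-width, non-repeated vectors are pre-sized exactly for recordCount entries, while variable-width or repeated vectors fall back to a default allocateNew(). The call site both copiers now use:

    allocateOutgoing(outgoing, recordCount);
    return insertRecords(0, index, recordCount);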
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/AbstractSV2Copier.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/AbstractSV2Copier.java
index 68a088917b2..d273fd301c8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/AbstractSV2Copier.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/AbstractSV2Copier.java
@@ -17,8 +17,8 @@
  */
 package org.apache.drill.exec.physical.impl.svremover;
 
-import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.record.TransferPair;
+import org.apache.drill.exec.record.VectorAccessible;
 import org.apache.drill.exec.record.VectorContainer;
 import org.apache.drill.exec.record.VectorWrapper;
 import org.apache.drill.exec.record.selection.SelectionVector2;
@@ -33,7 +33,7 @@
   protected List<TransferPair> transferPairs = new ArrayList<>();
 
   @Override
-  public void setup(RecordBatch incoming, VectorContainer outgoing) {
+  public void setup(VectorAccessible incoming, VectorContainer outgoing) {
     super.setup(incoming, outgoing);
     this.sv2 = incoming.getSelectionVector2();
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/AbstractSV4Copier.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/AbstractSV4Copier.java
index 56e258659e0..970f970449f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/AbstractSV4Copier.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/AbstractSV4Copier.java
@@ -17,7 +17,7 @@
  */
 package org.apache.drill.exec.physical.impl.svremover;
 
-import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.VectorAccessible;
 import org.apache.drill.exec.record.VectorContainer;
 import org.apache.drill.exec.record.VectorWrapper;
 import org.apache.drill.exec.record.selection.SelectionVector4;
@@ -30,7 +30,7 @@
   private SelectionVector4 sv4;
 
   @Override
-  public void setup(RecordBatch incoming, VectorContainer outgoing) {
+  public void setup(VectorAccessible incoming, VectorContainer outgoing) {
     super.setup(incoming, outgoing);
     this.sv4 = incoming.getSelectionVector4();
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/Copier.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/Copier.java
index 92dea7021d2..f8934e5a3d0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/Copier.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/Copier.java
@@ -17,11 +17,11 @@
  */
 package org.apache.drill.exec.physical.impl.svremover;
 
-import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.record.VectorAccessible;
 
 public interface Copier {
-  void setup(RecordBatch incoming, VectorContainer outgoing);
+  void setup(VectorAccessible incoming, VectorContainer outgoing);
   int copyRecords(int index, int recordCount);
   int appendRecord(int index);
   int appendRecords(int index, int recordCount);
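Widening setup() from RecordBatch to VectorAccessible is the enabling change for the test refactor later in this patch: VectorContainer implements VectorAccessible, so a copier can be fed from a bare container or a mock batch rather than a full operator. A hedged sketch (variable names are illustrative):

    Copier copier = new GenericCopier();
    copier.setup(sourceAccessible, destContainer);  // any VectorAccessible works as the source
    copier.copyRecords(0, sourceAccessible.getRecordCount());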
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/GenericCopier.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/GenericCopier.java
index 72516e058ef..f64a11e7c54 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/GenericCopier.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/GenericCopier.java
@@ -17,11 +17,13 @@
  */
 package org.apache.drill.exec.physical.impl.svremover;
 
-import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.VectorAccessible;
 import org.apache.drill.exec.record.VectorContainer;
 import org.apache.drill.exec.record.VectorWrapper;
 import org.apache.drill.exec.vector.ValueVector;
 
+import static org.apache.drill.exec.physical.impl.svremover.AbstractCopier.allocateOutgoing;
+
 public class GenericCopier implements Copier {
   private ValueVector[] vvOut;
   private ValueVector[] vvIn;
@@ -29,7 +31,7 @@
   private VectorContainer outgoing;
 
   @Override
-  public void setup(RecordBatch incoming, VectorContainer outgoing) {
+  public void setup(VectorAccessible incoming, VectorContainer outgoing) {
     this.outgoing = outgoing;
 
     final int count = outgoing.getNumberOfColumns();
@@ -53,6 +55,7 @@ public void setup(RecordBatch incoming, VectorContainer outgoing) {
 
   @Override
   public int copyRecords(int index, int recordCount) {
+    allocateOutgoing(outgoing, recordCount);
     return insertRecords(0, index, recordCount);
   }
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/StraightCopier.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/StraightCopier.java
index 33f2a964b98..cecfc5af793 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/StraightCopier.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/StraightCopier.java
@@ -20,6 +20,7 @@
 import com.google.common.collect.Lists;
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.record.TransferPair;
+import org.apache.drill.exec.record.VectorAccessible;
 import org.apache.drill.exec.record.VectorContainer;
 import org.apache.drill.exec.record.VectorWrapper;
 import org.apache.drill.exec.vector.SchemaChangeCallBack;
@@ -39,7 +40,7 @@ public StraightCopier(RecordBatch incomingBatch, VectorContainer outputContainer
     }
 
     @Override
-    public void setup(RecordBatch incoming, VectorContainer outgoing) {
+    public void setup(VectorAccessible incoming, VectorContainer outgoing) {
       for(VectorWrapper<?> vv : incoming){
         TransferPair tp = vv.getValueVector().makeTransferPair(outputContainer.addOrGet(vv.getField(), callBack));
         pairs.add(tp);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchSizer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchSizer.java
index 83287eead83..dac80a58c51 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchSizer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchSizer.java
@@ -17,6 +17,8 @@
  */
 package org.apache.drill.exec.record;
 
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Set;
 import java.util.Map;
 
@@ -633,6 +635,11 @@ public ColumnSize getColumn(String name) {
   // columns can be obtained from children of topColumns.
   private Map<String, ColumnSize> columnSizes = CaseInsensitiveMap.newHashMap();
 
+  /**
+   * This field is used by the convenience method {@link #columnsList()}.
+   */
+  private List<ColumnSize> columnSizesList = new ArrayList<>();
+
   /**
    * Number of records (rows) in the batch.
    */
@@ -715,6 +722,8 @@ public RecordBatchSizer(VectorAccessible va, SelectionVector2 sv2) {
     for (VectorWrapper<?> vw : va) {
       ColumnSize colSize = measureColumn(vw.getValueVector(), "");
       columnSizes.put(vw.getField().getName(), colSize);
+      columnSizesList.add(colSize);
+      stdRowWidth += colSize.getStdDataSizePerEntry();
       netBatchSize += colSize.getTotalNetSize();
       maxSize = Math.max(maxSize, colSize.getTotalDataSize());
       if (colSize.metadata.isNullable()) {
@@ -884,6 +893,14 @@ public int getAvgDensity() {
   public int getNetRowWidth() { return netRowWidth; }
   public Map<String, ColumnSize> columns() { return columnSizes; }
 
+  /**
+   * This is a convenience method to get the sizes of columns in the same order that the corresponding value vectors
+   * are stored within a {@link org.apache.drill.exec.record.VectorAccessible}.
+   * @return The sizes of columns in the same order that the corresponding value vectors are stored within a
+   * {@link org.apache.drill.exec.record.VectorAccessible}.
+   */
+  public List<ColumnSize> columnsList() { return columnSizesList; }
+
   /**
    * Compute the "real" width of the row, taking into account each varchar column size
    * (historically capped at 50, and rounded up to power of 2 to match drill buf allocation)
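columnsList() complements the existing name-keyed columns() map with an ordered view: entries appear in the same order as the value vectors of the source VectorAccessible, which matters when pairing sizes back up with vectors positionally. A usage sketch (assuming a sizer already built over a batch):

    for (RecordBatchSizer.ColumnSize colSize : sizer.columnsList()) {
      // entries arrive in the same order as the batch's value vectors,
      // unlike the case-insensitive map returned by columns()
    }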
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/BaseTestOpBatchEmitOutcome.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/BaseTestOpBatchEmitOutcome.java
index 4eaca2bf063..620a61c17cc 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/BaseTestOpBatchEmitOutcome.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/BaseTestOpBatchEmitOutcome.java
@@ -19,7 +19,7 @@
 
 import org.apache.drill.common.types.TypeProtos;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
-import org.apache.drill.exec.physical.unit.PhysicalOpUnitTestBase;
+import org.apache.drill.test.PhysicalOpUnitTestBase;
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.record.VectorContainer;
 import org.apache.drill.exec.record.metadata.TupleMetadata;
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/MockRecordBatch.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/MockRecordBatch.java
index ed7af4cbee8..f3ec7b03279 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/MockRecordBatch.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/MockRecordBatch.java
@@ -17,6 +17,7 @@
  */
 package org.apache.drill.exec.physical.impl;
 
+import com.google.common.base.Preconditions;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.ops.FragmentContext;
@@ -29,9 +30,17 @@
 import org.apache.drill.exec.record.WritableBatch;
 import org.apache.drill.exec.record.selection.SelectionVector2;
 import org.apache.drill.exec.record.selection.SelectionVector4;
+import org.apache.drill.test.rowSet.DirectRowSet;
+import org.apache.drill.test.rowSet.IndirectRowSet;
+import org.apache.drill.test.rowSet.RowSet;
 
+import javax.annotation.Nullable;
+import javax.validation.constraints.NotNull;
+import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
 
 public class MockRecordBatch implements CloseableRecordBatch {
   private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MockRecordBatch.class);
@@ -39,41 +48,83 @@
   // These resources are owned by this RecordBatch
   protected VectorContainer container;
   protected SelectionVector2 sv2;
+  protected SelectionVector4 sv4;
   private int currentContainerIndex;
   private int currentOutcomeIndex;
   private boolean isDone;
   private boolean limitWithUnnest;
 
   // All the below resources are owned by caller
-  private final List<VectorContainer> allTestContainers;
-  private List<SelectionVector2> allTestContainersSv2;
+  private final List<RowSet> rowSets;
   private final List<IterOutcome> allOutcomes;
   private final FragmentContext context;
   protected final OperatorContext oContext;
   protected final BufferAllocator allocator;
 
-  public MockRecordBatch(FragmentContext context, OperatorContext oContext,
-                         List<VectorContainer> testContainers, List<IterOutcome> iterOutcomes,
-                         BatchSchema schema) {
+  private MockRecordBatch(@NotNull final FragmentContext context,
+                          @Nullable final OperatorContext oContext,
+                          @NotNull final List<RowSet> testRowSets,
+                          @NotNull final List<IterOutcome> iterOutcomes,
+                          @NotNull final BatchSchema schema,
+                          final boolean dummy) {
+    Preconditions.checkNotNull(testRowSets);
+    Preconditions.checkNotNull(iterOutcomes);
+    Preconditions.checkNotNull(schema);
+
     this.context = context;
     this.oContext = oContext;
-    this.allocator = oContext.getAllocator();
-    this.allTestContainers = testContainers;
+    this.rowSets = testRowSets;
+    this.allocator = context.getAllocator();
     this.container = new VectorContainer(allocator, schema);
     this.allOutcomes = iterOutcomes;
     this.currentContainerIndex = 0;
     this.currentOutcomeIndex = 0;
     this.isDone = false;
-    this.allTestContainersSv2 = null;
-    this.sv2 = null;
   }
 
-  public MockRecordBatch(FragmentContext context, OperatorContext oContext,
-                         List<VectorContainer> testContainers, List<IterOutcome> iterOutcomes,
-                         List<SelectionVector2> testContainersSv2, BatchSchema schema) {
-    this(context, oContext, testContainers, iterOutcomes, schema);
-    allTestContainersSv2 = testContainersSv2;
-    sv2 = (allTestContainersSv2 != null && allTestContainersSv2.size() > 0) ? new SelectionVector2(allocator) : null;
+  @Deprecated
+  public MockRecordBatch(@Nullable final FragmentContext context,
+                         @Nullable final OperatorContext oContext,
+                         @NotNull final List<VectorContainer> testContainers,
+                         @NotNull final List<IterOutcome> iterOutcomes,
+                         final BatchSchema schema) {
+    this(context,
+         oContext,
+         testContainers.stream().
+           map(container -> DirectRowSet.fromContainer(container)).
+           collect(Collectors.toList()),
+         iterOutcomes,
+         schema,
+         true);
+  }
+
+  @Deprecated
+  public MockRecordBatch(@Nullable final FragmentContext context,
+                         @Nullable final OperatorContext oContext,
+                         @NotNull final List<VectorContainer> testContainers,
+                         @NotNull final List<IterOutcome> iterOutcomes,
+                         @NotNull final List<SelectionVector2> selectionVector2s,
+                         final BatchSchema schema) {
+    this(context,
+      oContext,
+      new Supplier<List<RowSet>>() {
+        @Override
+        public List<RowSet> get() {
+          List<RowSet> rowSets = new ArrayList<>();
+
+          for (int index = 0; index < testContainers.size(); index++) {
+            if (index >= selectionVector2s.size()) {
+              rowSets.add(IndirectRowSet.fromContainer(testContainers.get(index)));
+            } else {
+              rowSets.add(IndirectRowSet.fromSv2(testContainers.get(index), selectionVector2s.get(index)));
+            }
+          }
+          return rowSets;
+        }
+      }.get(),
+      iterOutcomes,
+      schema,
+      true);
   }
 
   @Override
@@ -94,7 +145,7 @@ public SelectionVector2 getSelectionVector2() {
 
   @Override
   public SelectionVector4 getSelectionVector4() {
-    return null;
+    return sv4;
   }
 
   @Override
@@ -146,10 +197,11 @@ public IterOutcome next() {
       return IterOutcome.NONE;
     }
 
-    IterOutcome currentOutcome = IterOutcome.OK;
+    IterOutcome currentOutcome;
 
-    if (currentContainerIndex < allTestContainers.size()) {
-      final VectorContainer input = allTestContainers.get(currentContainerIndex);
+    if (currentContainerIndex < rowSets.size()) {
+      final RowSet rowSet = rowSets.get(currentContainerIndex);
+      final VectorContainer input = rowSet.container();
       final int recordCount = input.getRecordCount();
       // We need to do this since the downstream operator expects vector reference to be same
       // after first next call in cases when schema is not changed
@@ -158,19 +210,34 @@ public IterOutcome next() {
         container.clear();
         container = new VectorContainer(allocator, inputSchema);
       }
-      container.transferIn(input);
-      container.setRecordCount(recordCount);
-
-      // Transfer the sv2 as well
-      final SelectionVector2 inputSv2 =
-        (allTestContainersSv2 != null && allTestContainersSv2.size() > 0)
-          ? allTestContainersSv2.get(currentContainerIndex) : null;
-      if (inputSv2 != null) {
-        sv2.allocateNewSafe(inputSv2.getCount());
-        for (int i=0; i<inputSv2.getCount(); ++i) {
-          sv2.setIndex(i, inputSv2.getIndex(i));
-        }
-        sv2.setRecordCount(inputSv2.getCount());
+
+      switch (rowSet.indirectionType()) {
+        case NONE:
+        case TWO_BYTE:
+          container.transferIn(input);
+          container.setRecordCount(recordCount);
+          final SelectionVector2 inputSv2 = ((RowSet.SingleRowSet) rowSet).getSv2();
+
+          if (sv2 != null) {
+            // Operators assume that new values for an Sv2 are transferred in.
+            sv2.allocateNewSafe(inputSv2.getCount());
+            for (int i=0; i<inputSv2.getCount(); ++i) {
+              sv2.setIndex(i, inputSv2.getIndex(i));
+            }
+            sv2.setRecordCount(inputSv2.getCount());
+          } else {
+            sv2 = inputSv2;
+          }
+
+          break;
+        case FOUR_BYTE:
+          // TODO find a clean way to transfer in for this case.
+          container.clear();
+          container = input;
+          sv4 = ((RowSet.HyperRowSet) rowSet).getSv4();
+          break;
+        default:
+          throw new UnsupportedOperationException();
       }
     }
 
@@ -222,4 +289,69 @@ public boolean isCompleted() {
   public void useUnnestKillHandlingForLimit(boolean limitWithUnnest) {
     this.limitWithUnnest = limitWithUnnest;
   }
+
+  public static class Builder {
+    private final List<RowSet> rowSets = new ArrayList<>();
+    private final List<IterOutcome> iterOutcomes = new ArrayList<>();
+
+    private BatchSchema batchSchema;
+    private OperatorContext oContext;
+
+    public Builder() {
+    }
+
+    private Builder sendData(final RowSet rowSet, final IterOutcome outcome) {
+      Preconditions.checkState(batchSchema == null);
+      rowSets.add(rowSet);
+      iterOutcomes.add(outcome);
+      return this;
+    }
+
+    public Builder sendData(final RowSet rowSet) {
+      final IterOutcome outcome = rowSets.isEmpty()? IterOutcome.OK_NEW_SCHEMA: IterOutcome.OK;
+      return sendData(rowSet, outcome);
+    }
+
+    public Builder sendDataWithNewSchema(final RowSet rowSet) {
+      return sendData(rowSet, IterOutcome.OK_NEW_SCHEMA);
+    }
+
+    public Builder sendDataAndEmit(final RowSet rowSet) {
+      return sendData(rowSet, IterOutcome.EMIT);
+    }
+
+    public Builder terminateWithError(IterOutcome errorOutcome) {
+      Preconditions.checkArgument(errorOutcome != IterOutcome.STOP);
+      Preconditions.checkArgument(errorOutcome != IterOutcome.OUT_OF_MEMORY);
+
+      iterOutcomes.add(errorOutcome);
+      return this;
+    }
+
+    public Builder setSchema(final BatchSchema batchSchema) {
+      Preconditions.checkState(!rowSets.isEmpty());
+      this.batchSchema = Preconditions.checkNotNull(batchSchema);
+      return this;
+    }
+
+    public Builder withOperatorContext(final OperatorContext oContext) {
+      this.oContext = Preconditions.checkNotNull(oContext);
+      return this;
+    }
+
+    public MockRecordBatch build(final FragmentContext context) {
+      BatchSchema tempSchema = batchSchema;
+
+      if (tempSchema == null && !rowSets.isEmpty()) {
+        tempSchema = rowSets.get(0).batchSchema();
+      }
+
+      return new MockRecordBatch(context,
+        oContext,
+        rowSets,
+        iterOutcomes,
+        tempSchema,
+        true);
+    }
+  }
 }
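The new Builder is the core of the framework improvement: rather than hand-assembling VectorContainer and SelectionVector2 lists (the two constructors above are now deprecated shims), a test describes the upstream stream declaratively as RowSets plus IterOutcomes. A usage sketch drawn from the builder methods above (the row set variables are illustrative):

    MockRecordBatch upstream = new MockRecordBatch.Builder()
      .sendData(firstRowSet)           // first batch defaults to OK_NEW_SCHEMA
      .sendData(secondRowSet)          // later batches default to OK
      .sendDataAndEmit(thirdRowSet)    // EMIT; sendDataWithNewSchema() forces OK_NEW_SCHEMA
      .build(fragContext);             // schema defaults to that of the first row set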
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/agg/TestAggWithAnyValue.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/agg/TestAggWithAnyValue.java
index 37c0b52e90c..9909bca7e27 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/agg/TestAggWithAnyValue.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/agg/TestAggWithAnyValue.java
@@ -20,7 +20,7 @@
 
 import com.google.common.collect.Lists;
 import org.apache.drill.exec.physical.config.StreamingAggregate;
-import org.apache.drill.exec.physical.unit.PhysicalOpUnitTestBase;
+import org.apache.drill.test.PhysicalOpUnitTestBase;
 import org.apache.drill.exec.util.JsonStringArrayList;
 import org.apache.drill.test.BaseTestQuery;
 import org.apache.drill.categories.OperatorTest;
@@ -59,7 +59,7 @@ public void testStreamAggWithGroupBy() {
               "{ \"age\": {\"min\":43, \"max\":80}, \"city\": \"Palo Alto\", \"de\": \"987654321987654321987654321.13987654321\"," +
               " \"a\": [{\"b\":70, \"c\":85}, {\"b\":90, \"c\":145}], \"m\": [{\"n\": [7, 8, 9]}], \"f\": [{\"g\": {\"h\": [{\"k\": 50}, {\"k\": 60}]}}]," +
               "\"p\": {\"q\": [33, 34, 35]}" + "}]");
-      opTestBuilder()
+      legacyOpTestBuilder()
           .physicalOperator(aggConf)
           .inputDataStreamJson(inputJsonBatches)
           .baselineColumns("age", "any_a")
@@ -146,4 +146,4 @@ public void testRepeatedDecimalWithGroupBy() throws Exception {
           .go();
     }
   }
-}
\ No newline at end of file
+}
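Throughout this patch the JSON-driven tests have opTestBuilder() renamed to legacyOpTestBuilder(); the opTestBuilder() name is reserved for the new RowSet-based builder exercised in TestHashAggBatch below. Side by side (a sketch; method names as they appear in this diff):

    // Legacy, JSON-string batches:
    legacyOpTestBuilder()
      .physicalOperator(aggConf)
      .inputDataStreamJson(inputJsonBatches)
      .baselineColumns("age", "any_a")
      .go();

    // New, RowSet batches:
    opTestBuilder()
      .physicalOperator(aggConf)
      .addUpstreamBatch(mockRecordBatch)
      .addExpectedResult(expectedRowSet)
      .go();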
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/agg/TestHashAggBatch.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/agg/TestHashAggBatch.java
new file mode 100644
index 00000000000..2c6976cf092
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/agg/TestHashAggBatch.java
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.agg;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import org.apache.drill.common.expression.ExpressionPosition;
+import org.apache.drill.common.expression.FieldReference;
+import org.apache.drill.common.expression.FunctionCall;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.logical.data.NamedExpression;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.exec.physical.config.HashAggregate;
+import org.apache.drill.exec.physical.impl.MockRecordBatch;
+import org.apache.drill.exec.planner.physical.AggPrelBase;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.test.PhysicalOpUnitTestBase;
+import org.apache.drill.test.rowSet.RowSet;
+import org.apache.drill.test.rowSet.RowSetBuilder;
+import org.apache.drill.test.rowSet.schema.SchemaBuilder;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.drill.exec.ExecConstants.HASHAGG_NUM_PARTITIONS_KEY;
+
+public class TestHashAggBatch extends PhysicalOpUnitTestBase {
+  public static final String FIRST_NAME_COL = "firstname";
+  public static final String LAST_NAME_COL = "lastname";
+  public static final String STUFF_COL = "stuff";
+  public static final String TOTAL_STUFF_COL = "totalstuff";
+
+  public static final List<String> FIRST_NAMES = ImmutableList.of(
+    "Strawberry",
+    "Banana",
+    "Mango",
+    "Grape");
+
+  public static final List<String> LAST_NAMES = ImmutableList.of(
+    "Red",
+    "Green",
+    "Blue",
+    "Purple");
+
+  public static final TupleMetadata INT_OUTPUT_SCHEMA = new SchemaBuilder()
+    .add(FIRST_NAME_COL, TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.REQUIRED)
+    .add(LAST_NAME_COL, TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.REQUIRED)
+    .add(TOTAL_STUFF_COL, TypeProtos.MinorType.BIGINT, TypeProtos.DataMode.OPTIONAL)
+    .buildSchema();
+
+  // TODO remove this in order to test multiple partitions
+  @Before
+  public void setupSimpleSingleBatchSumTestPhase1of2() {
+    operatorFixture.getOptionManager().setLocalOption(HASHAGG_NUM_PARTITIONS_KEY, 1);
+  }
+
+  @Test
+  public void simpleSingleBatchSumTestPhase1of2() throws Exception {
+    batchSumTest(100, Integer.MAX_VALUE, AggPrelBase.OperatorPhase.PHASE_1of2);
+  }
+
+  @Test
+  public void simpleMultiBatchSumTestPhase1of2() throws Exception {
+    batchSumTest(100, 100, AggPrelBase.OperatorPhase.PHASE_1of2);
+  }
+
+  @Test
+  public void simpleSingleBatchSumTestPhase1of1() throws Exception {
+    batchSumTest(100, Integer.MAX_VALUE, AggPrelBase.OperatorPhase.PHASE_1of1);
+  }
+
+  @Test
+  public void simpleMultiBatchSumTestPhase1of1() throws Exception {
+    batchSumTest(100, 100, AggPrelBase.OperatorPhase.PHASE_1of1);
+  }
+
+  @Test
+  public void simpleSingleBatchSumTestPhase2of2() throws Exception {
+    batchSumTest(100, Integer.MAX_VALUE, AggPrelBase.OperatorPhase.PHASE_2of2);
+  }
+
+  @Test
+  public void simpleMultiBatchSumTestPhase2of2() throws Exception {
+    batchSumTest(100, 100, AggPrelBase.OperatorPhase.PHASE_2of2);
+  }
+
+  private void batchSumTest(int totalCount, int maxInputBatchSize, AggPrelBase.OperatorPhase phase) throws Exception {
+    final HashAggregate hashAggregate = createHashAggPhysicalOperator(phase);
+    final List<RowSet> inputRowSets = buildInputRowSets(TypeProtos.MinorType.INT, TypeProtos.DataMode.REQUIRED,
+      totalCount, maxInputBatchSize);
+
+    final MockRecordBatch.Builder rowSetBatchBuilder = new MockRecordBatch.Builder();
+    inputRowSets.forEach(rowSet -> rowSetBatchBuilder.sendData(rowSet));
+    final MockRecordBatch inputRowSetBatch = rowSetBatchBuilder.build(fragContext);
+
+    final RowSet expectedRowSet = buildIntExpectedRowSet(totalCount);
+
+    opTestBuilder()
+      .physicalOperator(hashAggregate)
+      .combineOutputBatches()
+      .unordered()
+      .addUpstreamBatch(inputRowSetBatch)
+      .addExpectedResult(expectedRowSet)
+      .go();
+  }
+
+  private HashAggregate createHashAggPhysicalOperator(AggPrelBase.OperatorPhase phase) {
+    final List<NamedExpression> keyExpressions = Lists.newArrayList(
+      new NamedExpression(SchemaPath.getSimplePath(FIRST_NAME_COL), new FieldReference(FIRST_NAME_COL)),
+      new NamedExpression(SchemaPath.getSimplePath(LAST_NAME_COL), new FieldReference(LAST_NAME_COL)));
+
+    final List<NamedExpression> aggExpressions = Lists.newArrayList(
+      new NamedExpression(
+        new FunctionCall("sum", ImmutableList.of(SchemaPath.getSimplePath(STUFF_COL)),
+          new ExpressionPosition(null, 0)),
+        new FieldReference(TOTAL_STUFF_COL)));
+
+    return new HashAggregate(
+      null,
+      phase,
+      keyExpressions,
+      aggExpressions,
+      0.0f);
+  }
+
+  private TupleMetadata buildInputSchema(TypeProtos.MinorType minorType, TypeProtos.DataMode dataMode) {
+    return new SchemaBuilder()
+      .add(FIRST_NAME_COL, TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.REQUIRED)
+      .add(LAST_NAME_COL, TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.REQUIRED)
+      .add(STUFF_COL, minorType, dataMode)
+      .buildSchema();
+  }
+
+  private List<RowSet> buildInputRowSets(final TypeProtos.MinorType minorType,
+                                  final TypeProtos.DataMode dataMode,
+                                  final int dataCount,
+                                  final int maxBatchSize) {
+    Preconditions.checkArgument(dataCount > 0);
+    Preconditions.checkArgument(maxBatchSize > 0);
+
+    List<RowSet> inputRowSets = new ArrayList<>();
+    int currentBatchSize = 0;
+    RowSetBuilder inputRowSetBuilder = null;
+
+    for (int multiplier = 1, firstNameIndex = 0; firstNameIndex < FIRST_NAMES.size(); firstNameIndex++) {
+      final String firstName = FIRST_NAMES.get(firstNameIndex);
+
+      for (int lastNameIndex = 0; lastNameIndex < LAST_NAMES.size(); lastNameIndex++, multiplier++) {
+        final String lastName = LAST_NAMES.get(lastNameIndex);
+
+        for (int index = 1; index <= dataCount; index++) {
+          final int num = index * multiplier;
+
+          if (currentBatchSize == 0) {
+            final TupleMetadata inputSchema = buildInputSchema(minorType, dataMode);
+            inputRowSetBuilder = new RowSetBuilder(operatorFixture.allocator(), inputSchema);
+          }
+
+          inputRowSetBuilder.addRow(firstName, lastName, num);
+          currentBatchSize++;
+
+          if (currentBatchSize == maxBatchSize) {
+            final RowSet rowSet = inputRowSetBuilder.build();
+            inputRowSets.add(rowSet);
+            currentBatchSize = 0;
+          }
+        }
+      }
+    }
+
+    if (currentBatchSize != 0) {
+      inputRowSets.add(inputRowSetBuilder.build());
+    }
+
+    return inputRowSets;
+  }
+
+  private RowSet buildIntExpectedRowSet(final int dataCount) {
+    final RowSetBuilder expectedRowSetBuilder = new RowSetBuilder(operatorFixture.allocator(), INT_OUTPUT_SCHEMA);
+
+    for (int multiplier = 1, firstNameIndex = 0; firstNameIndex < FIRST_NAMES.size(); firstNameIndex++) {
+      final String firstName = FIRST_NAMES.get(firstNameIndex);
+
+      for (int lastNameIndex = 0; lastNameIndex < LAST_NAMES.size(); lastNameIndex++, multiplier++) {
+        final String lastName = LAST_NAMES.get(lastNameIndex);
+        final long total = ((dataCount * (dataCount + 1)) / 2) * multiplier;
+
+        expectedRowSetBuilder.addRow(firstName, lastName, total);
+      }
+    }
+
+    return expectedRowSetBuilder.build();
+  }
+}
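The expected totals in buildIntExpectedRowSet() follow directly from how the input is constructed: each (firstname, lastname) pair receives the values multiplier * 1 through multiplier * dataCount, so its sum is multiplier * dataCount * (dataCount + 1) / 2. Worked through for the values the tests use:

    // dataCount = 100, so each group's total is multiplier * (100 * 101 / 2):
    // multiplier = 1  ->  5050
    // multiplier = 3  ->  15150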
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/common/HashPartitionTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/common/HashPartitionTest.java
index 48fd856c330..bbe57faabef 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/common/HashPartitionTest.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/common/HashPartitionTest.java
@@ -32,6 +32,7 @@
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.ops.OperatorContext;
 import org.apache.drill.exec.physical.config.HashJoinPOP;
+import org.apache.drill.exec.physical.impl.MockRecordBatch;
 import org.apache.drill.exec.physical.impl.aggregate.SpilledRecordbatch;
 import org.apache.drill.exec.physical.impl.join.HashJoinMemoryCalculator;
 import org.apache.drill.exec.physical.impl.join.HashJoinMemoryCalculatorImpl;
@@ -41,13 +42,13 @@
 import org.apache.drill.exec.proto.ExecProtos;
 import org.apache.drill.exec.proto.UserBitShared;
 import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.CloseableRecordBatch;
 import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.test.BaseDirTestWatcher;
 import org.apache.drill.test.OperatorFixture;
 import org.apache.drill.test.rowSet.DirectRowSet;
 import org.apache.drill.test.rowSet.RowSet;
-import org.apache.drill.test.rowSet.RowSetBatch;
 import org.apache.drill.test.rowSet.RowSetBuilder;
 import org.apache.drill.test.rowSet.RowSetComparison;
 import org.junit.Assert;
@@ -68,27 +69,31 @@ public void noSpillBuildSideTest() throws Exception
       private RowSet probeRowSet;
 
       @Override
-      public RecordBatch createBuildBatch(BatchSchema schema, BufferAllocator allocator) {
-        buildRowSet = new RowSetBuilder(allocator, schema)
+      public CloseableRecordBatch createBuildBatch(BatchSchema schema, FragmentContext context) {
+        buildRowSet = new RowSetBuilder(context.getAllocator(), schema)
           .addRow(1, "green")
           .addRow(3, "red")
           .addRow(2, "blue")
           .build();
-        return new RowSetBatch(buildRowSet);
+        return new MockRecordBatch.Builder().
+          sendData(buildRowSet).
+          build(context);
       }
 
       @Override
-      public void createResultBuildBatch(BatchSchema schema, BufferAllocator allocator) {
+      public void createResultBuildBatch(BatchSchema schema, FragmentContext context) {
       }
 
       @Override
-      public RecordBatch createProbeBatch(BatchSchema schema, BufferAllocator allocator) {
-        probeRowSet = new RowSetBuilder(allocator, schema)
+      public CloseableRecordBatch createProbeBatch(BatchSchema schema, FragmentContext context) {
+        probeRowSet = new RowSetBuilder(context.getAllocator(), schema)
           .addRow(.5, "yellow")
           .addRow(1.5, "blue")
           .addRow(2.5, "black")
           .build();
-        return new RowSetBatch(probeRowSet);
+        return new MockRecordBatch.Builder().
+          sendData(probeRowSet).
+          build(context);
       }
 
       @Override
@@ -114,9 +119,9 @@ public void run(SpillSet spillSet,
 
         final HashJoinMemoryCalculator.BuildSidePartitioning noopCalc = new HashJoinMemoryCalculatorImpl.NoopBuildSidePartitioningImpl();
 
-        hashPartition.appendInnerRow(buildRowSet.container(), 0, 10, noopCalc);
-        hashPartition.appendInnerRow(buildRowSet.container(), 1, 11, noopCalc);
-        hashPartition.appendInnerRow(buildRowSet.container(), 2, 12, noopCalc);
+        hashPartition.appendInnerRow(buildBatch.getContainer(), 0, 10, noopCalc);
+        hashPartition.appendInnerRow(buildBatch.getContainer(), 1, 11, noopCalc);
+        hashPartition.appendInnerRow(buildBatch.getContainer(), 2, 12, noopCalc);
         hashPartition.completeAnInnerBatch(false, false);
         hashPartition.buildContainersHashTableAndHelper();
 
@@ -155,22 +160,24 @@ public void spillSingleIncompleteBatchBuildSideTest() throws Exception
       private RowSet actualBuildRowSet;
 
       @Override
-      public RecordBatch createBuildBatch(BatchSchema schema, BufferAllocator allocator) {
-        buildRowSet = new RowSetBuilder(allocator, schema)
+      public CloseableRecordBatch createBuildBatch(BatchSchema schema, FragmentContext context) {
+        buildRowSet = new RowSetBuilder(context.getAllocator(), schema)
           .addRow(1, "green")
           .addRow(3, "red")
           .addRow(2, "blue")
           .build();
-        return new RowSetBatch(buildRowSet);
+        return new MockRecordBatch.Builder().
+          sendData(buildRowSet).
+          build(context);
       }
 
       @Override
-      public void createResultBuildBatch(BatchSchema schema, BufferAllocator allocator) {
+      public void createResultBuildBatch(BatchSchema schema, FragmentContext context) {
         final BatchSchema newSchema = BatchSchema.newBuilder()
           .addFields(schema)
           .addField(MaterializedField.create(HashPartition.HASH_VALUE_COLUMN_NAME, HashPartition.HVtype))
           .build();
-        actualBuildRowSet = new RowSetBuilder(allocator, newSchema)
+        actualBuildRowSet = new RowSetBuilder(context.getAllocator(), newSchema)
           .addRow(1, "green", 10)
           .addRow(3, "red", 11)
           .addRow(2, "blue", 12)
@@ -178,13 +185,15 @@ public void createResultBuildBatch(BatchSchema schema, BufferAllocator allocator
       }
 
       @Override
-      public RecordBatch createProbeBatch(BatchSchema schema, BufferAllocator allocator) {
-        probeRowSet = new RowSetBuilder(allocator, schema)
+      public CloseableRecordBatch createProbeBatch(BatchSchema schema, FragmentContext context) {
+        probeRowSet = new RowSetBuilder(context.getAllocator(), schema)
           .addRow(.5, "yellow")
           .addRow(1.5, "blue")
           .addRow(2.5, "black")
           .build();
-        return new RowSetBatch(probeRowSet);
+        return new MockRecordBatch.Builder().
+          sendData(probeRowSet).
+          build(context);
       }
 
       @Override
@@ -210,9 +219,9 @@ public void run(SpillSet spillSet,
 
         final HashJoinMemoryCalculator.BuildSidePartitioning noopCalc = new HashJoinMemoryCalculatorImpl.NoopBuildSidePartitioningImpl();
 
-        hashPartition.appendInnerRow(buildRowSet.container(), 0, 10, noopCalc);
-        hashPartition.appendInnerRow(buildRowSet.container(), 1, 11, noopCalc);
-        hashPartition.appendInnerRow(buildRowSet.container(), 2, 12, noopCalc);
+        hashPartition.appendInnerRow(buildBatch.getContainer(), 0, 10, noopCalc);
+        hashPartition.appendInnerRow(buildBatch.getContainer(), 1, 11, noopCalc);
+        hashPartition.appendInnerRow(buildBatch.getContainer(), 2, 12, noopCalc);
         hashPartition.completeAnInnerBatch(false, false);
         hashPartition.spillThisPartition();
         final String spillFile = hashPartition.getSpillFile();
@@ -260,15 +269,17 @@ public void run(HashPartitionTestCase testCase) throws Exception {
         MaterializedField buildColB = MaterializedField.create("buildColB", Types.required(TypeProtos.MinorType.VARCHAR));
         List<MaterializedField> buildCols = Lists.newArrayList(buildColA, buildColB);
         final BatchSchema buildSchema = new BatchSchema(BatchSchema.SelectionVectorMode.NONE, buildCols);
-        final RecordBatch buildBatch = testCase.createBuildBatch(buildSchema, allocator);
-        testCase.createResultBuildBatch(buildSchema, allocator);
+        final CloseableRecordBatch buildBatch = testCase.createBuildBatch(buildSchema, operatorContext.getFragmentContext());
+        buildBatch.next();
+        testCase.createResultBuildBatch(buildSchema, operatorContext.getFragmentContext());
 
         // Create probe batch
         MaterializedField probeColA = MaterializedField.create("probeColA", Types.required(TypeProtos.MinorType.FLOAT4));
         MaterializedField probeColB = MaterializedField.create("probeColB", Types.required(TypeProtos.MinorType.VARCHAR));
         List<MaterializedField> probeCols = Lists.newArrayList(probeColA, probeColB);
         final BatchSchema probeSchema = new BatchSchema(BatchSchema.SelectionVectorMode.NONE, probeCols);
-        final RecordBatch probeBatch = testCase.createProbeBatch(probeSchema, allocator);
+        final CloseableRecordBatch probeBatch = testCase.createProbeBatch(probeSchema, operatorContext.getFragmentContext());
+        probeBatch.next();
 
         final LogicalExpression buildColExpression = SchemaPath.getSimplePath(buildColB.getName());
         final LogicalExpression probeColExpression = SchemaPath.getSimplePath(probeColB.getName());
@@ -285,14 +296,17 @@ public void run(HashPartitionTestCase testCase) throws Exception {
         baseHashTable.updateIncoming(buildBatch, probeBatch);
 
         testCase.run(spillSet, buildSchema, probeSchema, buildBatch, probeBatch, baseHashTable, context, operatorContext);
+
+        buildBatch.close();
+        probeBatch.close();
       }
     }
   }
 
   interface HashPartitionTestCase {
-    RecordBatch createBuildBatch(BatchSchema schema, BufferAllocator allocator);
-    void createResultBuildBatch(BatchSchema schema, BufferAllocator allocator);
-    RecordBatch createProbeBatch(BatchSchema schema, BufferAllocator allocator);
+    CloseableRecordBatch createBuildBatch(BatchSchema schema, FragmentContext context);
+    void createResultBuildBatch(BatchSchema schema, FragmentContext context);
+    CloseableRecordBatch createProbeBatch(BatchSchema schema, FragmentContext context);
 
     void run(SpillSet spillSet,
              BatchSchema buildSchema,
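Note the batch lifecycle the refactored test now follows: a MockRecordBatch must have next() called once so the first RowSet is transferred into its container (only then does getContainer() hold data), and it must be close()d to release its buffers. In outline:

    final CloseableRecordBatch buildBatch = testCase.createBuildBatch(buildSchema, context);
    buildBatch.next();   // loads the first row set into the batch's container
    // ... exercise hashPartition against buildBatch.getContainer() ...
    buildBatch.close();  // releases the mock batch's vectors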
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoinJPPD.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoinJPPD.java
index aae566badc4..d3d148787e4 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoinJPPD.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoinJPPD.java
@@ -22,10 +22,10 @@
 import org.apache.drill.categories.OperatorTest;
 import org.apache.drill.categories.SlowTest;
 import org.apache.drill.exec.physical.config.HashJoinPOP;
-import org.apache.drill.exec.physical.unit.PhysicalOpUnitTestBase;
 import org.apache.drill.exec.work.filter.BloomFilter;
 import org.apache.drill.exec.work.filter.BloomFilterDef;
 import org.apache.drill.exec.work.filter.RuntimeFilterDef;
+import org.apache.drill.test.PhysicalOpUnitTestBase;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
@@ -58,7 +58,7 @@ public void testBroadcastHashJoin1Cond() {
     for ( int cnt = 1; cnt <= numRows; cnt++ ) {
       leftTable.add("[{\"lft\": " + cnt + ", \"a\" : \"a string\"}]");
     }
-    opTestBuilder()
+    legacyOpTestBuilder()
       .physicalOperator(joinConf)
       .inputDataStreamsJson(Lists.newArrayList(leftTable,rightTable))
       .baselineColumns("lft", "a", "b", "rgt")
@@ -91,7 +91,7 @@ public void testBroadcastHashJoin2Cond() {
     for ( int cnt = 1; cnt <= numRows; cnt++ ) {
       leftTable.add("[{\"lft\": " + cnt + ", \"a\" : \"a string\"}]");
     }
-    opTestBuilder()
+    legacyOpTestBuilder()
       .physicalOperator(joinConf)
       .inputDataStreamsJson(Lists.newArrayList(leftTable,rightTable))
       .baselineColumns("lft", "a", "b", "rgt")
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoinSpill.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoinSpill.java
index 0c08611e170..67d214f8b92 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoinSpill.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoinSpill.java
@@ -23,7 +23,7 @@
 import org.apache.drill.categories.SlowTest;
 
 import org.apache.drill.exec.physical.config.HashJoinPOP;
-import org.apache.drill.exec.physical.unit.PhysicalOpUnitTestBase;
+import org.apache.drill.test.PhysicalOpUnitTestBase;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
@@ -52,7 +52,7 @@ public void testSimpleHashJoinSpill() {
       rightTable.add("[{\"rgt\": " + cnt + ", \"b\" : \"a string\"}]");
     }
 
-    opTestBuilder()
+    legacyOpTestBuilder()
       .physicalOperator(joinConf)
       .inputDataStreamsJson(Lists.newArrayList(leftTable,rightTable))
       .baselineColumns("lft", "a", "b", "rgt")
@@ -79,7 +79,7 @@ public void testRightOuterHashJoinSpill() {
       rightTable.add("[{\"rgt\": " + cnt + ", \"b\" : \"a string\"}]");
     }
 
-    opTestBuilder()
+    legacyOpTestBuilder()
       .physicalOperator(joinConf)
       .inputDataStreamsJson(Lists.newArrayList(leftTable,rightTable))
       .baselineColumns("lft", "a", "b", "rgt")
@@ -110,7 +110,7 @@ public void testLeftOuterHashJoinSpill() {
       // rightTable.add("[{\"rgt\": " + cnt + ", \"b\" : \"a string\"}]");
     }
 
-    opTestBuilder()
+    legacyOpTestBuilder()
       .physicalOperator(joinConf)
       .inputDataStreamsJson(Lists.newArrayList(leftTable,rightTable))
       .baselineColumns("lft", "a", "b", "rgt")
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/limit/TestLimitOperator.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/limit/TestLimitOperator.java
index 7225edcb893..3dbd1cca937 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/limit/TestLimitOperator.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/limit/TestLimitOperator.java
@@ -19,7 +19,7 @@
 
 import com.google.common.collect.Lists;
 import org.apache.drill.exec.physical.config.Limit;
-import org.apache.drill.exec.physical.unit.PhysicalOpUnitTestBase;
+import org.apache.drill.test.PhysicalOpUnitTestBase;
 import org.apache.drill.test.BaseDirTestWatcher;
 import org.apache.drill.test.ClientFixture;
 import org.apache.drill.test.ClusterFixture;
@@ -55,7 +55,7 @@ public void testLimitMoreRecords() {
     List<String> inputJsonBatches = Lists.newArrayList(
       "[{\"a\": 5, \"b\" : 1 }]",
       "[{\"a\": 5, \"b\" : 5},{\"a\": 3, \"b\" : 8}]");
-    opTestBuilder()
+    legacyOpTestBuilder()
       .physicalOperator(limitConf)
       .inputDataStreamJson(inputJsonBatches)
       .baselineColumns("a", "b")
@@ -71,7 +71,7 @@ public void testLimitLessRecords() {
     List<String> inputJsonBatches = Lists.newArrayList(
       "[{\"a\": 5, \"b\" : 1 }]",
       "[{\"a\": 5, \"b\" : 5},{\"a\": 3, \"b\" : 8}]");
-    opTestBuilder()
+    legacyOpTestBuilder()
       .physicalOperator(limitConf)
       .inputDataStreamJson(inputJsonBatches)
       .baselineColumns("a", "b")
@@ -85,7 +85,7 @@ public void testLimitWithOffset() {
     List<String> inputJsonBatches = Lists.newArrayList(
       "[{\"a\": 5, \"b\" : 1 }]",
       "[{\"a\": 5, \"b\" : 5},{\"a\": 3, \"b\" : 8}]");
-    opTestBuilder()
+    legacyOpTestBuilder()
       .physicalOperator(limitConf)
       .inputDataStreamJson(inputJsonBatches)
       .baselineColumns("a", "b")
@@ -99,7 +99,7 @@ public void testLimitWithNoLastRecord() {
     List<String> inputJsonBatches = Lists.newArrayList(
       "[{\"a\": 5, \"b\" : 1 }]",
       "[{\"a\": 5, \"b\" : 5},{\"a\": 3, \"b\" : 8}]");
-    opTestBuilder()
+    legacyOpTestBuilder()
       .physicalOperator(limitConf)
       .inputDataStreamJson(inputJsonBatches)
       .baselineColumns("a", "b")
@@ -114,7 +114,7 @@ public void testLimitWithNegativeOffset() {
     List<String> inputJsonBatches = Lists.newArrayList(
       "[{\"a\": 5, \"b\" : 1 }]",
       "[{\"a\": 5, \"b\" : 5},{\"a\": 3, \"b\" : 8}]");
-    opTestBuilder()
+    legacyOpTestBuilder()
       .physicalOperator(limitConf)
       .inputDataStreamJson(inputJsonBatches)
       .baselineColumns("a", "b")
@@ -130,7 +130,7 @@ public void testLimitWithNegativeFirstLast() {
     List<String> inputJsonBatches = Lists.newArrayList(
       "[{\"a\": 5, \"b\" : 1 }]",
       "[{\"a\": 5, \"b\" : 5},{\"a\": 3, \"b\" : 8}]");
-    opTestBuilder()
+    legacyOpTestBuilder()
       .physicalOperator(limitConf)
       .inputDataStreamJson(inputJsonBatches)
       .baselineColumns("a", "b")
@@ -144,7 +144,7 @@ public void testLimitWithOffsetOutOfRange() {
     List<String> inputJsonBatches = Lists.newArrayList(
       "[{\"a\": 5, \"b\" : 1 }]",
       "[{\"a\": 5, \"b\" : 5},{\"a\": 3, \"b\" : 8}]");
-    opTestBuilder()
+    legacyOpTestBuilder()
       .physicalOperator(limitConf)
       .inputDataStreamJson(inputJsonBatches)
       .baselineColumns("a", "b")
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/svremover/AbstractGenericCopierTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/svremover/AbstractGenericCopierTest.java
index 7d444b4ed50..292af20c11f 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/svremover/AbstractGenericCopierTest.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/svremover/AbstractGenericCopierTest.java
@@ -20,77 +20,99 @@
 import org.apache.drill.common.types.TypeProtos;
 import org.apache.drill.common.types.Types;
 import org.apache.drill.exec.exception.SchemaChangeException;
-import org.apache.drill.exec.memory.RootAllocator;
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.physical.impl.MockRecordBatch;
 import org.apache.drill.exec.record.BatchSchema;
 import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.record.VectorContainer;
-import org.apache.drill.exec.record.metadata.TupleMetadata;
 import org.apache.drill.exec.vector.SchemaChangeCallBack;
+import org.apache.drill.test.BaseDirTestWatcher;
+import org.apache.drill.test.OperatorFixture;
+import org.apache.drill.test.rowSet.DirectRowSet;
 import org.apache.drill.test.rowSet.RowSet;
-import org.apache.drill.test.rowSet.RowSetBatch;
 import org.apache.drill.test.rowSet.RowSetBuilder;
 import org.apache.drill.test.rowSet.RowSetComparison;
 import org.apache.drill.test.rowSet.schema.SchemaBuilder;
+import org.junit.Rule;
 import org.junit.Test;
 
 public abstract class AbstractGenericCopierTest {
+  @Rule
+  public final BaseDirTestWatcher baseDirTestWatcher = new BaseDirTestWatcher();
+
   @Test
-  public void testCopyRecords() throws SchemaChangeException {
-    try (RootAllocator allocator = new RootAllocator(10_000_000)) {
-      final TupleMetadata batchSchema = createTestSchema(BatchSchema.SelectionVectorMode.NONE);
+  public void testCopyRecords() throws Exception {
+    try (OperatorFixture operatorFixture = new OperatorFixture.Builder(baseDirTestWatcher).build()) {
+      final BufferAllocator allocator = operatorFixture.allocator();
+      final BatchSchema batchSchema = createTestSchema(BatchSchema.SelectionVectorMode.NONE);
       final RowSet srcRowSet = createSrcRowSet(allocator);
-      final RowSet destRowSet = new RowSetBuilder(allocator, batchSchema).build();
-      final VectorContainer destContainer = destRowSet.container();
-      final Copier copier = createCopier(new RowSetBatch(srcRowSet), destContainer, null);
+      final VectorContainer destContainer = new VectorContainer(allocator, batchSchema);
+
+      destContainer.setRecordCount(0);
       final RowSet expectedRowSet = createExpectedRowset(allocator);
 
-      copier.copyRecords(0, 3);
+      MockRecordBatch mockRecordBatch = null;
 
       try {
-        new RowSetComparison(expectedRowSet).verify(destRowSet);
+        mockRecordBatch = new MockRecordBatch.Builder().
+          sendData(srcRowSet).
+          build(operatorFixture.getFragmentContext());
+        mockRecordBatch.next();
+        final Copier copier = createCopier(mockRecordBatch, destContainer, null);
+        copier.copyRecords(0, 3);
+
+        new RowSetComparison(expectedRowSet).verify(DirectRowSet.fromContainer(destContainer));
       } finally {
-        srcRowSet.clear();
-
-        if (srcRowSet instanceof RowSet.HyperRowSet) {
-          ((RowSet.HyperRowSet)srcRowSet).getSv4().clear();
+        if (mockRecordBatch != null) {
+          mockRecordBatch.close();
         }
 
-        destRowSet.clear();
+        srcRowSet.clear();
+        destContainer.clear();
         expectedRowSet.clear();
       }
     }
   }
 
   @Test
-  public void testAppendRecords() throws SchemaChangeException {
-    try (RootAllocator allocator = new RootAllocator(10_000_000)) {
-      final TupleMetadata batchSchema = createTestSchema(BatchSchema.SelectionVectorMode.NONE);
+  public void testAppendRecords() throws Exception {
+    try (OperatorFixture operatorFixture = new OperatorFixture.Builder(baseDirTestWatcher).build()) {
+      final BufferAllocator allocator = operatorFixture.allocator();
+      final BatchSchema batchSchema = createTestSchema(BatchSchema.SelectionVectorMode.NONE);
       final RowSet srcRowSet = createSrcRowSet(allocator);
-      final RowSet destRowSet = new RowSetBuilder(allocator, batchSchema).build();
-      final VectorContainer destContainer = destRowSet.container();
-      final Copier copier = createCopier(new RowSetBatch(srcRowSet), destContainer, null);
+      final VectorContainer destContainer = new VectorContainer(allocator, batchSchema);
+
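+      // Pre-allocate the output vectors to hold the three records appended below.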
+      AbstractCopier.allocateOutgoing(destContainer, 3);
+
+      destContainer.setRecordCount(0);
       final RowSet expectedRowSet = createExpectedRowset(allocator);
 
-      copier.appendRecord(0);
-      copier.appendRecords(1, 2);
+      MockRecordBatch mockRecordBatch = null;
 
       try {
-        new RowSetComparison(expectedRowSet).verify(destRowSet);
+        mockRecordBatch = new MockRecordBatch.Builder()
+          .sendData(srcRowSet)
+          .build(operatorFixture.getFragmentContext());
+        mockRecordBatch.next();
+        final Copier copier = createCopier(mockRecordBatch, destContainer, null);
+        copier.appendRecord(0);
+        copier.appendRecords(1, 2);
+
+        new RowSetComparison(expectedRowSet).verify(DirectRowSet.fromContainer(destContainer));
       } finally {
-        srcRowSet.clear();
-
-        if (srcRowSet instanceof RowSet.HyperRowSet) {
-          ((RowSet.HyperRowSet)srcRowSet).getSv4().clear();
+        if (mockRecordBatch != null) {
+          mockRecordBatch.close();
         }
 
-        destRowSet.clear();
+        srcRowSet.clear();
+        destContainer.clear();
         expectedRowSet.clear();
       }
     }
   }
 
-  public abstract RowSet createSrcRowSet(RootAllocator allocator) throws SchemaChangeException;
+  public abstract RowSet createSrcRowSet(BufferAllocator allocator) throws SchemaChangeException;
 
   public Copier createCopier(RecordBatch incoming, VectorContainer outputContainer,
                                       SchemaChangeCallBack callback) {
@@ -117,7 +139,7 @@ public Copier createCopier(RecordBatch incoming, VectorContainer outputContainer
     return new Object[]{106, "black", new float[]{.75f}, new String[]{"4a"}};
   }
 
-  private RowSet createExpectedRowset(RootAllocator allocator) {
+  public RowSet createExpectedRowset(BufferAllocator allocator) {
     return new RowSetBuilder(allocator, createTestSchema(BatchSchema.SelectionVectorMode.NONE))
       .addRow(row1())
       .addRow(row2())
@@ -125,7 +147,7 @@ private RowSet createExpectedRowset(RootAllocator allocator) {
       .build();
   }
 
-  protected TupleMetadata createTestSchema(BatchSchema.SelectionVectorMode mode) {
+  protected BatchSchema createTestSchema(BatchSchema.SelectionVectorMode mode) {
     MaterializedField colA = MaterializedField.create("colA", Types.required(TypeProtos.MinorType.INT));
     MaterializedField colB = MaterializedField.create("colB", Types.required(TypeProtos.MinorType.VARCHAR));
     MaterializedField colC = MaterializedField.create("colC", Types.repeated(TypeProtos.MinorType.FLOAT4));
@@ -136,6 +158,6 @@ protected TupleMetadata createTestSchema(BatchSchema.SelectionVectorMode mode) {
       .add(colC)
       .add(colD)
       .withSVMode(mode)
-      .buildSchema();
+      .build();
   }
 }
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/svremover/GenericCopierTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/svremover/GenericCopierTest.java
index d6c38e72482..2636490f985 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/svremover/GenericCopierTest.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/svremover/GenericCopierTest.java
@@ -17,7 +17,7 @@
  */
 package org.apache.drill.exec.physical.impl.svremover;
 
-import org.apache.drill.exec.memory.RootAllocator;
+import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.record.BatchSchema;
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.record.VectorContainer;
@@ -27,7 +27,7 @@
 
 public class GenericCopierTest extends AbstractGenericCopierTest {
   @Override
-  public RowSet createSrcRowSet(RootAllocator allocator) {
+  public RowSet createSrcRowSet(BufferAllocator allocator) {
     return new RowSetBuilder(allocator, createTestSchema(BatchSchema.SelectionVectorMode.NONE))
       .addRow(row1())
       .addRow(row2())
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/svremover/GenericSV2BatchCopierTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/svremover/GenericSV2BatchCopierTest.java
index 748e0d06b61..2fec0e5ffc8 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/svremover/GenericSV2BatchCopierTest.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/svremover/GenericSV2BatchCopierTest.java
@@ -17,7 +17,7 @@
  */
 package org.apache.drill.exec.physical.impl.svremover;
 
-import org.apache.drill.exec.memory.RootAllocator;
+import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.record.BatchSchema;
 import org.apache.drill.test.rowSet.RowSet;
 import org.apache.drill.test.rowSet.RowSetBuilder;
@@ -29,7 +29,7 @@
 public class GenericSV2BatchCopierTest extends AbstractGenericCopierTest {
 
   @Override
-  public RowSet createSrcRowSet(RootAllocator allocator) {
+  public RowSet createSrcRowSet(BufferAllocator allocator) {
     return new RowSetBuilder(allocator, createTestSchema(BatchSchema.SelectionVectorMode.TWO_BYTE))
       .addSelection(true, row1())
       .addRow(row2())
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/svremover/GenericSV2CopierTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/svremover/GenericSV2CopierTest.java
index b2f0e5105bc..42182e974ed 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/svremover/GenericSV2CopierTest.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/svremover/GenericSV2CopierTest.java
@@ -17,7 +17,7 @@
  */
 package org.apache.drill.exec.physical.impl.svremover;
 
-import org.apache.drill.exec.memory.RootAllocator;
+import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.record.BatchSchema;
 import org.apache.drill.test.rowSet.RowSet;
 import org.apache.drill.test.rowSet.RowSetBuilder;
@@ -25,7 +25,7 @@
 public class GenericSV2CopierTest extends AbstractGenericCopierTest {
 
   @Override
-  public RowSet createSrcRowSet(RootAllocator allocator) {
+  public RowSet createSrcRowSet(BufferAllocator allocator) {
     return new RowSetBuilder(allocator, createTestSchema(BatchSchema.SelectionVectorMode.TWO_BYTE))
       .addRow(row1())
       .addSelection(false, row4())
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/svremover/GenericSV4CopierTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/svremover/GenericSV4CopierTest.java
index a5f5bb7852c..46edab7017f 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/svremover/GenericSV4CopierTest.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/svremover/GenericSV4CopierTest.java
@@ -19,11 +19,10 @@
 
 import io.netty.buffer.DrillBuf;
 import org.apache.drill.exec.exception.SchemaChangeException;
-import org.apache.drill.exec.memory.RootAllocator;
+import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.record.BatchSchema;
 import org.apache.drill.exec.record.ExpandableHyperContainer;
 import org.apache.drill.exec.record.VectorContainer;
-import org.apache.drill.exec.record.metadata.TupleMetadata;
 import org.apache.drill.exec.record.selection.SelectionVector4;
 import org.apache.drill.test.rowSet.HyperRowSetImpl;
 import org.apache.drill.test.rowSet.RowSet;
@@ -32,8 +31,8 @@
 public class GenericSV4CopierTest extends AbstractGenericCopierTest {
 
   @Override
-  public RowSet createSrcRowSet(RootAllocator allocator) throws SchemaChangeException {
-    final TupleMetadata batchSchema = createTestSchema(BatchSchema.SelectionVectorMode.NONE);
+  public RowSet createSrcRowSet(BufferAllocator allocator) throws SchemaChangeException {
+    final BatchSchema batchSchema = createTestSchema(BatchSchema.SelectionVectorMode.NONE);
     final DrillBuf drillBuf = allocator.buffer(4 * 3);
     final SelectionVector4 sv4 = new SelectionVector4(drillBuf, 3, Character.MAX_VALUE);
 
@@ -52,10 +51,12 @@ public RowSet createSrcRowSet(RootAllocator allocator) throws SchemaChangeExcept
 
     final ExpandableHyperContainer hyperContainer = new ExpandableHyperContainer(batch1);
     hyperContainer.addBatch(batch2);
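+    // The two wrapped batches hold five records in total; the SV4 below selects three of them.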
+    hyperContainer.setRecordCount(5);
 
     sv4.set(0, 0, 0);
     sv4.set(1, 1, 0);
     sv4.set(2, 1, 2);
+    sv4.setCount(3);
 
     return new HyperRowSetImpl(hyperContainer, sv4);
   }
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/BasicPhysicalOpUnitTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/BasicPhysicalOpUnitTest.java
index 0a8cd88fd43..363a9393dff 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/BasicPhysicalOpUnitTest.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/BasicPhysicalOpUnitTest.java
@@ -38,6 +38,8 @@
 import org.apache.drill.exec.physical.config.TopN;
 import org.apache.drill.exec.physical.config.FlattenPOP;
 import org.apache.drill.exec.planner.physical.AggPrelBase;
+import org.apache.drill.test.LegacyOperatorTestBuilder;
+import org.apache.drill.test.PhysicalOpUnitTestBase;
 import org.junit.Ignore;
 import org.junit.Test;
 
@@ -51,7 +53,7 @@ public void testSimpleProject() {
     List<String> jsonBatches = Lists.newArrayList(
         "[{\"x\": 5 },{\"x\": 10 }]",
         "[{\"x\": 20 },{\"x\": 30 },{\"x\": 40 }]");
-    opTestBuilder()
+    legacyOpTestBuilder()
         .physicalOperator(projectConf)
         .inputDataStreamJson(jsonBatches)
         .baselineColumns("x")
@@ -69,7 +71,7 @@ public void testProjectComplexOutput() {
     List<String> jsonBatches = Lists.newArrayList(
         "[{\"json_col\": \"{ \\\"a\\\" : 1 }\"}]",
         "[{\"json_col\": \"{ \\\"a\\\" : 5 }\"}]");
-    opTestBuilder()
+    legacyOpTestBuilder()
         .physicalOperator(projectConf)
         .inputDataStreamJson(jsonBatches)
         .baselineColumns("complex_col")
@@ -91,7 +93,7 @@ public void testSimpleHashJoin() {
     List<String> rightJsonBatches = Lists.newArrayList(
         "[{\"x1\": 5, \"a2\" : \"asdf\"}]",
         "[{\"x1\": 6, \"a2\" : \"qwerty\"},{\"x1\": 5, \"a2\" : \"12345\"}]");
-    opTestBuilder()
+    legacyOpTestBuilder()
         .physicalOperator(joinConf)
         .inputDataStreamsJson(Lists.newArrayList(leftJsonBatches, rightJsonBatches))
         .baselineColumns("x", "a", "a2", "x1")
@@ -116,7 +118,7 @@ public void testSimpleMergeJoin() {
     List<String> rightJsonBatches = Lists.newArrayList(
         "[{\"x1\": 5, \"a2\" : \"asdf\"}]",
         "[{\"x1\": 5, \"a2\" : \"12345\"}, {\"x1\": 6, \"a2\" : \"qwerty\"}]");
-    opTestBuilder()
+    legacyOpTestBuilder()
         .physicalOperator(joinConf)
         .inputDataStreamsJson(Lists.newArrayList(leftJsonBatches, rightJsonBatches))
         .baselineColumns("x", "a", "a2", "x1")
@@ -135,7 +137,7 @@ public void testSimpleHashAgg() {
     List<String> inputJsonBatches = Lists.newArrayList(
         "[{\"a\": 5, \"b\" : 1 }]",
         "[{\"a\": 5, \"b\" : 5},{\"a\": 3, \"b\" : 8}]");
-    opTestBuilder()
+    legacyOpTestBuilder()
         .physicalOperator(aggConf)
         .inputDataStreamJson(inputJsonBatches)
         .baselineColumns("b_sum", "a")
@@ -150,7 +152,7 @@ public void testSimpleStreamAgg() {
     List<String> inputJsonBatches = Lists.newArrayList(
         "[{\"a\": 5, \"b\" : 1 }]",
         "[{\"a\": 5, \"b\" : 5},{\"a\": 3, \"b\" : 8}]");
-    opTestBuilder()
+    legacyOpTestBuilder()
         .physicalOperator(aggConf)
         .inputDataStreamJson(inputJsonBatches)
         .baselineColumns("b_sum", "a")
@@ -165,7 +167,7 @@ public void testComplexToJson() {
     List<String> inputJsonBatches = Lists.newArrayList(
         "[{\"a\": {\"b\" : 1 }}]",
         "[{\"a\": {\"b\" : 5}},{\"a\": {\"b\" : 8}}]");
-    opTestBuilder()
+    legacyOpTestBuilder()
         .physicalOperator(complexToJson)
         .inputDataStreamJson(inputJsonBatches)
         .baselineColumns("a")
@@ -182,7 +184,7 @@ public void testFilter() {
         "[{\"a\": 5, \"b\" : 1 }]",
         "[{\"a\": 5, \"b\" : 5},{\"a\": 3, \"b\" : 8}]",
         "[{\"a\": 40, \"b\" : 3},{\"a\": 13, \"b\" : 100}]");
-    opTestBuilder()
+    legacyOpTestBuilder()
         .physicalOperator(filterConf)
         .inputDataStreamJson(inputJsonBatches)
         .baselineColumns("a", "b")
@@ -206,7 +208,7 @@ public void testFlatten() {
       inputJsonBatches.add(batchString.toString());
     }
 
-    OperatorTestBuilder opTestBuilder = opTestBuilder()
+    LegacyOperatorTestBuilder opTestBuilder = legacyOpTestBuilder()
             .physicalOperator(flatten)
             .inputDataStreamJson(inputJsonBatches)
             .baselineColumns("a", "b")
@@ -225,7 +227,7 @@ public void testExternalSort() {
         "[{\"a\": 5, \"b\" : 1 }]",
         "[{\"a\": 5, \"b\" : 5},{\"a\": 3, \"b\" : 8}]",
         "[{\"a\": 40, \"b\" : 3},{\"a\": 13, \"b\" : 100}]");
-    opTestBuilder()
+    legacyOpTestBuilder()
         .physicalOperator(sortConf)
         .maxAllocation(15_000_000L)
         .inputDataStreamJson(inputJsonBatches)
@@ -253,8 +255,8 @@ private void externalSortLowMemoryHelper(int batchSize, int numberOfBatches, lon
       inputJsonBatches.add(batchString.toString());
     }
 
-    OperatorTestBuilder opTestBuilder =
-        opTestBuilder()
+    LegacyOperatorTestBuilder opTestBuilder =
+        legacyOpTestBuilder()
             .initReservation(initReservation)
             .maxAllocation(maxAllocation)
             .physicalOperator(sortConf)
@@ -312,7 +314,7 @@ public void testTopN() {
         "[{\"a\": 5, \"b\" : 1 }]",
         "[{\"a\": 5, \"b\" : 5},{\"a\": 3, \"b\" : 8}]",
         "[{\"a\": 40, \"b\" : 3},{\"a\": 13, \"b\" : 100}]");
-    opTestBuilder()
+    legacyOpTestBuilder()
         .physicalOperator(sortConf)
         .inputDataStreamJson(inputJsonBatches)
         .baselineColumns("a", "b")
@@ -336,7 +338,7 @@ public void testSimpleMergingReceiver() {
     List<String> rightJsonBatches = Lists.newArrayList(
         "[{\"x\": 5, \"a\" : \"asdf\"}]",
         "[{\"x\": 5, \"a\" : \"12345\"}, {\"x\": 6, \"a\" : \"qwerty\"}]");
-    opTestBuilder()
+    legacyOpTestBuilder()
         .physicalOperator(mergeConf)
         .inputDataStreamsJson(Lists.newArrayList(leftJsonBatches, rightJsonBatches))
         .baselineColumns("x", "a")
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/MiniPlanUnitTestBase.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/MiniPlanUnitTestBase.java
index 79f260ff8d7..2f05bba9301 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/MiniPlanUnitTestBase.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/MiniPlanUnitTestBase.java
@@ -32,6 +32,7 @@
 import org.apache.drill.exec.store.parquet.ParquetDirectByteBufferAllocator;
 import org.apache.drill.exec.store.parquet.ParquetReaderUtility;
 import org.apache.drill.exec.store.parquet.columnreaders.ParquetRecordReader;
+import org.apache.drill.test.PhysicalOpUnitTestBase;
 import org.apache.hadoop.fs.Path;
 import org.apache.parquet.hadoop.CodecFactory;
 import org.apache.parquet.hadoop.ParquetFileReader;
@@ -171,7 +172,7 @@ public void go() throws Exception {
   }
 
   /**
-   * Similar to {@link OperatorTestBuilder}, build a physical operator (RecordBatch) and specify its input record batches.
+   * Similar to {@link LegacyOperatorTestBuilder}, this builds a physical operator (RecordBatch) and specifies its input record batches.
    * The input record batch can be a non-scan operator, created by calling {@link PopBuilder#addInputAsChild},
    * or a scan operator, created by calling {@link PopBuilder#addJsonScanAsChild()}.
    *
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/TestOutputBatchSize.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/TestOutputBatchSize.java
index 84a4fbc0430..f63245548a8 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/TestOutputBatchSize.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/TestOutputBatchSize.java
@@ -49,6 +49,8 @@
 import org.apache.drill.exec.vector.ValueVector;
 import org.apache.drill.exec.vector.complex.RepeatedListVector;
 import org.apache.drill.exec.vector.complex.RepeatedValueVector;
+import org.apache.drill.test.LegacyOperatorTestBuilder;
+import org.apache.drill.test.PhysicalOpUnitTestBase;
 import org.junit.Test;
 
 import java.util.ArrayList;
@@ -121,7 +123,7 @@ public void testProjectMap() throws Exception {
 
     fragContext.getOptions().setLocalOption("drill.exec.memory.operator.output_batch_size", totalSize / 2);
 
-    OperatorTestBuilder opTestBuilder = opTestBuilder()
+    LegacyOperatorTestBuilder opTestBuilder = legacyOpTestBuilder()
             .physicalOperator(projectConf)
             .inputDataStreamJson(inputJsonBatches)
             .baselineColumns(baselineColumns)
@@ -197,7 +199,7 @@ public void testProjectVariableWidthFunctions() throws  Exception {
 
       fragContext.getOptions().setLocalOption("drill.exec.memory.operator.output_batch_size", memoryLimit);
 
-      OperatorTestBuilder opTestBuilder = opTestBuilder()
+      LegacyOperatorTestBuilder opTestBuilder = legacyOpTestBuilder()
               .physicalOperator(projectConf)
               .inputDataStreamJson(inputJsonBatches)
               .baselineColumns(baselineColumns)
@@ -276,7 +278,7 @@ public void testProjectFixedWidthImpl(boolean transfer, int columnCount) throws
     fragContext.getOptions().setLocalOption("drill.exec.memory.operator.output_batch_size", totalSize / 2);
 
 
-    OperatorTestBuilder opTestBuilder = opTestBuilder()
+    LegacyOperatorTestBuilder opTestBuilder = legacyOpTestBuilder()
             .physicalOperator(projectConf)
             .inputDataStreamJson(inputJsonBatches)
             .baselineColumns(baselineColumns)
@@ -347,7 +349,7 @@ public void testProjectVariableWidthImpl(boolean transfer, int columnCount, Stri
 
     fragContext.getOptions().setLocalOption("drill.exec.memory.operator.output_batch_size", totalSize / 2);
 
-    OperatorTestBuilder opTestBuilder = opTestBuilder()
+    LegacyOperatorTestBuilder opTestBuilder = legacyOpTestBuilder()
             .physicalOperator(projectConf)
             .inputDataStreamJson(inputJsonBatches)
             .baselineColumns(baselineColumns)
@@ -418,7 +420,7 @@ public void testProjectVariableWidthMixed() throws Exception {
 
     fragContext.getOptions().setLocalOption("drill.exec.memory.operator.output_batch_size", totalSize / 2);
 
-    OperatorTestBuilder opTestBuilder = opTestBuilder()
+    LegacyOperatorTestBuilder opTestBuilder = legacyOpTestBuilder()
             .physicalOperator(projectConf)
             .inputDataStreamJson(inputJsonBatches)
             .baselineColumns(baselineColumns)
@@ -481,7 +483,7 @@ public void testFlattenFixedWidth() throws Exception {
     // We will get approximately 2 batches and a max of 4.
     fragContext.getOptions().setLocalOption("drill.exec.memory.operator.output_batch_size", totalSize / 2);
 
-    OperatorTestBuilder opTestBuilder = opTestBuilder()
+    LegacyOperatorTestBuilder opTestBuilder = legacyOpTestBuilder()
       .physicalOperator(flatten)
       .inputDataStreamJson(inputJsonBatches)
       .baselineColumns("a", "b", "c")
@@ -549,7 +551,7 @@ public void testFlattenVariableWidth() throws Exception {
     // We will get approximately 2 batches and a max of 4.
     fragContext.getOptions().setLocalOption("drill.exec.memory.operator.output_batch_size", totalSize / 2);
 
-    OperatorTestBuilder opTestBuilder = opTestBuilder()
+    LegacyOperatorTestBuilder opTestBuilder = legacyOpTestBuilder()
       .physicalOperator(flatten)
       .inputDataStreamJson(inputJsonBatches)
       .baselineColumns("a", "b", "c")
@@ -611,7 +613,7 @@ public void testFlattenFixedWidthList() throws Exception {
     // We will get approximately 2 batches and a max of 4.
     fragContext.getOptions().setLocalOption("drill.exec.memory.operator.output_batch_size", totalSize / 2);
 
-    OperatorTestBuilder opTestBuilder = opTestBuilder()
+    LegacyOperatorTestBuilder opTestBuilder = legacyOpTestBuilder()
       .physicalOperator(flatten)
       .inputDataStreamJson(inputJsonBatches)
       .baselineColumns("a", "b", "c")
@@ -672,7 +674,7 @@ public void testFlattenVariableWidthList() throws Exception {
     // We will get approximately 2 batches and a max of 4.
     fragContext.getOptions().setLocalOption("drill.exec.memory.operator.output_batch_size", totalSize / 2);
 
-    OperatorTestBuilder opTestBuilder = opTestBuilder()
+    LegacyOperatorTestBuilder opTestBuilder = legacyOpTestBuilder()
       .physicalOperator(flatten)
       .inputDataStreamJson(inputJsonBatches)
       .baselineColumns("a", "b", "c")
@@ -760,7 +762,7 @@ public void testFlattenMap() throws Exception {
     // We will get approximately 2 batches and a max of 4.
     fragContext.getOptions().setLocalOption("drill.exec.memory.operator.output_batch_size", totalSize / 2);
 
-    OperatorTestBuilder opTestBuilder = opTestBuilder()
+    LegacyOperatorTestBuilder opTestBuilder = legacyOpTestBuilder()
       .physicalOperator(flatten)
       .inputDataStreamJson(inputJsonBatches)
       .baselineColumns("a", "b", "c")
@@ -869,7 +871,7 @@ public void testFlattenListOfMaps() throws Exception {
     // We will get approximately 2 batches and a max of 4.
     fragContext.getOptions().setLocalOption("drill.exec.memory.operator.output_batch_size", totalSize / 2);
 
-    OperatorTestBuilder opTestBuilder = opTestBuilder()
+    LegacyOperatorTestBuilder opTestBuilder = legacyOpTestBuilder()
       .physicalOperator(flatten)
       .inputDataStreamJson(inputJsonBatches)
       .baselineColumns("a", "b", "c")
@@ -967,7 +969,7 @@ public void testFlattenNestedMap() throws Exception {
     // We will get approximately 2 batches and a max of 4.
     fragContext.getOptions().setLocalOption("drill.exec.memory.operator.output_batch_size", totalSize / 2);
 
-    OperatorTestBuilder opTestBuilder = opTestBuilder()
+    LegacyOperatorTestBuilder opTestBuilder = legacyOpTestBuilder()
       .physicalOperator(flatten)
       .inputDataStreamJson(inputJsonBatches)
       .baselineColumns("a", "b", "c")
@@ -1065,7 +1067,7 @@ public void testFlattenUpperLimit() throws Exception {
     fragContext.getOptions().setLocalOption("drill.exec.memory.operator.output_batch_size", totalSize / 2);
 
     // Here we expect 16 batches because each batch will be limited by the upper limit of 65535 records.
-    OperatorTestBuilder opTestBuilder = opTestBuilder()
+    LegacyOperatorTestBuilder opTestBuilder = legacyOpTestBuilder()
       .physicalOperator(flatten)
       .inputDataStreamJson(inputJsonBatches)
       .baselineColumns("a", "c")
@@ -1125,7 +1127,7 @@ public void testFlattenLowerLimit() throws Exception {
     // Here we expect 10 batches because each batch will be bounded by the lower limit of at least 1 record.
     // Do not check the output batch size, as it will exceed the configured value of 1024 so that we get
     // at least one record out.
-    OperatorTestBuilder opTestBuilder = opTestBuilder()
+    LegacyOperatorTestBuilder opTestBuilder = legacyOpTestBuilder()
       .physicalOperator(flatten)
       .inputDataStreamJson(inputJsonBatches)
       .baselineColumns("a", "c")
@@ -1172,7 +1174,7 @@ public void testFlattenEmptyList() throws Exception {
     batchString.append("]");
     inputJsonBatches.add(batchString.toString());
 
-    OperatorTestBuilder opTestBuilder = opTestBuilder()
+    LegacyOperatorTestBuilder opTestBuilder = legacyOpTestBuilder()
       .physicalOperator(flatten)
       .inputDataStreamJson(inputJsonBatches)
       .baselineColumns("a", "b")
@@ -1233,7 +1235,7 @@ public void testFlattenLargeRecords() throws Exception {
     // We will get approximately 2 batches and a max of 4.
     fragContext.getOptions().setLocalOption("drill.exec.memory.operator.output_batch_size", totalSize / 2);
 
-    OperatorTestBuilder opTestBuilder = opTestBuilder()
+    LegacyOperatorTestBuilder opTestBuilder = legacyOpTestBuilder()
       .physicalOperator(flatten)
       .inputDataStreamJson(inputJsonBatches)
       .baselineColumns("a", "b", "c")
@@ -1302,7 +1304,7 @@ public void testMergeJoinMultipleOutputBatches() throws Exception {
     // We will get approximately 4 batches because of the fragmentation factor of 2 accounted for in the merge join.
     fragContext.getOptions().setLocalOption("drill.exec.memory.operator.output_batch_size", totalSize/2);
 
-    OperatorTestBuilder opTestBuilder = opTestBuilder()
+    LegacyOperatorTestBuilder opTestBuilder = legacyOpTestBuilder()
       .physicalOperator(mergeJoin)
       .baselineColumns("a1", "b1", "c1", "a2", "b2", "c2")
       .expectedNumBatches(4)  // verify number of batches
@@ -1372,7 +1374,7 @@ public void testMergeJoinSingleOutputBatch() throws Exception {
     // We should get 1 batch.
     fragContext.getOptions().setLocalOption("drill.exec.memory.operator.output_batch_size", totalSize*2);
 
-    OperatorTestBuilder opTestBuilder = opTestBuilder()
+    LegacyOperatorTestBuilder opTestBuilder = legacyOpTestBuilder()
       .physicalOperator(mergeJoin)
       .baselineColumns("a1", "b1", "c1", "a2", "b2", "c2")
       .expectedNumBatches(1)  // verify number of batches
@@ -1426,7 +1428,7 @@ public void testMergeJoinUpperLimit() throws Exception {
     // "a1" : 5,  "c1" : 3, "a2":6,  "c2": 3
 
     // Expect two batches, each limited to 65535 records.
-    OperatorTestBuilder opTestBuilder = opTestBuilder()
+    LegacyOperatorTestBuilder opTestBuilder = legacyOpTestBuilder()
       .physicalOperator(mergeJoin)
       .baselineColumns("a1", "c1", "a2", "c2")
       .expectedNumBatches(2)  // verify number of batches
@@ -1481,7 +1483,7 @@ public void testMergeJoinLowerLimit() throws Exception {
     // set very low value of output batch size so we can do only one row per batch.
     fragContext.getOptions().setLocalOption("drill.exec.memory.operator.output_batch_size", 128);
 
-    OperatorTestBuilder opTestBuilder = opTestBuilder()
+    LegacyOperatorTestBuilder opTestBuilder = legacyOpTestBuilder()
       .physicalOperator(mergeJoin)
       .baselineColumns("a1", "b1", "c1", "a2", "b2", "c2")
       .expectedNumBatches(10)  // verify number of batches
@@ -1549,7 +1551,7 @@ public void testUnionOutputBatch() throws Exception {
     // We should get 2 batches, one for the left and one for the right.
     fragContext.getOptions().setLocalOption("drill.exec.memory.operator.output_batch_size", totalSize*2);
 
-    OperatorTestBuilder opTestBuilder = opTestBuilder()
+    LegacyOperatorTestBuilder opTestBuilder = legacyOpTestBuilder()
       .physicalOperator(unionAll)
       .baselineColumns("a1", "b1", "c1")
       .expectedNumBatches(2)  // verify number of batches
@@ -1620,7 +1622,7 @@ public void testUnionMultipleOutputBatches() throws Exception {
     // We should get 4 batches, 2 for the left and 2 for the right.
     fragContext.getOptions().setLocalOption("drill.exec.memory.operator.output_batch_size", totalSize/2);
 
-    OperatorTestBuilder opTestBuilder = opTestBuilder()
+    LegacyOperatorTestBuilder opTestBuilder = legacyOpTestBuilder()
       .physicalOperator(unionAll)
       .baselineColumns("a1", "b1", "c1")
       .expectedNumBatches(4)  // verify number of batches
@@ -1692,7 +1694,7 @@ public void testUnionLowerLimit() throws Exception {
     // We should get 22 batches for 22 rows.
     fragContext.getOptions().setLocalOption("drill.exec.memory.operator.output_batch_size", 128);
 
-    OperatorTestBuilder opTestBuilder = opTestBuilder()
+    LegacyOperatorTestBuilder opTestBuilder = legacyOpTestBuilder()
       .physicalOperator(unionAll)
       .baselineColumns("a1","b1", "c1")
       .expectedNumBatches(22)  // verify number of batches
@@ -1764,7 +1766,7 @@ public void testHashJoinMultipleOutputBatches() throws Exception {
     // We will get approximately 4 batches because of the fragmentation factor of 2 accounted for in the hash join.
     fragContext.getOptions().setLocalOption("drill.exec.memory.operator.output_batch_size", totalSize/2);
 
-    OperatorTestBuilder opTestBuilder = opTestBuilder()
+    LegacyOperatorTestBuilder opTestBuilder = legacyOpTestBuilder()
       .physicalOperator(hashJoin)
       .baselineColumns("a1", "b1", "c1", "a2", "b2", "c2")
       .expectedNumBatches(4)  // verify number of batches
@@ -1834,7 +1836,7 @@ public void testHashJoinSingleOutputBatch() throws Exception {
     // We should get 1 batch.
     fragContext.getOptions().setLocalOption("drill.exec.memory.operator.output_batch_size", totalSize*2);
 
-    OperatorTestBuilder opTestBuilder = opTestBuilder()
+    LegacyOperatorTestBuilder opTestBuilder = legacyOpTestBuilder()
       .physicalOperator(hashJoin)
       .baselineColumns("a1", "b1", "c1", "a2", "b2", "c2")
       .expectedNumBatches(1)  // verify number of batches
@@ -1888,7 +1890,7 @@ public void testHashJoinUpperLimit() throws Exception {
     // "a1" : 5,  "c1" : 3, "a2":6,  "c2": 3
 
     // Expect two batches, each limited to 65535 records.
-    OperatorTestBuilder opTestBuilder = opTestBuilder()
+    LegacyOperatorTestBuilder opTestBuilder = legacyOpTestBuilder()
       .physicalOperator(hashJoin)
       .baselineColumns("a1", "c1", "a2", "c2")
       .expectedNumBatches(2)  // verify number of batches
@@ -1943,7 +1945,7 @@ public void testHashJoinLowerLimit() throws Exception {
     // set very low value of output batch size so we can do only one row per batch.
     fragContext.getOptions().setLocalOption("drill.exec.memory.operator.output_batch_size", 128);
 
-    OperatorTestBuilder opTestBuilder = opTestBuilder()
+    LegacyOperatorTestBuilder opTestBuilder = legacyOpTestBuilder()
       .physicalOperator(hashJoin)
       .baselineColumns("a1", "b1", "c1", "a2", "b2", "c2")
       .expectedNumBatches(10)  // verify number of batches
@@ -2011,7 +2013,7 @@ public void testRightOuterHashJoin() throws Exception {
     // We will get approximately 4 batches because of the fragmentation factor of 2 accounted for in the hash join.
     fragContext.getOptions().setLocalOption("drill.exec.memory.operator.output_batch_size", totalSize/2);
 
-    OperatorTestBuilder opTestBuilder = opTestBuilder()
+    LegacyOperatorTestBuilder opTestBuilder = legacyOpTestBuilder()
       .physicalOperator(hashJoin)
       .baselineColumns("a1", "b1", "c1", "a2", "b2", "c2")
       .expectedNumBatches(4)  // verify number of batches
@@ -2080,7 +2082,7 @@ public void testLeftOuterHashJoin() throws Exception {
     // We will get approximately 4 batches because of the fragmentation factor of 2 accounted for in the hash join.
     fragContext.getOptions().setLocalOption("drill.exec.memory.operator.output_batch_size", totalSize/2);
 
-    OperatorTestBuilder opTestBuilder = opTestBuilder()
+    LegacyOperatorTestBuilder opTestBuilder = legacyOpTestBuilder()
       .physicalOperator(hashJoin)
       .baselineColumns("a1", "b1", "c1", "a2", "b2", "c2")
       .expectedNumBatches(4)  // verify number of batches
@@ -2102,7 +2104,7 @@ public void testSimpleHashAgg() {
        "[{\"a\": 5, \"b\" : 1 }]",
          "[{\"a\": 5, \"b\" : 5},{\"a\": 3, \"b\" : 8}]");
 
-    opTestBuilder()
+    legacyOpTestBuilder()
       .physicalOperator(aggConf)
       .inputDataStreamJson(inputJsonBatches)
       .baselineColumns("b_sum", "a")
@@ -2157,7 +2159,7 @@ public void testHashAggSum() throws ExecutionSetupException {
     // We will get approximately 2 batches and a max of 4.
     fragContext.getOptions().setLocalOption("drill.exec.memory.operator.output_batch_size", totalSize / 2);
 
-    OperatorTestBuilder opTestBuilder = opTestBuilder()
+    LegacyOperatorTestBuilder opTestBuilder = legacyOpTestBuilder()
       .physicalOperator(hashAgg)
       .inputDataStreamJson(inputJsonBatches)
       .baselineColumns("a", "b_sum")
@@ -2218,7 +2220,7 @@ public void testHashAggAvg() throws ExecutionSetupException {
     // We will get approximately 2 batches and a max of 4.
     fragContext.getOptions().setLocalOption("drill.exec.memory.operator.output_batch_size", totalSize / 2);
 
-    OperatorTestBuilder opTestBuilder = opTestBuilder()
+    LegacyOperatorTestBuilder opTestBuilder = legacyOpTestBuilder()
       .physicalOperator(hashAgg)
       .inputDataStreamJson(inputJsonBatches)
       .baselineColumns("a", "b_avg")
@@ -2278,7 +2280,7 @@ public void testHashAggMax() throws ExecutionSetupException {
     // We will get approximately 2 batches and a max of 4.
     fragContext.getOptions().setLocalOption("drill.exec.memory.operator.output_batch_size", totalSize / 2);
 
-    OperatorTestBuilder opTestBuilder = opTestBuilder()
+    LegacyOperatorTestBuilder opTestBuilder = legacyOpTestBuilder()
       .physicalOperator(hashAgg)
       .inputDataStreamJson(inputJsonBatches)
       .baselineColumns("a", "b_max")
@@ -2350,7 +2352,7 @@ public void testNestedLoopJoinMultipleOutputBatches() throws Exception {
     // We will get approximately 4 batches.
     fragContext.getOptions().setLocalOption("drill.exec.memory.operator.output_batch_size", totalSize/2);
 
-    OperatorTestBuilder opTestBuilder = opTestBuilder()
+    LegacyOperatorTestBuilder opTestBuilder = legacyOpTestBuilder()
             .physicalOperator(nestedLoopJoin)
             .baselineColumns("a1", "b1", "c1", "a2", "b2", "c2")
             .expectedNumBatches(4)  // verify number of batches
@@ -2424,7 +2426,7 @@ public void testNestedLoopJoinSingleOutputBatch() throws Exception {
     // We should get 1 batch.
     fragContext.getOptions().setLocalOption("drill.exec.memory.operator.output_batch_size", totalSize*2);
 
-    OperatorTestBuilder opTestBuilder = opTestBuilder()
+    LegacyOperatorTestBuilder opTestBuilder = legacyOpTestBuilder()
             .physicalOperator(nestedLoopJoin)
             .baselineColumns("a1", "b1", "c1", "a2", "b2", "c2")
             .expectedNumBatches(1)  // verify number of batches
@@ -2482,7 +2484,7 @@ public void testNestedLoopJoinUpperLimit() throws Exception {
 
     // We expect n(n+1)/2 records, i.e. (500 * 501)/2 = 125250.
     // Expect two batches, each limited to 65535 records.
-    OperatorTestBuilder opTestBuilder = opTestBuilder()
+    LegacyOperatorTestBuilder opTestBuilder = legacyOpTestBuilder()
             .physicalOperator(nestedLoopJoin)
             .baselineColumns("a1", "c1", "a2", "c2")
             .expectedNumBatches(2)  // verify number of batches
@@ -2542,7 +2544,7 @@ public void testNestedLoopJoinLowerLimit() throws Exception {
     // set very low value of output batch size so we can do only one row per batch.
     fragContext.getOptions().setLocalOption("drill.exec.memory.operator.output_batch_size", 128);
 
-    OperatorTestBuilder opTestBuilder = opTestBuilder()
+    LegacyOperatorTestBuilder opTestBuilder = legacyOpTestBuilder()
             .physicalOperator(nestedLoopJoin)
             .baselineColumns("a1", "b1", "c1", "a2", "b2", "c2")
             .expectedNumBatches(10)  // verify number of batches
@@ -2612,7 +2614,7 @@ public void testLeftNestedLoopJoin() throws Exception {
     // We will get approximately 4 batches.
     fragContext.getOptions().setLocalOption("drill.exec.memory.operator.output_batch_size", totalSize/2);
 
-    OperatorTestBuilder opTestBuilder = opTestBuilder()
+    LegacyOperatorTestBuilder opTestBuilder = legacyOpTestBuilder()
             .physicalOperator(nestedLoopJoin)
             .baselineColumns("a1", "b1", "c1", "a2", "b2", "c2")
             .expectedNumBatches(4)  // verify number of batches
@@ -2949,4 +2951,4 @@ public void testSizerRepeatedRepeatedList() throws Exception {
     }
   }
 
-}
\ No newline at end of file
+}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/columnreaders/TestBatchSizingMemoryUtil.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/columnreaders/TestBatchSizingMemoryUtil.java
index 9f4e0266206..4e0a7390a27 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/columnreaders/TestBatchSizingMemoryUtil.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/columnreaders/TestBatchSizingMemoryUtil.java
@@ -19,12 +19,12 @@
 
 import java.math.BigDecimal;
 import org.apache.drill.common.types.TypeProtos;
-import org.apache.drill.exec.physical.unit.PhysicalOpUnitTestBase;
 import org.apache.drill.exec.record.VectorContainer;
 import org.apache.drill.exec.record.metadata.TupleMetadata;
 import org.apache.drill.exec.store.parquet.columnreaders.batchsizing.BatchSizingMemoryUtil;
 import org.apache.drill.exec.store.parquet.columnreaders.batchsizing.BatchSizingMemoryUtil.ColumnMemoryUsageInfo;
 import org.apache.drill.exec.store.parquet.columnreaders.batchsizing.RecordBatchSizerManager.ColumnMemoryQuota;
+import org.apache.drill.test.PhysicalOpUnitTestBase;
 import org.apache.drill.test.rowSet.RowSet;
 import org.apache.drill.test.rowSet.RowSetBuilder;
 import org.apache.drill.test.rowSet.schema.SchemaBuilder;
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/LegacyOperatorTestBuilder.java b/exec/java-exec/src/test/java/org/apache/drill/test/LegacyOperatorTestBuilder.java
new file mode 100644
index 00000000000..dacb0612639
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/LegacyOperatorTestBuilder.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.test;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.physical.base.AbstractBase;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.impl.BatchCreator;
+import org.apache.drill.exec.physical.impl.ScanBatch;
+import org.apache.drill.exec.record.RecordBatch;
+
+import java.io.UnsupportedEncodingException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+/**
+ * @deprecated Use {@link OperatorTestBuilder} instead. This builder drives operators with JSON
+ * input batches and compares merged heap vectors; the replacement consumes and verifies
+ * {@link org.apache.drill.test.rowSet.RowSet}s directly.
+ */
+@Deprecated
+public class LegacyOperatorTestBuilder {
+
+  private PhysicalOpUnitTestBase physicalOpUnitTestBase;
+
+  private PhysicalOperator popConfig;
+  private String[] baselineColumns;
+  private List<Map<String, Object>> baselineRecords;
+  private List<List<String>> inputStreamsJSON;
+  private long initReservation = AbstractBase.INIT_ALLOCATION;
+  private long maxAllocation = AbstractBase.MAX_ALLOCATION;
+  private boolean expectNoRows;
+  private Long expectedBatchSize;
+  private Integer expectedNumBatches;
+  private Integer expectedTotalRows;
+
+  public LegacyOperatorTestBuilder(PhysicalOpUnitTestBase physicalOpUnitTestBase) {
+    this.physicalOpUnitTestBase = physicalOpUnitTestBase;
+  }
+
+  @SuppressWarnings({"unchecked", "resource"})
+  public void go() {
+    BatchCreator<PhysicalOperator> opCreator;
+    RecordBatch testOperator;
+    try {
+      physicalOpUnitTestBase.mockOpContext(popConfig, initReservation, maxAllocation);
+
+      opCreator = (BatchCreator<PhysicalOperator>) physicalOpUnitTestBase.opCreatorReg.getOperatorCreator(popConfig.getClass());
+      List<RecordBatch> incomingStreams = Lists.newArrayList();
+      if (inputStreamsJSON != null) {
+        for (List<String> batchesJson : inputStreamsJSON) {
+          incomingStreams.add(new ScanBatch(popConfig, physicalOpUnitTestBase.fragContext,
+              physicalOpUnitTestBase.getReaderListForJsonBatches(batchesJson, physicalOpUnitTestBase.fragContext)));
+        }
+      }
+
+      testOperator = opCreator.getBatch(physicalOpUnitTestBase.fragContext, popConfig, incomingStreams);
+
+      Map<String, List<Object>> actualSuperVectors = DrillTestWrapper.addToCombinedVectorResults(new PhysicalOpUnitTestBase.BatchIterator(testOperator), expectedBatchSize, expectedNumBatches, expectedTotalRows);
+      if (expectedTotalRows != null) {
+        // When only checking total rows, don't compare actual results.
+        return;
+      }
+
+      Map<String, List<Object>> expectedSuperVectors;
+
+      if (expectNoRows) {
+        expectedSuperVectors = new TreeMap<>();
+        for (String column : baselineColumns) {
+          expectedSuperVectors.put(column, new ArrayList<>());
+        }
+      } else {
+        expectedSuperVectors = DrillTestWrapper.translateRecordListToHeapVectors(baselineRecords);
+      }
+
+      DrillTestWrapper.compareMergedVectors(expectedSuperVectors, actualSuperVectors);
+
+    } catch (Exception e) {
+      // ExecutionSetupException, UnsupportedEncodingException, and anything else are rethrown unchecked.
+      throw new RuntimeException(e);
+    }
+  }
+
+  public LegacyOperatorTestBuilder physicalOperator(PhysicalOperator batch) {
+    this.popConfig = batch;
+    return this;
+  }
+
+  public LegacyOperatorTestBuilder initReservation(long initReservation) {
+    this.initReservation = initReservation;
+    return this;
+  }
+
+  public LegacyOperatorTestBuilder maxAllocation(long maxAllocation) {
+    this.maxAllocation = maxAllocation;
+    return this;
+  }
+
+  public LegacyOperatorTestBuilder inputDataStreamJson(List<String> jsonBatches) {
+    this.inputStreamsJSON = new ArrayList<>();
+    this.inputStreamsJSON.add(jsonBatches);
+    return this;
+  }
+
+  public LegacyOperatorTestBuilder inputDataStreamsJson(List<List<String>> childStreams) {
+    this.inputStreamsJSON = childStreams;
+    return this;
+  }
+
+  public LegacyOperatorTestBuilder baselineColumns(String... columns) {
+    for (int i = 0; i < columns.length; i++) {
+      LogicalExpression ex = physicalOpUnitTestBase.parseExpr(columns[i]);
+      if (ex instanceof SchemaPath) {
+        columns[i] = ((SchemaPath)ex).toExpr();
+      } else {
+        throw new IllegalStateException("Schema path is not a valid format.");
+      }
+    }
+    this.baselineColumns = columns;
+    return this;
+  }
+
+  public LegacyOperatorTestBuilder baselineValues(Object... baselineValues) {
+    if (baselineRecords == null) {
+      baselineRecords = new ArrayList<>();
+    }
+    Map<String, Object> ret = new HashMap<>();
+    int i = 0;
+    Preconditions.checkArgument(baselineValues.length == baselineColumns.length,
+        "Must supply the same number of baseline values as columns.");
+    for (String s : baselineColumns) {
+      ret.put(s, baselineValues[i]);
+      i++;
+    }
+    this.baselineRecords.add(ret);
+    return this;
+  }
+
+  public LegacyOperatorTestBuilder expectZeroRows() {
+    this.expectNoRows = true;
+    return this;
+  }
+
+  public LegacyOperatorTestBuilder expectedNumBatches(Integer expectedNumBatches) {
+    this.expectedNumBatches = expectedNumBatches;
+    return this;
+  }
+
+  public LegacyOperatorTestBuilder expectedBatchSize(Long batchSize) {
+    this.expectedBatchSize = batchSize;
+    return this;
+  }
+
+  public LegacyOperatorTestBuilder expectedTotalRows(Integer expectedTotalRows) {
+    this.expectedTotalRows = expectedTotalRows;
+    return this;
+  }
+}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/OperatorTestBuilder.java b/exec/java-exec/src/test/java/org/apache/drill/test/OperatorTestBuilder.java
new file mode 100644
index 00000000000..746422ccda6
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/OperatorTestBuilder.java
@@ -0,0 +1,314 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.test;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.lang3.mutable.MutableInt;
+import org.apache.drill.exec.physical.base.AbstractBase;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.impl.BatchCreator;
+import org.apache.drill.exec.physical.impl.MockRecordBatch;
+import org.apache.drill.exec.physical.impl.svremover.Copier;
+import org.apache.drill.exec.physical.impl.svremover.GenericCopier;
+import org.apache.drill.exec.record.CloseableRecordBatch;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.RecordBatchSizer;
+import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.vector.FixedWidthVector;
+import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.vector.VariableWidthVector;
+import org.apache.drill.test.rowSet.DirectRowSet;
+import org.apache.drill.test.rowSet.RowSet;
+import org.apache.drill.test.rowSet.RowSetComparison;
+import org.junit.Assert;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
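+/**
+ * Builds a unit test for a single physical operator: upstream batches are supplied as
+ * {@link MockRecordBatch}es and results are verified against {@link RowSet} baselines.
+ * A minimal usage sketch (the operator config and row sets here are illustrative placeholders):
+ * <pre>
+ *   new OperatorTestBuilder(this)
+ *     .physicalOperator(projectConfig)
+ *     .addUpstreamBatch(inputBatch)
+ *     .addExpectedResult(expectedRowSet)
+ *     .go();
+ * </pre>
+ */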
+public class OperatorTestBuilder {
+
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(OperatorTestBuilder.class);
+
+  private final List<RowSet> expectedResults = new ArrayList<>();
+  private final List<MockRecordBatch> upstreamBatches = new ArrayList<>();
+  private PhysicalOpUnitTestBase physicalOpUnitTestBase;
+
+  private PhysicalOperator physicalOperator;
+  private long initReservation = AbstractBase.INIT_ALLOCATION;
+  private long maxAllocation = AbstractBase.MAX_ALLOCATION;
+  private Optional<Integer> expectedNumBatchesOpt = Optional.empty();
+  private Optional<Integer> expectedTotalRowsOpt = Optional.empty();
+  private boolean combineOutputBatches;
+  private boolean unordered;
+
+  public OperatorTestBuilder(PhysicalOpUnitTestBase physicalOpUnitTestBase) {
+    this.physicalOpUnitTestBase = physicalOpUnitTestBase;
+  }
+
+  @SuppressWarnings({"unchecked", "resource"})
+  public void go() throws Exception {
+    final List<RowSet> actualResults = new ArrayList<>();
+    CloseableRecordBatch testOperator = null;
+
+    try {
+      validate();
+      int expectedNumBatches = expectedNumBatchesOpt.orElse(expectedResults.size());
+      physicalOpUnitTestBase.mockOpContext(physicalOperator, initReservation, maxAllocation);
+
+      final BatchCreator<PhysicalOperator> opCreator = (BatchCreator<PhysicalOperator>) physicalOpUnitTestBase.opCreatorReg.getOperatorCreator(physicalOperator.getClass());
+      testOperator = opCreator.getBatch(physicalOpUnitTestBase.fragContext, physicalOperator, (List)upstreamBatches);
+
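+      // Drain the operator until it reports NONE, collecting each non-empty output batch.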
+      batchIterator: for (int batchIndex = 0;; batchIndex++) {
+        final RecordBatch.IterOutcome outcome = testOperator.next();
+
+        switch (outcome) {
+          case NONE:
+            if (!combineOutputBatches) {
+              Assert.assertEquals(expectedNumBatches, batchIndex);
+            }
+            // We are done iterating over batches. Now we need to compare them.
+            break batchIterator;
+          case OK_NEW_SCHEMA:
+            boolean skip = true;
+
+            try {
+              skip = testOperator.getContainer().getRecordCount() == 0;
+            } catch (IllegalStateException e) {
+              // Skip this batch: the OK_NEW_SCHEMA outcome delivered a schema change with no data rows.
+            } finally {
+              if (skip) {
+                batchIndex--;
+                break;
+              }
+            }
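+            // Intentional fall-through: a non-empty OK_NEW_SCHEMA batch is processed like an OK batch.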
+          case OK:
+            if (!combineOutputBatches && batchIndex >= expectedNumBatches) {
+              testOperator.getContainer().clear();
+              Assert.fail("More batches received than expected.");
+            } else {
+              final boolean hasSelectionVector = testOperator.getSchema().getSelectionVectorMode().hasSelectionVector;
+              final VectorContainer container = testOperator.getContainer();
+
+              if (hasSelectionVector) {
+                throw new UnsupportedOperationException("Implement DRILL-6698");
+              } else {
+                actualResults.add(DirectRowSet.fromContainer(container));
+              }
+              break;
+            }
+          default:
+            throw new UnsupportedOperationException("Can't handle this yet");
+        }
+      }
+
+      int actualTotalRows = actualResults.stream()
+        .mapToInt(RowSet::rowCount)
+        .reduce(Integer::sum)
+        .orElse(0);
+
+      if (expectedResults.isEmpty()) {
+        Assert.assertEquals((int) expectedTotalRowsOpt.orElse(0), actualTotalRows);
+        // We are done; there are no expected results to compare.
+        return;
+      }
+
+      if (combineOutputBatches) {
+        final RowSet expectedBatch = expectedResults.get(0);
+        final RowSet actualBatch = DirectRowSet.fromSchema(
+          physicalOpUnitTestBase.operatorFixture.allocator, actualResults.get(0).container().getSchema());
+        final VectorContainer actualBatchContainer = actualBatch.container();
+        actualBatchContainer.setRecordCount(0);
+
+        final int numColumns = expectedBatch.schema().size();
+        List<MutableInt> totalBytesPerColumn = new ArrayList<>();
+
+        for (int columnIndex = 0; columnIndex < numColumns; columnIndex++) {
+          totalBytesPerColumn.add(new MutableInt());
+        }
+
+        // Get column sizes for each result batch
+
+        final List<List<RecordBatchSizer.ColumnSize>> columnSizesPerBatch = actualResults.stream().map(rowSet -> {
+          switch (rowSet.indirectionType()) {
+            case NONE:
+              return new RecordBatchSizer(rowSet.container()).columnsList();
+            default:
+              throw new UnsupportedOperationException("Implement DRILL-6698");
+          }
+        }).collect(Collectors.toList());
+
+        // Get total bytes per column
+
+        for (List<RecordBatchSizer.ColumnSize> columnSizes: columnSizesPerBatch) {
+          for (int columnIndex = 0; columnIndex < numColumns; columnIndex++) {
+            final MutableInt totalBytes = totalBytesPerColumn.get(columnIndex);
+            final RecordBatchSizer.ColumnSize columnSize = columnSizes.get(columnIndex);
+            totalBytes.add(columnSize.getTotalDataSize());
+          }
+        }
+
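+        // Size each combined-batch vector to hold all rows (and, for variable-width vectors, all bytes).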
+        for (int columnIndex = 0; columnIndex < numColumns; columnIndex++) {
+          final ValueVector valueVector = actualBatchContainer
+            .getValueVector(columnIndex)
+            .getValueVector();
+
+          if (valueVector instanceof FixedWidthVector) {
+            ((FixedWidthVector) valueVector).allocateNew(actualTotalRows);
+          } else if (valueVector instanceof VariableWidthVector) {
+            final MutableInt totalBytes = totalBytesPerColumn.get(columnIndex);
+            ((VariableWidthVector) valueVector).allocateNew(totalBytes.getValue(), actualTotalRows);
+          } else {
+            throw new UnsupportedOperationException("Unsupported vector type: " + valueVector.getClass().getName());
+          }
+        }
+
+        try {
+          int currentIndex = 0;
+
+          for (RowSet actualRowSet: actualResults) {
+            final Copier copier;
+            final VectorContainer rowSetContainer = actualRowSet.container();
+            rowSetContainer.setRecordCount(actualRowSet.rowCount());
+
+            switch (actualRowSet.indirectionType()) {
+              case NONE:
+                copier = new GenericCopier();
+                break;
+              default:
+                throw new UnsupportedOperationException("Implement DRILL-6698");
+            }
+
+            copier.setup(rowSetContainer, actualBatchContainer);
+            copier.appendRecords(currentIndex, actualRowSet.rowCount());
+            currentIndex += actualRowSet.rowCount();
+          }
+
+          // Verify only after every output batch has been copied into the combined batch.
+          verify(expectedBatch, actualBatch);
+        } finally {
+          actualBatch.clear();
+        }
+      } else {
+        // Compare expected and actual results
+        for (int batchIndex = 0; batchIndex < expectedNumBatches; batchIndex++) {
+          final RowSet expectedBatch = expectedResults.get(batchIndex);
+          final RowSet actualBatch = actualResults.get(batchIndex);
+
+          verify(expectedBatch, actualBatch);
+        }
+      }
+    } finally {
+      // free resources
+
+      if (testOperator != null) {
+        testOperator.close();
+      }
+
+      actualResults.forEach(RowSet::clear);
+      // expectedResults is a final, never-null list, so no null check is needed.
+      expectedResults.forEach(RowSet::clear);
+
+      upstreamBatches.forEach(rowSetBatch -> {
+        try {
+          rowSetBatch.close();
+        } catch (Exception e) {
+          logger.error("Error while closing RowSetBatch", e);
+        }
+      });
+    }
+  }
+
+  private void verify(final RowSet expectedBatch, final RowSet actualBatch) {
+    if (unordered) {
+      new RowSetComparison(expectedBatch).unorderedVerify(actualBatch);
+    } else {
+      new RowSetComparison(expectedBatch).verify(actualBatch);
+    }
+  }
+
+  /**
+   * Make sure the inputs are valid: expected result batches are mutually exclusive with
+   * expectedNumBatches and expectedTotalRows, and at most one expected batch may be supplied
+   * when output batches are combined.
+   */
+  private void validate() {
+    if (combineOutputBatches) {
+      Preconditions.checkArgument(expectedResults.isEmpty() || expectedResults.size() == 1,
+        "The number of expected result batches needs to be zero or one when combining output batches");
+      Preconditions.checkArgument((expectedResults.isEmpty() && (!expectedNumBatchesOpt.isPresent() && expectedTotalRowsOpt.isPresent())) ||
+          (!expectedResults.isEmpty() && (!expectedNumBatchesOpt.isPresent() && !expectedTotalRowsOpt.isPresent())),
+        "When definig expectedResults, you cannot define expectedNumBatch or expectedTotalRows and vice versa");
+    } else {
+      Preconditions.checkArgument((expectedResults.isEmpty() && (expectedNumBatchesOpt.isPresent() || expectedTotalRowsOpt.isPresent())) ||
+          (!expectedResults.isEmpty() && (!expectedNumBatchesOpt.isPresent() && !expectedTotalRowsOpt.isPresent())),
+        "When definig expectedResults, you cannot define expectedNumBatch or expectedTotalRows and vice versa");
+    }
+  }
+
+  public OperatorTestBuilder physicalOperator(PhysicalOperator batch) {
+    this.physicalOperator = batch;
+    return this;
+  }
+
+  public OperatorTestBuilder initReservation(long initReservation) {
+    this.initReservation = initReservation;
+    return this;
+  }
+
+  public OperatorTestBuilder maxAllocation(long maxAllocation) {
+    this.maxAllocation = maxAllocation;
+    return this;
+  }
+
+  public OperatorTestBuilder expectedNumBatches(int expectedNumBatches) {
+    this.expectedNumBatchesOpt = Optional.of(expectedNumBatches);
+    return this;
+  }
+
+  public OperatorTestBuilder expectedTotalRows(int expectedTotalRows) {
+    this.expectedTotalRowsOpt = Optional.of(expectedTotalRows);
+    return this;
+  }
+
+  /**
+   * Combines all the batches output by the operator into a single batch for comparison.
+   * @return This {@link OperatorTestBuilder}.
+   */
+  public OperatorTestBuilder combineOutputBatches() {
+    combineOutputBatches = true;
+    return this;
+  }
+
+  public OperatorTestBuilder unordered() {
+    unordered = true;
+    return this;
+  }
+
+  public OperatorTestBuilder addUpstreamBatch(final MockRecordBatch mockRecordBatch) {
+    Preconditions.checkNotNull(mockRecordBatch);
+    upstreamBatches.add(mockRecordBatch);
+    return this;
+  }
+
+  public OperatorTestBuilder addExpectedResult(final RowSet rowSet) {
+    Preconditions.checkNotNull(rowSet);
+    expectedResults.add(rowSet);
+    return this;
+  }
+}
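
For readers skimming the diff, a typical use of the new builder reads roughly as follows. This is a sketch distilled from the test class added below; names such as projectConf, inputRowSet, and expectedRowSet are illustrative, not part of the patch:

    // Hypothetical test method in a subclass of PhysicalOpUnitTestBase.
    // fragContext and operatorFixture come from the test base class.
    final MockRecordBatch input = new MockRecordBatch.Builder()
      .sendData(inputRowSet)               // RowSet built with RowSetBuilder
      .build(fragContext);

    opTestBuilder()
      .physicalOperator(projectConf)       // any PhysicalOperator config
      .combineOutputBatches()              // merge all output into one batch
      .unordered()                         // ignore row order when comparing
      .addUpstreamBatch(input)
      .addExpectedResult(expectedRowSet)
      .go();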
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/OperatorTestBuilderTest.java b/exec/java-exec/src/test/java/org/apache/drill/test/OperatorTestBuilderTest.java
new file mode 100644
index 00000000000..be3cdc2932c
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/OperatorTestBuilderTest.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.test;
+
+import com.google.common.collect.Lists;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.FieldReference;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.logical.data.NamedExpression;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.exec.physical.base.AbstractBase;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.PhysicalVisitor;
+import org.apache.drill.exec.physical.config.Project;
+import org.apache.drill.exec.physical.impl.MockRecordBatch;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.test.rowSet.RowSet;
+import org.apache.drill.test.rowSet.RowSetBuilder;
+import org.apache.drill.test.rowSet.schema.SchemaBuilder;
+import org.junit.ComparisonFailure;
+import org.junit.Test;
+
+import java.util.Iterator;
+import java.util.List;
+
+public class OperatorTestBuilderTest extends PhysicalOpUnitTestBase {
+  public static final String FIRST_NAME_COL = "firstname";
+  public static final String LAST_NAME_COL = "lastname";
+
+  @Test
+  public void noCombineUnorderedTestPass() throws Exception {
+    executeTest(buildInputData(), true, false);
+  }
+
+  @Test(expected = AssertionError.class)
+  public void noCombineUnorderedTestFail() throws Exception {
+    executeTest(buildIncorrectData(), true, false);
+  }
+
+  @Test
+  public void noCombineOrderedTestPass() throws Exception {
+    executeTest(buildInputData(), false, false);
+  }
+
+  @Test(expected = ComparisonFailure.class)
+  public void noCombineOrderedTestFail() throws Exception {
+    executeTest(buildIncorrectData(), false, false);
+  }
+
+  @Test
+  public void combineUnorderedTestPass() throws Exception {
+    executeTest(buildInputData(), true, true);
+  }
+
+  @Test(expected = AssertionError.class)
+  public void combineUnorderedTestFail() throws Exception {
+    executeTest(buildIncorrectData(), true, true);
+  }
+
+  @Test
+  public void combineOrderedTestPass() throws Exception {
+    executeTest(buildInputData(), false, true);
+  }
+
+  @Test(expected = ComparisonFailure.class)
+  public void combineOrderedTestFail() throws Exception {
+    executeTest(buildIncorrectData(), false, true);
+  }
+
+  private Project createProjectPhysicalOperator() {
+    final List<NamedExpression> exprs = Lists.newArrayList(
+      new NamedExpression(SchemaPath.getSimplePath(FIRST_NAME_COL), new FieldReference(FIRST_NAME_COL)),
+      new NamedExpression(SchemaPath.getSimplePath(LAST_NAME_COL), new FieldReference(LAST_NAME_COL)));
+
+    return new Project(exprs, new MockPhysicalOperator(), true);
+  }
+
+  private static TupleMetadata buildSchema() {
+    return new SchemaBuilder()
+      .add(FIRST_NAME_COL, TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.REQUIRED)
+      .add(LAST_NAME_COL, TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.REQUIRED)
+      .buildSchema();
+  }
+
+  private RowSet buildInputData() {
+    return new RowSetBuilder(operatorFixture.allocator(), buildSchema())
+      .addRow("billy", "bob")
+      .addRow("bobby", "fillet")
+      .build();
+  }
+
+  private RowSet buildIncorrectData() {
+    return new RowSetBuilder(operatorFixture.allocator(), buildSchema())
+      .addRow("billy", "bob")
+      .addRow("bambam", "fofof")
+      .build();
+  }
+
+  private void executeTest(final RowSet expectedRowSet, boolean unordered, boolean combine) throws Exception {
+    final MockRecordBatch inputRowSetBatch = new MockRecordBatch.Builder()
+      .sendData(buildInputData())
+      .build(fragContext);
+
+    final OperatorTestBuilder testBuilder = opTestBuilder()
+      .physicalOperator(createProjectPhysicalOperator());
+
+    if (combine) {
+      testBuilder.combineOutputBatches();
+    }
+
+    if (unordered) {
+      testBuilder.unordered();
+    }
+
+    testBuilder
+      .addUpstreamBatch(inputRowSetBatch)
+      .addExpectedResult(expectedRowSet)
+      .go();
+  }
+
+  public static class MockPhysicalOperator extends AbstractBase {
+    @Override
+    public <T, X, E extends Throwable> T accept(PhysicalVisitor<T, X, E> physicalVisitor, X value) throws E {
+      return null;
+    }
+
+    @Override
+    public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) throws ExecutionSetupException {
+      return null;
+    }
+
+    @Override
+    public int getOperatorType() {
+      return 0;
+    }
+
+    @Override
+    public Iterator<PhysicalOperator> iterator() {
+      return null;
+    }
+  }
+}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/PhysicalOpUnitTestBase.java b/exec/java-exec/src/test/java/org/apache/drill/test/PhysicalOpUnitTestBase.java
similarity index 71%
rename from exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/PhysicalOpUnitTestBase.java
rename to exec/java-exec/src/test/java/org/apache/drill/test/PhysicalOpUnitTestBase.java
index b2412e5069e..1c4779c3e7f 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/PhysicalOpUnitTestBase.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/PhysicalOpUnitTestBase.java
@@ -15,12 +15,10 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.drill.exec.physical.unit;
+package org.apache.drill.test;
 
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
 import org.apache.calcite.rel.RelFieldCollation;
 import org.apache.drill.exec.coord.ClusterCoordinator;
 import org.apache.drill.exec.memory.BufferAllocator;
@@ -40,9 +38,7 @@
 import org.apache.drill.exec.store.dfs.DrillFileSystem;
 import org.apache.drill.exec.store.easy.json.JSONRecordReader;
 import org.apache.drill.exec.work.batch.IncomingBuffers;
-import org.apache.drill.test.BaseDirTestWatcher;
 import org.apache.drill.exec.work.filter.RuntimeFilterWritable;
-import org.apache.drill.test.DrillTestWrapper;
 import org.apache.drill.common.config.DrillConfig;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.expression.LogicalExpression;
@@ -55,27 +51,22 @@
 import org.apache.drill.exec.ExecTest;
 import org.apache.drill.exec.physical.base.AbstractBase;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
-import org.apache.drill.exec.physical.impl.BatchCreator;
 import org.apache.drill.exec.physical.impl.OperatorCreatorRegistry;
-import org.apache.drill.exec.physical.impl.ScanBatch;
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.record.VectorAccessible;
 import org.apache.drill.exec.server.DrillbitContext;
 import org.apache.drill.exec.store.RecordReader;
 import org.apache.drill.exec.rpc.NamedThreadFactory;
-import org.apache.drill.test.OperatorFixture;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Rule;
 import org.mockito.Mockito;
 
 import java.io.IOException;
-import java.io.UnsupportedEncodingException;
 
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
@@ -83,7 +74,6 @@
 import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.TreeMap;
 
 public class PhysicalOpUnitTestBase extends ExecTest {
   protected MockExecutorFragmentContext fragContext;
@@ -96,9 +86,9 @@
   @Rule
   public final BaseDirTestWatcher dirTestWatcher = new BaseDirTestWatcher();
 
-  private final DrillConfig drillConf = DrillConfig.create();
-  private final ScanResult classpathScan = ClassPathScanner.fromPrescan(drillConf);
-  private final OperatorCreatorRegistry opCreatorReg = new OperatorCreatorRegistry(classpathScan);
+  protected final DrillConfig drillConf = DrillConfig.create();
+  protected final ScanResult classpathScan = ClassPathScanner.fromPrescan(drillConf);
+  protected final OperatorCreatorRegistry opCreatorReg = new OperatorCreatorRegistry(classpathScan);
 
   @Before
   public void setup() throws Exception {
@@ -186,141 +176,22 @@ public void remove() {
     }
   }
 
-  protected OperatorTestBuilder opTestBuilder() {
-    return new OperatorTestBuilder();
+  /**
+   * Gets a {@link LegacyOperatorTestBuilder}.
+   * @deprecated Use {@link #opTestBuilder()} instead.
+   * @return A {@link LegacyOperatorTestBuilder}.
+   */
+  @Deprecated
+  protected LegacyOperatorTestBuilder legacyOpTestBuilder() {
+    return new LegacyOperatorTestBuilder(this);
   }
 
-  protected class OperatorTestBuilder {
-
-    private PhysicalOperator popConfig;
-    private String[] baselineColumns;
-    private List<Map<String, Object>> baselineRecords;
-    private List<List<String>> inputStreamsJSON;
-    private long initReservation = AbstractBase.INIT_ALLOCATION;
-    private long maxAllocation = AbstractBase.MAX_ALLOCATION;
-    private boolean checkBatchMemory;
-    private boolean expectNoRows;
-    private Long expectedBatchSize;
-    private Integer expectedNumBatches;
-    private Integer expectedTotalRows;
-
-    @SuppressWarnings({"unchecked", "resource"})
-    public void go() {
-      BatchCreator<PhysicalOperator> opCreator;
-      RecordBatch testOperator;
-      try {
-        mockOpContext(popConfig, initReservation, maxAllocation);
-
-        opCreator = (BatchCreator<PhysicalOperator>) opCreatorReg.getOperatorCreator(popConfig.getClass());
-        List<RecordBatch> incomingStreams = Lists.newArrayList();
-        if (inputStreamsJSON != null) {
-          for (List<String> batchesJson : inputStreamsJSON) {
-            incomingStreams.add(new ScanBatch(popConfig, fragContext,
-                getReaderListForJsonBatches(batchesJson, fragContext)));
-          }
-        }
-
-        testOperator = opCreator.getBatch(fragContext, popConfig, incomingStreams);
-
-        Map<String, List<Object>> actualSuperVectors = DrillTestWrapper.addToCombinedVectorResults(new BatchIterator(testOperator), expectedBatchSize, expectedNumBatches, expectedTotalRows);
-        if ( expectedTotalRows != null ) { return; } // when checking total rows, don't compare actual results
-
-        Map<String, List<Object>> expectedSuperVectors;
-
-        if (expectNoRows) {
-          expectedSuperVectors = new TreeMap<>();
-          for (String column : baselineColumns) {
-            expectedSuperVectors.put(column, new ArrayList<>());
-          }
-        } else {
-          expectedSuperVectors = DrillTestWrapper.translateRecordListToHeapVectors(baselineRecords);
-        }
-
-        DrillTestWrapper.compareMergedVectors(expectedSuperVectors, actualSuperVectors);
-
-      } catch (ExecutionSetupException e) {
-        throw new RuntimeException(e);
-      } catch (UnsupportedEncodingException e) {
-        throw new RuntimeException(e);
-      } catch (Exception e) {
-        throw new RuntimeException(e);
-      }
-    }
-
-    public OperatorTestBuilder physicalOperator(PhysicalOperator batch) {
-      this.popConfig = batch;
-      return this;
-    }
-
-    public OperatorTestBuilder initReservation(long initReservation) {
-      this.initReservation = initReservation;
-      return this;
-    }
-
-    public OperatorTestBuilder maxAllocation(long maxAllocation) {
-      this.maxAllocation = maxAllocation;
-      return this;
-    }
-
-    public OperatorTestBuilder inputDataStreamJson(List<String> jsonBatches) {
-      this.inputStreamsJSON = new ArrayList<>();
-      this.inputStreamsJSON.add(jsonBatches);
-      return this;
-    }
-
-    public OperatorTestBuilder inputDataStreamsJson(List<List<String>> childStreams) {
-      this.inputStreamsJSON = childStreams;
-      return this;
-    }
-
-    public OperatorTestBuilder baselineColumns(String... columns) {
-      for (int i = 0; i < columns.length; i++) {
-        LogicalExpression ex = parseExpr(columns[i]);
-        if (ex instanceof SchemaPath) {
-          columns[i] = ((SchemaPath)ex).toExpr();
-        } else {
-          throw new IllegalStateException("Schema path is not a valid format.");
-        }
-      }
-      this.baselineColumns = columns;
-      return this;
-    }
-
-    public OperatorTestBuilder baselineValues(Object... baselineValues) {
-      if (baselineRecords == null) {
-        baselineRecords = new ArrayList<>();
-      }
-      Map<String, Object> ret = new HashMap<>();
-      int i = 0;
-      Preconditions.checkArgument(baselineValues.length == baselineColumns.length,
-          "Must supply the same number of baseline values as columns.");
-      for (String s : baselineColumns) {
-        ret.put(s, baselineValues[i]);
-        i++;
-      }
-      this.baselineRecords.add(ret);
-      return this;
-    }
-
-    public OperatorTestBuilder expectZeroRows() {
-      this.expectNoRows = true;
-      return this;
-    }
-
-    public OperatorTestBuilder expectedNumBatches(Integer expectedNumBatches) {
-      this.expectedNumBatches = expectedNumBatches;
-      return this;
-    }
-
-    public OperatorTestBuilder expectedBatchSize(Long batchSize) {
-      this.expectedBatchSize = batchSize;
-      return this;
-    }
-
-    public OperatorTestBuilder expectedTotalRows(Integer expectedTotalRows) {
-      this.expectedTotalRows = expectedTotalRows;
-      return this;
-    }
+  /**
+   * Gets an {@link OperatorTestBuilder}.
+   * @return An {@link OperatorTestBuilder}.
+   */
+  protected OperatorTestBuilder opTestBuilder() {
+    return new OperatorTestBuilder(this);
   }
 
   /**
@@ -446,11 +317,11 @@ public RuntimeFilterWritable getRuntimeFilter() {
   /**
    * <h2>Note</h2>
    * <p>
-   *   The {@link MockPhysicalOperator} should only be used in {@link PhysicalOpUnitTestBase} because {@link PhysicalOpUnitTestBase}
+   *   The {@link MockPhysicalOperator} should only be used in {@link PhysicalOpUnitTestBase} and its subclasses because {@link PhysicalOpUnitTestBase}
    *   needs a dummy {@link MockPhysicalOperator} to be passed to Scanners.
    * </p>
    */
-  protected static class MockPhysicalOperator extends AbstractBase
+  public static class MockPhysicalOperator extends AbstractBase
   {
     @Override
     public <T, X, E extends Throwable> T accept(PhysicalVisitor<T, X, E> physicalVisitor, X value) throws E {
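
As the deprecation note above indicates, tests migrate from the JSON-string LegacyOperatorTestBuilder to the RowSet-based OperatorTestBuilder. A rough before/after sketch, with illustrative column names and values:

    // Before (deprecated): JSON input batches plus baseline maps.
    legacyOpTestBuilder()
      .physicalOperator(projectConf)
      .inputDataStreamJson(Lists.newArrayList("[{\"a\": 1}]"))
      .baselineColumns("a")
      .baselineValues(1L)
      .go();

    // After: explicit schemas and typed row sets.
    final TupleMetadata schema = new SchemaBuilder()
      .add("a", TypeProtos.MinorType.BIGINT, TypeProtos.DataMode.REQUIRED)
      .buildSchema();
    final RowSet expected = new RowSetBuilder(operatorFixture.allocator(), schema)
      .addRow(1L)
      .build();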
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetBatch.java b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetBatch.java
deleted file mode 100644
index a3cd918a9d0..00000000000
--- a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetBatch.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.test.rowSet;
-
-import com.google.common.base.Preconditions;
-import org.apache.drill.common.expression.SchemaPath;
-import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.record.BatchSchema;
-import org.apache.drill.exec.record.RecordBatch;
-import org.apache.drill.exec.record.TypedFieldId;
-import org.apache.drill.exec.record.VectorContainer;
-import org.apache.drill.exec.record.VectorWrapper;
-import org.apache.drill.exec.record.WritableBatch;
-import org.apache.drill.exec.record.selection.SelectionVector2;
-import org.apache.drill.exec.record.selection.SelectionVector4;
-
-import java.util.Iterator;
-
-public class RowSetBatch implements RecordBatch {
-  private final RowSet rowSet;
-
-  public RowSetBatch(final RowSet rowSet) {
-    this.rowSet = Preconditions.checkNotNull(rowSet);
-  }
-
-  @Override
-  public FragmentContext getContext() {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public BatchSchema getSchema() {
-    return rowSet.batchSchema();
-  }
-
-  @Override
-  public int getRecordCount() {
-    return rowSet.container().getRecordCount();
-  }
-
-  @Override
-  public SelectionVector2 getSelectionVector2() {
-    if (rowSet instanceof IndirectRowSet) {
-      return ((IndirectRowSet)rowSet).getSv2();
-    }
-
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public SelectionVector4 getSelectionVector4() {
-    if (rowSet instanceof RowSet.HyperRowSet) {
-      return ((RowSet.HyperRowSet)rowSet).getSv4();
-    }
-
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public void kill(boolean sendUpstream) {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public VectorContainer getOutgoingContainer() {
-    return rowSet.container();
-  }
-
-  @Override
-  public TypedFieldId getValueVectorId(SchemaPath path) {
-    return rowSet.container().getValueVectorId(path);
-  }
-
-  @Override
-  public VectorWrapper<?> getValueAccessorById(Class<?> clazz, int... ids) {
-    return rowSet.container().getValueAccessorById(clazz, ids);
-  }
-
-  @Override
-  public IterOutcome next() {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public WritableBatch getWritableBatch() {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public Iterator<VectorWrapper<?>> iterator() {
-    return rowSet.container().iterator();
-  }
-
-  @Override
-  public VectorContainer getContainer() { return rowSet.container(); }
-}
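
The deleted RowSetBatch wrapper is subsumed by MockRecordBatch, which the new OperatorTestBuilder consumes directly. A minimal replacement sketch, assuming a RowSet named rows and the fragContext provided by PhysicalOpUnitTestBase:

    // Where a test previously wrapped a row set as new RowSetBatch(rows),
    // it now builds a MockRecordBatch that feeds the row set through the
    // regular RecordBatch iterator protocol.
    final MockRecordBatch batch = new MockRecordBatch.Builder()
      .sendData(rows)
      .build(fragContext);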
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetComparison.java b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetComparison.java
index e098e333853..b1cb058e893 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetComparison.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetComparison.java
@@ -20,13 +20,22 @@
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
+import java.math.BigDecimal;
+import java.math.MathContext;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
 import java.util.Comparator;
+import java.util.List;
 
+import com.google.common.base.Optional;
+import com.google.common.collect.HashMultiset;
+import com.google.common.collect.Multiset;
 import org.apache.drill.exec.vector.accessor.ArrayReader;
 import org.apache.drill.exec.vector.accessor.ObjectReader;
 import org.apache.drill.exec.vector.accessor.ScalarReader;
 import org.apache.drill.exec.vector.accessor.TupleReader;
 import org.bouncycastle.util.Arrays;
+import org.junit.Assert;
 
 /**
  * For testing, compare the contents of two row sets (record batches)
@@ -58,10 +67,10 @@
    */
   private boolean mask[];
   /**
-   * Floats and doubles do not compare exactly. This delta is used
-   * by JUnit for such comparisons.
+   * Floats and doubles do not compare exactly. This MathContext is used
+   * to construct BigDecimals of the desired precision.
    */
-  private double delta = 0.001;
+  private MathContext scale = new MathContext(3);
   /**
    * Tests can skip the first n rows.
    */
@@ -106,14 +115,15 @@ public RowSetComparison withMask(Boolean...flags) {
   }
 
   /**
-   * Specify the delta value to use when comparing float or
+   * Specify the precision to use when comparing float or
    * double values.
    *
-   * @param delta the delta to use in float and double comparisons
+   * @param scale the number of significant digits to use when comparing floats and doubles.
+   *              See {@link MathContext} for the definition of precision.
    * @return this builder
    */
-  public RowSetComparison withDelta(double delta) {
-    this.delta = delta;
+  public RowSetComparison withScale(int scale) {
+    this.scale = new MathContext(scale);
     return this;
   }
 
@@ -142,28 +152,93 @@ public RowSetComparison span(int span) {
     return this;
   }
 
+  private void compareSchemasAndCounts(RowSet actual) {
+    assertTrue("Schemas don't match.\n" +
+      "Expected: " + expected.schema().toString() +
+      "\nActual: " + actual.schema(), expected.schema().isEquivalent(actual.schema()));
+    int testLength = getTestLength();
+    int dataLength = offset + testLength;
+    assertTrue("Missing expected rows", expected.rowCount() >= dataLength);
+    assertTrue("Missing actual rows", actual.rowCount() >= dataLength);
+  }
+
+  private int getTestLength() {
+    return span > -1 ? span : expected.rowCount() - offset;
+  }
+
+  public void unorderedVerify(RowSet actual) {
+    compareSchemasAndCounts(actual);
+
+    int testLength = getTestLength();
+    RowSetReader er = expected.reader();
+    RowSetReader ar = actual.reader();
+
+    for (int i = 0; i < offset; i++) {
+      er.next();
+      ar.next();
+    }
+
+    final Multiset<List<Object>> expectedSet = HashMultiset.create();
+    final Multiset<List<Object>> actualSet = HashMultiset.create();
+
+    for (int rowCounter = 0; rowCounter < testLength; rowCounter++) {
+      er.next();
+      ar.next();
+
+      expectedSet.add(buildRow(er));
+      actualSet.add(buildRow(ar));
+    }
+
+    Assert.assertEquals(expectedSet, actualSet);
+  }
+
+  /**
+   * Convenience method to verify the actual results, then free memory
+   * for both the expected and actual result sets.
+   * @param actual the actual results to verify
+   */
+  public void unorderedVerifyAndClearAll(RowSet actual) {
+    try {
+      unorderedVerify(actual);
+    } finally {
+      expected.clear();
+      actual.clear();
+    }
+  }
+
+  private List<Object> buildRow(RowSetReader reader) {
+    final List<Object> row = new ArrayList<>();
+
+    for (int i = 0; i < mask.length; i++) {
+      if (!mask[i]) {
+        continue;
+      }
+
+      final ScalarReader scalarReader = reader.column(i).scalar();
+      final Object value = getScalar(scalarReader);
+      row.add(value);
+    }
+
+    return row;
+  }
+
   /**
    * Verify the actual rows using the rules defined in this builder
    * @param actual the actual results to verify
    */
 
   public void verify(RowSet actual) {
-    assertTrue("Schemas don't match.\n" +
-        "Expected: " + expected.schema().toString() +
-        "\nActual: " + actual.schema(), expected.schema().isEquivalent(actual.schema()));
-    int testLength = expected.rowCount() - offset;
-    if (span > -1) {
-      testLength = span;
-    }
-    int dataLength = offset + testLength;
-    assertTrue("Missing expected rows", expected.rowCount() >= dataLength);
-    assertTrue("Missing actual rows", actual.rowCount() >= dataLength);
+    compareSchemasAndCounts(actual);
+    int testLength = getTestLength();
+
     RowSetReader er = expected.reader();
     RowSetReader ar = actual.reader();
+
     for (int i = 0; i < offset; i++) {
       er.next();
       ar.next();
     }
+
     for (int i = 0; i < testLength; i++) {
       er.next();
       ar.next();
@@ -245,34 +320,32 @@ private void verifyScalar(String label, ScalarReader ec, ScalarReader ac) {
     if (! ec.isNull()) {
       assertTrue(label + " - column is null", ! ac.isNull());
     }
+
     switch (ec.valueType()) {
-    case BYTES: {
+      case BYTES:
         byte expected[] = ec.getBytes();
         byte actual[] = ac.getBytes();
         assertEquals(label + " - byte lengths differ", expected.length, actual.length);
         assertTrue(label, Arrays.areEqual(expected, actual));
         break;
-     }
-     case DOUBLE:
-       assertEquals(label, ec.getDouble(), ac.getDouble(), delta);
-       break;
-     case INTEGER:
-       assertEquals(label, ec.getInt(), ac.getInt());
-       break;
-     case LONG:
-       assertEquals(label, ec.getLong(), ac.getLong());
-       break;
-     case STRING:
-       assertEquals(label, ec.getString(), ac.getString());
-        break;
-     case DECIMAL:
-       assertEquals(label, ec.getDecimal(), ac.getDecimal());
-       break;
-     case PERIOD:
-       assertEquals(label, ec.getPeriod(), ac.getPeriod());
-       break;
-     default:
-        throw new IllegalStateException( "Unexpected type: " + ec.valueType());
+      default:
+        assertEquals(label, getScalar(ec), getScalar(ac));
+    }
+  }
+
+  private Object getScalar(final ScalarReader scalarReader) {
+    if (scalarReader.isNull()) {
+      return Optional.absent();
+    }
+
+    switch (scalarReader.valueType()) {
+      case BYTES:
+        return ByteBuffer.wrap(scalarReader.getBytes());
+      case DOUBLE: {
+        return new BigDecimal(scalarReader.getDouble(), this.scale).stripTrailingZeros();
+      }
+      default:
+        return scalarReader.getObject();
     }
   }
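
To make the new float/double semantics concrete: getScalar() rounds doubles through a MathContext of three significant digits by default, replacing the old fixed delta of 0.001. A self-contained sketch (the class name ScaleDemo is illustrative):

    import java.math.BigDecimal;
    import java.math.MathContext;

    public class ScaleDemo {
      public static void main(String[] args) {
        // Default in RowSetComparison: three significant digits.
        MathContext mc = new MathContext(3);
        BigDecimal base = new BigDecimal(1.0, mc).stripTrailingZeros();
        // 1.004 rounds to 1.00 and strips to 1, so it matches 1.0.
        System.out.println(new BigDecimal(1.004, mc).stripTrailingZeros().equals(base)); // true
        // 1.009 rounds to 1.01, so it no longer matches.
        System.out.println(new BigDecimal(1.009, mc).stripTrailingZeros().equals(base)); // false
      }
    }

This mirrors the data used by simpleDoubleUnorderedComparisonMatchTest and its no-match counterpart below.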
 
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/TestRowSetComparison.java b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/TestRowSetComparison.java
new file mode 100644
index 00000000000..b8bdb12b809
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/TestRowSetComparison.java
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.test.rowSet;
+
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.memory.RootAllocator;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.test.rowSet.schema.SchemaBuilder;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestRowSetComparison {
+  private BufferAllocator allocator;
+
+  @Before
+  public void setup() {
+    allocator = new RootAllocator(Long.MAX_VALUE);
+  }
+
+  @Test
+  public void simpleUnorderedComparisonMatchTest() {
+    final TupleMetadata schema = new SchemaBuilder()
+      .add("a", TypeProtos.MinorType.INT, TypeProtos.DataMode.REQUIRED)
+      .add("b", TypeProtos.MinorType.INT, TypeProtos.DataMode.REQUIRED)
+      .buildSchema();
+
+    final RowSet expected = new RowSetBuilder(allocator, schema)
+      .addRow(1, 1)
+      .addRow(1, 1)
+      .addRow(1, 2)
+      .addRow(2, 1)
+      .build();
+
+    final RowSet actual = new RowSetBuilder(allocator, schema)
+      .addRow(1, 1)
+      .addRow(1, 2)
+      .addRow(2, 1)
+      .addRow(1, 1)
+      .build();
+
+    try {
+      new RowSetComparison(expected).unorderedVerify(actual);
+    } finally {
+      expected.clear();
+      actual.clear();
+    }
+  }
+
+  @Test
+  public void simpleDoubleUnorderedComparisonMatchTest() {
+    final TupleMetadata schema = new SchemaBuilder()
+      .add("a", TypeProtos.MinorType.FLOAT4, TypeProtos.DataMode.REQUIRED)
+      .add("b", TypeProtos.MinorType.FLOAT8, TypeProtos.DataMode.REQUIRED)
+      .buildSchema();
+
+    final RowSet expected = new RowSetBuilder(allocator, schema)
+      .addRow(1.0f, 1.0)
+      .addRow(1.0f, 1.0)
+      .addRow(1.0f, 1.01)
+      .addRow(1.01f, 1.0)
+      .build();
+
+    final RowSet actual = new RowSetBuilder(allocator, schema)
+      .addRow(1.004f, .9996)
+      .addRow(1.0f, 1.008)
+      .addRow(1.008f, 1.0)
+      .addRow(.9996f, 1.004)
+      .build();
+
+    try {
+      new RowSetComparison(expected).unorderedVerify(actual);
+    } finally {
+      expected.clear();
+      actual.clear();
+    }
+  }
+
+  @Test
+  public void simpleVarcharUnorderedComparisonMatchTest() {
+    final TupleMetadata schema = new SchemaBuilder()
+      .add("a", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.REQUIRED)
+      .buildSchema();
+
+    final RowSet expected = new RowSetBuilder(allocator, schema)
+      .addRow("aaa")
+      .addRow("bbb")
+      .addRow("ccc")
+      .addRow("bbb")
+      .build();
+
+    final RowSet actual = new RowSetBuilder(allocator, schema)
+      .addRow("ccc")
+      .addRow("aaa")
+      .addRow("bbb")
+      .addRow("bbb")
+      .build();
+
+    try {
+      new RowSetComparison(expected).unorderedVerify(actual);
+    } finally {
+      expected.clear();
+      actual.clear();
+    }
+  }
+
+  @Test(expected = AssertionError.class)
+  public void simpleUnorderedComparisonNoMatchTest() {
+    final TupleMetadata schema = new SchemaBuilder()
+      .add("a", TypeProtos.MinorType.INT, TypeProtos.DataMode.REQUIRED)
+      .add("b", TypeProtos.MinorType.INT, TypeProtos.DataMode.REQUIRED)
+      .buildSchema();
+
+    final RowSet expected = new RowSetBuilder(allocator, schema)
+      .addRow(1, 1)
+      .addRow(3, 2)
+      .addRow(2, 4)
+      .build();
+
+    final RowSet actual = new RowSetBuilder(allocator, schema)
+      .addRow(1, 1)
+      .addRow(2, 1)
+      .addRow(1, 1)
+      .build();
+
+    try {
+      new RowSetComparison(expected).unorderedVerify(actual);
+    } finally {
+      expected.clear();
+      actual.clear();
+    }
+  }
+
+  @Test(expected = AssertionError.class)
+  public void simpleDoubleUnorderedComparisonNoMatchTest() {
+    final TupleMetadata schema = new SchemaBuilder()
+      .add("a", TypeProtos.MinorType.FLOAT4, TypeProtos.DataMode.REQUIRED)
+      .add("b", TypeProtos.MinorType.FLOAT8, TypeProtos.DataMode.REQUIRED)
+      .buildSchema();
+
+    final RowSet expected = new RowSetBuilder(allocator, schema)
+      .addRow(1.0f, 1.0)
+      .addRow(1.0f, 1.0)
+      .addRow(1.0f, 1.01)
+      .addRow(1.01f, 1.0)
+      .build();
+
+    final RowSet actual = new RowSetBuilder(allocator, schema)
+      .addRow(1.009f, .9996)
+      .addRow(1.0f, 1.004)
+      .addRow(1.008f, 1.0)
+      .addRow(.9994f, 1.004)
+      .build();
+
+    try {
+      new RowSetComparison(expected).unorderedVerify(actual);
+    } finally {
+      expected.clear();
+      actual.clear();
+    }
+  }
+
+  @Test(expected = AssertionError.class)
+  public void simpleVarcharUnorderedComparisonNoMatchTest() {
+    final TupleMetadata schema = new SchemaBuilder()
+      .add("a", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.REQUIRED)
+      .buildSchema();
+
+    final RowSet expected = new RowSetBuilder(allocator, schema)
+      .addRow("red")
+      .addRow("bbb")
+      .addRow("ccc")
+      .addRow("bbb")
+      .build();
+
+    final RowSet actual = new RowSetBuilder(allocator, schema)
+      .addRow("ccc")
+      .addRow("aaa")
+      .addRow("blue")
+      .addRow("bbb")
+      .build();
+
+    try {
+      new RowSetComparison(expected).unorderedVerify(actual);
+    } finally {
+      expected.clear();
+      actual.clear();
+    }
+  }
+
+  @After
+  public void teardown() {
+    allocator.close();
+  }
+}
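
The unordered tests above work because unorderedVerify() reduces each row to a List&lt;Object&gt; and compares Guava multisets, so row order is ignored while duplicate counts still matter. A minimal sketch of that idea:

    import com.google.common.collect.HashMultiset;
    import com.google.common.collect.Multiset;
    import java.util.Arrays;
    import java.util.List;

    public class MultisetDemo {
      public static void main(String[] args) {
        // Same rows on both sides, different order: multisets are equal.
        Multiset<List<Object>> expected = HashMultiset.create();
        expected.add(Arrays.<Object>asList(1, 1));
        expected.add(Arrays.<Object>asList(1, 2));

        Multiset<List<Object>> actual = HashMultiset.create();
        actual.add(Arrays.<Object>asList(1, 2));
        actual.add(Arrays.<Object>asList(1, 1));

        System.out.println(expected.equals(actual)); // true

        // A second copy of (1, 1) on one side breaks the match.
        actual.add(Arrays.<Object>asList(1, 1));
        System.out.println(expected.equals(actual)); // false
      }
    }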
diff --git a/exec/vector/src/main/codegen/templates/FixedValueVectors.java b/exec/vector/src/main/codegen/templates/FixedValueVectors.java
index eb413b045b8..a98aa66b6c7 100644
--- a/exec/vector/src/main/codegen/templates/FixedValueVectors.java
+++ b/exec/vector/src/main/codegen/templates/FixedValueVectors.java
@@ -358,7 +358,7 @@ public void copyFromSafe(int fromIndex, int thisIndex, ${minor.class}Vector from
 
   @Override
   public void copyEntry(int toIndex, ValueVector from, int fromIndex) {
-    ((${minor.class}Vector) from).data.getBytes(fromIndex * VALUE_WIDTH, data, toIndex * VALUE_WIDTH, VALUE_WIDTH);
+    copyFromSafe(fromIndex, toIndex, (${minor.class}Vector) from);
   }
 
   public void decrementAllocationMonitor() {
diff --git a/exec/vector/src/main/codegen/templates/NullableValueVectors.java b/exec/vector/src/main/codegen/templates/NullableValueVectors.java
index ff066fbc969..f82c718966c 100644
--- a/exec/vector/src/main/codegen/templates/NullableValueVectors.java
+++ b/exec/vector/src/main/codegen/templates/NullableValueVectors.java
@@ -433,7 +433,7 @@ public void copyEntry(int toIndex, ValueVector from, int fromIndex) {
 
     // Handle the case of not-nullable copied into a nullable
     if (from instanceof ${minor.class}Vector) {
-      bits.getMutator().set(toIndex,1);
+      bits.getMutator().setSafe(toIndex,1);
       values.copyFromSafe(fromIndex,toIndex,(${minor.class}Vector)from);
       return;
     }


 
