Posted to commits@drill.apache.org by js...@apache.org on 2016/04/20 18:08:56 UTC

[4/4] drill git commit: DRILL-4437: Operator unit test framework

DRILL-4437: Operator unit test framework

Closes #394


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/d93a3633
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/d93a3633
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/d93a3633

Branch: refs/heads/master
Commit: d93a3633815ed1c7efd6660eae62b7351a2c9739
Parents: 01e04cd
Author: Jason Altekruse <al...@gmail.com>
Authored: Fri Feb 26 14:55:30 2016 -0800
Committer: Jason Altekruse <al...@gmail.com>
Committed: Wed Apr 20 09:07:13 2016 -0700

----------------------------------------------------------------------
 .../org/apache/drill/exec/ExecConstants.java    |   6 +-
 .../drill/exec/compile/ClassTransformer.java    |   4 +-
 .../drill/exec/physical/impl/ImplCreator.java   |   4 +-
 .../exec/physical/impl/join/HashJoinBatch.java  |   2 +-
 .../physical/impl/xsort/ExternalSortBatch.java  |   4 +
 .../exec/store/easy/json/JSONRecordReader.java  |   4 +-
 .../java/org/apache/drill/DrillTestWrapper.java | 215 ++++++++----
 .../test/java/org/apache/drill/TestBuilder.java |   4 -
 .../physical/unit/BasicPhysicalOpUnitTest.java  | 322 +++++++++++++++++
 .../physical/unit/PhysicalOpUnitTestBase.java   | 341 +++++++++++++++++++
 .../apache/drill/jdbc/test/JdbcDataTest.java    |   6 +-
 11 files changed, 834 insertions(+), 78 deletions(-)
----------------------------------------------------------------------
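For readers skimming the patch: the new framework runs a physical operator directly against mocked JSON input batches, with a mocked FragmentContext instead of a running Drillbit. A minimal sketch, condensed from the testSimpleProject case in BasicPhysicalOpUnitTest below (the opTestBuilder API is defined in PhysicalOpUnitTestBase later in this diff):

    @Test
    public void testSimpleProject() {
      // evaluate x+5 over mocked JSON batches and compare against baseline rows
      Project projectConf = new Project(parseExprs("x+5", "x"), null);
      opTestBuilder()
          .physicalOperator(projectConf)
          .inputDataStreamJson(Lists.newArrayList("[{\"x\": 5 },{\"x\": 10 }]"))
          .baselineColumns("x")
          .baselineValues(10l)
          .baselineValues(15l)
          .go();
    }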


http://git-wip-us.apache.org/repos/asf/drill/blob/d93a3633/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
index a490116..6a0889d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
@@ -149,7 +149,7 @@ public interface ExecConstants {
   OptionValidator FILESYSTEM_PARTITION_COLUMN_LABEL_VALIDATOR = new StringValidator(FILESYSTEM_PARTITION_COLUMN_LABEL, "dir");
 
   String JSON_READ_NUMBERS_AS_DOUBLE = "store.json.read_numbers_as_double";
-  OptionValidator JSON_READ_NUMBERS_AS_DOUBLE_VALIDATOR = new BooleanValidator(JSON_READ_NUMBERS_AS_DOUBLE, false);
+  BooleanValidator JSON_READ_NUMBERS_AS_DOUBLE_VALIDATOR = new BooleanValidator(JSON_READ_NUMBERS_AS_DOUBLE, false);
 
   String MONGO_ALL_TEXT_MODE = "store.mongo.all_text_mode";
   OptionValidator MONGO_READER_ALL_TEXT_MODE_VALIDATOR = new BooleanValidator(MONGO_ALL_TEXT_MODE, false);
@@ -178,9 +178,9 @@ public interface ExecConstants {
    * HashTable runtime settings
    */
   String MIN_HASH_TABLE_SIZE_KEY = "exec.min_hash_table_size";
-  OptionValidator MIN_HASH_TABLE_SIZE = new PositiveLongValidator(MIN_HASH_TABLE_SIZE_KEY, HashTable.MAXIMUM_CAPACITY, HashTable.DEFAULT_INITIAL_CAPACITY);
+  PositiveLongValidator MIN_HASH_TABLE_SIZE = new PositiveLongValidator(MIN_HASH_TABLE_SIZE_KEY, HashTable.MAXIMUM_CAPACITY, HashTable.DEFAULT_INITIAL_CAPACITY);
   String MAX_HASH_TABLE_SIZE_KEY = "exec.max_hash_table_size";
-  OptionValidator MAX_HASH_TABLE_SIZE = new PositiveLongValidator(MAX_HASH_TABLE_SIZE_KEY, HashTable.MAXIMUM_CAPACITY, HashTable.MAXIMUM_CAPACITY);
+  PositiveLongValidator MAX_HASH_TABLE_SIZE = new PositiveLongValidator(MAX_HASH_TABLE_SIZE_KEY, HashTable.MAXIMUM_CAPACITY, HashTable.MAXIMUM_CAPACITY);
 
   /**
    * Limits the maximum level of parallelization to this factor time the number of Drillbits
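
A note on the two hunks above: narrowing the declared types from OptionValidator to BooleanValidator/PositiveLongValidator lets callers use the typed OptionManager.getOption(validator) overloads instead of unpacking OptionValue fields. A before/after sketch, taken from the HashJoinBatch hunk later in this diff:

    // before: look the option up by key and unpack the OptionValue field
    int minSize = context.getOptions().getOption(ExecConstants.MIN_HASH_TABLE_SIZE_KEY).num_val.intValue();
    // after: the PositiveLongValidator overload returns a long directly
    int minSize = (int) context.getOptions().getOption(ExecConstants.MIN_HASH_TABLE_SIZE);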

http://git-wip-us.apache.org/repos/asf/drill/blob/d93a3633/exec/java-exec/src/main/java/org/apache/drill/exec/compile/ClassTransformer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/compile/ClassTransformer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/compile/ClassTransformer.java
index 3c93599..02323a9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/compile/ClassTransformer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/compile/ClassTransformer.java
@@ -216,9 +216,7 @@ public class ClassTransformer {
       final String entireClass,
       final String materializedClassName) throws ClassTransformationException {
     // unfortunately, this hasn't been set up at construction time, so we have to do it here
-    final OptionValue optionValue = optionManager.getOption(SCALAR_REPLACEMENT_OPTION);
-    final ScalarReplacementOption scalarReplacementOption =
-        ScalarReplacementOption.fromString((String) optionValue.getValue()); // TODO(DRILL-2474)
+    final ScalarReplacementOption scalarReplacementOption = ScalarReplacementOption.fromString(optionManager.getOption(SCALAR_REPLACEMENT_VALIDATOR));
 
     try {
       final long t1 = System.nanoTime();

http://git-wip-us.apache.org/repos/asf/drill/blob/d93a3633/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java
index 8a8a1ae..5872ef1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java
@@ -23,6 +23,7 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.drill.common.AutoCloseables;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.exec.ops.FragmentContext;
@@ -120,7 +121,8 @@ public class ImplCreator {
 
 
   /** Create a RecordBatch and its children for given PhysicalOperator */
-  private RecordBatch getRecordBatch(final PhysicalOperator op, final FragmentContext context) throws ExecutionSetupException {
+  @VisibleForTesting
+  public RecordBatch getRecordBatch(final PhysicalOperator op, final FragmentContext context) throws ExecutionSetupException {
     Preconditions.checkNotNull(op);
 
     final List<RecordBatch> childRecordBatches = getChildren(op, context);

http://git-wip-us.apache.org/repos/asf/drill/blob/d93a3633/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
index 2ba54dd..2ace69e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
@@ -308,7 +308,7 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> {
     }
 
     final HashTableConfig htConfig =
-        new HashTableConfig(context.getOptions().getOption(ExecConstants.MIN_HASH_TABLE_SIZE_KEY).num_val.intValue(),
+        new HashTableConfig((int) context.getOptions().getOption(ExecConstants.MIN_HASH_TABLE_SIZE),
             HashTable.DEFAULT_LOAD_FACTOR, rightExpr, leftExpr);
 
     // Create the chained hash table

http://git-wip-us.apache.org/repos/asf/drill/blob/d93a3633/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
index 7797339..0ee518e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
@@ -374,6 +374,10 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
                 (spillCount == 0 && !hasMemoryForInMemorySort(totalCount)) ||
                 // If we haven't spilled so far, make sure we don't exceed the maximum number of batches SV4 can address
                 (spillCount == 0 && totalBatches > Character.MAX_VALUE) ||
+                // TODO(DRILL-4438) - consider setting this threshold more intelligently;
+                // lowering it allowed a failing low-memory test (in BasicPhysicalOpUnitTest)
+                // to complete successfully (although with a perf decrease, as there was more spilling)
+
                 // current memory used is more than 95% of memory usage limit of this operator
                 (oAllocator.getAllocatedMemory() > .95 * oAllocator.getLimit()) ||
                 // Number of incoming batches (BatchGroups) exceed the limit and number of incoming batches accumulated
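
For context, the spill decision this TODO sits in boils down to the following (a sketch of the surrounding condition, where tooManyBatchGroupsAccumulated stands in for the incoming-batch-count clause the hunk truncates):

    boolean shouldSpill =
        (spillCount == 0 && !hasMemoryForInMemorySort(totalCount))         // can't finish in memory
        || (spillCount == 0 && totalBatches > Character.MAX_VALUE)         // SV4 batch index is only 16 bits
        || (oAllocator.getAllocatedMemory() > .95 * oAllocator.getLimit()) // the threshold DRILL-4438 questions
        || tooManyBatchGroupsAccumulated;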

http://git-wip-us.apache.org/repos/asf/drill/blob/d93a3633/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java
index e943401..dbbe6b0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java
@@ -112,8 +112,8 @@ public class JSONRecordReader extends AbstractRecordReader {
 
     // only enable all text mode if we aren't using embedded content mode.
     this.enableAllTextMode = embeddedContent == null && fragmentContext.getOptions().getOption(ExecConstants.JSON_READER_ALL_TEXT_MODE_VALIDATOR);
-    this.readNumbersAsDouble = fragmentContext.getOptions().getOption(ExecConstants.JSON_READ_NUMBERS_AS_DOUBLE).bool_val;
-    this.unionEnabled = fragmentContext.getOptions().getOption(ExecConstants.ENABLE_UNION_TYPE);
+    this.readNumbersAsDouble = embeddedContent == null && fragmentContext.getOptions().getOption(ExecConstants.JSON_READ_NUMBERS_AS_DOUBLE_VALIDATOR);
+    this.unionEnabled = embeddedContent == null && fragmentContext.getOptions().getOption(ExecConstants.ENABLE_UNION_TYPE);
     setColumns(columns);
   }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/d93a3633/exec/java-exec/src/test/java/org/apache/drill/DrillTestWrapper.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/DrillTestWrapper.java b/exec/java-exec/src/test/java/org/apache/drill/DrillTestWrapper.java
index f853414..2a9c03d 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/DrillTestWrapper.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/DrillTestWrapper.java
@@ -24,8 +24,10 @@ import java.io.UnsupportedEncodingException;
 import java.lang.reflect.Array;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -44,7 +46,10 @@ import org.apache.drill.exec.record.BatchSchema;
 import org.apache.drill.exec.record.HyperVectorWrapper;
 import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.record.RecordBatchLoader;
+import org.apache.drill.exec.record.VectorAccessible;
 import org.apache.drill.exec.record.VectorWrapper;
+import org.apache.drill.exec.record.selection.SelectionVector2;
+import org.apache.drill.exec.record.selection.SelectionVector4;
 import org.apache.drill.exec.rpc.user.QueryDataBatch;
 import org.apache.drill.exec.util.Text;
 import org.apache.drill.exec.vector.ValueVector;
@@ -147,19 +152,19 @@ public class DrillTestWrapper {
         i++;
       }
     }
-    for (HyperVectorValueIterator hvi : expectedRecords.values()) {
-      for (ValueVector vv : hvi.getHyperVector().getValueVectors()) {
-        vv.clear();
-      }
-    }
-    for (HyperVectorValueIterator hvi : actualRecords.values()) {
+    cleanupHyperValueIterators(expectedRecords.values());
+    cleanupHyperValueIterators(actualRecords.values());
+  }
+
+  private void cleanupHyperValueIterators(Collection<HyperVectorValueIterator> hyperBatches) {
+    for (HyperVectorValueIterator hvi : hyperBatches) {
       for (ValueVector vv : hvi.getHyperVector().getValueVectors()) {
         vv.clear();
       }
     }
   }
 
-  private void compareMergedVectors(Map<String, List<Object>> expectedRecords, Map<String, List<Object>> actualRecords) throws Exception {
+  public static void compareMergedVectors(Map<String, List<Object>> expectedRecords, Map<String, List<Object>> actualRecords) throws Exception {
     for (String s : actualRecords.keySet()) {
       assertNotNull("Unexpected extra column " + s + " returned by query.", expectedRecords.get(s));
       assertEquals("Incorrect number of rows returned by query.", expectedRecords.get(s).size(), actualRecords.get(s).size());
@@ -180,7 +185,7 @@ public class DrillTestWrapper {
     }
   }
 
-  private String printNearbyRecords(Map<String, List<Object>> expectedRecords, Map<String, List<Object>> actualRecords, int offset) {
+  private static String printNearbyRecords(Map<String, List<Object>> expectedRecords, Map<String, List<Object>> actualRecords, int offset) {
     StringBuilder expected = new StringBuilder();
     StringBuilder actual = new StringBuilder();
     expected.append("Expected Records near verification failure:\n");
@@ -208,8 +213,9 @@ public class DrillTestWrapper {
 
   }
 
-  private Map<String, HyperVectorValueIterator> addToHyperVectorMap(List<QueryDataBatch> records, RecordBatchLoader loader,
-                                                                      BatchSchema schema) throws SchemaChangeException, UnsupportedEncodingException {
+  private Map<String, HyperVectorValueIterator> addToHyperVectorMap(final List<QueryDataBatch> records,
+                                                                    final RecordBatchLoader loader)
+      throws SchemaChangeException, UnsupportedEncodingException {
     // TODO - this does not handle schema changes
     Map<String, HyperVectorValueIterator> combinedVectors = new TreeMap<>();
 
@@ -218,7 +224,6 @@ public class DrillTestWrapper {
     int size = records.size();
     for (int i = 0; i < size; i++) {
       batch = records.get(i);
-      loader = new RecordBatchLoader(getAllocator());
       loader.load(batch.getHeader().getDef(), batch.getData());
       logger.debug("reading batch with " + loader.getRecordCount() + " rows, total read so far " + totalRecords);
       totalRecords += loader.getRecordCount();
@@ -241,30 +246,70 @@ public class DrillTestWrapper {
     return combinedVectors;
   }
 
+  private static class BatchIterator implements Iterable<VectorAccessible>, AutoCloseable {
+    private final List<QueryDataBatch> dataBatches;
+    private final RecordBatchLoader batchLoader;
+
+    public BatchIterator(List<QueryDataBatch> dataBatches, RecordBatchLoader batchLoader) {
+      this.dataBatches = dataBatches;
+      this.batchLoader = batchLoader;
+    }
+
+    @Override
+    public Iterator<VectorAccessible> iterator() {
+      return new Iterator<VectorAccessible>() {
+
+        int index = -1;
+
+        @Override
+        public boolean hasNext() {
+          return index < dataBatches.size() - 1;
+        }
+
+        @Override
+        public VectorAccessible next() {
+          index++;
+          if (index == dataBatches.size()) {
+            throw new RuntimeException("Tried to call next when iterator had no more items.");
+          }
+          batchLoader.clear();
+          QueryDataBatch batch = dataBatches.get(index);
+          try {
+            batchLoader.load(batch.getHeader().getDef(), batch.getData());
+          } catch (SchemaChangeException e) {
+            throw new RuntimeException(e);
+          }
+          return batchLoader;
+        }
+
+        @Override
+        public void remove() {
+          throw new UnsupportedOperationException("Removing is not supported");
+        }
+      };
+    }
+
+    @Override
+    public void close() throws Exception {
+      batchLoader.clear();
+    }
+
+  }
+
   /**
-   * Only use this method if absolutely needed. There are utility methods to compare results of single queries.
-   * The current use case for exposing this is setting session or system options between the test and verification
-   * queries.
-   *
-   * TODO - evaluate adding an interface to allow setting session and system options before running queries
-   * @param records
-   * @param loader
-   * @param schema
+   * @param batches
    * @return
    * @throws SchemaChangeException
    * @throws UnsupportedEncodingException
    */
-   private Map<String, List<Object>> addToCombinedVectorResults(List<QueryDataBatch> records, RecordBatchLoader loader,
-                                                         BatchSchema schema) throws SchemaChangeException, UnsupportedEncodingException {
+  public static Map<String, List<Object>> addToCombinedVectorResults(Iterable<VectorAccessible> batches)
+       throws SchemaChangeException, UnsupportedEncodingException {
     // TODO - this does not handle schema changes
     Map<String, List<Object>> combinedVectors = new TreeMap<>();
 
     long totalRecords = 0;
-    QueryDataBatch batch;
-    int size = records.size();
-    for (int i = 0; i < size; i++) {
-      batch = records.get(0);
-      loader.load(batch.getHeader().getDef(), batch.getData());
+    BatchSchema schema = null;
+    for (VectorAccessible loader : batches)  {
       // TODO:  Clean:  DRILL-2933:  That load(...) no longer throws
       // SchemaChangeException, so check/clean throws clause above.
       if (schema == null) {
@@ -272,24 +317,66 @@ public class DrillTestWrapper {
         for (MaterializedField mf : schema) {
           combinedVectors.put(SchemaPath.getSimplePath(mf.getPath()).toExpr(), new ArrayList<Object>());
         }
+      } else {
+        // TODO - actually handle schema changes, this is just to get access to the SelectionVectorMode
+        // of the current batch, the check for a null schema is used to only mutate the schema once
+        // need to add new vectors and null fill for previous batches? distinction between null and non-existence important?
+        schema = loader.getSchema();
       }
       logger.debug("reading batch with " + loader.getRecordCount() + " rows, total read so far " + totalRecords);
       totalRecords += loader.getRecordCount();
       for (VectorWrapper<?> w : loader) {
         String field = SchemaPath.getSimplePath(w.getField().getPath()).toExpr();
-        for (int j = 0; j < loader.getRecordCount(); j++) {
-          Object obj = w.getValueVector().getAccessor().getObject(j);
-          if (obj != null) {
-            if (obj instanceof Text) {
-              obj = obj.toString();
+        ValueVector[] vectors;
+        if (w.isHyper()) {
+          vectors = w.getValueVectors();
+        } else {
+          vectors = new ValueVector[] {w.getValueVector()};
+        }
+        SelectionVector2 sv2 = null;
+        SelectionVector4 sv4 = null;
+        switch(schema.getSelectionVectorMode()) {
+          case TWO_BYTE:
+            sv2 = loader.getSelectionVector2();
+            break;
+          case FOUR_BYTE:
+            sv4 = loader.getSelectionVector4();
+            break;
+        }
+        if (sv4 != null) {
+          for (int j = 0; j < sv4.getCount(); j++) {
+            int complexIndex = sv4.get(j);
+            int batchIndex = complexIndex >> 16;
+            int recordIndexInBatch = complexIndex & 65535;
+            Object obj = vectors[batchIndex].getAccessor().getObject(recordIndexInBatch);
+            if (obj != null) {
+              if (obj instanceof Text) {
+                obj = obj.toString();
+              }
+            }
+            combinedVectors.get(field).add(obj);
+          }
+        }
+        else {
+          for (ValueVector vv : vectors) {
+            for (int j = 0; j < loader.getRecordCount(); j++) {
+              int index;
+              if (sv2 != null) {
+                index = sv2.getIndex(j);
+              } else {
+                index = j;
+              }
+              Object obj = vv.getAccessor().getObject(index);
+              if (obj != null) {
+                if (obj instanceof Text) {
+                  obj = obj.toString();
+                }
+              }
+              combinedVectors.get(field).add(obj);
             }
           }
-          combinedVectors.get(field).add(obj);
         }
       }
-      records.remove(0);
-      batch.release();
-      loader.clear();
     }
     return combinedVectors;
   }
@@ -342,7 +429,6 @@ public class DrillTestWrapper {
    */
   protected void compareUnorderedResults() throws Exception {
     RecordBatchLoader loader = new RecordBatchLoader(getAllocator());
-    BatchSchema schema = null;
 
     List<QueryDataBatch> actual = Collections.emptyList();
     List<QueryDataBatch> expected = Collections.emptyList();
@@ -356,14 +442,14 @@ public class DrillTestWrapper {
       checkNumBatches(actual);
 
       addTypeInfoIfMissing(actual.get(0), testBuilder);
-      addToMaterializedResults(actualRecords, actual, loader, schema);
+      addToMaterializedResults(actualRecords, actual, loader);
 
       // If baseline data was not provided to the test builder directly, we must run a query for the baseline, this includes
       // the cases where the baseline is stored in a file.
       if (baselineRecords == null) {
         BaseTestQuery.test(baselineOptionSettingQueries);
         expected = BaseTestQuery.testRunAndReturn(baselineQueryType, testBuilder.getValidationQuery());
-        addToMaterializedResults(expectedRecords, expected, loader, schema);
+        addToMaterializedResults(expectedRecords, expected, loader);
       } else {
         expectedRecords = baselineRecords;
       }
@@ -409,28 +495,24 @@ public class DrillTestWrapper {
       // To avoid extra work for test writers, types can optionally be inferred from the test query
       addTypeInfoIfMissing(actual.get(0), testBuilder);
 
-      actualSuperVectors = addToCombinedVectorResults(actual, loader, schema);
+      BatchIterator batchIter = new BatchIterator(actual, loader);
+      actualSuperVectors = addToCombinedVectorResults(batchIter);
+      batchIter.close();
 
       // If baseline data was not provided to the test builder directly, we must run a query for the baseline, this includes
       // the cases where the baseline is stored in a file.
       if (baselineRecords == null) {
         BaseTestQuery.test(baselineOptionSettingQueries);
         expected = BaseTestQuery.testRunAndReturn(baselineQueryType, testBuilder.getValidationQuery());
-        expectedSuperVectors = addToCombinedVectorResults(expected, loader, schema);
+        BatchIterator exBatchIter = new BatchIterator(expected, loader);
+        expectedSuperVectors = addToCombinedVectorResults(exBatchIter);
+        exBatchIter.close();
       } else {
         // data is built in the TestBuilder in a row major format as it is provided by the user
         // translate it here to vectorized, the representation expected by the ordered comparison
-        expectedSuperVectors = new TreeMap<>();
-        expected = new ArrayList<>();
-        for (String s : baselineRecords.get(0).keySet()) {
-          expectedSuperVectors.put(s, new ArrayList<>());
-        }
-        for (Map<String, Object> m : baselineRecords) {
-          for (String s : m.keySet()) {
-            expectedSuperVectors.get(s).add(m.get(s));
-          }
-        }
+        expectedSuperVectors = translateRecordListToHeapVectors(baselineRecords);
       }
+
       compareMergedVectors(expectedSuperVectors, actualSuperVectors);
     } catch (Exception e) {
       throw new Exception(e.getMessage() + "\nFor query: " + query , e);
@@ -439,9 +521,21 @@ public class DrillTestWrapper {
     }
   }
 
+  public static Map<String, List<Object>> translateRecordListToHeapVectors(List<Map<String, Object>> records) {
+    Map<String, List<Object>> ret = new TreeMap<>();
+    for (String s : records.get(0).keySet()) {
+      ret.put(s, new ArrayList<>());
+    }
+    for (Map<String, Object> m : records) {
+      for (String s : m.keySet()) {
+        ret.get(s).add(m.get(s));
+      }
+    }
+    return ret;
+  }
+
   public void compareResultsHyperVector() throws Exception {
     RecordBatchLoader loader = new RecordBatchLoader(getAllocator());
-    BatchSchema schema = null;
 
     BaseTestQuery.test(testOptionSettingQueries);
     List<QueryDataBatch> results = BaseTestQuery.testRunAndReturn(queryType, query);
@@ -451,12 +545,12 @@ public class DrillTestWrapper {
     // To avoid extra work for test writers, types can optionally be inferred from the test query
     addTypeInfoIfMissing(results.get(0), testBuilder);
 
-    Map<String, HyperVectorValueIterator> actualSuperVectors = addToHyperVectorMap(results, loader, schema);
+    Map<String, HyperVectorValueIterator> actualSuperVectors = addToHyperVectorMap(results, loader);
 
     BaseTestQuery.test(baselineOptionSettingQueries);
     List<QueryDataBatch> expected = BaseTestQuery.testRunAndReturn(baselineQueryType, testBuilder.getValidationQuery());
 
-    Map<String, HyperVectorValueIterator> expectedSuperVectors = addToHyperVectorMap(expected, loader, schema);
+    Map<String, HyperVectorValueIterator> expectedSuperVectors = addToHyperVectorMap(expected, loader);
 
     compareHyperVectors(expectedSuperVectors, actualSuperVectors);
     cleanupBatches(results, expected);
@@ -496,8 +590,10 @@ public class DrillTestWrapper {
     }
   }
 
-  protected void addToMaterializedResults(List<Map<String, Object>> materializedRecords, List<QueryDataBatch> records, RecordBatchLoader loader,
-                                          BatchSchema schema) throws SchemaChangeException, UnsupportedEncodingException {
+  public static void addToMaterializedResults(List<Map<String, Object>> materializedRecords,
+                                          List<QueryDataBatch> records,
+                                          RecordBatchLoader loader)
+      throws SchemaChangeException, UnsupportedEncodingException {
     long totalRecords = 0;
     QueryDataBatch batch;
     int size = records.size();
@@ -506,9 +602,6 @@ public class DrillTestWrapper {
       loader.load(batch.getHeader().getDef(), batch.getData());
       // TODO:  Clean:  DRILL-2933:  That load(...) no longer throws
       // SchemaChangeException, so check/clean throws clause above.
-      if (schema == null) {
-        schema = loader.getSchema();
-      }
       logger.debug("reading batch with " + loader.getRecordCount() + " rows, total read so far " + totalRecords);
       totalRecords += loader.getRecordCount();
       for (int j = 0; j < loader.getRecordCount(); j++) {
@@ -531,7 +624,7 @@ public class DrillTestWrapper {
     }
   }
 
-  public boolean compareValuesErrorOnMismatch(Object expected, Object actual, int counter, String column) throws Exception {
+  public static boolean compareValuesErrorOnMismatch(Object expected, Object actual, int counter, String column) throws Exception {
 
     if (compareValues(expected, actual, counter, column)) {
       return true;
@@ -554,7 +647,7 @@ public class DrillTestWrapper {
     return true;
   }
 
-  public boolean compareValues(Object expected, Object actual, int counter, String column) throws Exception {
+  public static boolean compareValues(Object expected, Object actual, int counter, String column) throws Exception {
     if (expected == null) {
       if (actual == null) {
         if (VERBOSE_DEBUG) {
@@ -648,7 +741,7 @@ public class DrillTestWrapper {
     assertEquals(0, actualRecords.size());
   }
 
-  private String findMissingColumns(Set<String> expected, Set<String> actual) {
+  private static String findMissingColumns(Set<String> expected, Set<String> actual) {
     String missingCols = "";
     for (String colName : expected) {
       if (!actual.contains(colName)) {
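
One detail in the reworked addToCombinedVectorResults above deserves a worked example: a SelectionVector4 entry packs the batch index into the high 16 bits and the in-batch record index into the low 16 bits (which is also why ExternalSortBatch refuses to exceed Character.MAX_VALUE batches before spilling):

    int complexIndex = (3 << 16) | 42;             // points at batch 3, record 42
    int batchIndex = complexIndex >> 16;           // 3
    int recordIndexInBatch = complexIndex & 65535; // 42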

http://git-wip-us.apache.org/repos/asf/drill/blob/d93a3633/exec/java-exec/src/test/java/org/apache/drill/TestBuilder.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestBuilder.java b/exec/java-exec/src/test/java/org/apache/drill/TestBuilder.java
index 8702eb5..b073371 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/TestBuilder.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/TestBuilder.java
@@ -194,16 +194,12 @@ public class TestBuilder {
   // modified code from SchemaPath.De class. This should be used sparingly and only in tests if absolutely needed.
   public static SchemaPath parsePath(String path) {
     try {
-      // logger.debug("Parsing expression string '{}'", expr);
       ExprLexer lexer = new ExprLexer(new ANTLRStringStream(path));
       CommonTokenStream tokens = new CommonTokenStream(lexer);
       ExprParser parser = new ExprParser(tokens);
 
-      //TODO: move functionregistry and error collector to injectables.
-      //ctxt.findInjectableValue(valueId, forProperty, beanInstance)
       ExprParser.parse_return ret = parser.parse();
 
-      // ret.e.resolveAndValidate(expr, errorCollector);
       if (ret.e instanceof SchemaPath) {
         return (SchemaPath) ret.e;
       } else {

http://git-wip-us.apache.org/repos/asf/drill/blob/d93a3633/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/BasicPhysicalOpUnitTest.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/BasicPhysicalOpUnitTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/BasicPhysicalOpUnitTest.java
new file mode 100644
index 0000000..6f2f160
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/BasicPhysicalOpUnitTest.java
@@ -0,0 +1,322 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.unit;
+
+import com.google.common.collect.Lists;
+import org.apache.calcite.rel.RelFieldCollation;
+import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.drill.exec.physical.MinorFragmentEndpoint;
+import org.apache.drill.exec.physical.base.GroupScan;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.SubScan;
+import org.apache.drill.exec.physical.config.ComplexToJson;
+import org.apache.drill.exec.physical.config.ExternalSort;
+import org.apache.drill.exec.physical.config.Filter;
+import org.apache.drill.exec.physical.config.HashAggregate;
+import org.apache.drill.exec.physical.config.HashJoinPOP;
+import org.apache.drill.exec.physical.config.MergeJoinPOP;
+import org.apache.drill.exec.physical.config.MergingReceiverPOP;
+import org.apache.drill.exec.physical.config.Project;
+import org.apache.drill.exec.physical.config.StreamingAggregate;
+import org.apache.drill.exec.physical.config.TopN;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import java.lang.reflect.Constructor;
+import java.util.IdentityHashMap;
+import java.util.List;
+import java.util.Set;
+
+import static org.apache.drill.TestBuilder.mapOf;
+
+public class BasicPhysicalOpUnitTest extends PhysicalOpUnitTestBase {
+
+  @Test
+  public void testSimpleProject() {
+    Project projectConf = new Project(parseExprs("x+5", "x"), null);
+    List<String> jsonBatches = Lists.newArrayList(
+        "[{\"x\": 5 },{\"x\": 10 }]",
+        "[{\"x\": 20 },{\"x\": 30 },{\"x\": 40 }]");
+    opTestBuilder()
+        .physicalOperator(projectConf)
+        .inputDataStreamJson(jsonBatches)
+        .baselineColumns("x")
+        .baselineValues(10l)
+        .baselineValues(15l)
+        .baselineValues(25l)
+        .baselineValues(35l)
+        .baselineValues(45l)
+        .go();
+  }
+
+  @Test
+  public void testProjectComplexOutput() {
+    Project projectConf = new Project(parseExprs("convert_from(json_col, 'JSON')", "complex_col"), null);
+    List<String> jsonBatches = Lists.newArrayList(
+        "[{\"json_col\": \"{ \\\"a\\\" : 1 }\"}]",
+        "[{\"json_col\": \"{ \\\"a\\\" : 5 }\"}]");
+    opTestBuilder()
+        .physicalOperator(projectConf)
+        .inputDataStreamJson(jsonBatches)
+        .baselineColumns("complex_col")
+        .baselineValues(mapOf("a", 1l))
+        .baselineValues(mapOf("a", 5l))
+        .go();
+  }
+
+  @Test
+  public void testSimpleHashJoin() {
+    HashJoinPOP joinConf = new HashJoinPOP(null, null, Lists.newArrayList(joinCond("x", "EQUALS", "x1")), JoinRelType.LEFT);
+    // TODO - figure out where to add validation, column names must be unique, even between the two batches,
+    // for all columns, not just the one in the join condition
+    // TODO - if any are common between the two, it is failing in the generated setup method in HashJoinProbeGen
+    List<String> leftJsonBatches = Lists.newArrayList(
+        "[{\"x\": 5, \"a\" : \"a string\"}]",
+        "[{\"x\": 5, \"a\" : \"a different string\"},{\"x\": 5, \"a\" : \"meh\"}]");
+    List<String> rightJsonBatches = Lists.newArrayList(
+        "[{\"x1\": 5, \"a2\" : \"asdf\"}]",
+        "[{\"x1\": 6, \"a2\" : \"qwerty\"},{\"x1\": 5, \"a2\" : \"12345\"}]");
+    opTestBuilder()
+        .physicalOperator(joinConf)
+        .inputDataStreamsJson(Lists.newArrayList(leftJsonBatches, rightJsonBatches))
+        .baselineColumns("x", "a", "a2", "x1")
+        .baselineValues(5l, "a string", "asdf", 5l)
+        .baselineValues(5l, "a string", "12345", 5l)
+        .baselineValues(5l, "a different string", "asdf", 5l)
+        .baselineValues(5l, "a different string", "12345", 5l)
+        .baselineValues(5l, "meh", "asdf", 5l)
+        .baselineValues(5l, "meh", "12345", 5l)
+        .go();
+  }
+
+  @Test
+  public void testSimpleMergeJoin() {
+    MergeJoinPOP joinConf = new MergeJoinPOP(null, null, Lists.newArrayList(joinCond("x", "EQUALS", "x1")), JoinRelType.LEFT);
+    // TODO - figure out where to add validation, column names must be unique, even between the two batches,
+    // for all columns, not just the one in the join condition
+    List<String> leftJsonBatches = Lists.newArrayList(
+        "[{\"x\": 5, \"a\" : \"a string\"}]",
+        "[{\"x\": 5, \"a\" : \"a different string\"},{\"x\": 5, \"a\" : \"meh\"}]");
+    List<String> rightJsonBatches = Lists.newArrayList(
+        "[{\"x1\": 5, \"a2\" : \"asdf\"}]",
+        "[{\"x1\": 5, \"a2\" : \"12345\"}, {\"x1\": 6, \"a2\" : \"qwerty\"}]");
+    opTestBuilder()
+        .physicalOperator(joinConf)
+        .inputDataStreamsJson(Lists.newArrayList(leftJsonBatches, rightJsonBatches))
+        .baselineColumns("x", "a", "a2", "x1")
+        .baselineValues(5l, "a string", "asdf", 5l)
+        .baselineValues(5l, "a string", "12345", 5l)
+        .baselineValues(5l, "a different string", "asdf", 5l)
+        .baselineValues(5l, "a different string", "12345", 5l)
+        .baselineValues(5l, "meh", "asdf", 5l)
+        .baselineValues(5l, "meh", "12345", 5l)
+        .go();
+  }
+
+  @Test
+  public void testSimpleHashAgg() {
+    HashAggregate aggConf = new HashAggregate(null, parseExprs("a", "a"), parseExprs("sum(b)", "b_sum"), 1.0f);
+    List<String> inputJsonBatches = Lists.newArrayList(
+        "[{\"a\": 5, \"b\" : 1 }]",
+        "[{\"a\": 5, \"b\" : 5},{\"a\": 3, \"b\" : 8}]");
+    opTestBuilder()
+        .physicalOperator(aggConf)
+        .inputDataStreamJson(inputJsonBatches)
+        .baselineColumns("b_sum", "a")
+        .baselineValues(6l, 5l)
+        .baselineValues(8l, 3l)
+        .go();
+  }
+
+  @Test
+  public void testSimpleStreamAgg() {
+    StreamingAggregate aggConf = new StreamingAggregate(null, parseExprs("a", "a"), parseExprs("sum(b)", "b_sum"), 1.0f);
+    List<String> inputJsonBatches = Lists.newArrayList(
+        "[{\"a\": 5, \"b\" : 1 }]",
+        "[{\"a\": 5, \"b\" : 5},{\"a\": 3, \"b\" : 8}]");
+    opTestBuilder()
+        .physicalOperator(aggConf)
+        .inputDataStreamJson(inputJsonBatches)
+        .baselineColumns("b_sum", "a")
+        .baselineValues(6l, 5l)
+        .baselineValues(8l, 3l)
+        .go();
+  }
+
+  @Test
+  public void testComplexToJson() {
+    ComplexToJson complexToJson = new ComplexToJson(null);
+    List<String> inputJsonBatches = Lists.newArrayList(
+        "[{\"a\": {\"b\" : 1 }}]",
+        "[{\"a\": {\"b\" : 5}},{\"a\": {\"b\" : 8}}]");
+    opTestBuilder()
+        .physicalOperator(complexToJson)
+        .inputDataStreamJson(inputJsonBatches)
+        .baselineColumns("a")
+        .baselineValues("{\n  \"b\" : 1\n}")
+        .baselineValues("{\n  \"b\" : 5\n}")
+        .baselineValues("{\n  \"b\" : 8\n}")
+        .go();
+  }
+
+  @Test
+  public void testFilter() {
+    Filter filterConf = new Filter(null, parseExpr("a=5"), 1.0f);
+    List<String> inputJsonBatches = Lists.newArrayList(
+        "[{\"a\": 5, \"b\" : 1 }]",
+        "[{\"a\": 5, \"b\" : 5},{\"a\": 3, \"b\" : 8}]",
+        "[{\"a\": 40, \"b\" : 3},{\"a\": 13, \"b\" : 100}]");
+    opTestBuilder()
+        .physicalOperator(filterConf)
+        .inputDataStreamJson(inputJsonBatches)
+        .baselineColumns("a", "b")
+        .baselineValues(5l, 1l)
+        .baselineValues(5l, 5l)
+        .go();
+  }
+
+  @Test
+  public void testExternalSort() {
+    ExternalSort sortConf = new ExternalSort(null,
+        Lists.newArrayList(ordering("b", RelFieldCollation.Direction.ASCENDING, RelFieldCollation.NullDirection.FIRST)), false);
+    List<String> inputJsonBatches = Lists.newArrayList(
+        "[{\"a\": 5, \"b\" : 1 }]",
+        "[{\"a\": 5, \"b\" : 5},{\"a\": 3, \"b\" : 8}]",
+        "[{\"a\": 40, \"b\" : 3},{\"a\": 13, \"b\" : 100}]");
+    opTestBuilder()
+        .physicalOperator(sortConf)
+        .inputDataStreamJson(inputJsonBatches)
+        .baselineColumns("a", "b")
+        .baselineValues(5l, 1l)
+        .baselineValues(40l, 3l)
+        .baselineValues(5l, 5l)
+        .baselineValues(3l, 8l)
+        .baselineValues(13l, 100l)
+        .go();
+  }
+
+  private void externalSortLowMemoryHelper(int batchSize, int numberOfBatches, long initReservation, long maxAllocation) {
+    ExternalSort sortConf = new ExternalSort(null,
+        Lists.newArrayList(ordering("b", RelFieldCollation.Direction.ASCENDING, RelFieldCollation.NullDirection.FIRST)), false);
+    List<String> inputJsonBatches = Lists.newArrayList();
+    StringBuilder batchString = new StringBuilder();
+    for (int j = 0; j < numberOfBatches; j++) {
+      batchString.append("[");
+      for (int i = 0; i < batchSize; i++) {
+        batchString.append("{\"a\": 5, \"b\" : 5},{\"a\": 3, \"b\" : 8},");
+      }
+      batchString.append("{\"a\": 5, \"b\" : 1 }");
+      batchString.append("]");
+      inputJsonBatches.add(batchString.toString());
+    }
+
+    OperatorTestBuilder opTestBuilder =
+        opTestBuilder()
+            .initReservation(initReservation)
+            .maxAllocation(maxAllocation)
+            .physicalOperator(sortConf)
+            .inputDataStreamJson(inputJsonBatches)
+            .baselineColumns("a", "b");
+    for (int i = 0; i < numberOfBatches; i++) {
+      opTestBuilder.baselineValues(5l, 1l);
+    }
+    for (int i = 0; i < batchSize * numberOfBatches; i++) {
+      opTestBuilder.baselineValues(5l, 5l);
+    }
+    for (int i = 0; i < batchSize * numberOfBatches; i++) {
+      opTestBuilder.baselineValues(3l, 8l);
+    }
+    opTestBuilder.go();
+  }
+
+  // TODO - Failing with - org.apache.drill.exec.exception.OutOfMemoryException: Unable to allocate buffer of size 262144 (rounded from 147456) due to memory limit. Current allocation: 16422656
+  // look in ExternalSortBatch for this JIRA number; changing the percentage of the allocator limit that serves
+  // as the spill threshold (0.65 worked for me) "fixed" the problem but hurt perf. We will want
+  // to find a better solution to this problem; once it is fixed this threshold will likely become unnecessary
+  @Test
+  @Ignore("DRILL-4438")
+  public void testExternalSortLowMemory1() {
+    externalSortLowMemoryHelper(4960, 100, 10000000, 16500000);
+  }
+
+  // TODO - believe this was failing in the scan, not the sort; may not require a fix
+  @Test
+  @Ignore("DRILL-4438")
+  public void testExternalSortLowMemory2() {
+    externalSortLowMemoryHelper(4960, 100, 10000000, 15000000);
+  }
+
+  // TODO - believe this was failing in the scan, not the sort; may not require a fix
+  @Test
+  @Ignore("DRILL-4438")
+  public void testExternalSortLowMemory3() {
+    externalSortLowMemoryHelper(40960, 10, 10000000, 10000000);
+  }
+
+  // TODO - Failing with - org.apache.drill.exec.exception.OutOfMemoryException: Unable to allocate sv2 buffer after repeated attempts
+  // see comment above testExternalSortLowMemory1 about TODO left in ExternalSortBatch
+  @Test
+  @Ignore("DRILL-4438")
+  public void testExternalSortLowMemory4() {
+    externalSortLowMemoryHelper(15960, 30, 10000000, 14500000);
+  }
+
+  @Test
+  public void testTopN() {
+    TopN sortConf = new TopN(null,
+        Lists.newArrayList(ordering("b", RelFieldCollation.Direction.ASCENDING, RelFieldCollation.NullDirection.FIRST)), false, 3);
+    List<String> inputJsonBatches = Lists.newArrayList(
+        "[{\"a\": 5, \"b\" : 1 }]",
+        "[{\"a\": 5, \"b\" : 5},{\"a\": 3, \"b\" : 8}]",
+        "[{\"a\": 40, \"b\" : 3},{\"a\": 13, \"b\" : 100}]");
+    opTestBuilder()
+        .physicalOperator(sortConf)
+        .inputDataStreamJson(inputJsonBatches)
+        .baselineColumns("a", "b")
+        .baselineValues(5l, 1l)
+        .baselineValues(40l, 3l)
+        .baselineValues(5l, 5l)
+        .go();
+  }
+
+  // TODO(DRILL-4439) - this operator doesn't expect incoming record batches; it uses RawFragmentBatch instead,
+  // need to figure out how to mock these
+  @Ignore
+  @Test
+  public void testSimpleMergingReceiver() {
+    MergingReceiverPOP mergeConf = new MergingReceiverPOP(-1, Lists.<MinorFragmentEndpoint>newArrayList(),
+        Lists.newArrayList(ordering("x", RelFieldCollation.Direction.ASCENDING, RelFieldCollation.NullDirection.FIRST)), false);
+    List<String> leftJsonBatches = Lists.newArrayList(
+        "[{\"x\": 5, \"a\" : \"a string\"}]",
+        "[{\"x\": 5, \"a\" : \"a different string\"},{\"x\": 5, \"a\" : \"meh\"}]");
+    List<String> rightJsonBatches = Lists.newArrayList(
+        "[{\"x\": 5, \"a\" : \"asdf\"}]",
+        "[{\"x\": 5, \"a\" : \"12345\"}, {\"x\": 6, \"a\" : \"qwerty\"}]");
+    opTestBuilder()
+        .physicalOperator(mergeConf)
+        .inputDataStreamsJson(Lists.newArrayList(leftJsonBatches, rightJsonBatches))
+        .baselineColumns("x", "a")
+        .baselineValues(5l, "a string")
+        .baselineValues(5l, "a different string")
+        .baselineValues(5l, "meh")
+        .baselineValues(5l, "asdf")
+        .baselineValues(5l, "12345")
+        .baselineValues(6l, "qwerty")
+        .go();
+  }
+}
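
Covering another single-input operator follows the same shape as the tests above. A hypothetical sketch (not part of this commit), assuming org.apache.drill.exec.physical.config.Limit with its (child, first, last) constructor:

    @Test
    public void testLimit() {
      Limit limitConf = new Limit(null, 0, 2); // keep only the first two records
      List<String> inputJsonBatches = Lists.newArrayList(
          "[{\"a\": 5 },{\"a\": 3 },{\"a\": 40 }]");
      opTestBuilder()
          .physicalOperator(limitConf)
          .inputDataStreamJson(inputJsonBatches)
          .baselineColumns("a")
          .baselineValues(5l)
          .baselineValues(3l)
          .go();
    }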

http://git-wip-us.apache.org/repos/asf/drill/blob/d93a3633/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/PhysicalOpUnitTestBase.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/PhysicalOpUnitTestBase.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/PhysicalOpUnitTestBase.java
new file mode 100644
index 0000000..245e5bb
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/PhysicalOpUnitTestBase.java
@@ -0,0 +1,341 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.unit;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import mockit.Delegate;
+import mockit.Injectable;
+import mockit.NonStrictExpectations;
+import org.antlr.runtime.ANTLRStringStream;
+import org.antlr.runtime.CommonTokenStream;
+import org.antlr.runtime.RecognitionException;
+import org.apache.calcite.rel.RelFieldCollation;
+import org.apache.drill.DrillTestWrapper;
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.FieldReference;
+import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.common.expression.PathSegment;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.expression.parser.ExprLexer;
+import org.apache.drill.common.expression.parser.ExprParser;
+import org.apache.drill.common.logical.data.JoinCondition;
+import org.apache.drill.common.logical.data.NamedExpression;
+import org.apache.drill.common.logical.data.Order;
+import org.apache.drill.common.scanner.ClassPathScanner;
+import org.apache.drill.common.scanner.persistence.ScanResult;
+import org.apache.drill.exec.compile.CodeCompiler;
+import org.apache.drill.exec.compile.TemplateClassDefinition;
+import org.apache.drill.exec.exception.ClassTransformationException;
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.expr.ClassGenerator;
+import org.apache.drill.exec.expr.CodeGenerator;
+import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry;
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.memory.RootAllocatorFactory;
+import org.apache.drill.exec.ops.BufferManagerImpl;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.ops.OperatorStats;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.impl.BatchCreator;
+import org.apache.drill.exec.physical.impl.OperatorCreatorRegistry;
+import org.apache.drill.exec.physical.impl.ScanBatch;
+import org.apache.drill.exec.physical.impl.project.Projector;
+import org.apache.drill.exec.physical.impl.project.ProjectorTemplate;
+import org.apache.drill.exec.proto.ExecProtos;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.VectorAccessible;
+import org.apache.drill.exec.server.options.OptionManager;
+import org.apache.drill.exec.server.options.TypeValidators;
+import org.apache.drill.exec.store.RecordReader;
+import org.apache.drill.exec.store.easy.json.JSONRecordReader;
+import org.apache.drill.exec.testing.ExecutionControls;
+import org.apache.drill.test.DrillTest;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Base class for physical operator unit tests. Note that, unlike most Drill test classes, it does not extend BaseTestQuery, so no Drillbit is started.
+ */
+public class PhysicalOpUnitTestBase extends DrillTest {
+
+  @Injectable FragmentContext fragContext;
+  @Injectable OperatorContext opContext;
+  @Injectable OperatorStats opStats;
+  @Injectable OptionManager optManager;
+  @Injectable PhysicalOperator popConf;
+  @Injectable ExecutionControls executionControls;
+
+  private final DrillConfig drillConf = DrillConfig.create();
+  private final BufferAllocator allocator = RootAllocatorFactory.newRoot(drillConf);
+  private final BufferManagerImpl bufManager = new BufferManagerImpl(allocator);
+  private final ScanResult classpathScan = ClassPathScanner.fromPrescan(drillConf);
+  private final FunctionImplementationRegistry funcReg = new FunctionImplementationRegistry(drillConf, classpathScan);
+  private final TemplateClassDefinition templateClassDefinition = new TemplateClassDefinition<>(Projector.class, ProjectorTemplate.class);
+  private final OperatorCreatorRegistry opCreatorReg = new OperatorCreatorRegistry(classpathScan);
+
+  protected LogicalExpression parseExpr(String expr) {
+    ExprLexer lexer = new ExprLexer(new ANTLRStringStream(expr));
+    CommonTokenStream tokens = new CommonTokenStream(lexer);
+    ExprParser parser = new ExprParser(tokens);
+    try {
+      return parser.parse().e;
+    } catch (RecognitionException e) {
+      throw new RuntimeException("Error parsing expression: " + expr);
+    }
+  }
+
+  protected Order.Ordering ordering(String expression, RelFieldCollation.Direction direction, RelFieldCollation.NullDirection nullDirection) {
+    return new Order.Ordering(direction, parseExpr(expression), nullDirection);
+  }
+
+  protected JoinCondition joinCond(String leftExpr, String relationship, String rightExpr) {
+    return new JoinCondition(relationship, parseExpr(leftExpr), parseExpr(rightExpr));
+  }
+
+  protected List<NamedExpression> parseExprs(String... expressionsAndOutputNames) {
+    Preconditions.checkArgument(expressionsAndOutputNames.length % 2 == 0, "List of expressions and output field names" +
+        " is not complete, each expression must explicitly give an output name.");
+    List<NamedExpression> ret = new ArrayList<>();
+    for (int i = 0; i < expressionsAndOutputNames.length; i += 2) {
+      ret.add(new NamedExpression(parseExpr(expressionsAndOutputNames[i]),
+          new FieldReference(new SchemaPath(new PathSegment.NameSegment(expressionsAndOutputNames[i+1])))));
+    }
+    return ret;
+  }
+
+
+  void runTest(OperatorTestBuilder testBuilder) {
+    BatchCreator<PhysicalOperator> opCreator;
+    RecordBatch testOperator;
+    try {
+      mockFragmentContext(testBuilder.initReservation, testBuilder.maxAllocation);
+      opCreator = (BatchCreator<PhysicalOperator>)
+          opCreatorReg.getOperatorCreator(testBuilder.popConfig.getClass());
+       List<RecordBatch> incomingStreams = Lists.newArrayList();
+       for (List<String> batchesJson : testBuilder.inputStreamsJSON) {
+         incomingStreams.add(new ScanBatch(null, fragContext,
+             getRecordReadersForJsonBatches(batchesJson, fragContext)));
+       }
+       testOperator = opCreator.getBatch(fragContext, testBuilder.popConfig, incomingStreams);
+
+      Map<String, List<Object>> actualSuperVectors = DrillTestWrapper.addToCombinedVectorResults(new BatchIterator(testOperator));
+      Map<String, List<Object>> expectedSuperVectors = DrillTestWrapper.translateRecordListToHeapVectors(testBuilder.baselineRecords);
+      DrillTestWrapper.compareMergedVectors(expectedSuperVectors, actualSuperVectors);
+
+    } catch (ExecutionSetupException e) {
+      throw new RuntimeException(e);
+    } catch (UnsupportedEncodingException e) {
+      throw new RuntimeException(e);
+    } catch (SchemaChangeException e) {
+      throw new RuntimeException(e);
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  private static class BatchIterator implements Iterable<VectorAccessible> {
+
+    private RecordBatch operator;
+    public BatchIterator(RecordBatch operator) {
+      this.operator = operator;
+    }
+
+    @Override
+    public Iterator<VectorAccessible> iterator() {
+      return new Iterator<VectorAccessible>() {
+        boolean needToGrabNext = true;
+        RecordBatch.IterOutcome lastResultOutcome;
+        @Override
+        public boolean hasNext() {
+          if (needToGrabNext) {
+            lastResultOutcome = operator.next();
+            needToGrabNext = false;
+          }
+          if (lastResultOutcome == RecordBatch.IterOutcome.NONE
+            || lastResultOutcome == RecordBatch.IterOutcome.STOP) {
+            return false;
+          } else if (lastResultOutcome == RecordBatch.IterOutcome.OUT_OF_MEMORY) {
+            throw new RuntimeException("Operator ran out of memory");
+          } else {
+            return true;
+          }
+        }
+
+        @Override
+        public VectorAccessible next() {
+          if (needToGrabNext) {
+            lastResultOutcome = operator.next();
+          }
+          needToGrabNext = true;
+          return operator;
+        }
+
+        @Override
+        public void remove() {
+          throw new UnsupportedOperationException("Remove is not supported.");
+        }
+      };
+    }
+  }
+
+  protected OperatorTestBuilder opTestBuilder() {
+    return new OperatorTestBuilder();
+  }
+
+  protected class OperatorTestBuilder {
+
+    private PhysicalOperator popConfig;
+    private String[] baselineColumns;
+    private List<Map<String, Object>> baselineRecords;
+    private List<List<String>> inputStreamsJSON;
+    private long initReservation = 10000000;
+    private long maxAllocation = 15000000;
+
+    public void go() {
+      runTest(this);
+    }
+
+    public OperatorTestBuilder physicalOperator(PhysicalOperator batch) {
+      this.popConfig = batch;
+      return this;
+    }
+
+    public OperatorTestBuilder initReservation(long initReservation) {
+      this.initReservation = initReservation;
+      return this;
+    }
+
+    public OperatorTestBuilder maxAllocation(long maxAllocation) {
+      this.maxAllocation = maxAllocation;
+      return this;
+    }
+
+    public OperatorTestBuilder inputDataStreamJson(List<String> jsonBatches) {
+      this.inputStreamsJSON = new ArrayList<>();
+      this.inputStreamsJSON.add(jsonBatches);
+      return this;
+    }
+
+    public OperatorTestBuilder inputDataStreamsJson(List<List<String>> childStreams) {
+      this.inputStreamsJSON = childStreams;
+      return this;
+    }
+
+    public OperatorTestBuilder baselineColumns(String... columns) {
+      for (int i = 0; i < columns.length; i++) {
+        LogicalExpression ex = parseExpr(columns[i]);
+        if (ex instanceof SchemaPath) {
+          columns[i] = ((SchemaPath)ex).toExpr();
+        } else {
+          throw new IllegalStateException("Baseline column " + columns[i] + " is not a valid schema path.");
+        }
+      }
+      this.baselineColumns = columns;
+      return this;
+    }
+
+    public OperatorTestBuilder baselineValues(Object ... baselineValues) {
+      if (baselineRecords == null) {
+        baselineRecords = new ArrayList<>();
+      }
+      Map<String, Object> ret = new HashMap<>();
+      int i = 0;
+      Preconditions.checkArgument(baselineValues.length == baselineColumns.length,
+          "Must supply the same number of baseline values as columns.");
+      for (String s : baselineColumns) {
+        ret.put(s, baselineValues[i]);
+        i++;
+      }
+      this.baselineRecords.add(ret);
+      return this;
+    }
+  }
+
+  private void mockFragmentContext(long initReservation, long maxAllocation) {
+    final CodeCompiler compiler = new CodeCompiler(drillConf, optManager);
+    final BufferAllocator allocator = this.allocator.newChildAllocator("allocator_for_operator_test", initReservation, maxAllocation);
+    new NonStrictExpectations() {
+      {
+        optManager.getOption(withAny(new TypeValidators.BooleanValidator("", false))); result = false;
+        // TODO(DRILL-4450) - Probably want to just create a default option manager; this is a hack to prevent
+        // the code compilation from failing when trying to decide if scalar replacement is turned on.
+        // It will cause other code paths to fail because this return value won't be valid for most
+        // string options
+        optManager.getOption(withAny(new TypeValidators.StringValidator("", "try"))); result = "try";
+        optManager.getOption(withAny(new TypeValidators.PositiveLongValidator("", 1l, 1l))); result = 10;
+        fragContext.getOptions(); result = optManager;
+        fragContext.getManagedBuffer(); result = bufManager.getManagedBuffer();
+        fragContext.shouldContinue(); result = true;
+        fragContext.getExecutionControls(); result = executionControls;
+        fragContext.getFunctionRegistry(); result = funcReg;
+        fragContext.getConfig(); result = drillConf;
+        fragContext.getHandle(); result = ExecProtos.FragmentHandle.getDefaultInstance();
+        try {
+          fragContext.getImplementationClass(withAny(CodeGenerator.get(templateClassDefinition, funcReg)));
+          result = new Delegate()
+          {
+            Object getImplementationClass(CodeGenerator gen) throws IOException, ClassTransformationException {
+              return compiler.getImplementationClass(gen);
+            }
+          };
+          fragContext.getImplementationClass(withAny(CodeGenerator.get(templateClassDefinition, funcReg).getRoot()));
+          result = new Delegate()
+          {
+            Object getImplementationClass(ClassGenerator gen) throws IOException, ClassTransformationException {
+              return compiler.getImplementationClass(gen.getCodeGenerator());
+            }
+          };
+        } catch (ClassTransformationException e) {
+          throw new RuntimeException(e);
+        } catch (IOException e) {
+          throw new RuntimeException(e);
+        }
+        opContext.getStats(); result = opStats;
+        opContext.getAllocator(); result = allocator;
+        fragContext.newOperatorContext(withAny(popConf)); result = opContext;
+      }
+    };
+  }
+
+  private Iterator<RecordReader> getRecordReadersForJsonBatches(List<String> jsonBatches, FragmentContext fragContext) {
+    ObjectMapper mapper = new ObjectMapper();
+    List<RecordReader> readers = new ArrayList<>();
+    for (String batchJson : jsonBatches) {
+      JsonNode records;
+      try {
+        records = mapper.readTree(batchJson);
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+      readers.add(new JSONRecordReader(fragContext, records, null, Collections.singletonList(SchemaPath.getSimplePath("*"))));
+    }
+    return readers.iterator();
+  }
+}
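
The BatchIterator above hides the RecordBatch draining protocol from test code; for reference, the loop it wraps looks roughly like this (process(...) is a hypothetical consumer of the current batch):

    RecordBatch.IterOutcome outcome;
    while ((outcome = testOperator.next()) != RecordBatch.IterOutcome.NONE
        && outcome != RecordBatch.IterOutcome.STOP) {
      if (outcome == RecordBatch.IterOutcome.OUT_OF_MEMORY) {
        throw new RuntimeException("Operator ran out of memory");
      }
      process(testOperator); // OK / OK_NEW_SCHEMA: the operator's vectors hold the batch
    }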

http://git-wip-us.apache.org/repos/asf/drill/blob/d93a3633/exec/jdbc/src/test/java/org/apache/drill/jdbc/test/JdbcDataTest.java
----------------------------------------------------------------------
diff --git a/exec/jdbc/src/test/java/org/apache/drill/jdbc/test/JdbcDataTest.java b/exec/jdbc/src/test/java/org/apache/drill/jdbc/test/JdbcDataTest.java
index 56e58dc..fd5d4f0 100644
--- a/exec/jdbc/src/test/java/org/apache/drill/jdbc/test/JdbcDataTest.java
+++ b/exec/jdbc/src/test/java/org/apache/drill/jdbc/test/JdbcDataTest.java
@@ -186,7 +186,7 @@ public class JdbcDataTest extends JdbcTestBase {
     Scan scan = findOnlyOperator(plan, Scan.class);
     Assert.assertEquals("donuts-json", scan.getStorageEngine());
     Project project = findOnlyOperator(plan, Project.class);
-    Assert.assertEquals(1, project.getSelections().length);
+    Assert.assertEquals(1, project.getSelections().size());
     Assert.assertEquals(Scan.class, project.getInput().getClass());
     Store store = findOnlyOperator(plan, Store.class);
     Assert.assertEquals("queue", store.getStorageEngine());
@@ -244,9 +244,9 @@ public class JdbcDataTest extends JdbcTestBase {
     Assert.assertTrue(filter.getInput() instanceof Scan);
     Project[] projects = Iterables.toArray(findOperator(plan, Project.class), Project.class);
     Assert.assertEquals(2, projects.length);
-    Assert.assertEquals(1, projects[0].getSelections().length);
+    Assert.assertEquals(1, projects[0].getSelections().size());
     Assert.assertEquals(Filter.class, projects[0].getInput().getClass());
-    Assert.assertEquals(2, projects[1].getSelections().length);
+    Assert.assertEquals(2, projects[1].getSelections().size());
     Assert.assertEquals(Project.class, projects[1].getInput().getClass());
     Store store = findOnlyOperator(plan, Store.class);
     Assert.assertEquals("queue", store.getStorageEngine());