Posted to commits@drill.apache.org by js...@apache.org on 2016/04/20 18:08:56 UTC
[4/4] drill git commit: DRILL-4437: Operator unit test framework
DRILL-4437: Operator unit test framework
Closes #394
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/d93a3633
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/d93a3633
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/d93a3633
Branch: refs/heads/master
Commit: d93a3633815ed1c7efd6660eae62b7351a2c9739
Parents: 01e04cd
Author: Jason Altekruse <al...@gmail.com>
Authored: Fri Feb 26 14:55:30 2016 -0800
Committer: Jason Altekruse <al...@gmail.com>
Committed: Wed Apr 20 09:07:13 2016 -0700
----------------------------------------------------------------------
.../org/apache/drill/exec/ExecConstants.java | 6 +-
.../drill/exec/compile/ClassTransformer.java | 4 +-
.../drill/exec/physical/impl/ImplCreator.java | 4 +-
.../exec/physical/impl/join/HashJoinBatch.java | 2 +-
.../physical/impl/xsort/ExternalSortBatch.java | 4 +
.../exec/store/easy/json/JSONRecordReader.java | 4 +-
.../java/org/apache/drill/DrillTestWrapper.java | 215 ++++++++----
.../test/java/org/apache/drill/TestBuilder.java | 4 -
.../physical/unit/BasicPhysicalOpUnitTest.java | 322 +++++++++++++++++
.../physical/unit/PhysicalOpUnitTestBase.java | 341 +++++++++++++++++++
.../apache/drill/jdbc/test/JdbcDataTest.java | 6 +-
11 files changed, 834 insertions(+), 78 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/drill/blob/d93a3633/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
index a490116..6a0889d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
@@ -149,7 +149,7 @@ public interface ExecConstants {
OptionValidator FILESYSTEM_PARTITION_COLUMN_LABEL_VALIDATOR = new StringValidator(FILESYSTEM_PARTITION_COLUMN_LABEL, "dir");
String JSON_READ_NUMBERS_AS_DOUBLE = "store.json.read_numbers_as_double";
- OptionValidator JSON_READ_NUMBERS_AS_DOUBLE_VALIDATOR = new BooleanValidator(JSON_READ_NUMBERS_AS_DOUBLE, false);
+ BooleanValidator JSON_READ_NUMBERS_AS_DOUBLE_VALIDATOR = new BooleanValidator(JSON_READ_NUMBERS_AS_DOUBLE, false);
String MONGO_ALL_TEXT_MODE = "store.mongo.all_text_mode";
OptionValidator MONGO_READER_ALL_TEXT_MODE_VALIDATOR = new BooleanValidator(MONGO_ALL_TEXT_MODE, false);
@@ -178,9 +178,9 @@ public interface ExecConstants {
* HashTable runtime settings
*/
String MIN_HASH_TABLE_SIZE_KEY = "exec.min_hash_table_size";
- OptionValidator MIN_HASH_TABLE_SIZE = new PositiveLongValidator(MIN_HASH_TABLE_SIZE_KEY, HashTable.MAXIMUM_CAPACITY, HashTable.DEFAULT_INITIAL_CAPACITY);
+ PositiveLongValidator MIN_HASH_TABLE_SIZE = new PositiveLongValidator(MIN_HASH_TABLE_SIZE_KEY, HashTable.MAXIMUM_CAPACITY, HashTable.DEFAULT_INITIAL_CAPACITY);
String MAX_HASH_TABLE_SIZE_KEY = "exec.max_hash_table_size";
- OptionValidator MAX_HASH_TABLE_SIZE = new PositiveLongValidator(MAX_HASH_TABLE_SIZE_KEY, HashTable.MAXIMUM_CAPACITY, HashTable.MAXIMUM_CAPACITY);
+ PositiveLongValidator MAX_HASH_TABLE_SIZE = new PositiveLongValidator(MAX_HASH_TABLE_SIZE_KEY, HashTable.MAXIMUM_CAPACITY, HashTable.MAXIMUM_CAPACITY);
/**
* Limits the maximum level of parallelization to this factor times the number of Drillbits
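The validator-type changes above (and the matching ones in ClassTransformer, HashJoinBatch, and JSONRecordReader below) let callers use OptionManager's typed getOption overloads instead of unwrapping an OptionValue by hand. A minimal sketch of the pattern, assuming a FragmentContext is in hand:

    OptionManager options = context.getOptions();
    // the BooleanValidator overload returns a boolean directly (no .bool_val unwrapping)
    boolean readAsDouble = options.getOption(ExecConstants.JSON_READ_NUMBERS_AS_DOUBLE_VALIDATOR);
    // PositiveLongValidator is a LongValidator, so this overload returns a long (no .num_val)
    long minTableSize = options.getOption(ExecConstants.MIN_HASH_TABLE_SIZE);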
http://git-wip-us.apache.org/repos/asf/drill/blob/d93a3633/exec/java-exec/src/main/java/org/apache/drill/exec/compile/ClassTransformer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/compile/ClassTransformer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/compile/ClassTransformer.java
index 3c93599..02323a9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/compile/ClassTransformer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/compile/ClassTransformer.java
@@ -216,9 +216,7 @@ public class ClassTransformer {
final String entireClass,
final String materializedClassName) throws ClassTransformationException {
// unfortunately, this hasn't been set up at construction time, so we have to do it here
- final OptionValue optionValue = optionManager.getOption(SCALAR_REPLACEMENT_OPTION);
- final ScalarReplacementOption scalarReplacementOption =
- ScalarReplacementOption.fromString((String) optionValue.getValue()); // TODO(DRILL-2474)
+ final ScalarReplacementOption scalarReplacementOption = ScalarReplacementOption.fromString(optionManager.getOption(SCALAR_REPLACEMENT_VALIDATOR));
try {
final long t1 = System.nanoTime();
http://git-wip-us.apache.org/repos/asf/drill/blob/d93a3633/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java
index 8a8a1ae..5872ef1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java
@@ -23,6 +23,7 @@ import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.TimeUnit;
+import com.google.common.annotations.VisibleForTesting;
import org.apache.drill.common.AutoCloseables;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.exec.ops.FragmentContext;
@@ -120,7 +121,8 @@ public class ImplCreator {
/** Create a RecordBatch and its children for given PhysicalOperator */
- private RecordBatch getRecordBatch(final PhysicalOperator op, final FragmentContext context) throws ExecutionSetupException {
+ @VisibleForTesting
+ public RecordBatch getRecordBatch(final PhysicalOperator op, final FragmentContext context) throws ExecutionSetupException {
Preconditions.checkNotNull(op);
final List<RecordBatch> childRecordBatches = getChildren(op, context);
http://git-wip-us.apache.org/repos/asf/drill/blob/d93a3633/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
index 2ba54dd..2ace69e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
@@ -308,7 +308,7 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> {
}
final HashTableConfig htConfig =
- new HashTableConfig(context.getOptions().getOption(ExecConstants.MIN_HASH_TABLE_SIZE_KEY).num_val.intValue(),
+ new HashTableConfig((int) context.getOptions().getOption(ExecConstants.MIN_HASH_TABLE_SIZE),
HashTable.DEFAULT_LOAD_FACTOR, rightExpr, leftExpr);
// Create the chained hash table
http://git-wip-us.apache.org/repos/asf/drill/blob/d93a3633/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
index 7797339..0ee518e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
@@ -374,6 +374,10 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
(spillCount == 0 && !hasMemoryForInMemorySort(totalCount)) ||
// If we haven't spilled so far, make sure we don't exceed the maximum number of batches SV4 can address
(spillCount == 0 && totalBatches > Character.MAX_VALUE) ||
+ // TODO(DRILL-4438) - consider setting this threshold more intelligently;
+ // lowering it allowed a test that was failing with a low-memory condition (in BasicPhysicalOpUnitTest)
+ // to complete successfully (although it caused a perf decrease as there was more spilling)
+
// current memory used is more than 95% of memory usage limit of this operator
(oAllocator.getAllocatedMemory() > .95 * oAllocator.getLimit()) ||
// Number of incoming batches (BatchGroups) exceed the limit and number of incoming batches accumulated
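For reference, the spill decision this new TODO annotates condenses to the following predicate (names taken from the surrounding hunk; the 0.95 factor is the threshold DRILL-4438 suggests tuning, and the comment above notes 0.65 made the low-memory test pass at the cost of more spilling):

    boolean mustSpill =
        // can't finish the sort purely in memory
        (spillCount == 0 && !hasMemoryForInMemorySort(totalCount)) ||
        // an SV4 can address at most Character.MAX_VALUE (65535) batches
        (spillCount == 0 && totalBatches > Character.MAX_VALUE) ||
        // current usage is within 5% of this operator's memory limit
        (oAllocator.getAllocatedMemory() > .95 * oAllocator.getLimit());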
http://git-wip-us.apache.org/repos/asf/drill/blob/d93a3633/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java
index e943401..dbbe6b0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java
@@ -112,8 +112,8 @@ public class JSONRecordReader extends AbstractRecordReader {
// only enable all text mode if we aren't using embedded content mode.
this.enableAllTextMode = embeddedContent == null && fragmentContext.getOptions().getOption(ExecConstants.JSON_READER_ALL_TEXT_MODE_VALIDATOR);
- this.readNumbersAsDouble = fragmentContext.getOptions().getOption(ExecConstants.JSON_READ_NUMBERS_AS_DOUBLE).bool_val;
- this.unionEnabled = fragmentContext.getOptions().getOption(ExecConstants.ENABLE_UNION_TYPE);
+ this.readNumbersAsDouble = embeddedContent == null && fragmentContext.getOptions().getOption(ExecConstants.JSON_READ_NUMBERS_AS_DOUBLE_VALIDATOR);
+ this.unionEnabled = embeddedContent == null && fragmentContext.getOptions().getOption(ExecConstants.ENABLE_UNION_TYPE);
setColumns(columns);
}
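The embeddedContent checks matter because JSONRecordReader has a constructor that reads from an in-memory JsonNode rather than a file; that is the path the new unit test framework below uses to feed operators JSON batches. A condensed sketch (exception handling elided), mirroring getRecordReadersForJsonBatches in PhysicalOpUnitTestBase:

    // parse one JSON batch into a tree and hand it to the reader as embedded content
    JsonNode records = new ObjectMapper().readTree("[{\"x\": 5},{\"x\": 10}]");
    RecordReader reader = new JSONRecordReader(fragContext, records, null,
        Collections.singletonList(SchemaPath.getSimplePath("*")));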
http://git-wip-us.apache.org/repos/asf/drill/blob/d93a3633/exec/java-exec/src/test/java/org/apache/drill/DrillTestWrapper.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/DrillTestWrapper.java b/exec/java-exec/src/test/java/org/apache/drill/DrillTestWrapper.java
index f853414..2a9c03d 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/DrillTestWrapper.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/DrillTestWrapper.java
@@ -24,8 +24,10 @@ import java.io.UnsupportedEncodingException;
import java.lang.reflect.Array;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -44,7 +46,10 @@ import org.apache.drill.exec.record.BatchSchema;
import org.apache.drill.exec.record.HyperVectorWrapper;
import org.apache.drill.exec.record.MaterializedField;
import org.apache.drill.exec.record.RecordBatchLoader;
+import org.apache.drill.exec.record.VectorAccessible;
import org.apache.drill.exec.record.VectorWrapper;
+import org.apache.drill.exec.record.selection.SelectionVector2;
+import org.apache.drill.exec.record.selection.SelectionVector4;
import org.apache.drill.exec.rpc.user.QueryDataBatch;
import org.apache.drill.exec.util.Text;
import org.apache.drill.exec.vector.ValueVector;
@@ -147,19 +152,19 @@ public class DrillTestWrapper {
i++;
}
}
- for (HyperVectorValueIterator hvi : expectedRecords.values()) {
- for (ValueVector vv : hvi.getHyperVector().getValueVectors()) {
- vv.clear();
- }
- }
- for (HyperVectorValueIterator hvi : actualRecords.values()) {
+ cleanupHyperValueIterators(expectedRecords.values());
+ cleanupHyperValueIterators(actualRecords.values());
+ }
+
+ private void cleanupHyperValueIterators(Collection<HyperVectorValueIterator> hyperBatches) {
+ for (HyperVectorValueIterator hvi : hyperBatches) {
for (ValueVector vv : hvi.getHyperVector().getValueVectors()) {
vv.clear();
}
}
}
- private void compareMergedVectors(Map<String, List<Object>> expectedRecords, Map<String, List<Object>> actualRecords) throws Exception {
+ public static void compareMergedVectors(Map<String, List<Object>> expectedRecords, Map<String, List<Object>> actualRecords) throws Exception {
for (String s : actualRecords.keySet()) {
assertNotNull("Unexpected extra column " + s + " returned by query.", expectedRecords.get(s));
assertEquals("Incorrect number of rows returned by query.", expectedRecords.get(s).size(), actualRecords.get(s).size());
@@ -180,7 +185,7 @@ public class DrillTestWrapper {
}
}
- private String printNearbyRecords(Map<String, List<Object>> expectedRecords, Map<String, List<Object>> actualRecords, int offset) {
+ private static String printNearbyRecords(Map<String, List<Object>> expectedRecords, Map<String, List<Object>> actualRecords, int offset) {
StringBuilder expected = new StringBuilder();
StringBuilder actual = new StringBuilder();
expected.append("Expected Records near verification failure:\n");
@@ -208,8 +213,9 @@ public class DrillTestWrapper {
}
- private Map<String, HyperVectorValueIterator> addToHyperVectorMap(List<QueryDataBatch> records, RecordBatchLoader loader,
- BatchSchema schema) throws SchemaChangeException, UnsupportedEncodingException {
+ private Map<String, HyperVectorValueIterator> addToHyperVectorMap(final List<QueryDataBatch> records,
+ final RecordBatchLoader loader)
+ throws SchemaChangeException, UnsupportedEncodingException {
// TODO - this does not handle schema changes
Map<String, HyperVectorValueIterator> combinedVectors = new TreeMap<>();
@@ -218,7 +224,6 @@ public class DrillTestWrapper {
int size = records.size();
for (int i = 0; i < size; i++) {
batch = records.get(i);
- loader = new RecordBatchLoader(getAllocator());
loader.load(batch.getHeader().getDef(), batch.getData());
logger.debug("reading batch with " + loader.getRecordCount() + " rows, total read so far " + totalRecords);
totalRecords += loader.getRecordCount();
@@ -241,30 +246,70 @@ public class DrillTestWrapper {
return combinedVectors;
}
+ private static class BatchIterator implements Iterable<VectorAccessible>, AutoCloseable {
+ private final List<QueryDataBatch> dataBatches;
+ private final RecordBatchLoader batchLoader;
+
+ public BatchIterator(List<QueryDataBatch> dataBatches, RecordBatchLoader batchLoader) {
+ this.dataBatches = dataBatches;
+ this.batchLoader = batchLoader;
+ }
+
+ @Override
+ public Iterator<VectorAccessible> iterator() {
+ return new Iterator<VectorAccessible>() {
+
+ int index = -1;
+
+ @Override
+ public boolean hasNext() {
+ return index < dataBatches.size() - 1;
+ }
+
+ @Override
+ public VectorAccessible next() {
+ index++;
+ if (index == dataBatches.size()) {
+ throw new RuntimeException("Tried to call next when iterator had no more items.");
+ }
+ batchLoader.clear();
+ QueryDataBatch batch = dataBatches.get(index);
+ try {
+ batchLoader.load(batch.getHeader().getDef(), batch.getData());
+ } catch (SchemaChangeException e) {
+ throw new RuntimeException(e);
+ }
+ return batchLoader;
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException("Removing is not supported");
+ }
+ };
+ }
+
+ @Override
+ public void close() throws Exception {
+ batchLoader.clear();
+ }
+
+ }
+
/**
- * Only use this method if absolutely needed. There are utility methods to compare results of single queries.
- * The current use case for exposing this is setting session or system options between the test and verification
- * queries.
- *
- * TODO - evaluate adding an interface to allow setting session and system options before running queries
- * @param records
- * @param loader
- * @param schema
+ * @param batches batches to read, exposed as VectorAccessible so both loaded query results and live operator output can be consumed
* @return
* @throws SchemaChangeException
* @throws UnsupportedEncodingException
*/
- private Map<String, List<Object>> addToCombinedVectorResults(List<QueryDataBatch> records, RecordBatchLoader loader,
- BatchSchema schema) throws SchemaChangeException, UnsupportedEncodingException {
+ public static Map<String, List<Object>> addToCombinedVectorResults(Iterable<VectorAccessible> batches)
+ throws SchemaChangeException, UnsupportedEncodingException {
// TODO - this does not handle schema changes
Map<String, List<Object>> combinedVectors = new TreeMap<>();
long totalRecords = 0;
- QueryDataBatch batch;
- int size = records.size();
- for (int i = 0; i < size; i++) {
- batch = records.get(0);
- loader.load(batch.getHeader().getDef(), batch.getData());
+ BatchSchema schema = null;
+ for (VectorAccessible loader : batches) {
// TODO: Clean: DRILL-2933: That load(...) no longer throws
// SchemaChangeException, so check/clean throws clause above.
if (schema == null) {
@@ -272,24 +317,66 @@ public class DrillTestWrapper {
for (MaterializedField mf : schema) {
combinedVectors.put(SchemaPath.getSimplePath(mf.getPath()).toExpr(), new ArrayList<Object>());
}
+ } else {
+ // TODO - actually handle schema changes; this reassignment is just to get access to the SelectionVectorMode
+ // of the current batch. The check for a null schema above is used to only populate the column map once.
+ // Need to add new vectors and null-fill for previous batches? Is the distinction between null and non-existence important?
+ schema = loader.getSchema();
}
logger.debug("reading batch with " + loader.getRecordCount() + " rows, total read so far " + totalRecords);
totalRecords += loader.getRecordCount();
for (VectorWrapper<?> w : loader) {
String field = SchemaPath.getSimplePath(w.getField().getPath()).toExpr();
- for (int j = 0; j < loader.getRecordCount(); j++) {
- Object obj = w.getValueVector().getAccessor().getObject(j);
- if (obj != null) {
- if (obj instanceof Text) {
- obj = obj.toString();
+ ValueVector[] vectors;
+ if (w.isHyper()) {
+ vectors = w.getValueVectors();
+ } else {
+ vectors = new ValueVector[] {w.getValueVector()};
+ }
+ SelectionVector2 sv2 = null;
+ SelectionVector4 sv4 = null;
+ switch(schema.getSelectionVectorMode()) {
+ case TWO_BYTE:
+ sv2 = loader.getSelectionVector2();
+ break;
+ case FOUR_BYTE:
+ sv4 = loader.getSelectionVector4();
+ break;
+ }
+ if (sv4 != null) {
+ for (int j = 0; j < sv4.getCount(); j++) {
+ int complexIndex = sv4.get(j);
+ int batchIndex = complexIndex >> 16;
+ int recordIndexInBatch = complexIndex & 65535;
+ Object obj = vectors[batchIndex].getAccessor().getObject(recordIndexInBatch);
+ if (obj != null) {
+ if (obj instanceof Text) {
+ obj = obj.toString();
+ }
+ }
+ combinedVectors.get(field).add(obj);
+ }
+ }
+ else {
+ for (ValueVector vv : vectors) {
+ for (int j = 0; j < loader.getRecordCount(); j++) {
+ int index;
+ if (sv2 != null) {
+ index = sv2.getIndex(j);
+ } else {
+ index = j;
+ }
+ Object obj = vv.getAccessor().getObject(index);
+ if (obj != null) {
+ if (obj instanceof Text) {
+ obj = obj.toString();
+ }
+ }
+ combinedVectors.get(field).add(obj);
}
}
- combinedVectors.get(field).add(obj);
}
}
- records.remove(0);
- batch.release();
- loader.clear();
}
return combinedVectors;
}
@@ -342,7 +429,6 @@ public class DrillTestWrapper {
*/
protected void compareUnorderedResults() throws Exception {
RecordBatchLoader loader = new RecordBatchLoader(getAllocator());
- BatchSchema schema = null;
List<QueryDataBatch> actual = Collections.emptyList();
List<QueryDataBatch> expected = Collections.emptyList();
@@ -356,14 +442,14 @@ public class DrillTestWrapper {
checkNumBatches(actual);
addTypeInfoIfMissing(actual.get(0), testBuilder);
- addToMaterializedResults(actualRecords, actual, loader, schema);
+ addToMaterializedResults(actualRecords, actual, loader);
// If baseline data was not provided to the test builder directly, we must run a query for the baseline, this includes
// the cases where the baseline is stored in a file.
if (baselineRecords == null) {
BaseTestQuery.test(baselineOptionSettingQueries);
expected = BaseTestQuery.testRunAndReturn(baselineQueryType, testBuilder.getValidationQuery());
- addToMaterializedResults(expectedRecords, expected, loader, schema);
+ addToMaterializedResults(expectedRecords, expected, loader);
} else {
expectedRecords = baselineRecords;
}
@@ -409,28 +495,24 @@ public class DrillTestWrapper {
// To avoid extra work for test writers, types can optionally be inferred from the test query
addTypeInfoIfMissing(actual.get(0), testBuilder);
- actualSuperVectors = addToCombinedVectorResults(actual, loader, schema);
+ BatchIterator batchIter = new BatchIterator(actual, loader);
+ actualSuperVectors = addToCombinedVectorResults(batchIter);
+ batchIter.close();
// If baseline data was not provided to the test builder directly, we must run a query for the baseline, this includes
// the cases where the baseline is stored in a file.
if (baselineRecords == null) {
BaseTestQuery.test(baselineOptionSettingQueries);
expected = BaseTestQuery.testRunAndReturn(baselineQueryType, testBuilder.getValidationQuery());
- expectedSuperVectors = addToCombinedVectorResults(expected, loader, schema);
+ BatchIterator exBatchIter = new BatchIterator(expected, loader);
+ expectedSuperVectors = addToCombinedVectorResults(exBatchIter);
+ exBatchIter.close();
} else {
// data is built in the TestBuilder in a row major format as it is provided by the user
// translate it here to vectorized, the representation expected by the ordered comparison
- expectedSuperVectors = new TreeMap<>();
- expected = new ArrayList<>();
- for (String s : baselineRecords.get(0).keySet()) {
- expectedSuperVectors.put(s, new ArrayList<>());
- }
- for (Map<String, Object> m : baselineRecords) {
- for (String s : m.keySet()) {
- expectedSuperVectors.get(s).add(m.get(s));
- }
- }
+ expectedSuperVectors = translateRecordListToHeapVectors(baselineRecords);
}
+
compareMergedVectors(expectedSuperVectors, actualSuperVectors);
} catch (Exception e) {
throw new Exception(e.getMessage() + "\nFor query: " + query , e);
@@ -439,9 +521,21 @@ public class DrillTestWrapper {
}
}
+ public static Map<String, List<Object>> translateRecordListToHeapVectors(List<Map<String, Object>> records) {
+ Map<String, List<Object>> ret = new TreeMap<>();
+ for (String s : records.get(0).keySet()) {
+ ret.put(s, new ArrayList<>());
+ }
+ for (Map<String, Object> m : records) {
+ for (String s : m.keySet()) {
+ ret.get(s).add(m.get(s));
+ }
+ }
+ return ret;
+ }
+
public void compareResultsHyperVector() throws Exception {
RecordBatchLoader loader = new RecordBatchLoader(getAllocator());
- BatchSchema schema = null;
BaseTestQuery.test(testOptionSettingQueries);
List<QueryDataBatch> results = BaseTestQuery.testRunAndReturn(queryType, query);
@@ -451,12 +545,12 @@ public class DrillTestWrapper {
// To avoid extra work for test writers, types can optionally be inferred from the test query
addTypeInfoIfMissing(results.get(0), testBuilder);
- Map<String, HyperVectorValueIterator> actualSuperVectors = addToHyperVectorMap(results, loader, schema);
+ Map<String, HyperVectorValueIterator> actualSuperVectors = addToHyperVectorMap(results, loader);
BaseTestQuery.test(baselineOptionSettingQueries);
List<QueryDataBatch> expected = BaseTestQuery.testRunAndReturn(baselineQueryType, testBuilder.getValidationQuery());
- Map<String, HyperVectorValueIterator> expectedSuperVectors = addToHyperVectorMap(expected, loader, schema);
+ Map<String, HyperVectorValueIterator> expectedSuperVectors = addToHyperVectorMap(expected, loader);
compareHyperVectors(expectedSuperVectors, actualSuperVectors);
cleanupBatches(results, expected);
@@ -496,8 +590,10 @@ public class DrillTestWrapper {
}
}
- protected void addToMaterializedResults(List<Map<String, Object>> materializedRecords, List<QueryDataBatch> records, RecordBatchLoader loader,
- BatchSchema schema) throws SchemaChangeException, UnsupportedEncodingException {
+ public static void addToMaterializedResults(List<Map<String, Object>> materializedRecords,
+ List<QueryDataBatch> records,
+ RecordBatchLoader loader)
+ throws SchemaChangeException, UnsupportedEncodingException {
long totalRecords = 0;
QueryDataBatch batch;
int size = records.size();
@@ -506,9 +602,6 @@ public class DrillTestWrapper {
loader.load(batch.getHeader().getDef(), batch.getData());
// TODO: Clean: DRILL-2933: That load(...) no longer throws
// SchemaChangeException, so check/clean throws clause above.
- if (schema == null) {
- schema = loader.getSchema();
- }
logger.debug("reading batch with " + loader.getRecordCount() + " rows, total read so far " + totalRecords);
totalRecords += loader.getRecordCount();
for (int j = 0; j < loader.getRecordCount(); j++) {
@@ -531,7 +624,7 @@ public class DrillTestWrapper {
}
}
- public boolean compareValuesErrorOnMismatch(Object expected, Object actual, int counter, String column) throws Exception {
+ public static boolean compareValuesErrorOnMismatch(Object expected, Object actual, int counter, String column) throws Exception {
if (compareValues(expected, actual, counter, column)) {
return true;
@@ -554,7 +647,7 @@ public class DrillTestWrapper {
return true;
}
- public boolean compareValues(Object expected, Object actual, int counter, String column) throws Exception {
+ public static boolean compareValues(Object expected, Object actual, int counter, String column) throws Exception {
if (expected == null) {
if (actual == null) {
if (VERBOSE_DEBUG) {
@@ -648,7 +741,7 @@ public class DrillTestWrapper {
assertEquals(0, actualRecords.size());
}
- private String findMissingColumns(Set<String> expected, Set<String> actual) {
+ private static String findMissingColumns(Set<String> expected, Set<String> actual) {
String missingCols = "";
for (String colName : expected) {
if (!actual.contains(colName)) {
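One detail worth calling out in the new addToCombinedVectorResults: for FOUR_BYTE selection mode, each SV4 entry packs the batch index into the upper 16 bits and the row offset within that batch into the lower 16 bits, which is what the shift and mask above recover. A worked example with invented values:

    int complexIndex = (3 << 16) | 42;             // selects batch 3, row 42
    int batchIndex = complexIndex >> 16;           // 3
    int recordIndexInBatch = complexIndex & 65535; // 42
    Object value = vectors[batchIndex].getAccessor().getObject(recordIndexInBatch);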
http://git-wip-us.apache.org/repos/asf/drill/blob/d93a3633/exec/java-exec/src/test/java/org/apache/drill/TestBuilder.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestBuilder.java b/exec/java-exec/src/test/java/org/apache/drill/TestBuilder.java
index 8702eb5..b073371 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/TestBuilder.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/TestBuilder.java
@@ -194,16 +194,12 @@ public class TestBuilder {
// modified code from SchemaPath.De class. This should be used sparingly and only in tests if absolutely needed.
public static SchemaPath parsePath(String path) {
try {
- // logger.debug("Parsing expression string '{}'", expr);
ExprLexer lexer = new ExprLexer(new ANTLRStringStream(path));
CommonTokenStream tokens = new CommonTokenStream(lexer);
ExprParser parser = new ExprParser(tokens);
- //TODO: move functionregistry and error collector to injectables.
- //ctxt.findInjectableValue(valueId, forProperty, beanInstance)
ExprParser.parse_return ret = parser.parse();
- // ret.e.resolveAndValidate(expr, errorCollector);
if (ret.e instanceof SchemaPath) {
return (SchemaPath) ret.e;
} else {
http://git-wip-us.apache.org/repos/asf/drill/blob/d93a3633/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/BasicPhysicalOpUnitTest.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/BasicPhysicalOpUnitTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/BasicPhysicalOpUnitTest.java
new file mode 100644
index 0000000..6f2f160
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/BasicPhysicalOpUnitTest.java
@@ -0,0 +1,322 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.unit;
+
+import com.google.common.collect.Lists;
+import org.apache.calcite.rel.RelFieldCollation;
+import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.drill.exec.physical.MinorFragmentEndpoint;
+import org.apache.drill.exec.physical.base.GroupScan;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.SubScan;
+import org.apache.drill.exec.physical.config.ComplexToJson;
+import org.apache.drill.exec.physical.config.ExternalSort;
+import org.apache.drill.exec.physical.config.Filter;
+import org.apache.drill.exec.physical.config.HashAggregate;
+import org.apache.drill.exec.physical.config.HashJoinPOP;
+import org.apache.drill.exec.physical.config.MergeJoinPOP;
+import org.apache.drill.exec.physical.config.MergingReceiverPOP;
+import org.apache.drill.exec.physical.config.Project;
+import org.apache.drill.exec.physical.config.StreamingAggregate;
+import org.apache.drill.exec.physical.config.TopN;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import java.lang.reflect.Constructor;
+import java.util.IdentityHashMap;
+import java.util.List;
+import java.util.Set;
+
+import static org.apache.drill.TestBuilder.mapOf;
+
+public class BasicPhysicalOpUnitTest extends PhysicalOpUnitTestBase {
+
+ @Test
+ public void testSimpleProject() {
+ Project projectConf = new Project(parseExprs("x+5", "x"), null);
+ List<String> jsonBatches = Lists.newArrayList(
+ "[{\"x\": 5 },{\"x\": 10 }]",
+ "[{\"x\": 20 },{\"x\": 30 },{\"x\": 40 }]");
+ opTestBuilder()
+ .physicalOperator(projectConf)
+ .inputDataStreamJson(jsonBatches)
+ .baselineColumns("x")
+ .baselineValues(10l)
+ .baselineValues(15l)
+ .baselineValues(25l)
+ .baselineValues(35l)
+ .baselineValues(45l)
+ .go();
+ }
+
+ @Test
+ public void testProjectComplexOutput() {
+ Project projectConf = new Project(parseExprs("convert_from(json_col, 'JSON')", "complex_col"), null);
+ List<String> jsonBatches = Lists.newArrayList(
+ "[{\"json_col\": \"{ \\\"a\\\" : 1 }\"}]",
+ "[{\"json_col\": \"{ \\\"a\\\" : 5 }\"}]");
+ opTestBuilder()
+ .physicalOperator(projectConf)
+ .inputDataStreamJson(jsonBatches)
+ .baselineColumns("complex_col")
+ .baselineValues(mapOf("a", 1l))
+ .baselineValues(mapOf("a", 5l))
+ .go();
+ }
+
+ @Test
+ public void testSimpleHashJoin() {
+ HashJoinPOP joinConf = new HashJoinPOP(null, null, Lists.newArrayList(joinCond("x", "EQUALS", "x1")), JoinRelType.LEFT);
+ // TODO - figure out where to add validation, column names must be unique, even between the two batches,
+ // for all columns, not just the one in the join condition
+ // TODO - if any are common between the two, it is failing in the generated setup method in HashJoinProbeGen
+ List<String> leftJsonBatches = Lists.newArrayList(
+ "[{\"x\": 5, \"a\" : \"a string\"}]",
+ "[{\"x\": 5, \"a\" : \"a different string\"},{\"x\": 5, \"a\" : \"meh\"}]");
+ List<String> rightJsonBatches = Lists.newArrayList(
+ "[{\"x1\": 5, \"a2\" : \"asdf\"}]",
+ "[{\"x1\": 6, \"a2\" : \"qwerty\"},{\"x1\": 5, \"a2\" : \"12345\"}]");
+ opTestBuilder()
+ .physicalOperator(joinConf)
+ .inputDataStreamsJson(Lists.newArrayList(leftJsonBatches, rightJsonBatches))
+ .baselineColumns("x", "a", "a2", "x1")
+ .baselineValues(5l, "a string", "asdf", 5l)
+ .baselineValues(5l, "a string", "12345", 5l)
+ .baselineValues(5l, "a different string", "asdf", 5l)
+ .baselineValues(5l, "a different string", "12345", 5l)
+ .baselineValues(5l, "meh", "asdf", 5l)
+ .baselineValues(5l, "meh", "12345", 5l)
+ .go();
+ }
+
+ @Test
+ public void testSimpleMergeJoin() {
+ MergeJoinPOP joinConf = new MergeJoinPOP(null, null, Lists.newArrayList(joinCond("x", "EQUALS", "x1")), JoinRelType.LEFT);
+ // TODO - figure out where to add validation, column names must be unique, even between the two batches,
+ // for all columns, not just the one in the join condition
+ List<String> leftJsonBatches = Lists.newArrayList(
+ "[{\"x\": 5, \"a\" : \"a string\"}]",
+ "[{\"x\": 5, \"a\" : \"a different string\"},{\"x\": 5, \"a\" : \"meh\"}]");
+ List<String> rightJsonBatches = Lists.newArrayList(
+ "[{\"x1\": 5, \"a2\" : \"asdf\"}]",
+ "[{\"x1\": 5, \"a2\" : \"12345\"}, {\"x1\": 6, \"a2\" : \"qwerty\"}]");
+ opTestBuilder()
+ .physicalOperator(joinConf)
+ .inputDataStreamsJson(Lists.newArrayList(leftJsonBatches, rightJsonBatches))
+ .baselineColumns("x", "a", "a2", "x1")
+ .baselineValues(5l, "a string", "asdf", 5l)
+ .baselineValues(5l, "a string", "12345", 5l)
+ .baselineValues(5l, "a different string", "asdf", 5l)
+ .baselineValues(5l, "a different string", "12345", 5l)
+ .baselineValues(5l, "meh", "asdf", 5l)
+ .baselineValues(5l, "meh", "12345", 5l)
+ .go();
+ }
+
+ @Test
+ public void testSimpleHashAgg() {
+ HashAggregate aggConf = new HashAggregate(null, parseExprs("a", "a"), parseExprs("sum(b)", "b_sum"), 1.0f);
+ List<String> inputJsonBatches = Lists.newArrayList(
+ "[{\"a\": 5, \"b\" : 1 }]",
+ "[{\"a\": 5, \"b\" : 5},{\"a\": 3, \"b\" : 8}]");
+ opTestBuilder()
+ .physicalOperator(aggConf)
+ .inputDataStreamJson(inputJsonBatches)
+ .baselineColumns("b_sum", "a")
+ .baselineValues(6l, 5l)
+ .baselineValues(8l, 3l)
+ .go();
+ }
+
+ @Test
+ public void testSimpleStreamAgg() {
+ StreamingAggregate aggConf = new StreamingAggregate(null, parseExprs("a", "a"), parseExprs("sum(b)", "b_sum"), 1.0f);
+ List<String> inputJsonBatches = Lists.newArrayList(
+ "[{\"a\": 5, \"b\" : 1 }]",
+ "[{\"a\": 5, \"b\" : 5},{\"a\": 3, \"b\" : 8}]");
+ opTestBuilder()
+ .physicalOperator(aggConf)
+ .inputDataStreamJson(inputJsonBatches)
+ .baselineColumns("b_sum", "a")
+ .baselineValues(6l, 5l)
+ .baselineValues(8l, 3l)
+ .go();
+ }
+
+ @Test
+ public void testComplexToJson() {
+ ComplexToJson complexToJson = new ComplexToJson(null);
+ List<String> inputJsonBatches = Lists.newArrayList(
+ "[{\"a\": {\"b\" : 1 }}]",
+ "[{\"a\": {\"b\" : 5}},{\"a\": {\"b\" : 8}}]");
+ opTestBuilder()
+ .physicalOperator(complexToJson)
+ .inputDataStreamJson(inputJsonBatches)
+ .baselineColumns("a")
+ .baselineValues("{\n \"b\" : 1\n}")
+ .baselineValues("{\n \"b\" : 5\n}")
+ .baselineValues("{\n \"b\" : 8\n}")
+ .go();
+ }
+
+ @Test
+ public void testFilter() {
+ Filter filterConf = new Filter(null, parseExpr("a=5"), 1.0f);
+ List<String> inputJsonBatches = Lists.newArrayList(
+ "[{\"a\": 5, \"b\" : 1 }]",
+ "[{\"a\": 5, \"b\" : 5},{\"a\": 3, \"b\" : 8}]",
+ "[{\"a\": 40, \"b\" : 3},{\"a\": 13, \"b\" : 100}]");
+ opTestBuilder()
+ .physicalOperator(filterConf)
+ .inputDataStreamJson(inputJsonBatches)
+ .baselineColumns("a", "b")
+ .baselineValues(5l, 1l)
+ .baselineValues(5l, 5l)
+ .go();
+ }
+
+ @Test
+ public void testExternalSort() {
+ ExternalSort sortConf = new ExternalSort(null,
+ Lists.newArrayList(ordering("b", RelFieldCollation.Direction.ASCENDING, RelFieldCollation.NullDirection.FIRST)), false);
+ List<String> inputJsonBatches = Lists.newArrayList(
+ "[{\"a\": 5, \"b\" : 1 }]",
+ "[{\"a\": 5, \"b\" : 5},{\"a\": 3, \"b\" : 8}]",
+ "[{\"a\": 40, \"b\" : 3},{\"a\": 13, \"b\" : 100}]");
+ opTestBuilder()
+ .physicalOperator(sortConf)
+ .inputDataStreamJson(inputJsonBatches)
+ .baselineColumns("a", "b")
+ .baselineValues(5l, 1l)
+ .baselineValues(40l, 3l)
+ .baselineValues(5l, 5l)
+ .baselineValues(3l, 8l)
+ .baselineValues(13l, 100l)
+ .go();
+ }
+
+ private void externalSortLowMemoryHelper(int batchSize, int numberOfBatches, long initReservation, long maxAllocation) {
+ ExternalSort sortConf = new ExternalSort(null,
+ Lists.newArrayList(ordering("b", RelFieldCollation.Direction.ASCENDING, RelFieldCollation.NullDirection.FIRST)), false);
+ List<String> inputJsonBatches = Lists.newArrayList();
+ StringBuilder batchString = new StringBuilder();
+ for (int j = 0; j < numberOfBatches; j++) {
+ batchString.append("[");
+ for (int i = 0; i < batchSize; i++) {
+ batchString.append("{\"a\": 5, \"b\" : 5},{\"a\": 3, \"b\" : 8},");
+ }
+ batchString.append("{\"a\": 5, \"b\" : 1 }");
+ batchString.append("]");
+ inputJsonBatches.add(batchString.toString());
+ }
+
+ OperatorTestBuilder opTestBuilder =
+ opTestBuilder()
+ .initReservation(initReservation)
+ .maxAllocation(maxAllocation)
+ .physicalOperator(sortConf)
+ .inputDataStreamJson(inputJsonBatches)
+ .baselineColumns("a", "b");
+ for (int i = 0; i < numberOfBatches; i++) {
+ opTestBuilder.baselineValues(5l, 1l);
+ }
+ for (int i = 0; i < batchSize * numberOfBatches; i++) {
+ opTestBuilder.baselineValues(5l, 5l);
+ }
+ for (int i = 0; i < batchSize * numberOfBatches; i++) {
+ opTestBuilder.baselineValues(3l, 8l);
+ }
+ opTestBuilder.go();
+ }
+
+ // TODO - Failing with - org.apache.drill.exec.exception.OutOfMemoryException: Unable to allocate buffer of size 262144 (rounded from 147456) due to memory limit. Current allocation: 16422656
+ // look in ExternalSortBatch for this JIRA number, changing this percentage of the allocator limit that is
+ // the threshold for spilling (it worked with 0.65 for me) "fixed" the problem but hurt perf, will want
+ // to find a better solution to this problem. When it is fixed this threshold will likely become unnecessary
+ @Test
+ @Ignore("DRILL-4438")
+ public void testExternalSortLowMemory1() {
+ externalSortLowMemoryHelper(4960, 100, 10000000, 16500000);
+ }
+
+ // TODO - believe this was failing in the scan not the sort, may not require a fix
+ @Test
+ @Ignore("DRILL-4438")
+ public void testExternalSortLowMemory2() {
+ externalSortLowMemoryHelper(4960, 100, 10000000, 15000000);
+ }
+
+ // TODO - believe this was failing in the scan not the sort, may not require a fix
+ @Test
+ @Ignore("DRILL-4438")
+ public void testExternalSortLowMemory3() {
+ externalSortLowMemoryHelper(40960, 10, 10000000, 10000000);
+ }
+
+ // TODO - Failing with - org.apache.drill.exec.exception.OutOfMemoryException: Unable to allocate sv2 buffer after repeated attempts
+ // see comment above testExternalSortLowMemory1 about TODO left in ExternalSortBatch
+ @Test
+ @Ignore("DRILL-4438")
+ public void testExternalSortLowMemory4() {
+ externalSortLowMemoryHelper(15960, 30, 10000000, 14500000);
+ }
+
+ @Test
+ public void testTopN() {
+ TopN sortConf = new TopN(null,
+ Lists.newArrayList(ordering("b", RelFieldCollation.Direction.ASCENDING, RelFieldCollation.NullDirection.FIRST)), false, 3);
+ List<String> inputJsonBatches = Lists.newArrayList(
+ "[{\"a\": 5, \"b\" : 1 }]",
+ "[{\"a\": 5, \"b\" : 5},{\"a\": 3, \"b\" : 8}]",
+ "[{\"a\": 40, \"b\" : 3},{\"a\": 13, \"b\" : 100}]");
+ opTestBuilder()
+ .physicalOperator(sortConf)
+ .inputDataStreamJson(inputJsonBatches)
+ .baselineColumns("a", "b")
+ .baselineValues(5l, 1l)
+ .baselineValues(40l, 3l)
+ .baselineValues(5l, 5l)
+ .go();
+ }
+
+ // TODO(DRILL-4439) - doesn't expect incoming batches, uses RawFragmentBatch instead;
+ // need to figure out how to mock these
+ @Ignore
+ @Test
+ public void testSimpleMergingReceiver() {
+ MergingReceiverPOP mergeConf = new MergingReceiverPOP(-1, Lists.<MinorFragmentEndpoint>newArrayList(),
+ Lists.newArrayList(ordering("x", RelFieldCollation.Direction.ASCENDING, RelFieldCollation.NullDirection.FIRST)), false);
+ List<String> leftJsonBatches = Lists.newArrayList(
+ "[{\"x\": 5, \"a\" : \"a string\"}]",
+ "[{\"x\": 5, \"a\" : \"a different string\"},{\"x\": 5, \"a\" : \"meh\"}]");
+ List<String> rightJsonBatches = Lists.newArrayList(
+ "[{\"x\": 5, \"a\" : \"asdf\"}]",
+ "[{\"x\": 5, \"a\" : \"12345\"}, {\"x\": 6, \"a\" : \"qwerty\"}]");
+ opTestBuilder()
+ .physicalOperator(mergeConf)
+ .inputDataStreamsJson(Lists.newArrayList(leftJsonBatches, rightJsonBatches))
+ .baselineColumns("x", "a")
+ .baselineValues(5l, "a string")
+ .baselineValues(5l, "a different string")
+ .baselineValues(5l, "meh")
+ .baselineValues(5l, "asdf")
+ .baselineValues(5l, "12345")
+ .baselineValues(6l, "qwerty")
+ .go();
+ }
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/d93a3633/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/PhysicalOpUnitTestBase.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/PhysicalOpUnitTestBase.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/PhysicalOpUnitTestBase.java
new file mode 100644
index 0000000..245e5bb
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/PhysicalOpUnitTestBase.java
@@ -0,0 +1,341 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.unit;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import mockit.Delegate;
+import mockit.Injectable;
+import mockit.NonStrictExpectations;
+import org.antlr.runtime.ANTLRStringStream;
+import org.antlr.runtime.CommonTokenStream;
+import org.antlr.runtime.RecognitionException;
+import org.apache.calcite.rel.RelFieldCollation;
+import org.apache.drill.DrillTestWrapper;
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.FieldReference;
+import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.common.expression.PathSegment;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.expression.parser.ExprLexer;
+import org.apache.drill.common.expression.parser.ExprParser;
+import org.apache.drill.common.logical.data.JoinCondition;
+import org.apache.drill.common.logical.data.NamedExpression;
+import org.apache.drill.common.logical.data.Order;
+import org.apache.drill.common.scanner.ClassPathScanner;
+import org.apache.drill.common.scanner.persistence.ScanResult;
+import org.apache.drill.exec.compile.CodeCompiler;
+import org.apache.drill.exec.compile.TemplateClassDefinition;
+import org.apache.drill.exec.exception.ClassTransformationException;
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.expr.ClassGenerator;
+import org.apache.drill.exec.expr.CodeGenerator;
+import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry;
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.memory.RootAllocatorFactory;
+import org.apache.drill.exec.ops.BufferManagerImpl;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.ops.OperatorStats;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.impl.BatchCreator;
+import org.apache.drill.exec.physical.impl.OperatorCreatorRegistry;
+import org.apache.drill.exec.physical.impl.ScanBatch;
+import org.apache.drill.exec.physical.impl.project.Projector;
+import org.apache.drill.exec.physical.impl.project.ProjectorTemplate;
+import org.apache.drill.exec.proto.ExecProtos;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.VectorAccessible;
+import org.apache.drill.exec.server.options.OptionManager;
+import org.apache.drill.exec.server.options.TypeValidators;
+import org.apache.drill.exec.store.RecordReader;
+import org.apache.drill.exec.store.easy.json.JSONRecordReader;
+import org.apache.drill.exec.testing.ExecutionControls;
+import org.apache.drill.test.DrillTest;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Look! Doesn't extend BaseTestQuery!!
+ */
+public class PhysicalOpUnitTestBase extends DrillTest {
+
+ @Injectable FragmentContext fragContext;
+ @Injectable OperatorContext opContext;
+ @Injectable OperatorStats opStats;
+ @Injectable OptionManager optManager;
+ @Injectable PhysicalOperator popConf;
+ @Injectable ExecutionControls executionControls;
+
+ private final DrillConfig drillConf = DrillConfig.create();
+ private final BufferAllocator allocator = RootAllocatorFactory.newRoot(drillConf);
+ private final BufferManagerImpl bufManager = new BufferManagerImpl(allocator);
+ private final ScanResult classpathScan = ClassPathScanner.fromPrescan(drillConf);
+ private final FunctionImplementationRegistry funcReg = new FunctionImplementationRegistry(drillConf, classpathScan);
+ private final TemplateClassDefinition templateClassDefinition = new TemplateClassDefinition<>(Projector.class, ProjectorTemplate.class);
+ private final OperatorCreatorRegistry opCreatorReg = new OperatorCreatorRegistry(classpathScan);
+
+ protected LogicalExpression parseExpr(String expr) {
+ ExprLexer lexer = new ExprLexer(new ANTLRStringStream(expr));
+ CommonTokenStream tokens = new CommonTokenStream(lexer);
+ ExprParser parser = new ExprParser(tokens);
+ try {
+ return parser.parse().e;
+ } catch (RecognitionException e) {
+ throw new RuntimeException("Error parsing expression: " + expr);
+ }
+ }
+
+ protected Order.Ordering ordering(String expression, RelFieldCollation.Direction direction, RelFieldCollation.NullDirection nullDirection) {
+ return new Order.Ordering(direction, parseExpr(expression), nullDirection);
+ }
+
+ protected JoinCondition joinCond(String leftExpr, String relationship, String rightExpr) {
+ return new JoinCondition(relationship, parseExpr(leftExpr), parseExpr(rightExpr));
+ }
+
+ protected List<NamedExpression> parseExprs(String... expressionsAndOutputNames) {
+ Preconditions.checkArgument(expressionsAndOutputNames.length % 2 == 0, "List of expressions and output field names" +
+ " is not complete, each expression must explicitly give an output name.");
+ List<NamedExpression> ret = new ArrayList<>();
+ for (int i = 0; i < expressionsAndOutputNames.length; i += 2) {
+ ret.add(new NamedExpression(parseExpr(expressionsAndOutputNames[i]),
+ new FieldReference(new SchemaPath(new PathSegment.NameSegment(expressionsAndOutputNames[i+1])))));
+ }
+ return ret;
+ }
+
+
+ void runTest(OperatorTestBuilder testBuilder) {
+ BatchCreator<PhysicalOperator> opCreator;
+ RecordBatch testOperator;
+ try {
+ mockFragmentContext(testBuilder.initReservation, testBuilder.maxAllocation);
+ opCreator = (BatchCreator<PhysicalOperator>)
+ opCreatorReg.getOperatorCreator(testBuilder.popConfig.getClass());
+ List<RecordBatch> incomingStreams = Lists.newArrayList();
+ for (List<String> batchesJson : testBuilder.inputStreamsJSON) {
+ incomingStreams.add(new ScanBatch(null, fragContext,
+ getRecordReadersForJsonBatches(batchesJson, fragContext)));
+ }
+ testOperator = opCreator.getBatch(fragContext, testBuilder.popConfig, incomingStreams);
+
+ Map<String, List<Object>> actualSuperVectors = DrillTestWrapper.addToCombinedVectorResults(new BatchIterator(testOperator));
+ Map<String, List<Object>> expectedSuperVectors = DrillTestWrapper.translateRecordListToHeapVectors(testBuilder.baselineRecords);
+ DrillTestWrapper.compareMergedVectors(expectedSuperVectors, actualSuperVectors);
+
+ } catch (ExecutionSetupException e) {
+ throw new RuntimeException(e);
+ } catch (UnsupportedEncodingException e) {
+ throw new RuntimeException(e);
+ } catch (SchemaChangeException e) {
+ throw new RuntimeException(e);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private static class BatchIterator implements Iterable<VectorAccessible> {
+
+ private RecordBatch operator;
+ public BatchIterator(RecordBatch operator) {
+ this.operator = operator;
+ }
+
+ @Override
+ public Iterator<VectorAccessible> iterator() {
+ return new Iterator<VectorAccessible>() {
+ boolean needToGrabNext = true;
+ RecordBatch.IterOutcome lastResultOutcome;
+ @Override
+ public boolean hasNext() {
+ if (needToGrabNext) {
+ lastResultOutcome = operator.next();
+ needToGrabNext = false;
+ }
+ if (lastResultOutcome == RecordBatch.IterOutcome.NONE
+ || lastResultOutcome == RecordBatch.IterOutcome.STOP) {
+ return false;
+ } else if (lastResultOutcome == RecordBatch.IterOutcome.OUT_OF_MEMORY) {
+ throw new RuntimeException("Operator ran out of memory");
+ } else {
+ return true;
+ }
+ }
+
+ @Override
+ public VectorAccessible next() {
+ if (needToGrabNext) {
+ lastResultOutcome = operator.next();
+ }
+ needToGrabNext = true;
+ return operator;
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException("Remove is not supported.");
+ }
+ };
+ }
+ }
+
+ protected OperatorTestBuilder opTestBuilder() {
+ return new OperatorTestBuilder();
+ }
+
+ protected class OperatorTestBuilder {
+
+ private PhysicalOperator popConfig;
+ private String[] baselineColumns;
+ private List<Map<String, Object>> baselineRecords;
+ private List<List<String>> inputStreamsJSON;
+ private long initReservation = 10000000;
+ private long maxAllocation = 15000000;
+
+ public void go() {
+ runTest(this);
+ }
+
+ public OperatorTestBuilder physicalOperator(PhysicalOperator batch) {
+ this.popConfig = batch;
+ return this;
+ }
+
+ public OperatorTestBuilder initReservation(long initReservation) {
+ this.initReservation = initReservation;
+ return this;
+ }
+
+ public OperatorTestBuilder maxAllocation(long maxAllocation) {
+ this.maxAllocation = maxAllocation;
+ return this;
+ }
+
+ public OperatorTestBuilder inputDataStreamJson(List<String> jsonBatches) {
+ this.inputStreamsJSON = new ArrayList<>();
+ this.inputStreamsJSON.add(jsonBatches);
+ return this;
+ }
+
+ public OperatorTestBuilder inputDataStreamsJson(List<List<String>> childStreams) {
+ this.inputStreamsJSON = childStreams;
+ return this;
+ }
+
+ public OperatorTestBuilder baselineColumns(String... columns) {
+ for (int i = 0; i < columns.length; i++) {
+ LogicalExpression ex = parseExpr(columns[i]);
+ if (ex instanceof SchemaPath) {
+ columns[i] = ((SchemaPath)ex).toExpr();
+ } else {
+ throw new IllegalStateException("Schema path is not a valid format.");
+ }
+ }
+ this.baselineColumns = columns;
+ return this;
+ }
+
+ public OperatorTestBuilder baselineValues(Object ... baselineValues) {
+ if (baselineRecords == null) {
+ baselineRecords = new ArrayList<>();
+ }
+ Map<String, Object> ret = new HashMap<>();
+ int i = 0;
+ Preconditions.checkArgument(baselineValues.length == baselineColumns.length,
+ "Must supply the same number of baseline values as columns.");
+ for (String s : baselineColumns) {
+ ret.put(s, baselineValues[i]);
+ i++;
+ }
+ this.baselineRecords.add(ret);
+ return this;
+ }
+ }
+
+ private void mockFragmentContext(long initReservation, long maxAllocation) {
+ final CodeCompiler compiler = new CodeCompiler(drillConf, optManager);
+ final BufferAllocator allocator = this.allocator.newChildAllocator("allocator_for_operator_test", initReservation, maxAllocation);
+ new NonStrictExpectations() {
+ {
+ optManager.getOption(withAny(new TypeValidators.BooleanValidator("", false))); result = false;
+ // TODO(DRILL-4450) - Probably want to just create a default option manager; this is a hack to prevent
+ // the code compilation from failing when trying to decide if scalar replacement is turned on.
+ // This will cause other code paths to fail because this return value won't be valid for most
+ // string options.
+ optManager.getOption(withAny(new TypeValidators.StringValidator("", "try"))); result = "try";
+ optManager.getOption(withAny(new TypeValidators.PositiveLongValidator("", 1l, 1l))); result = 10;
+ fragContext.getOptions(); result = optManager;
+ fragContext.getManagedBuffer(); result = bufManager.getManagedBuffer();
+ fragContext.shouldContinue(); result = true;
+ fragContext.getExecutionControls(); result = executionControls;
+ fragContext.getFunctionRegistry(); result = funcReg;
+ fragContext.getConfig(); result = drillConf;
+ fragContext.getHandle(); result = ExecProtos.FragmentHandle.getDefaultInstance();
+ try {
+ fragContext.getImplementationClass(withAny(CodeGenerator.get(templateClassDefinition, funcReg)));
+ result = new Delegate()
+ {
+ Object getImplementationClass(CodeGenerator gen) throws IOException, ClassTransformationException {
+ return compiler.getImplementationClass(gen);
+ }
+ };
+ fragContext.getImplementationClass(withAny(CodeGenerator.get(templateClassDefinition, funcReg).getRoot()));
+ result = new Delegate()
+ {
+ Object getImplementationClass(ClassGenerator gen) throws IOException, ClassTransformationException {
+ return compiler.getImplementationClass(gen.getCodeGenerator());
+ }
+ };
+ } catch (ClassTransformationException e) {
+ throw new RuntimeException(e);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ opContext.getStats(); result = opStats;
+ opContext.getAllocator(); result = allocator;
+ fragContext.newOperatorContext(withAny(popConf)); result = opContext;
+ }
+ };
+ }
+
+ private Iterator<RecordReader> getRecordReadersForJsonBatches(List<String> jsonBatches, FragmentContext fragContext) {
+ ObjectMapper mapper = new ObjectMapper();
+ List<RecordReader> readers = new ArrayList<>();
+ for (String batchJson : jsonBatches) {
+ JsonNode records;
+ try {
+ records = mapper.readTree(batchJson);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ readers.add(new JSONRecordReader(fragContext, records, null, Collections.singletonList(SchemaPath.getSimplePath("*"))));
+ }
+ return readers.iterator();
+ }
+}
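Putting the two new test classes together, an operator unit test needs only a physical operator config, JSON input batches, and baseline values; no Drillbit or SQL layer is involved. A minimal sketch of the pattern, modeled on the tests in BasicPhysicalOpUnitTest above (the filter expression and rows here are illustrative, not part of the commit):

    @Test
    public void testFilterKeepsMatchingRows() {
      Filter filterConf = new Filter(null, parseExpr("a=1"), 1.0f);
      opTestBuilder()
          .physicalOperator(filterConf)
          .inputDataStreamJson(Lists.newArrayList("[{\"a\": 1},{\"a\": 2}]"))
          .baselineColumns("a")
          .baselineValues(1l)
          .go();
    }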
http://git-wip-us.apache.org/repos/asf/drill/blob/d93a3633/exec/jdbc/src/test/java/org/apache/drill/jdbc/test/JdbcDataTest.java
----------------------------------------------------------------------
diff --git a/exec/jdbc/src/test/java/org/apache/drill/jdbc/test/JdbcDataTest.java b/exec/jdbc/src/test/java/org/apache/drill/jdbc/test/JdbcDataTest.java
index 56e58dc..fd5d4f0 100644
--- a/exec/jdbc/src/test/java/org/apache/drill/jdbc/test/JdbcDataTest.java
+++ b/exec/jdbc/src/test/java/org/apache/drill/jdbc/test/JdbcDataTest.java
@@ -186,7 +186,7 @@ public class JdbcDataTest extends JdbcTestBase {
Scan scan = findOnlyOperator(plan, Scan.class);
Assert.assertEquals("donuts-json", scan.getStorageEngine());
Project project = findOnlyOperator(plan, Project.class);
- Assert.assertEquals(1, project.getSelections().length);
+ Assert.assertEquals(1, project.getSelections().size());
Assert.assertEquals(Scan.class, project.getInput().getClass());
Store store = findOnlyOperator(plan, Store.class);
Assert.assertEquals("queue", store.getStorageEngine());
@@ -244,9 +244,9 @@ public class JdbcDataTest extends JdbcTestBase {
Assert.assertTrue(filter.getInput() instanceof Scan);
Project[] projects = Iterables.toArray(findOperator(plan, Project.class), Project.class);
Assert.assertEquals(2, projects.length);
- Assert.assertEquals(1, projects[0].getSelections().length);
+ Assert.assertEquals(1, projects[0].getSelections().size());
Assert.assertEquals(Filter.class, projects[0].getInput().getClass());
- Assert.assertEquals(2, projects[1].getSelections().length);
+ Assert.assertEquals(2, projects[1].getSelections().size());
Assert.assertEquals(Project.class, projects[1].getInput().getClass());
Store store = findOnlyOperator(plan, Store.class);
Assert.assertEquals("queue", store.getStorageEngine());