You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by ar...@apache.org on 2018/08/27 08:25:37 UTC
[drill] 06/06: DRILL-6461: Added basic data correctness tests for
hash agg, and improved operator unit testing framework.
This is an automated email from the ASF dual-hosted git repository.
arina pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git
commit d8f9fb6a5cf22a01fa3f48bd40e7dbeb3cb6e4e4
Author: Timothy Farkas <ti...@apache.org>
AuthorDate: Mon Jun 4 10:45:12 2018 -0700
DRILL-6461: Added basic data correctness tests for hash agg, and improved operator unit testing framework.
git closes #1344
---
.../drill/exec/physical/impl/TopN/TopNBatch.java | 12 -
.../physical/impl/common/HashTableTemplate.java | 12 +-
.../physical/impl/project/ProjectRecordBatch.java | 2 +
.../physical/impl/svremover/AbstractCopier.java | 26 +-
.../physical/impl/svremover/AbstractSV2Copier.java | 4 +-
.../physical/impl/svremover/AbstractSV4Copier.java | 4 +-
.../drill/exec/physical/impl/svremover/Copier.java | 4 +-
.../physical/impl/svremover/GenericCopier.java | 7 +-
.../physical/impl/svremover/StraightCopier.java | 3 +-
.../apache/drill/exec/record/RecordBatchSizer.java | 17 ++
.../physical/impl/BaseTestOpBatchEmitOutcome.java | 2 +-
.../drill/exec/physical/impl/MockRecordBatch.java | 196 ++++++++++---
.../physical/impl/agg/TestAggWithAnyValue.java | 6 +-
.../exec/physical/impl/agg/TestHashAggBatch.java | 212 ++++++++++++++
.../physical/impl/common/HashPartitionTest.java | 70 +++--
.../exec/physical/impl/join/TestHashJoinJPPD.java | 6 +-
.../exec/physical/impl/join/TestHashJoinSpill.java | 8 +-
.../physical/impl/limit/TestLimitOperator.java | 16 +-
.../impl/svremover/AbstractGenericCopierTest.java | 90 +++---
.../physical/impl/svremover/GenericCopierTest.java | 4 +-
.../impl/svremover/GenericSV2BatchCopierTest.java | 4 +-
.../impl/svremover/GenericSV2CopierTest.java | 4 +-
.../impl/svremover/GenericSV4CopierTest.java | 9 +-
.../physical/unit/BasicPhysicalOpUnitTest.java | 30 +-
.../exec/physical/unit/MiniPlanUnitTestBase.java | 3 +-
.../exec/physical/unit/TestOutputBatchSize.java | 80 +++---
.../columnreaders/TestBatchSizingMemoryUtil.java | 2 +-
.../drill/test/LegacyOperatorTestBuilder.java | 178 ++++++++++++
.../org/apache/drill/test/OperatorTestBuilder.java | 314 +++++++++++++++++++++
.../apache/drill/test/OperatorTestBuilderTest.java | 157 +++++++++++
.../unit => test}/PhysicalOpUnitTestBase.java | 169 ++---------
.../org/apache/drill/test/rowSet/RowSetBatch.java | 111 --------
.../apache/drill/test/rowSet/RowSetComparison.java | 151 +++++++---
.../drill/test/rowSet/TestRowSetComparison.java | 211 ++++++++++++++
.../main/codegen/templates/FixedValueVectors.java | 2 +-
.../codegen/templates/NullableValueVectors.java | 2 +-
36 files changed, 1616 insertions(+), 512 deletions(-)
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java
index 2763f59..2e343b6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java
@@ -137,23 +137,11 @@ public class TopNBatch extends AbstractRecordBatch<TopN> {
@Override
public void buildSchema() throws SchemaChangeException {
- VectorContainer c = new VectorContainer(oContext);
IterOutcome outcome = next(incoming);
switch (outcome) {
case OK:
case OK_NEW_SCHEMA:
for (VectorWrapper<?> w : incoming) {
- // TODO: Not sure why the special handling for AbstractContainerVector is needed since creation of child
- // vectors is taken care correctly if the field is retrieved from incoming vector and passed to it rather than
- // creating a new Field instance just based on name and type.
- @SuppressWarnings("resource")
- ValueVector v = c.addOrGet(w.getField());
- if (v instanceof AbstractContainerVector) {
- w.getValueVector().makeTransferPair(v);
- v.clear();
- }
- }
- for (VectorWrapper<?> w : c) {
@SuppressWarnings("resource")
ValueVector v = container.addOrGet(w.getField());
if (v instanceof AbstractContainerVector) {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java
index 3c418b9..958a0b6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java
@@ -545,6 +545,15 @@ public abstract class HashTableTemplate implements HashTable {
@Override
public void clear() {
+ clear(true);
+ }
+
+ private void clear(boolean close) {
+ if (close) {
+ // If we are closing, we need to clear the htContainerOrig as well.
+ htContainerOrig.clear();
+ }
+
if (batchHolders != null) {
for (BatchHolder bh : batchHolders) {
bh.clear();
@@ -842,7 +851,7 @@ public abstract class HashTableTemplate implements HashTable {
*
*/
public void reset() {
- this.clear(); // Clear all current batch holders and hash table (i.e. free their memory)
+ this.clear(false); // Clear all current batch holders and hash table (i.e. free their memory)
freeIndex = 0; // all batch holders are gone
// reallocate batch holders, and the hash table to the original size
@@ -852,6 +861,7 @@ public abstract class HashTableTemplate implements HashTable {
totalIndexSize = 0;
startIndices = allocMetadataVector(originalTableSize, EMPTY_SLOT);
}
+
public void updateIncoming(VectorContainer newIncoming, RecordBatch newIncomingProbe) {
incomingBuild = newIncoming;
incomingProbe = newIncomingProbe;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
index dd93325..b459e1c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
@@ -329,6 +329,8 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
m.setValueCount(count);
}
+ container.setRecordCount(count);
+
if (complexWriters == null) {
return;
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/AbstractCopier.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/AbstractCopier.java
index 47ec1cb..a463519 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/AbstractCopier.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/AbstractCopier.java
@@ -19,7 +19,7 @@ package org.apache.drill.exec.physical.impl.svremover;
import org.apache.drill.common.types.TypeProtos;
import org.apache.drill.common.types.Types;
-import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.VectorAccessible;
import org.apache.drill.exec.record.VectorContainer;
import org.apache.drill.exec.record.VectorWrapper;
import org.apache.drill.exec.vector.AllocationHelper;
@@ -30,7 +30,7 @@ public abstract class AbstractCopier implements Copier {
protected VectorContainer outgoing;
@Override
- public void setup(RecordBatch incoming, VectorContainer outgoing) {
+ public void setup(VectorAccessible incoming, VectorContainer outgoing) {
this.outgoing = outgoing;
final int count = outgoing.getNumberOfColumns();
@@ -43,15 +43,7 @@ public abstract class AbstractCopier implements Copier {
@Override
public int copyRecords(int index, int recordCount) {
- for(VectorWrapper<?> out : outgoing){
- TypeProtos.MajorType type = out.getField().getType();
- if (!Types.isFixedWidthType(type) || Types.isRepeated(type)) {
- out.getValueVector().allocateNew();
- } else {
- AllocationHelper.allocate(out.getValueVector(), recordCount, 1);
- }
- }
-
+ allocateOutgoing(outgoing, recordCount);
return insertRecords(0, index, recordCount);
}
@@ -91,4 +83,16 @@ public abstract class AbstractCopier implements Copier {
public abstract void copyEntryIndirect(int inIndex, int outIndex);
public abstract void copyEntry(int inIndex, int outIndex);
+
+ public static void allocateOutgoing(VectorContainer outgoing, int recordCount) {
+ for(VectorWrapper<?> out : outgoing) {
+ TypeProtos.MajorType type = out.getField().getType();
+
+ if (!Types.isFixedWidthType(type) || Types.isRepeated(type)) {
+ out.getValueVector().allocateNew();
+ } else {
+ AllocationHelper.allocate(out.getValueVector(), recordCount, 1);
+ }
+ }
+ }
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/AbstractSV2Copier.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/AbstractSV2Copier.java
index 68a0889..d273fd3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/AbstractSV2Copier.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/AbstractSV2Copier.java
@@ -17,8 +17,8 @@
*/
package org.apache.drill.exec.physical.impl.svremover;
-import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.record.TransferPair;
+import org.apache.drill.exec.record.VectorAccessible;
import org.apache.drill.exec.record.VectorContainer;
import org.apache.drill.exec.record.VectorWrapper;
import org.apache.drill.exec.record.selection.SelectionVector2;
@@ -33,7 +33,7 @@ public abstract class AbstractSV2Copier extends AbstractCopier {
protected List<TransferPair> transferPairs = new ArrayList<>();
@Override
- public void setup(RecordBatch incoming, VectorContainer outgoing) {
+ public void setup(VectorAccessible incoming, VectorContainer outgoing) {
super.setup(incoming, outgoing);
this.sv2 = incoming.getSelectionVector2();
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/AbstractSV4Copier.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/AbstractSV4Copier.java
index 56e2586..970f970 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/AbstractSV4Copier.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/AbstractSV4Copier.java
@@ -17,7 +17,7 @@
*/
package org.apache.drill.exec.physical.impl.svremover;
-import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.VectorAccessible;
import org.apache.drill.exec.record.VectorContainer;
import org.apache.drill.exec.record.VectorWrapper;
import org.apache.drill.exec.record.selection.SelectionVector4;
@@ -30,7 +30,7 @@ public abstract class AbstractSV4Copier extends AbstractCopier {
private SelectionVector4 sv4;
@Override
- public void setup(RecordBatch incoming, VectorContainer outgoing) {
+ public void setup(VectorAccessible incoming, VectorContainer outgoing) {
super.setup(incoming, outgoing);
this.sv4 = incoming.getSelectionVector4();
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/Copier.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/Copier.java
index 92dea70..f8934e5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/Copier.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/Copier.java
@@ -17,11 +17,11 @@
*/
package org.apache.drill.exec.physical.impl.svremover;
-import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.record.VectorAccessible;
public interface Copier {
- void setup(RecordBatch incoming, VectorContainer outgoing);
+ void setup(VectorAccessible incoming, VectorContainer outgoing);
int copyRecords(int index, int recordCount);
int appendRecord(int index);
int appendRecords(int index, int recordCount);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/GenericCopier.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/GenericCopier.java
index 72516e0..f64a11e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/GenericCopier.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/GenericCopier.java
@@ -17,11 +17,13 @@
*/
package org.apache.drill.exec.physical.impl.svremover;
-import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.VectorAccessible;
import org.apache.drill.exec.record.VectorContainer;
import org.apache.drill.exec.record.VectorWrapper;
import org.apache.drill.exec.vector.ValueVector;
+import static org.apache.drill.exec.physical.impl.svremover.AbstractCopier.allocateOutgoing;
+
public class GenericCopier implements Copier {
private ValueVector[] vvOut;
private ValueVector[] vvIn;
@@ -29,7 +31,7 @@ public class GenericCopier implements Copier {
private VectorContainer outgoing;
@Override
- public void setup(RecordBatch incoming, VectorContainer outgoing) {
+ public void setup(VectorAccessible incoming, VectorContainer outgoing) {
this.outgoing = outgoing;
final int count = outgoing.getNumberOfColumns();
@@ -53,6 +55,7 @@ public class GenericCopier implements Copier {
@Override
public int copyRecords(int index, int recordCount) {
+ allocateOutgoing(outgoing, recordCount);
return insertRecords(0, index, recordCount);
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/StraightCopier.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/StraightCopier.java
index 33f2a96..cecfc5a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/StraightCopier.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/StraightCopier.java
@@ -20,6 +20,7 @@ package org.apache.drill.exec.physical.impl.svremover;
import com.google.common.collect.Lists;
import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.record.TransferPair;
+import org.apache.drill.exec.record.VectorAccessible;
import org.apache.drill.exec.record.VectorContainer;
import org.apache.drill.exec.record.VectorWrapper;
import org.apache.drill.exec.vector.SchemaChangeCallBack;
@@ -39,7 +40,7 @@ public class StraightCopier implements Copier {
}
@Override
- public void setup(RecordBatch incoming, VectorContainer outgoing) {
+ public void setup(VectorAccessible incoming, VectorContainer outgoing) {
for(VectorWrapper<?> vv : incoming){
TransferPair tp = vv.getValueVector().makeTransferPair(outputContainer.addOrGet(vv.getField(), callBack));
pairs.add(tp);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchSizer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchSizer.java
index 83287ee..dac80a5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchSizer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchSizer.java
@@ -17,6 +17,8 @@
*/
package org.apache.drill.exec.record;
+import java.util.ArrayList;
+import java.util.List;
import java.util.Set;
import java.util.Map;
@@ -634,6 +636,11 @@ public class RecordBatchSizer {
private Map<String, ColumnSize> columnSizes = CaseInsensitiveMap.newHashMap();
/**
+ * This field is used by the convenience method {@link #columnsList()}.
+ */
+ private List<ColumnSize> columnSizesList = new ArrayList<>();
+
+ /**
* Number of records (rows) in the batch.
*/
private int rowCount;
@@ -715,6 +722,8 @@ public class RecordBatchSizer {
for (VectorWrapper<?> vw : va) {
ColumnSize colSize = measureColumn(vw.getValueVector(), "");
columnSizes.put(vw.getField().getName(), colSize);
+ columnSizesList.add(colSize);
+ stdRowWidth += colSize.getStdDataSizePerEntry();
netBatchSize += colSize.getTotalNetSize();
maxSize = Math.max(maxSize, colSize.getTotalDataSize());
if (colSize.metadata.isNullable()) {
@@ -885,6 +894,14 @@ public class RecordBatchSizer {
public Map<String, ColumnSize> columns() { return columnSizes; }
/**
+ * This is a convenience method to get the sizes of columns in the same order that the corresponding value vectors
+ * are stored within a {@link org.apache.drill.exec.record.VectorAccessible}.
+ * @return The sizes of columns in the same order that the corresponding value vectors are stored within a
+ * {@link org.apache.drill.exec.record.VectorAccessible}.
+ */
+ public List<ColumnSize> columnsList() { return columnSizesList; }
+
+ /**
* Compute the "real" width of the row, taking into account each varchar column size
* (historically capped at 50, and rounded up to power of 2 to match drill buf allocation)
* and null marking columns.
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/BaseTestOpBatchEmitOutcome.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/BaseTestOpBatchEmitOutcome.java
index 4eaca2b..620a61c 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/BaseTestOpBatchEmitOutcome.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/BaseTestOpBatchEmitOutcome.java
@@ -19,7 +19,7 @@ package org.apache.drill.exec.physical.impl;
import org.apache.drill.common.types.TypeProtos;
import org.apache.drill.exec.physical.base.PhysicalOperator;
-import org.apache.drill.exec.physical.unit.PhysicalOpUnitTestBase;
+import org.apache.drill.test.PhysicalOpUnitTestBase;
import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.record.VectorContainer;
import org.apache.drill.exec.record.metadata.TupleMetadata;
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/MockRecordBatch.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/MockRecordBatch.java
index ed7af4c..f3ec7b0 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/MockRecordBatch.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/MockRecordBatch.java
@@ -17,6 +17,7 @@
*/
package org.apache.drill.exec.physical.impl;
+import com.google.common.base.Preconditions;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.ops.FragmentContext;
@@ -29,9 +30,17 @@ import org.apache.drill.exec.record.VectorWrapper;
import org.apache.drill.exec.record.WritableBatch;
import org.apache.drill.exec.record.selection.SelectionVector2;
import org.apache.drill.exec.record.selection.SelectionVector4;
+import org.apache.drill.test.rowSet.DirectRowSet;
+import org.apache.drill.test.rowSet.IndirectRowSet;
+import org.apache.drill.test.rowSet.RowSet;
+import javax.annotation.Nullable;
+import javax.validation.constraints.NotNull;
+import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
public class MockRecordBatch implements CloseableRecordBatch {
private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MockRecordBatch.class);
@@ -39,41 +48,83 @@ public class MockRecordBatch implements CloseableRecordBatch {
// These resources are owned by this RecordBatch
protected VectorContainer container;
protected SelectionVector2 sv2;
+ protected SelectionVector4 sv4;
private int currentContainerIndex;
private int currentOutcomeIndex;
private boolean isDone;
private boolean limitWithUnnest;
// All the below resources are owned by caller
- private final List<VectorContainer> allTestContainers;
- private List<SelectionVector2> allTestContainersSv2;
+ private final List<RowSet> rowSets;
private final List<IterOutcome> allOutcomes;
private final FragmentContext context;
protected final OperatorContext oContext;
protected final BufferAllocator allocator;
- public MockRecordBatch(FragmentContext context, OperatorContext oContext,
- List<VectorContainer> testContainers, List<IterOutcome> iterOutcomes,
- BatchSchema schema) {
+ private MockRecordBatch(@NotNull final FragmentContext context,
+ @Nullable final OperatorContext oContext,
+ @NotNull final List<RowSet> testRowSets,
+ @NotNull final List<IterOutcome> iterOutcomes,
+ @NotNull final BatchSchema schema,
+ final boolean dummy) {
+ Preconditions.checkNotNull(testRowSets);
+ Preconditions.checkNotNull(iterOutcomes);
+ Preconditions.checkNotNull(schema);
+
this.context = context;
this.oContext = oContext;
- this.allocator = oContext.getAllocator();
- this.allTestContainers = testContainers;
+ this.rowSets = testRowSets;
+ this.allocator = context.getAllocator();
this.container = new VectorContainer(allocator, schema);
this.allOutcomes = iterOutcomes;
this.currentContainerIndex = 0;
this.currentOutcomeIndex = 0;
this.isDone = false;
- this.allTestContainersSv2 = null;
- this.sv2 = null;
}
- public MockRecordBatch(FragmentContext context, OperatorContext oContext,
- List<VectorContainer> testContainers, List<IterOutcome> iterOutcomes,
- List<SelectionVector2> testContainersSv2, BatchSchema schema) {
- this(context, oContext, testContainers, iterOutcomes, schema);
- allTestContainersSv2 = testContainersSv2;
- sv2 = (allTestContainersSv2 != null && allTestContainersSv2.size() > 0) ? new SelectionVector2(allocator) : null;
+ @Deprecated
+ public MockRecordBatch(@Nullable final FragmentContext context,
+ @Nullable final OperatorContext oContext,
+ @NotNull final List<VectorContainer> testContainers,
+ @NotNull final List<IterOutcome> iterOutcomes,
+ final BatchSchema schema) {
+ this(context,
+ oContext,
+ testContainers.stream().
+ map(container -> DirectRowSet.fromContainer(container)).
+ collect(Collectors.toList()),
+ iterOutcomes,
+ schema,
+ true);
+ }
+
+ @Deprecated
+ public MockRecordBatch(@Nullable final FragmentContext context,
+ @Nullable final OperatorContext oContext,
+ @NotNull final List<VectorContainer> testContainers,
+ @NotNull final List<IterOutcome> iterOutcomes,
+ @NotNull final List<SelectionVector2> selectionVector2s,
+ final BatchSchema schema) {
+ this(context,
+ oContext,
+ new Supplier<List<RowSet>>() {
+ @Override
+ public List<RowSet> get() {
+ List<RowSet> rowSets = new ArrayList<>();
+
+ for (int index = 0; index < testContainers.size(); index++) {
+ if (index >= selectionVector2s.size()) {
+ rowSets.add(IndirectRowSet.fromContainer(testContainers.get(index)));
+ } else {
+ rowSets.add(IndirectRowSet.fromSv2(testContainers.get(index), selectionVector2s.get(index)));
+ }
+ }
+ return rowSets;
+ }
+ }.get(),
+ iterOutcomes,
+ schema,
+ true);
}
@Override
@@ -94,7 +145,7 @@ public class MockRecordBatch implements CloseableRecordBatch {
@Override
public SelectionVector4 getSelectionVector4() {
- return null;
+ return sv4;
}
@Override
@@ -146,10 +197,11 @@ public class MockRecordBatch implements CloseableRecordBatch {
return IterOutcome.NONE;
}
- IterOutcome currentOutcome = IterOutcome.OK;
+ IterOutcome currentOutcome;
- if (currentContainerIndex < allTestContainers.size()) {
- final VectorContainer input = allTestContainers.get(currentContainerIndex);
+ if (currentContainerIndex < rowSets.size()) {
+ final RowSet rowSet = rowSets.get(currentContainerIndex);
+ final VectorContainer input = rowSet.container();
final int recordCount = input.getRecordCount();
// We need to do this since the downstream operator expects vector reference to be same
// after first next call in cases when schema is not changed
@@ -158,19 +210,34 @@ public class MockRecordBatch implements CloseableRecordBatch {
container.clear();
container = new VectorContainer(allocator, inputSchema);
}
- container.transferIn(input);
- container.setRecordCount(recordCount);
-
- // Transfer the sv2 as well
- final SelectionVector2 inputSv2 =
- (allTestContainersSv2 != null && allTestContainersSv2.size() > 0)
- ? allTestContainersSv2.get(currentContainerIndex) : null;
- if (inputSv2 != null) {
- sv2.allocateNewSafe(inputSv2.getCount());
- for (int i=0; i<inputSv2.getCount(); ++i) {
- sv2.setIndex(i, inputSv2.getIndex(i));
- }
- sv2.setRecordCount(inputSv2.getCount());
+
+ switch (rowSet.indirectionType()) {
+ case NONE:
+ case TWO_BYTE:
+ container.transferIn(input);
+ container.setRecordCount(recordCount);
+ final SelectionVector2 inputSv2 = ((RowSet.SingleRowSet) rowSet).getSv2();
+
+ if (sv2 != null) {
+ // Operators assume that new values for an Sv2 are transferred in.
+ sv2.allocateNewSafe(inputSv2.getCount());
+ for (int i=0; i<inputSv2.getCount(); ++i) {
+ sv2.setIndex(i, inputSv2.getIndex(i));
+ }
+ sv2.setRecordCount(inputSv2.getCount());
+ } else {
+ sv2 = inputSv2;
+ }
+
+ break;
+ case FOUR_BYTE:
+ // TODO find a clean way to transfer in for this case.
+ container.clear();
+ container = input;
+ sv4 = ((RowSet.HyperRowSet) rowSet).getSv4();
+ break;
+ default:
+ throw new UnsupportedOperationException();
}
}
@@ -222,4 +289,69 @@ public class MockRecordBatch implements CloseableRecordBatch {
public void useUnnestKillHandlingForLimit(boolean limitWithUnnest) {
this.limitWithUnnest = limitWithUnnest;
}
+
+ public static class Builder {
+ private final List<RowSet> rowSets = new ArrayList<>();
+ private final List<IterOutcome> iterOutcomes = new ArrayList<>();
+
+ private BatchSchema batchSchema;
+ private OperatorContext oContext;
+
+ public Builder() {
+ }
+
+ private Builder sendData(final RowSet rowSet, final IterOutcome outcome) {
+ Preconditions.checkState(batchSchema == null);
+ rowSets.add(rowSet);
+ iterOutcomes.add(outcome);
+ return this;
+ }
+
+ public Builder sendData(final RowSet rowSet) {
+ final IterOutcome outcome = rowSets.isEmpty()? IterOutcome.OK_NEW_SCHEMA: IterOutcome.OK;
+ return sendData(rowSet, outcome);
+ }
+
+ public Builder sendDataWithNewSchema(final RowSet rowSet) {
+ return sendData(rowSet, IterOutcome.OK_NEW_SCHEMA);
+ }
+
+ public Builder sendDataAndEmit(final RowSet rowSet) {
+ return sendData(rowSet, IterOutcome.EMIT);
+ }
+
+ public Builder terminateWithError(IterOutcome errorOutcome) {
+ Preconditions.checkArgument(errorOutcome != IterOutcome.STOP);
+ Preconditions.checkArgument(errorOutcome != IterOutcome.OUT_OF_MEMORY);
+
+ iterOutcomes.add(errorOutcome);
+ return this;
+ }
+
+ public Builder setSchema(final BatchSchema batchSchema) {
+ Preconditions.checkState(!rowSets.isEmpty());
+ this.batchSchema = Preconditions.checkNotNull(batchSchema);
+ return this;
+ }
+
+ public Builder withOperatorContext(final OperatorContext oContext) {
+ this.oContext = Preconditions.checkNotNull(oContext);
+ return this;
+ }
+
+ public MockRecordBatch build(final FragmentContext context) {
+ BatchSchema tempSchema = batchSchema;
+
+ if (tempSchema == null && !rowSets.isEmpty()) {
+ tempSchema = rowSets.get(0).batchSchema();
+ }
+
+ return new MockRecordBatch(context,
+ oContext,
+ rowSets,
+ iterOutcomes,
+ tempSchema,
+ true);
+ }
+ }
}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/agg/TestAggWithAnyValue.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/agg/TestAggWithAnyValue.java
index 37c0b52..9909bca 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/agg/TestAggWithAnyValue.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/agg/TestAggWithAnyValue.java
@@ -20,7 +20,7 @@ package org.apache.drill.exec.physical.impl.agg;
import com.google.common.collect.Lists;
import org.apache.drill.exec.physical.config.StreamingAggregate;
-import org.apache.drill.exec.physical.unit.PhysicalOpUnitTestBase;
+import org.apache.drill.test.PhysicalOpUnitTestBase;
import org.apache.drill.exec.util.JsonStringArrayList;
import org.apache.drill.test.BaseTestQuery;
import org.apache.drill.categories.OperatorTest;
@@ -59,7 +59,7 @@ public class TestAggWithAnyValue {
"{ \"age\": {\"min\":43, \"max\":80}, \"city\": \"Palo Alto\", \"de\": \"987654321987654321987654321.13987654321\"," +
" \"a\": [{\"b\":70, \"c\":85}, {\"b\":90, \"c\":145}], \"m\": [{\"n\": [7, 8, 9]}], \"f\": [{\"g\": {\"h\": [{\"k\": 50}, {\"k\": 60}]}}]," +
"\"p\": {\"q\": [33, 34, 35]}" + "}]");
- opTestBuilder()
+ legacyOpTestBuilder()
.physicalOperator(aggConf)
.inputDataStreamJson(inputJsonBatches)
.baselineColumns("age", "any_a")
@@ -146,4 +146,4 @@ public class TestAggWithAnyValue {
.go();
}
}
-}
\ No newline at end of file
+}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/agg/TestHashAggBatch.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/agg/TestHashAggBatch.java
new file mode 100644
index 0000000..2c6976c
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/agg/TestHashAggBatch.java
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.agg;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import org.apache.drill.common.expression.ExpressionPosition;
+import org.apache.drill.common.expression.FieldReference;
+import org.apache.drill.common.expression.FunctionCall;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.logical.data.NamedExpression;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.exec.physical.config.HashAggregate;
+import org.apache.drill.exec.physical.impl.MockRecordBatch;
+import org.apache.drill.exec.planner.physical.AggPrelBase;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.test.PhysicalOpUnitTestBase;
+import org.apache.drill.test.rowSet.RowSet;
+import org.apache.drill.test.rowSet.RowSetBuilder;
+import org.apache.drill.test.rowSet.schema.SchemaBuilder;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.drill.exec.ExecConstants.HASHAGG_NUM_PARTITIONS_KEY;
+
+public class TestHashAggBatch extends PhysicalOpUnitTestBase {
+ public static final String FIRST_NAME_COL = "firstname";
+ public static final String LAST_NAME_COL = "lastname";
+ public static final String STUFF_COL = "stuff";
+ public static final String TOTAL_STUFF_COL = "totalstuff";
+
+ public static final List<String> FIRST_NAMES = ImmutableList.of(
+ "Strawberry",
+ "Banana",
+ "Mango",
+ "Grape");
+
+ public static final List<String> LAST_NAMES = ImmutableList.of(
+ "Red",
+ "Green",
+ "Blue",
+ "Purple");
+
+ public static final TupleMetadata INT_OUTPUT_SCHEMA = new SchemaBuilder()
+ .add(FIRST_NAME_COL, TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.REQUIRED)
+ .add(LAST_NAME_COL, TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.REQUIRED)
+ .add(TOTAL_STUFF_COL, TypeProtos.MinorType.BIGINT, TypeProtos.DataMode.OPTIONAL)
+ .buildSchema();
+
+ // TODO remove this in order to test multiple partitions
+ @Before
+ public void setupSimpleSingleBatchSumTestPhase1of2() {
+ operatorFixture.getOptionManager().setLocalOption(HASHAGG_NUM_PARTITIONS_KEY, 1);
+ }
+
+ @Test
+ public void simpleSingleBatchSumTestPhase1of2() throws Exception {
+ batchSumTest(100, Integer.MAX_VALUE, AggPrelBase.OperatorPhase.PHASE_1of2);
+ }
+
+ @Test
+ public void simpleMultiBatchSumTestPhase1of2() throws Exception {
+ batchSumTest(100, 100, AggPrelBase.OperatorPhase.PHASE_1of2);
+ }
+
+ @Test
+ public void simpleSingleBatchSumTestPhase1of1() throws Exception {
+ batchSumTest(100, Integer.MAX_VALUE, AggPrelBase.OperatorPhase.PHASE_1of1);
+ }
+
+ @Test
+ public void simpleMultiBatchSumTestPhase1of1() throws Exception {
+ batchSumTest(100, 100, AggPrelBase.OperatorPhase.PHASE_1of1);
+ }
+
+ @Test
+ public void simpleSingleBatchSumTestPhase2of2() throws Exception {
+ batchSumTest(100, Integer.MAX_VALUE, AggPrelBase.OperatorPhase.PHASE_2of2);
+ }
+
+ @Test
+ public void simpleMultiBatchSumTestPhase2of2() throws Exception {
+ batchSumTest(100, 100, AggPrelBase.OperatorPhase.PHASE_2of2);
+ }
+
+ private void batchSumTest(int totalCount, int maxInputBatchSize, AggPrelBase.OperatorPhase phase) throws Exception {
+ final HashAggregate hashAggregate = createHashAggPhysicalOperator(phase);
+ final List<RowSet> inputRowSets = buildInputRowSets(TypeProtos.MinorType.INT, TypeProtos.DataMode.REQUIRED,
+ totalCount, maxInputBatchSize);
+
+ final MockRecordBatch.Builder rowSetBatchBuilder = new MockRecordBatch.Builder();
+ inputRowSets.forEach(rowSet -> rowSetBatchBuilder.sendData(rowSet));
+ final MockRecordBatch inputRowSetBatch = rowSetBatchBuilder.build(fragContext);
+
+ final RowSet expectedRowSet = buildIntExpectedRowSet(totalCount);
+
+ opTestBuilder()
+ .physicalOperator(hashAggregate)
+ .combineOutputBatches()
+ .unordered()
+ .addUpstreamBatch(inputRowSetBatch)
+ .addExpectedResult(expectedRowSet)
+ .go();
+ }
+
+ private HashAggregate createHashAggPhysicalOperator(AggPrelBase.OperatorPhase phase) {
+ final List<NamedExpression> keyExpressions = Lists.newArrayList(
+ new NamedExpression(SchemaPath.getSimplePath(FIRST_NAME_COL), new FieldReference(FIRST_NAME_COL)),
+ new NamedExpression(SchemaPath.getSimplePath(LAST_NAME_COL), new FieldReference(LAST_NAME_COL)));
+
+ final List<NamedExpression> aggExpressions = Lists.newArrayList(
+ new NamedExpression(
+ new FunctionCall("sum", ImmutableList.of(SchemaPath.getSimplePath(STUFF_COL)),
+ new ExpressionPosition(null, 0)),
+ new FieldReference(TOTAL_STUFF_COL)));
+
+ return new HashAggregate(
+ null,
+ phase,
+ keyExpressions,
+ aggExpressions,
+ 0.0f);
+ }
+
+ private TupleMetadata buildInputSchema(TypeProtos.MinorType minorType, TypeProtos.DataMode dataMode) {
+ return new SchemaBuilder()
+ .add(FIRST_NAME_COL, TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.REQUIRED)
+ .add(LAST_NAME_COL, TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.REQUIRED)
+ .add(STUFF_COL, minorType, dataMode)
+ .buildSchema();
+ }
+
+ private List<RowSet> buildInputRowSets(final TypeProtos.MinorType minorType,
+ final TypeProtos.DataMode dataMode,
+ final int dataCount,
+ final int maxBatchSize) {
+ Preconditions.checkArgument(dataCount > 0);
+ Preconditions.checkArgument(maxBatchSize > 0);
+
+ List<RowSet> inputRowSets = new ArrayList<>();
+ int currentBatchSize = 0;
+ RowSetBuilder inputRowSetBuilder = null;
+
+ for (int multiplier = 1, firstNameIndex = 0; firstNameIndex < FIRST_NAMES.size(); firstNameIndex++) {
+ final String firstName = FIRST_NAMES.get(firstNameIndex);
+
+ for (int lastNameIndex = 0; lastNameIndex < LAST_NAMES.size(); lastNameIndex++, multiplier++) {
+ final String lastName = LAST_NAMES.get(lastNameIndex);
+
+ for (int index = 1; index <= dataCount; index++) {
+ final int num = index * multiplier;
+
+ if (currentBatchSize == 0) {
+ final TupleMetadata inputSchema = buildInputSchema(minorType, dataMode);
+ inputRowSetBuilder = new RowSetBuilder(operatorFixture.allocator(), inputSchema);
+ }
+
+ inputRowSetBuilder.addRow(firstName, lastName, num);
+ currentBatchSize++;
+
+ if (currentBatchSize == maxBatchSize) {
+ final RowSet rowSet = inputRowSetBuilder.build();
+ inputRowSets.add(rowSet);
+ currentBatchSize = 0;
+ }
+ }
+ }
+ }
+
+ if (currentBatchSize != 0) {
+ inputRowSets.add(inputRowSetBuilder.build());
+ }
+
+ return inputRowSets;
+ }
+
+ private RowSet buildIntExpectedRowSet(final int dataCount) {
+ final RowSetBuilder expectedRowSetBuilder = new RowSetBuilder(operatorFixture.allocator(), INT_OUTPUT_SCHEMA);
+
+ for (int multiplier = 1, firstNameIndex = 0; firstNameIndex < FIRST_NAMES.size(); firstNameIndex++) {
+ final String firstName = FIRST_NAMES.get(firstNameIndex);
+
+ for (int lastNameIndex = 0; lastNameIndex < LAST_NAMES.size(); lastNameIndex++, multiplier++) {
+ final String lastName = LAST_NAMES.get(lastNameIndex);
+ final long total = ((dataCount * (dataCount + 1)) / 2) * multiplier;
+
+ expectedRowSetBuilder.addRow(firstName, lastName, total);
+ }
+ }
+
+ return expectedRowSetBuilder.build();
+ }
+}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/common/HashPartitionTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/common/HashPartitionTest.java
index 48fd856..bbe57fa 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/common/HashPartitionTest.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/common/HashPartitionTest.java
@@ -32,6 +32,7 @@ import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.ops.OperatorContext;
import org.apache.drill.exec.physical.config.HashJoinPOP;
+import org.apache.drill.exec.physical.impl.MockRecordBatch;
import org.apache.drill.exec.physical.impl.aggregate.SpilledRecordbatch;
import org.apache.drill.exec.physical.impl.join.HashJoinMemoryCalculator;
import org.apache.drill.exec.physical.impl.join.HashJoinMemoryCalculatorImpl;
@@ -41,13 +42,13 @@ import org.apache.drill.exec.planner.logical.DrillJoinRel;
import org.apache.drill.exec.proto.ExecProtos;
import org.apache.drill.exec.proto.UserBitShared;
import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.CloseableRecordBatch;
import org.apache.drill.exec.record.MaterializedField;
import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.test.BaseDirTestWatcher;
import org.apache.drill.test.OperatorFixture;
import org.apache.drill.test.rowSet.DirectRowSet;
import org.apache.drill.test.rowSet.RowSet;
-import org.apache.drill.test.rowSet.RowSetBatch;
import org.apache.drill.test.rowSet.RowSetBuilder;
import org.apache.drill.test.rowSet.RowSetComparison;
import org.junit.Assert;
@@ -68,27 +69,31 @@ public class HashPartitionTest {
private RowSet probeRowSet;
@Override
- public RecordBatch createBuildBatch(BatchSchema schema, BufferAllocator allocator) {
- buildRowSet = new RowSetBuilder(allocator, schema)
+ public CloseableRecordBatch createBuildBatch(BatchSchema schema, FragmentContext context) {
+ buildRowSet = new RowSetBuilder(context.getAllocator(), schema)
.addRow(1, "green")
.addRow(3, "red")
.addRow(2, "blue")
.build();
- return new RowSetBatch(buildRowSet);
+ return new MockRecordBatch.Builder().
+ sendData(buildRowSet).
+ build(context);
}
@Override
- public void createResultBuildBatch(BatchSchema schema, BufferAllocator allocator) {
+ public void createResultBuildBatch(BatchSchema schema, FragmentContext context) {
}
@Override
- public RecordBatch createProbeBatch(BatchSchema schema, BufferAllocator allocator) {
- probeRowSet = new RowSetBuilder(allocator, schema)
+ public CloseableRecordBatch createProbeBatch(BatchSchema schema, FragmentContext context) {
+ probeRowSet = new RowSetBuilder(context.getAllocator(), schema)
.addRow(.5, "yellow")
.addRow(1.5, "blue")
.addRow(2.5, "black")
.build();
- return new RowSetBatch(probeRowSet);
+ return new MockRecordBatch.Builder().
+ sendData(probeRowSet).
+ build(context);
}
@Override
@@ -114,9 +119,9 @@ public class HashPartitionTest {
final HashJoinMemoryCalculator.BuildSidePartitioning noopCalc = new HashJoinMemoryCalculatorImpl.NoopBuildSidePartitioningImpl();
- hashPartition.appendInnerRow(buildRowSet.container(), 0, 10, noopCalc);
- hashPartition.appendInnerRow(buildRowSet.container(), 1, 11, noopCalc);
- hashPartition.appendInnerRow(buildRowSet.container(), 2, 12, noopCalc);
+ hashPartition.appendInnerRow(buildBatch.getContainer(), 0, 10, noopCalc);
+ hashPartition.appendInnerRow(buildBatch.getContainer(), 1, 11, noopCalc);
+ hashPartition.appendInnerRow(buildBatch.getContainer(), 2, 12, noopCalc);
hashPartition.completeAnInnerBatch(false, false);
hashPartition.buildContainersHashTableAndHelper();
@@ -155,22 +160,24 @@ public class HashPartitionTest {
private RowSet actualBuildRowSet;
@Override
- public RecordBatch createBuildBatch(BatchSchema schema, BufferAllocator allocator) {
- buildRowSet = new RowSetBuilder(allocator, schema)
+ public CloseableRecordBatch createBuildBatch(BatchSchema schema, FragmentContext context) {
+ buildRowSet = new RowSetBuilder(context.getAllocator(), schema)
.addRow(1, "green")
.addRow(3, "red")
.addRow(2, "blue")
.build();
- return new RowSetBatch(buildRowSet);
+ return new MockRecordBatch.Builder().
+ sendData(buildRowSet).
+ build(context);
}
@Override
- public void createResultBuildBatch(BatchSchema schema, BufferAllocator allocator) {
+ public void createResultBuildBatch(BatchSchema schema, FragmentContext context) {
final BatchSchema newSchema = BatchSchema.newBuilder()
.addFields(schema)
.addField(MaterializedField.create(HashPartition.HASH_VALUE_COLUMN_NAME, HashPartition.HVtype))
.build();
- actualBuildRowSet = new RowSetBuilder(allocator, newSchema)
+ actualBuildRowSet = new RowSetBuilder(context.getAllocator(), newSchema)
.addRow(1, "green", 10)
.addRow(3, "red", 11)
.addRow(2, "blue", 12)
@@ -178,13 +185,15 @@ public class HashPartitionTest {
}
@Override
- public RecordBatch createProbeBatch(BatchSchema schema, BufferAllocator allocator) {
- probeRowSet = new RowSetBuilder(allocator, schema)
+ public CloseableRecordBatch createProbeBatch(BatchSchema schema, FragmentContext context) {
+ probeRowSet = new RowSetBuilder(context.getAllocator(), schema)
.addRow(.5, "yellow")
.addRow(1.5, "blue")
.addRow(2.5, "black")
.build();
- return new RowSetBatch(probeRowSet);
+ return new MockRecordBatch.Builder().
+ sendData(probeRowSet).
+ build(context);
}
@Override
@@ -210,9 +219,9 @@ public class HashPartitionTest {
final HashJoinMemoryCalculator.BuildSidePartitioning noopCalc = new HashJoinMemoryCalculatorImpl.NoopBuildSidePartitioningImpl();
- hashPartition.appendInnerRow(buildRowSet.container(), 0, 10, noopCalc);
- hashPartition.appendInnerRow(buildRowSet.container(), 1, 11, noopCalc);
- hashPartition.appendInnerRow(buildRowSet.container(), 2, 12, noopCalc);
+ hashPartition.appendInnerRow(buildBatch.getContainer(), 0, 10, noopCalc);
+ hashPartition.appendInnerRow(buildBatch.getContainer(), 1, 11, noopCalc);
+ hashPartition.appendInnerRow(buildBatch.getContainer(), 2, 12, noopCalc);
hashPartition.completeAnInnerBatch(false, false);
hashPartition.spillThisPartition();
final String spillFile = hashPartition.getSpillFile();
@@ -260,15 +269,17 @@ public class HashPartitionTest {
MaterializedField buildColB = MaterializedField.create("buildColB", Types.required(TypeProtos.MinorType.VARCHAR));
List<MaterializedField> buildCols = Lists.newArrayList(buildColA, buildColB);
final BatchSchema buildSchema = new BatchSchema(BatchSchema.SelectionVectorMode.NONE, buildCols);
- final RecordBatch buildBatch = testCase.createBuildBatch(buildSchema, allocator);
- testCase.createResultBuildBatch(buildSchema, allocator);
+ final CloseableRecordBatch buildBatch = testCase.createBuildBatch(buildSchema, operatorContext.getFragmentContext());
+ buildBatch.next();
+ testCase.createResultBuildBatch(buildSchema, operatorContext.getFragmentContext());
// Create probe batch
MaterializedField probeColA = MaterializedField.create("probeColA", Types.required(TypeProtos.MinorType.FLOAT4));
MaterializedField probeColB = MaterializedField.create("probeColB", Types.required(TypeProtos.MinorType.VARCHAR));
List<MaterializedField> probeCols = Lists.newArrayList(probeColA, probeColB);
final BatchSchema probeSchema = new BatchSchema(BatchSchema.SelectionVectorMode.NONE, probeCols);
- final RecordBatch probeBatch = testCase.createProbeBatch(probeSchema, allocator);
+ final CloseableRecordBatch probeBatch = testCase.createProbeBatch(probeSchema, operatorContext.getFragmentContext());
+ probeBatch.next();
final LogicalExpression buildColExpression = SchemaPath.getSimplePath(buildColB.getName());
final LogicalExpression probeColExpression = SchemaPath.getSimplePath(probeColB.getName());
@@ -285,14 +296,17 @@ public class HashPartitionTest {
baseHashTable.updateIncoming(buildBatch, probeBatch);
testCase.run(spillSet, buildSchema, probeSchema, buildBatch, probeBatch, baseHashTable, context, operatorContext);
+
+ buildBatch.close();
+ probeBatch.close();
}
}
}
interface HashPartitionTestCase {
- RecordBatch createBuildBatch(BatchSchema schema, BufferAllocator allocator);
- void createResultBuildBatch(BatchSchema schema, BufferAllocator allocator);
- RecordBatch createProbeBatch(BatchSchema schema, BufferAllocator allocator);
+ CloseableRecordBatch createBuildBatch(BatchSchema schema, FragmentContext context);
+ void createResultBuildBatch(BatchSchema schema, FragmentContext context);
+ CloseableRecordBatch createProbeBatch(BatchSchema schema, FragmentContext context);
void run(SpillSet spillSet,
BatchSchema buildSchema,
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoinJPPD.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoinJPPD.java
index aae566b..d3d1487 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoinJPPD.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoinJPPD.java
@@ -22,10 +22,10 @@ import org.apache.calcite.rel.core.JoinRelType;
import org.apache.drill.categories.OperatorTest;
import org.apache.drill.categories.SlowTest;
import org.apache.drill.exec.physical.config.HashJoinPOP;
-import org.apache.drill.exec.physical.unit.PhysicalOpUnitTestBase;
import org.apache.drill.exec.work.filter.BloomFilter;
import org.apache.drill.exec.work.filter.BloomFilterDef;
import org.apache.drill.exec.work.filter.RuntimeFilterDef;
+import org.apache.drill.test.PhysicalOpUnitTestBase;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@@ -58,7 +58,7 @@ public class TestHashJoinJPPD extends PhysicalOpUnitTestBase {
for ( int cnt = 1; cnt <= numRows; cnt++ ) {
leftTable.add("[{\"lft\": " + cnt + ", \"a\" : \"a string\"}]");
}
- opTestBuilder()
+ legacyOpTestBuilder()
.physicalOperator(joinConf)
.inputDataStreamsJson(Lists.newArrayList(leftTable,rightTable))
.baselineColumns("lft", "a", "b", "rgt")
@@ -91,7 +91,7 @@ public class TestHashJoinJPPD extends PhysicalOpUnitTestBase {
for ( int cnt = 1; cnt <= numRows; cnt++ ) {
leftTable.add("[{\"lft\": " + cnt + ", \"a\" : \"a string\"}]");
}
- opTestBuilder()
+ legacyOpTestBuilder()
.physicalOperator(joinConf)
.inputDataStreamsJson(Lists.newArrayList(leftTable,rightTable))
.baselineColumns("lft", "a", "b", "rgt")
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoinSpill.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoinSpill.java
index 0c08611..67d214f 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoinSpill.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoinSpill.java
@@ -23,7 +23,7 @@ import org.apache.drill.categories.OperatorTest;
import org.apache.drill.categories.SlowTest;
import org.apache.drill.exec.physical.config.HashJoinPOP;
-import org.apache.drill.exec.physical.unit.PhysicalOpUnitTestBase;
+import org.apache.drill.test.PhysicalOpUnitTestBase;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@@ -52,7 +52,7 @@ public class TestHashJoinSpill extends PhysicalOpUnitTestBase {
rightTable.add("[{\"rgt\": " + cnt + ", \"b\" : \"a string\"}]");
}
- opTestBuilder()
+ legacyOpTestBuilder()
.physicalOperator(joinConf)
.inputDataStreamsJson(Lists.newArrayList(leftTable,rightTable))
.baselineColumns("lft", "a", "b", "rgt")
@@ -79,7 +79,7 @@ public class TestHashJoinSpill extends PhysicalOpUnitTestBase {
rightTable.add("[{\"rgt\": " + cnt + ", \"b\" : \"a string\"}]");
}
- opTestBuilder()
+ legacyOpTestBuilder()
.physicalOperator(joinConf)
.inputDataStreamsJson(Lists.newArrayList(leftTable,rightTable))
.baselineColumns("lft", "a", "b", "rgt")
@@ -110,7 +110,7 @@ public class TestHashJoinSpill extends PhysicalOpUnitTestBase {
// rightTable.add("[{\"rgt\": " + cnt + ", \"b\" : \"a string\"}]");
}
- opTestBuilder()
+ legacyOpTestBuilder()
.physicalOperator(joinConf)
.inputDataStreamsJson(Lists.newArrayList(leftTable,rightTable))
.baselineColumns("lft", "a", "b", "rgt")
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/limit/TestLimitOperator.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/limit/TestLimitOperator.java
index 7225edc..3dbd1cc 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/limit/TestLimitOperator.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/limit/TestLimitOperator.java
@@ -19,7 +19,7 @@ package org.apache.drill.exec.physical.impl.limit;
import com.google.common.collect.Lists;
import org.apache.drill.exec.physical.config.Limit;
-import org.apache.drill.exec.physical.unit.PhysicalOpUnitTestBase;
+import org.apache.drill.test.PhysicalOpUnitTestBase;
import org.apache.drill.test.BaseDirTestWatcher;
import org.apache.drill.test.ClientFixture;
import org.apache.drill.test.ClusterFixture;
@@ -55,7 +55,7 @@ public class TestLimitOperator extends PhysicalOpUnitTestBase {
List<String> inputJsonBatches = Lists.newArrayList(
"[{\"a\": 5, \"b\" : 1 }]",
"[{\"a\": 5, \"b\" : 5},{\"a\": 3, \"b\" : 8}]");
- opTestBuilder()
+ legacyOpTestBuilder()
.physicalOperator(limitConf)
.inputDataStreamJson(inputJsonBatches)
.baselineColumns("a", "b")
@@ -71,7 +71,7 @@ public class TestLimitOperator extends PhysicalOpUnitTestBase {
List<String> inputJsonBatches = Lists.newArrayList(
"[{\"a\": 5, \"b\" : 1 }]",
"[{\"a\": 5, \"b\" : 5},{\"a\": 3, \"b\" : 8}]");
- opTestBuilder()
+ legacyOpTestBuilder()
.physicalOperator(limitConf)
.inputDataStreamJson(inputJsonBatches)
.baselineColumns("a", "b")
@@ -85,7 +85,7 @@ public class TestLimitOperator extends PhysicalOpUnitTestBase {
List<String> inputJsonBatches = Lists.newArrayList(
"[{\"a\": 5, \"b\" : 1 }]",
"[{\"a\": 5, \"b\" : 5},{\"a\": 3, \"b\" : 8}]");
- opTestBuilder()
+ legacyOpTestBuilder()
.physicalOperator(limitConf)
.inputDataStreamJson(inputJsonBatches)
.baselineColumns("a", "b")
@@ -99,7 +99,7 @@ public class TestLimitOperator extends PhysicalOpUnitTestBase {
List<String> inputJsonBatches = Lists.newArrayList(
"[{\"a\": 5, \"b\" : 1 }]",
"[{\"a\": 5, \"b\" : 5},{\"a\": 3, \"b\" : 8}]");
- opTestBuilder()
+ legacyOpTestBuilder()
.physicalOperator(limitConf)
.inputDataStreamJson(inputJsonBatches)
.baselineColumns("a", "b")
@@ -114,7 +114,7 @@ public class TestLimitOperator extends PhysicalOpUnitTestBase {
List<String> inputJsonBatches = Lists.newArrayList(
"[{\"a\": 5, \"b\" : 1 }]",
"[{\"a\": 5, \"b\" : 5},{\"a\": 3, \"b\" : 8}]");
- opTestBuilder()
+ legacyOpTestBuilder()
.physicalOperator(limitConf)
.inputDataStreamJson(inputJsonBatches)
.baselineColumns("a", "b")
@@ -130,7 +130,7 @@ public class TestLimitOperator extends PhysicalOpUnitTestBase {
List<String> inputJsonBatches = Lists.newArrayList(
"[{\"a\": 5, \"b\" : 1 }]",
"[{\"a\": 5, \"b\" : 5},{\"a\": 3, \"b\" : 8}]");
- opTestBuilder()
+ legacyOpTestBuilder()
.physicalOperator(limitConf)
.inputDataStreamJson(inputJsonBatches)
.baselineColumns("a", "b")
@@ -144,7 +144,7 @@ public class TestLimitOperator extends PhysicalOpUnitTestBase {
List<String> inputJsonBatches = Lists.newArrayList(
"[{\"a\": 5, \"b\" : 1 }]",
"[{\"a\": 5, \"b\" : 5},{\"a\": 3, \"b\" : 8}]");
- opTestBuilder()
+ legacyOpTestBuilder()
.physicalOperator(limitConf)
.inputDataStreamJson(inputJsonBatches)
.baselineColumns("a", "b")
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/svremover/AbstractGenericCopierTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/svremover/AbstractGenericCopierTest.java
index 7d444b4..292af20 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/svremover/AbstractGenericCopierTest.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/svremover/AbstractGenericCopierTest.java
@@ -20,77 +20,99 @@ package org.apache.drill.exec.physical.impl.svremover;
import org.apache.drill.common.types.TypeProtos;
import org.apache.drill.common.types.Types;
import org.apache.drill.exec.exception.SchemaChangeException;
-import org.apache.drill.exec.memory.RootAllocator;
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.physical.impl.MockRecordBatch;
import org.apache.drill.exec.record.BatchSchema;
import org.apache.drill.exec.record.MaterializedField;
import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.record.VectorContainer;
-import org.apache.drill.exec.record.metadata.TupleMetadata;
import org.apache.drill.exec.vector.SchemaChangeCallBack;
+import org.apache.drill.test.BaseDirTestWatcher;
+import org.apache.drill.test.OperatorFixture;
+import org.apache.drill.test.rowSet.DirectRowSet;
import org.apache.drill.test.rowSet.RowSet;
-import org.apache.drill.test.rowSet.RowSetBatch;
import org.apache.drill.test.rowSet.RowSetBuilder;
import org.apache.drill.test.rowSet.RowSetComparison;
import org.apache.drill.test.rowSet.schema.SchemaBuilder;
+import org.junit.Rule;
import org.junit.Test;
public abstract class AbstractGenericCopierTest {
+ @Rule
+ public final BaseDirTestWatcher baseDirTestWatcher = new BaseDirTestWatcher();
+
@Test
- public void testCopyRecords() throws SchemaChangeException {
- try (RootAllocator allocator = new RootAllocator(10_000_000)) {
- final TupleMetadata batchSchema = createTestSchema(BatchSchema.SelectionVectorMode.NONE);
+ public void testCopyRecords() throws Exception {
+ try (OperatorFixture operatorFixture = new OperatorFixture.Builder(baseDirTestWatcher).build()) {
+ final BufferAllocator allocator = operatorFixture.allocator();
+ final BatchSchema batchSchema = createTestSchema(BatchSchema.SelectionVectorMode.NONE);
final RowSet srcRowSet = createSrcRowSet(allocator);
- final RowSet destRowSet = new RowSetBuilder(allocator, batchSchema).build();
- final VectorContainer destContainer = destRowSet.container();
- final Copier copier = createCopier(new RowSetBatch(srcRowSet), destContainer, null);
+ final VectorContainer destContainer = new VectorContainer(allocator, batchSchema);
+
+ destContainer.setRecordCount(0);
final RowSet expectedRowSet = createExpectedRowset(allocator);
- copier.copyRecords(0, 3);
+ MockRecordBatch mockRecordBatch = null;
try {
- new RowSetComparison(expectedRowSet).verify(destRowSet);
+ mockRecordBatch = new MockRecordBatch.Builder().
+ sendData(srcRowSet).
+ build(operatorFixture.getFragmentContext());
+ mockRecordBatch.next();
+ final Copier copier = createCopier(mockRecordBatch, destContainer, null);
+ copier.copyRecords(0, 3);
+
+ new RowSetComparison(expectedRowSet).verify(DirectRowSet.fromContainer(destContainer));
} finally {
- srcRowSet.clear();
-
- if (srcRowSet instanceof RowSet.HyperRowSet) {
- ((RowSet.HyperRowSet)srcRowSet).getSv4().clear();
+ if (mockRecordBatch != null) {
+ mockRecordBatch.close();
}
- destRowSet.clear();
+ srcRowSet.clear();
+ destContainer.clear();
expectedRowSet.clear();
}
}
}
@Test
- public void testAppendRecords() throws SchemaChangeException {
- try (RootAllocator allocator = new RootAllocator(10_000_000)) {
- final TupleMetadata batchSchema = createTestSchema(BatchSchema.SelectionVectorMode.NONE);
+ public void testAppendRecords() throws Exception {
+ try (OperatorFixture operatorFixture = new OperatorFixture.Builder(baseDirTestWatcher).build()) {
+ final BufferAllocator allocator = operatorFixture.allocator();
+ final BatchSchema batchSchema = createTestSchema(BatchSchema.SelectionVectorMode.NONE);
final RowSet srcRowSet = createSrcRowSet(allocator);
- final RowSet destRowSet = new RowSetBuilder(allocator, batchSchema).build();
- final VectorContainer destContainer = destRowSet.container();
- final Copier copier = createCopier(new RowSetBatch(srcRowSet), destContainer, null);
+ final VectorContainer destContainer = new VectorContainer(allocator, batchSchema);
+
+ AbstractCopier.allocateOutgoing(destContainer, 3);
+
+ destContainer.setRecordCount(0);
final RowSet expectedRowSet = createExpectedRowset(allocator);
- copier.appendRecord(0);
- copier.appendRecords(1, 2);
+ MockRecordBatch mockRecordBatch = null;
try {
- new RowSetComparison(expectedRowSet).verify(destRowSet);
+ mockRecordBatch = new MockRecordBatch.Builder().
+ sendData(srcRowSet).
+ build(operatorFixture.getFragmentContext());
+ mockRecordBatch.next();
+ final Copier copier = createCopier(mockRecordBatch, destContainer, null);
+ copier.appendRecord(0);
+ copier.appendRecords(1, 2);
+
+ new RowSetComparison(expectedRowSet).verify(DirectRowSet.fromContainer(destContainer));
} finally {
- srcRowSet.clear();
-
- if (srcRowSet instanceof RowSet.HyperRowSet) {
- ((RowSet.HyperRowSet)srcRowSet).getSv4().clear();
+ if (mockRecordBatch != null) {
+ mockRecordBatch.close();
}
- destRowSet.clear();
+ srcRowSet.clear();
+ destContainer.clear();
expectedRowSet.clear();
}
}
}
- public abstract RowSet createSrcRowSet(RootAllocator allocator) throws SchemaChangeException;
+ public abstract RowSet createSrcRowSet(BufferAllocator allocator) throws SchemaChangeException;
public Copier createCopier(RecordBatch incoming, VectorContainer outputContainer,
SchemaChangeCallBack callback) {
@@ -117,7 +139,7 @@ public abstract class AbstractGenericCopierTest {
return new Object[]{106, "black", new float[]{.75f}, new String[]{"4a"}};
}
- private RowSet createExpectedRowset(RootAllocator allocator) {
+ public RowSet createExpectedRowset(BufferAllocator allocator) {
return new RowSetBuilder(allocator, createTestSchema(BatchSchema.SelectionVectorMode.NONE))
.addRow(row1())
.addRow(row2())
@@ -125,7 +147,7 @@ public abstract class AbstractGenericCopierTest {
.build();
}
- protected TupleMetadata createTestSchema(BatchSchema.SelectionVectorMode mode) {
+ protected BatchSchema createTestSchema(BatchSchema.SelectionVectorMode mode) {
MaterializedField colA = MaterializedField.create("colA", Types.required(TypeProtos.MinorType.INT));
MaterializedField colB = MaterializedField.create("colB", Types.required(TypeProtos.MinorType.VARCHAR));
MaterializedField colC = MaterializedField.create("colC", Types.repeated(TypeProtos.MinorType.FLOAT4));
@@ -136,6 +158,6 @@ public abstract class AbstractGenericCopierTest {
.add(colC)
.add(colD)
.withSVMode(mode)
- .buildSchema();
+ .build();
}
}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/svremover/GenericCopierTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/svremover/GenericCopierTest.java
index d6c38e7..2636490 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/svremover/GenericCopierTest.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/svremover/GenericCopierTest.java
@@ -17,7 +17,7 @@
*/
package org.apache.drill.exec.physical.impl.svremover;
-import org.apache.drill.exec.memory.RootAllocator;
+import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.record.BatchSchema;
import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.record.VectorContainer;
@@ -27,7 +27,7 @@ import org.apache.drill.test.rowSet.RowSetBuilder;
public class GenericCopierTest extends AbstractGenericCopierTest {
@Override
- public RowSet createSrcRowSet(RootAllocator allocator) {
+ public RowSet createSrcRowSet(BufferAllocator allocator) {
return new RowSetBuilder(allocator, createTestSchema(BatchSchema.SelectionVectorMode.NONE))
.addRow(row1())
.addRow(row2())
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/svremover/GenericSV2BatchCopierTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/svremover/GenericSV2BatchCopierTest.java
index 748e0d0..2fec0e5 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/svremover/GenericSV2BatchCopierTest.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/svremover/GenericSV2BatchCopierTest.java
@@ -17,7 +17,7 @@
*/
package org.apache.drill.exec.physical.impl.svremover;
-import org.apache.drill.exec.memory.RootAllocator;
+import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.record.BatchSchema;
import org.apache.drill.test.rowSet.RowSet;
import org.apache.drill.test.rowSet.RowSetBuilder;
@@ -29,7 +29,7 @@ import org.apache.drill.test.rowSet.RowSetBuilder;
public class GenericSV2BatchCopierTest extends AbstractGenericCopierTest {
@Override
- public RowSet createSrcRowSet(RootAllocator allocator) {
+ public RowSet createSrcRowSet(BufferAllocator allocator) {
return new RowSetBuilder(allocator, createTestSchema(BatchSchema.SelectionVectorMode.TWO_BYTE))
.addSelection(true, row1())
.addRow(row2())
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/svremover/GenericSV2CopierTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/svremover/GenericSV2CopierTest.java
index b2f0e51..42182e9 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/svremover/GenericSV2CopierTest.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/svremover/GenericSV2CopierTest.java
@@ -17,7 +17,7 @@
*/
package org.apache.drill.exec.physical.impl.svremover;
-import org.apache.drill.exec.memory.RootAllocator;
+import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.record.BatchSchema;
import org.apache.drill.test.rowSet.RowSet;
import org.apache.drill.test.rowSet.RowSetBuilder;
@@ -25,7 +25,7 @@ import org.apache.drill.test.rowSet.RowSetBuilder;
public class GenericSV2CopierTest extends AbstractGenericCopierTest {
@Override
- public RowSet createSrcRowSet(RootAllocator allocator) {
+ public RowSet createSrcRowSet(BufferAllocator allocator) {
return new RowSetBuilder(allocator, createTestSchema(BatchSchema.SelectionVectorMode.TWO_BYTE))
.addRow(row1())
.addSelection(false, row4())
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/svremover/GenericSV4CopierTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/svremover/GenericSV4CopierTest.java
index a5f5bb7..46edab7 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/svremover/GenericSV4CopierTest.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/svremover/GenericSV4CopierTest.java
@@ -19,11 +19,10 @@ package org.apache.drill.exec.physical.impl.svremover;
import io.netty.buffer.DrillBuf;
import org.apache.drill.exec.exception.SchemaChangeException;
-import org.apache.drill.exec.memory.RootAllocator;
+import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.record.BatchSchema;
import org.apache.drill.exec.record.ExpandableHyperContainer;
import org.apache.drill.exec.record.VectorContainer;
-import org.apache.drill.exec.record.metadata.TupleMetadata;
import org.apache.drill.exec.record.selection.SelectionVector4;
import org.apache.drill.test.rowSet.HyperRowSetImpl;
import org.apache.drill.test.rowSet.RowSet;
@@ -32,8 +31,8 @@ import org.apache.drill.test.rowSet.RowSetBuilder;
public class GenericSV4CopierTest extends AbstractGenericCopierTest {
@Override
- public RowSet createSrcRowSet(RootAllocator allocator) throws SchemaChangeException {
- final TupleMetadata batchSchema = createTestSchema(BatchSchema.SelectionVectorMode.NONE);
+ public RowSet createSrcRowSet(BufferAllocator allocator) throws SchemaChangeException {
+ final BatchSchema batchSchema = createTestSchema(BatchSchema.SelectionVectorMode.NONE);
final DrillBuf drillBuf = allocator.buffer(4 * 3);
final SelectionVector4 sv4 = new SelectionVector4(drillBuf, 3, Character.MAX_VALUE);
@@ -52,10 +51,12 @@ public class GenericSV4CopierTest extends AbstractGenericCopierTest {
final ExpandableHyperContainer hyperContainer = new ExpandableHyperContainer(batch1);
hyperContainer.addBatch(batch2);
+ hyperContainer.setRecordCount(5);
sv4.set(0, 0, 0);
sv4.set(1, 1, 0);
sv4.set(2, 1, 2);
+ sv4.setCount(3);
return new HyperRowSetImpl(hyperContainer, sv4);
}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/BasicPhysicalOpUnitTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/BasicPhysicalOpUnitTest.java
index 0a8cd88..363a939 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/BasicPhysicalOpUnitTest.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/BasicPhysicalOpUnitTest.java
@@ -38,6 +38,8 @@ import org.apache.drill.exec.physical.config.StreamingAggregate;
import org.apache.drill.exec.physical.config.TopN;
import org.apache.drill.exec.physical.config.FlattenPOP;
import org.apache.drill.exec.planner.physical.AggPrelBase;
+import org.apache.drill.test.LegacyOperatorTestBuilder;
+import org.apache.drill.test.PhysicalOpUnitTestBase;
import org.junit.Ignore;
import org.junit.Test;
@@ -51,7 +53,7 @@ public class BasicPhysicalOpUnitTest extends PhysicalOpUnitTestBase {
List<String> jsonBatches = Lists.newArrayList(
"[{\"x\": 5 },{\"x\": 10 }]",
"[{\"x\": 20 },{\"x\": 30 },{\"x\": 40 }]");
- opTestBuilder()
+ legacyOpTestBuilder()
.physicalOperator(projectConf)
.inputDataStreamJson(jsonBatches)
.baselineColumns("x")
@@ -69,7 +71,7 @@ public class BasicPhysicalOpUnitTest extends PhysicalOpUnitTestBase {
List<String> jsonBatches = Lists.newArrayList(
"[{\"json_col\": \"{ \\\"a\\\" : 1 }\"}]",
"[{\"json_col\": \"{ \\\"a\\\" : 5 }\"}]");
- opTestBuilder()
+ legacyOpTestBuilder()
.physicalOperator(projectConf)
.inputDataStreamJson(jsonBatches)
.baselineColumns("complex_col")
@@ -91,7 +93,7 @@ public class BasicPhysicalOpUnitTest extends PhysicalOpUnitTestBase {
List<String> rightJsonBatches = Lists.newArrayList(
"[{\"x1\": 5, \"a2\" : \"asdf\"}]",
"[{\"x1\": 6, \"a2\" : \"qwerty\"},{\"x1\": 5, \"a2\" : \"12345\"}]");
- opTestBuilder()
+ legacyOpTestBuilder()
.physicalOperator(joinConf)
.inputDataStreamsJson(Lists.newArrayList(leftJsonBatches, rightJsonBatches))
.baselineColumns("x", "a", "a2", "x1")
@@ -116,7 +118,7 @@ public class BasicPhysicalOpUnitTest extends PhysicalOpUnitTestBase {
List<String> rightJsonBatches = Lists.newArrayList(
"[{\"x1\": 5, \"a2\" : \"asdf\"}]",
"[{\"x1\": 5, \"a2\" : \"12345\"}, {\"x1\": 6, \"a2\" : \"qwerty\"}]");
- opTestBuilder()
+ legacyOpTestBuilder()
.physicalOperator(joinConf)
.inputDataStreamsJson(Lists.newArrayList(leftJsonBatches, rightJsonBatches))
.baselineColumns("x", "a", "a2", "x1")
@@ -135,7 +137,7 @@ public class BasicPhysicalOpUnitTest extends PhysicalOpUnitTestBase {
List<String> inputJsonBatches = Lists.newArrayList(
"[{\"a\": 5, \"b\" : 1 }]",
"[{\"a\": 5, \"b\" : 5},{\"a\": 3, \"b\" : 8}]");
- opTestBuilder()
+ legacyOpTestBuilder()
.physicalOperator(aggConf)
.inputDataStreamJson(inputJsonBatches)
.baselineColumns("b_sum", "a")
@@ -150,7 +152,7 @@ public class BasicPhysicalOpUnitTest extends PhysicalOpUnitTestBase {
List<String> inputJsonBatches = Lists.newArrayList(
"[{\"a\": 5, \"b\" : 1 }]",
"[{\"a\": 5, \"b\" : 5},{\"a\": 3, \"b\" : 8}]");
- opTestBuilder()
+ legacyOpTestBuilder()
.physicalOperator(aggConf)
.inputDataStreamJson(inputJsonBatches)
.baselineColumns("b_sum", "a")
@@ -165,7 +167,7 @@ public class BasicPhysicalOpUnitTest extends PhysicalOpUnitTestBase {
List<String> inputJsonBatches = Lists.newArrayList(
"[{\"a\": {\"b\" : 1 }}]",
"[{\"a\": {\"b\" : 5}},{\"a\": {\"b\" : 8}}]");
- opTestBuilder()
+ legacyOpTestBuilder()
.physicalOperator(complexToJson)
.inputDataStreamJson(inputJsonBatches)
.baselineColumns("a")
@@ -182,7 +184,7 @@ public class BasicPhysicalOpUnitTest extends PhysicalOpUnitTestBase {
"[{\"a\": 5, \"b\" : 1 }]",
"[{\"a\": 5, \"b\" : 5},{\"a\": 3, \"b\" : 8}]",
"[{\"a\": 40, \"b\" : 3},{\"a\": 13, \"b\" : 100}]");
- opTestBuilder()
+ legacyOpTestBuilder()
.physicalOperator(filterConf)
.inputDataStreamJson(inputJsonBatches)
.baselineColumns("a", "b")
@@ -206,7 +208,7 @@ public class BasicPhysicalOpUnitTest extends PhysicalOpUnitTestBase {
inputJsonBatches.add(batchString.toString());
}
- OperatorTestBuilder opTestBuilder = opTestBuilder()
+ LegacyOperatorTestBuilder opTestBuilder = legacyOpTestBuilder()
.physicalOperator(flatten)
.inputDataStreamJson(inputJsonBatches)
.baselineColumns("a", "b")
@@ -225,7 +227,7 @@ public class BasicPhysicalOpUnitTest extends PhysicalOpUnitTestBase {
"[{\"a\": 5, \"b\" : 1 }]",
"[{\"a\": 5, \"b\" : 5},{\"a\": 3, \"b\" : 8}]",
"[{\"a\": 40, \"b\" : 3},{\"a\": 13, \"b\" : 100}]");
- opTestBuilder()
+ legacyOpTestBuilder()
.physicalOperator(sortConf)
.maxAllocation(15_000_000L)
.inputDataStreamJson(inputJsonBatches)
@@ -253,8 +255,8 @@ public class BasicPhysicalOpUnitTest extends PhysicalOpUnitTestBase {
inputJsonBatches.add(batchString.toString());
}
- OperatorTestBuilder opTestBuilder =
- opTestBuilder()
+ LegacyOperatorTestBuilder opTestBuilder =
+ legacyOpTestBuilder()
.initReservation(initReservation)
.maxAllocation(maxAllocation)
.physicalOperator(sortConf)
@@ -312,7 +314,7 @@ public class BasicPhysicalOpUnitTest extends PhysicalOpUnitTestBase {
"[{\"a\": 5, \"b\" : 1 }]",
"[{\"a\": 5, \"b\" : 5},{\"a\": 3, \"b\" : 8}]",
"[{\"a\": 40, \"b\" : 3},{\"a\": 13, \"b\" : 100}]");
- opTestBuilder()
+ legacyOpTestBuilder()
.physicalOperator(sortConf)
.inputDataStreamJson(inputJsonBatches)
.baselineColumns("a", "b")
@@ -336,7 +338,7 @@ public class BasicPhysicalOpUnitTest extends PhysicalOpUnitTestBase {
List<String> rightJsonBatches = Lists.newArrayList(
"[{\"x\": 5, \"a\" : \"asdf\"}]",
"[{\"x\": 5, \"a\" : \"12345\"}, {\"x\": 6, \"a\" : \"qwerty\"}]");
- opTestBuilder()
+ legacyOpTestBuilder()
.physicalOperator(mergeConf)
.inputDataStreamsJson(Lists.newArrayList(leftJsonBatches, rightJsonBatches))
.baselineColumns("x", "a")
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/MiniPlanUnitTestBase.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/MiniPlanUnitTestBase.java
index 79f260f..2f05bba 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/MiniPlanUnitTestBase.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/MiniPlanUnitTestBase.java
@@ -32,6 +32,7 @@ import org.apache.drill.exec.store.dfs.DrillFileSystem;
import org.apache.drill.exec.store.parquet.ParquetDirectByteBufferAllocator;
import org.apache.drill.exec.store.parquet.ParquetReaderUtility;
import org.apache.drill.exec.store.parquet.columnreaders.ParquetRecordReader;
+import org.apache.drill.test.PhysicalOpUnitTestBase;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.hadoop.CodecFactory;
import org.apache.parquet.hadoop.ParquetFileReader;
@@ -171,7 +172,7 @@ public class MiniPlanUnitTestBase extends PhysicalOpUnitTestBase {
}
/**
- * Similar to {@link OperatorTestBuilder}, build a physical operator (RecordBatch) and specify its input record batches.
+ * Similar to {@link LegacyOperatorTestBuilder}, build a physical operator (RecordBatch) and specify its input record batches.
* The input record batch could be a non-scan operator by calling {@link PopBuilder#addInputAsChild},
* or a scan operator by calling {@link PopBuilder#addJsonScanAsChild()} if it's SCAN operator.
*
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/TestOutputBatchSize.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/TestOutputBatchSize.java
index 84a4fbc..f632455 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/TestOutputBatchSize.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/TestOutputBatchSize.java
@@ -49,6 +49,8 @@ import org.apache.drill.exec.vector.UInt4Vector;
import org.apache.drill.exec.vector.ValueVector;
import org.apache.drill.exec.vector.complex.RepeatedListVector;
import org.apache.drill.exec.vector.complex.RepeatedValueVector;
+import org.apache.drill.test.LegacyOperatorTestBuilder;
+import org.apache.drill.test.PhysicalOpUnitTestBase;
import org.junit.Test;
import java.util.ArrayList;
@@ -121,7 +123,7 @@ public class TestOutputBatchSize extends PhysicalOpUnitTestBase {
fragContext.getOptions().setLocalOption("drill.exec.memory.operator.output_batch_size", totalSize / 2);
- OperatorTestBuilder opTestBuilder = opTestBuilder()
+ LegacyOperatorTestBuilder opTestBuilder = legacyOpTestBuilder()
.physicalOperator(projectConf)
.inputDataStreamJson(inputJsonBatches)
.baselineColumns(baselineColumns)
@@ -197,7 +199,7 @@ public class TestOutputBatchSize extends PhysicalOpUnitTestBase {
fragContext.getOptions().setLocalOption("drill.exec.memory.operator.output_batch_size", memoryLimit);
- OperatorTestBuilder opTestBuilder = opTestBuilder()
+ LegacyOperatorTestBuilder opTestBuilder = legacyOpTestBuilder()
.physicalOperator(projectConf)
.inputDataStreamJson(inputJsonBatches)
.baselineColumns(baselineColumns)
@@ -276,7 +278,7 @@ public class TestOutputBatchSize extends PhysicalOpUnitTestBase {
fragContext.getOptions().setLocalOption("drill.exec.memory.operator.output_batch_size", totalSize / 2);
- OperatorTestBuilder opTestBuilder = opTestBuilder()
+ LegacyOperatorTestBuilder opTestBuilder = legacyOpTestBuilder()
.physicalOperator(projectConf)
.inputDataStreamJson(inputJsonBatches)
.baselineColumns(baselineColumns)
@@ -347,7 +349,7 @@ public class TestOutputBatchSize extends PhysicalOpUnitTestBase {
fragContext.getOptions().setLocalOption("drill.exec.memory.operator.output_batch_size", totalSize / 2);
- OperatorTestBuilder opTestBuilder = opTestBuilder()
+ LegacyOperatorTestBuilder opTestBuilder = legacyOpTestBuilder()
.physicalOperator(projectConf)
.inputDataStreamJson(inputJsonBatches)
.baselineColumns(baselineColumns)
@@ -418,7 +420,7 @@ public class TestOutputBatchSize extends PhysicalOpUnitTestBase {
fragContext.getOptions().setLocalOption("drill.exec.memory.operator.output_batch_size", totalSize / 2);
- OperatorTestBuilder opTestBuilder = opTestBuilder()
+ LegacyOperatorTestBuilder opTestBuilder = legacyOpTestBuilder()
.physicalOperator(projectConf)
.inputDataStreamJson(inputJsonBatches)
.baselineColumns(baselineColumns)
@@ -481,7 +483,7 @@ public class TestOutputBatchSize extends PhysicalOpUnitTestBase {
// We will get approximately get 2 batches and max of 4.
fragContext.getOptions().setLocalOption("drill.exec.memory.operator.output_batch_size", totalSize / 2);
- OperatorTestBuilder opTestBuilder = opTestBuilder()
+ LegacyOperatorTestBuilder opTestBuilder = legacyOpTestBuilder()
.physicalOperator(flatten)
.inputDataStreamJson(inputJsonBatches)
.baselineColumns("a", "b", "c")
@@ -549,7 +551,7 @@ public class TestOutputBatchSize extends PhysicalOpUnitTestBase {
// We will get approximately get 2 batches and max of 4.
fragContext.getOptions().setLocalOption("drill.exec.memory.operator.output_batch_size", totalSize / 2);
- OperatorTestBuilder opTestBuilder = opTestBuilder()
+ LegacyOperatorTestBuilder opTestBuilder = legacyOpTestBuilder()
.physicalOperator(flatten)
.inputDataStreamJson(inputJsonBatches)
.baselineColumns("a", "b", "c")
@@ -611,7 +613,7 @@ public class TestOutputBatchSize extends PhysicalOpUnitTestBase {
// We will get approximately get 2 batches and max of 4.
fragContext.getOptions().setLocalOption("drill.exec.memory.operator.output_batch_size", totalSize / 2);
- OperatorTestBuilder opTestBuilder = opTestBuilder()
+ LegacyOperatorTestBuilder opTestBuilder = legacyOpTestBuilder()
.physicalOperator(flatten)
.inputDataStreamJson(inputJsonBatches)
.baselineColumns("a", "b", "c")
@@ -672,7 +674,7 @@ public class TestOutputBatchSize extends PhysicalOpUnitTestBase {
// We will get approximately get 2 batches and max of 4.
fragContext.getOptions().setLocalOption("drill.exec.memory.operator.output_batch_size", totalSize / 2);
- OperatorTestBuilder opTestBuilder = opTestBuilder()
+ LegacyOperatorTestBuilder opTestBuilder = legacyOpTestBuilder()
.physicalOperator(flatten)
.inputDataStreamJson(inputJsonBatches)
.baselineColumns("a", "b", "c")
@@ -760,7 +762,7 @@ public class TestOutputBatchSize extends PhysicalOpUnitTestBase {
// We will get approximately get 2 batches and max of 4.
fragContext.getOptions().setLocalOption("drill.exec.memory.operator.output_batch_size", totalSize / 2);
- OperatorTestBuilder opTestBuilder = opTestBuilder()
+ LegacyOperatorTestBuilder opTestBuilder = legacyOpTestBuilder()
.physicalOperator(flatten)
.inputDataStreamJson(inputJsonBatches)
.baselineColumns("a", "b", "c")
@@ -869,7 +871,7 @@ public class TestOutputBatchSize extends PhysicalOpUnitTestBase {
// We will get approximately get 2 batches and max of 4.
fragContext.getOptions().setLocalOption("drill.exec.memory.operator.output_batch_size", totalSize / 2);
- OperatorTestBuilder opTestBuilder = opTestBuilder()
+ LegacyOperatorTestBuilder opTestBuilder = legacyOpTestBuilder()
.physicalOperator(flatten)
.inputDataStreamJson(inputJsonBatches)
.baselineColumns("a", "b", "c")
@@ -967,7 +969,7 @@ public class TestOutputBatchSize extends PhysicalOpUnitTestBase {
// We will get approximately get 2 batches and max of 4.
fragContext.getOptions().setLocalOption("drill.exec.memory.operator.output_batch_size", totalSize / 2);
- OperatorTestBuilder opTestBuilder = opTestBuilder()
+ LegacyOperatorTestBuilder opTestBuilder = legacyOpTestBuilder()
.physicalOperator(flatten)
.inputDataStreamJson(inputJsonBatches)
.baselineColumns("a", "b", "c")
@@ -1065,7 +1067,7 @@ public class TestOutputBatchSize extends PhysicalOpUnitTestBase {
fragContext.getOptions().setLocalOption("drill.exec.memory.operator.output_batch_size", totalSize / 2);
// Here we expect 16 batches because each batch will be limited by upper limit of 65535 records.
- OperatorTestBuilder opTestBuilder = opTestBuilder()
+ LegacyOperatorTestBuilder opTestBuilder = legacyOpTestBuilder()
.physicalOperator(flatten)
.inputDataStreamJson(inputJsonBatches)
.baselineColumns("a", "c")
@@ -1125,7 +1127,7 @@ public class TestOutputBatchSize extends PhysicalOpUnitTestBase {
// Here we expect 10 batches because each batch will be bounded by lower limit of at least 1 record.
// do not check the output batch size as it will be more than configured value of 1024, so we get
// at least one record out.
- OperatorTestBuilder opTestBuilder = opTestBuilder()
+ LegacyOperatorTestBuilder opTestBuilder = legacyOpTestBuilder()
.physicalOperator(flatten)
.inputDataStreamJson(inputJsonBatches)
.baselineColumns("a", "c")
@@ -1172,7 +1174,7 @@ public class TestOutputBatchSize extends PhysicalOpUnitTestBase {
batchString.append("]");
inputJsonBatches.add(batchString.toString());
- OperatorTestBuilder opTestBuilder = opTestBuilder()
+ LegacyOperatorTestBuilder opTestBuilder = legacyOpTestBuilder()
.physicalOperator(flatten)
.inputDataStreamJson(inputJsonBatches)
.baselineColumns("a", "b")
@@ -1233,7 +1235,7 @@ public class TestOutputBatchSize extends PhysicalOpUnitTestBase {
// We will get approximately get 2 batches and max of 4.
fragContext.getOptions().setLocalOption("drill.exec.memory.operator.output_batch_size", totalSize / 2);
- OperatorTestBuilder opTestBuilder = opTestBuilder()
+ LegacyOperatorTestBuilder opTestBuilder = legacyOpTestBuilder()
.physicalOperator(flatten)
.inputDataStreamJson(inputJsonBatches)
.baselineColumns("a", "b", "c")
@@ -1302,7 +1304,7 @@ public class TestOutputBatchSize extends PhysicalOpUnitTestBase {
// We will get approximately 4 batches because of fragmentation factor of 2 accounted for in merge join.
fragContext.getOptions().setLocalOption("drill.exec.memory.operator.output_batch_size", totalSize/2);
- OperatorTestBuilder opTestBuilder = opTestBuilder()
+ LegacyOperatorTestBuilder opTestBuilder = legacyOpTestBuilder()
.physicalOperator(mergeJoin)
.baselineColumns("a1", "b1", "c1", "a2", "b2", "c2")
.expectedNumBatches(4) // verify number of batches
@@ -1372,7 +1374,7 @@ public class TestOutputBatchSize extends PhysicalOpUnitTestBase {
// We should get 1 batch.
fragContext.getOptions().setLocalOption("drill.exec.memory.operator.output_batch_size", totalSize*2);
- OperatorTestBuilder opTestBuilder = opTestBuilder()
+ LegacyOperatorTestBuilder opTestBuilder = legacyOpTestBuilder()
.physicalOperator(mergeJoin)
.baselineColumns("a1", "b1", "c1", "a2", "b2", "c2")
.expectedNumBatches(1) // verify number of batches
@@ -1426,7 +1428,7 @@ public class TestOutputBatchSize extends PhysicalOpUnitTestBase {
// "a1" : 5, "c1" : 3, "a2":6, "c2": 3
// expect two batches, batch limited by 65535 records
- OperatorTestBuilder opTestBuilder = opTestBuilder()
+ LegacyOperatorTestBuilder opTestBuilder = legacyOpTestBuilder()
.physicalOperator(mergeJoin)
.baselineColumns("a1", "c1", "a2", "c2")
.expectedNumBatches(2) // verify number of batches
@@ -1481,7 +1483,7 @@ public class TestOutputBatchSize extends PhysicalOpUnitTestBase {
// set very low value of output batch size so we can do only one row per batch.
fragContext.getOptions().setLocalOption("drill.exec.memory.operator.output_batch_size", 128);
- OperatorTestBuilder opTestBuilder = opTestBuilder()
+ LegacyOperatorTestBuilder opTestBuilder = legacyOpTestBuilder()
.physicalOperator(mergeJoin)
.baselineColumns("a1", "b1", "c1", "a2", "b2", "c2")
.expectedNumBatches(10) // verify number of batches
@@ -1549,7 +1551,7 @@ public class TestOutputBatchSize extends PhysicalOpUnitTestBase {
// We should get 2 batches, one for the left and one for the right.
fragContext.getOptions().setLocalOption("drill.exec.memory.operator.output_batch_size", totalSize*2);
- OperatorTestBuilder opTestBuilder = opTestBuilder()
+ LegacyOperatorTestBuilder opTestBuilder = legacyOpTestBuilder()
.physicalOperator(unionAll)
.baselineColumns("a1", "b1", "c1")
.expectedNumBatches(2) // verify number of batches
@@ -1620,7 +1622,7 @@ public class TestOutputBatchSize extends PhysicalOpUnitTestBase {
// We should get 4 batches, 2 for the left and 2 for the right.
fragContext.getOptions().setLocalOption("drill.exec.memory.operator.output_batch_size", totalSize/2);
- OperatorTestBuilder opTestBuilder = opTestBuilder()
+ LegacyOperatorTestBuilder opTestBuilder = legacyOpTestBuilder()
.physicalOperator(unionAll)
.baselineColumns("a1", "b1", "c1")
.expectedNumBatches(4) // verify number of batches
@@ -1692,7 +1694,7 @@ public class TestOutputBatchSize extends PhysicalOpUnitTestBase {
// We should get 22 batches for 22 rows.
fragContext.getOptions().setLocalOption("drill.exec.memory.operator.output_batch_size", 128);
- OperatorTestBuilder opTestBuilder = opTestBuilder()
+ LegacyOperatorTestBuilder opTestBuilder = legacyOpTestBuilder()
.physicalOperator(unionAll)
.baselineColumns("a1","b1", "c1")
.expectedNumBatches(22) // verify number of batches
@@ -1764,7 +1766,7 @@ public class TestOutputBatchSize extends PhysicalOpUnitTestBase {
// We will get approximately 4 batches because of fragmentation factor of 2 accounted for in merge join.
fragContext.getOptions().setLocalOption("drill.exec.memory.operator.output_batch_size", totalSize/2);
- OperatorTestBuilder opTestBuilder = opTestBuilder()
+ LegacyOperatorTestBuilder opTestBuilder = legacyOpTestBuilder()
.physicalOperator(hashJoin)
.baselineColumns("a1", "b1", "c1", "a2", "b2", "c2")
.expectedNumBatches(4) // verify number of batches
@@ -1834,7 +1836,7 @@ public class TestOutputBatchSize extends PhysicalOpUnitTestBase {
// We should get 1 batch.
fragContext.getOptions().setLocalOption("drill.exec.memory.operator.output_batch_size", totalSize*2);
- OperatorTestBuilder opTestBuilder = opTestBuilder()
+ LegacyOperatorTestBuilder opTestBuilder = legacyOpTestBuilder()
.physicalOperator(hashJoin)
.baselineColumns("a1", "b1", "c1", "a2", "b2", "c2")
.expectedNumBatches(1) // verify number of batches
@@ -1888,7 +1890,7 @@ public class TestOutputBatchSize extends PhysicalOpUnitTestBase {
// "a1" : 5, "c1" : 3, "a2":6, "c2": 3
// expect two batches, batch limited by 65535 records
- OperatorTestBuilder opTestBuilder = opTestBuilder()
+ LegacyOperatorTestBuilder opTestBuilder = legacyOpTestBuilder()
.physicalOperator(hashJoin)
.baselineColumns("a1", "c1", "a2", "c2")
.expectedNumBatches(2) // verify number of batches
@@ -1943,7 +1945,7 @@ public class TestOutputBatchSize extends PhysicalOpUnitTestBase {
// set very low value of output batch size so we can do only one row per batch.
fragContext.getOptions().setLocalOption("drill.exec.memory.operator.output_batch_size", 128);
- OperatorTestBuilder opTestBuilder = opTestBuilder()
+ LegacyOperatorTestBuilder opTestBuilder = legacyOpTestBuilder()
.physicalOperator(hashJoin)
.baselineColumns("a1", "b1", "c1", "a2", "b2", "c2")
.expectedNumBatches(10) // verify number of batches
@@ -2011,7 +2013,7 @@ public class TestOutputBatchSize extends PhysicalOpUnitTestBase {
// We will get approximately 4 batches because of fragmentation factor of 2 accounted for in merge join.
fragContext.getOptions().setLocalOption("drill.exec.memory.operator.output_batch_size", totalSize/2);
- OperatorTestBuilder opTestBuilder = opTestBuilder()
+ LegacyOperatorTestBuilder opTestBuilder = legacyOpTestBuilder()
.physicalOperator(hashJoin)
.baselineColumns("a1", "b1", "c1", "a2", "b2", "c2")
.expectedNumBatches(4) // verify number of batches
@@ -2080,7 +2082,7 @@ public class TestOutputBatchSize extends PhysicalOpUnitTestBase {
// We will get approximately 4 batches because of fragmentation factor of 2 accounted for in merge join.
fragContext.getOptions().setLocalOption("drill.exec.memory.operator.output_batch_size", totalSize/2);
- OperatorTestBuilder opTestBuilder = opTestBuilder()
+ LegacyOperatorTestBuilder opTestBuilder = legacyOpTestBuilder()
.physicalOperator(hashJoin)
.baselineColumns("a1", "b1", "c1", "a2", "b2", "c2")
.expectedNumBatches(4) // verify number of batches
@@ -2102,7 +2104,7 @@ public class TestOutputBatchSize extends PhysicalOpUnitTestBase {
"[{\"a\": 5, \"b\" : 1 }]",
"[{\"a\": 5, \"b\" : 5},{\"a\": 3, \"b\" : 8}]");
- opTestBuilder()
+ legacyOpTestBuilder()
.physicalOperator(aggConf)
.inputDataStreamJson(inputJsonBatches)
.baselineColumns("b_sum", "a")
@@ -2157,7 +2159,7 @@ public class TestOutputBatchSize extends PhysicalOpUnitTestBase {
// We will get approximately get 2 batches and max of 4.
fragContext.getOptions().setLocalOption("drill.exec.memory.operator.output_batch_size", totalSize / 2);
- OperatorTestBuilder opTestBuilder = opTestBuilder()
+ LegacyOperatorTestBuilder opTestBuilder = legacyOpTestBuilder()
.physicalOperator(hashAgg)
.inputDataStreamJson(inputJsonBatches)
.baselineColumns("a", "b_sum")
@@ -2218,7 +2220,7 @@ public class TestOutputBatchSize extends PhysicalOpUnitTestBase {
// We will get approximately get 2 batches and max of 4.
fragContext.getOptions().setLocalOption("drill.exec.memory.operator.output_batch_size", totalSize / 2);
- OperatorTestBuilder opTestBuilder = opTestBuilder()
+ LegacyOperatorTestBuilder opTestBuilder = legacyOpTestBuilder()
.physicalOperator(hashAgg)
.inputDataStreamJson(inputJsonBatches)
.baselineColumns("a", "b_avg")
@@ -2278,7 +2280,7 @@ public class TestOutputBatchSize extends PhysicalOpUnitTestBase {
// We will get approximately get 2 batches and max of 4.
fragContext.getOptions().setLocalOption("drill.exec.memory.operator.output_batch_size", totalSize / 2);
- OperatorTestBuilder opTestBuilder = opTestBuilder()
+ LegacyOperatorTestBuilder opTestBuilder = legacyOpTestBuilder()
.physicalOperator(hashAgg)
.inputDataStreamJson(inputJsonBatches)
.baselineColumns("a", "b_max")
@@ -2350,7 +2352,7 @@ public class TestOutputBatchSize extends PhysicalOpUnitTestBase {
// We will get approximately 4 batches.
fragContext.getOptions().setLocalOption("drill.exec.memory.operator.output_batch_size", totalSize/2);
- OperatorTestBuilder opTestBuilder = opTestBuilder()
+ LegacyOperatorTestBuilder opTestBuilder = legacyOpTestBuilder()
.physicalOperator(nestedLoopJoin)
.baselineColumns("a1", "b1", "c1", "a2", "b2", "c2")
.expectedNumBatches(4) // verify number of batches
@@ -2424,7 +2426,7 @@ public class TestOutputBatchSize extends PhysicalOpUnitTestBase {
// We should get 1 batch.
fragContext.getOptions().setLocalOption("drill.exec.memory.operator.output_batch_size", totalSize*2);
- OperatorTestBuilder opTestBuilder = opTestBuilder()
+ LegacyOperatorTestBuilder opTestBuilder = legacyOpTestBuilder()
.physicalOperator(nestedLoopJoin)
.baselineColumns("a1", "b1", "c1", "a2", "b2", "c2")
.expectedNumBatches(1) // verify number of batches
@@ -2482,7 +2484,7 @@ public class TestOutputBatchSize extends PhysicalOpUnitTestBase {
// we expect n(n+1)/2 number of records i.e. (500 * 501)/2 = 125250
// expect two batches, batch limited by 65535 records
- OperatorTestBuilder opTestBuilder = opTestBuilder()
+ LegacyOperatorTestBuilder opTestBuilder = legacyOpTestBuilder()
.physicalOperator(nestedLoopJoin)
.baselineColumns("a1", "c1", "a2", "c2")
.expectedNumBatches(2) // verify number of batches
@@ -2542,7 +2544,7 @@ public class TestOutputBatchSize extends PhysicalOpUnitTestBase {
// set very low value of output batch size so we can do only one row per batch.
fragContext.getOptions().setLocalOption("drill.exec.memory.operator.output_batch_size", 128);
- OperatorTestBuilder opTestBuilder = opTestBuilder()
+ LegacyOperatorTestBuilder opTestBuilder = legacyOpTestBuilder()
.physicalOperator(nestedLoopJoin)
.baselineColumns("a1", "b1", "c1", "a2", "b2", "c2")
.expectedNumBatches(10) // verify number of batches
@@ -2612,7 +2614,7 @@ public class TestOutputBatchSize extends PhysicalOpUnitTestBase {
// We will get approximately 4 batches.
fragContext.getOptions().setLocalOption("drill.exec.memory.operator.output_batch_size", totalSize/2);
- OperatorTestBuilder opTestBuilder = opTestBuilder()
+ LegacyOperatorTestBuilder opTestBuilder = legacyOpTestBuilder()
.physicalOperator(nestedLoopJoin)
.baselineColumns("a1", "b1", "c1", "a2", "b2", "c2")
.expectedNumBatches(4) // verify number of batches
@@ -2949,4 +2951,4 @@ public class TestOutputBatchSize extends PhysicalOpUnitTestBase {
}
}
-}
\ No newline at end of file
+}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/columnreaders/TestBatchSizingMemoryUtil.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/columnreaders/TestBatchSizingMemoryUtil.java
index 9f4e026..4e0a739 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/columnreaders/TestBatchSizingMemoryUtil.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/columnreaders/TestBatchSizingMemoryUtil.java
@@ -19,12 +19,12 @@ package org.apache.drill.exec.store.parquet.columnreaders;
import java.math.BigDecimal;
import org.apache.drill.common.types.TypeProtos;
-import org.apache.drill.exec.physical.unit.PhysicalOpUnitTestBase;
import org.apache.drill.exec.record.VectorContainer;
import org.apache.drill.exec.record.metadata.TupleMetadata;
import org.apache.drill.exec.store.parquet.columnreaders.batchsizing.BatchSizingMemoryUtil;
import org.apache.drill.exec.store.parquet.columnreaders.batchsizing.BatchSizingMemoryUtil.ColumnMemoryUsageInfo;
import org.apache.drill.exec.store.parquet.columnreaders.batchsizing.RecordBatchSizerManager.ColumnMemoryQuota;
+import org.apache.drill.test.PhysicalOpUnitTestBase;
import org.apache.drill.test.rowSet.RowSet;
import org.apache.drill.test.rowSet.RowSetBuilder;
import org.apache.drill.test.rowSet.schema.SchemaBuilder;
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/LegacyOperatorTestBuilder.java b/exec/java-exec/src/test/java/org/apache/drill/test/LegacyOperatorTestBuilder.java
new file mode 100644
index 0000000..dacb061
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/LegacyOperatorTestBuilder.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.test;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.physical.base.AbstractBase;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.impl.BatchCreator;
+import org.apache.drill.exec.physical.impl.ScanBatch;
+import org.apache.drill.exec.record.RecordBatch;
+
+import java.io.UnsupportedEncodingException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+/**
+ * @deprecated Use {@link OperatorTestBuilder} instead.
+ */
+@Deprecated
+public class LegacyOperatorTestBuilder {
+
+ private PhysicalOpUnitTestBase physicalOpUnitTestBase;
+
+ private PhysicalOperator popConfig;
+ private String[] baselineColumns;
+ private List<Map<String, Object>> baselineRecords;
+ private List<List<String>> inputStreamsJSON;
+ private long initReservation = AbstractBase.INIT_ALLOCATION;
+ private long maxAllocation = AbstractBase.MAX_ALLOCATION;
+ private boolean expectNoRows;
+ private Long expectedBatchSize;
+ private Integer expectedNumBatches;
+ private Integer expectedTotalRows;
+
+ public LegacyOperatorTestBuilder(PhysicalOpUnitTestBase physicalOpUnitTestBase) {
+ this.physicalOpUnitTestBase = physicalOpUnitTestBase;
+ }
+
+ @SuppressWarnings({"unchecked", "resource"})
+ public void go() {
+ BatchCreator<PhysicalOperator> opCreator;
+ RecordBatch testOperator;
+ try {
+ physicalOpUnitTestBase.mockOpContext(popConfig, initReservation, maxAllocation);
+
+ opCreator = (BatchCreator<PhysicalOperator>) physicalOpUnitTestBase.opCreatorReg.getOperatorCreator(popConfig.getClass());
+ List<RecordBatch> incomingStreams = Lists.newArrayList();
+ if (inputStreamsJSON != null) {
+ for (List<String> batchesJson : inputStreamsJSON) {
+ incomingStreams.add(new ScanBatch(popConfig, physicalOpUnitTestBase.fragContext,
+ physicalOpUnitTestBase.getReaderListForJsonBatches(batchesJson, physicalOpUnitTestBase.fragContext)));
+ }
+ }
+
+ testOperator = opCreator.getBatch(physicalOpUnitTestBase.fragContext, popConfig, incomingStreams);
+
+ Map<String, List<Object>> actualSuperVectors = DrillTestWrapper.addToCombinedVectorResults(new PhysicalOpUnitTestBase.BatchIterator(testOperator), expectedBatchSize, expectedNumBatches, expectedTotalRows);
+ if ( expectedTotalRows != null ) { return; } // when checking total rows, don't compare actual results
+
+ Map<String, List<Object>> expectedSuperVectors;
+
+ if (expectNoRows) {
+ expectedSuperVectors = new TreeMap<>();
+ for (String column : baselineColumns) {
+ expectedSuperVectors.put(column, new ArrayList<>());
+ }
+ } else {
+ expectedSuperVectors = DrillTestWrapper.translateRecordListToHeapVectors(baselineRecords);
+ }
+
+ DrillTestWrapper.compareMergedVectors(expectedSuperVectors, actualSuperVectors);
+
+ } catch (ExecutionSetupException e) {
+ throw new RuntimeException(e);
+ } catch (UnsupportedEncodingException e) {
+ throw new RuntimeException(e);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public LegacyOperatorTestBuilder physicalOperator(PhysicalOperator batch) {
+ this.popConfig = batch;
+ return this;
+ }
+
+ public LegacyOperatorTestBuilder initReservation(long initReservation) {
+ this.initReservation = initReservation;
+ return this;
+ }
+
+ public LegacyOperatorTestBuilder maxAllocation(long maxAllocation) {
+ this.maxAllocation = maxAllocation;
+ return this;
+ }
+
+ public LegacyOperatorTestBuilder inputDataStreamJson(List<String> jsonBatches) {
+ this.inputStreamsJSON = new ArrayList<>();
+ this.inputStreamsJSON.add(jsonBatches);
+ return this;
+ }
+
+ public LegacyOperatorTestBuilder inputDataStreamsJson(List<List<String>> childStreams) {
+ this.inputStreamsJSON = childStreams;
+ return this;
+ }
+
+ public LegacyOperatorTestBuilder baselineColumns(String... columns) {
+ for (int i = 0; i < columns.length; i++) {
+ LogicalExpression ex = physicalOpUnitTestBase.parseExpr(columns[i]);
+ if (ex instanceof SchemaPath) {
+ columns[i] = ((SchemaPath)ex).toExpr();
+ } else {
+ throw new IllegalStateException("Schema path is not a valid format.");
+ }
+ }
+ this.baselineColumns = columns;
+ return this;
+ }
+
+ public LegacyOperatorTestBuilder baselineValues(Object... baselineValues) {
+ if (baselineRecords == null) {
+ baselineRecords = new ArrayList<>();
+ }
+ Map<String, Object> ret = new HashMap<>();
+ int i = 0;
+ Preconditions.checkArgument(baselineValues.length == baselineColumns.length,
+ "Must supply the same number of baseline values as columns.");
+ for (String s : baselineColumns) {
+ ret.put(s, baselineValues[i]);
+ i++;
+ }
+ this.baselineRecords.add(ret);
+ return this;
+ }
+
+ public LegacyOperatorTestBuilder expectZeroRows() {
+ this.expectNoRows = true;
+ return this;
+ }
+
+ public LegacyOperatorTestBuilder expectedNumBatches(Integer expectedNumBatches) {
+ this.expectedNumBatches = expectedNumBatches;
+ return this;
+ }
+
+ public LegacyOperatorTestBuilder expectedBatchSize(Long batchSize) {
+ this.expectedBatchSize = batchSize;
+ return this;
+ }
+
+ public LegacyOperatorTestBuilder expectedTotalRows(Integer expectedTotalRows) {
+ this.expectedTotalRows = expectedTotalRows;
+ return this;
+ }
+}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/OperatorTestBuilder.java b/exec/java-exec/src/test/java/org/apache/drill/test/OperatorTestBuilder.java
new file mode 100644
index 0000000..746422c
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/OperatorTestBuilder.java
@@ -0,0 +1,314 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.test;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.lang3.mutable.MutableInt;
+import org.apache.drill.exec.physical.base.AbstractBase;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.impl.BatchCreator;
+import org.apache.drill.exec.physical.impl.MockRecordBatch;
+import org.apache.drill.exec.physical.impl.svremover.Copier;
+import org.apache.drill.exec.physical.impl.svremover.GenericCopier;
+import org.apache.drill.exec.record.CloseableRecordBatch;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.RecordBatchSizer;
+import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.vector.FixedWidthVector;
+import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.vector.VariableWidthVector;
+import org.apache.drill.test.rowSet.DirectRowSet;
+import org.apache.drill.test.rowSet.RowSet;
+import org.apache.drill.test.rowSet.RowSetComparison;
+import org.junit.Assert;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+public class OperatorTestBuilder {
+
+ private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(OperatorTestBuilder.class);
+
+ private final List<RowSet> expectedResults = new ArrayList<>();
+ private final List<MockRecordBatch> upstreamBatches = new ArrayList<>();
+ private PhysicalOpUnitTestBase physicalOpUnitTestBase;
+
+ private PhysicalOperator physicalOperator;
+ private long initReservation = AbstractBase.INIT_ALLOCATION;
+ private long maxAllocation = AbstractBase.MAX_ALLOCATION;
+ private Optional<Integer> expectedNumBatchesOpt = Optional.empty();
+ private Optional<Integer> expectedTotalRowsOpt = Optional.empty();
+ private boolean combineOutputBatches;
+ private boolean unordered;
+
+ public OperatorTestBuilder(PhysicalOpUnitTestBase physicalOpUnitTestBase) {
+ this.physicalOpUnitTestBase = physicalOpUnitTestBase;
+ }
+
+ @SuppressWarnings({"unchecked", "resource"})
+ public void go() throws Exception {
+ final List<RowSet> actualResults = new ArrayList<>();
+ CloseableRecordBatch testOperator = null;
+
+ try {
+ validate();
+ int expectedNumBatches = expectedNumBatchesOpt.orElse(expectedResults.size());
+ physicalOpUnitTestBase.mockOpContext(physicalOperator, initReservation, maxAllocation);
+
+ final BatchCreator<PhysicalOperator> opCreator = (BatchCreator<PhysicalOperator>) physicalOpUnitTestBase.opCreatorReg.getOperatorCreator(physicalOperator.getClass());
+ testOperator = opCreator.getBatch(physicalOpUnitTestBase.fragContext, physicalOperator, (List)upstreamBatches);
+
+ batchIterator: for (int batchIndex = 0;; batchIndex++) {
+ final RecordBatch.IterOutcome outcome = testOperator.next();
+
+ switch (outcome) {
+ case NONE:
+ if (!combineOutputBatches) {
+ Assert.assertEquals(expectedNumBatches, batchIndex);
+ }
+ // We are done iterating over batches. Now we need to compare them.
+ break batchIterator;
+ case OK_NEW_SCHEMA:
+ boolean skip = true;
+
+ try {
+ skip = testOperator.getContainer().getRecordCount() == 0;
+ } catch (IllegalStateException e) {
+ // We should skip this batch in this case. It means no data was included with the okay schema
+ } finally {
+ if (skip) {
+ batchIndex--;
+ break;
+ }
+ }
+ case OK:
+ if (!combineOutputBatches && batchIndex >= expectedNumBatches) {
+ testOperator.getContainer().clear();
+ Assert.fail("More batches received than expected.");
+ } else {
+ final boolean hasSelectionVector = testOperator.getSchema().getSelectionVectorMode().hasSelectionVector;
+ final VectorContainer container = testOperator.getContainer();
+
+ if (hasSelectionVector) {
+ throw new UnsupportedOperationException("Implement DRILL-6698");
+ } else {
+ actualResults.add(DirectRowSet.fromContainer(container));
+ }
+ break;
+ }
+ default:
+ throw new UnsupportedOperationException("Can't handle this yet");
+ }
+ }
+
+ int actualTotalRows = actualResults.stream()
+ .mapToInt(RowSet::rowCount)
+ .reduce(Integer::sum)
+ .orElse(0);
+
+ if (expectedResults.isEmpty()) {
+ Assert.assertEquals((int) expectedTotalRowsOpt.orElse(0), actualTotalRows);
+ // We are done, we don't have any expected result to compare
+ return;
+ }
+
+ if (combineOutputBatches) {
+ final RowSet expectedBatch = expectedResults.get(0);
+ final RowSet actualBatch = DirectRowSet.fromSchema(
+ physicalOpUnitTestBase.operatorFixture.allocator, actualResults.get(0).container().getSchema());
+ final VectorContainer actualBatchContainer = actualBatch.container();
+ actualBatchContainer.setRecordCount(0);
+
+ final int numColumns = expectedBatch.schema().size();
+ List<MutableInt> totalBytesPerColumn = new ArrayList<>();
+
+ for (int columnIndex = 0; columnIndex < numColumns; columnIndex++) {
+ totalBytesPerColumn.add(new MutableInt());
+ }
+
+ // Get column sizes for each result batch
+
+ final List<List<RecordBatchSizer.ColumnSize>> columnSizesPerBatch = actualResults.stream().map(rowSet -> {
+ switch (rowSet.indirectionType()) {
+ case NONE:
+ return new RecordBatchSizer(rowSet.container()).columnsList();
+ default:
+ throw new UnsupportedOperationException("Implement DRILL-6698");
+ }
+ }).collect(Collectors.toList());
+
+ // Get total bytes per column
+
+ for (List<RecordBatchSizer.ColumnSize> columnSizes: columnSizesPerBatch) {
+ for (int columnIndex = 0; columnIndex < numColumns; columnIndex++) {
+ final MutableInt totalBytes = totalBytesPerColumn.get(columnIndex);
+ final RecordBatchSizer.ColumnSize columnSize = columnSizes.get(columnIndex);
+ totalBytes.add(columnSize.getTotalDataSize());
+ }
+ }
+
+ for (int columnIndex = 0; columnIndex < numColumns; columnIndex++) {
+ final ValueVector valueVector = actualBatchContainer
+ .getValueVector(columnIndex)
+ .getValueVector();
+
+ if (valueVector instanceof FixedWidthVector) {
+ ((FixedWidthVector) valueVector).allocateNew(actualTotalRows);
+ } else if (valueVector instanceof VariableWidthVector) {
+ final MutableInt totalBytes = totalBytesPerColumn.get(columnIndex);
+ ((VariableWidthVector) valueVector).allocateNew(totalBytes.getValue(), actualTotalRows);
+ } else {
+ throw new UnsupportedOperationException();
+ }
+ }
+
+ try {
+ int currentIndex = 0;
+
+ for (RowSet actualRowSet: actualResults) {
+ final Copier copier;
+ final VectorContainer rowSetContainer = actualRowSet.container();
+ rowSetContainer.setRecordCount(actualRowSet.rowCount());
+
+ switch (actualRowSet.indirectionType()) {
+ case NONE:
+ copier = new GenericCopier();
+ break;
+ default:
+ throw new UnsupportedOperationException("Implement DRILL-6698");
+ }
+
+ copier.setup(rowSetContainer, actualBatchContainer);
+ copier.appendRecords(currentIndex, actualRowSet.rowCount());
+ currentIndex += actualRowSet.rowCount();
+
+ verify(expectedBatch, actualBatch);
+ }
+ } finally {
+ actualBatch.clear();
+ }
+ } else {
+ // Compare expected and actual results
+ for (int batchIndex = 0; batchIndex < expectedNumBatches; batchIndex++) {
+ final RowSet expectedBatch = expectedResults.get(batchIndex);
+ final RowSet actualBatch = actualResults.get(batchIndex);
+
+ verify(expectedBatch, actualBatch);
+ }
+ }
+ } finally {
+ // free resources
+
+ if (testOperator != null) {
+ testOperator.close();
+ }
+
+ actualResults.forEach(rowSet -> rowSet.clear());
+
+ if (expectedResults != null) {
+ expectedResults.forEach(rowSet -> rowSet.clear());
+ }
+
+ upstreamBatches.forEach(rowSetBatch -> {
+ try {
+ rowSetBatch.close();
+ } catch (Exception e) {
+ logger.error("Error while closing RowSetBatch", e);
+ }
+ });
+ }
+ }
+
+ private void verify(final RowSet expectedBatch, final RowSet actualBatch) {
+ if (unordered) {
+ new RowSetComparison(expectedBatch).unorderedVerify(actualBatch);
+ } else {
+ new RowSetComparison(expectedBatch).verify(actualBatch);
+ }
+ }
+
+ /**
+ * Make sure the inputs are valid.
+ */
+ private void validate() {
+ if (combineOutputBatches) {
+ Preconditions.checkArgument(expectedResults.isEmpty() || expectedResults.size() == 1,
+ "The number of expected result batches needs to be zero or one when combining output batches");
+ Preconditions.checkArgument((expectedResults.isEmpty() && (!expectedNumBatchesOpt.isPresent() && expectedTotalRowsOpt.isPresent())) ||
+ (!expectedResults.isEmpty() && (!expectedNumBatchesOpt.isPresent() && !expectedTotalRowsOpt.isPresent())),
+ "When definig expectedResults, you cannot define expectedNumBatch or expectedTotalRows and vice versa");
+ } else {
+ Preconditions.checkArgument((expectedResults.isEmpty() && (expectedNumBatchesOpt.isPresent() || expectedTotalRowsOpt.isPresent())) ||
+ (!expectedResults.isEmpty() && (!expectedNumBatchesOpt.isPresent() && !expectedTotalRowsOpt.isPresent())),
+ "When definig expectedResults, you cannot define expectedNumBatch or expectedTotalRows and vice versa");
+ }
+ }
+
+ public OperatorTestBuilder physicalOperator(PhysicalOperator batch) {
+ this.physicalOperator = batch;
+ return this;
+ }
+
+ public OperatorTestBuilder initReservation(long initReservation) {
+ this.initReservation = initReservation;
+ return this;
+ }
+
+ public OperatorTestBuilder maxAllocation(long maxAllocation) {
+ this.maxAllocation = maxAllocation;
+ return this;
+ }
+
+ public OperatorTestBuilder expectedNumBatches(int expectedNumBatches) {
+ this.expectedNumBatchesOpt = Optional.of(expectedNumBatches);
+ return this;
+ }
+
+ public OperatorTestBuilder expectedTotalRows(int expectedTotalRows) {
+ this.expectedTotalRowsOpt = Optional.of(expectedTotalRows);
+ return this;
+ }
+
+ /**
+ * Combines all the batches output by the operator into a single batch for comparison.
+ * @return This {@link OperatorTestBuilder}.
+ */
+ public OperatorTestBuilder combineOutputBatches() {
+ combineOutputBatches = true;
+ return this;
+ }
+
+ public OperatorTestBuilder unordered() {
+ unordered = true;
+ return this;
+ }
+
+ public OperatorTestBuilder addUpstreamBatch(final MockRecordBatch mockRecordBatch) {
+ Preconditions.checkNotNull(mockRecordBatch);
+ upstreamBatches.add(mockRecordBatch);
+ return this;
+ }
+
+ public OperatorTestBuilder addExpectedResult(final RowSet rowSet) {
+ Preconditions.checkNotNull(rowSet);
+ expectedResults.add(rowSet);
+ return this;
+ }
+}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/OperatorTestBuilderTest.java b/exec/java-exec/src/test/java/org/apache/drill/test/OperatorTestBuilderTest.java
new file mode 100644
index 0000000..be3cdc2
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/OperatorTestBuilderTest.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.test;
+
+import com.google.common.collect.Lists;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.FieldReference;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.logical.data.NamedExpression;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.exec.physical.base.AbstractBase;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.PhysicalVisitor;
+import org.apache.drill.exec.physical.config.Project;
+import org.apache.drill.exec.physical.impl.MockRecordBatch;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.test.rowSet.RowSet;
+import org.apache.drill.test.rowSet.RowSetBuilder;
+import org.apache.drill.test.rowSet.schema.SchemaBuilder;
+import org.junit.ComparisonFailure;
+import org.junit.Test;
+
+import java.util.Iterator;
+import java.util.List;
+
+public class OperatorTestBuilderTest extends PhysicalOpUnitTestBase {
+ public static final String FIRST_NAME_COL = "firstname";
+ public static final String LAST_NAME_COL = "lastname";
+
+ @Test
+ public void noCombineUnorderedTestPass() throws Exception {
+ executeTest(buildInputData(), true, false);
+ }
+
+ @Test(expected = AssertionError.class)
+ public void noCombineUnorderedTestFail() throws Exception {
+ executeTest(buildIncorrectData(), true, false);
+ }
+
+ @Test
+ public void noCombineOrderedTestPass() throws Exception {
+ executeTest(buildInputData(), false, false);
+ }
+
+ @Test(expected = ComparisonFailure.class)
+ public void noCombineOrderedTestFail() throws Exception {
+ executeTest(buildIncorrectData(), false, false);
+ }
+
+ @Test
+ public void combineUnorderedTestPass() throws Exception {
+ executeTest(buildInputData(), true, true);
+ }
+
+ @Test(expected = AssertionError.class)
+ public void combineUnorderedTestFail() throws Exception {
+ executeTest(buildIncorrectData(), true, true);
+ }
+
+ @Test
+ public void combineOrderedTestPass() throws Exception {
+ executeTest(buildInputData(), false, true);
+ }
+
+ @Test(expected = ComparisonFailure.class)
+ public void combineOrderedTestFail() throws Exception {
+ executeTest(buildIncorrectData(), false, true);
+ }
+
+ private Project createProjectPhysicalOperator() {
+ final List<NamedExpression> exprs = Lists.newArrayList(
+ new NamedExpression(SchemaPath.getSimplePath(FIRST_NAME_COL), new FieldReference(FIRST_NAME_COL)),
+ new NamedExpression(SchemaPath.getSimplePath(LAST_NAME_COL), new FieldReference(LAST_NAME_COL)));
+
+ return new Project(exprs, new MockPhysicalOperator(), true);
+ }
+
+ private static TupleMetadata buildSchema() {
+ return new SchemaBuilder()
+ .add(FIRST_NAME_COL, TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.REQUIRED)
+ .add(LAST_NAME_COL, TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.REQUIRED)
+ .buildSchema();
+ }
+
+ private RowSet buildInputData() {
+ return new RowSetBuilder(operatorFixture.allocator(), buildSchema()).
+ addRow("billy", "bob").
+ addRow("bobby", "fillet").
+ build();
+ }
+
+ private RowSet buildIncorrectData() {
+ return new RowSetBuilder(operatorFixture.allocator(), buildSchema()).
+ addRow("billy", "bob").
+ addRow("bambam", "fofof").
+ build();
+ }
+
+ private void executeTest(final RowSet expectedRowSet, boolean unordered, boolean combine) throws Exception {
+ final MockRecordBatch inputRowSetBatch = new MockRecordBatch.Builder().
+ sendData(buildInputData()).
+ build(fragContext);
+
+ final OperatorTestBuilder testBuilder = opTestBuilder()
+ .physicalOperator(createProjectPhysicalOperator());
+
+ if (combine) {
+ testBuilder.combineOutputBatches();
+ }
+
+ if (unordered) {
+ testBuilder.unordered();
+ }
+
+ testBuilder
+ .addUpstreamBatch(inputRowSetBatch)
+ .addExpectedResult(expectedRowSet)
+ .go();
+ }
+
+ public static class MockPhysicalOperator extends AbstractBase {
+ @Override
+ public <T, X, E extends Throwable> T accept(PhysicalVisitor<T, X, E> physicalVisitor, X value) throws E {
+ return null;
+ }
+
+ @Override
+ public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) throws ExecutionSetupException {
+ return null;
+ }
+
+ @Override
+ public int getOperatorType() {
+ return 0;
+ }
+
+ @Override
+ public Iterator<PhysicalOperator> iterator() {
+ return null;
+ }
+ }
+}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/PhysicalOpUnitTestBase.java b/exec/java-exec/src/test/java/org/apache/drill/test/PhysicalOpUnitTestBase.java
similarity index 71%
rename from exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/PhysicalOpUnitTestBase.java
rename to exec/java-exec/src/test/java/org/apache/drill/test/PhysicalOpUnitTestBase.java
index b2412e5..1c4779c 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/PhysicalOpUnitTestBase.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/PhysicalOpUnitTestBase.java
@@ -15,12 +15,10 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.drill.exec.physical.unit;
+package org.apache.drill.test;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
import org.apache.calcite.rel.RelFieldCollation;
import org.apache.drill.exec.coord.ClusterCoordinator;
import org.apache.drill.exec.memory.BufferAllocator;
@@ -40,9 +38,7 @@ import org.apache.drill.exec.server.QueryProfileStoreContext;
import org.apache.drill.exec.store.dfs.DrillFileSystem;
import org.apache.drill.exec.store.easy.json.JSONRecordReader;
import org.apache.drill.exec.work.batch.IncomingBuffers;
-import org.apache.drill.test.BaseDirTestWatcher;
import org.apache.drill.exec.work.filter.RuntimeFilterWritable;
-import org.apache.drill.test.DrillTestWrapper;
import org.apache.drill.common.config.DrillConfig;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.common.expression.LogicalExpression;
@@ -55,27 +51,22 @@ import org.apache.drill.common.scanner.persistence.ScanResult;
import org.apache.drill.exec.ExecTest;
import org.apache.drill.exec.physical.base.AbstractBase;
import org.apache.drill.exec.physical.base.PhysicalOperator;
-import org.apache.drill.exec.physical.impl.BatchCreator;
import org.apache.drill.exec.physical.impl.OperatorCreatorRegistry;
-import org.apache.drill.exec.physical.impl.ScanBatch;
import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.record.VectorAccessible;
import org.apache.drill.exec.server.DrillbitContext;
import org.apache.drill.exec.store.RecordReader;
import org.apache.drill.exec.rpc.NamedThreadFactory;
-import org.apache.drill.test.OperatorFixture;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.mockito.Mockito;
import java.io.IOException;
-import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
-import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
@@ -83,7 +74,6 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
-import java.util.TreeMap;
public class PhysicalOpUnitTestBase extends ExecTest {
protected MockExecutorFragmentContext fragContext;
@@ -96,9 +86,9 @@ public class PhysicalOpUnitTestBase extends ExecTest {
@Rule
public final BaseDirTestWatcher dirTestWatcher = new BaseDirTestWatcher();
- private final DrillConfig drillConf = DrillConfig.create();
- private final ScanResult classpathScan = ClassPathScanner.fromPrescan(drillConf);
- private final OperatorCreatorRegistry opCreatorReg = new OperatorCreatorRegistry(classpathScan);
+ protected final DrillConfig drillConf = DrillConfig.create();
+ protected final ScanResult classpathScan = ClassPathScanner.fromPrescan(drillConf);
+ protected final OperatorCreatorRegistry opCreatorReg = new OperatorCreatorRegistry(classpathScan);
@Before
public void setup() throws Exception {
@@ -186,141 +176,22 @@ public class PhysicalOpUnitTestBase extends ExecTest {
}
}
- protected OperatorTestBuilder opTestBuilder() {
- return new OperatorTestBuilder();
+ /**
+ * Gets a {@link LegacyOperatorTestBuilder}.
+ * @deprecated Use {@link #opTestBuilder()} instead.
+ * @return A {@link LegacyOperatorTestBuilder}.
+ */
+ @Deprecated
+ protected LegacyOperatorTestBuilder legacyOpTestBuilder() {
+ return new LegacyOperatorTestBuilder(this);
}
- protected class OperatorTestBuilder {
-
- private PhysicalOperator popConfig;
- private String[] baselineColumns;
- private List<Map<String, Object>> baselineRecords;
- private List<List<String>> inputStreamsJSON;
- private long initReservation = AbstractBase.INIT_ALLOCATION;
- private long maxAllocation = AbstractBase.MAX_ALLOCATION;
- private boolean checkBatchMemory;
- private boolean expectNoRows;
- private Long expectedBatchSize;
- private Integer expectedNumBatches;
- private Integer expectedTotalRows;
-
- @SuppressWarnings({"unchecked", "resource"})
- public void go() {
- BatchCreator<PhysicalOperator> opCreator;
- RecordBatch testOperator;
- try {
- mockOpContext(popConfig, initReservation, maxAllocation);
-
- opCreator = (BatchCreator<PhysicalOperator>) opCreatorReg.getOperatorCreator(popConfig.getClass());
- List<RecordBatch> incomingStreams = Lists.newArrayList();
- if (inputStreamsJSON != null) {
- for (List<String> batchesJson : inputStreamsJSON) {
- incomingStreams.add(new ScanBatch(popConfig, fragContext,
- getReaderListForJsonBatches(batchesJson, fragContext)));
- }
- }
-
- testOperator = opCreator.getBatch(fragContext, popConfig, incomingStreams);
-
- Map<String, List<Object>> actualSuperVectors = DrillTestWrapper.addToCombinedVectorResults(new BatchIterator(testOperator), expectedBatchSize, expectedNumBatches, expectedTotalRows);
- if ( expectedTotalRows != null ) { return; } // when checking total rows, don't compare actual results
-
- Map<String, List<Object>> expectedSuperVectors;
-
- if (expectNoRows) {
- expectedSuperVectors = new TreeMap<>();
- for (String column : baselineColumns) {
- expectedSuperVectors.put(column, new ArrayList<>());
- }
- } else {
- expectedSuperVectors = DrillTestWrapper.translateRecordListToHeapVectors(baselineRecords);
- }
-
- DrillTestWrapper.compareMergedVectors(expectedSuperVectors, actualSuperVectors);
-
- } catch (ExecutionSetupException e) {
- throw new RuntimeException(e);
- } catch (UnsupportedEncodingException e) {
- throw new RuntimeException(e);
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
-
- public OperatorTestBuilder physicalOperator(PhysicalOperator batch) {
- this.popConfig = batch;
- return this;
- }
-
- public OperatorTestBuilder initReservation(long initReservation) {
- this.initReservation = initReservation;
- return this;
- }
-
- public OperatorTestBuilder maxAllocation(long maxAllocation) {
- this.maxAllocation = maxAllocation;
- return this;
- }
-
- public OperatorTestBuilder inputDataStreamJson(List<String> jsonBatches) {
- this.inputStreamsJSON = new ArrayList<>();
- this.inputStreamsJSON.add(jsonBatches);
- return this;
- }
-
- public OperatorTestBuilder inputDataStreamsJson(List<List<String>> childStreams) {
- this.inputStreamsJSON = childStreams;
- return this;
- }
-
- public OperatorTestBuilder baselineColumns(String... columns) {
- for (int i = 0; i < columns.length; i++) {
- LogicalExpression ex = parseExpr(columns[i]);
- if (ex instanceof SchemaPath) {
- columns[i] = ((SchemaPath)ex).toExpr();
- } else {
- throw new IllegalStateException("Schema path is not a valid format.");
- }
- }
- this.baselineColumns = columns;
- return this;
- }
-
- public OperatorTestBuilder baselineValues(Object... baselineValues) {
- if (baselineRecords == null) {
- baselineRecords = new ArrayList<>();
- }
- Map<String, Object> ret = new HashMap<>();
- int i = 0;
- Preconditions.checkArgument(baselineValues.length == baselineColumns.length,
- "Must supply the same number of baseline values as columns.");
- for (String s : baselineColumns) {
- ret.put(s, baselineValues[i]);
- i++;
- }
- this.baselineRecords.add(ret);
- return this;
- }
-
- public OperatorTestBuilder expectZeroRows() {
- this.expectNoRows = true;
- return this;
- }
-
- public OperatorTestBuilder expectedNumBatches(Integer expectedNumBatches) {
- this.expectedNumBatches = expectedNumBatches;
- return this;
- }
-
- public OperatorTestBuilder expectedBatchSize(Long batchSize) {
- this.expectedBatchSize = batchSize;
- return this;
- }
-
- public OperatorTestBuilder expectedTotalRows(Integer expectedTotalRows) {
- this.expectedTotalRows = expectedTotalRows;
- return this;
- }
+ /**
+ * Gets an {@link OperatorTestBuilder}.
+ * @return An {@link OperatorTestBuilder}.
+ */
+ protected OperatorTestBuilder opTestBuilder() {
+ return new OperatorTestBuilder(this);
}
/**
@@ -446,11 +317,11 @@ public class PhysicalOpUnitTestBase extends ExecTest {
/**
* <h2>Note</h2>
* <p>
- * The {@link MockPhysicalOperator} should only be used in {@link PhysicalOpUnitTestBase} because {@link PhysicalOpUnitTestBase}
+ * The {@link MockPhysicalOperator} should only be used in {@link PhysicalOpUnitTestBase} and its descendants because {@link PhysicalOpUnitTestBase}
* needs a dummy {@link MockPhysicalOperator} to be passed to Scanners.
* </p>
*/
- protected static class MockPhysicalOperator extends AbstractBase
+ public static class MockPhysicalOperator extends AbstractBase
{
@Override
public <T, X, E extends Throwable> T accept(PhysicalVisitor<T, X, E> physicalVisitor, X value) throws E {
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetBatch.java b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetBatch.java
deleted file mode 100644
index a3cd918..0000000
--- a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetBatch.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.test.rowSet;
-
-import com.google.common.base.Preconditions;
-import org.apache.drill.common.expression.SchemaPath;
-import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.record.BatchSchema;
-import org.apache.drill.exec.record.RecordBatch;
-import org.apache.drill.exec.record.TypedFieldId;
-import org.apache.drill.exec.record.VectorContainer;
-import org.apache.drill.exec.record.VectorWrapper;
-import org.apache.drill.exec.record.WritableBatch;
-import org.apache.drill.exec.record.selection.SelectionVector2;
-import org.apache.drill.exec.record.selection.SelectionVector4;
-
-import java.util.Iterator;
-
-public class RowSetBatch implements RecordBatch {
- private final RowSet rowSet;
-
- public RowSetBatch(final RowSet rowSet) {
- this.rowSet = Preconditions.checkNotNull(rowSet);
- }
-
- @Override
- public FragmentContext getContext() {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public BatchSchema getSchema() {
- return rowSet.batchSchema();
- }
-
- @Override
- public int getRecordCount() {
- return rowSet.container().getRecordCount();
- }
-
- @Override
- public SelectionVector2 getSelectionVector2() {
- if (rowSet instanceof IndirectRowSet) {
- return ((IndirectRowSet)rowSet).getSv2();
- }
-
- throw new UnsupportedOperationException();
- }
-
- @Override
- public SelectionVector4 getSelectionVector4() {
- if (rowSet instanceof RowSet.HyperRowSet) {
- return ((RowSet.HyperRowSet)rowSet).getSv4();
- }
-
- throw new UnsupportedOperationException();
- }
-
- @Override
- public void kill(boolean sendUpstream) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public VectorContainer getOutgoingContainer() {
- return rowSet.container();
- }
-
- @Override
- public TypedFieldId getValueVectorId(SchemaPath path) {
- return rowSet.container().getValueVectorId(path);
- }
-
- @Override
- public VectorWrapper<?> getValueAccessorById(Class<?> clazz, int... ids) {
- return rowSet.container().getValueAccessorById(clazz, ids);
- }
-
- @Override
- public IterOutcome next() {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public WritableBatch getWritableBatch() {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public Iterator<VectorWrapper<?>> iterator() {
- return rowSet.container().iterator();
- }
-
- @Override
- public VectorContainer getContainer() { return rowSet.container(); }
-}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetComparison.java b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetComparison.java
index e098e33..b1cb058 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetComparison.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetComparison.java
@@ -20,13 +20,22 @@ package org.apache.drill.test.rowSet;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
+import java.math.BigDecimal;
+import java.math.MathContext;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
import java.util.Comparator;
+import java.util.List;
+import com.google.common.base.Optional;
+import com.google.common.collect.HashMultiset;
+import com.google.common.collect.Multiset;
import org.apache.drill.exec.vector.accessor.ArrayReader;
import org.apache.drill.exec.vector.accessor.ObjectReader;
import org.apache.drill.exec.vector.accessor.ScalarReader;
import org.apache.drill.exec.vector.accessor.TupleReader;
import org.bouncycastle.util.Arrays;
+import org.junit.Assert;
/**
* For testing, compare the contents of two row sets (record batches)
@@ -58,10 +67,10 @@ public class RowSetComparison {
*/
private boolean mask[];
/**
- * Floats and doubles do not compare exactly. This delta is used
- * by JUnit for such comparisons.
+ * Floats and doubles do not compare exactly. This MathContext is used
+ * to construct BigDecimals of the desired precision.
*/
- private double delta = 0.001;
+ private MathContext scale = new MathContext(3);
/**
* Tests can skip the first n rows.
*/
@@ -106,14 +115,15 @@ public class RowSetComparison {
}
/**
- * Specify the delta value to use when comparing float or
+ * Specify the precision to use when comparing float or
* double values.
*
- * @param delta the delta to use in float and double comparisons
+ * @param scale the precision to use for comparing floats and doubles. See {@link BigDecimal#scale()} for
+ * a definition of scale.
* @return this builder
*/
- public RowSetComparison withDelta(double delta) {
- this.delta = delta;
+ public RowSetComparison withScale(int scale) {
+ this.scale = new MathContext(scale);
return this;
}
@@ -142,28 +152,93 @@ public class RowSetComparison {
return this;
}
+ private void compareSchemasAndCounts(RowSet actual) {
+ assertTrue("Schemas don't match.\n" +
+ "Expected: " + expected.schema().toString() +
+ "\nActual: " + actual.schema(), expected.schema().isEquivalent(actual.schema()));
+ int testLength = getTestLength();
+ int dataLength = offset + testLength;
+ assertTrue("Missing expected rows", expected.rowCount() >= dataLength);
+ assertTrue("Missing actual rows", actual.rowCount() >= dataLength);
+ }
+
+ private int getTestLength() {
+ return span > -1 ? span : expected.rowCount() - offset;
+ }
+
+ public void unorderedVerify(RowSet actual) {
+ compareSchemasAndCounts(actual);
+
+ int testLength = getTestLength();
+ RowSetReader er = expected.reader();
+ RowSetReader ar = actual.reader();
+
+ for (int i = 0; i < offset; i++) {
+ er.next();
+ ar.next();
+ }
+
+ final Multiset<List<Object>> expectedSet = HashMultiset.create();
+ final Multiset<List<Object>> actualSet = HashMultiset.create();
+
+ for (int rowCounter = 0; rowCounter < testLength; rowCounter++) {
+ er.next();
+ ar.next();
+
+ expectedSet.add(buildRow(er));
+ actualSet.add(buildRow(ar));
+ }
+
+ Assert.assertEquals(expectedSet, actualSet);
+ }
+
+ /**
+ * Convenience method to verify the actual results, then free memory
+ * for both the expected and actual result sets.
+ * @param actual the actual results to verify
+ */
+ public void unorderedVerifyAndClearAll(RowSet actual) {
+ try {
+ unorderedVerify(actual);
+ } finally {
+ expected.clear();
+ actual.clear();
+ }
+ }
+
+ private List<Object> buildRow(RowSetReader reader) {
+ final List<Object> row = new ArrayList<>();
+
+ for (int i = 0; i < mask.length; i++) {
+ if (!mask[i]) {
+ continue;
+ }
+
+ final ScalarReader scalarReader = reader.column(i).scalar();
+ final Object value = getScalar(scalarReader);
+ row.add(value);
+ }
+
+ return row;
+ }
+
/**
* Verify the actual rows using the rules defined in this builder
* @param actual the actual results to verify
*/
public void verify(RowSet actual) {
- assertTrue("Schemas don't match.\n" +
- "Expected: " + expected.schema().toString() +
- "\nActual: " + actual.schema(), expected.schema().isEquivalent(actual.schema()));
- int testLength = expected.rowCount() - offset;
- if (span > -1) {
- testLength = span;
- }
- int dataLength = offset + testLength;
- assertTrue("Missing expected rows", expected.rowCount() >= dataLength);
- assertTrue("Missing actual rows", actual.rowCount() >= dataLength);
+ compareSchemasAndCounts(actual);
+ int testLength = getTestLength();
+
RowSetReader er = expected.reader();
RowSetReader ar = actual.reader();
+
for (int i = 0; i < offset; i++) {
er.next();
ar.next();
}
+
for (int i = 0; i < testLength; i++) {
er.next();
ar.next();
@@ -245,34 +320,32 @@ public class RowSetComparison {
if (! ec.isNull()) {
assertTrue(label + " - column is null", ! ac.isNull());
}
+
switch (ec.valueType()) {
- case BYTES: {
+ case BYTES:
byte expected[] = ac.getBytes();
byte actual[] = ac.getBytes();
assertEquals(label + " - byte lengths differ", expected.length, actual.length);
assertTrue(label, Arrays.areEqual(expected, actual));
break;
- }
- case DOUBLE:
- assertEquals(label, ec.getDouble(), ac.getDouble(), delta);
- break;
- case INTEGER:
- assertEquals(label, ec.getInt(), ac.getInt());
- break;
- case LONG:
- assertEquals(label, ec.getLong(), ac.getLong());
- break;
- case STRING:
- assertEquals(label, ec.getString(), ac.getString());
- break;
- case DECIMAL:
- assertEquals(label, ec.getDecimal(), ac.getDecimal());
- break;
- case PERIOD:
- assertEquals(label, ec.getPeriod(), ac.getPeriod());
- break;
- default:
- throw new IllegalStateException( "Unexpected type: " + ec.valueType());
+ default:
+ assertEquals(label, getScalar(ec), getScalar(ac));
+ }
+ }
+
+ private Object getScalar(final ScalarReader scalarReader) {
+ if (scalarReader.isNull()) {
+ return Optional.absent();
+ }
+
+ switch (scalarReader.valueType()) {
+ case BYTES:
+ return ByteBuffer.wrap(scalarReader.getBytes());
+ case DOUBLE: {
+ return new BigDecimal(scalarReader.getDouble(), this.scale).stripTrailingZeros();
+ }
+ default:
+ return scalarReader.getObject();
}
}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/TestRowSetComparison.java b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/TestRowSetComparison.java
new file mode 100644
index 0000000..b8bdb12
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/TestRowSetComparison.java
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.test.rowSet;
+
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.memory.RootAllocator;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.test.rowSet.schema.SchemaBuilder;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestRowSetComparison {
+ private BufferAllocator allocator;
+
+ @Before
+ public void setup() {
+ allocator = new RootAllocator(Long.MAX_VALUE);
+ }
+
+ @Test
+ public void simpleUnorderedComparisonMatchTest() {
+ final TupleMetadata schema = new SchemaBuilder()
+ .add("a", TypeProtos.MinorType.INT, TypeProtos.DataMode.REQUIRED)
+ .add("b", TypeProtos.MinorType.INT, TypeProtos.DataMode.REQUIRED)
+ .buildSchema();
+
+ final RowSet expected = new RowSetBuilder(allocator, schema)
+ .addRow(1, 1)
+ .addRow(1, 1)
+ .addRow(1, 2)
+ .addRow(2, 1)
+ .build();
+
+ final RowSet actual = new RowSetBuilder(allocator, schema)
+ .addRow(1, 1)
+ .addRow(1, 2)
+ .addRow(2, 1)
+ .addRow(1, 1)
+ .build();
+
+ try {
+ new RowSetComparison(expected).unorderedVerify(actual);
+ } finally {
+ expected.clear();
+ actual.clear();
+ }
+ }
+
+ @Test
+ public void simpleDoubleUnorderedComparisonMatchTest() {
+ final TupleMetadata schema = new SchemaBuilder()
+ .add("a", TypeProtos.MinorType.FLOAT4, TypeProtos.DataMode.REQUIRED)
+ .add("b", TypeProtos.MinorType.FLOAT8, TypeProtos.DataMode.REQUIRED)
+ .buildSchema();
+
+ final RowSet expected = new RowSetBuilder(allocator, schema)
+ .addRow(1.0f, 1.0)
+ .addRow(1.0f, 1.0)
+ .addRow(1.0f, 1.01)
+ .addRow(1.01f, 1.0)
+ .build();
+
+ final RowSet actual = new RowSetBuilder(allocator, schema)
+ .addRow(1.004f, .9996)
+ .addRow(1.0f, 1.008)
+ .addRow(1.008f, 1.0)
+ .addRow(.9996f, 1.004)
+ .build();
+
+ try {
+ new RowSetComparison(expected).unorderedVerify(actual);
+ } finally {
+ expected.clear();
+ actual.clear();
+ }
+ }
+
+ @Test
+ public void simpleVarcharUnorderedComparisonMatchTest() {
+ final TupleMetadata schema = new SchemaBuilder()
+ .add("a", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.REQUIRED)
+ .buildSchema();
+
+ final RowSet expected = new RowSetBuilder(allocator, schema)
+ .addRow("aaa")
+ .addRow("bbb")
+ .addRow("ccc")
+ .addRow("bbb")
+ .build();
+
+ final RowSet actual = new RowSetBuilder(allocator, schema)
+ .addRow("ccc")
+ .addRow("aaa")
+ .addRow("bbb")
+ .addRow("bbb")
+ .build();
+
+ try {
+ new RowSetComparison(expected).unorderedVerify(actual);
+ } finally {
+ expected.clear();
+ actual.clear();
+ }
+ }
+
+ @Test(expected = AssertionError.class)
+ public void simpleUnorderedComparisonNoMatchTest() {
+ final TupleMetadata schema = new SchemaBuilder()
+ .add("a", TypeProtos.MinorType.INT, TypeProtos.DataMode.REQUIRED)
+ .add("b", TypeProtos.MinorType.INT, TypeProtos.DataMode.REQUIRED)
+ .buildSchema();
+
+ final RowSet expected = new RowSetBuilder(allocator, schema)
+ .addRow(1, 1)
+ .addRow(3, 2)
+ .addRow(2, 4)
+ .build();
+
+ final RowSet actual = new RowSetBuilder(allocator, schema)
+ .addRow(1, 1)
+ .addRow(2, 1)
+ .addRow(1, 1)
+ .build();
+
+ try {
+ new RowSetComparison(expected).unorderedVerify(actual);
+ } finally {
+ expected.clear();
+ actual.clear();
+ }
+ }
+
+ @Test(expected = AssertionError.class)
+ public void simpleDoubleUnorderedComparisonNoMatchTest() {
+ final TupleMetadata schema = new SchemaBuilder()
+ .add("a", TypeProtos.MinorType.FLOAT4, TypeProtos.DataMode.REQUIRED)
+ .add("b", TypeProtos.MinorType.FLOAT8, TypeProtos.DataMode.REQUIRED)
+ .buildSchema();
+
+ final RowSet expected = new RowSetBuilder(allocator, schema)
+ .addRow(1.0f, 1.0)
+ .addRow(1.0f, 1.0)
+ .addRow(1.0f, 1.01)
+ .addRow(1.01f, 1.0)
+ .build();
+
+ final RowSet actual = new RowSetBuilder(allocator, schema)
+ .addRow(1.009f, .9996)
+ .addRow(1.0f, 1.004)
+ .addRow(1.008f, 1.0)
+ .addRow(.9994f, 1.004)
+ .build();
+
+ try {
+ new RowSetComparison(expected).unorderedVerify(actual);
+ } finally {
+ expected.clear();
+ actual.clear();
+ }
+ }
+
+ @Test(expected = AssertionError.class)
+ public void simpleVarcharUnorderedComparisonNoMatchTest() {
+ final TupleMetadata schema = new SchemaBuilder()
+ .add("a", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.REQUIRED)
+ .buildSchema();
+
+ final RowSet expected = new RowSetBuilder(allocator, schema)
+ .addRow("red")
+ .addRow("bbb")
+ .addRow("ccc")
+ .addRow("bbb")
+ .build();
+
+ final RowSet actual = new RowSetBuilder(allocator, schema)
+ .addRow("ccc")
+ .addRow("aaa")
+ .addRow("blue")
+ .addRow("bbb")
+ .build();
+
+ try {
+ new RowSetComparison(expected).unorderedVerify(actual);
+ } finally {
+ expected.clear();
+ actual.clear();
+ }
+ }
+
+ @After
+ public void teardown() {
+ allocator.close();
+ }
+}
diff --git a/exec/vector/src/main/codegen/templates/FixedValueVectors.java b/exec/vector/src/main/codegen/templates/FixedValueVectors.java
index eb413b0..a98aa66 100644
--- a/exec/vector/src/main/codegen/templates/FixedValueVectors.java
+++ b/exec/vector/src/main/codegen/templates/FixedValueVectors.java
@@ -358,7 +358,7 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements F
@Override
public void copyEntry(int toIndex, ValueVector from, int fromIndex) {
- ((${minor.class}Vector) from).data.getBytes(fromIndex * VALUE_WIDTH, data, toIndex * VALUE_WIDTH, VALUE_WIDTH);
+ copyFromSafe(fromIndex, toIndex, (${minor.class}Vector) from);
}
public void decrementAllocationMonitor() {
diff --git a/exec/vector/src/main/codegen/templates/NullableValueVectors.java b/exec/vector/src/main/codegen/templates/NullableValueVectors.java
index ff066fb..f82c718 100644
--- a/exec/vector/src/main/codegen/templates/NullableValueVectors.java
+++ b/exec/vector/src/main/codegen/templates/NullableValueVectors.java
@@ -433,7 +433,7 @@ public final class ${className} extends BaseDataValueVector implements <#if type
// Handle the case of not-nullable copied into a nullable
if (from instanceof ${minor.class}Vector) {
- bits.getMutator().set(toIndex,1);
+ bits.getMutator().setSafe(toIndex,1);
values.copyFromSafe(fromIndex,toIndex,(${minor.class}Vector)from);
return;
}