You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by bo...@apache.org on 2018/01/30 21:54:12 UTC
drill git commit: DRILL-6071: Limit batch size for flatten operator
Repository: drill
Updated Branches:
refs/heads/master a9ea4ec1c -> 039530a41
DRILL-6071: Limit batch size for flatten operator
closes #1091
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/039530a4
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/039530a4
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/039530a4
Branch: refs/heads/master
Commit: 039530a4195ba8fa4532b9ca92980206fa66c181
Parents: a9ea4ec
Author: Padma Penumarthy <pp...@yahoo.com>
Authored: Wed Jan 10 05:06:58 2018 -0800
Committer: Boaz Ben-Zvi <bo...@apache.org>
Committed: Tue Jan 30 12:00:47 2018 -0800
----------------------------------------------------------------------
.../org/apache/drill/exec/ExecConstants.java | 3 +
.../impl/flatten/FlattenRecordBatch.java | 55 ++
.../physical/impl/flatten/FlattenTemplate.java | 142 +--
.../exec/physical/impl/flatten/Flattener.java | 1 +
.../physical/impl/spill/RecordBatchSizer.java | 50 +-
.../server/options/SystemOptionManager.java | 3 +-
.../src/main/resources/drill-module.conf | 1 +
.../physical/unit/BasicPhysicalOpUnitTest.java | 29 +
.../physical/unit/MiniPlanUnitTestBase.java | 2 +-
.../physical/unit/PhysicalOpUnitTestBase.java | 45 +-
.../exec/physical/unit/TestOutputBatchSize.java | 882 +++++++++++++++++++
.../org/apache/drill/test/DrillTestWrapper.java | 39 +-
12 files changed, 1095 insertions(+), 157 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/drill/blob/039530a4/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
index 25f6135..c949e51 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
@@ -76,6 +76,9 @@ public final class ExecConstants {
public static final String SPILL_FILESYSTEM = "drill.exec.spill.fs";
public static final String SPILL_DIRS = "drill.exec.spill.directories";
+ public static final String OUTPUT_BATCH_SIZE = "drill.exec.memory.operator.output_batch_size";
+ public static final LongValidator OUTPUT_BATCH_SIZE_VALIDATOR = new RangeLongValidator(OUTPUT_BATCH_SIZE, 1024, 512 * 1024 * 1024);
+
// External Sort Boot configuration
public static final String EXTERNAL_SORT_TARGET_SPILL_BATCH_SIZE = "drill.exec.sort.external.spill.batch.size";
http://git-wip-us.apache.org/repos/asf/drill/blob/039530a4/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java
index 8be16ad..9483f29 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java
@@ -28,6 +28,7 @@ import org.apache.drill.common.expression.LogicalExpression;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.logical.data.NamedExpression;
import org.apache.drill.common.types.Types;
+import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.exception.ClassTransformationException;
import org.apache.drill.exec.exception.OutOfMemoryException;
import org.apache.drill.exec.exception.SchemaChangeException;
@@ -40,6 +41,7 @@ import org.apache.drill.exec.expr.ValueVectorReadExpression;
import org.apache.drill.exec.expr.ValueVectorWriteExpression;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.physical.config.FlattenPOP;
+import org.apache.drill.exec.physical.impl.spill.RecordBatchSizer;
import org.apache.drill.exec.record.AbstractSingleRecordBatch;
import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
import org.apache.drill.exec.record.MaterializedField;
@@ -68,6 +70,7 @@ public class FlattenRecordBatch extends AbstractSingleRecordBatch<FlattenPOP> {
private boolean hasRemainder = false;
private int remainderIndex = 0;
private int recordCount;
+ private long outputBatchSize;
private final Flattener.Monitor monitor = new Flattener.Monitor() {
@Override
@@ -94,8 +97,57 @@ public class FlattenRecordBatch extends AbstractSingleRecordBatch<FlattenPOP> {
}
}
+ private class FlattenMemoryManager {
+ private final int outputRowCount;
+ private static final int OFFSET_VECTOR_WIDTH = 4;
+ private static final int WORST_CASE_FRAGMENTATION_FACTOR = 2;
+ private static final int MAX_NUM_ROWS = ValueVector.MAX_ROW_COUNT;
+ private static final int MIN_NUM_ROWS = 1;
+
+ private FlattenMemoryManager(RecordBatch incoming, long outputBatchSize, SchemaPath flattenColumn) {
+ // Get sizing information for the batch.
+ RecordBatchSizer sizer = new RecordBatchSizer(incoming);
+
+ final TypedFieldId typedFieldId = incoming.getValueVectorId(flattenColumn);
+ final MaterializedField field = incoming.getSchema().getColumn(typedFieldId.getFieldIds()[0]);
+
+ // Get column size of flatten column.
+ RecordBatchSizer.ColumnSize columnSize = RecordBatchSizer.getColumn(incoming.getValueAccessorById(field.getValueClass(),
+ typedFieldId.getFieldIds()).getValueVector(), field.getName());
+
+ // Average rowWidth of flatten column
+ final int avgRowWidthFlattenColumn = RecordBatchSizer.safeDivide(columnSize.netSize, incoming.getRecordCount());
+
+ // Average rowWidth excluding the flatten column.
+ final int avgRowWidthWithOutFlattenColumn = sizer.netRowWidth() - avgRowWidthFlattenColumn;
+
+ // Average rowWidth of single element in the flatten list.
+ // subtract the offset vector size from column data size.
+ final int avgRowWidthSingleFlattenEntry =
+ RecordBatchSizer.safeDivide(columnSize.netSize - (OFFSET_VECTOR_WIDTH * columnSize.valueCount), columnSize.elementCount);
+
+ // Average rowWidth of outgoing batch.
+ final int avgOutgoingRowWidth = avgRowWidthWithOutFlattenColumn + avgRowWidthSingleFlattenEntry;
+
+ // Number of rows in outgoing batch
+ outputRowCount = Math.max(MIN_NUM_ROWS, Math.min(MAX_NUM_ROWS,
+ RecordBatchSizer.safeDivide((outputBatchSize/WORST_CASE_FRAGMENTATION_FACTOR), avgOutgoingRowWidth)));
+
+ logger.debug("flatten incoming batch sizer : {}, outputBatchSize : {}," +
+ "avgOutgoingRowWidth : {}, outputRowCount : {}", sizer, outputBatchSize, avgOutgoingRowWidth, outputRowCount);
+ }
+
+ public int getOutputRowCount() {
+ return outputRowCount;
+ }
+ }
+
+
public FlattenRecordBatch(FlattenPOP pop, RecordBatch incoming, FragmentContext context) throws OutOfMemoryException {
super(pop, context, incoming);
+
+ // get the output batch size from config.
+ outputBatchSize = context.getOptions().getOption(ExecConstants.OUTPUT_BATCH_SIZE_VALIDATOR);
}
@Override
@@ -148,6 +200,9 @@ public class FlattenRecordBatch extends AbstractSingleRecordBatch<FlattenPOP> {
@Override
protected IterOutcome doWork() {
+ FlattenMemoryManager flattenMemoryManager = new FlattenMemoryManager(incoming, outputBatchSize, popConfig.getColumn());
+ flattener.setOutputCount(flattenMemoryManager.getOutputRowCount());
+
int incomingRecordCount = incoming.getRecordCount();
if (!doAlloc()) {
http://git-wip-us.apache.org/repos/asf/drill/blob/039530a4/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenTemplate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenTemplate.java
index ed20429..cd58bfd 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenTemplate.java
@@ -31,6 +31,7 @@ import org.apache.drill.exec.record.TransferPair;
import com.google.common.collect.ImmutableList;
+import org.apache.drill.exec.vector.ValueVector;
import org.apache.drill.exec.vector.complex.RepeatedValueVector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -38,8 +39,7 @@ import org.slf4j.LoggerFactory;
public abstract class FlattenTemplate implements Flattener {
private static final Logger logger = LoggerFactory.getLogger(FlattenTemplate.class);
- private static final int OUTPUT_BATCH_SIZE = 4*1024;
- private static final int OUTPUT_MEMORY_LIMIT = 512 * 1024 * 1024;
+ private static final int OUTPUT_ROW_COUNT = ValueVector.MAX_ROW_COUNT;
private ImmutableList<TransferPair> transfers;
private BufferAllocator outputAllocator;
@@ -47,14 +47,12 @@ public abstract class FlattenTemplate implements Flattener {
private RepeatedValueVector fieldToFlatten;
private RepeatedValueVector.RepeatedAccessor accessor;
private int valueIndex;
- private boolean bigRecords = false;
- private int bigRecordsBufferSize;
/**
- * The output batch limit starts at OUTPUT_BATCH_SIZE, but may be decreased
+ * The output batch limit starts at OUTPUT_ROW_COUNT, but may be decreased
* if records are found to be large.
*/
- private int outputLimit = OUTPUT_BATCH_SIZE;
+ private int outputLimit = OUTPUT_ROW_COUNT;
// this allows for groups to be written between batches if we run out of space, for cases where we have finished
// a batch on the boundary it will be set to 0
@@ -73,6 +71,11 @@ public abstract class FlattenTemplate implements Flattener {
}
@Override
+ public void setOutputCount(int outputCount) {
+ outputLimit = outputCount;
+ }
+
+ @Override
public final int flattenRecords(final int recordCount, final int firstOutputIndex,
final Flattener.Monitor monitor) {
switch (svMode) {
@@ -101,75 +104,10 @@ public abstract class FlattenTemplate implements Flattener {
for ( ; innerValueIndexLocal < innerValueCount; innerValueIndexLocal++) {
// If we've hit the batch size limit, stop and flush what we've got so far.
if (recordsThisCall == outputLimit) {
- if (bigRecords) {
- /*
- * We got to the limit we used before, but did we go over
- * the bigRecordsBufferSize in the second half of the batch? If
- * so, we'll need to adjust the batch limits.
- */
- adjustBatchLimits(1, monitor, recordsThisCall);
- }
-
// Flush this batch.
break outer;
}
- /*
- * At the moment, the output record includes the input record, so for very
- * large records that we're flattening, we're carrying forward the original
- * record as well as the flattened element. We've seen a case where flattening a 4MB
- * record with a 20,000 element array causing memory usage to explode. To avoid
- * that until we can push down the selected fields to operators like this, we
- * also limit the amount of memory in use at one time.
- *
- * We have to have written at least one record to be able to get a buffer that will
- * have a real allocator, so we have to do this lazily. We won't check the limit
- * for the first two records, but that keeps this simple.
- */
- if (bigRecords) {
- /*
- * If we're halfway through the outputLimit, check on our memory
- * usage so far.
- */
- if (recordsThisCall == outputLimit / 2) {
- /*
- * If we've used more than half the space we've used for big records
- * in the past, we've seen even bigger records than before, so stop and
- * see if we need to flush here before we go over bigRecordsBufferSize
- * memory usage, and reduce the outputLimit further before we continue
- * with the next batch.
- */
- if (adjustBatchLimits(2, monitor, recordsThisCall)) {
- break outer;
- }
- }
- } else {
- if (outputAllocator.getAllocatedMemory() > OUTPUT_MEMORY_LIMIT) {
- /*
- * We're dealing with big records. Reduce the outputLimit to
- * the current record count, and take note of how much space the
- * vectors report using for that. We'll use those numbers as limits
- * going forward in order to avoid allocating more memory.
- */
- bigRecords = true;
- outputLimit = Math.min(recordsThisCall, outputLimit);
- if (outputLimit < 1) {
- throw new IllegalStateException("flatten outputLimit (" + outputLimit
- + ") won't make progress");
- }
-
- /*
- * This will differ from what the allocator reports because of
- * overhead. But the allocator check is much cheaper to do, so we
- * only compute this at selected times.
- */
- bigRecordsBufferSize = monitor.getBufferSizeFor(recordsThisCall);
-
- // Stop and flush.
- break outer;
- }
- }
-
try {
doEval(valueIndexLocal, outputIndex);
} catch (OversizedAllocationException ex) {
@@ -211,68 +149,6 @@ public abstract class FlattenTemplate implements Flattener {
}
}
- /**
- * Determine if the current batch record limit needs to be adjusted (when handling
- * bigRecord mode). If so, adjust the limit, and return true, otherwise return false.
- *
- * <p>If the limit is adjusted, it will always be adjusted down, because we need to operate
- * based on the largest sized record we've ever seen.</p>
- *
- * <p>If the limit is adjusted, then the current batch should be flushed, because
- * continuing would lead to going over the large memory limit that has already been
- * established.</p>
- *
- * @param multiplier Multiply currently used memory (according to the monitor) before
- * checking against past memory limits. This allows for checking the currently used
- * memory after processing a fraction of the expected batch limit, but using that as
- * a predictor of the full batch's size. For example, if this is checked after half
- * the batch size limit's records are processed, then using a multiplier of two will
- * do the check under the assumption that processing the full batch limit will use
- * twice as much memory.
- * @param monitor the Flattener.Monitor instance to use for the current memory usage check
- * @param recordsThisCall the number of records processed so far during this call to
- * flattenRecords().
- * @return true if the batch size limit was adjusted, false otherwise
- */
- private boolean adjustBatchLimits(final int multiplier, final Flattener.Monitor monitor,
- final int recordsThisCall) {
- assert bigRecords : "adjusting batch limits when no big records";
- final int bufferSize = multiplier * monitor.getBufferSizeFor(recordsThisCall);
-
- /*
- * If the amount of space we've used so far is below the amount that triggered
- * the bigRecords mode, then no adjustment is needed.
- */
- if (bufferSize <= bigRecordsBufferSize) {
- return false;
- }
-
- /*
- * We've used more space than we've used for big records in the past, we've seen
- * even bigger records, so we need to adjust our limits, and flush what we've got so far.
- *
- * We should reduce the outputLimit proportionately to get the predicted
- * amount of memory used back down to bigRecordsBufferSize.
- *
- * The number of records to limit is therefore
- * outputLimit *
- * (1 - (bufferSize - bigRecordsBufferSize) / bigRecordsBufferSize)
- *
- * Doing some algebra on the multiplier:
- * (bigRecordsBufferSize - (bufferSize - bigRecordsBufferSize)) / bigRecordsBufferSize
- * (bigRecordsBufferSize - bufferSize + bigRecordsBufferSize) / bigRecordsBufferSize
- * (2 * bigRecordsBufferSize - bufferSize) / bigRecordsBufferSize
- *
- * If bufferSize has gotten so big that this would be negative, we'll
- * just go down to one record per batch. We need to check for that on
- * outputLimit anyway, in order to make sure that we make progress.
- */
- final int newLimit = (int)
- (outputLimit * (2.0 * ((double) bigRecordsBufferSize) - bufferSize) / bigRecordsBufferSize);
- outputLimit = Math.max(1, newLimit);
- return true;
- }
-
@Override
public final void setup(FragmentContext context, RecordBatch incoming, RecordBatch outgoing, List<TransferPair> transfers) throws SchemaChangeException{
http://git-wip-us.apache.org/repos/asf/drill/blob/039530a4/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/Flattener.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/Flattener.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/Flattener.java
index 392757e..b1d93c1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/Flattener.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/Flattener.java
@@ -45,6 +45,7 @@ public interface Flattener {
int flattenRecords(int recordCount, int firstOutputIndex, Monitor monitor);
void setFlattenField(RepeatedValueVector repeatedColumn);
+ void setOutputCount(int outputCount);
RepeatedValueVector getFlattenField();
http://git-wip-us.apache.org/repos/asf/drill/blob/039530a4/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/spill/RecordBatchSizer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/spill/RecordBatchSizer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/spill/RecordBatchSizer.java
index 5808f45..0fe67d5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/spill/RecordBatchSizer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/spill/RecordBatchSizer.java
@@ -93,9 +93,19 @@ public class RecordBatchSizer {
public final int elementCount;
+ /**
+ * Size of the top level value vector. For map and repeated list,
+ * this is just the size of the offset vector.
+ */
public int dataSize;
/**
+ * Total size of the column, i.e. the sum of the memory used by all
+ * value vectors representing the column.
+ */
+ public int netSize;
+
+ /**
* The estimated, average number of elements per parent value.
* Always 1 for a non-repeated type. For a repeated type,
* this is the average entries per array (per repeated element).
@@ -131,9 +141,15 @@ public class RecordBatchSizer {
break;
default:
dataSize = v.getPayloadByteCount(valueCount);
- stdSize = TypeHelper.getSize(metadata.getType()) * elementCount;
+ try {
+ stdSize = TypeHelper.getSize(metadata.getType()) * elementCount;
+ } catch (Exception e) {
+ // For unsupported types, just set stdSize to 0.
+ stdSize = 0;
+ }
}
estSize = safeDivide(dataSize, valueCount);
+ netSize = v.getPayloadByteCount(valueCount);
}
@SuppressWarnings("resource")
@@ -154,8 +170,14 @@ public class RecordBatchSizer {
return childCount;
}
+ @SuppressWarnings("resource")
private void buildList(ValueVector v) {
- @SuppressWarnings("resource")
+ // complex ListVector cannot be cast to RepeatedListVector.
+ // check the mode.
+ if (v.getField().getDataMode() != DataMode.REPEATED) {
+ dataSize = v.getPayloadByteCount(valueCount);
+ return;
+ }
UInt4Vector offsetVector = ((RepeatedListVector) v).getOffsetVector();
dataSize = offsetVector.getPayloadByteCount(valueCount);
}
@@ -232,6 +254,10 @@ public class RecordBatchSizer {
}
}
+ public static ColumnSize getColumn(ValueVector v, String prefix) {
+ return new ColumnSize(v, prefix);
+ }
+
public static final int MAX_VECTOR_SIZE = ValueVector.MAX_BUFFER_SIZE; // 16 MiB
private List<ColumnSize> columnSizes = new ArrayList<>();
@@ -380,14 +406,18 @@ public class RecordBatchSizer {
// vectors do consume space, so visit columns recursively.
switch (v.getField().getType().getMinorType()) {
- case MAP:
- expandMap((AbstractMapVector) v, prefix + v.getField().getName() + ".");
- break;
- case LIST:
- expandList((RepeatedListVector) v, prefix + v.getField().getName() + ".");
- break;
- default:
- v.collectLedgers(ledgers);
+ case MAP:
+ expandMap((AbstractMapVector) v, prefix + v.getField().getName() + ".");
+ break;
+ case LIST:
+ // complex ListVector cannot be cast to RepeatedListVector.
+ // do not expand the list if it is not repeated mode.
+ if (v.getField().getDataMode() == DataMode.REPEATED) {
+ expandList((RepeatedListVector) v, prefix + v.getField().getName() + ".");
+ }
+ break;
+ default:
+ v.collectLedgers(ledgers);
}
netRowWidth += colSize.estSize;
http://git-wip-us.apache.org/repos/asf/drill/blob/039530a4/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
index 369f3bc..4dba96d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
@@ -211,7 +211,8 @@ public class SystemOptionManager extends BaseOptionManager implements AutoClosea
new OptionDefinition(ExecConstants.PERSISTENT_TABLE_UMASK_VALIDATOR),
new OptionDefinition(ExecConstants.CPU_LOAD_AVERAGE),
new OptionDefinition(ExecConstants.ENABLE_VECTOR_VALIDATOR),
- new OptionDefinition(ExecConstants.ENABLE_ITERATOR_VALIDATOR)
+ new OptionDefinition(ExecConstants.ENABLE_ITERATOR_VALIDATOR),
+ new OptionDefinition(ExecConstants.OUTPUT_BATCH_SIZE_VALIDATOR, new OptionMetaData(OptionValue.AccessibleScopes.SYSTEM, true, false))
};
final CaseInsensitiveMap<OptionDefinition> map = CaseInsensitiveMap.newHashMap();
http://git-wip-us.apache.org/repos/asf/drill/blob/039530a4/exec/java-exec/src/main/resources/drill-module.conf
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/resources/drill-module.conf b/exec/java-exec/src/main/resources/drill-module.conf
index 28b7975..5659c82 100644
--- a/exec/java-exec/src/main/resources/drill-module.conf
+++ b/exec/java-exec/src/main/resources/drill-module.conf
@@ -420,6 +420,7 @@ drill.exec.options: {
drill.exec.storage.implicit.fqn.column.label: "fqn",
drill.exec.storage.implicit.suffix.column.label: "suffix",
drill.exec.testing.controls: "{}",
+ drill.exec.memory.operator.output_batch_size : 33554432, # 32 MB
exec.bulk_load_table_list.bulk_size: 1000,
exec.compile.scalar_replacement: false,
exec.enable_bulk_load_table_list: false,
http://git-wip-us.apache.org/repos/asf/drill/blob/039530a4/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/BasicPhysicalOpUnitTest.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/BasicPhysicalOpUnitTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/BasicPhysicalOpUnitTest.java
index ec85f21..fb8160a 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/BasicPhysicalOpUnitTest.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/BasicPhysicalOpUnitTest.java
@@ -23,7 +23,9 @@ import java.util.List;
import org.apache.calcite.rel.RelFieldCollation;
import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.exec.physical.MinorFragmentEndpoint;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.physical.config.ComplexToJson;
import org.apache.drill.exec.physical.config.ExternalSort;
import org.apache.drill.exec.physical.config.Filter;
@@ -34,6 +36,7 @@ import org.apache.drill.exec.physical.config.MergingReceiverPOP;
import org.apache.drill.exec.physical.config.Project;
import org.apache.drill.exec.physical.config.StreamingAggregate;
import org.apache.drill.exec.physical.config.TopN;
+import org.apache.drill.exec.physical.config.FlattenPOP;
import org.apache.drill.exec.planner.physical.AggPrelBase;
import org.junit.Ignore;
import org.junit.Test;
@@ -189,6 +192,32 @@ public class BasicPhysicalOpUnitTest extends PhysicalOpUnitTestBase {
}
@Test
+ public void testFlatten() {
+ final PhysicalOperator flatten = new FlattenPOP(null, SchemaPath.getSimplePath("b"));
+ List<String> inputJsonBatches = Lists.newArrayList();
+ StringBuilder batchString = new StringBuilder();
+
+ for (int j = 0; j < 1; j++) {
+ batchString.append("[");
+ for (int i = 0; i < 1; i++) {
+ batchString.append("{\"a\": 5, \"b\" : [5, 6, 7]}");
+ }
+ batchString.append("]");
+ inputJsonBatches.add(batchString.toString());
+ }
+
+ OperatorTestBuilder opTestBuilder = opTestBuilder()
+ .physicalOperator(flatten)
+ .inputDataStreamJson(inputJsonBatches)
+ .baselineColumns("a", "b")
+ .baselineValues(5l, 5l)
+ .baselineValues(5l, 6l)
+ .baselineValues(5l, 7l);
+
+ opTestBuilder.go();
+ }
+
+ @Test
public void testExternalSort() {
ExternalSort sortConf = new ExternalSort(null,
Lists.newArrayList(ordering("b", RelFieldCollation.Direction.ASCENDING, RelFieldCollation.NullDirection.FIRST)), false);
http://git-wip-us.apache.org/repos/asf/drill/blob/039530a4/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/MiniPlanUnitTestBase.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/MiniPlanUnitTestBase.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/MiniPlanUnitTestBase.java
index e6e72e7..7eafb86 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/MiniPlanUnitTestBase.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/MiniPlanUnitTestBase.java
@@ -152,7 +152,7 @@ public class MiniPlanUnitTestBase extends PhysicalOpUnitTestBase {
}
Map<String, List<Object>> actualSuperVectors = new TreeMap<String, List<Object>>();
- int actualBatchNum = DrillTestWrapper.addToCombinedVectorResults(batchIterator, expectSchema, actualSuperVectors);
+ int actualBatchNum = DrillTestWrapper.addToCombinedVectorResults(batchIterator, expectSchema, null, null, actualSuperVectors);
if (expectBatchNum != null) {
if (expectBatchNum != actualBatchNum) {
throw new AssertionError(String.format("Expected %s batches from operator tree. But operators return %s batch!", expectBatchNum, actualBatchNum));
http://git-wip-us.apache.org/repos/asf/drill/blob/039530a4/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/PhysicalOpUnitTestBase.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/PhysicalOpUnitTestBase.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/PhysicalOpUnitTestBase.java
index b01dd3e..d1ad990 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/PhysicalOpUnitTestBase.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/PhysicalOpUnitTestBase.java
@@ -74,6 +74,7 @@ import org.mockito.Mockito;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
+
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -84,6 +85,7 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.TreeMap;
public class PhysicalOpUnitTestBase extends ExecTest {
protected MockExecutorFragmentContext fragContext;
@@ -209,16 +211,19 @@ public class PhysicalOpUnitTestBase extends ExecTest {
private List<List<String>> inputStreamsJSON;
private long initReservation = AbstractBase.INIT_ALLOCATION;
private long maxAllocation = AbstractBase.MAX_ALLOCATION;
+ private boolean checkBatchMemory;
+ private boolean expectNoRows;
+ private Long expectedBatchSize;
+ private Integer expectedNumBatches;
- @SuppressWarnings({ "unchecked", "resource" })
+ @SuppressWarnings({"unchecked", "resource"})
public void go() {
BatchCreator<PhysicalOperator> opCreator;
RecordBatch testOperator;
try {
mockOpContext(popConfig, initReservation, maxAllocation);
- opCreator = (BatchCreator<PhysicalOperator>)
- opCreatorReg.getOperatorCreator(popConfig.getClass());
+ opCreator = (BatchCreator<PhysicalOperator>) opCreatorReg.getOperatorCreator(popConfig.getClass());
List<RecordBatch> incomingStreams = Lists.newArrayList();
if (inputStreamsJSON != null) {
for (List<String> batchesJson : inputStreamsJSON) {
@@ -229,8 +234,19 @@ public class PhysicalOpUnitTestBase extends ExecTest {
testOperator = opCreator.getBatch(fragContext, popConfig, incomingStreams);
- Map<String, List<Object>> actualSuperVectors = DrillTestWrapper.addToCombinedVectorResults(new BatchIterator(testOperator));
- Map<String, List<Object>> expectedSuperVectors = DrillTestWrapper.translateRecordListToHeapVectors(baselineRecords);
+ Map<String, List<Object>> actualSuperVectors = DrillTestWrapper.addToCombinedVectorResults(new BatchIterator(testOperator), expectedBatchSize, expectedNumBatches);
+
+ Map<String, List<Object>> expectedSuperVectors;
+
+ if (expectNoRows) {
+ expectedSuperVectors = new TreeMap<>();
+ for (String column : baselineColumns) {
+ expectedSuperVectors.put(column, new ArrayList<>());
+ }
+ } else {
+ expectedSuperVectors = DrillTestWrapper.translateRecordListToHeapVectors(baselineRecords);
+ }
+
DrillTestWrapper.compareMergedVectors(expectedSuperVectors, actualSuperVectors);
} catch (ExecutionSetupException e) {
@@ -281,7 +297,7 @@ public class PhysicalOpUnitTestBase extends ExecTest {
return this;
}
- public OperatorTestBuilder baselineValues(Object ... baselineValues) {
+ public OperatorTestBuilder baselineValues(Object... baselineValues) {
if (baselineRecords == null) {
baselineRecords = new ArrayList<>();
}
@@ -296,6 +312,21 @@ public class PhysicalOpUnitTestBase extends ExecTest {
this.baselineRecords.add(ret);
return this;
}
+
+ public OperatorTestBuilder expectZeroRows() {
+ this.expectNoRows = true;
+ return this;
+ }
+
+ public OperatorTestBuilder expectedNumBatches(Integer expectedNumBatches) {
+ this.expectedNumBatches = expectedNumBatches;
+ return this;
+ }
+
+ public OperatorTestBuilder expectedBatchSize(Long batchSize) {
+ this.expectedBatchSize = batchSize;
+ return this;
+ }
}
/**
@@ -448,7 +479,7 @@ public class PhysicalOpUnitTestBase extends ExecTest {
return getJsonReadersFromBatchString(jsonBatches, fragContext, Collections.singletonList(SchemaPath.getSimplePath("*")));
}
- private List<RecordReader> getReaderListForJsonBatches(List<String> jsonBatches, FragmentContext fragContext) {
+ public List<RecordReader> getReaderListForJsonBatches(List<String> jsonBatches, FragmentContext fragContext) {
Iterator<RecordReader> readers = getRecordReadersForJsonBatches(jsonBatches, fragContext);
List<RecordReader> readerList = new ArrayList<>();
while(readers.hasNext()) {
http://git-wip-us.apache.org/repos/asf/drill/blob/039530a4/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/TestOutputBatchSize.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/TestOutputBatchSize.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/TestOutputBatchSize.java
new file mode 100644
index 0000000..4a1dc8f
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/TestOutputBatchSize.java
@@ -0,0 +1,882 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.physical.unit;
+
+import com.google.common.collect.Lists;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.SchemaPath;
+
+import org.apache.drill.exec.physical.base.AbstractBase;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.config.FlattenPOP;
+import org.apache.drill.exec.physical.impl.ScanBatch;
+import org.apache.drill.exec.physical.impl.spill.RecordBatchSizer;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.VectorAccessible;
+import org.apache.drill.exec.util.JsonStringArrayList;
+import org.apache.drill.exec.util.JsonStringHashMap;
+import org.apache.drill.exec.util.Text;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+public class TestOutputBatchSize extends PhysicalOpUnitTestBase {
+ // Allocation limits handed to mockOpContext() for every operator under test.
+ private static final long initReservation = AbstractBase.INIT_ALLOCATION;
+ private static final long maxAllocation = AbstractBase.MAX_ALLOCATION;
+ // Keeping row count below 4096 so we do not produce more than one batch.
+ // scanBatch with json reader produces batches of 4k.
+ // Note that each test emits numRows + 1 rows: numRows rows inside its loop plus one trailing row.
+ // Not final because testFlattenUpperLimit reassigns it.
+ private int numRows = 4000;
+ // Filler value used for column "b" of every generated row.
+ private static final String wideString =
+ "b00dUrA0oa2i4ZEHg6zvPXPXlVQYB2BXe8T5gIEtvUDzcN6yUkIqyS07gaAy8k4ac6Bn1cxblsXFnkp8g8hiQkUMJPyl6" +
+ "l0jTdsIzQ4PkVCURGGyF0aduGqCXUaKp91gqkRMvLhHhmrHdEb22QN20dXEHSygR7vrb2zZhhfWeJbXRsesuYDqdGig801IAS6VWRIdQtJ6gaRhCdNz";
+
+  /**
+   * Computes the combined net size of the batches a scan produces for the given JSON input.
+   * <p>
+   * A throwaway {@link ScanBatch} is built over readers for {@code expectedJsonBatches}; each batch
+   * it yields is measured with {@link RecordBatchSizer#netSize()} and the measurements are summed.
+   *
+   * @param expectedJsonBatches JSON batch strings to read and measure
+   * @return sum of {@code netSize()} over every batch read from the input
+   * @throws ExecutionSetupException propagated from building the readers or the scan
+   */
+  private long getExpectedSize(List<String> expectedJsonBatches) throws ExecutionSetupException {
+    // This scan exists only so that its output batches can be measured.
+    RecordBatch sizingScan = new ScanBatch(new MockPhysicalOperator(), fragContext,
+        getReaderListForJsonBatches(expectedJsonBatches, fragContext));
+
+    long runningTotal = 0;
+    for (VectorAccessible scanned : new BatchIterator(sizingScan)) {
+      runningTotal += new RecordBatchSizer(scanned).netSize();
+    }
+    return runningTotal;
+  }
+
+  /**
+   * Flattens a repeated fixed-width column ({@code "c" : [6, 7, 8, 9]}) with the output batch size
+   * option set to half of the estimated total output size, then hands the expected rows, batch
+   * count and batch size to the operator test builder.
+   */
+  @Test
+  public void testFlattenFixedWidth() throws Exception {
+    PhysicalOperator flatten = new FlattenPOP(null, SchemaPath.getSimplePath("c"));
+    mockOpContext(flatten, initReservation, maxAllocation);
+
+    // Every generated row shares the same "a" and "b" columns; only the "c" value differs.
+    final String rowPrefix = "{\"a\": 5, " + "\"b\" : " + "\"" + wideString + "\"," + "\"c\" : ";
+
+    // create numRows + 1 input rows like this.
+    // "a" : 5, "b" : wideString, "c" : [6,7,8,9]
+    final String inputRow = rowPrefix + "[6, 7, 8, 9]}";
+    List<String> inputJsonBatches = Lists.newArrayList();
+    StringBuilder batchString = new StringBuilder();
+    batchString.append("[");
+    for (int i = 0; i < numRows; i++) {
+      batchString.append(inputRow).append(",");
+    }
+    batchString.append(inputRow); // last row carries no trailing comma
+    batchString.append("]");
+    inputJsonBatches.add(batchString.toString());
+
+    // Figure out what will be approximate total output size out of flatten for input above
+    // We will use this sizing information to set output batch size so we can produce desired
+    // number of batches that can be verified.
+
+    // output rows will be like this.
+    // "a" : 5, "b" : wideString, "c" : 6
+    // "a" : 5, "b" : wideString, "c" : 7
+    // "a" : 5, "b" : wideString, "c" : 8
+    // "a" : 5, "b" : wideString, "c" : 9
+    final String expectedRows =
+        rowPrefix + "6}," + rowPrefix + "7}," + rowPrefix + "8}," + rowPrefix + "9}";
+    List<String> expectedJsonBatches = Lists.newArrayList();
+    StringBuilder expectedBatchString = new StringBuilder();
+    expectedBatchString.append("[");
+    for (int i = 0; i < numRows; i++) {
+      expectedBatchString.append(expectedRows).append(",");
+    }
+    expectedBatchString.append(expectedRows);
+    expectedBatchString.append("]");
+    expectedJsonBatches.add(expectedBatchString.toString());
+
+    long totalSize = getExpectedSize(expectedJsonBatches);
+
+    // set the output batch size to 1/2 of total size expected.
+    // We will get approximately 4 batches because of fragmentation factor of 2 accounted for in flatten.
+    fragContext.getOptions().setLocalOption("drill.exec.memory.operator.output_batch_size", totalSize / 2);
+
+    OperatorTestBuilder opTestBuilder = opTestBuilder()
+      .physicalOperator(flatten)
+      .inputDataStreamJson(inputJsonBatches)
+      .baselineColumns("a", "b", "c")
+      .expectedNumBatches(4)  // verify number of batches
+      .expectedBatchSize(totalSize / 2); // verify batch size.
+
+    // One group of four flattened rows per input row (numRows + 1 input rows in total).
+    for (int i = 0; i < numRows + 1; i++) {
+      opTestBuilder.baselineValues(5L, wideString, 6L);
+      opTestBuilder.baselineValues(5L, wideString, 7L);
+      opTestBuilder.baselineValues(5L, wideString, 8L);
+      opTestBuilder.baselineValues(5L, wideString, 9L);
+    }
+
+    opTestBuilder.go();
+  }
+
+  /**
+   * Flattens a repeated variable-width column (an array of five bird names) with the output batch
+   * size option set to half of the estimated total output size, then hands the expected rows,
+   * batch count and batch size to the operator test builder.
+   */
+  @Test
+  public void testFlattenVariableWidth() throws Exception {
+    PhysicalOperator flatten = new FlattenPOP(null, SchemaPath.getSimplePath("c"));
+    mockOpContext(flatten, initReservation, maxAllocation);
+
+    // Every generated row shares the same "a" and "b" columns; only the "c" value differs.
+    final String rowPrefix = "{\"a\": 5, " + "\"b\" : " + "\"" + wideString + "\"," + "\"c\" : ";
+    // Elements of the flattened array, in input order.
+    final String[] birds = {"parrot", "hummingbird", "owl", "woodpecker", "peacock"};
+
+    // create numRows + 1 input rows like this.
+    // "a" : 5, "b" : wideString, "c" : ["parrot", "hummingbird", "owl", "woodpecker", "peacock"]
+    final String inputRow =
+        rowPrefix + "[\"parrot\", \"hummingbird\", \"owl\", \"woodpecker\", \"peacock\"]}";
+    List<String> inputJsonBatches = Lists.newArrayList();
+    StringBuilder batchString = new StringBuilder();
+    batchString.append("[");
+    for (int i = 0; i < numRows; i++) {
+      batchString.append(inputRow).append(",");
+    }
+    batchString.append(inputRow); // last row carries no trailing comma
+    batchString.append("]");
+    inputJsonBatches.add(batchString.toString());
+
+    // Figure out what will be approximate total output size out of flatten for input above
+    // We will use this sizing information to set output batch size so we can produce desired
+    // number of batches that can be verified.
+
+    // output rows will be like this.
+    // "a" : 5, "b" : wideString, "c" : parrot
+    // "a" : 5, "b" : wideString, "c" : hummingbird
+    // "a" : 5, "b" : wideString, "c" : owl
+    // "a" : 5, "b" : wideString, "c" : woodpecker
+    // "a" : 5, "b" : wideString, "c" : peacock
+    StringBuilder flattenedRows = new StringBuilder();
+    for (int b = 0; b < birds.length; b++) {
+      if (b > 0) {
+        flattenedRows.append(",");
+      }
+      flattenedRows.append(rowPrefix).append("\"").append(birds[b]).append("\"}");
+    }
+    final String expectedRows = flattenedRows.toString();
+
+    List<String> expectedJsonBatches = Lists.newArrayList();
+    StringBuilder expectedBatchString = new StringBuilder();
+    expectedBatchString.append("[");
+    for (int i = 0; i < numRows; i++) {
+      expectedBatchString.append(expectedRows).append(",");
+    }
+    expectedBatchString.append(expectedRows);
+    expectedBatchString.append("]");
+    expectedJsonBatches.add(expectedBatchString.toString());
+
+    long totalSize = getExpectedSize(expectedJsonBatches);
+
+    // set the output batch size to 1/2 of total size expected.
+    // We will get approximately 4 batches because of fragmentation factor of 2 accounted for in flatten.
+    fragContext.getOptions().setLocalOption("drill.exec.memory.operator.output_batch_size", totalSize / 2);
+
+    OperatorTestBuilder opTestBuilder = opTestBuilder()
+      .physicalOperator(flatten)
+      .inputDataStreamJson(inputJsonBatches)
+      .baselineColumns("a", "b", "c")
+      .expectedNumBatches(4)  // verify number of batches
+      .expectedBatchSize(totalSize / 2); // verify batch size.
+
+    // One group of five flattened rows per input row (numRows + 1 input rows in total).
+    for (int i = 0; i < numRows + 1; i++) {
+      for (String bird : birds) {
+        opTestBuilder.baselineValues(5L, wideString, bird);
+      }
+    }
+
+    opTestBuilder.go();
+  }
+
+  /**
+   * Flattens a column holding a list of fixed-width lists ({@code [[1,2,3,4], [5,6,7,8]]}) with the
+   * output batch size option set to half of the estimated total output size, then hands the
+   * expected rows, batch count and batch size to the operator test builder.
+   */
+  @Test
+  public void testFlattenFixedWidthList() throws Exception {
+    PhysicalOperator flatten = new FlattenPOP(null, SchemaPath.getSimplePath("c"));
+    mockOpContext(flatten, initReservation, maxAllocation);
+
+    // Every generated row shares the same "a" and "b" columns; only the "c" value differs.
+    final String rowPrefix = "{\"a\": 5, " + "\"b\" : " + "\"" + wideString + "\"," + "\"c\" : ";
+
+    // create numRows + 1 input rows like this.
+    // "a" : 5, "b" : wideString, "c" : [[1,2,3,4], [5,6,7,8]]
+    final String inputRow = rowPrefix + "[" + "[1,2,3,4]," + "[5,6,7,8]" + "]" + "}";
+    List<String> inputJsonBatches = Lists.newArrayList();
+    StringBuilder batchString = new StringBuilder();
+    batchString.append("[");
+    for (int i = 0; i < numRows; i++) {
+      batchString.append(inputRow).append(",");
+    }
+    batchString.append(inputRow); // last row carries no trailing comma
+    batchString.append("]");
+    inputJsonBatches.add(batchString.toString());
+
+    // Figure out what will be approximate total output size out of flatten for input above
+    // We will use this sizing information to set output batch size so we can produce desired
+    // number of batches that can be verified.
+
+    // output rows will be like this.
+    // "a" : 5, "b" : wideString, "c" : [1,2,3,4]
+    // "a" : 5, "b" : wideString, "c" : [5,6,7,8]
+    // NOTE(review): the sizing JSON below encodes "c" as the *string* "[1,2,3,4]" rather than as
+    // a JSON array, so the estimate is taken over a varchar column - confirm this is intended.
+    final String expectedRows =
+        rowPrefix + "\"[1,2,3,4]\"}," + rowPrefix + "\"[5,6,7,8]\"}";
+    List<String> expectedJsonBatches = Lists.newArrayList();
+    StringBuilder expectedBatchString = new StringBuilder();
+    expectedBatchString.append("[");
+    for (int i = 0; i < numRows; i++) {
+      expectedBatchString.append(expectedRows).append(",");
+    }
+    expectedBatchString.append(expectedRows);
+    expectedBatchString.append("]");
+    expectedJsonBatches.add(expectedBatchString.toString());
+
+    long totalSize = getExpectedSize(expectedJsonBatches);
+    // set the output batch size to 1/2 of total size expected.
+    // We will get approximately 4 batches because of fragmentation factor of 2 accounted for in flatten.
+    fragContext.getOptions().setLocalOption("drill.exec.memory.operator.output_batch_size", totalSize / 2);
+
+    // NOTE(review): expectedBatchSize is totalSize here, whereas the output_batch_size option
+    // above is totalSize / 2 - confirm the looser value is deliberate.
+    OperatorTestBuilder opTestBuilder = opTestBuilder()
+      .physicalOperator(flatten)
+      .inputDataStreamJson(inputJsonBatches)
+      .baselineColumns("a", "b", "c")
+      .expectedNumBatches(4)  // verify number of batches
+      .expectedBatchSize(totalSize);  // verify batch size.
+
+    // One pair of flattened rows per input row (numRows + 1 input rows in total).
+    for (int i = 0; i < numRows + 1; i++) {
+      opTestBuilder.baselineValues(5L, wideString, new ArrayList<Long>(Arrays.asList(1L, 2L, 3L, 4L)));
+      opTestBuilder.baselineValues(5L, wideString, new ArrayList<Long>(Arrays.asList(5L, 6L, 7L, 8L)));
+    }
+
+    opTestBuilder.go();
+  }
+
+  /**
+   * Flattens a column holding a list of variable-width lists (two lists of four bird names) with
+   * the output batch size option set to half of the estimated total output size, then hands the
+   * expected rows, batch count and batch size to the operator test builder.
+   */
+  @Test
+  public void testFlattenVariableWidthList() throws Exception {
+    PhysicalOperator flatten = new FlattenPOP(null, SchemaPath.getSimplePath("c"));
+    mockOpContext(flatten, initReservation, maxAllocation);
+
+    // Every generated row shares the same "a" and "b" columns; only the "c" value differs.
+    final String rowPrefix = "{\"a\": 5, " + "\"b\" : " + "\"" + wideString + "\"," + "\"c\" : ";
+
+    // create numRows + 1 input rows like this.
+    // "a" : 5, "b" : wideString, "c" : [["parrot", "hummingbird", "owl", "woodpecker"], ["hawk", "nightingale", "swallow", "peacock"]]
+    final String inputRow = rowPrefix + "[" +
+        "[\"parrot\", \"hummingbird\", \"owl\", \"woodpecker\"]," +
+        "[\"hawk\",\"nightingale\",\"swallow\",\"peacock\"]" + "]" + "}";
+    List<String> inputJsonBatches = Lists.newArrayList();
+    StringBuilder batchString = new StringBuilder();
+    batchString.append("[");
+    for (int i = 0; i < numRows; i++) {
+      batchString.append(inputRow).append(",");
+    }
+    batchString.append(inputRow); // last row carries no trailing comma
+    batchString.append("]");
+    inputJsonBatches.add(batchString.toString());
+
+    // Figure out what will be approximate total output size out of flatten for input above
+    // We will use this sizing information to set output batch size so we can produce desired
+    // number of batches that can be verified.
+
+    // output rows will be like this.
+    // "a" : 5, "b" : wideString, "c" : ["parrot", "hummingbird", "owl", "woodpecker"]
+    // "a" : 5, "b" : wideString, "c" : ["hawk", "nightingale", "swallow", "peacock"]
+    final String expectedRows =
+        rowPrefix + "[\"parrot\", \"hummingbird\", \"owl\", \"woodpecker\"]}," +
+        rowPrefix + "[\"hawk\", \"nightingale\", \"swallow\", \"peacock\"]}";
+    List<String> expectedJsonBatches = Lists.newArrayList();
+    StringBuilder expectedBatchString = new StringBuilder();
+    expectedBatchString.append("[");
+    for (int i = 0; i < numRows; i++) {
+      expectedBatchString.append(expectedRows).append(",");
+    }
+    expectedBatchString.append(expectedRows);
+    expectedBatchString.append("]");
+    expectedJsonBatches.add(expectedBatchString.toString());
+
+    long totalSize = getExpectedSize(expectedJsonBatches);
+    // set the output batch size to 1/2 of total size expected.
+    // We will get approximately 4 batches because of fragmentation factor of 2 accounted for in flatten.
+    fragContext.getOptions().setLocalOption("drill.exec.memory.operator.output_batch_size", totalSize / 2);
+
+    // NOTE(review): expectedBatchSize is totalSize here, whereas the output_batch_size option
+    // above is totalSize / 2 - confirm the looser value is deliberate.
+    OperatorTestBuilder opTestBuilder = opTestBuilder()
+      .physicalOperator(flatten)
+      .inputDataStreamJson(inputJsonBatches)
+      .baselineColumns("a", "b", "c")
+      .expectedNumBatches(4)  // verify number of batches
+      .expectedBatchSize(totalSize);  // verify batch size.
+
+    final JsonStringArrayList<Text> birds1 = new JsonStringArrayList<Text>() {{
+      add(new Text("parrot"));
+      add(new Text("hummingbird"));
+      add(new Text("owl"));
+      add(new Text("woodpecker"));
+    }};
+
+    final JsonStringArrayList<Text> birds2 = new JsonStringArrayList<Text>() {{
+      add(new Text("hawk"));
+      add(new Text("nightingale"));
+      add(new Text("swallow"));
+      add(new Text("peacock"));
+    }};
+
+    // One pair of flattened rows per input row (numRows + 1 input rows in total).
+    for (int i = 0; i < numRows + 1; i++) {
+      opTestBuilder.baselineValues(5L, wideString, birds1);
+      opTestBuilder.baselineValues(5L, wideString, birds2);
+    }
+
+    opTestBuilder.go();
+  }
+
+  /**
+   * Flattens a column holding a list of two maps with the output batch size option set to half of
+   * the estimated total output size, then hands the expected rows, batch count and batch size to
+   * the operator test builder.
+   */
+  @Test
+  public void testFlattenMap() throws Exception {
+    PhysicalOperator flatten = new FlattenPOP(null, SchemaPath.getSimplePath("c"));
+    mockOpContext(flatten, initReservation, maxAllocation);
+
+    // Every generated row shares the same "a" and "b" columns; only the "c" value differs.
+    final String rowPrefix = "{\"a\": 5, " + "\"b\" : " + "\"" + wideString + "\"," + "\"c\" : ";
+
+    // create numRows + 1 input rows like this.
+    // "a" : 5, "b" : wideString, "c" : [{"trans_id":"t1", amount:100, trans_time:7777777, type:sports}, {"trans_id":"t2", amount:1000, trans_time:8888888, type:groceries}]
+    final String inputRow = rowPrefix + "[" +
+        " { \"trans_id\":\"t1\", \"amount\":100, \"trans_time\":7777777, \"type\":\"sports\"}," +
+        " { \"trans_id\":\"t2\", \"amount\":1000, \"trans_time\":8888888, \"type\":\"groceries\"}" +
+        "]}";
+    List<String> inputJsonBatches = Lists.newArrayList();
+    StringBuilder batchString = new StringBuilder();
+    batchString.append("[");
+    for (int i = 0; i < numRows; i++) {
+      batchString.append(inputRow).append(",");
+    }
+    batchString.append(inputRow); // last row carries no trailing comma
+    batchString.append("]");
+    inputJsonBatches.add(batchString.toString());
+
+    // Figure out what will be approximate total output size out of flatten for input above
+    // We will use this sizing information to set output batch size so we can produce desired
+    // number of batches that can be verified.
+
+    // output rows will be like this.
+    // "a" : 5, "b" : wideString, "c" : {"trans_id":"t1", amount:100, trans_time:7777777, type:sports}
+    // "a" : 5, "b" : wideString, "c" : {"trans_id":"t2", amount:1000, trans_time:8888888, type:groceries}
+    final String expectedRows =
+        rowPrefix + "{\"trans_id\":\"t1\", \"amount\":100, \"trans_time\":7777777, \"type\":\"sports\"}}," +
+        rowPrefix + "{\"trans_id\":\"t2\", \"amount\":1000, \"trans_time\":8888888, \"type\":\"groceries\"}}";
+    List<String> expectedJsonBatches = Lists.newArrayList();
+    StringBuilder expectedBatchString = new StringBuilder();
+    expectedBatchString.append("[");
+    for (int i = 0; i < numRows; i++) {
+      expectedBatchString.append(expectedRows).append(",");
+    }
+    expectedBatchString.append(expectedRows);
+    expectedBatchString.append("]");
+    expectedJsonBatches.add(expectedBatchString.toString());
+
+    long totalSize = getExpectedSize(expectedJsonBatches);
+    // set the output batch size to 1/2 of total size expected.
+    // We will get approximately 4 batches because of fragmentation factor of 2 accounted for in flatten.
+    fragContext.getOptions().setLocalOption("drill.exec.memory.operator.output_batch_size", totalSize / 2);
+
+    OperatorTestBuilder opTestBuilder = opTestBuilder()
+      .physicalOperator(flatten)
+      .inputDataStreamJson(inputJsonBatches)
+      .baselineColumns("a", "b", "c")
+      .expectedNumBatches(4)  // verify number of batches
+      .expectedBatchSize(totalSize / 2);  // verify batch size.
+
+    // Long literals are boxed on insertion; this replaces the deprecated new Long(...) constructor.
+    JsonStringHashMap<String, Object> resultExpected1 = new JsonStringHashMap<>();
+    resultExpected1.put("trans_id", new Text("t1"));
+    resultExpected1.put("amount", 100L);
+    resultExpected1.put("trans_time", 7777777L);
+    resultExpected1.put("type", new Text("sports"));
+
+    JsonStringHashMap<String, Object> resultExpected2 = new JsonStringHashMap<>();
+    resultExpected2.put("trans_id", new Text("t2"));
+    resultExpected2.put("amount", 1000L);
+    resultExpected2.put("trans_time", 8888888L);
+    resultExpected2.put("type", new Text("groceries"));
+
+    // One pair of flattened rows per input row (numRows + 1 input rows in total).
+    for (int i = 0; i < numRows + 1; i++) {
+      opTestBuilder.baselineValues(5L, wideString, resultExpected1);
+      opTestBuilder.baselineValues(5L, wideString, resultExpected2);
+    }
+
+    opTestBuilder.go();
+  }
+
+  /**
+   * Flattens a column holding a list of three lists, each containing the same two maps, with the
+   * output batch size option set to half of the estimated total output size, then hands the
+   * expected rows, batch count and batch size to the operator test builder.
+   */
+  @Test
+  public void testFlattenListOfMaps() throws Exception {
+    PhysicalOperator flatten = new FlattenPOP(null, SchemaPath.getSimplePath("c"));
+    mockOpContext(flatten, initReservation, maxAllocation);
+
+    // Every generated row shares the same "a" and "b" columns; only the "c" value differs.
+    final String rowPrefix = "{\"a\": 5, " + "\"b\" : " + "\"" + wideString + "\"," + "\"c\" : ";
+
+    // The two maps that make up every inner list.
+    final String sportsMap =
+        "{ \"trans_id\":\"t1\", \"amount\":100, \"trans_time\":7777777, \"type\":\"sports\"}";
+    final String groceriesMap =
+        "{ \"trans_id\":\"t2\", \"amount\":1000, \"trans_time\":8888888, \"type\":\"groceries\"}";
+    // The same inner list in the two whitespace variants the JSON text uses
+    // (with and without a space before the closing bracket).
+    final String spacedPair = "[ " + sportsMap + "," + " " + groceriesMap + " ]";
+    final String tightPair = "[ " + sportsMap + "," + " " + groceriesMap + "]";
+
+    // create numRows + 1 input rows like this.
+    // "a" : 5, "b" : wideString,
+    // "c" : [ [{"trans_id":"t1", amount:100, trans_time:7777777, type:sports}, {"trans_id":"t2", amount:1000, trans_time:8888888, type:groceries}],
+    //         [{"trans_id":"t1", amount:100, trans_time:7777777, type:sports}, {"trans_id":"t2", amount:1000, trans_time:8888888, type:groceries}],
+    //         [{"trans_id":"t1", amount:100, trans_time:7777777, type:sports}, {"trans_id":"t2", amount:1000, trans_time:8888888, type:groceries}] ]
+    final String inputRow =
+        rowPrefix + "[" + spacedPair + ", " + spacedPair + ", " + spacedPair + "]}";
+    List<String> inputJsonBatches = Lists.newArrayList();
+    StringBuilder batchString = new StringBuilder();
+    batchString.append("[");
+    for (int i = 0; i < numRows; i++) {
+      batchString.append(inputRow).append(",");
+    }
+    batchString.append(inputRow); // last row carries no trailing comma
+    batchString.append("]");
+    inputJsonBatches.add(batchString.toString());
+
+    // Figure out what will be approximate total output size out of flatten for input above
+    // We will use this sizing information to set output batch size so we can produce desired
+    // number of batches that can be verified.
+
+    // output rows will be like this (three per input row, one for each inner list).
+    // "a" : 5, "b" : wideString, "c" : [{"trans_id":"t1", amount:100, trans_time:7777777, type:sports}, {"trans_id":"t2", amount:1000, trans_time:8888888, type:groceries}]
+    // "a" : 5, "b" : wideString, "c" : [{"trans_id":"t1", amount:100, trans_time:7777777, type:sports}, {"trans_id":"t2", amount:1000, trans_time:8888888, type:groceries}]
+    // "a" : 5, "b" : wideString, "c" : [{"trans_id":"t1", amount:100, trans_time:7777777, type:sports}, {"trans_id":"t2", amount:1000, trans_time:8888888, type:groceries}]
+    final String expectedRows =
+        rowPrefix + spacedPair + "}," +
+        rowPrefix + tightPair + "}," +
+        rowPrefix + tightPair + "}";
+    List<String> expectedJsonBatches = Lists.newArrayList();
+    StringBuilder expectedBatchString = new StringBuilder();
+    expectedBatchString.append("[");
+    for (int i = 0; i < numRows; i++) {
+      expectedBatchString.append(expectedRows).append(",");
+    }
+    expectedBatchString.append(expectedRows);
+    expectedBatchString.append("]");
+    expectedJsonBatches.add(expectedBatchString.toString());
+
+    long totalSize = getExpectedSize(expectedJsonBatches);
+    // set the output batch size to 1/2 of total size expected.
+    // We will get approximately 4 batches because of fragmentation factor of 2 accounted for in flatten.
+    fragContext.getOptions().setLocalOption("drill.exec.memory.operator.output_batch_size", totalSize / 2);
+
+    OperatorTestBuilder opTestBuilder = opTestBuilder()
+      .physicalOperator(flatten)
+      .inputDataStreamJson(inputJsonBatches)
+      .baselineColumns("a", "b", "c")
+      .expectedNumBatches(4)  // verify number of batches
+      .expectedBatchSize(totalSize / 2);  // verify batch size.
+
+    // Long literals are boxed on insertion; this replaces the deprecated new Long(...) constructor.
+    final JsonStringHashMap<String, Object> resultExpected1 = new JsonStringHashMap<>();
+    resultExpected1.put("trans_id", new Text("t1"));
+    resultExpected1.put("amount", 100L);
+    resultExpected1.put("trans_time", 7777777L);
+    resultExpected1.put("type", new Text("sports"));
+
+    final JsonStringHashMap<String, Object> resultExpected2 = new JsonStringHashMap<>();
+    resultExpected2.put("trans_id", new Text("t2"));
+    resultExpected2.put("amount", 1000L);
+    resultExpected2.put("trans_time", 8888888L);
+    resultExpected2.put("type", new Text("groceries"));
+
+    final JsonStringArrayList<JsonStringHashMap<String, Object>> results = new JsonStringArrayList<JsonStringHashMap<String, Object>>() {{
+      add(resultExpected1);
+      add(resultExpected2);
+    }};
+
+    // Three identical flattened rows per input row (numRows + 1 input rows in total).
+    for (int i = 0; i < numRows + 1; i++) {
+      opTestBuilder.baselineValues(5L, wideString, results);
+      opTestBuilder.baselineValues(5L, wideString, results);
+      opTestBuilder.baselineValues(5L, wideString, results);
+    }
+
+    opTestBuilder.go();
+  }
+
+ @Test
+ public void testFlattenNestedMap() throws Exception {
+ PhysicalOperator flatten = new FlattenPOP(null, SchemaPath.getSimplePath("c"));
+ mockOpContext(flatten, initReservation, maxAllocation);
+
+ // create input rows like this.
+ // "a" : 5, "b" : wideString,
+ // "c" : [ {innerMap: {"trans_id":"t1", amount:100, trans_time:7777777, type:sports}, "trans_id":"t1", amount:100, trans_time:8888888, type:groceries},
+ // {innerMap: {"trans_id":"t1", amount:100, trans_time:7777777, type:sports}, "trans_id":"t1", amount:100, trans_time:8888888, type:groceries} ]
+
+ List<String> inputJsonBatches = Lists.newArrayList();
+ StringBuilder batchString = new StringBuilder();
+
+ StringBuilder innerMap = new StringBuilder();
+ innerMap.append("{ \"trans_id\":\"inner_trans_t1\", \"amount\":100, \"trans_time\":7777777, \"type\":\"sports\"}");
+
+ batchString.append("[");
+ for (int i = 0; i < numRows; i++) {
+ batchString.append("{\"a\": 5, " + "\"b\" : " + "\"" + wideString + "\"," + "\"c\" : [" +
+ " { \"innerMap\": " + innerMap + ", \"trans_id\":\"t1\", \"amount\":100, \"trans_time\":7777777, " +
+ "\"type\":\"sports\"}," + " { \"innerMap\": " + innerMap +
+ ", \"trans_id\":\"t2\", \"amount\":1000, \"trans_time\":8888888, \"type\":\"groceries\"}]");
+ batchString.append("},");
+ }
+ batchString.append("{\"a\": 5, " + "\"b\" : " + "\"" + wideString + "\"," + "\"c\" : [" +
+ " { \"innerMap\": " + innerMap + ", \"trans_id\":\"t1\", \"amount\":100, \"trans_time\":7777777, " +
+ "\"type\":\"sports\"}," + " { \"innerMap\": " + innerMap + ", \"trans_id\":\"t2\", \"amount\":1000, \"trans_time\":8888888, \"type\":\"groceries\"}");
+ batchString.append("]}]");
+ inputJsonBatches.add(batchString.toString());
+
+ // Figure out what will be approximate total output size out of flatten for input above
+ // We will use this sizing information to set output batch size so we can produce desired
+ // number of batches that can be verified.
+
+ // output rows will be like this.
+ // "a" : 5, "b" : wideString, "c" : {innerMap: {"trans_id":"t1", amount:100, trans_time:7777777, type:sports}, "trans_id":"t1", amount:100, trans_time:8888888, type:groceries}
+ // "a" : 5, "b" : wideString, "c" : {innerMap: {"trans_id":"t1", amount:100, trans_time:7777777, type:sports}, "trans_id":"t1", amount:100, trans_time:8888888, type:groceries}
+ List<String> expectedJsonBatches = Lists.newArrayList();
+ StringBuilder expectedBatchString = new StringBuilder();
+
+ expectedBatchString.append("[");
+ for (int i = 0; i < numRows; i++) {
+ expectedBatchString.append("{\"a\": 5, " + "\"b\" : " + "\"" + wideString + "\"," + "\"c\" : " +
+ " { \"innerMap\": " + innerMap + ", \"trans_id\":\"t1\", \"amount\":100, \"trans_time\":7777777, " +
+ "\"type\":\"sports\"} }, ");
+ expectedBatchString.append("{\"a\": 5, " + "\"b\" : " + "\"" + wideString + "\"," + "\"c\" : " +
+ " { \"innerMap\": " + innerMap + ", \"trans_id\":\"t1\", \"amount\":100, \"trans_time\":7777777, " +
+ "\"type\":\"sports\"} }, ");
+ }
+
+ expectedBatchString.append("{\"a\": 5, " + "\"b\" : " + "\"" + wideString + "\"," + "\"c\" : " +
+ " { \"innerMap\": " + innerMap + ", \"trans_id\":\"t1\", \"amount\":100, \"trans_time\":7777777, " +
+ "\"type\":\"sports\"} }, ");
+ expectedBatchString.append("{\"a\": 5, " + "\"b\" : " + "\"" + wideString + "\"," + "\"c\" : " +
+ " { \"innerMap\": " + innerMap + ", \"trans_id\":\"t1\", \"amount\":100, \"trans_time\":7777777, " +
+ "\"type\":\"sports\"} }");
+
+ expectedBatchString.append("]");
+ expectedJsonBatches.add(expectedBatchString.toString());
+
+ long totalSize = getExpectedSize(expectedJsonBatches);
+ // set the output batch size to 1/2 of total size expected.
+ // We will get approximately 4 batches because of fragmentation factor of 2 accounted for in flatten.
+ fragContext.getOptions().setLocalOption("drill.exec.memory.operator.output_batch_size", totalSize / 2);
+
+ OperatorTestBuilder opTestBuilder = opTestBuilder()
+ .physicalOperator(flatten)
+ .inputDataStreamJson(inputJsonBatches)
+ .baselineColumns("a", "b", "c")
+ .expectedNumBatches(4) // verify number of batches
+ .expectedBatchSize(totalSize / 2); // verify batch size.
+
+ JsonStringHashMap<String, Object> innerMapResult = new JsonStringHashMap<>();
+ innerMapResult.put("trans_id", new Text("inner_trans_t1"));
+ innerMapResult.put("amount", new Long(100));
+ innerMapResult.put("trans_time", new Long(7777777));
+ innerMapResult.put("type", new Text("sports"));
+
+ JsonStringHashMap<String, Object> resultExpected1 = new JsonStringHashMap<>();
+ resultExpected1.put("trans_id", new Text("t1"));
+ resultExpected1.put("amount", new Long(100));
+ resultExpected1.put("trans_time", new Long(7777777));
+ resultExpected1.put("type", new Text("sports"));
+ resultExpected1.put("innerMap", innerMapResult);
+
+ JsonStringHashMap<String, Object> resultExpected2 = new JsonStringHashMap<>();
+ resultExpected2.put("trans_id", new Text("t2"));
+ resultExpected2.put("amount", new Long(1000));
+ resultExpected2.put("trans_time", new Long(8888888));
+ resultExpected2.put("type", new Text("groceries"));
+ resultExpected2.put("innerMap", innerMapResult);
+
+ for (int i = 0; i < numRows + 1; i++) {
+ opTestBuilder.baselineValues(5l, wideString, resultExpected1);
+ opTestBuilder.baselineValues(5l, wideString, resultExpected2);
+ }
+
+ opTestBuilder.go();
+ }
+
+ /**
+  * Checks the upper limit of 65535 records per output batch: the input flattens to
+  * far more records than that, so the batch count is driven by the record cap.
+  */
+ @Test
+ public void testFlattenUpperLimit() throws Exception {
+   PhysicalOperator flatten = new FlattenPOP(null, SchemaPath.getSimplePath("c"));
+   mockOpContext(flatten, initReservation, maxAllocation);
+
+   // Array to flatten: the integers 0..1000 inclusive, i.e. 1001 elements.
+   StringBuilder flattenElement = new StringBuilder("[");
+   for (int value = 0; value < 1000; value++) {
+     flattenElement.append(value).append(",");
+   }
+   flattenElement.append(1000).append("]");
+
+   numRows = 1000;
+
+   // Input batch: numRows + 1 identical rows, each carrying the array above in "c".
+   final String inputRow = "{\"a\": 5, \"b\" : \"" + wideString + "\",\"c\":" + flattenElement + "}";
+   StringBuilder batchString = new StringBuilder("[");
+   for (int row = 0; row < numRows; row++) {
+     batchString.append(inputRow).append(",");
+   }
+   batchString.append(inputRow).append("]");
+
+   List<String> inputJsonBatches = Lists.newArrayList();
+   inputJsonBatches.add(batchString.toString());
+
+   // Build a JSON rendering of (roughly) the flattened output. It is only used to
+   // estimate the total output size, from which the output batch size is derived.
+   final String outputRowPrefix = "{\"a\": 5, \"b\" : \"" + wideString + "\",\"c\" :";
+   StringBuilder expectedBatchString = new StringBuilder("[");
+   for (int row = 0; row <= numRows; row++) {
+     // The final row emits one record fewer here; its closing record is appended below
+     // without a trailing comma.
+     int elements = (row < numRows) ? 1000 : 999;
+     for (int value = 0; value < elements; value++) {
+       expectedBatchString.append(outputRowPrefix).append(value).append("},");
+     }
+   }
+   expectedBatchString.append(outputRowPrefix).append(1000).append("}]");
+
+   List<String> expectedJsonBatches = Lists.newArrayList();
+   expectedJsonBatches.add(expectedBatchString.toString());
+
+   // Configure the output batch size as half of the estimated total output size.
+   long totalSize = getExpectedSize(expectedJsonBatches);
+   fragContext.getOptions().setLocalOption("drill.exec.memory.operator.output_batch_size", totalSize / 2);
+
+   // (numRows + 1) * 1001 = 1,002,001 flattened records; capped at 65535 records per
+   // batch that takes 16 batches, which is what is expected here.
+   OperatorTestBuilder builder = opTestBuilder()
+     .physicalOperator(flatten)
+     .inputDataStreamJson(inputJsonBatches)
+     .baselineColumns("a", "b", "c")
+     .expectedNumBatches(16) // verify number of batches
+     .expectedBatchSize(totalSize / 2); // verify batch size.
+
+   // Baseline: every input row yields one output record per array element (0..1000).
+   for (long row = 0; row <= numRows; row++) {
+     for (long element = 0; element < 1001; element++) {
+       builder.baselineValues(5L, wideString, element);
+     }
+   }
+
+   builder.go();
+ }
+
+ /**
+  * Checks the lower limit of flatten's batch sizing: with an output batch size far
+  * smaller than a single record, each output batch must still carry at least one record.
+  */
+ @Test
+ public void testFlattenLowerLimit() throws Exception {
+   PhysicalOperator flatten = new FlattenPOP(null, SchemaPath.getSimplePath("c"));
+   mockOpContext(flatten, initReservation, maxAllocation);
+
+   // Both lists below hold entries for indices 0..lastIndex, i.e. 11 entries each.
+   final int lastIndex = 10;
+
+   // Array to flatten: [0,1,...,10].
+   StringBuilder flattenElement = new StringBuilder("[");
+   for (int i = 0; i < lastIndex; i++) {
+     flattenElement.append(i).append(",");
+   }
+   flattenElement.append(lastIndex).append("]");
+
+   // Column "a" is a list of 11 wide strings, which makes every output record large.
+   final String quotedWideString = "\"" + wideString + "\"";
+   final StringBuilder wideStrings = new StringBuilder("[");
+   for (int i = 0; i < lastIndex; i++) {
+     wideStrings.append(quotedWideString).append(",");
+   }
+   wideStrings.append(quotedWideString).append("]");
+
+   // Single input row: {"a": [wide strings], "c": [0..10]}.
+   List<String> inputJsonBatches = Lists.newArrayList();
+   inputJsonBatches.add("[{\"a\": " + wideStrings + ",\"c\":" + flattenElement + "}]");
+
+   // Set a very low output batch size relative to the record size, to test that we
+   // still get at least one record per batch.
+   fragContext.getOptions().setLocalOption("drill.exec.memory.operator.output_batch_size", 1024);
+
+   // The input flattens to 11 records, so about one batch per record is expected; the
+   // original expectation of 10 is kept as the value passed to expectedNumBatches.
+   // The output batch size is not verified: it will exceed the configured 1024 so that
+   // at least one record can be emitted.
+   OperatorTestBuilder builder = opTestBuilder()
+     .physicalOperator(flatten)
+     .inputDataStreamJson(inputJsonBatches)
+     .baselineColumns("a", "c")
+     .expectedNumBatches(10); // verify number of batches
+
+   // Expected value of column "a" in every output record. Filled in a loop rather than
+   // with double-brace initialization, which would create an anonymous subclass that
+   // captures the enclosing test instance.
+   final JsonStringArrayList<Text> results = new JsonStringArrayList<>();
+   for (int i = 0; i <= lastIndex; i++) {
+     results.add(new Text(wideString));
+   }
+
+   // One output record per flattened element 0..10.
+   for (long element = 0; element <= lastIndex; element++) {
+     builder.baselineValues(results, element);
+   }
+
+   builder.go();
+ }
+
+ /**
+  * Flattening a column that holds only empty lists is expected to produce zero rows.
+  */
+ @Test
+ public void testFlattenEmptyList() throws Exception {
+   final PhysicalOperator flatten = new FlattenPOP(null, SchemaPath.getSimplePath("b"));
+
+   // Every input row carries an empty list in the flattened column "b".
+   final String emptyListRow = "{\"a\": 5, \"b\" : []}";
+
+   // Input batch: numRows + 1 such rows.
+   StringBuilder batchString = new StringBuilder("[");
+   for (int row = 0; row < numRows; row++) {
+     batchString.append(emptyListRow).append(",");
+   }
+   batchString.append(emptyListRow).append("]");
+
+   List<String> inputJsonBatches = Lists.newArrayList();
+   inputJsonBatches.add(batchString.toString());
+
+   opTestBuilder()
+     .physicalOperator(flatten)
+     .inputDataStreamJson(inputJsonBatches)
+     .baselineColumns("a", "b")
+     .expectZeroRows()
+     .go();
+ }
+
+ /**
+  * Flattens an array of wide strings so that every output record is large, then checks
+  * the number of output batches and their size against the configured output batch size.
+  */
+ @Test
+ public void testFlattenLargeRecords() throws Exception {
+   PhysicalOperator flatten = new FlattenPOP(null, SchemaPath.getSimplePath("c"));
+   mockOpContext(flatten, initReservation, maxAllocation);
+
+   // Input rows look like:
+   // "a" : <id>, "b" : wideString, "c" : [ arrayLength + 1 wideStrings ]
+   int arrayLength = 10;
+   StringBuilder wideStringArray = new StringBuilder("[ \"");
+   for (int i = 0; i < arrayLength; i++) {
+     wideStringArray.append(wideString).append("\",\"");
+   }
+   wideStringArray.append(wideString).append("\"]");
+
+   // Everything after the "a" value is identical for all input rows.
+   final String inputRowTail = ",\"b\": \"" + wideString + "\",\"c\": " + wideStringArray + "}";
+   StringBuilder batchString = new StringBuilder("[");
+   for (int id = 0; id < numRows; id++) {
+     batchString.append("{\"a\" :").append(id).append(inputRowTail).append(",");
+   }
+   batchString.append("{\"a\" :").append(numRows).append(inputRowTail).append("]");
+
+   List<String> inputJsonBatches = Lists.newArrayList();
+   inputJsonBatches.add(batchString.toString());
+
+   // Output rows look like:
+   // "a" : <id>, "b" : wideString, "c" : wideString
+   //
+   // Build a JSON rendering of (roughly) that output. It is only used to estimate the
+   // total output size, from which the output batch size is derived.
+   final String outputRowTail = ",\"b\": \"" + wideString + "\",\"c\": \"" + wideString + "\"}";
+   StringBuilder expectedBatchString = new StringBuilder("[");
+   for (int k = 0; k < numRows * (arrayLength + 1); k++) {
+     expectedBatchString.append("{\"a\" :").append(k).append(outputRowTail).append(",");
+   }
+   expectedBatchString.append("{\"a\" :").append(numRows).append(outputRowTail).append("]");
+
+   List<String> expectedJsonBatches = Lists.newArrayList();
+   expectedJsonBatches.add(expectedBatchString.toString());
+
+   // Configure the output batch size as half of the estimated total output size.
+   // Roughly 4 batches are expected because flatten accounts for a fragmentation factor of 2.
+   long totalSize = getExpectedSize(expectedJsonBatches);
+   fragContext.getOptions().setLocalOption("drill.exec.memory.operator.output_batch_size", totalSize / 2);
+
+   OperatorTestBuilder builder = opTestBuilder()
+     .physicalOperator(flatten)
+     .inputDataStreamJson(inputJsonBatches)
+     .baselineColumns("a", "b", "c")
+     .expectedNumBatches(4) // verify number of batches
+     .expectedBatchSize(totalSize / 2); // verify batch size.
+
+   // Baseline: each input row id yields one output record per array entry.
+   for (long id = 0; id <= numRows; id++) {
+     for (int entry = 0; entry <= arrayLength; entry++) {
+       builder.baselineValues(id, wideString, wideString);
+     }
+   }
+
+   builder.go();
+ }
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/039530a4/exec/java-exec/src/test/java/org/apache/drill/test/DrillTestWrapper.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/DrillTestWrapper.java b/exec/java-exec/src/test/java/org/apache/drill/test/DrillTestWrapper.java
index cd68bf3..c470b0d 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/DrillTestWrapper.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/DrillTestWrapper.java
@@ -40,6 +40,8 @@ import org.apache.drill.common.types.Types;
import org.apache.drill.exec.HyperVectorValueIterator;
import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.physical.impl.spill.RecordBatchSizer;
+import org.apache.drill.exec.physical.unit.PhysicalOpUnitTestBase;
import org.apache.drill.exec.proto.UserBitShared;
import org.apache.drill.exec.proto.UserBitShared.QueryType;
import org.apache.drill.exec.record.BatchSchema;
@@ -318,10 +320,11 @@ public class DrillTestWrapper {
* @throws SchemaChangeException
* @throws UnsupportedEncodingException
*/
- public static Map<String, List<Object>> addToCombinedVectorResults(Iterable<VectorAccessible> batches)
+ public static Map<String, List<Object>> addToCombinedVectorResults(Iterable<VectorAccessible> batches,
+ Long expectedBatchSize, Integer expectedNumBatches)
throws SchemaChangeException, UnsupportedEncodingException {
Map<String, List<Object>> combinedVectors = new TreeMap<>();
- addToCombinedVectorResults(batches, null, combinedVectors);
+ addToCombinedVectorResults(batches, null, expectedBatchSize, expectedNumBatches, combinedVectors);
return combinedVectors;
}
@@ -336,12 +339,15 @@ public class DrillTestWrapper {
* @throws SchemaChangeException
* @throws UnsupportedEncodingException
*/
- public static int addToCombinedVectorResults(Iterable<VectorAccessible> batches, BatchSchema expectedSchema, Map<String, List<Object>> combinedVectors)
+ public static int addToCombinedVectorResults(Iterable<VectorAccessible> batches, BatchSchema expectedSchema,
+ Long expectedBatchSize, Integer expectedNumBatches,
+ Map<String, List<Object>> combinedVectors)
throws SchemaChangeException, UnsupportedEncodingException {
// TODO - this does not handle schema changes
int numBatch = 0;
long totalRecords = 0;
BatchSchema schema = null;
+
for (VectorAccessible loader : batches) {
numBatch++;
if (expectedSchema != null) {
@@ -352,6 +358,13 @@ public class DrillTestWrapper {
}
}
+ if (expectedBatchSize != null) {
+ RecordBatchSizer sizer = new RecordBatchSizer(loader);
+ // Not checking actualSize as accounting is not correct when we do
+ // split and transfer ownership across operators.
+ Assert.assertTrue(sizer.netSize() <= expectedBatchSize);
+ }
+
// TODO: Clean: DRILL-2933: That load(...) no longer throws
// SchemaChangeException, so check/clean throws clause above.
if (schema == null) {
@@ -367,6 +380,7 @@ public class DrillTestWrapper {
}
logger.debug("reading batch with " + loader.getRecordCount() + " rows, total read so far " + totalRecords);
totalRecords += loader.getRecordCount();
+
for (VectorWrapper<?> w : loader) {
String field = SchemaPath.getSimplePath(w.getField().getName()).toExpr();
ValueVector[] vectors;
@@ -420,6 +434,16 @@ public class DrillTestWrapper {
}
}
}
+
+ if (expectedNumBatches != null) {
+ // Based on how much memory is actually taken by value vectors (because of doubling stuff),
+ // we have to do complex math for predicting exact number of batches.
+ // Instead, check that number of batches is at least the minimum that is expected
+ // and no more than twice of that.
+ Assert.assertTrue(numBatch >= expectedNumBatches);
+ Assert.assertTrue(numBatch <= (2*expectedNumBatches));
+ }
+
return numBatch;
}
@@ -539,7 +563,7 @@ public class DrillTestWrapper {
addTypeInfoIfMissing(actual.get(0), testBuilder);
BatchIterator batchIter = new BatchIterator(actual, loader);
- actualSuperVectors = addToCombinedVectorResults(batchIter);
+ actualSuperVectors = addToCombinedVectorResults(batchIter, null, null);
batchIter.close();
// If baseline data was not provided to the test builder directly, we must run a query for the baseline, this includes
@@ -552,7 +576,7 @@ public class DrillTestWrapper {
test(baselineOptionSettingQueries);
expected = testRunAndReturn(baselineQueryType, testBuilder.getValidationQuery());
BatchIterator exBatchIter = new BatchIterator(expected, loader);
- expectedSuperVectors = addToCombinedVectorResults(exBatchIter);
+ expectedSuperVectors = addToCombinedVectorResults(exBatchIter, null, null);
exBatchIter.close();
}
} else {
@@ -587,6 +611,11 @@ public class DrillTestWrapper {
public static Map<String, List<Object>> translateRecordListToHeapVectors(List<Map<String, Object>> records) {
Map<String, List<Object>> ret = new TreeMap<>();
+
+ if (records == null) {
+ return ret;
+ }
+
for (String s : records.get(0).keySet()) {
ret.put(s, new ArrayList<>());
}