You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this plain-text copy.)
Posted to commits@drill.apache.org by pa...@apache.org on 2018/07/02 22:52:11 UTC
[drill] 03/03: DRILL-6561: Lateral excluding the columns from output container provided by projection push into rules
This is an automated email from the ASF dual-hosted git repository.
parthc pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git
commit 069c3049f1a500e5ae0b47caeebc5856ab182b73
Author: Sorabh Hamirwasia <sh...@maprtech.com>
AuthorDate: Fri Jun 29 10:27:55 2018 -0700
DRILL-6561: Lateral excluding the columns from output container provided by projection push into rules
This closes #1356
---
.../exec/physical/impl/join/HashJoinBatch.java | 7 +-
.../exec/physical/impl/join/LateralJoinBatch.java | 71 +++++++--
.../exec/physical/impl/join/MergeJoinBatch.java | 3 +-
.../drill/exec/record/JoinBatchMemoryManager.java | 19 ++-
.../impl/join/TestLateralJoinCorrectness.java | 160 ++++++++++++++++++++-
.../unnest/TestUnnestWithLateralCorrectness.java | 104 +++++++++-----
6 files changed, 310 insertions(+), 54 deletions(-)
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
index 047c597..345d182 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
@@ -19,6 +19,7 @@ package org.apache.drill.exec.physical.impl.join;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.HashSet;
import java.util.List;
import java.util.Set;
@@ -67,9 +68,6 @@ import org.apache.drill.exec.vector.ValueVector;
import org.apache.drill.exec.vector.complex.AbstractContainerVector;
import org.apache.calcite.rel.core.JoinRelType;
-import static org.apache.drill.exec.record.JoinBatchMemoryManager.LEFT_INDEX;
-import static org.apache.drill.exec.record.JoinBatchMemoryManager.RIGHT_INDEX;
-
/**
* This class implements the runtime execution for the Hash-Join operator
* supporting INNER, LEFT OUTER, RIGHT OUTER, and FULL OUTER joins
@@ -892,7 +890,8 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
logger.debug("BATCH_STATS, configured output batch size: {}, allocated memory {}, avail mem factor {}, output batch size: {}",
configuredBatchSize, allocator.getLimit(), avail_mem_factor, outputBatchSize);
- batchMemoryManager = new JoinBatchMemoryManager(outputBatchSize, left, right);
+ batchMemoryManager = new JoinBatchMemoryManager(outputBatchSize, left, right, new HashSet<>());
+ logger.debug("BATCH_STATS, configured output batch size: {}", configuredBatchSize);
}
/**
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java
index 84dc5c3..fc3c8b1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java
@@ -20,6 +20,7 @@ package org.apache.drill.exec.physical.impl.join;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.types.TypeProtos;
import org.apache.drill.common.types.Types;
import org.apache.drill.exec.ExecConstants;
@@ -27,6 +28,7 @@ import org.apache.drill.exec.exception.OutOfMemoryException;
import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.physical.base.LateralContract;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.physical.config.LateralJoinPOP;
import org.apache.drill.exec.record.AbstractBinaryRecordBatch;
import org.apache.drill.exec.record.BatchSchema;
@@ -34,10 +36,14 @@ import org.apache.drill.exec.record.JoinBatchMemoryManager;
import org.apache.drill.exec.record.MaterializedField;
import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.record.RecordBatchSizer;
+import org.apache.drill.exec.record.SchemaBuilder;
import org.apache.drill.exec.record.VectorAccessibleUtilities;
import org.apache.drill.exec.record.VectorWrapper;
import org.apache.drill.exec.vector.ValueVector;
+import java.util.HashSet;
+import java.util.List;
+
import static org.apache.drill.exec.record.RecordBatch.IterOutcome.EMIT;
import static org.apache.drill.exec.record.RecordBatch.IterOutcome.NONE;
import static org.apache.drill.exec.record.RecordBatch.IterOutcome.OK;
@@ -82,6 +88,8 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP>
// Flag to keep track of new left batch so that update on memory manager is called only once per left batch
private boolean isNewLeftBatch = false;
+ private final HashSet<String> excludedFieldNames = new HashSet<>();
+
/* ****************************************************************************************************************
* Public Methods
* ****************************************************************************************************************/
@@ -91,7 +99,9 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP>
Preconditions.checkNotNull(left);
Preconditions.checkNotNull(right);
final int configOutputBatchSize = (int) context.getOptions().getOption(ExecConstants.OUTPUT_BATCH_SIZE_VALIDATOR);
- batchMemoryManager = new JoinBatchMemoryManager(configOutputBatchSize, left, right);
+ // Prepare Schema Path Mapping
+ populateExcludedField(popConfig);
+ batchMemoryManager = new JoinBatchMemoryManager(configOutputBatchSize, left, right, excludedFieldNames);
// Initially it's set to default value of 64K and later for each new output row it will be set to the computed
// row count
@@ -700,6 +710,21 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP>
return isValid;
}
+ private BatchSchema batchSchemaWithNoExcludedCols(BatchSchema originSchema) {
+ if (excludedFieldNames.size() == 0) {
+ return originSchema;
+ }
+
+ final SchemaBuilder newSchemaBuilder =
+ BatchSchema.newBuilder().setSelectionVectorMode(originSchema.getSelectionVectorMode());
+ for (MaterializedField field : originSchema) {
+ if (!excludedFieldNames.contains(field.getName())) {
+ newSchemaBuilder.addField(field);
+ }
+ }
+ return newSchemaBuilder.build();
+ }
+
/**
* Helps to create the outgoing container vectors based on known left and right batch schemas
* @throws SchemaChangeException
@@ -711,8 +736,8 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP>
// Clear up the container
container.clear();
- leftSchema = left.getSchema();
- rightSchema = right.getSchema();
+ leftSchema = batchSchemaWithNoExcludedCols(left.getSchema());
+ rightSchema = batchSchemaWithNoExcludedCols(right.getSchema());
if (!verifyInputSchema(leftSchema)) {
throw new SchemaChangeException("Invalid Schema found for left incoming batch");
@@ -724,12 +749,20 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP>
// Setup LeftSchema in outgoing container
for (final VectorWrapper<?> vectorWrapper : left) {
- container.addOrGet(vectorWrapper.getField());
+ final MaterializedField leftField = vectorWrapper.getField();
+ if (excludedFieldNames.contains(leftField.getName())) {
+ continue;
+ }
+ container.addOrGet(leftField);
}
// Setup RightSchema in the outgoing container
for (final VectorWrapper<?> vectorWrapper : right) {
MaterializedField rightField = vectorWrapper.getField();
+ if (excludedFieldNames.contains(rightField.getName())) {
+ continue;
+ }
+
TypeProtos.MajorType rightFieldType = vectorWrapper.getField().getType();
// make right input schema optional if we have LEFT join
@@ -846,15 +879,28 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP>
// Get the vectors using field index rather than Materialized field since input batch field can be different from
// output container field in case of Left Join. As we rebuild the right Schema field to be optional for output
// container.
+ int inputIndex = 0;
for (int i = startVectorIndex; i < endVectorIndex; ++i) {
- // Get input vector
- final Class<?> inputValueClass = batch.getSchema().getColumn(i).getValueClass();
- final ValueVector inputVector = batch.getValueAccessorById(inputValueClass, i).getValueVector();
-
// Get output vector
final int outputVectorIndex = i + baseVectorIndex;
final Class<?> outputValueClass = this.getSchema().getColumn(outputVectorIndex).getValueClass();
final ValueVector outputVector = this.getValueAccessorById(outputValueClass, outputVectorIndex).getValueVector();
+ final String outputFieldName = outputVector.getField().getName();
+
+ ValueVector inputVector;
+ Class<?> inputValueClass;
+ String inputFieldName;
+ do {
+ // Get input vector
+ inputValueClass = batch.getSchema().getColumn(inputIndex).getValueClass();
+ inputVector = batch.getValueAccessorById(inputValueClass, inputIndex).getValueVector();
+ inputFieldName = inputVector.getField().getName();
+ ++inputIndex;
+ } while (excludedFieldNames.contains(inputFieldName));
+
+ Preconditions.checkArgument(outputFieldName.equals(inputFieldName),
+ new IllegalStateException(String.format("Non-excluded Input and output container fields are not in same order" +
+ ". Output Schema:%s and Input Schema:%s", this.getSchema(), batch.getSchema())));
logger.trace("Copying data from incoming batch vector to outgoing batch vector. [IncomingBatch: " +
"(RowIndex: {}, VectorType: {}), OutputBatch: (RowIndex: {}, VectorType: {}) and Other: (TimeEachValue: {}," +
@@ -938,4 +984,13 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP>
maxOutputRowCount = newOutputRowCount;
}
}
+
+ private void populateExcludedField(PhysicalOperator lateralPop) {
+ final List<SchemaPath> excludedCols = ((LateralJoinPOP)lateralPop).getExcludedColumns();
+ if (excludedCols != null) {
+ for (SchemaPath currentPath : excludedCols) {
+ excludedFieldNames.add(currentPath.rootName());
+ }
+ }
+ }
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
index 62967a9..ea34ed9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
@@ -62,6 +62,7 @@ import org.apache.drill.exec.vector.ValueVector;
import org.apache.drill.exec.vector.complex.AbstractContainerVector;
import java.io.IOException;
+import java.util.HashSet;
import java.util.List;
import static org.apache.drill.exec.compile.sig.GeneratorMapping.GM;
@@ -108,7 +109,7 @@ public class MergeJoinBatch extends AbstractBinaryRecordBatch<MergeJoinPOP> {
private class MergeJoinMemoryManager extends JoinBatchMemoryManager {
MergeJoinMemoryManager(int outputBatchSize, RecordBatch leftBatch, RecordBatch rightBatch) {
- super(outputBatchSize, leftBatch, rightBatch);
+ super(outputBatchSize, leftBatch, rightBatch, new HashSet<>());
}
/**
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/JoinBatchMemoryManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/JoinBatchMemoryManager.java
index 2ebe887..4344e13 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/JoinBatchMemoryManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/JoinBatchMemoryManager.java
@@ -17,29 +17,44 @@
*/
package org.apache.drill.exec.record;
+import java.util.Set;
+
public class JoinBatchMemoryManager extends RecordBatchMemoryManager {
private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(JoinBatchMemoryManager.class);
private int rowWidth[];
private RecordBatch recordBatch[];
+ private Set<String> columnsToExclude;
private static final int numInputs = 2;
public static final int LEFT_INDEX = 0;
public static final int RIGHT_INDEX = 1;
- public JoinBatchMemoryManager(int outputBatchSize, RecordBatch leftBatch, RecordBatch rightBatch) {
+ public JoinBatchMemoryManager(int outputBatchSize, RecordBatch leftBatch,
+ RecordBatch rightBatch, Set<String> excludedColumns) {
super(numInputs, outputBatchSize);
recordBatch = new RecordBatch[numInputs];
recordBatch[LEFT_INDEX] = leftBatch;
recordBatch[RIGHT_INDEX] = rightBatch;
rowWidth = new int[numInputs];
+ this.columnsToExclude = excludedColumns;
}
private int updateInternal(int inputIndex, int outputPosition, boolean useAggregate) {
updateIncomingStats(inputIndex);
rowWidth[inputIndex] = useAggregate ? (int) getAvgInputRowWidth(inputIndex) : getRecordBatchSizer(inputIndex).getRowAllocWidth();
- final int newOutgoingRowWidth = rowWidth[LEFT_INDEX] + rowWidth[RIGHT_INDEX];
+ // Reduce the width of excluded columns from actual rowWidth
+ for (String columnName : columnsToExclude) {
+ final RecordBatchSizer.ColumnSize currentColSizer = getColumnSize(inputIndex, columnName);
+ if (currentColSizer == null) {
+ continue;
+ }
+ rowWidth[inputIndex] -= currentColSizer.getAllocSizePerEntry();
+ }
+
+ // Get final net outgoing row width after reducing the excluded columns width
+ int newOutgoingRowWidth = rowWidth[LEFT_INDEX] + rowWidth[RIGHT_INDEX];
// If outgoing row width is 0 or there is no change in outgoing row width, just return.
// This is possible for empty batches or
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestLateralJoinCorrectness.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestLateralJoinCorrectness.java
index 2723e30..ffac4b6 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestLateralJoinCorrectness.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestLateralJoinCorrectness.java
@@ -21,6 +21,7 @@ import avro.shaded.com.google.common.collect.Lists;
import org.apache.calcite.rel.core.JoinRelType;
import org.apache.drill.categories.OperatorTest;
import org.apache.drill.common.exceptions.DrillRuntimeException;
+import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.types.TypeProtos;
import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.ops.OperatorContext;
@@ -29,11 +30,13 @@ import org.apache.drill.exec.physical.config.LateralJoinPOP;
import org.apache.drill.exec.physical.impl.MockRecordBatch;
import org.apache.drill.exec.record.CloseableRecordBatch;
import org.apache.drill.exec.record.RecordBatch;
-import org.apache.drill.exec.record.metadata.TupleMetadata;
import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
import org.apache.drill.exec.store.mock.MockStorePOP;
import org.apache.drill.test.SubOperatorTest;
+import org.apache.drill.test.rowSet.DirectRowSet;
import org.apache.drill.test.rowSet.RowSet;
+import org.apache.drill.test.rowSet.RowSetComparison;
import org.apache.drill.test.rowSet.schema.SchemaBuilder;
import org.junit.After;
import org.junit.AfterClass;
@@ -2870,4 +2873,159 @@ public class TestLateralJoinCorrectness extends SubOperatorTest {
leftRowSet2.clear();
}
}
+
+ private void testExcludedColumns(List<SchemaPath> excludedCols, CloseableRecordBatch left,
+ CloseableRecordBatch right, RowSet expectedRowSet) throws Exception {
+ LateralJoinPOP lateralPop = new LateralJoinPOP(null, null, JoinRelType.INNER, excludedCols);
+ final LateralJoinBatch ljBatch = new LateralJoinBatch(lateralPop, fixture.getFragmentContext(), left, right);
+
+ try {
+ assertTrue(RecordBatch.IterOutcome.OK_NEW_SCHEMA == ljBatch.next());
+ assertTrue(RecordBatch.IterOutcome.OK == ljBatch.next());
+ RowSet actualRowSet = DirectRowSet.fromContainer(ljBatch.getContainer());
+ new RowSetComparison(expectedRowSet).verify(actualRowSet);
+ assertTrue(RecordBatch.IterOutcome.NONE == ljBatch.next());
+ } finally {
+ ljBatch.close();
+ left.close();
+ right.close();
+ expectedRowSet.clear();
+ }
+ }
+
+ @Test
+ public void testFillingUpOutputBatch_WithExcludedColumns() throws Exception {
+ // Create data for left input
+ final RowSet.SingleRowSet leftRowSet2 = fixture.rowSetBuilder(leftSchema)
+ .addRow(2, 20, "item20")
+ .build();
+
+ // Create data for right input
+ final RowSet.SingleRowSet nonEmptyRightRowSet2 = fixture.rowSetBuilder(rightSchema)
+ .addRow(4, 41, "item41")
+ .addRow(5, 51, "item51")
+ .build();
+
+ TupleMetadata expectedSchema = new SchemaBuilder()
+ .add("id_left", TypeProtos.MinorType.INT)
+ .add("name_left", TypeProtos.MinorType.VARCHAR)
+ .add("id_right", TypeProtos.MinorType.INT)
+ .add("cost_right", TypeProtos.MinorType.INT)
+ .add("name_right", TypeProtos.MinorType.VARCHAR)
+ .buildSchema();
+
+ final RowSet.SingleRowSet expectedRowSet = fixture.rowSetBuilder(expectedSchema)
+ .addRow(1, "item1", 1, 11, "item11")
+ .addRow(1, "item1", 2, 21, "item21")
+ .addRow(1, "item1", 3, 31, "item31")
+ .addRow(2, "item20", 4, 41, "item41")
+ .addRow(2, "item20", 5, 51, "item51")
+ .build();
+
+ // Get the left container with dummy data for Lateral Join
+ leftContainer.add(nonEmptyLeftRowSet.container());
+ leftContainer.add(leftRowSet2.container());
+
+ // Get the left IterOutcomes for Lateral Join
+ leftOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+ leftOutcomes.add(RecordBatch.IterOutcome.OK);
+
+ // Create Left MockRecordBatch
+ final CloseableRecordBatch leftMockBatch = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
+ leftContainer, leftOutcomes, leftContainer.get(0).getSchema());
+
+ // Get the right container with dummy data
+ rightContainer.add(emptyRightRowSet.container());
+ rightContainer.add(nonEmptyRightRowSet.container());
+ rightContainer.add(nonEmptyRightRowSet2.container());
+
+ rightOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+ rightOutcomes.add(RecordBatch.IterOutcome.EMIT);
+ rightOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+ final CloseableRecordBatch rightMockBatch = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
+ rightContainer, rightOutcomes, rightContainer.get(0).getSchema());
+
+ List<SchemaPath> excludedCols = new ArrayList<>();
+ excludedCols.add(SchemaPath.getSimplePath("cost_left"));
+
+ try {
+ testExcludedColumns(excludedCols, leftMockBatch, rightMockBatch, expectedRowSet);
+ } finally {
+ // Close all the resources for this test case
+ leftRowSet2.clear();
+ nonEmptyRightRowSet2.clear();
+ }
+ }
+
+ @Test
+ public void testFillingUpOutputBatch_With2ExcludedColumns() throws Exception {
+ // Create data for left input
+ final RowSet.SingleRowSet leftRowSet2 = fixture.rowSetBuilder(leftSchema)
+ .addRow(2, 20, "item20")
+ .build();
+
+ // Create data for right input
+ final RowSet.SingleRowSet nonEmptyRightRowSet2 = fixture.rowSetBuilder(rightSchema)
+ .addRow(4, 41, "item41")
+ .addRow(5, 51, "item51")
+ .build();
+
+ TupleMetadata expectedSchema = new SchemaBuilder()
+ .add("name_left", TypeProtos.MinorType.VARCHAR)
+ //.add("id_right", TypeProtos.MinorType.INT)
+ .add("cost_right", TypeProtos.MinorType.INT)
+ .add("name_right", TypeProtos.MinorType.VARCHAR)
+ .buildSchema();
+
+ final RowSet.SingleRowSet expectedRowSet = fixture.rowSetBuilder(expectedSchema)
+ /*.addRow("item1", 1, 11, "item11")
+ .addRow("item1", 2, 21, "item21")
+ .addRow("item1", 3, 31, "item31")
+ .addRow("item20", 4, 41, "item41")
+ .addRow("item20", 5, 51, "item51") */
+ .addRow("item1", 11, "item11")
+ .addRow("item1", 21, "item21")
+ .addRow("item1", 31, "item31")
+ .addRow("item20", 41, "item41")
+ .addRow("item20", 51, "item51")
+ .build();
+
+ // Get the left container with dummy data for Lateral Join
+ leftContainer.add(nonEmptyLeftRowSet.container());
+ leftContainer.add(leftRowSet2.container());
+
+ // Get the left IterOutcomes for Lateral Join
+ leftOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+ leftOutcomes.add(RecordBatch.IterOutcome.OK);
+
+ // Create Left MockRecordBatch
+ final CloseableRecordBatch leftMockBatch = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
+ leftContainer, leftOutcomes, leftContainer.get(0).getSchema());
+
+ // Get the right container with dummy data
+ rightContainer.add(emptyRightRowSet.container());
+ rightContainer.add(nonEmptyRightRowSet.container());
+ rightContainer.add(nonEmptyRightRowSet2.container());
+
+ rightOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+ rightOutcomes.add(RecordBatch.IterOutcome.EMIT);
+ rightOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+ final CloseableRecordBatch rightMockBatch = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
+ rightContainer, rightOutcomes, rightContainer.get(0).getSchema());
+
+ List<SchemaPath> excludedCols = new ArrayList<>();
+ excludedCols.add(SchemaPath.getSimplePath("cost_left"));
+ excludedCols.add(SchemaPath.getSimplePath("id_left"));
+ excludedCols.add(SchemaPath.getSimplePath("id_right"));
+
+ try {
+ testExcludedColumns(excludedCols, leftMockBatch, rightMockBatch, expectedRowSet);
+ } finally {
+ // Close all the resources for this test case
+ leftRowSet2.clear();
+ nonEmptyRightRowSet2.clear();
+ }
+ }
}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/unnest/TestUnnestWithLateralCorrectness.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/unnest/TestUnnestWithLateralCorrectness.java
index 3a7f899..c2e64f4 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/unnest/TestUnnestWithLateralCorrectness.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/unnest/TestUnnestWithLateralCorrectness.java
@@ -37,8 +37,8 @@ import org.apache.drill.exec.physical.impl.project.ProjectRecordBatch;
import org.apache.drill.exec.physical.impl.sort.RecordBatchData;
import org.apache.drill.exec.planner.logical.DrillLogicalTestutils;
import org.apache.drill.exec.record.RecordBatch;
-import org.apache.drill.exec.record.metadata.TupleMetadata;
import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
import org.apache.drill.exec.store.mock.MockStorePOP;
import org.apache.drill.exec.vector.ValueVector;
import org.apache.drill.exec.vector.VarCharVector;
@@ -106,7 +106,7 @@ public class TestUnnestWithLateralCorrectness extends SubOperatorTest {
RecordBatch.IterOutcome[] iterOutcomes = {RecordBatch.IterOutcome.OK_NEW_SCHEMA, RecordBatch.IterOutcome.OK};
try {
- testUnnest(incomingSchemas, iterOutcomes, data, baseline);
+ testUnnest(incomingSchemas, iterOutcomes, data, baseline, false);
} catch (Exception e) {
fail("Failed due to exception: " + e.getMessage());
}
@@ -140,7 +140,7 @@ public class TestUnnestWithLateralCorrectness extends SubOperatorTest {
RecordBatch.IterOutcome[] iterOutcomes = {RecordBatch.IterOutcome.OK_NEW_SCHEMA, RecordBatch.IterOutcome.OK};
try {
- testUnnest(incomingSchemas, iterOutcomes, data, baseline);
+ testUnnest(incomingSchemas, iterOutcomes, data, baseline, false);
} catch (Exception e) {
fail("Failed due to exception: " + e.getMessage());
}
@@ -161,7 +161,7 @@ public class TestUnnestWithLateralCorrectness extends SubOperatorTest {
RecordBatch.IterOutcome[] iterOutcomes = {RecordBatch.IterOutcome.OK_NEW_SCHEMA, RecordBatch.IterOutcome.OK};
try {
- testUnnest(incomingSchemas, iterOutcomes, data, baseline);
+ testUnnest(incomingSchemas, iterOutcomes, data, baseline, false);
} catch (Exception e) {
fail("Failed due to exception: " + e.getMessage());
}
@@ -192,7 +192,7 @@ public class TestUnnestWithLateralCorrectness extends SubOperatorTest {
RecordBatch.IterOutcome[] iterOutcomes = {RecordBatch.IterOutcome.OK_NEW_SCHEMA, RecordBatch.IterOutcome.OK};
try {
- testUnnest(incomingSchemas, iterOutcomes, data, baseline);
+ testUnnest(incomingSchemas, iterOutcomes, data, baseline, false);
} catch (Exception e) {
fail("Failed due to exception: " + e.getMessage());
}
@@ -240,7 +240,7 @@ public class TestUnnestWithLateralCorrectness extends SubOperatorTest {
RecordBatch.IterOutcome.OK_NEW_SCHEMA};
try {
- testUnnest(incomingSchemas, iterOutcomes, data, baseline);
+ testUnnest(incomingSchemas, iterOutcomes, data, baseline, false);
} catch (Exception e) {
fail("Failed due to exception: " + e.getMessage());
}
@@ -289,28 +289,15 @@ public class TestUnnestWithLateralCorrectness extends SubOperatorTest {
RecordBatch.IterOutcome.OK_NEW_SCHEMA};
try {
- testUnnest(incomingSchemas, iterOutcomes, data, baseline);
+ testUnnest(incomingSchemas, iterOutcomes, data, baseline, false);
} catch (Exception e) {
fail("Failed due to exception: " + e.getMessage());
}
}
- @Test
- public void testUnnestLimitBatchSize() {
-
- final int limitedOutputBatchSize = 127;
- final int inputBatchSize = limitedOutputBatchSize + 1;
- // size of lateral output batch = 4N * (N + 5) bytes, where N = output batch row count
- // Lateral output batch size = N * input row size + N * size of single unnest column
- // = N * (size of row id + size of array offset vector + (N + 1 )*size of single array entry))
- // + N * 4
- // = N * (4 + 2*4 + (N+1)*4 ) + N * 4
- // = N * (16 + 4N) + N * 4
- // = 4N * (N + 5)
- // configure the output batch size to be one more record than that so that the batch sizer can round down
- final int limitedOutputBatchSizeBytes = 4 * limitedOutputBatchSize * (limitedOutputBatchSize + 6);
-
+ private void testUnnestBatchSizing(int inputBatchSize, int limitOutputBatchSize,
+ int limitOutputBatchSizeBytes, boolean excludeUnnestColumn) {
// single record batch with single row. The unnest column has one
// more record than the batch size we want in the output
Object[][] data = new Object[1][1];
@@ -323,39 +310,76 @@ public class TestUnnestWithLateralCorrectness extends SubOperatorTest {
}
}
}
+
Integer[][][] baseline = new Integer[2][2][];
- baseline[0][0] = new Integer[limitedOutputBatchSize];
- baseline[0][1] = new Integer[limitedOutputBatchSize];
+ baseline[0][0] = new Integer[limitOutputBatchSize];
+ baseline[0][1] = new Integer[limitOutputBatchSize];
baseline[1][0] = new Integer[1];
baseline[1][1] = new Integer[1];
- for (int i = 0; i < limitedOutputBatchSize; i++) {
+ for (int i = 0; i < limitOutputBatchSize; i++) {
baseline[0][0][i] = 1;
baseline[0][1][i] = i;
}
baseline[1][0][0] = 1; // row Num
- baseline[1][1][0] = limitedOutputBatchSize; // value
+ baseline[1][1][0] = limitOutputBatchSize; // value
// Create input schema
TupleMetadata incomingSchema = new SchemaBuilder()
- .add("rowNumber", TypeProtos.MinorType.INT)
- .addArray("unnestColumn", TypeProtos.MinorType.INT).buildSchema();
+ .add("rowNumber", TypeProtos.MinorType.INT)
+ .addArray("unnestColumn", TypeProtos.MinorType.INT).buildSchema();
TupleMetadata[] incomingSchemas = {incomingSchema};
RecordBatch.IterOutcome[] iterOutcomes = {RecordBatch.IterOutcome.OK};
final long outputBatchSize = fixture.getFragmentContext().getOptions().getOption(ExecConstants
- .OUTPUT_BATCH_SIZE_VALIDATOR);
- fixture.getFragmentContext().getOptions().setLocalOption(ExecConstants.OUTPUT_BATCH_SIZE, limitedOutputBatchSizeBytes);
+ .OUTPUT_BATCH_SIZE_VALIDATOR);
+ fixture.getFragmentContext().getOptions().setLocalOption(ExecConstants.OUTPUT_BATCH_SIZE, limitOutputBatchSizeBytes);
try {
- testUnnest(incomingSchemas, iterOutcomes, data, baseline);
+ testUnnest(incomingSchemas, iterOutcomes, data, baseline, excludeUnnestColumn);
} catch (Exception e) {
fail("Failed due to exception: " + e.getMessage());
} finally {
fixture.getFragmentContext().getOptions().setLocalOption(ExecConstants.OUTPUT_BATCH_SIZE, outputBatchSize);
}
+ }
+ @Test
+ public void testUnnestLimitBatchSize_WithExcludedCols() {
+ LateralJoinPOP previoudPop = ljPopConfig;
+ List<SchemaPath> excludedCols = new ArrayList<>();
+ excludedCols.add(SchemaPath.getSimplePath("unnestColumn"));
+ ljPopConfig = new LateralJoinPOP(null, null, JoinRelType.INNER, excludedCols);
+ final int limitedOutputBatchSize = 127;
+ final int inputBatchSize = limitedOutputBatchSize + 1;
+ // Since we want 127 row count and because of nearest power of 2 adjustment output row count will be reduced to
+ // 64. So we should configure batch size for (N+1) rows if we want to output N rows where N is not power of 2
+ // size of lateral output batch = (N+1)*8 bytes, where N = output batch row count
+ // Lateral output batch size = (N+1) * (input row size without unnest field) + (N+1) * size of single unnest column
+ // = (N+1) * (size of row id) + (N+1) * (size of single array entry)
+ // = (N+1)*4 + (N+1) * 4
+ // = (N+1) * 8
+ // configure the output batch size to be one more record than that so that the batch sizer can round down
+ final int limitedOutputBatchSizeBytes = 8 * (limitedOutputBatchSize + 1);
+ testUnnestBatchSizing(inputBatchSize, limitedOutputBatchSize, limitedOutputBatchSizeBytes, true);
+ ljPopConfig = previoudPop;
+ }
+
+ @Test
+ public void testUnnestLimitBatchSize() {
+ final int limitedOutputBatchSize = 127;
+ final int inputBatchSize = limitedOutputBatchSize + 1;
+ // size of lateral output batch = 4N * (N + 5) bytes, where N = output batch row count
+ // Lateral output batch size = N * input row size + N * size of single unnest column
+ // = N * (size of row id + size of array offset vector + (N + 1 )*size of single array entry))
+ // + N * 4
+ // = N * (4 + 2*4 + (N+1)*4 ) + N * 4
+ // = N * (16 + 4N) + N * 4
+ // = 4N * (N + 5)
+ // configure the output batch size to be one more record than that so that the batch sizer can round down
+ final int limitedOutputBatchSizeBytes = 4 * limitedOutputBatchSize * (limitedOutputBatchSize + 6);
+ testUnnestBatchSizing(inputBatchSize, limitedOutputBatchSize, limitedOutputBatchSizeBytes, false);
}
@Test
@@ -405,7 +429,7 @@ public class TestUnnestWithLateralCorrectness extends SubOperatorTest {
fixture.getFragmentContext().getOptions().setLocalOption(ExecConstants.OUTPUT_BATCH_SIZE, limitedOutputBatchSizeBytes);
try {
- testUnnest(incomingSchemas, iterOutcomes, -1, 1, data, baseline); // Limit of 100 values for unnest.
+ testUnnest(incomingSchemas, iterOutcomes, -1, 1, data, baseline, false); // Limit of 100 values for unnest.
} catch (Exception e) {
fail("Failed due to exception: " + e.getMessage());
} finally {
@@ -463,7 +487,7 @@ public class TestUnnestWithLateralCorrectness extends SubOperatorTest {
fixture.getFragmentContext().getOptions().setLocalOption(ExecConstants.OUTPUT_BATCH_SIZE, limitedOutputBatchSizeBytes);
try {
- testUnnest(incomingSchemas, iterOutcomes, -1, 1, data, baseline); // Limit of 100 values for unnest.
+ testUnnest(incomingSchemas, iterOutcomes, -1, 1, data, baseline, false); // Limit of 100 values for unnest.
} catch (Exception e) {
fail("Failed due to exception: " + e.getMessage());
} finally {
@@ -496,7 +520,7 @@ public class TestUnnestWithLateralCorrectness extends SubOperatorTest {
RecordBatch.IterOutcome[] iterOutcomes = {RecordBatch.IterOutcome.OK_NEW_SCHEMA, RecordBatch.IterOutcome.OK};
try {
- testUnnest(incomingSchemas, iterOutcomes, data, baseline);
+ testUnnest(incomingSchemas, iterOutcomes, data, baseline, false);
} catch (UserException|UnsupportedOperationException e) {
return; // succeeded
} catch (Exception e) {
@@ -511,8 +535,9 @@ public class TestUnnestWithLateralCorrectness extends SubOperatorTest {
TupleMetadata[] incomingSchemas,
RecordBatch.IterOutcome[] iterOutcomes,
T[][] data,
- T[][][] baseline ) throws Exception{
- testUnnest(incomingSchemas, iterOutcomes, -1, -1, data, baseline);
+ T[][][] baseline,
+ boolean excludeUnnestColumn) throws Exception{
+ testUnnest(incomingSchemas, iterOutcomes, -1, -1, data, baseline, excludeUnnestColumn);
}
// test unnest for various input conditions optionally invoking kill. if the kill or killBatch
@@ -522,7 +547,8 @@ public class TestUnnestWithLateralCorrectness extends SubOperatorTest {
int unnestLimit, // kill unnest after every 'unnestLimit' number of values in every record
int execKill, // number of batches after which to kill the execution (!)
T[][] data,
- T[][][] baseline) throws Exception {
+ T[][][] baseline,
+ boolean excludeUnnestColumn) throws Exception {
// Get the incoming container with dummy data for LJ
final List<VectorContainer> incomingContainer = new ArrayList<>(data.length);
@@ -606,7 +632,9 @@ public class TestUnnestWithLateralCorrectness extends SubOperatorTest {
//int valueIndex = 0;
for ( List<ValueVector> batch: results) {
int vectorCount= batch.size();
- if (vectorCount!= baseline[batchIndex].length+1) { // baseline does not include the original unnest column
+ int expectedVectorCount = (excludeUnnestColumn) ? 0 : 1;
+ expectedVectorCount += baseline[batchIndex].length;
+ if (vectorCount!= expectedVectorCount) { // baseline does not include the original unnest column
fail("Test failed in validating unnest output. Batch column count mismatch.");
}
for (ValueVector vv : batch) {