You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by ar...@apache.org on 2018/05/19 20:37:47 UTC
[drill] 02/04: DRILL-6418: Handle Schema change in Unnest And
Lateral for unnest field / non-unnest field
This is an automated email from the ASF dual-hosted git repository.
arina pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git
commit b7d259ba9c8c2b28700c9da33bb97dd79ef04cbc
Author: Sorabh Hamirwasia <sh...@maprtech.com>
AuthorDate: Tue May 15 14:27:31 2018 -0700
DRILL-6418: Handle Schema change in Unnest And Lateral for unnest field / non-unnest field
Note: Changed Lateral to handle non-empty right batch with OK_NEW_SCHEMA
closes #1271
---
.../exec/physical/impl/join/LateralJoinBatch.java | 45 +---
.../physical/impl/project/ProjectRecordBatch.java | 2 +-
.../physical/impl/unnest/UnnestRecordBatch.java | 6 +-
.../org/apache/drill/exec/record/BatchSchema.java | 27 ++-
.../apache/drill/exec/record/VectorContainer.java | 9 +
.../impl/join/TestLateralJoinCorrectness.java | 252 ++++++++++++++++++++-
.../impl/limit/TestLimitBatchEmitOutcome.java | 3 +
.../impl/project/TestProjectEmitOutcome.java | 3 +
.../unnest/TestUnnestWithLateralCorrectness.java | 8 +-
.../drill/exec/record/MaterializedField.java | 43 +++-
.../exec/vector/complex/AbstractMapVector.java | 3 +
11 files changed, 347 insertions(+), 54 deletions(-)
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java
index 8ea381b..a09913f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java
@@ -125,8 +125,8 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP>
// Left side has some records in the batch so let's process right batch
childOutcome = processRightBatch();
- // reset the left & right outcomes to OK here and send the empty batch downstream
- // Assumption being right side will always send OK_NEW_SCHEMA with empty batch which is what UNNEST will do
+ // reset the left & right outcomes to OK here and send the empty batch downstream. Non-Empty right batch with
+ // OK_NEW_SCHEMA will be handled in subsequent next call
if (childOutcome == OK_NEW_SCHEMA) {
leftUpstream = (leftUpstream != EMIT) ? OK : leftUpstream;
rightUpstream = OK;
@@ -344,22 +344,7 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP>
switch (leftUpstream) {
case OK_NEW_SCHEMA:
// This OK_NEW_SCHEMA is received post build schema phase and from left side
- // If schema didn't actually changed then just handle it as OK outcome. This is fine since it is not setting
- // up any incoming vector references in setupNewSchema. While copying the records it always work on latest
- // incoming vector.
- if (!isSchemaChanged(left.getSchema(), leftSchema)) {
- logger.warn(String.format("New schema received from left side is same as previous known left schema. " +
- "Ignoring this schema change. Old Left Schema: %s, New Left Schema: %s", leftSchema, left.getSchema()));
-
- // Current left batch is empty and schema didn't changed as well, so let's get next batch and loose
- // OK_NEW_SCHEMA outcome
- processLeftBatchInFuture = false;
- if (emptyLeftBatch) {
- continue;
- } else {
- leftUpstream = OK;
- }
- } else if (outputIndex > 0) { // can only reach here from produceOutputBatch
+ if (outputIndex > 0) { // can only reach here from produceOutputBatch
// This means there is already some records from previous join inside left batch
// So we need to pass that downstream and then handle the OK_NEW_SCHEMA in subsequent next call
processLeftBatchInFuture = true;
@@ -439,20 +424,12 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP>
// We should not get OK_NEW_SCHEMA multiple times for the same left incoming batch. So there won't be a
// case where we get OK_NEW_SCHEMA --> OK (with batch) ---> OK_NEW_SCHEMA --> OK/EMIT fall through
//
- // Right batch with OK_NEW_SCHEMA is always going to be an empty batch, so let's pass the new schema
- // downstream and later with subsequent next() call the join output will be produced
- Preconditions.checkState(right.getRecordCount() == 0,
- "Right side batch with OK_NEW_SCHEMA is not empty");
-
- if (!isSchemaChanged(right.getSchema(), rightSchema)) {
- logger.warn(String.format("New schema received from right side is same as previous known right schema. " +
- "Ignoring this schema change. Old Right schema: %s, New Right Schema: %s",
- rightSchema, right.getSchema()));
- continue;
- }
+ // Right batch with OK_NEW_SCHEMA can be non-empty so update the rightJoinIndex correctly and pass the
+ // new schema downstream with empty batch and later with subsequent next() call the join output will be
+ // produced
if (handleSchemaChange()) {
container.setRecordCount(0);
- rightJoinIndex = -1;
+ rightJoinIndex = (right.getRecordCount() > 0) ? 0 : -1;
return OK_NEW_SCHEMA;
} else {
return STOP;
@@ -637,10 +614,10 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP>
container.buildSchema(BatchSchema.SelectionVectorMode.NONE);
batchMemoryManager.updateOutgoingStats(outputIndex);
- if (logger.isDebugEnabled()) {
- logger.debug("BATCH_STATS, outgoing:\n {}", new RecordBatchSizer(this));
- logger.debug("Number of records emitted: " + outputIndex);
- }
+
+ logger.debug("BATCH_STATS, outgoing:\n {}", new RecordBatchSizer(this));
+ logger.debug("Number of records emitted: {} and Allocator Stats: [AllocatedMem: {}, PeakMem: {}]", outputIndex,
+ container.getAllocator().getAllocatedMemory(), container.getAllocator().getPeakMemoryAllocation());
// Update the output index for next output batch to zero
outputIndex = 0;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
index eab9007..8a88db9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
@@ -526,7 +526,7 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
@Override
protected boolean setupNewSchema() throws SchemaChangeException {
setupNewSchemaFromInput(this.incoming);
- if (container.isSchemaChanged()) {
+ if (container.isSchemaChanged() || callBack.getSchemaChangedAndReset()) {
container.buildSchema(SelectionVectorMode.NONE);
return true;
} else {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java
index fe91fc3..ed5d91c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java
@@ -389,11 +389,15 @@ public class UnnestRecordBatch extends AbstractTableFunctionRecordBatch<UnnestPO
final MaterializedField thisField = incoming.getSchema().getColumn(fieldId.getFieldIds()[0]);
final MaterializedField prevField = unnestFieldMetadata;
Preconditions.checkNotNull(thisField);
- unnestFieldMetadata = thisField;
+
// isEquivalent may return false if the order of the fields has changed. This usually does not
// happen but if it does we end up throwing a spurious schema change exeption
if (prevField == null || !prevField.isEquivalent(thisField)) {
logger.debug("Schema changed");
+ // Store a clone of the MaterializedField for the unnest column instead of a reference. When the column is of
+ // type Map and any child field of the Map changes, that change updates the referenced object in place, so the
+ // isEquivalent check against a stored reference would still return true.
+ unnestFieldMetadata = thisField.clone();
return true;
}
return false;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/BatchSchema.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/BatchSchema.java
index 67598e0..f161234 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/BatchSchema.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/BatchSchema.java
@@ -100,8 +100,29 @@ public class BatchSchema implements Iterable<MaterializedField> {
return result;
}
- // DRILL-5525: the semantics of this method are badly broken.
- // Caveat emptor.
+ /**
+ * DRILL-5525: the semantics of this method are badly broken.
+ * Caveat emptor.
+ *
+ * When used to detect an actual schema change inside an operator's record batch, this check does not work for
+ * AbstractContainerVectors (such as MapVector). Each record batch stores a reference to the incoming batch schema
+ * (say S:{a:int}) and then calls equals on that stored reference and the current incoming batch schema.
+ * Internally, the schema object holds references to the MaterializedFields of the vectors in the container. If the
+ * incoming batch schema changes, the upstream operator creates a new ValueVector in its output container with the
+ * newly detected type, which in turn has a new MaterializedField instance. A new BatchSchema object is then
+ * created for the new incoming batch (say S":{a":varchar}). The operator calling equals holds a reference to the
+ * old schema object (S), so the initial reference check is not satisfied and equals is then called on each of the
+ * MaterializedFields (a.equals(a")). Since a new MaterializedField was created for the newly created vector, the
+ * field equality check returns false, and the schema change is detected in this case.
+ * Now suppose that instead of an int vector there is a MapVector, with an initial schema of S:{a:{b:int, c:int}},
+ * and later the schema of map field c changes. The container still finds the same Map vector, but the child
+ * vector for field c is replaced. The new schema object is created as S":{a:{b:int, c":varchar}}. Now
+ * when S.equals(S") is called, it eventually calls a.equals(a), which returns true even though the schema of
+ * child value vector c has changed. This is because no new vector is created for field a, so its
+ * MaterializedField object reference has not changed, and that same reference appears in both the old and the new
+ * schema instances. Hence {@link BatchSchema#isEquivalent(BatchSchema)} should be used instead, since
+ * {@link MaterializedField#isEquivalent(MaterializedField)} has been updated to remove the reference check.
+ */
@Override
public boolean equals(Object obj) {
@@ -151,7 +172,7 @@ public class BatchSchema implements Iterable<MaterializedField> {
/**
* Compare that two schemas are identical according to the rules defined
- * in {@ link MaterializedField#isEquivalent(MaterializedField)}. In particular,
+ * in {@link MaterializedField#isEquivalent(MaterializedField)}. In particular,
* this method requires that the fields have a 1:1 ordered correspondence
* in the two schemas.
*
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java
index e35bb5f..0ea23f7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java
@@ -135,6 +135,15 @@ public class VectorContainer implements VectorAccessible {
return addOrGet(field, null);
}
+ /**
+ * This method should be called with a MaterializedField that also carries the correct list of children fields,
+ * especially when the field type is MAP. Otherwise, if the caller does not create a TransferPair on the returned
+ * ValueVector, the new ValueVector will not have information about its list of children MaterializedFields.
+ * @param field the field describing the vector to look up or add
+ * @param callBack schema change callback; may be null (the single-argument overload passes null)
+ * @param <T> the expected ValueVector type
+ * @return the ValueVector for the given field
+ */
@SuppressWarnings("unchecked")
public <T extends ValueVector> T addOrGet(final MaterializedField field, final SchemaChangeCallBack callBack) {
final TypedFieldId id = getValueVectorId(SchemaPath.getSimplePath(field.getName()));
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestLateralJoinCorrectness.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestLateralJoinCorrectness.java
index 51df6e4..e9e9aac 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestLateralJoinCorrectness.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestLateralJoinCorrectness.java
@@ -45,6 +45,7 @@ import java.util.ArrayList;
import java.util.List;
import static junit.framework.TestCase.fail;
+import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
@Category(OperatorTest.class)
@@ -853,8 +854,8 @@ public class TestLateralJoinCorrectness extends SubOperatorTest {
}
/**
- * When multiple left batch is received with same schema but with OK_NEW_SCHEMA, then LATERAL detects that
- * correctly and suppresses schema change operation by producing output in same batch created with initial schema.
+ * When multiple left batches are received with the same schema but with OK_NEW_SCHEMA, LATERAL rebuilds the
+ * schema each time and sends output in multiple output batches.
* The schema change was only for columns which are not produced by the UNNEST or right branch.
*
* @throws Exception
@@ -904,6 +905,8 @@ public class TestLateralJoinCorrectness extends SubOperatorTest {
assertTrue(RecordBatch.IterOutcome.OK_NEW_SCHEMA == ljBatch.next());
assertTrue(RecordBatch.IterOutcome.OK == ljBatch.next());
totalRecordCount += ljBatch.getRecordCount();
+ assertTrue(RecordBatch.IterOutcome.OK_NEW_SCHEMA == ljBatch.next());
+ totalRecordCount += ljBatch.getRecordCount();
assertTrue(totalRecordCount ==
(nonEmptyLeftRowSet.rowCount() * nonEmptyRightRowSet.rowCount() +
leftRowSet2.rowCount() * nonEmptyRightRowSet2.rowCount()));
@@ -922,9 +925,8 @@ public class TestLateralJoinCorrectness extends SubOperatorTest {
}
/**
- * When multiple left batch is received with same schema but with OK_NEW_SCHEMA, then LATERAL detects that
- * correctly and suppresses false schema change indication from both left and right branch. It produces output in
- * same batch created with initial schema.
+ * When multiple left batches are received with the same schema but with OK_NEW_SCHEMA, LATERAL correctly
+ * handles it by re-creating the schema and producing multiple batches of final output.
* The schema change is for columns common on both left and right side.
*
* @throws Exception
@@ -976,6 +978,9 @@ public class TestLateralJoinCorrectness extends SubOperatorTest {
assertTrue(RecordBatch.IterOutcome.OK_NEW_SCHEMA == ljBatch.next());
assertTrue(RecordBatch.IterOutcome.OK == ljBatch.next());
totalRecordCount += ljBatch.getRecordCount();
+ assertTrue(RecordBatch.IterOutcome.OK_NEW_SCHEMA == ljBatch.next());
+ assertTrue(RecordBatch.IterOutcome.OK == ljBatch.next());
+ totalRecordCount += ljBatch.getRecordCount();
assertTrue(totalRecordCount ==
(nonEmptyLeftRowSet.rowCount() * nonEmptyRightRowSet.rowCount() +
leftRowSet2.rowCount() * nonEmptyRightRowSet2.rowCount()));
@@ -2560,4 +2565,241 @@ public class TestLateralJoinCorrectness extends SubOperatorTest {
rightMockBatch.close();
}
}
+
+ /**
+ * Verifies that if a non-empty batch with OK_NEW_SCHEMA is received from the right side after the buildSchema
+ * phase, it is handled correctly: an empty batch with OK_NEW_SCHEMA is sent first, and the right batch is consumed
+ * later to produce the actual output batch with data.
+ */
+ @Test
+ public void testPostBuildSchema_OK_NEW_SCHEMA_NonEmptyRightBatch() throws Exception {
+ // Create left input schema 2
+ TupleMetadata leftSchema2 = new SchemaBuilder()
+ .add("id_left", TypeProtos.MinorType.INT)
+ .add("cost_left", TypeProtos.MinorType.VARCHAR)
+ .add("name_left", TypeProtos.MinorType.VARCHAR)
+ .buildSchema();
+
+ // Create right input schema
+ TupleMetadata rightSchema2 = new SchemaBuilder()
+ .add("id_right", TypeProtos.MinorType.INT)
+ .add("cost_right", TypeProtos.MinorType.VARCHAR)
+ .add("name_right", TypeProtos.MinorType.VARCHAR)
+ .buildSchema();
+
+
+ // Create data for left input
+ final RowSet.SingleRowSet leftRowSet2 = fixture.rowSetBuilder(leftSchema2)
+ .addRow(2, "20", "item20")
+ .build();
+
+ // Create data for right input
+ final RowSet.SingleRowSet emptyRightRowSet2 = fixture.rowSetBuilder(rightSchema2)
+ .build();
+
+ final RowSet.SingleRowSet nonEmptyRightRowSet2 = fixture.rowSetBuilder(rightSchema2)
+ .addRow(4, "41", "item41")
+ .addRow(5, "51", "item51")
+ .build();
+
+ // Get the left container with dummy data for Lateral Join
+ leftContainer.add(nonEmptyLeftRowSet.container());
+ leftContainer.add(leftRowSet2.container());
+
+ // Get the left IterOutcomes for Lateral Join
+ leftOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+ leftOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+
+ // Create Left MockRecordBatch
+ final CloseableRecordBatch leftMockBatch = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
+ leftContainer, leftOutcomes, leftContainer.get(0).getSchema());
+
+ // Get the right container with dummy data
+ // first OK_NEW_SCHEMA batch
+ rightContainer.add(emptyRightRowSet.container());
+ rightContainer.add(nonEmptyRightRowSet.container());
+ rightContainer.add(nonEmptyRightRowSet2.container()); // non-empty OK_NEW_SCHEMA batch
+ rightContainer.add(emptyRightRowSet2.container());
+
+ rightOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+ rightOutcomes.add(RecordBatch.IterOutcome.EMIT);
+ rightOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+ rightOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+ final CloseableRecordBatch rightMockBatch = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
+ rightContainer, rightOutcomes, rightContainer.get(0).getSchema());
+
+ final LateralJoinBatch ljBatch = new LateralJoinBatch(ljPopConfig, fixture.getFragmentContext(),
+ leftMockBatch, rightMockBatch);
+
+ try {
+ int totalRecordCount = 0;
+ assertTrue(RecordBatch.IterOutcome.OK_NEW_SCHEMA == ljBatch.next());
+ assertTrue(RecordBatch.IterOutcome.OK == ljBatch.next());
+ totalRecordCount += ljBatch.getRecordCount();
+ // This means 2 output record batches were received because of Schema change
+ assertTrue(RecordBatch.IterOutcome.OK_NEW_SCHEMA == ljBatch.next());
+ assertEquals(0, ljBatch.getRecordCount());
+ totalRecordCount += ljBatch.getRecordCount();
+ assertTrue(RecordBatch.IterOutcome.OK == ljBatch.next());
+ totalRecordCount += ljBatch.getRecordCount();
+ assertTrue(totalRecordCount ==
+ (nonEmptyLeftRowSet.rowCount() * nonEmptyRightRowSet.rowCount() +
+ leftRowSet2.rowCount() * nonEmptyRightRowSet2.rowCount()));
+
+ assertTrue(RecordBatch.IterOutcome.NONE == ljBatch.next());
+ } catch (AssertionError | Exception error) {
+ fail();
+ } finally {
+ // Close all the resources for this test case
+ ljBatch.close();
+ leftMockBatch.close();
+ rightMockBatch.close();
+ leftRowSet2.clear();
+ emptyRightRowSet2.clear();
+ nonEmptyRightRowSet2.clear();
+ }
+ }
+
+ /**
+ * Test to verify that, with a multilevel LATERAL, the pipeline works correctly when a non-empty OK_NEW_SCHEMA batch
+ * is received after the buildSchema phase from the rightmost UNNEST of the lower LATERAL.
+ * @throws Exception
+ */
+ @Test
+ public void testMultiLevelLateral_SchemaChange_LeftRightUnnest_NonEmptyBatch() throws Exception {
+ // ** Prepare first pair of left batch and right batch for lower level LATERAL Lateral_1 **
+
+ // Create left input schema for first batch
+ TupleMetadata leftSchema2 = new SchemaBuilder()
+ .add("id_left_new", TypeProtos.MinorType.INT)
+ .add("cost_left_new", TypeProtos.MinorType.INT)
+ .add("name_left_new", TypeProtos.MinorType.VARCHAR)
+ .buildSchema();
+
+ final RowSet.SingleRowSet emptyLeftRowSet_leftSchema2 = fixture.rowSetBuilder(leftSchema2).build();
+ final RowSet.SingleRowSet nonEmptyLeftRowSet_leftSchema2 = fixture.rowSetBuilder(leftSchema2)
+ .addRow(6, 60, "item6")
+ .build();
+
+ leftContainer.add(emptyLeftRowSet.container());
+ leftContainer.add(nonEmptyLeftRowSet.container());
+ leftContainer.add(emptyLeftRowSet_leftSchema2.container());
+ leftContainer.add(nonEmptyLeftRowSet_leftSchema2.container());
+
+ // Get the left IterOutcomes for Lateral Join
+ leftOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+ leftOutcomes.add(RecordBatch.IterOutcome.EMIT);
+ leftOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+ leftOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+ final CloseableRecordBatch leftMockBatch_1 = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
+ leftContainer, leftOutcomes, leftContainer.get(0).getSchema());
+
+ // Get the right container with dummy data
+ TupleMetadata rightSchema2 = new SchemaBuilder()
+ .add("id_right_new", TypeProtos.MinorType.INT)
+ .add("cost_right_new", TypeProtos.MinorType.VARCHAR)
+ .add("name_right_new", TypeProtos.MinorType.VARCHAR)
+ .buildSchema();
+
+ final RowSet.SingleRowSet emptyRightRowSet_rightSchema2 = fixture.rowSetBuilder(rightSchema2).build();
+ final RowSet.SingleRowSet nonEmptyRightRowSet_rightSchema2 = fixture.rowSetBuilder(rightSchema2)
+ .addRow(5, "51", "item51")
+ .addRow(6, "61", "item61")
+ .addRow(7, "71", "item71")
+ .build();
+
+ rightContainer.add(emptyRightRowSet.container());
+ rightContainer.add(nonEmptyRightRowSet.container());
+ rightContainer.add(nonEmptyRightRowSet_rightSchema2.container()); // non-empty batch with Ok_new_schema
+ rightContainer.add(emptyRightRowSet_rightSchema2.container());
+
+ rightOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+ rightOutcomes.add(RecordBatch.IterOutcome.EMIT);
+ rightOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+ rightOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+ final CloseableRecordBatch rightMockBatch_1 = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
+ rightContainer, rightOutcomes, rightContainer.get(0).getSchema());
+
+ final LateralJoinPOP popConfig_1 = new LateralJoinPOP(null, null, JoinRelType.FULL);
+
+ final LateralJoinBatch lowerLevelLateral = new LateralJoinBatch(popConfig_1, fixture.getFragmentContext(),
+ leftMockBatch_1, rightMockBatch_1);
+
+ // ** Prepare second pair of left and right batch for upper level Lateral_2 **
+
+ // Create left input schema for first batch
+ TupleMetadata leftSchema3 = new SchemaBuilder()
+ .add("id_left_left", TypeProtos.MinorType.INT)
+ .add("cost_left_left", TypeProtos.MinorType.INT)
+ .add("name_left_left", TypeProtos.MinorType.VARCHAR)
+ .buildSchema();
+
+ final RowSet.SingleRowSet emptyLeftRowSet_leftSchema3 = fixture.rowSetBuilder(leftSchema3).build();
+ final RowSet.SingleRowSet nonEmptyLeftRowSet_leftSchema3 = fixture.rowSetBuilder(leftSchema3)
+ .addRow(6, 60, "item6")
+ .build();
+
+ // Get left input schema for second left batch
+ TupleMetadata leftSchema4 = new SchemaBuilder()
+ .add("id_left_left_new", TypeProtos.MinorType.INT)
+ .add("cost_left_left_new", TypeProtos.MinorType.VARCHAR)
+ .add("name_left_left_new", TypeProtos.MinorType.VARCHAR)
+ .buildSchema();
+
+ final RowSet.SingleRowSet nonEmptyLeftRowSet_leftSchema4 = fixture.rowSetBuilder(leftSchema4)
+ .addRow(100, "100", "item100")
+ .build();
+
+ // Build Left container for upper level LATERAL operator
+ final List<VectorContainer> leftContainer2 = new ArrayList<>(5);
+
+ // Get the left container with dummy data
+ leftContainer2.add(emptyLeftRowSet_leftSchema3.container());
+ leftContainer2.add(nonEmptyLeftRowSet_leftSchema3.container());
+ leftContainer2.add(nonEmptyLeftRowSet_leftSchema4.container());
+
+ // Get the left container outcomes for upper level LATERAL operator
+ final List<RecordBatch.IterOutcome> leftOutcomes2 = new ArrayList<>(5);
+ leftOutcomes2.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+ leftOutcomes2.add(RecordBatch.IterOutcome.OK);
+ leftOutcomes2.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+
+ final CloseableRecordBatch leftMockBatch_2 = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
+ leftContainer2, leftOutcomes2, leftContainer2.get(0).getSchema());
+
+ final LateralJoinBatch upperLevelLateral = new LateralJoinBatch(popConfig_1, fixture.getFragmentContext(),
+ leftMockBatch_2, lowerLevelLateral);
+
+ try {
+ // 3 for first batch on left side and another 3 for next left batch
+ final int expectedOutputRecordCount = 6;
+ int actualOutputRecordCount = 0;
+
+ assertTrue(RecordBatch.IterOutcome.OK_NEW_SCHEMA == upperLevelLateral.next());
+ assertTrue(RecordBatch.IterOutcome.OK == upperLevelLateral.next());
+ actualOutputRecordCount += upperLevelLateral.getRecordCount();
+ assertTrue(RecordBatch.IterOutcome.OK_NEW_SCHEMA == upperLevelLateral.next());
+ actualOutputRecordCount += upperLevelLateral.getRecordCount();
+ assertTrue(RecordBatch.IterOutcome.OK_NEW_SCHEMA == upperLevelLateral.next());
+ actualOutputRecordCount += upperLevelLateral.getRecordCount();
+ assertTrue(RecordBatch.IterOutcome.OK == upperLevelLateral.next());
+ actualOutputRecordCount += upperLevelLateral.getRecordCount();
+ assertTrue(RecordBatch.IterOutcome.NONE == upperLevelLateral.next());
+ assertTrue(actualOutputRecordCount == expectedOutputRecordCount);
+ } catch (AssertionError | Exception error) {
+ fail();
+ } finally {
+ // Close all the resources for this test case
+ upperLevelLateral.close();
+ leftMockBatch_2.close();
+ lowerLevelLateral.close();
+ leftMockBatch_1.close();
+ rightMockBatch_1.close();
+ leftContainer2.clear();
+ leftOutcomes2.clear();
+ }
+ }
}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/limit/TestLimitBatchEmitOutcome.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/limit/TestLimitBatchEmitOutcome.java
index 4757488..38c0f51 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/limit/TestLimitBatchEmitOutcome.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/limit/TestLimitBatchEmitOutcome.java
@@ -17,16 +17,19 @@
*/
package org.apache.drill.exec.physical.impl.limit;
+import org.apache.drill.categories.OperatorTest;
import org.apache.drill.exec.physical.config.Limit;
import org.apache.drill.exec.physical.impl.MockRecordBatch;
import org.apache.drill.exec.physical.impl.BaseTestOpBatchEmitOutcome;
import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.test.rowSet.RowSet;
import org.junit.Test;
+import org.junit.experimental.categories.Category;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
+@Category(OperatorTest.class)
public class TestLimitBatchEmitOutcome extends BaseTestOpBatchEmitOutcome {
/**
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/project/TestProjectEmitOutcome.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/project/TestProjectEmitOutcome.java
index b3099d0..cc737e9 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/project/TestProjectEmitOutcome.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/project/TestProjectEmitOutcome.java
@@ -17,17 +17,20 @@
*/
package org.apache.drill.exec.physical.impl.project;
+import org.apache.drill.categories.OperatorTest;
import org.apache.drill.exec.physical.config.Project;
import org.apache.drill.exec.physical.impl.MockRecordBatch;
import org.apache.drill.exec.physical.impl.BaseTestOpBatchEmitOutcome;
import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.test.rowSet.RowSet;
import org.junit.Test;
+import org.junit.experimental.categories.Category;
import static junit.framework.TestCase.fail;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
+@Category(OperatorTest.class)
public class TestProjectEmitOutcome extends BaseTestOpBatchEmitOutcome {
/**
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/unnest/TestUnnestWithLateralCorrectness.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/unnest/TestUnnestWithLateralCorrectness.java
index ec043b2..70a32f8 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/unnest/TestUnnestWithLateralCorrectness.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/unnest/TestUnnestWithLateralCorrectness.java
@@ -223,8 +223,12 @@ public class TestUnnestWithLateralCorrectness extends SubOperatorTest {
Object[][][] baseline = {
{
- {1, 1, 2, 2, 2, 3, 3, 4},
- {"0", "1", "2", "3", "4", "5", "6", "9"}
+ {1, 1, 2, 2, 2, 3, 3},
+ {"0", "1", "2", "3", "4", "5", "6"}
+ },
+ {
+ {4},
+ {"9"}
}
};
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/record/MaterializedField.java b/exec/vector/src/main/java/org/apache/drill/exec/record/MaterializedField.java
index fa4d276..672bb7e 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/record/MaterializedField.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/record/MaterializedField.java
@@ -111,7 +111,7 @@ public class MaterializedField {
* <p>
* By allowing the non-critical metadata to change, we preserve the
* child relationships as a list or union evolves.
- * @param type
+ * @param newType
*/
public void replaceType(MajorType newType) {
@@ -190,11 +190,20 @@ public class MaterializedField {
return Objects.hash(this.name, this.type, this.children);
}
+ /**
+ * The equals method does not check the children list of fields. When a batch is sent over the network, it is
+ * serialized along with its MaterializedField, which also contains information about internal vectors such as
+ * offsets and bits. While deserializing, these internal vectors are treated as children of the parent vector. If
+ * children were compared here, an operator on the receiving side (such as Sort) that receives a schema in the
+ * buildSchema phase and later receives another batch would detect a schema change, and the query would fail. This
+ * is because the second batch's schema would contain information about internal vectors such as offsets and bits
+ * that is not present in the first batch's schema. For reference, see
+ * TestSort#testSortWithRepeatedMapWithExchanges.
+ *
+ * @param obj the object to compare with
+ * @return true if obj is a MaterializedField with the same name (case-insensitive) and type
+ */
@Override
public boolean equals(Object obj) {
- if (this == obj) {
- return true;
- }
if (obj == null) {
return false;
}
@@ -206,7 +215,7 @@ public class MaterializedField {
// in MapVector$MapTransferPair
return this.name.equalsIgnoreCase(other.name) &&
- Objects.equals(this.type, other.type);
+ Objects.equals(this.type, other.type);
}
/**
@@ -230,6 +239,27 @@ public class MaterializedField {
* sense.) Operators that want to reconcile two maps that differ only in
* column order need a different comparison.</li>
* </ul>
+ * <p>
+ * Note: MaterializedField and ValueVector have a 1:1 mapping, which means that for each ValueVector there is a
+ * MaterializedField associated with it. So when we replace or add a ValueVector in a VectorContainer, we create a
+ * new MaterializedField object for the new vector. This works fine for primitive-type ValueVectors. For ValueVectors
+ * of type {@link org.apache.drill.exec.vector.complex.AbstractContainerVector}, however, MaterializedField and
+ * ValueVector objects are updated differently inside the container, because both the ValueVector and the
+ * MaterializedField objects are mutable.
+ * </p>
+ * <p>
+ * For example, with a MapVector it can happen that only a child field's type changes while the parent Map's type
+ * and name remain the same. In such cases we replace the child field's ValueVector in the parent MapVector inside
+ * the main batch container with a vector of the new type. Thus the reference to the parent MapVector inside the
+ * batch container remains the same, but the reference to the child field's ValueVector stored inside the MapVector
+ * gets updated. During this update, the MaterializedField for that child field, which is stored in the children
+ * list of the parent MapVector's MaterializedField, is replaced as well.
+ * Since the children list of the parent MaterializedField is updated in place, this class is mutable. Hence there
+ * should not be any object-reference equality check here; instead there should be a deep comparison, which is what
+ * this method now performs. With an object-reference check, the cases above would return true for two
+ * MaterializedField objects whose children field lists differ, which is incorrect. The same reasoning applies to
+ * {@link MaterializedField#equals(Object)}, which likewise no longer performs an object-reference check.
+ * </p>
*
* @param other another field
* @return <tt>true</tt> if the columns are identical according to the
@@ -237,9 +267,6 @@ public class MaterializedField {
*/
public boolean isEquivalent(MaterializedField other) {
- if (this == other) {
- return true;
- }
if (! name.equalsIgnoreCase(other.name)) {
return false;
}
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/AbstractMapVector.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/AbstractMapVector.java
index 3682397..1d0e03b 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/AbstractMapVector.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/AbstractMapVector.java
@@ -129,6 +129,9 @@ public abstract class AbstractMapVector extends AbstractContainerVector {
return (T) existing;
} else if (nullFilled(existing)) {
existing.clear();
+ // Since the old vector is being removed and a new one added based on the new type, do the same for the
+ // MaterializedField; otherwise there will be duplicate children with the same name but different types.
+ field.removeChild(existing.getField());
create = true;
}
if (create) {
--
To stop receiving notification emails like this one, please contact
arina@apache.org.