You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by ar...@apache.org on 2018/05/19 20:37:47 UTC

[drill] 02/04: DRILL-6418: Handle Schema change in Unnest And Lateral for unnest field / non-unnest field

This is an automated email from the ASF dual-hosted git repository.

arina pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git

commit b7d259ba9c8c2b28700c9da33bb97dd79ef04cbc
Author: Sorabh Hamirwasia <sh...@maprtech.com>
AuthorDate: Tue May 15 14:27:31 2018 -0700

    DRILL-6418: Handle Schema change in Unnest And Lateral for unnest field / non-unnest field
    
    Note: Changed Lateral to handle non-empty right batch with OK_NEW_SCHEMA
    
    closes #1271
---
 .../exec/physical/impl/join/LateralJoinBatch.java  |  45 +---
 .../physical/impl/project/ProjectRecordBatch.java  |   2 +-
 .../physical/impl/unnest/UnnestRecordBatch.java    |   6 +-
 .../org/apache/drill/exec/record/BatchSchema.java  |  27 ++-
 .../apache/drill/exec/record/VectorContainer.java  |   9 +
 .../impl/join/TestLateralJoinCorrectness.java      | 252 ++++++++++++++++++++-
 .../impl/limit/TestLimitBatchEmitOutcome.java      |   3 +
 .../impl/project/TestProjectEmitOutcome.java       |   3 +
 .../unnest/TestUnnestWithLateralCorrectness.java   |   8 +-
 .../drill/exec/record/MaterializedField.java       |  43 +++-
 .../exec/vector/complex/AbstractMapVector.java     |   3 +
 11 files changed, 347 insertions(+), 54 deletions(-)

diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java
index 8ea381b..a09913f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java
@@ -125,8 +125,8 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP>
     // Left side has some records in the batch so let's process right batch
     childOutcome = processRightBatch();
 
-    // reset the left & right outcomes to OK here and send the empty batch downstream
-    // Assumption being right side will always send OK_NEW_SCHEMA with empty batch which is what UNNEST will do
+    // reset the left & right outcomes to OK here and send the empty batch downstream. Non-Empty right batch with
+    // OK_NEW_SCHEMA will be handled in subsequent next call
     if (childOutcome == OK_NEW_SCHEMA) {
       leftUpstream = (leftUpstream != EMIT) ? OK : leftUpstream;
       rightUpstream = OK;
@@ -344,22 +344,7 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP>
       switch (leftUpstream) {
         case OK_NEW_SCHEMA:
           // This OK_NEW_SCHEMA is received post build schema phase and from left side
-          // If schema didn't actually changed then just handle it as OK outcome. This is fine since it is not setting
-          // up any incoming vector references in setupNewSchema. While copying the records it always work on latest
-          // incoming vector.
-          if (!isSchemaChanged(left.getSchema(), leftSchema)) {
-            logger.warn(String.format("New schema received from left side is same as previous known left schema. " +
-              "Ignoring this schema change. Old Left Schema: %s, New Left Schema: %s", leftSchema, left.getSchema()));
-
-            // Current left batch is empty and schema didn't changed as well, so let's get next batch and loose
-            // OK_NEW_SCHEMA outcome
-            processLeftBatchInFuture = false;
-            if (emptyLeftBatch) {
-              continue;
-            } else {
-              leftUpstream = OK;
-            }
-          } else if (outputIndex > 0) { // can only reach here from produceOutputBatch
+          if (outputIndex > 0) { // can only reach here from produceOutputBatch
             // This means there is already some records from previous join inside left batch
             // So we need to pass that downstream and then handle the OK_NEW_SCHEMA in subsequent next call
             processLeftBatchInFuture = true;
@@ -439,20 +424,12 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP>
           // We should not get OK_NEW_SCHEMA multiple times for the same left incoming batch. So there won't be a
           // case where we get OK_NEW_SCHEMA --> OK (with batch) ---> OK_NEW_SCHEMA --> OK/EMIT fall through
           //
-          // Right batch with OK_NEW_SCHEMA is always going to be an empty batch, so let's pass the new schema
-          // downstream and later with subsequent next() call the join output will be produced
-          Preconditions.checkState(right.getRecordCount() == 0,
-            "Right side batch with OK_NEW_SCHEMA is not empty");
-
-          if (!isSchemaChanged(right.getSchema(), rightSchema)) {
-            logger.warn(String.format("New schema received from right side is same as previous known right schema. " +
-              "Ignoring this schema change. Old Right schema: %s, New Right Schema: %s",
-              rightSchema, right.getSchema()));
-            continue;
-          }
+          // Right batch with OK_NEW_SCHEMA can be non-empty so update the rightJoinIndex correctly and pass the
+          // new schema downstream with empty batch and later with subsequent next() call the join output will be
+          // produced
           if (handleSchemaChange()) {
             container.setRecordCount(0);
-            rightJoinIndex = -1;
+            rightJoinIndex = (right.getRecordCount() > 0) ? 0 : -1;
             return OK_NEW_SCHEMA;
           } else {
             return STOP;
@@ -637,10 +614,10 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP>
     container.buildSchema(BatchSchema.SelectionVectorMode.NONE);
 
     batchMemoryManager.updateOutgoingStats(outputIndex);
-    if (logger.isDebugEnabled()) {
-      logger.debug("BATCH_STATS, outgoing:\n {}", new RecordBatchSizer(this));
-      logger.debug("Number of records emitted: " + outputIndex);
-    }
+
+    logger.debug("BATCH_STATS, outgoing:\n {}", new RecordBatchSizer(this));
+    logger.debug("Number of records emitted: {} and Allocator Stats: [AllocatedMem: {}, PeakMem: {}]", outputIndex,
+      container.getAllocator().getAllocatedMemory(), container.getAllocator().getPeakMemoryAllocation());
 
     // Update the output index for next output batch to zero
     outputIndex = 0;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
index eab9007..8a88db9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
@@ -526,7 +526,7 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
   @Override
   protected boolean setupNewSchema() throws SchemaChangeException {
     setupNewSchemaFromInput(this.incoming);
-    if (container.isSchemaChanged()) {
+    if (container.isSchemaChanged() || callBack.getSchemaChangedAndReset()) {
       container.buildSchema(SelectionVectorMode.NONE);
       return true;
     } else {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java
index fe91fc3..ed5d91c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java
@@ -389,11 +389,15 @@ public class UnnestRecordBatch extends AbstractTableFunctionRecordBatch<UnnestPO
     final MaterializedField thisField = incoming.getSchema().getColumn(fieldId.getFieldIds()[0]);
     final MaterializedField prevField = unnestFieldMetadata;
     Preconditions.checkNotNull(thisField);
-    unnestFieldMetadata = thisField;
+
     // isEquivalent may return false if the order of the fields has changed. This usually does not
     // happen but if it does we end up throwing a spurious schema change exeption
     if (prevField == null || !prevField.isEquivalent(thisField)) {
       logger.debug("Schema changed");
+      // We should store the clone of MaterializedField for unnest column instead of reference. When the column is of
+      // type Map and there is change in any children field of the Map then that will update the reference variable and
+      // isEquivalent check will still return true.
+      unnestFieldMetadata = thisField.clone();
       return true;
     }
     return false;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/BatchSchema.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/BatchSchema.java
index 67598e0..f161234 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/BatchSchema.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/BatchSchema.java
@@ -100,8 +100,29 @@ public class BatchSchema implements Iterable<MaterializedField> {
     return result;
   }
 
-  // DRILL-5525: the semantics of this method are badly broken.
-  // Caveat emptor.
+  /**
+   * DRILL-5525: the semantics of this method are badly broken.
+   * Caveat emptor.
+   *
+   * This check used for detecting actual schema change inside operator record batch will not work for
+   * AbstractContainerVectors (like MapVector). In each record batch a reference to incoming batch schema is
+   * stored (let say S:{a: int}) and then equals is called on that stored reference and current incoming batch schema.
+   * Internally schema object has references to Materialized fields from vectors in container. If there is change in
+   * incoming batch schema, then the upstream will create a new ValueVector in its output container with the new
+   * detected type, which in turn will have new instance for Materialized Field. Then later a new BatchSchema object
+   * is created for this new incoming batch (let say S":{a":varchar}). The operator calling equals will have reference
+   * to old schema object (S) and hence first check will not be satisfied and then it will call equals on each of the
+   * Materialized Field (a.equals(a")). Since new materialized field is created for newly created vector the equals
+   * check on field will return false. And schema change will be detected in this case.
+   * Now consider instead of int vector there is a MapVector such that initial schema was (let say S:{a:{b:int, c:int}}
+   * and then later schema for Map field c changes, then in container Map vector will be found but later the children
+   * vector for field c will be replaced. This new schema object will be created as (S":{a:{b:int, c":varchar}}). Now
+   * when S.equals(S") is called it will eventually call a.equals(a) which will return true even though the schema of
+   * children value vector c has changed. This is because no new vector is created for field (a) and hence it's object
+   * reference to MaterializedField has not changed which will be reflected in both old and new schema instances.
+   * Hence we should make use of {@link BatchSchema#isEquivalent(BatchSchema)} method instead since
+   * {@link MaterializedField#isEquivalent(MaterializedField)} method is updated to remove the reference check.
+   */
 
   @Override
   public boolean equals(Object obj) {
@@ -151,7 +172,7 @@ public class BatchSchema implements Iterable<MaterializedField> {
 
   /**
    * Compare that two schemas are identical according to the rules defined
-   * in {@ link MaterializedField#isEquivalent(MaterializedField)}. In particular,
+   * in {@link MaterializedField#isEquivalent(MaterializedField)}. In particular,
    * this method requires that the fields have a 1:1 ordered correspondence
    * in the two schemas.
    *
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java
index e35bb5f..0ea23f7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java
@@ -135,6 +135,15 @@ public class VectorContainer implements VectorAccessible {
     return addOrGet(field, null);
   }
 
+  /**
+   * This method should be called with MaterializedField which also has correct children field list specially when
+   * the field type is MAP. Otherwise after calling this method if caller is not creating TransferPair on the
+   * ValueVector, then the new ValueVector will not have information about it's list of children MaterializedField.
+   * @param field
+   * @param callBack
+   * @param <T>
+   * @return
+   */
   @SuppressWarnings("unchecked")
   public <T extends ValueVector> T addOrGet(final MaterializedField field, final SchemaChangeCallBack callBack) {
     final TypedFieldId id = getValueVectorId(SchemaPath.getSimplePath(field.getName()));
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestLateralJoinCorrectness.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestLateralJoinCorrectness.java
index 51df6e4..e9e9aac 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestLateralJoinCorrectness.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestLateralJoinCorrectness.java
@@ -45,6 +45,7 @@ import java.util.ArrayList;
 import java.util.List;
 
 import static junit.framework.TestCase.fail;
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
 @Category(OperatorTest.class)
@@ -853,8 +854,8 @@ public class TestLateralJoinCorrectness extends SubOperatorTest {
   }
 
   /**
-   * When multiple left batch is received with same schema but with OK_NEW_SCHEMA, then LATERAL detects that
-   * correctly and suppresses schema change operation by producing output in same batch created with initial schema.
+   * When multiple left batch is received with same schema but with OK_NEW_SCHEMA, then LATERAL rebuilds the
+   * schema each time and sends output in multiple output batches
    * The schema change was only for columns which are not produced by the UNNEST or right branch.
    *
    * @throws Exception
@@ -904,6 +905,8 @@ public class TestLateralJoinCorrectness extends SubOperatorTest {
       assertTrue(RecordBatch.IterOutcome.OK_NEW_SCHEMA == ljBatch.next());
       assertTrue(RecordBatch.IterOutcome.OK == ljBatch.next());
       totalRecordCount += ljBatch.getRecordCount();
+      assertTrue(RecordBatch.IterOutcome.OK_NEW_SCHEMA == ljBatch.next());
+      totalRecordCount += ljBatch.getRecordCount();
       assertTrue(totalRecordCount ==
         (nonEmptyLeftRowSet.rowCount() * nonEmptyRightRowSet.rowCount() +
           leftRowSet2.rowCount() * nonEmptyRightRowSet2.rowCount()));
@@ -922,9 +925,8 @@ public class TestLateralJoinCorrectness extends SubOperatorTest {
   }
 
   /**
-   * When multiple left batch is received with same schema but with OK_NEW_SCHEMA, then LATERAL detects that
-   * correctly and suppresses false schema change indication from both left and right branch. It produces output in
-   * same batch created with initial schema.
+   * When multiple left batch is received with same schema but with OK_NEW_SCHEMA, then LATERAL correctly
+   * handles it by re-creating the schema and producing multiple batches of final output
    * The schema change is for columns common on both left and right side.
    *
    * @throws Exception
@@ -976,6 +978,9 @@ public class TestLateralJoinCorrectness extends SubOperatorTest {
       assertTrue(RecordBatch.IterOutcome.OK_NEW_SCHEMA == ljBatch.next());
       assertTrue(RecordBatch.IterOutcome.OK == ljBatch.next());
       totalRecordCount += ljBatch.getRecordCount();
+      assertTrue(RecordBatch.IterOutcome.OK_NEW_SCHEMA == ljBatch.next());
+      assertTrue(RecordBatch.IterOutcome.OK == ljBatch.next());
+      totalRecordCount += ljBatch.getRecordCount();
       assertTrue(totalRecordCount ==
         (nonEmptyLeftRowSet.rowCount() * nonEmptyRightRowSet.rowCount() +
           leftRowSet2.rowCount() * nonEmptyRightRowSet2.rowCount()));
@@ -2560,4 +2565,241 @@ public class TestLateralJoinCorrectness extends SubOperatorTest {
       rightMockBatch.close();
     }
   }
+
+  /**
+   * Verifies that if a non-empty batch with OK_NEW_SCHEMA is received from right side post buildSchema phase then it
+   * is handled correctly by sending an empty batch with OK_NEW_SCHEMA and later consuming it to produce actual
+   * output batch with some data
+   */
+  @Test
+  public void testPostBuildSchema_OK_NEW_SCHEMA_NonEmptyRightBatch() throws Exception {
+    // Create left input schema 2
+    TupleMetadata leftSchema2 = new SchemaBuilder()
+      .add("id_left", TypeProtos.MinorType.INT)
+      .add("cost_left", TypeProtos.MinorType.VARCHAR)
+      .add("name_left", TypeProtos.MinorType.VARCHAR)
+      .buildSchema();
+
+    // Create right input schema
+    TupleMetadata rightSchema2 = new SchemaBuilder()
+      .add("id_right", TypeProtos.MinorType.INT)
+      .add("cost_right", TypeProtos.MinorType.VARCHAR)
+      .add("name_right", TypeProtos.MinorType.VARCHAR)
+      .buildSchema();
+
+
+    // Create data for left input
+    final RowSet.SingleRowSet leftRowSet2 = fixture.rowSetBuilder(leftSchema2)
+      .addRow(2, "20", "item20")
+      .build();
+
+    // Create data for right input
+    final RowSet.SingleRowSet emptyRightRowSet2 = fixture.rowSetBuilder(rightSchema2)
+      .build();
+
+    final RowSet.SingleRowSet nonEmptyRightRowSet2 = fixture.rowSetBuilder(rightSchema2)
+      .addRow(4, "41", "item41")
+      .addRow(5, "51", "item51")
+      .build();
+
+    // Get the left container with dummy data for Lateral Join
+    leftContainer.add(nonEmptyLeftRowSet.container());
+    leftContainer.add(leftRowSet2.container());
+
+    // Get the left IterOutcomes for Lateral Join
+    leftOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    leftOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+
+    // Create Left MockRecordBatch
+    final CloseableRecordBatch leftMockBatch = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
+      leftContainer, leftOutcomes, leftContainer.get(0).getSchema());
+
+    // Get the right container with dummy data
+    // first OK_NEW_SCHEMA batch
+    rightContainer.add(emptyRightRowSet.container());
+    rightContainer.add(nonEmptyRightRowSet.container());
+    rightContainer.add(nonEmptyRightRowSet2.container()); // non-empty OK_NEW_SCHEMA batch
+    rightContainer.add(emptyRightRowSet2.container());
+
+    rightOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    rightOutcomes.add(RecordBatch.IterOutcome.EMIT);
+    rightOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    rightOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+    final CloseableRecordBatch rightMockBatch = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
+      rightContainer, rightOutcomes, rightContainer.get(0).getSchema());
+
+    final LateralJoinBatch ljBatch = new LateralJoinBatch(ljPopConfig, fixture.getFragmentContext(),
+      leftMockBatch, rightMockBatch);
+
+    try {
+      int totalRecordCount = 0;
+      assertTrue(RecordBatch.IterOutcome.OK_NEW_SCHEMA == ljBatch.next());
+      assertTrue(RecordBatch.IterOutcome.OK == ljBatch.next());
+      totalRecordCount += ljBatch.getRecordCount();
+      // This means 2 output record batches were received because of Schema change
+      assertTrue(RecordBatch.IterOutcome.OK_NEW_SCHEMA == ljBatch.next());
+      assertEquals(0, ljBatch.getRecordCount());
+      totalRecordCount += ljBatch.getRecordCount();
+      assertTrue(RecordBatch.IterOutcome.OK == ljBatch.next());
+      totalRecordCount += ljBatch.getRecordCount();
+      assertTrue(totalRecordCount ==
+        (nonEmptyLeftRowSet.rowCount() * nonEmptyRightRowSet.rowCount() +
+          leftRowSet2.rowCount() * nonEmptyRightRowSet2.rowCount()));
+
+      assertTrue(RecordBatch.IterOutcome.NONE == ljBatch.next());
+    } catch (AssertionError | Exception error) {
+      fail();
+    } finally {
+      // Close all the resources for this test case
+      ljBatch.close();
+      leftMockBatch.close();
+      rightMockBatch.close();
+      leftRowSet2.clear();
+      emptyRightRowSet2.clear();
+      nonEmptyRightRowSet2.clear();
+    }
+  }
+
+  /**
+   * Test to verify in case of Multilevel lateral when a non-empty OK_NEW_SCHEMA batch post build schema phase is
+   * received from right most UNNEST of lower LATERAL then pipeline works fine.
+   * @throws Exception
+   */
+  @Test
+  public void testMultiLevelLateral_SchemaChange_LeftRightUnnest_NonEmptyBatch() throws Exception {
+    // ** Prepare first pair of left batch and right batch for lower level LATERAL Lateral_1 **
+
+    // Create left input schema for first batch
+    TupleMetadata leftSchema2 = new SchemaBuilder()
+      .add("id_left_new", TypeProtos.MinorType.INT)
+      .add("cost_left_new", TypeProtos.MinorType.INT)
+      .add("name_left_new", TypeProtos.MinorType.VARCHAR)
+      .buildSchema();
+
+    final RowSet.SingleRowSet emptyLeftRowSet_leftSchema2 = fixture.rowSetBuilder(leftSchema2).build();
+    final RowSet.SingleRowSet nonEmptyLeftRowSet_leftSchema2 = fixture.rowSetBuilder(leftSchema2)
+      .addRow(6, 60, "item6")
+      .build();
+
+    leftContainer.add(emptyLeftRowSet.container());
+    leftContainer.add(nonEmptyLeftRowSet.container());
+    leftContainer.add(emptyLeftRowSet_leftSchema2.container());
+    leftContainer.add(nonEmptyLeftRowSet_leftSchema2.container());
+
+    // Get the left IterOutcomes for Lateral Join
+    leftOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    leftOutcomes.add(RecordBatch.IterOutcome.EMIT);
+    leftOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    leftOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+    final CloseableRecordBatch leftMockBatch_1 = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
+      leftContainer, leftOutcomes, leftContainer.get(0).getSchema());
+
+    // Get the right container with dummy data
+    TupleMetadata rightSchema2 = new SchemaBuilder()
+      .add("id_right_new", TypeProtos.MinorType.INT)
+      .add("cost_right_new", TypeProtos.MinorType.VARCHAR)
+      .add("name_right_new", TypeProtos.MinorType.VARCHAR)
+      .buildSchema();
+
+    final RowSet.SingleRowSet emptyRightRowSet_rightSchema2 = fixture.rowSetBuilder(rightSchema2).build();
+    final RowSet.SingleRowSet nonEmptyRightRowSet_rightSchema2 = fixture.rowSetBuilder(rightSchema2)
+      .addRow(5, "51", "item51")
+      .addRow(6, "61", "item61")
+      .addRow(7, "71", "item71")
+      .build();
+
+    rightContainer.add(emptyRightRowSet.container());
+    rightContainer.add(nonEmptyRightRowSet.container());
+    rightContainer.add(nonEmptyRightRowSet_rightSchema2.container()); // non-empty batch with Ok_new_schema
+    rightContainer.add(emptyRightRowSet_rightSchema2.container());
+
+    rightOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    rightOutcomes.add(RecordBatch.IterOutcome.EMIT);
+    rightOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    rightOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+    final CloseableRecordBatch rightMockBatch_1 = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
+      rightContainer, rightOutcomes, rightContainer.get(0).getSchema());
+
+    final LateralJoinPOP popConfig_1 = new LateralJoinPOP(null, null, JoinRelType.FULL);
+
+    final LateralJoinBatch lowerLevelLateral = new LateralJoinBatch(popConfig_1, fixture.getFragmentContext(),
+      leftMockBatch_1, rightMockBatch_1);
+
+    // ** Prepare second pair of left and right batch for upper level Lateral_2 **
+
+    // Create left input schema for first batch
+    TupleMetadata leftSchema3 = new SchemaBuilder()
+      .add("id_left_left", TypeProtos.MinorType.INT)
+      .add("cost_left_left", TypeProtos.MinorType.INT)
+      .add("name_left_left", TypeProtos.MinorType.VARCHAR)
+      .buildSchema();
+
+    final RowSet.SingleRowSet emptyLeftRowSet_leftSchema3 = fixture.rowSetBuilder(leftSchema3).build();
+    final RowSet.SingleRowSet nonEmptyLeftRowSet_leftSchema3 = fixture.rowSetBuilder(leftSchema3)
+      .addRow(6, 60, "item6")
+      .build();
+
+    // Get left input schema for second left batch
+    TupleMetadata leftSchema4 = new SchemaBuilder()
+      .add("id_left_left_new", TypeProtos.MinorType.INT)
+      .add("cost_left_left_new", TypeProtos.MinorType.VARCHAR)
+      .add("name_left_left_new", TypeProtos.MinorType.VARCHAR)
+      .buildSchema();
+
+    final RowSet.SingleRowSet nonEmptyLeftRowSet_leftSchema4 = fixture.rowSetBuilder(leftSchema4)
+      .addRow(100, "100", "item100")
+      .build();
+
+    // Build Left container for upper level LATERAL operator
+    final List<VectorContainer> leftContainer2 = new ArrayList<>(5);
+
+    // Get the left container with dummy data
+    leftContainer2.add(emptyLeftRowSet_leftSchema3.container());
+    leftContainer2.add(nonEmptyLeftRowSet_leftSchema3.container());
+    leftContainer2.add(nonEmptyLeftRowSet_leftSchema4.container());
+
+    // Get the left container outcomes for upper level LATERAL operator
+    final List<RecordBatch.IterOutcome> leftOutcomes2 = new ArrayList<>(5);
+    leftOutcomes2.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    leftOutcomes2.add(RecordBatch.IterOutcome.OK);
+    leftOutcomes2.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+
+    final CloseableRecordBatch leftMockBatch_2 = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
+      leftContainer2, leftOutcomes2, leftContainer2.get(0).getSchema());
+
+    final LateralJoinBatch upperLevelLateral = new LateralJoinBatch(popConfig_1, fixture.getFragmentContext(),
+      leftMockBatch_2, lowerLevelLateral);
+
+    try {
+      // 3 for first batch on left side and another 3 for next left batch
+      final int expectedOutputRecordCount = 6;
+      int actualOutputRecordCount = 0;
+
+      assertTrue(RecordBatch.IterOutcome.OK_NEW_SCHEMA == upperLevelLateral.next());
+      assertTrue(RecordBatch.IterOutcome.OK == upperLevelLateral.next());
+      actualOutputRecordCount += upperLevelLateral.getRecordCount();
+      assertTrue(RecordBatch.IterOutcome.OK_NEW_SCHEMA == upperLevelLateral.next());
+      actualOutputRecordCount += upperLevelLateral.getRecordCount();
+      assertTrue(RecordBatch.IterOutcome.OK_NEW_SCHEMA == upperLevelLateral.next());
+      actualOutputRecordCount += upperLevelLateral.getRecordCount();
+      assertTrue(RecordBatch.IterOutcome.OK == upperLevelLateral.next());
+      actualOutputRecordCount += upperLevelLateral.getRecordCount();
+      assertTrue(RecordBatch.IterOutcome.NONE == upperLevelLateral.next());
+      assertTrue(actualOutputRecordCount == expectedOutputRecordCount);
+    } catch (AssertionError | Exception error) {
+      fail();
+    } finally {
+      // Close all the resources for this test case
+      upperLevelLateral.close();
+      leftMockBatch_2.close();
+      lowerLevelLateral.close();
+      leftMockBatch_1.close();
+      rightMockBatch_1.close();
+      leftContainer2.clear();
+      leftOutcomes2.clear();
+    }
+  }
 }
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/limit/TestLimitBatchEmitOutcome.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/limit/TestLimitBatchEmitOutcome.java
index 4757488..38c0f51 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/limit/TestLimitBatchEmitOutcome.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/limit/TestLimitBatchEmitOutcome.java
@@ -17,16 +17,19 @@
  */
 package org.apache.drill.exec.physical.impl.limit;
 
+import org.apache.drill.categories.OperatorTest;
 import org.apache.drill.exec.physical.config.Limit;
 import org.apache.drill.exec.physical.impl.MockRecordBatch;
 import org.apache.drill.exec.physical.impl.BaseTestOpBatchEmitOutcome;
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.test.rowSet.RowSet;
 import org.junit.Test;
+import org.junit.experimental.categories.Category;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
+@Category(OperatorTest.class)
 public class TestLimitBatchEmitOutcome extends BaseTestOpBatchEmitOutcome {
 
   /**
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/project/TestProjectEmitOutcome.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/project/TestProjectEmitOutcome.java
index b3099d0..cc737e9 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/project/TestProjectEmitOutcome.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/project/TestProjectEmitOutcome.java
@@ -17,17 +17,20 @@
  */
 package org.apache.drill.exec.physical.impl.project;
 
+import org.apache.drill.categories.OperatorTest;
 import org.apache.drill.exec.physical.config.Project;
 import org.apache.drill.exec.physical.impl.MockRecordBatch;
 import org.apache.drill.exec.physical.impl.BaseTestOpBatchEmitOutcome;
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.test.rowSet.RowSet;
 import org.junit.Test;
+import org.junit.experimental.categories.Category;
 
 import static junit.framework.TestCase.fail;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
+@Category(OperatorTest.class)
 public class TestProjectEmitOutcome extends BaseTestOpBatchEmitOutcome {
 
   /**
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/unnest/TestUnnestWithLateralCorrectness.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/unnest/TestUnnestWithLateralCorrectness.java
index ec043b2..70a32f8 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/unnest/TestUnnestWithLateralCorrectness.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/unnest/TestUnnestWithLateralCorrectness.java
@@ -223,8 +223,12 @@ public class TestUnnestWithLateralCorrectness extends SubOperatorTest {
 
     Object[][][] baseline = {
         {
-          {1, 1, 2, 2, 2, 3, 3, 4},
-          {"0", "1", "2", "3", "4", "5", "6", "9"}
+          {1, 1, 2, 2, 2, 3, 3},
+          {"0", "1", "2", "3", "4", "5", "6"}
+        },
+        {
+          {4},
+          {"9"}
         }
     };
 
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/record/MaterializedField.java b/exec/vector/src/main/java/org/apache/drill/exec/record/MaterializedField.java
index fa4d276..672bb7e 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/record/MaterializedField.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/record/MaterializedField.java
@@ -111,7 +111,7 @@ public class MaterializedField {
    * <p>
    * By allowing the non-critical metadata to change, we preserve the
    * child relationships as a list or union evolves.
-   * @param type
+   * @param newType
    */
 
   public void replaceType(MajorType newType) {
@@ -190,11 +190,20 @@ public class MaterializedField {
     return Objects.hash(this.name, this.type, this.children);
   }
 
+  /**
+   * Equals method doesn't check for the children list of fields here. When a batch is sent over network then it is
+   * serialized along with the Materialized Field which also contains information about the internal vectors like
+   * offset and bits. While deserializing, these vectors are treated as children of parent vector. If a operator on
+   * receiver side like Sort receives a schema in buildSchema phase and then later on receives another batch, that
+   * will result in schema change and query will fail. This is because second batch schema will contain information
+   * about internal vectors like offset and bits which will not be present in first batch schema. For ref: See
+   * TestSort#testSortWithRepeatedMapWithExchanges
+   *
+   * @param obj
+   * @return
+   */
   @Override
   public boolean equals(Object obj) {
-    if (this == obj) {
-      return true;
-    }
     if (obj == null) {
       return false;
     }
@@ -206,7 +215,7 @@ public class MaterializedField {
     // in MapVector$MapTransferPair
 
     return this.name.equalsIgnoreCase(other.name) &&
-            Objects.equals(this.type, other.type);
+      Objects.equals(this.type, other.type);
   }
 
   /**
@@ -230,6 +239,27 @@ public class MaterializedField {
    * sense.) Operators that want to reconcile two maps that differ only in
    * column order need a different comparison.</li>
    * </ul>
+   * <ul>
+   * Note: Materialized Field and ValueVector has 1:1 mapping which means for each ValueVector there is a materialized
+   * field associated with it. So when we replace or add a ValueVector in a VectorContainer then we create new
+   * Materialized Field object for the new vector. This works fine for Primitive type ValueVectors but for ValueVector
+   * which are of type {@link org.apache.drill.exec.vector.complex.AbstractContainerVector} there is some differences on
+   * how Materialized field and ValueVector objects are updated inside the container which both ValueVector and
+   * Materialized Field object both mutable.
+   * <p>
+   * For example: For cases of MapVector it can so happen that only the children field type changed but
+   * the parent Map type and name remained same. In these cases we replace the children field ValueVector from parent
+   * MapVector inside main batch container, with new type of vector. Thus the reference of parent MaprVector inside
+   * batch container remains same but the reference of children field ValueVector stored inside MapVector get's updated.
+   * During this update it also replaces the Materialized field for that children field which is stored in childrens
+   * list of the parent MapVector Materialized Field.
+   * Since the children list of parent Materialized Field is updated, this make this class mutable. Hence there should
+   * not be any check for object reference equality here but instead there should be deep comparison which is what
+   * this method is now performing. Since if we have object reference check then in above cases it will return true for
+   * 2 Materialized Field object whose children field list is different which is not correct. Same holds true for
+   * {@link MaterializedField#isEquivalent(MaterializedField)} method.
+   * </p>
+   * </ul>
    *
    * @param other another field
    * @return <tt>true</tt> if the columns are identical according to the
@@ -237,9 +267,6 @@ public class MaterializedField {
    */
 
   public boolean isEquivalent(MaterializedField other) {
-    if (this == other) {
-      return true;
-    }
     if (! name.equalsIgnoreCase(other.name)) {
       return false;
     }
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/AbstractMapVector.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/AbstractMapVector.java
index 3682397..1d0e03b 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/AbstractMapVector.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/AbstractMapVector.java
@@ -129,6 +129,9 @@ public abstract class AbstractMapVector extends AbstractContainerVector {
       return (T) existing;
     } else if (nullFilled(existing)) {
       existing.clear();
+      // Since it's removing old vector and adding new one based on new type, it should do same for Materialized field,
+      // Otherwise there will be duplicate of same field with same name but different type.
+      field.removeChild(existing.getField());
       create = true;
     }
     if (create) {

-- 
To stop receiving notification emails like this one, please contact
arina@apache.org.