Posted to commits@drill.apache.org by pa...@apache.org on 2018/07/02 22:52:08 UTC

[drill] branch master updated (f1a3bd1 -> 069c304)

This is an automated email from the ASF dual-hosted git repository.

parthc pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git.


    from f1a3bd1  DRILL-6310: limit batch size for hash aggregate
     new 0bdebf2  DRILL-6535: ClassCastException in Lateral Unnest queries when dealing with schema-changed JSON data. Note: the issue occurred because, for one left incoming batch, all right batches were filtered out, so outputIndex was still 0 when the new left incoming batch arrived with OK_NEW_SCHEMA; that OK_NEW_SCHEMA was consumed without updating the output container schema.
     new 208733b  DRILL-6530: JVM crash with a query involving multiple JSON files, one of which has a schema change of one column from string to list
     new 069c304  DRILL-6561: Lateral excludes from the output container the columns provided by projection push-down rules

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../exec/physical/impl/join/HashJoinBatch.java     |   7 +-
 .../exec/physical/impl/join/LateralJoinBatch.java  | 108 ++++++++--
 .../exec/physical/impl/join/MergeJoinBatch.java    |   3 +-
 .../drill/exec/record/JoinBatchMemoryManager.java  |  19 +-
 .../impl/join/TestLateralJoinCorrectness.java      | 227 ++++++++++++++++++++-
 .../unnest/TestUnnestWithLateralCorrectness.java   | 104 ++++++----
 .../src/main/codegen/templates/ListWriters.java    |  18 +-
 7 files changed, 422 insertions(+), 64 deletions(-)


[drill] 02/03: DRILL-6530: JVM crash with a query involving multiple JSON files, one of which has a schema change of one column from string to list

Posted by pa...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

parthc pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git

commit 208733b52ec40fd49e6bd424782f7c71aabef7e3
Author: Sorabh Hamirwasia <sh...@maprtech.com>
AuthorDate: Tue Jun 26 10:53:53 2018 -0700

    DRILL-6530: JVM crash with a query involving multiple JSON files, one of which has a schema change of one column from string to list
    
    This closes #1343
---
 .../vector/src/main/codegen/templates/ListWriters.java | 18 ++++++++++++------
 1 file changed, 12 insertions(+), 6 deletions(-)

diff --git a/exec/vector/src/main/codegen/templates/ListWriters.java b/exec/vector/src/main/codegen/templates/ListWriters.java
index cab8772..4300857 100644
--- a/exec/vector/src/main/codegen/templates/ListWriters.java
+++ b/exec/vector/src/main/codegen/templates/ListWriters.java
@@ -107,11 +107,13 @@ public class ${mode}ListWriter extends AbstractFieldWriter {
   public MapWriter map() {
     switch (mode) {
     case INIT:
-      int vectorCount = container.size();
+      final ValueVector oldVector = container.getChild(name);
       final RepeatedMapVector vector = container.addOrGet(name, RepeatedMapVector.TYPE, RepeatedMapVector.class);
       innerVector = vector;
       writer = new RepeatedMapWriter(vector, this);
-      if(vectorCount != container.size()) {
+      // oldVector will be null for the first batch being created, and it may differ from the newly added vector
+      // if the new batch has a schema change
+      if (oldVector == null || oldVector != vector) {
         writer.allocate();
       }
       writer.setPosition(${index});
@@ -131,11 +133,13 @@ public class ${mode}ListWriter extends AbstractFieldWriter {
   public ListWriter list() {
     switch (mode) {
     case INIT:
-      final int vectorCount = container.size();
+      final ValueVector oldVector = container.getChild(name);
       final RepeatedListVector vector = container.addOrGet(name, RepeatedListVector.TYPE, RepeatedListVector.class);
       innerVector = vector;
       writer = new RepeatedListWriter(null, vector, this);
-      if (vectorCount != container.size()) {
+      // oldVector will be null for the first batch being created, and it may differ from the newly added vector
+      // if the new batch has a schema change
+      if (oldVector == null || oldVector != vector) {
         writer.allocate();
       }
       writer.setPosition(${index});
@@ -176,11 +180,13 @@ public class ${mode}ListWriter extends AbstractFieldWriter {
   </#if>
     switch (mode) {
     case INIT:
-      final int vectorCount = container.size();
+      final ValueVector oldVector = container.getChild(name);
       final Repeated${capName}Vector vector = container.addOrGet(name, ${upperName}_TYPE, Repeated${capName}Vector.class);
       innerVector = vector;
       writer = new Repeated${capName}WriterImpl(vector, this);
-      if(vectorCount != container.size()) {
+      // oldVector will be null for the first batch being created, and it may differ from the newly added vector
+      // if the new batch has a schema change
+      if (oldVector == null || oldVector != vector) {
         writer.allocate();
       }
       writer.setPosition(${index});
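
Editor's note on the fix above: the old guard compared container.size() before and after addOrGet(), which misses the case where addOrGet() replaces an existing child of a different type (a schema change) without changing the child count, so the writer was left pointing at an unallocated vector. Comparing the previously registered child with what addOrGet() returns covers both the first-batch case (old child is null) and the replacement case. A minimal standalone sketch of the pattern, with a plain map standing in for Drill's vector container (all names below are hypothetical):

    import java.util.LinkedHashMap;
    import java.util.Map;
    import java.util.function.Supplier;

    class ContainerSketch {
      private final Map<String, Object> children = new LinkedHashMap<>();

      Object getChild(String name) {
        return children.get(name);
      }

      // Returns the existing child when its class matches, otherwise replaces it.
      // A replacement leaves size() unchanged, which is what broke the old check.
      <T> T addOrGet(String name, Class<T> clazz, Supplier<T> factory) {
        Object existing = children.get(name);
        if (clazz.isInstance(existing)) {
          return clazz.cast(existing);
        }
        T created = factory.get();
        children.put(name, created);
        return created;
      }

      // Caller side: allocate only when addOrGet() actually created a new child.
      void init(String name) {
        Object oldChild = getChild(name);
        StringBuilder vector = addOrGet(name, StringBuilder.class, StringBuilder::new);
        if (oldChild != vector) { // true for the first batch and for a type change
          vector.setLength(0);    // stand-in for writer.allocate()
        }
      }
    }
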


[drill] 03/03: DRILL-6561: Lateral excludes from the output container the columns provided by projection push-down rules

Posted by pa...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

parthc pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git

commit 069c3049f1a500e5ae0b47caeebc5856ab182b73
Author: Sorabh Hamirwasia <sh...@maprtech.com>
AuthorDate: Fri Jun 29 10:27:55 2018 -0700

    DRILL-6561: Lateral excludes from the output container the columns provided by projection push-down rules
    
    This closes #1356
---
 .../exec/physical/impl/join/HashJoinBatch.java     |   7 +-
 .../exec/physical/impl/join/LateralJoinBatch.java  |  71 +++++++--
 .../exec/physical/impl/join/MergeJoinBatch.java    |   3 +-
 .../drill/exec/record/JoinBatchMemoryManager.java  |  19 ++-
 .../impl/join/TestLateralJoinCorrectness.java      | 160 ++++++++++++++++++++-
 .../unnest/TestUnnestWithLateralCorrectness.java   | 104 +++++++++-----
 6 files changed, 310 insertions(+), 54 deletions(-)

diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
index 047c597..345d182 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
@@ -19,6 +19,7 @@ package org.apache.drill.exec.physical.impl.join;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 
@@ -67,9 +68,6 @@ import org.apache.drill.exec.vector.ValueVector;
 import org.apache.drill.exec.vector.complex.AbstractContainerVector;
 import org.apache.calcite.rel.core.JoinRelType;
 
-import static org.apache.drill.exec.record.JoinBatchMemoryManager.LEFT_INDEX;
-import static org.apache.drill.exec.record.JoinBatchMemoryManager.RIGHT_INDEX;
-
 /**
  *   This class implements the runtime execution for the Hash-Join operator
  *   supporting INNER, LEFT OUTER, RIGHT OUTER, and FULL OUTER joins
@@ -892,7 +890,8 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
     logger.debug("BATCH_STATS, configured output batch size: {}, allocated memory {}, avail mem factor {}, output batch size: {}",
       configuredBatchSize, allocator.getLimit(), avail_mem_factor, outputBatchSize);
 
-    batchMemoryManager = new JoinBatchMemoryManager(outputBatchSize, left, right);
+    batchMemoryManager = new JoinBatchMemoryManager(outputBatchSize, left, right, new HashSet<>());
+    logger.debug("BATCH_STATS, configured output batch size: {}", configuredBatchSize);
   }
 
   /**
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java
index 84dc5c3..fc3c8b1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java
@@ -20,6 +20,7 @@ package org.apache.drill.exec.physical.impl.join;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.types.TypeProtos;
 import org.apache.drill.common.types.Types;
 import org.apache.drill.exec.ExecConstants;
@@ -27,6 +28,7 @@ import org.apache.drill.exec.exception.OutOfMemoryException;
 import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.physical.base.LateralContract;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.physical.config.LateralJoinPOP;
 import org.apache.drill.exec.record.AbstractBinaryRecordBatch;
 import org.apache.drill.exec.record.BatchSchema;
@@ -34,10 +36,14 @@ import org.apache.drill.exec.record.JoinBatchMemoryManager;
 import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.record.RecordBatchSizer;
+import org.apache.drill.exec.record.SchemaBuilder;
 import org.apache.drill.exec.record.VectorAccessibleUtilities;
 import org.apache.drill.exec.record.VectorWrapper;
 import org.apache.drill.exec.vector.ValueVector;
 
+import java.util.HashSet;
+import java.util.List;
+
 import static org.apache.drill.exec.record.RecordBatch.IterOutcome.EMIT;
 import static org.apache.drill.exec.record.RecordBatch.IterOutcome.NONE;
 import static org.apache.drill.exec.record.RecordBatch.IterOutcome.OK;
@@ -82,6 +88,8 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP>
   // Flag to keep track of new left batch so that update on memory manager is called only once per left batch
   private boolean isNewLeftBatch = false;
 
+  private final HashSet<String> excludedFieldNames = new HashSet<>();
+
   /* ****************************************************************************************************************
    * Public Methods
    * ****************************************************************************************************************/
@@ -91,7 +99,9 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP>
     Preconditions.checkNotNull(left);
     Preconditions.checkNotNull(right);
     final int configOutputBatchSize = (int) context.getOptions().getOption(ExecConstants.OUTPUT_BATCH_SIZE_VALIDATOR);
-    batchMemoryManager = new JoinBatchMemoryManager(configOutputBatchSize, left, right);
+    // Prepare Schema Path Mapping
+    populateExcludedField(popConfig);
+    batchMemoryManager = new JoinBatchMemoryManager(configOutputBatchSize, left, right, excludedFieldNames);
 
     // Initially it's set to default value of 64K and later for each new output row it will be set to the computed
     // row count
@@ -700,6 +710,21 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP>
     return isValid;
   }
 
+  private BatchSchema batchSchemaWithNoExcludedCols(BatchSchema originSchema) {
+    if (excludedFieldNames.size() == 0) {
+      return originSchema;
+    }
+
+    final SchemaBuilder newSchemaBuilder =
+      BatchSchema.newBuilder().setSelectionVectorMode(originSchema.getSelectionVectorMode());
+    for (MaterializedField field : originSchema) {
+      if (!excludedFieldNames.contains(field.getName())) {
+        newSchemaBuilder.addField(field);
+      }
+    }
+    return newSchemaBuilder.build();
+  }
+
   /**
    * Helps to create the outgoing container vectors based on known left and right batch schemas
    * @throws SchemaChangeException
@@ -711,8 +736,8 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP>
 
     // Clear up the container
     container.clear();
-    leftSchema = left.getSchema();
-    rightSchema = right.getSchema();
+    leftSchema = batchSchemaWithNoExcludedCols(left.getSchema());
+    rightSchema = batchSchemaWithNoExcludedCols(right.getSchema());
 
     if (!verifyInputSchema(leftSchema)) {
       throw new SchemaChangeException("Invalid Schema found for left incoming batch");
@@ -724,12 +749,20 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP>
 
     // Setup LeftSchema in outgoing container
     for (final VectorWrapper<?> vectorWrapper : left) {
-      container.addOrGet(vectorWrapper.getField());
+      final MaterializedField leftField = vectorWrapper.getField();
+      if (excludedFieldNames.contains(leftField.getName())) {
+        continue;
+      }
+      container.addOrGet(leftField);
     }
 
     // Setup RightSchema in the outgoing container
     for (final VectorWrapper<?> vectorWrapper : right) {
       MaterializedField rightField = vectorWrapper.getField();
+      if (excludedFieldNames.contains(rightField.getName())) {
+        continue;
+      }
+
       TypeProtos.MajorType rightFieldType = vectorWrapper.getField().getType();
 
       // make right input schema optional if we have LEFT join
@@ -846,15 +879,28 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP>
     // Get the vectors using field index rather than Materialized field since input batch field can be different from
     // output container field in case of Left Join. As we rebuild the right Schema field to be optional for output
     // container.
+    int inputIndex = 0;
     for (int i = startVectorIndex; i < endVectorIndex; ++i) {
-      // Get input vector
-      final Class<?> inputValueClass = batch.getSchema().getColumn(i).getValueClass();
-      final ValueVector inputVector = batch.getValueAccessorById(inputValueClass, i).getValueVector();
-
       // Get output vector
       final int outputVectorIndex = i + baseVectorIndex;
       final Class<?> outputValueClass = this.getSchema().getColumn(outputVectorIndex).getValueClass();
       final ValueVector outputVector = this.getValueAccessorById(outputValueClass, outputVectorIndex).getValueVector();
+      final String outputFieldName = outputVector.getField().getName();
+
+      ValueVector inputVector;
+      Class<?> inputValueClass;
+      String inputFieldName;
+      do {
+        // Get input vector
+        inputValueClass = batch.getSchema().getColumn(inputIndex).getValueClass();
+        inputVector = batch.getValueAccessorById(inputValueClass, inputIndex).getValueVector();
+        inputFieldName = inputVector.getField().getName();
+        ++inputIndex;
+      } while (excludedFieldNames.contains(inputFieldName));
+
+      Preconditions.checkArgument(outputFieldName.equals(inputFieldName),
+        new IllegalStateException(String.format("Non-excluded Input and output container fields are not in same order" +
+          ". Output Schema:%s and Input Schema:%s", this.getSchema(), batch.getSchema())));
 
       logger.trace("Copying data from incoming batch vector to outgoing batch vector. [IncomingBatch: " +
           "(RowIndex: {}, VectorType: {}), OutputBatch: (RowIndex: {}, VectorType: {}) and Other: (TimeEachValue: {}," +
@@ -938,4 +984,13 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP>
       maxOutputRowCount = newOutputRowCount;
     }
   }
+
+  private void populateExcludedField(PhysicalOperator lateralPop) {
+    final List<SchemaPath> excludedCols = ((LateralJoinPOP)lateralPop).getExcludedColumns();
+    if (excludedCols != null) {
+      for (SchemaPath currentPath : excludedCols) {
+        excludedFieldNames.add(currentPath.rootName());
+      }
+    }
+  }
 }
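
Editor's note on the do-while copy loop above: because the output container omits excluded columns, output index i no longer lines up with input index i, so the loop advances a separate input cursor past excluded fields until it reaches the column that feeds output i, then asserts the names match. A standalone sketch of that alignment over plain name lists (not Drill's vector accessors):

    import java.util.Arrays;
    import java.util.Collections;
    import java.util.List;
    import java.util.Set;

    class ColumnAlignmentSketch {
      // Maps each output column to the input column with the same name, assuming the
      // output columns are the input columns minus the excluded names, in order.
      static int[] mapOutputsToInputs(List<String> input, List<String> output, Set<String> excluded) {
        int[] mapping = new int[output.size()];
        int inputIndex = 0;
        for (int i = 0; i < output.size(); i++) {
          while (excluded.contains(input.get(inputIndex))) {
            inputIndex++; // skip columns the projection pushdown removed from the output
          }
          if (!output.get(i).equals(input.get(inputIndex))) {
            throw new IllegalStateException("non-excluded columns out of order at index " + i);
          }
          mapping[i] = inputIndex++;
        }
        return mapping;
      }

      public static void main(String[] args) {
        List<String> in = Arrays.asList("id_left", "cost_left", "name_left");
        List<String> out = Arrays.asList("id_left", "name_left");
        Set<String> excluded = Collections.singleton("cost_left");
        System.out.println(Arrays.toString(mapOutputsToInputs(in, out, excluded))); // [0, 2]
      }
    }
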
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
index 62967a9..ea34ed9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
@@ -62,6 +62,7 @@ import org.apache.drill.exec.vector.ValueVector;
 import org.apache.drill.exec.vector.complex.AbstractContainerVector;
 
 import java.io.IOException;
+import java.util.HashSet;
 import java.util.List;
 
 import static org.apache.drill.exec.compile.sig.GeneratorMapping.GM;
@@ -108,7 +109,7 @@ public class MergeJoinBatch extends AbstractBinaryRecordBatch<MergeJoinPOP> {
   private class MergeJoinMemoryManager extends JoinBatchMemoryManager {
 
     MergeJoinMemoryManager(int outputBatchSize, RecordBatch leftBatch, RecordBatch rightBatch) {
-      super(outputBatchSize, leftBatch, rightBatch);
+      super(outputBatchSize, leftBatch, rightBatch, new HashSet<>());
     }
 
     /**
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/JoinBatchMemoryManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/JoinBatchMemoryManager.java
index 2ebe887..4344e13 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/JoinBatchMemoryManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/JoinBatchMemoryManager.java
@@ -17,29 +17,44 @@
  */
 package org.apache.drill.exec.record;
 
+import java.util.Set;
+
 public class JoinBatchMemoryManager extends RecordBatchMemoryManager {
   private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(JoinBatchMemoryManager.class);
 
   private int rowWidth[];
   private RecordBatch recordBatch[];
+  private Set<String> columnsToExclude;
 
   private static final int numInputs = 2;
   public static final int LEFT_INDEX = 0;
   public static final int RIGHT_INDEX = 1;
 
-  public JoinBatchMemoryManager(int outputBatchSize, RecordBatch leftBatch, RecordBatch rightBatch) {
+  public JoinBatchMemoryManager(int outputBatchSize, RecordBatch leftBatch,
+                                RecordBatch rightBatch, Set<String> excludedColumns) {
     super(numInputs, outputBatchSize);
     recordBatch = new RecordBatch[numInputs];
     recordBatch[LEFT_INDEX] = leftBatch;
     recordBatch[RIGHT_INDEX] = rightBatch;
     rowWidth = new int[numInputs];
+    this.columnsToExclude = excludedColumns;
   }
 
   private int updateInternal(int inputIndex, int outputPosition,  boolean useAggregate) {
     updateIncomingStats(inputIndex);
     rowWidth[inputIndex] = useAggregate ? (int) getAvgInputRowWidth(inputIndex) : getRecordBatchSizer(inputIndex).getRowAllocWidth();
 
-    final int newOutgoingRowWidth = rowWidth[LEFT_INDEX] + rowWidth[RIGHT_INDEX];
+    // Subtract the widths of excluded columns from the actual rowWidth
+    for (String columnName : columnsToExclude) {
+      final RecordBatchSizer.ColumnSize currentColSizer = getColumnSize(inputIndex, columnName);
+      if (currentColSizer == null) {
+        continue;
+      }
+      rowWidth[inputIndex] -= currentColSizer.getAllocSizePerEntry();
+    }
+
+    // Get the final net outgoing row width after subtracting the excluded columns' widths
+    int newOutgoingRowWidth = rowWidth[LEFT_INDEX] + rowWidth[RIGHT_INDEX];
 
     // If outgoing row width is 0 or there is no change in outgoing row width, just return.
     // This is possible for empty batches or
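
Editor's note on the updateInternal() change above: the memory manager derives the outgoing row count from the combined left and right row widths, so columns that Lateral drops from the output must not count toward those widths, otherwise the computed output row count comes out smaller than necessary. A hedged sketch of the subtraction, with per-column widths given as a plain map (illustrative numbers, not Drill's RecordBatchSizer API):

    import java.util.Collections;
    import java.util.LinkedHashMap;
    import java.util.Map;
    import java.util.Set;

    class RowWidthSketch {
      // Total row width minus the per-entry allocation width of each excluded
      // column that is actually present on this input side.
      static int adjustedRowWidth(Map<String, Integer> allocWidthPerColumn, Set<String> excluded) {
        int width = allocWidthPerColumn.values().stream().mapToInt(Integer::intValue).sum();
        for (String name : excluded) {
          Integer w = allocWidthPerColumn.get(name);
          if (w == null) {
            continue; // excluded column does not exist on this side; nothing to subtract
          }
          width -= w;
        }
        return width;
      }

      public static void main(String[] args) {
        Map<String, Integer> left = new LinkedHashMap<>();
        left.put("id_left", 4);
        left.put("cost_left", 4);
        left.put("name_left", 50);
        // 58 bytes raw; 54 once the excluded cost_left no longer counts.
        System.out.println(adjustedRowWidth(left, Collections.singleton("cost_left")));
      }
    }
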
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestLateralJoinCorrectness.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestLateralJoinCorrectness.java
index 2723e30..ffac4b6 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestLateralJoinCorrectness.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestLateralJoinCorrectness.java
@@ -21,6 +21,7 @@ import avro.shaded.com.google.common.collect.Lists;
 import org.apache.calcite.rel.core.JoinRelType;
 import org.apache.drill.categories.OperatorTest;
 import org.apache.drill.common.exceptions.DrillRuntimeException;
+import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.types.TypeProtos;
 import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.ops.OperatorContext;
@@ -29,11 +30,13 @@ import org.apache.drill.exec.physical.config.LateralJoinPOP;
 import org.apache.drill.exec.physical.impl.MockRecordBatch;
 import org.apache.drill.exec.record.CloseableRecordBatch;
 import org.apache.drill.exec.record.RecordBatch;
-import org.apache.drill.exec.record.metadata.TupleMetadata;
 import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
 import org.apache.drill.exec.store.mock.MockStorePOP;
 import org.apache.drill.test.SubOperatorTest;
+import org.apache.drill.test.rowSet.DirectRowSet;
 import org.apache.drill.test.rowSet.RowSet;
+import org.apache.drill.test.rowSet.RowSetComparison;
 import org.apache.drill.test.rowSet.schema.SchemaBuilder;
 import org.junit.After;
 import org.junit.AfterClass;
@@ -2870,4 +2873,159 @@ public class TestLateralJoinCorrectness extends SubOperatorTest {
       leftRowSet2.clear();
     }
   }
+
+  private void testExcludedColumns(List<SchemaPath> excludedCols, CloseableRecordBatch left,
+                                   CloseableRecordBatch right, RowSet expectedRowSet) throws Exception {
+    LateralJoinPOP lateralPop = new LateralJoinPOP(null, null, JoinRelType.INNER, excludedCols);
+    final LateralJoinBatch ljBatch = new LateralJoinBatch(lateralPop, fixture.getFragmentContext(), left, right);
+
+    try {
+      assertTrue(RecordBatch.IterOutcome.OK_NEW_SCHEMA == ljBatch.next());
+      assertTrue(RecordBatch.IterOutcome.OK == ljBatch.next());
+      RowSet actualRowSet = DirectRowSet.fromContainer(ljBatch.getContainer());
+      new RowSetComparison(expectedRowSet).verify(actualRowSet);
+      assertTrue(RecordBatch.IterOutcome.NONE == ljBatch.next());
+    } finally {
+      ljBatch.close();
+      left.close();
+      right.close();
+      expectedRowSet.clear();
+    }
+  }
+
+  @Test
+  public void testFillingUpOutputBatch_WithExcludedColumns() throws Exception {
+    // Create data for left input
+    final RowSet.SingleRowSet leftRowSet2 = fixture.rowSetBuilder(leftSchema)
+      .addRow(2, 20, "item20")
+      .build();
+
+    // Create data for right input
+    final RowSet.SingleRowSet nonEmptyRightRowSet2 = fixture.rowSetBuilder(rightSchema)
+      .addRow(4, 41, "item41")
+      .addRow(5, 51, "item51")
+      .build();
+
+    TupleMetadata expectedSchema = new SchemaBuilder()
+      .add("id_left", TypeProtos.MinorType.INT)
+      .add("name_left", TypeProtos.MinorType.VARCHAR)
+      .add("id_right", TypeProtos.MinorType.INT)
+      .add("cost_right", TypeProtos.MinorType.INT)
+      .add("name_right", TypeProtos.MinorType.VARCHAR)
+      .buildSchema();
+
+    final RowSet.SingleRowSet expectedRowSet = fixture.rowSetBuilder(expectedSchema)
+      .addRow(1, "item1", 1, 11, "item11")
+      .addRow(1, "item1", 2, 21, "item21")
+      .addRow(1, "item1", 3, 31, "item31")
+      .addRow(2, "item20", 4, 41, "item41")
+      .addRow(2, "item20", 5, 51, "item51")
+      .build();
+
+    // Get the left container with dummy data for Lateral Join
+    leftContainer.add(nonEmptyLeftRowSet.container());
+    leftContainer.add(leftRowSet2.container());
+
+    // Get the left IterOutcomes for Lateral Join
+    leftOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    leftOutcomes.add(RecordBatch.IterOutcome.OK);
+
+    // Create Left MockRecordBatch
+    final CloseableRecordBatch leftMockBatch = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
+      leftContainer, leftOutcomes, leftContainer.get(0).getSchema());
+
+    // Get the right container with dummy data
+    rightContainer.add(emptyRightRowSet.container());
+    rightContainer.add(nonEmptyRightRowSet.container());
+    rightContainer.add(nonEmptyRightRowSet2.container());
+
+    rightOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    rightOutcomes.add(RecordBatch.IterOutcome.EMIT);
+    rightOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+    final CloseableRecordBatch rightMockBatch = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
+      rightContainer, rightOutcomes, rightContainer.get(0).getSchema());
+
+    List<SchemaPath> excludedCols = new ArrayList<>();
+    excludedCols.add(SchemaPath.getSimplePath("cost_left"));
+
+    try {
+      testExcludedColumns(excludedCols, leftMockBatch, rightMockBatch, expectedRowSet);
+    } finally {
+      // Close all the resources for this test case
+      leftRowSet2.clear();
+      nonEmptyRightRowSet2.clear();
+    }
+  }
+
+  @Test
+  public void testFillingUpOutputBatch_With2ExcludedColumns() throws Exception {
+    // Create data for left input
+    final RowSet.SingleRowSet leftRowSet2 = fixture.rowSetBuilder(leftSchema)
+      .addRow(2, 20, "item20")
+      .build();
+
+    // Create data for right input
+    final RowSet.SingleRowSet nonEmptyRightRowSet2 = fixture.rowSetBuilder(rightSchema)
+      .addRow(4, 41, "item41")
+      .addRow(5, 51, "item51")
+      .build();
+
+    TupleMetadata expectedSchema = new SchemaBuilder()
+      .add("name_left", TypeProtos.MinorType.VARCHAR)
+      //.add("id_right", TypeProtos.MinorType.INT)
+      .add("cost_right", TypeProtos.MinorType.INT)
+      .add("name_right", TypeProtos.MinorType.VARCHAR)
+      .buildSchema();
+
+    final RowSet.SingleRowSet expectedRowSet = fixture.rowSetBuilder(expectedSchema)
+      /*.addRow("item1", 1, 11, "item11")
+      .addRow("item1", 2, 21, "item21")
+      .addRow("item1", 3, 31, "item31")
+      .addRow("item20", 4, 41, "item41")
+      .addRow("item20", 5, 51, "item51") */
+      .addRow("item1", 11, "item11")
+      .addRow("item1", 21, "item21")
+      .addRow("item1", 31, "item31")
+      .addRow("item20", 41, "item41")
+      .addRow("item20", 51, "item51")
+      .build();
+
+    // Get the left container with dummy data for Lateral Join
+    leftContainer.add(nonEmptyLeftRowSet.container());
+    leftContainer.add(leftRowSet2.container());
+
+    // Get the left IterOutcomes for Lateral Join
+    leftOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    leftOutcomes.add(RecordBatch.IterOutcome.OK);
+
+    // Create Left MockRecordBatch
+    final CloseableRecordBatch leftMockBatch = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
+      leftContainer, leftOutcomes, leftContainer.get(0).getSchema());
+
+    // Get the right container with dummy data
+    rightContainer.add(emptyRightRowSet.container());
+    rightContainer.add(nonEmptyRightRowSet.container());
+    rightContainer.add(nonEmptyRightRowSet2.container());
+
+    rightOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    rightOutcomes.add(RecordBatch.IterOutcome.EMIT);
+    rightOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+    final CloseableRecordBatch rightMockBatch = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
+      rightContainer, rightOutcomes, rightContainer.get(0).getSchema());
+
+    List<SchemaPath> excludedCols = new ArrayList<>();
+    excludedCols.add(SchemaPath.getSimplePath("cost_left"));
+    excludedCols.add(SchemaPath.getSimplePath("id_left"));
+    excludedCols.add(SchemaPath.getSimplePath("id_right"));
+
+    try {
+      testExcludedColumns(excludedCols, leftMockBatch, rightMockBatch, expectedRowSet);
+    } finally {
+      // Close all the resources for this test case
+      leftRowSet2.clear();
+      nonEmptyRightRowSet2.clear();
+    }
+  }
 }
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/unnest/TestUnnestWithLateralCorrectness.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/unnest/TestUnnestWithLateralCorrectness.java
index 3a7f899..c2e64f4 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/unnest/TestUnnestWithLateralCorrectness.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/unnest/TestUnnestWithLateralCorrectness.java
@@ -37,8 +37,8 @@ import org.apache.drill.exec.physical.impl.project.ProjectRecordBatch;
 import org.apache.drill.exec.physical.impl.sort.RecordBatchData;
 import org.apache.drill.exec.planner.logical.DrillLogicalTestutils;
 import org.apache.drill.exec.record.RecordBatch;
-import org.apache.drill.exec.record.metadata.TupleMetadata;
 import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
 import org.apache.drill.exec.store.mock.MockStorePOP;
 import org.apache.drill.exec.vector.ValueVector;
 import org.apache.drill.exec.vector.VarCharVector;
@@ -106,7 +106,7 @@ public class TestUnnestWithLateralCorrectness extends SubOperatorTest {
     RecordBatch.IterOutcome[] iterOutcomes = {RecordBatch.IterOutcome.OK_NEW_SCHEMA, RecordBatch.IterOutcome.OK};
 
     try {
-      testUnnest(incomingSchemas, iterOutcomes, data, baseline);
+      testUnnest(incomingSchemas, iterOutcomes, data, baseline, false);
     } catch (Exception e) {
       fail("Failed due to exception: " + e.getMessage());
     }
@@ -140,7 +140,7 @@ public class TestUnnestWithLateralCorrectness extends SubOperatorTest {
     RecordBatch.IterOutcome[] iterOutcomes = {RecordBatch.IterOutcome.OK_NEW_SCHEMA, RecordBatch.IterOutcome.OK};
 
     try {
-      testUnnest(incomingSchemas, iterOutcomes, data, baseline);
+      testUnnest(incomingSchemas, iterOutcomes, data, baseline, false);
     } catch (Exception e) {
       fail("Failed due to exception: " + e.getMessage());
     }
@@ -161,7 +161,7 @@ public class TestUnnestWithLateralCorrectness extends SubOperatorTest {
     RecordBatch.IterOutcome[] iterOutcomes = {RecordBatch.IterOutcome.OK_NEW_SCHEMA, RecordBatch.IterOutcome.OK};
 
     try {
-      testUnnest(incomingSchemas, iterOutcomes, data, baseline);
+      testUnnest(incomingSchemas, iterOutcomes, data, baseline, false);
     } catch (Exception e) {
       fail("Failed due to exception: " + e.getMessage());
     }
@@ -192,7 +192,7 @@ public class TestUnnestWithLateralCorrectness extends SubOperatorTest {
     RecordBatch.IterOutcome[] iterOutcomes = {RecordBatch.IterOutcome.OK_NEW_SCHEMA, RecordBatch.IterOutcome.OK};
 
     try {
-      testUnnest(incomingSchemas, iterOutcomes, data, baseline);
+      testUnnest(incomingSchemas, iterOutcomes, data, baseline, false);
     } catch (Exception e) {
       fail("Failed due to exception: " + e.getMessage());
     }
@@ -240,7 +240,7 @@ public class TestUnnestWithLateralCorrectness extends SubOperatorTest {
         RecordBatch.IterOutcome.OK_NEW_SCHEMA};
 
     try {
-      testUnnest(incomingSchemas, iterOutcomes, data, baseline);
+      testUnnest(incomingSchemas, iterOutcomes, data, baseline, false);
     } catch (Exception e) {
       fail("Failed due to exception: " + e.getMessage());
     }
@@ -289,28 +289,15 @@ public class TestUnnestWithLateralCorrectness extends SubOperatorTest {
         RecordBatch.IterOutcome.OK_NEW_SCHEMA};
 
     try {
-      testUnnest(incomingSchemas, iterOutcomes, data, baseline);
+      testUnnest(incomingSchemas, iterOutcomes, data, baseline, false);
     } catch (Exception e) {
       fail("Failed due to exception: " + e.getMessage());
     }
 
   }
 
-  @Test
-  public void testUnnestLimitBatchSize() {
-
-    final int limitedOutputBatchSize = 127;
-    final int inputBatchSize = limitedOutputBatchSize + 1;
-    // size of lateral output batch = 4N * (N + 5) bytes, where N = output batch row count
-    //  Lateral output batch size =  N * input row size + N * size of single unnest column
-    //                            =  N * (size of row id + size of array offset vector + (N + 1 )*size of single array entry))
-    //                              + N * 4
-    //                            = N * (4 + 2*4 + (N+1)*4 )  + N * 4
-    //                            = N * (16 + 4N) + N * 4
-    //                            = 4N * (N + 5)
-    // configure the output batch size to be one more record than that so that the batch sizer can round down
-    final int limitedOutputBatchSizeBytes = 4 * limitedOutputBatchSize * (limitedOutputBatchSize + 6);
-
+  private void testUnnestBatchSizing(int inputBatchSize, int limitOutputBatchSize,
+                                     int limitOutputBatchSizeBytes, boolean excludeUnnestColumn) {
     // single record batch with single row. The unnest column has one
     // more record than the batch size we want in the output
     Object[][] data = new Object[1][1];
@@ -323,39 +310,76 @@ public class TestUnnestWithLateralCorrectness extends SubOperatorTest {
         }
       }
     }
+
     Integer[][][] baseline = new Integer[2][2][];
-    baseline[0][0] = new Integer[limitedOutputBatchSize];
-    baseline[0][1] = new Integer[limitedOutputBatchSize];
+    baseline[0][0] = new Integer[limitOutputBatchSize];
+    baseline[0][1] = new Integer[limitOutputBatchSize];
     baseline[1][0] = new Integer[1];
     baseline[1][1] = new Integer[1];
-    for (int i = 0; i < limitedOutputBatchSize; i++) {
+    for (int i = 0; i < limitOutputBatchSize; i++) {
       baseline[0][0][i] = 1;
       baseline[0][1][i] = i;
     }
     baseline[1][0][0] = 1; // row Num
-    baseline[1][1][0] = limitedOutputBatchSize; // value
+    baseline[1][1][0] = limitOutputBatchSize; // value
 
     // Create input schema
     TupleMetadata incomingSchema = new SchemaBuilder()
-        .add("rowNumber", TypeProtos.MinorType.INT)
-        .addArray("unnestColumn", TypeProtos.MinorType.INT).buildSchema();
+      .add("rowNumber", TypeProtos.MinorType.INT)
+      .addArray("unnestColumn", TypeProtos.MinorType.INT).buildSchema();
 
     TupleMetadata[] incomingSchemas = {incomingSchema};
 
     RecordBatch.IterOutcome[] iterOutcomes = {RecordBatch.IterOutcome.OK};
 
     final long outputBatchSize = fixture.getFragmentContext().getOptions().getOption(ExecConstants
-        .OUTPUT_BATCH_SIZE_VALIDATOR);
-    fixture.getFragmentContext().getOptions().setLocalOption(ExecConstants.OUTPUT_BATCH_SIZE, limitedOutputBatchSizeBytes);
+      .OUTPUT_BATCH_SIZE_VALIDATOR);
+    fixture.getFragmentContext().getOptions().setLocalOption(ExecConstants.OUTPUT_BATCH_SIZE, limitOutputBatchSizeBytes);
 
     try {
-      testUnnest(incomingSchemas, iterOutcomes, data, baseline);
+      testUnnest(incomingSchemas, iterOutcomes, data, baseline, excludeUnnestColumn);
     } catch (Exception e) {
       fail("Failed due to exception: " + e.getMessage());
     } finally {
       fixture.getFragmentContext().getOptions().setLocalOption(ExecConstants.OUTPUT_BATCH_SIZE, outputBatchSize);
     }
+  }
 
+  @Test
+  public void testUnnestLimitBatchSize_WithExcludedCols() {
+    LateralJoinPOP previousPop = ljPopConfig;
+    List<SchemaPath> excludedCols = new ArrayList<>();
+    excludedCols.add(SchemaPath.getSimplePath("unnestColumn"));
+    ljPopConfig = new LateralJoinPOP(null, null, JoinRelType.INNER, excludedCols);
+    final int limitedOutputBatchSize = 127;
+    final int inputBatchSize = limitedOutputBatchSize + 1;
+    // We want a row count of 127, but the nearest-power-of-2 adjustment would reduce the output row count to
+    // 64, so the batch size must be configured for (N+1) rows to output N rows when N is not a power of 2
+    // size of lateral output batch = (N+1)*8 bytes, where N = output batch row count
+    //  Lateral output batch size = (N+1) * (input row size without unnest field) + (N+1) * size of single unnest column
+    //                            = (N+1) * (size of row id) + (N+1) * (size of single array entry)
+    //                            = (N+1)*4 + (N+1) * 4
+    //                            = (N+1) * 8
+    // configure the output batch size to be one more record than that so that the batch sizer can round down
+    final int limitedOutputBatchSizeBytes = 8 * (limitedOutputBatchSize + 1);
+    testUnnestBatchSizing(inputBatchSize, limitedOutputBatchSize, limitedOutputBatchSizeBytes, true);
+    ljPopConfig = previousPop;
+  }
+
+  @Test
+  public void testUnnestLimitBatchSize() {
+    final int limitedOutputBatchSize = 127;
+    final int inputBatchSize = limitedOutputBatchSize + 1;
+    // size of lateral output batch = 4N * (N + 5) bytes, where N = output batch row count
+    //  Lateral output batch size =  N * input row size + N * size of single unnest column
+    //                            =  N * (size of row id + size of array offset vector + (N + 1 )*size of single array entry))
+    //                              + N * 4
+    //                            = N * (4 + 2*4 + (N+1)*4 )  + N * 4
+    //                            = N * (16 + 4N) + N * 4
+    //                            = 4N * (N + 5)
+    // configure the output batch size to be one more record than that so that the batch sizer can round down
+    final int limitedOutputBatchSizeBytes = 4 * limitedOutputBatchSize * (limitedOutputBatchSize + 6);
+    testUnnestBatchSizing(inputBatchSize, limitedOutputBatchSize, limitedOutputBatchSizeBytes, false);
   }
 
   @Test
@@ -405,7 +429,7 @@ public class TestUnnestWithLateralCorrectness extends SubOperatorTest {
     fixture.getFragmentContext().getOptions().setLocalOption(ExecConstants.OUTPUT_BATCH_SIZE, limitedOutputBatchSizeBytes);
 
     try {
-      testUnnest(incomingSchemas, iterOutcomes, -1, 1, data, baseline); // Limit of 100 values for unnest.
+      testUnnest(incomingSchemas, iterOutcomes, -1, 1, data, baseline, false); // Limit of 100 values for unnest.
     } catch (Exception e) {
       fail("Failed due to exception: " + e.getMessage());
     } finally {
@@ -463,7 +487,7 @@ public class TestUnnestWithLateralCorrectness extends SubOperatorTest {
     fixture.getFragmentContext().getOptions().setLocalOption(ExecConstants.OUTPUT_BATCH_SIZE, limitedOutputBatchSizeBytes);
 
     try {
-      testUnnest(incomingSchemas, iterOutcomes, -1, 1, data, baseline); // Limit of 100 values for unnest.
+      testUnnest(incomingSchemas, iterOutcomes, -1, 1, data, baseline, false); // Limit of 100 values for unnest.
     } catch (Exception e) {
       fail("Failed due to exception: " + e.getMessage());
     } finally {
@@ -496,7 +520,7 @@ public class TestUnnestWithLateralCorrectness extends SubOperatorTest {
     RecordBatch.IterOutcome[] iterOutcomes = {RecordBatch.IterOutcome.OK_NEW_SCHEMA, RecordBatch.IterOutcome.OK};
 
     try {
-      testUnnest(incomingSchemas, iterOutcomes, data, baseline);
+      testUnnest(incomingSchemas, iterOutcomes, data, baseline, false);
     } catch (UserException|UnsupportedOperationException e) {
       return; // succeeded
     } catch (Exception e) {
@@ -511,8 +535,9 @@ public class TestUnnestWithLateralCorrectness extends SubOperatorTest {
       TupleMetadata[] incomingSchemas,
       RecordBatch.IterOutcome[] iterOutcomes,
       T[][] data,
-      T[][][] baseline ) throws Exception{
-    testUnnest(incomingSchemas, iterOutcomes, -1, -1, data, baseline);
+      T[][][] baseline,
+      boolean excludeUnnestColumn) throws Exception{
+    testUnnest(incomingSchemas, iterOutcomes, -1, -1, data, baseline, excludeUnnestColumn);
   }
 
   // test unnest for various input conditions optionally invoking kill. if the kill or killBatch
@@ -522,7 +547,8 @@ public class TestUnnestWithLateralCorrectness extends SubOperatorTest {
       int unnestLimit, // kill unnest after every 'unnestLimit' number of values in every record
       int execKill, // number of batches after which to kill the execution (!)
       T[][] data,
-      T[][][] baseline) throws Exception {
+      T[][][] baseline,
+      boolean excludeUnnestColumn) throws Exception {
 
     // Get the incoming container with dummy data for LJ
     final List<VectorContainer> incomingContainer = new ArrayList<>(data.length);
@@ -606,7 +632,9 @@ public class TestUnnestWithLateralCorrectness extends SubOperatorTest {
       //int valueIndex = 0;
       for ( List<ValueVector> batch: results) {
         int vectorCount= batch.size();
-        if (vectorCount!= baseline[batchIndex].length+1) { // baseline does not include the original unnest column
+        int expectedVectorCount = (excludeUnnestColumn) ? 0 : 1;
+        expectedVectorCount += baseline[batchIndex].length;
+        if (vectorCount!= expectedVectorCount) { // baseline does not include the original unnest column
           fail("Test failed in validating unnest output. Batch column count mismatch.");
         }
         for (ValueVector vv : batch) {
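
Editor's note to make the two sizing formulas in the tests above concrete for N = 127, assuming 4-byte INT values and 4-byte offset-vector entries as the test comments do:

    class BatchSizeArithmetic {
      public static void main(String[] args) {
        int n = 127;
        // Unnest column kept: batch size is 4N * (N + 5) bytes for N rows; the test
        // configures 4N * (N + 6) so the batch sizer can round down to exactly N rows.
        int withUnnestColumn = 4 * n * (n + 6);  // 67564 bytes
        // Unnest column excluded: only the 4-byte row id and one 4-byte array entry
        // remain per row, so (N + 1) * 8 bytes covers N + 1 rows.
        int withoutUnnestColumn = 8 * (n + 1);   // 1024 bytes
        System.out.println(withUnnestColumn + " vs " + withoutUnnestColumn);
      }
    }
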


[drill] 01/03: DRILL-6535: ClassCastException in Lateral Unnest queries when dealing with schema-changed JSON data. Note: the issue occurred because, for one left incoming batch, all right batches were filtered out, so outputIndex was still 0 when the new left incoming batch arrived with OK_NEW_SCHEMA; that OK_NEW_SCHEMA was consumed without updating the output container schema.

Posted by pa...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

parthc pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git

commit 0bdebf27944396d69fa4926d1bf2da5899e03033
Author: Sorabh Hamirwasia <sh...@maprtech.com>
AuthorDate: Thu Jun 21 22:57:00 2018 -0700

    DRILL-6535: ClassCastException in Lateral Unnest queries when dealing with schema-changed JSON data. Note: the issue occurred because, for one left incoming batch, all right batches were filtered out, so outputIndex was still 0 when the new left incoming batch arrived with OK_NEW_SCHEMA; that OK_NEW_SCHEMA was consumed without updating the output container schema.
    
    This closes #1339
---
 .../exec/physical/impl/join/LateralJoinBatch.java  | 37 ++++++++++--
 .../impl/join/TestLateralJoinCorrectness.java      | 67 ++++++++++++++++++++++
 2 files changed, 100 insertions(+), 4 deletions(-)

diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java
index 578cbc8..84dc5c3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java
@@ -433,6 +433,14 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP>
       rightUpstream = next(RIGHT_INDEX, right);
       switch (rightUpstream) {
         case OK_NEW_SCHEMA:
+
+          // If there are records in the output batch, the left batch didn't come with OK_NEW_SCHEMA; otherwise
+          // it would have been marked for processInFuture and the output returned. So for the current unprocessed
+          // left batch, or a new non-empty left batch, a right-batch schema change here is unexpected
+          if (outputIndex > 0) {
+            throw new IllegalStateException("SchemaChange on right batch is not expected in between the rows of " +
+              "current left batch or a new non-empty left batch with no schema change");
+          }
           // We should not get OK_NEW_SCHEMA multiple times for the same left incoming batch. So there won't be a
           // case where we get OK_NEW_SCHEMA --> OK (with batch) ---> OK_NEW_SCHEMA --> OK/EMIT fall through
           //
@@ -548,6 +556,7 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP>
             // Get both left batch and the right batch and make sure indexes are properly set
             leftUpstream = processLeftBatch();
 
+            // output batch is not empty and we have a new left batch with OK_NEW_SCHEMA or a terminal outcome
             if (processLeftBatchInFuture) {
               logger.debug("Received left batch with outcome {} such that we have to return the current outgoing " +
                 "batch and process the new batch in subsequent next call", leftUpstream);
@@ -564,7 +573,7 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP>
 
             // If we have received the left batch with EMIT outcome and is empty then we should return previous output
             // batch with EMIT outcome
-            if (leftUpstream == EMIT && left.getRecordCount() == 0) {
+            if ((leftUpstream == EMIT || leftUpstream == OK_NEW_SCHEMA) && left.getRecordCount() == 0) {
               isLeftProcessed = true;
               break;
             }
@@ -579,10 +588,16 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP>
         // left in outgoing batch so let's get next right batch.
         // 2) OR previous left & right batch was fully processed and it came with OK outcome. There is space in outgoing
         // batch. Now we have got new left batch with OK outcome. Let's get next right batch
-        //
-        // It will not hit OK_NEW_SCHEMA since left side have not seen that outcome
+        // 3) OR previous left & right batch was fully processed and left came with OK outcome. Outgoing batch is
+        // empty since all right batches were empty for all left rows. Now we got another non-empty left batch with
+        // OK_NEW_SCHEMA.
         rightUpstream = processRightBatch();
-        Preconditions.checkState(rightUpstream != OK_NEW_SCHEMA, "Unexpected schema change in right branch");
+        if (rightUpstream == OK_NEW_SCHEMA) {
+          leftUpstream = (leftUpstream != EMIT) ? OK : leftUpstream;
+          rightUpstream = OK;
+          finalizeOutputContainer();
+          return OK_NEW_SCHEMA;
+        }
 
         if (isTerminalOutcome(rightUpstream)) {
           finalizeOutputContainer();
@@ -591,6 +606,17 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP>
 
         // Update the batch memory manager to use new right incoming batch
         updateMemoryManager(RIGHT_INDEX);
+
+        // If OK_NEW_SCHEMA is seen only on a non-empty left batch but not on the right batch, then set up the
+        // schema in the output container based on the new left schema and the old right schema. If the schema
+        // change fails, return STOP downstream
+        if (leftUpstream == OK_NEW_SCHEMA && isLeftProcessed) {
+          if (!handleSchemaChange()) {
+            return STOP;
+          }
+          // Since the schema has changed, the output container has new empty vectors, so allocate memory for them
+          allocateVectors();
+        }
       }
     } // output batch is full to its max capacity
 
@@ -735,6 +761,9 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP>
       RecordBatchSizer.ColumnSize colSize = batchMemoryManager.getColumnSize(w.getField().getName());
       colSize.allocateVector(w.getValueVector(), maxOutputRowCount);
     }
+
+    logger.debug("Allocator Stats: [AllocatedMem: {}, PeakMem: {}]", container.getAllocator().getAllocatedMemory(),
+      container.getAllocator().getPeakMemoryAllocation());
   }
 
   private boolean setBatchState(IterOutcome outcome) {
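
Editor's note on the schema-change handling added above: a right-side OK_NEW_SCHEMA is tolerated only while the output batch is still empty, because rows already written under the old schema cannot be mixed with the new one; that is exactly what the new IllegalStateException guards. A condensed, hedged sketch of the guard as a toy state check (not the operator's real control flow):

    enum IterOutcome { OK, OK_NEW_SCHEMA, EMIT, NONE }

    class RightSchemaGuardSketch {
      // Decide whether a right-side outcome may be consumed, given how many rows
      // have already been written to the output batch under the current schema.
      static IterOutcome onRightOutcome(IterOutcome right, int outputIndex) {
        if (right == IterOutcome.OK_NEW_SCHEMA && outputIndex > 0) {
          throw new IllegalStateException(
              "right-side schema change between the rows of the current left batch");
        }
        // With an empty output batch the container schema can be rebuilt and
        // OK_NEW_SCHEMA reported downstream exactly once.
        return right;
      }

      public static void main(String[] args) {
        System.out.println(onRightOutcome(IterOutcome.OK_NEW_SCHEMA, 0)); // OK_NEW_SCHEMA
        System.out.println(onRightOutcome(IterOutcome.OK, 5));            // OK
      }
    }
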
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestLateralJoinCorrectness.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestLateralJoinCorrectness.java
index caa8137..2723e30 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestLateralJoinCorrectness.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestLateralJoinCorrectness.java
@@ -2803,4 +2803,71 @@ public class TestLateralJoinCorrectness extends SubOperatorTest {
       leftOutcomes2.clear();
     }
   }
+
+  /**
+   * Test to verify that when there is no right-side batch for the first left incoming batch, and the second left
+   * incoming batch arrives with a schema change, the schema change (and the empty output batch for the first
+   * incoming batch) is handled properly.
+   * @throws Exception
+   */
+  @Test
+  public void testLateral_SchemaChange_Left_EmptyRightBatchForFirst() throws Exception {
+    // Create left input schema 2
+    TupleMetadata leftSchema2 = new SchemaBuilder()
+      .add("id_left", TypeProtos.MinorType.INT)
+      .add("cost_left", TypeProtos.MinorType.VARCHAR)
+      .add("name_left", TypeProtos.MinorType.VARCHAR)
+      .buildSchema();
+
+
+    // Create data for left input
+    final RowSet.SingleRowSet leftRowSet2 = fixture.rowSetBuilder(leftSchema2)
+      .addRow(2, "20", "item20")
+      .build();
+
+    // Get the left container with dummy data for Lateral Join
+    leftContainer.add(nonEmptyLeftRowSet.container());
+    leftContainer.add(leftRowSet2.container());
+
+    // Get the left IterOutcomes for Lateral Join
+    leftOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    leftOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+
+    // Create Left MockRecordBatch
+    final CloseableRecordBatch leftMockBatch = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
+      leftContainer, leftOutcomes, leftContainer.get(0).getSchema());
+
+    // Get the right container with dummy data
+    // first OK_NEW_SCHEMA batch
+    rightContainer.add(emptyRightRowSet.container());
+    rightContainer.add(emptyRightRowSet.container());
+    rightContainer.add(nonEmptyRightRowSet.container()); // non-empty OK_NEW_SCHEMA batch
+    rightContainer.add(emptyRightRowSet.container());
+
+    rightOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    rightOutcomes.add(RecordBatch.IterOutcome.EMIT);
+    rightOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+    final CloseableRecordBatch rightMockBatch = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
+      rightContainer, rightOutcomes, rightContainer.get(0).getSchema());
+
+    final LateralJoinBatch ljBatch = new LateralJoinBatch(ljPopConfig, fixture.getFragmentContext(),
+      leftMockBatch, rightMockBatch);
+
+    try {
+      assertTrue(RecordBatch.IterOutcome.OK_NEW_SCHEMA == ljBatch.next());
+      assertTrue(RecordBatch.IterOutcome.OK_NEW_SCHEMA == ljBatch.next());
+      // This means 2 output record batches were received because of the schema change
+      assertEquals(3, ljBatch.getRecordCount());
+      assertTrue(RecordBatch.IterOutcome.NONE == ljBatch.next());
+    } catch (AssertionError | Exception error) {
+      fail();
+    } finally {
+      // Close all the resources for this test case
+      ljBatch.close();
+      leftMockBatch.close();
+      rightMockBatch.close();
+      leftRowSet2.clear();
+    }
+  }
 }