You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by pa...@apache.org on 2018/07/02 22:52:09 UTC

[drill] 01/03: DRILL-6535: ClassCastException in Lateral Unnest queries when dealing with schema changed json data Note: The issue was happening because, for a left incoming batch, all right batches were filtered out, and hence outputIndex was still 0 when a new left incoming batch arrived with OK_NEW_SCHEMA. The OK_NEW_SCHEMA change was consumed without updating the output container schema.

This is an automated email from the ASF dual-hosted git repository.

parthc pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git

commit 0bdebf27944396d69fa4926d1bf2da5899e03033
Author: Sorabh Hamirwasia <sh...@maprtech.com>
AuthorDate: Thu Jun 21 22:57:00 2018 -0700

    DRILL-6535: ClassCastException in Lateral Unnest queries when dealing with schema changed json data Note: The issue was happening because, for a left incoming batch, all right batches were filtered out, and hence outputIndex was still 0 when a new left incoming batch arrived with OK_NEW_SCHEMA. The OK_NEW_SCHEMA change was consumed without updating the output container schema.
    
    This closes #1339
---
 .../exec/physical/impl/join/LateralJoinBatch.java  | 37 ++++++++++--
 .../impl/join/TestLateralJoinCorrectness.java      | 67 ++++++++++++++++++++++
 2 files changed, 100 insertions(+), 4 deletions(-)

diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java
index 578cbc8..84dc5c3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java
@@ -433,6 +433,14 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP>
       rightUpstream = next(RIGHT_INDEX, right);
       switch (rightUpstream) {
         case OK_NEW_SCHEMA:
+
+          // If there is some records in the output batch that means left batch didn't came with OK_NEW_SCHEMA,
+          // otherwise it would have been marked for processInFuture and output will be returned. This means for
+          // current non processed left or new left non-empty batch there is unexpected right batch schema change
+          if (outputIndex > 0) {
+            throw new IllegalStateException("SchemaChange on right batch is not expected in between the rows of " +
+              "current left batch or a new non-empty left batch with no schema change");
+          }
           // We should not get OK_NEW_SCHEMA multiple times for the same left incoming batch. So there won't be a
           // case where we get OK_NEW_SCHEMA --> OK (with batch) ---> OK_NEW_SCHEMA --> OK/EMIT fall through
           //
@@ -548,6 +556,7 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP>
             // Get both left batch and the right batch and make sure indexes are properly set
             leftUpstream = processLeftBatch();
 
+            // output batch is not empty and we have new left batch with OK_NEW_SCHEMA or terminal outcome
             if (processLeftBatchInFuture) {
               logger.debug("Received left batch with outcome {} such that we have to return the current outgoing " +
                 "batch and process the new batch in subsequent next call", leftUpstream);
@@ -564,7 +573,7 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP>
 
             // If we have received the left batch with EMIT outcome and is empty then we should return previous output
             // batch with EMIT outcome
-            if (leftUpstream == EMIT && left.getRecordCount() == 0) {
+            if ((leftUpstream == EMIT || leftUpstream == OK_NEW_SCHEMA) && left.getRecordCount() == 0) {
               isLeftProcessed = true;
               break;
             }
@@ -579,10 +588,16 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP>
         // left in outgoing batch so let's get next right batch.
         // 2) OR previous left & right batch was fully processed and it came with OK outcome. There is space in outgoing
         // batch. Now we have got new left batch with OK outcome. Let's get next right batch
-        //
-        // It will not hit OK_NEW_SCHEMA since left side have not seen that outcome
+        // 3) OR previous left & right batch was fully processed and left came with OK outcome. Outgoing batch is
+        // empty since all right batches were empty for all left rows. Now we got another non-empty left batch with
+        // OK_NEW_SCHEMA.
         rightUpstream = processRightBatch();
-        Preconditions.checkState(rightUpstream != OK_NEW_SCHEMA, "Unexpected schema change in right branch");
+        if (rightUpstream == OK_NEW_SCHEMA) {
+          leftUpstream = (leftUpstream != EMIT) ? OK : leftUpstream;
+          rightUpstream = OK;
+          finalizeOutputContainer();
+          return OK_NEW_SCHEMA;
+        }
 
         if (isTerminalOutcome(rightUpstream)) {
           finalizeOutputContainer();
@@ -591,6 +606,17 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP>
 
         // Update the batch memory manager to use new right incoming batch
         updateMemoryManager(RIGHT_INDEX);
+
+        // If OK_NEW_SCHEMA is seen only on non empty left batch but not on right batch, then we should setup schema in
+        // output container based on new left schema and old right schema. If schema change failed then return STOP
+        // downstream
+        if (leftUpstream == OK_NEW_SCHEMA && isLeftProcessed) {
+          if (!handleSchemaChange()) {
+            return STOP;
+          }
+          // Since schema has change so we have new empty vectors in output container hence allocateMemory for them
+          allocateVectors();
+        }
       }
     } // output batch is full to its max capacity
 
@@ -735,6 +761,9 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP>
       RecordBatchSizer.ColumnSize colSize = batchMemoryManager.getColumnSize(w.getField().getName());
       colSize.allocateVector(w.getValueVector(), maxOutputRowCount);
     }
+
+    logger.debug("Allocator Stats: [AllocatedMem: {}, PeakMem: {}]", container.getAllocator().getAllocatedMemory(),
+      container.getAllocator().getPeakMemoryAllocation());
   }
 
   private boolean setBatchState(IterOutcome outcome) {
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestLateralJoinCorrectness.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestLateralJoinCorrectness.java
index caa8137..2723e30 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestLateralJoinCorrectness.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestLateralJoinCorrectness.java
@@ -2803,4 +2803,71 @@ public class TestLateralJoinCorrectness extends SubOperatorTest {
       leftOutcomes2.clear();
     }
   }
+
+  /**
+   * Verifies that when the first left incoming batch produces no joined output (every right batch for it is empty)
+   * and the second left incoming batch arrives with OK_NEW_SCHEMA, the left schema change is still reported
+   * downstream as OK_NEW_SCHEMA instead of being consumed while the output batch is empty.
+   * @throws Exception if building the row sets or driving the lateral join fails unexpectedly
+   */
+  @Test
+  public void testLateral_SchemaChange_Left_EmptyRightBatchForFirst() throws Exception {
+    // Create left input schema 2
+    TupleMetadata leftSchema2 = new SchemaBuilder()
+      .add("id_left", TypeProtos.MinorType.INT)
+      .add("cost_left", TypeProtos.MinorType.VARCHAR)
+      .add("name_left", TypeProtos.MinorType.VARCHAR)
+      .buildSchema();
+
+
+    // Create data for left input
+    final RowSet.SingleRowSet leftRowSet2 = fixture.rowSetBuilder(leftSchema2)
+      .addRow(2, "20", "item20")
+      .build();
+
+    // Get the left container with dummy data for Lateral Join
+    leftContainer.add(nonEmptyLeftRowSet.container());
+    leftContainer.add(leftRowSet2.container());
+
+    // Get the left IterOutcomes for Lateral Join
+    leftOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    leftOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+
+    // Create Left MockRecordBatch
+    final CloseableRecordBatch leftMockBatch = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
+      leftContainer, leftOutcomes, leftContainer.get(0).getSchema());
+
+    // Get the right container with dummy data
+    // first OK_NEW_SCHEMA batch
+    rightContainer.add(emptyRightRowSet.container());
+    rightContainer.add(emptyRightRowSet.container());
+    rightContainer.add(nonEmptyRightRowSet.container()); // non-empty OK_NEW_SCHEMA batch
+    rightContainer.add(emptyRightRowSet.container());
+
+    rightOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    rightOutcomes.add(RecordBatch.IterOutcome.EMIT);
+    rightOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+    final CloseableRecordBatch rightMockBatch = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
+      rightContainer, rightOutcomes, rightContainer.get(0).getSchema());
+
+    final LateralJoinBatch ljBatch = new LateralJoinBatch(ljPopConfig, fixture.getFragmentContext(),
+      leftMockBatch, rightMockBatch);
+
+    try {
+      assertTrue(RecordBatch.IterOutcome.OK_NEW_SCHEMA == ljBatch.next());
+      assertTrue(RecordBatch.IterOutcome.OK_NEW_SCHEMA == ljBatch.next());
+      // This means 2 output record batches were received because of Schema change
+      assertEquals(3, ljBatch.getRecordCount());
+      assertTrue(RecordBatch.IterOutcome.NONE == ljBatch.next());
+    } catch (AssertionError | Exception error) {
+      throw error; // rethrow instead of a bare fail() so the original message and stack trace are reported
+    } finally {
+      // Close all the resources for this test case
+      ljBatch.close();
+      leftMockBatch.close();
+      rightMockBatch.close();
+      leftRowSet2.clear();
+    }
+  }
 }