You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@drill.apache.org by GitBox <gi...@apache.org> on 2018/07/02 22:52:24 UTC

[GitHub] parthchandra closed pull request #1339: DRILL-6535: ClassCastException in Lateral Unnest queries when dealing…

parthchandra closed pull request #1339: DRILL-6535: ClassCastException in Lateral Unnest queries when dealing…
URL: https://github.com/apache/drill/pull/1339
 
 
   

This PR was merged from a forked repository. Because GitHub hides the
original diff of a foreign (forked) pull request once it is merged, the
diff is reproduced below for the sake of provenance:

diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java
index 578cbc8742d..84dc5c344fc 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java
@@ -433,6 +433,14 @@ private IterOutcome processRightBatch() {
       rightUpstream = next(RIGHT_INDEX, right);
       switch (rightUpstream) {
         case OK_NEW_SCHEMA:
+
+          // If there are records in the output batch, the left batch did not come with OK_NEW_SCHEMA; otherwise it
+          // would have been marked for processInFuture and the output would have been returned. So for the current
+          // unprocessed left batch, or a new non-empty left batch, the right batch has an unexpected schema change.
+          if (outputIndex > 0) {
+            throw new IllegalStateException("SchemaChange on right batch is not expected in between the rows of " +
+              "current left batch or a new non-empty left batch with no schema change");
+          }
           // We should not get OK_NEW_SCHEMA multiple times for the same left incoming batch. So there won't be a
           // case where we get OK_NEW_SCHEMA --> OK (with batch) ---> OK_NEW_SCHEMA --> OK/EMIT fall through
           //
@@ -548,6 +556,7 @@ private IterOutcome produceOutputBatch() {
             // Get both left batch and the right batch and make sure indexes are properly set
             leftUpstream = processLeftBatch();
 
+            // output batch is not empty and we have new left batch with OK_NEW_SCHEMA or terminal outcome
             if (processLeftBatchInFuture) {
               logger.debug("Received left batch with outcome {} such that we have to return the current outgoing " +
                 "batch and process the new batch in subsequent next call", leftUpstream);
@@ -564,7 +573,7 @@ private IterOutcome produceOutputBatch() {
 
             // If we have received the left batch with EMIT outcome and is empty then we should return previous output
             // batch with EMIT outcome
-            if (leftUpstream == EMIT && left.getRecordCount() == 0) {
+            if ((leftUpstream == EMIT || leftUpstream == OK_NEW_SCHEMA) && left.getRecordCount() == 0) {
               isLeftProcessed = true;
               break;
             }
@@ -579,10 +588,16 @@ private IterOutcome produceOutputBatch() {
         // left in outgoing batch so let's get next right batch.
         // 2) OR previous left & right batch was fully processed and it came with OK outcome. There is space in outgoing
         // batch. Now we have got new left batch with OK outcome. Let's get next right batch
-        //
-        // It will not hit OK_NEW_SCHEMA since left side have not seen that outcome
+        // 3) OR previous left & right batch was fully processed and left came with OK outcome. Outgoing batch is
+        // empty since all right batches were empty for all left rows. Now we got another non-empty left batch with
+        // OK_NEW_SCHEMA.
         rightUpstream = processRightBatch();
-        Preconditions.checkState(rightUpstream != OK_NEW_SCHEMA, "Unexpected schema change in right branch");
+        if (rightUpstream == OK_NEW_SCHEMA) {
+          leftUpstream = (leftUpstream != EMIT) ? OK : leftUpstream;
+          rightUpstream = OK;
+          finalizeOutputContainer();
+          return OK_NEW_SCHEMA;
+        }
 
         if (isTerminalOutcome(rightUpstream)) {
           finalizeOutputContainer();
@@ -591,6 +606,17 @@ private IterOutcome produceOutputBatch() {
 
         // Update the batch memory manager to use new right incoming batch
         updateMemoryManager(RIGHT_INDEX);
+
+        // If OK_NEW_SCHEMA is seen only on non empty left batch but not on right batch, then we should setup schema in
+        // output container based on new left schema and old right schema. If schema change failed then return STOP
+        // downstream
+        if (leftUpstream == OK_NEW_SCHEMA && isLeftProcessed) {
+          if (!handleSchemaChange()) {
+            return STOP;
+          }
+          // Since the schema has changed, the output container has new empty vectors, so allocate memory for them
+          allocateVectors();
+        }
       }
     } // output batch is full to its max capacity
 
@@ -735,6 +761,9 @@ private void allocateVectors() {
       RecordBatchSizer.ColumnSize colSize = batchMemoryManager.getColumnSize(w.getField().getName());
       colSize.allocateVector(w.getValueVector(), maxOutputRowCount);
     }
+
+    logger.debug("Allocator Stats: [AllocatedMem: {}, PeakMem: {}]", container.getAllocator().getAllocatedMemory(),
+      container.getAllocator().getPeakMemoryAllocation());
   }
 
   private boolean setBatchState(IterOutcome outcome) {
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestLateralJoinCorrectness.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestLateralJoinCorrectness.java
index 79a7bd438b0..2c04de39753 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestLateralJoinCorrectness.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestLateralJoinCorrectness.java
@@ -2802,4 +2802,71 @@ public void testMultiLevelLateral_SchemaChange_LeftRightUnnest_NonEmptyBatch() t
       leftOutcomes2.clear();
     }
   }
+
+  /**
+   * Test to verify that when the right side incoming batch is empty for the first left incoming batch, and the
+   * second left incoming batch then arrives with a schema change, the schema change is handled properly even
+   * though the output batch for the first left incoming batch is empty.
+   * @throws Exception
+   */
+  @Test
+  public void testLateral_SchemaChange_Left_EmptyRightBatchForFirst() throws Exception {
+    // Create left input schema 2
+    TupleMetadata leftSchema2 = new SchemaBuilder()
+      .add("id_left", TypeProtos.MinorType.INT)
+      .add("cost_left", TypeProtos.MinorType.VARCHAR)
+      .add("name_left", TypeProtos.MinorType.VARCHAR)
+      .buildSchema();
+
+
+    // Create data for left input
+    final RowSet.SingleRowSet leftRowSet2 = fixture.rowSetBuilder(leftSchema2)
+      .addRow(2, "20", "item20")
+      .build();
+
+    // Get the left container with dummy data for Lateral Join
+    leftContainer.add(nonEmptyLeftRowSet.container());
+    leftContainer.add(leftRowSet2.container());
+
+    // Get the left IterOutcomes for Lateral Join
+    leftOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    leftOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+
+    // Create Left MockRecordBatch
+    final CloseableRecordBatch leftMockBatch = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
+      leftContainer, leftOutcomes, leftContainer.get(0).getSchema());
+
+    // Get the right container with dummy data
+    // first OK_NEW_SCHEMA batch
+    rightContainer.add(emptyRightRowSet.container());
+    rightContainer.add(emptyRightRowSet.container());
+    rightContainer.add(nonEmptyRightRowSet.container()); // non-empty OK_NEW_SCHEMA batch
+    rightContainer.add(emptyRightRowSet.container());
+
+    rightOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    rightOutcomes.add(RecordBatch.IterOutcome.EMIT);
+    rightOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+    final CloseableRecordBatch rightMockBatch = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
+      rightContainer, rightOutcomes, rightContainer.get(0).getSchema());
+
+    final LateralJoinBatch ljBatch = new LateralJoinBatch(ljPopConfig, fixture.getFragmentContext(),
+      leftMockBatch, rightMockBatch);
+
+    try {
+      assertTrue(RecordBatch.IterOutcome.OK_NEW_SCHEMA == ljBatch.next());
+      assertTrue(RecordBatch.IterOutcome.OK_NEW_SCHEMA == ljBatch.next());
+      // This means 2 output record batches were received because of Schema change
+      assertEquals(3, ljBatch.getRecordCount());
+      assertTrue(RecordBatch.IterOutcome.NONE == ljBatch.next());
+    } catch (AssertionError | Exception error) {
+      fail();
+    } finally {
+      // Close all the resources for this test case
+      ljBatch.close();
+      leftMockBatch.close();
+      rightMockBatch.close();
+      leftRowSet2.clear();
+    }
+  }
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services