You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by pa...@apache.org on 2018/07/02 22:52:09 UTC
[drill] 01/03: DRILL-6535: ClassCastException in Lateral Unnest
queries when dealing with schema-changed JSON data. Note: The issue was
happening because, for a left incoming batch, all right batches were filtered
out, and hence outputIndex was still 0 when a new left incoming batch arrived with
OK_NEW_SCHEMA. The OK_NEW_SCHEMA change was consumed without updating the
output container schema.
This is an automated email from the ASF dual-hosted git repository.
parthc pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git
commit 0bdebf27944396d69fa4926d1bf2da5899e03033
Author: Sorabh Hamirwasia <sh...@maprtech.com>
AuthorDate: Thu Jun 21 22:57:00 2018 -0700
DRILL-6535: ClassCastException in Lateral Unnest queries when dealing with schema-changed JSON data. Note: The issue was happening because, for a left incoming batch, all right batches were filtered out, and hence outputIndex was still 0 when a new left incoming batch arrived with OK_NEW_SCHEMA. The OK_NEW_SCHEMA change was consumed without updating the output container schema.
This closes #1339
---
.../exec/physical/impl/join/LateralJoinBatch.java | 37 ++++++++++--
.../impl/join/TestLateralJoinCorrectness.java | 67 ++++++++++++++++++++++
2 files changed, 100 insertions(+), 4 deletions(-)
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java
index 578cbc8..84dc5c3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java
@@ -433,6 +433,14 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP>
rightUpstream = next(RIGHT_INDEX, right);
switch (rightUpstream) {
case OK_NEW_SCHEMA:
+
+ // If there is some records in the output batch that means left batch didn't came with OK_NEW_SCHEMA,
+ // otherwise it would have been marked for processInFuture and output will be returned. This means for
+ // current non processed left or new left non-empty batch there is unexpected right batch schema change
+ if (outputIndex > 0) {
+ throw new IllegalStateException("SchemaChange on right batch is not expected in between the rows of " +
+ "current left batch or a new non-empty left batch with no schema change");
+ }
// We should not get OK_NEW_SCHEMA multiple times for the same left incoming batch. So there won't be a
// case where we get OK_NEW_SCHEMA --> OK (with batch) ---> OK_NEW_SCHEMA --> OK/EMIT fall through
//
@@ -548,6 +556,7 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP>
// Get both left batch and the right batch and make sure indexes are properly set
leftUpstream = processLeftBatch();
+ // output batch is not empty and we have new left batch with OK_NEW_SCHEMA or terminal outcome
if (processLeftBatchInFuture) {
logger.debug("Received left batch with outcome {} such that we have to return the current outgoing " +
"batch and process the new batch in subsequent next call", leftUpstream);
@@ -564,7 +573,7 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP>
// If we have received the left batch with EMIT outcome and is empty then we should return previous output
// batch with EMIT outcome
- if (leftUpstream == EMIT && left.getRecordCount() == 0) {
+ if ((leftUpstream == EMIT || leftUpstream == OK_NEW_SCHEMA) && left.getRecordCount() == 0) {
isLeftProcessed = true;
break;
}
@@ -579,10 +588,16 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP>
// left in outgoing batch so let's get next right batch.
// 2) OR previous left & right batch was fully processed and it came with OK outcome. There is space in outgoing
// batch. Now we have got new left batch with OK outcome. Let's get next right batch
- //
- // It will not hit OK_NEW_SCHEMA since left side have not seen that outcome
+ // 3) OR previous left & right batch was fully processed and left came with OK outcome. Outgoing batch is
+ // empty since all right batches were empty for all left rows. Now we got another non-empty left batch with
+ // OK_NEW_SCHEMA.
rightUpstream = processRightBatch();
- Preconditions.checkState(rightUpstream != OK_NEW_SCHEMA, "Unexpected schema change in right branch");
+ if (rightUpstream == OK_NEW_SCHEMA) {
+ leftUpstream = (leftUpstream != EMIT) ? OK : leftUpstream;
+ rightUpstream = OK;
+ finalizeOutputContainer();
+ return OK_NEW_SCHEMA;
+ }
if (isTerminalOutcome(rightUpstream)) {
finalizeOutputContainer();
@@ -591,6 +606,17 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP>
// Update the batch memory manager to use new right incoming batch
updateMemoryManager(RIGHT_INDEX);
+
+ // If OK_NEW_SCHEMA is seen only on non empty left batch but not on right batch, then we should setup schema in
+ // output container based on new left schema and old right schema. If schema change failed then return STOP
+ // downstream
+ if (leftUpstream == OK_NEW_SCHEMA && isLeftProcessed) {
+ if (!handleSchemaChange()) {
+ return STOP;
+ }
+ // Since schema has change so we have new empty vectors in output container hence allocateMemory for them
+ allocateVectors();
+ }
}
} // output batch is full to its max capacity
@@ -735,6 +761,9 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP>
RecordBatchSizer.ColumnSize colSize = batchMemoryManager.getColumnSize(w.getField().getName());
colSize.allocateVector(w.getValueVector(), maxOutputRowCount);
}
+
+ logger.debug("Allocator Stats: [AllocatedMem: {}, PeakMem: {}]", container.getAllocator().getAllocatedMemory(),
+ container.getAllocator().getPeakMemoryAllocation());
}
private boolean setBatchState(IterOutcome outcome) {
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestLateralJoinCorrectness.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestLateralJoinCorrectness.java
index caa8137..2723e30 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestLateralJoinCorrectness.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestLateralJoinCorrectness.java
@@ -2803,4 +2803,71 @@ public class TestLateralJoinCorrectness extends SubOperatorTest {
leftOutcomes2.clear();
}
}
+
+ /**
+ * Verifies that when the first left incoming batch receives only empty right batches (so the output batch is
+ * still empty) and the second left incoming batch arrives with OK_NEW_SCHEMA, the schema change is propagated
+ * to the output container instead of being silently consumed (DRILL-6535).
+ * @throws Exception if the lateral join or the mock batches fail while producing or closing batches
+ */
+ @Test
+ public void testLateral_SchemaChange_Left_EmptyRightBatchForFirst() throws Exception {
+ // Create the second left input schema
+ TupleMetadata leftSchema2 = new SchemaBuilder()
+ .add("id_left", TypeProtos.MinorType.INT)
+ .add("cost_left", TypeProtos.MinorType.VARCHAR)
+ .add("name_left", TypeProtos.MinorType.VARCHAR)
+ .buildSchema();
+
+ // Create data for the second left input batch
+ final RowSet.SingleRowSet leftRowSet2 = fixture.rowSetBuilder(leftSchema2)
+ .addRow(2, "20", "item20")
+ .build();
+
+ // Left side: first batch uses the existing non-empty left data, second batch uses leftSchema2
+ leftContainer.add(nonEmptyLeftRowSet.container());
+ leftContainer.add(leftRowSet2.container());
+
+ // Both left batches report OK_NEW_SCHEMA
+ leftOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+ leftOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+
+ // Create Left MockRecordBatch
+ final CloseableRecordBatch leftMockBatch = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
+ leftContainer, leftOutcomes, leftContainer.get(0).getSchema());
+
+ // Right side (assuming MockRecordBatch pairs containers and outcomes by index): two empty batches
+ // (OK_NEW_SCHEMA, EMIT) followed by a non-empty batch with EMIT
+ rightContainer.add(emptyRightRowSet.container());
+ rightContainer.add(emptyRightRowSet.container());
+ rightContainer.add(nonEmptyRightRowSet.container());
+ // NOTE(review): only 3 right outcomes are registered, so this 4th container looks unused -- confirm or remove
+ rightContainer.add(emptyRightRowSet.container());
+
+ rightOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+ rightOutcomes.add(RecordBatch.IterOutcome.EMIT);
+ rightOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+ final CloseableRecordBatch rightMockBatch = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
+ rightContainer, rightOutcomes, rightContainer.get(0).getSchema());
+
+ final LateralJoinBatch ljBatch = new LateralJoinBatch(ljPopConfig, fixture.getFragmentContext(),
+ leftMockBatch, rightMockBatch);
+
+ // Assertion failures and exceptions propagate unchanged so JUnit reports the actual cause
+ try {
+ assertEquals(RecordBatch.IterOutcome.OK_NEW_SCHEMA, ljBatch.next());
+ // The left schema change must surface as a second OK_NEW_SCHEMA rather than being consumed silently
+ assertEquals(RecordBatch.IterOutcome.OK_NEW_SCHEMA, ljBatch.next());
+ // Record count of the second output batch (the one returned with the new schema)
+ assertEquals(3, ljBatch.getRecordCount());
+ assertEquals(RecordBatch.IterOutcome.NONE, ljBatch.next());
+ } finally {
+ // Close all the resources for this test case
+ ljBatch.close();
+ leftMockBatch.close();
+ rightMockBatch.close();
+ leftRowSet2.clear();
+ }
+ }
}