You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by so...@apache.org on 2018/08/04 19:39:18 UTC
[drill] branch master updated: DRILL-6653: Unsupported Schema
change exception where there is no schema change
This is an automated email from the ASF dual-hosted git repository.
sorabh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git
The following commit(s) were added to refs/heads/master by this push:
new 147bc7c DRILL-6653: Unsupported Schema change exception where there is no schema change
147bc7c is described below
commit 147bc7c0fc7de31386410fab72a14e16666c910a
Author: Sorabh Hamirwasia <so...@apache.org>
AuthorDate: Sat Aug 4 12:39:16 2018 -0700
DRILL-6653: Unsupported Schema change exception where there is no schema change
closes #1422
---
.../impl/svremover/RemovingRecordBatch.java | 8 +-
.../impl/svremover/TestSVRemoverIterOutcome.java | 283 +++++++++++++++++++++
2 files changed, 290 insertions(+), 1 deletion(-)
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java
index 05a1f12..acfdc87 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java
@@ -51,7 +51,11 @@ public class RemovingRecordBatch extends AbstractSingleRecordBatch<SelectionVect
@Override
protected boolean setupNewSchema() throws SchemaChangeException {
- container.clear();
+ // Don't clear off the container just because an OK_NEW_SCHEMA was received from upstream. For cases when there is
+ // just a change in container type but no actual schema change, RemovingRecordBatch should consume OK_NEW_SCHEMA and
+ // send OK to downstream instead. Since the output of RemovingRecordBatch is always going to be a regular container,
+ // a change in the incoming container type is not an actual schema change.
+ container.zeroVectors();
switch(incoming.getSchema().getSelectionVectorMode()){
case NONE:
this.copier = getStraightCopier();
@@ -66,6 +70,8 @@ public class RemovingRecordBatch extends AbstractSingleRecordBatch<SelectionVect
throw new UnsupportedOperationException();
}
+ // If there is an actual schema change then below condition will be true and it will send OK_NEW_SCHEMA
+ // downstream too
if (container.isSchemaChanged()) {
container.buildSchema(SelectionVectorMode.NONE);
return true;
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/svremover/TestSVRemoverIterOutcome.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/svremover/TestSVRemoverIterOutcome.java
new file mode 100644
index 0000000..6613a71
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/svremover/TestSVRemoverIterOutcome.java
@@ -0,0 +1,283 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.svremover;
+
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.exec.physical.config.SelectionVectorRemover;
+import org.apache.drill.exec.physical.impl.BaseTestOpBatchEmitOutcome;
+import org.apache.drill.exec.physical.impl.MockRecordBatch;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.test.rowSet.DirectRowSet;
+import org.apache.drill.test.rowSet.RowSet;
+import org.apache.drill.test.rowSet.RowSetComparison;
+import org.apache.drill.test.rowSet.schema.SchemaBuilder;
+import org.junit.After;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+public class TestSVRemoverIterOutcome extends BaseTestOpBatchEmitOutcome {
+ //private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestSVRemoverIterOutcome.class);
+
+ // Holds reference to actual operator instance created for each test
+ private static RemovingRecordBatch removingRecordBatch;
+
+ // List of expected outcomes populated by each test. Used to verify actual IterOutcome returned with next call on
+ // operator against expected outcome
+ private final List<RecordBatch.IterOutcome> expectedOutcomes = new ArrayList<>();
+
+ // List of expected row counts populated by each test. Used to verify actual output row count against expected row count
+ private final List<Integer> expectedRecordCounts = new ArrayList<>();
+
+ // List of expected row sets populated by each test. Used to verify actual output from operator against expected output
+ private final List<RowSet> expectedRowSets = new ArrayList<>();
+
+ /**
+ * Cleanup method executed after each test
+ */
+ @After
+ public void afterTestCleanup() {
+ // close removing recordbatch
+ removingRecordBatch.close();
+
+ // Release memory from expectedRowSets
+ for (RowSet expectedRowSet : expectedRowSets) {
+ expectedRowSet.clear();
+ }
+ expectedOutcomes.clear();
+ expectedRecordCounts.clear();
+ expectedRowSets.clear();
+ }
+
+ private void testSVRemoverCommon() {
+ final SelectionVectorRemover svRemover = new SelectionVectorRemover(null);
+ final MockRecordBatch batch = new MockRecordBatch(operatorFixture.getFragmentContext(), opContext, inputContainer,
+ inputOutcomes, inputContainerSv2, inputContainer.get(0).getSchema());
+
+ removingRecordBatch = new RemovingRecordBatch(svRemover, fragContext, batch);
+
+ int i=0;
+ int expectedRowSetIndex = 0;
+ while (i < expectedOutcomes.size()) {
+ try {
+ assertEquals(expectedOutcomes.get(i), removingRecordBatch.next());
+ assertEquals(removingRecordBatch.getRecordCount(), (int)expectedRecordCounts.get(i++));
+
+ if (removingRecordBatch.getRecordCount() > 0) {
+ final RowSet actualRowSet = DirectRowSet.fromContainer(removingRecordBatch.getContainer());
+ new RowSetComparison(expectedRowSets.get(expectedRowSetIndex++)).verify(actualRowSet);
+ }
+ } finally {
+ removingRecordBatch.getContainer().zeroVectors();
+ }
+ }
+ }
+
+ @Test
+ public void test_SimpleContainer_NoSchemaChange() {
+ inputContainer.add(emptyInputRowSet.container());
+ inputContainer.add(nonEmptyInputRowSet.container());
+
+ inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+ inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+
+ // Expected row sets
+ final RowSet expectedRowSet = operatorFixture.rowSetBuilder(inputSchema)
+ .addRow(1, 10, "item1")
+ .build();
+
+ expectedRowSets.add(expectedRowSet);
+
+ expectedOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+ expectedOutcomes.add(RecordBatch.IterOutcome.OK);
+
+ expectedRecordCounts.add(0);
+ expectedRecordCounts.add(expectedRowSet.rowCount());
+
+ testSVRemoverCommon();
+ }
+
+ @Test
+ public void test_SimpleContainer_SchemaChange() {
+ inputContainer.add(emptyInputRowSet.container());
+
+ TupleMetadata inputSchema2 = new SchemaBuilder()
+ .add("id_left", TypeProtos.MinorType.INT)
+ .add("cost_left", TypeProtos.MinorType.VARCHAR)
+ .add("name_left", TypeProtos.MinorType.VARCHAR)
+ .buildSchema();
+
+ RowSet nonEmpty2 = operatorFixture.rowSetBuilder(inputSchema2)
+ .addRow(1, "10", "item1")
+ .addRow(2, "20", "item2")
+ .build();
+
+ inputContainer.add(nonEmpty2.container());
+
+ inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+ inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+
+ // Expected row sets
+ final RowSet expectedRowSet = operatorFixture.rowSetBuilder(inputSchema2)
+ .addRow(1, "10", "item1")
+ .addRow(2, "20", "item2")
+ .build();
+
+ expectedRowSets.add(expectedRowSet);
+
+ expectedOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+ expectedOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+
+ expectedRecordCounts.add(0);
+ expectedRecordCounts.add(expectedRowSet.rowCount());
+
+ testSVRemoverCommon();
+ }
+
+ @Test
+ public void test_SV2Container_NoSchemaChange() {
+ final RowSet.SingleRowSet emptyWithSv2 = operatorFixture.rowSetBuilder(inputSchema)
+ .withSv2()
+ .build();
+
+ final RowSet.SingleRowSet nonEmptyInputRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
+ .addSelection(false, 1, 100, "item100")
+ .addSelection(true, 1, 101, "item101")
+ .addSelection(false, 1, 102, "item102")
+ .addSelection(true, 1, 103, "item103")
+ .addSelection(false, 2, 200, "item200")
+ .addSelection(true, 2, 201, "item201")
+ .addSelection(true, 2, 202, "item202")
+ .withSv2()
+ .build();
+
+ inputContainer.add(emptyWithSv2.container());
+ inputContainer.add(nonEmptyInputRowSet2.container());
+
+ inputContainerSv2.add(emptyWithSv2.getSv2());
+ inputContainerSv2.add(nonEmptyInputRowSet2.getSv2());
+
+ inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+ inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+
+ // Expected row sets
+ final RowSet expectedRowSet = operatorFixture.rowSetBuilder(inputSchema)
+ .addRow(1, 101, "item101")
+ .addRow(1, 103, "item103")
+ .addRow(2, 201, "item201")
+ .addRow(2, 202, "item202")
+ .build();
+
+ expectedRowSets.add(expectedRowSet);
+
+ expectedOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+ expectedOutcomes.add(RecordBatch.IterOutcome.OK);
+
+ expectedRecordCounts.add(0);
+ expectedRecordCounts.add(expectedRowSet.rowCount());
+
+ testSVRemoverCommon();
+ }
+
+ @Test
+ public void test_SV2Container_SchemaChange() {
+ // Batch for schema 1
+ final RowSet.SingleRowSet emptyWithSv2 = operatorFixture.rowSetBuilder(inputSchema)
+ .withSv2()
+ .build();
+
+ final RowSet.SingleRowSet nonEmptyInputRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
+ .addSelection(false, 1, 100, "item100")
+ .addSelection(true, 1, 101, "item101")
+ .addSelection(false, 1, 102, "item102")
+ .addSelection(true, 1, 103, "item103")
+ .addSelection(false, 2, 200, "item200")
+ .addSelection(true, 2, 201, "item201")
+ .addSelection(true, 2, 202, "item202")
+ .withSv2()
+ .build();
+
+ // Batch for schema 2
+ TupleMetadata inputSchema2 = new SchemaBuilder()
+ .add("id_left", TypeProtos.MinorType.INT)
+ .add("cost_left", TypeProtos.MinorType.VARCHAR)
+ .add("name_left", TypeProtos.MinorType.VARCHAR)
+ .buildSchema();
+
+ final RowSet.SingleRowSet empty2WithSv2 = operatorFixture.rowSetBuilder(inputSchema2)
+ .withSv2()
+ .build();
+
+ final RowSet.SingleRowSet nonEmpty2InputRowSet2 = operatorFixture.rowSetBuilder(inputSchema2)
+ .addSelection(true, 1, "101", "item101")
+ .addSelection(false, 1, "102", "item102")
+ .addSelection(true, 1, "103", "item103")
+ .addSelection(false, 2, "200", "item200")
+ .addSelection(true, 2, "201", "item201")
+ .withSv2()
+ .build();
+
+ inputContainer.add(emptyWithSv2.container());
+ inputContainer.add(nonEmptyInputRowSet2.container());
+ inputContainer.add(empty2WithSv2.container());
+ inputContainer.add(nonEmpty2InputRowSet2.container());
+
+ inputContainerSv2.add(emptyWithSv2.getSv2());
+ inputContainerSv2.add(nonEmptyInputRowSet2.getSv2());
+ inputContainerSv2.add(empty2WithSv2.getSv2());
+ inputContainerSv2.add(nonEmpty2InputRowSet2.getSv2());
+
+ inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+ inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+ inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+ inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+
+ // Expected row sets
+ final RowSet expectedRowSet = operatorFixture.rowSetBuilder(inputSchema)
+ .addRow(1, 101, "item101")
+ .addRow(1, 103, "item103")
+ .addRow(2, 201, "item201")
+ .addRow(2, 202, "item202")
+ .build();
+
+ final RowSet expectedRowSet1 = operatorFixture.rowSetBuilder(inputSchema2)
+ .addRow(1, "101", "item101")
+ .addRow(1, "103", "item103")
+ .addRow(2, "201", "item201")
+ .build();
+
+ expectedRowSets.add(expectedRowSet);
+ expectedRowSets.add(expectedRowSet1);
+
+ expectedOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+ expectedOutcomes.add(RecordBatch.IterOutcome.OK);
+ expectedOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+ expectedOutcomes.add(RecordBatch.IterOutcome.OK);
+
+ expectedRecordCounts.add(0);
+ expectedRecordCounts.add(expectedRowSet.rowCount());
+ expectedRecordCounts.add(0);
+ expectedRecordCounts.add(expectedRowSet1.rowCount());
+
+ testSVRemoverCommon();
+ }
+}