You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by so...@apache.org on 2018/08/04 19:39:18 UTC

[drill] branch master updated: DRILL-6653: Unsupported Schema change exception where there is no schema change

This is an automated email from the ASF dual-hosted git repository.

sorabh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git


The following commit(s) were added to refs/heads/master by this push:
     new 147bc7c  DRILL-6653: Unsupported Schema change exception where there is no schema change
147bc7c is described below

commit 147bc7c0fc7de31386410fab72a14e16666c910a
Author: Sorabh Hamirwasia <so...@apache.org>
AuthorDate: Sat Aug 4 12:39:16 2018 -0700

    DRILL-6653: Unsupported Schema change exception where there is no schema change
    
    closes #1422
---
 .../impl/svremover/RemovingRecordBatch.java        |   8 +-
 .../impl/svremover/TestSVRemoverIterOutcome.java   | 283 +++++++++++++++++++++
 2 files changed, 290 insertions(+), 1 deletion(-)

diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java
index 05a1f12..acfdc87 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java
@@ -51,7 +51,11 @@ public class RemovingRecordBatch extends AbstractSingleRecordBatch<SelectionVect
 
   @Override
   protected boolean setupNewSchema() throws SchemaChangeException {
-    container.clear();
+    // Don't clear off the container just because an OK_NEW_SCHEMA was received from upstream. For cases when there is
+    // just a change in container type but no actual schema change, RemovingRecordBatch should consume OK_NEW_SCHEMA and
+    // send OK downstream instead. Since the output of RemovingRecordBatch is always going to be a regular container, a
+    // change in the incoming container type is not an actual schema change.
+    container.zeroVectors();
     switch(incoming.getSchema().getSelectionVectorMode()){
     case NONE:
       this.copier = getStraightCopier();
@@ -66,6 +70,8 @@ public class RemovingRecordBatch extends AbstractSingleRecordBatch<SelectionVect
       throw new UnsupportedOperationException();
     }
 
+    // If there is an actual schema change then below condition will be true and it will send OK_NEW_SCHEMA
+    // downstream too
     if (container.isSchemaChanged()) {
       container.buildSchema(SelectionVectorMode.NONE);
       return true;
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/svremover/TestSVRemoverIterOutcome.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/svremover/TestSVRemoverIterOutcome.java
new file mode 100644
index 0000000..6613a71
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/svremover/TestSVRemoverIterOutcome.java
@@ -0,0 +1,283 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.svremover;
+
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.exec.physical.config.SelectionVectorRemover;
+import org.apache.drill.exec.physical.impl.BaseTestOpBatchEmitOutcome;
+import org.apache.drill.exec.physical.impl.MockRecordBatch;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.test.rowSet.DirectRowSet;
+import org.apache.drill.test.rowSet.RowSet;
+import org.apache.drill.test.rowSet.RowSetComparison;
+import org.apache.drill.test.rowSet.schema.SchemaBuilder;
+import org.junit.After;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+public class TestSVRemoverIterOutcome extends BaseTestOpBatchEmitOutcome {
+  //private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestSVRemoverIterOutcome.class);
+
+  // Holds a reference to the actual operator instance created for each test
+  private static RemovingRecordBatch removingRecordBatch;
+
+  // List of expected outcomes populated by each test. Used to verify the actual IterOutcome returned by each next()
+  // call on the operator against the expected outcome
+  private final List<RecordBatch.IterOutcome> expectedOutcomes = new ArrayList<>();
+
+  // List of expected row counts populated by each test. Used to verify the actual output row count against the expected row count
+  private final List<Integer> expectedRecordCounts = new ArrayList<>();
+
+  // List of expected row sets populated by each test. Used to verify the actual output from the operator against the expected output
+  private final List<RowSet> expectedRowSets = new ArrayList<>();
+
+  /**
+   * Cleanup method executed post each test
+   */
+  @After
+  public void afterTestCleanup() {
+    // close removing recordbatch
+    removingRecordBatch.close();
+
+    // Release memory from expectedRowSets
+    for (RowSet expectedRowSet : expectedRowSets) {
+      expectedRowSet.clear();
+    }
+    expectedOutcomes.clear();
+    expectedRecordCounts.clear();
+    expectedRowSets.clear();
+  }
+
+  private void testSVRemoverCommon() {
+    final SelectionVectorRemover svRemover = new SelectionVectorRemover(null);
+    final MockRecordBatch batch = new MockRecordBatch(operatorFixture.getFragmentContext(), opContext, inputContainer,
+        inputOutcomes, inputContainerSv2, inputContainer.get(0).getSchema());
+
+    removingRecordBatch = new RemovingRecordBatch(svRemover, fragContext, batch);
+
+    int i=0;
+    int expectedRowSetIndex = 0;
+    while (i < expectedOutcomes.size()) {
+      try {
+        assertEquals(expectedOutcomes.get(i), removingRecordBatch.next());
+        assertEquals(removingRecordBatch.getRecordCount(), (int)expectedRecordCounts.get(i++));
+
+        if (removingRecordBatch.getRecordCount() > 0) {
+          final RowSet actualRowSet = DirectRowSet.fromContainer(removingRecordBatch.getContainer());
+          new RowSetComparison(expectedRowSets.get(expectedRowSetIndex++)).verify(actualRowSet);
+        }
+      } finally {
+        removingRecordBatch.getContainer().zeroVectors();
+      }
+    }
+  }
+
+  @Test
+  public void test_SimpleContainer_NoSchemaChange() {
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(nonEmptyInputRowSet.container());
+
+    inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+
+    // Expected row sets
+    final RowSet expectedRowSet = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(1, 10, "item1")
+      .build();
+
+    expectedRowSets.add(expectedRowSet);
+
+    expectedOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    expectedOutcomes.add(RecordBatch.IterOutcome.OK);
+
+    expectedRecordCounts.add(0);
+    expectedRecordCounts.add(expectedRowSet.rowCount());
+
+    testSVRemoverCommon();
+  }
+
+  @Test
+  public void test_SimpleContainer_SchemaChange() {
+    inputContainer.add(emptyInputRowSet.container());
+
+    TupleMetadata inputSchema2 = new SchemaBuilder()
+      .add("id_left", TypeProtos.MinorType.INT)
+      .add("cost_left", TypeProtos.MinorType.VARCHAR)
+      .add("name_left", TypeProtos.MinorType.VARCHAR)
+      .buildSchema();
+
+    RowSet nonEmpty2 = operatorFixture.rowSetBuilder(inputSchema2)
+      .addRow(1, "10", "item1")
+      .addRow(2, "20", "item2")
+      .build();
+
+    inputContainer.add(nonEmpty2.container());
+
+    inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+
+    // Expected row sets
+    final RowSet expectedRowSet = operatorFixture.rowSetBuilder(inputSchema2)
+      .addRow(1, "10", "item1")
+      .addRow(2, "20", "item2")
+      .build();
+
+    expectedRowSets.add(expectedRowSet);
+
+    expectedOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    expectedOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+
+    expectedRecordCounts.add(0);
+    expectedRecordCounts.add(expectedRowSet.rowCount());
+
+    testSVRemoverCommon();
+  }
+
+  @Test
+  public void test_SV2Container_NoSchemaChange() {
+    final RowSet.SingleRowSet emptyWithSv2 = operatorFixture.rowSetBuilder(inputSchema)
+      .withSv2()
+      .build();
+
+    final RowSet.SingleRowSet nonEmptyInputRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
+      .addSelection(false, 1, 100, "item100")
+      .addSelection(true, 1, 101, "item101")
+      .addSelection(false, 1, 102, "item102")
+      .addSelection(true, 1, 103, "item103")
+      .addSelection(false, 2, 200, "item200")
+      .addSelection(true, 2, 201, "item201")
+      .addSelection(true, 2, 202, "item202")
+      .withSv2()
+      .build();
+
+    inputContainer.add(emptyWithSv2.container());
+    inputContainer.add(nonEmptyInputRowSet2.container());
+
+    inputContainerSv2.add(emptyWithSv2.getSv2());
+    inputContainerSv2.add(nonEmptyInputRowSet2.getSv2());
+
+    inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+
+    // Expected row sets
+    final RowSet expectedRowSet = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(1, 101, "item101")
+      .addRow(1, 103, "item103")
+      .addRow(2, 201, "item201")
+      .addRow(2, 202, "item202")
+      .build();
+
+    expectedRowSets.add(expectedRowSet);
+
+    expectedOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    expectedOutcomes.add(RecordBatch.IterOutcome.OK);
+
+    expectedRecordCounts.add(0);
+    expectedRecordCounts.add(expectedRowSet.rowCount());
+
+    testSVRemoverCommon();
+  }
+
+  @Test
+  public void test_SV2Container_SchemaChange() {
+    // Batch for schema 1
+    final RowSet.SingleRowSet emptyWithSv2 = operatorFixture.rowSetBuilder(inputSchema)
+      .withSv2()
+      .build();
+
+    final RowSet.SingleRowSet nonEmptyInputRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
+      .addSelection(false, 1, 100, "item100")
+      .addSelection(true, 1, 101, "item101")
+      .addSelection(false, 1, 102, "item102")
+      .addSelection(true, 1, 103, "item103")
+      .addSelection(false, 2, 200, "item200")
+      .addSelection(true, 2, 201, "item201")
+      .addSelection(true, 2, 202, "item202")
+      .withSv2()
+      .build();
+
+    // Batch for schema 2
+    TupleMetadata inputSchema2 = new SchemaBuilder()
+      .add("id_left", TypeProtos.MinorType.INT)
+      .add("cost_left", TypeProtos.MinorType.VARCHAR)
+      .add("name_left", TypeProtos.MinorType.VARCHAR)
+      .buildSchema();
+
+    final RowSet.SingleRowSet empty2WithSv2 = operatorFixture.rowSetBuilder(inputSchema2)
+      .withSv2()
+      .build();
+
+    final RowSet.SingleRowSet nonEmpty2InputRowSet2 = operatorFixture.rowSetBuilder(inputSchema2)
+      .addSelection(true, 1, "101", "item101")
+      .addSelection(false, 1, "102", "item102")
+      .addSelection(true, 1, "103", "item103")
+      .addSelection(false, 2, "200", "item200")
+      .addSelection(true, 2, "201", "item201")
+      .withSv2()
+      .build();
+
+    inputContainer.add(emptyWithSv2.container());
+    inputContainer.add(nonEmptyInputRowSet2.container());
+    inputContainer.add(empty2WithSv2.container());
+    inputContainer.add(nonEmpty2InputRowSet2.container());
+
+    inputContainerSv2.add(emptyWithSv2.getSv2());
+    inputContainerSv2.add(nonEmptyInputRowSet2.getSv2());
+    inputContainerSv2.add(empty2WithSv2.getSv2());
+    inputContainerSv2.add(nonEmpty2InputRowSet2.getSv2());
+
+    inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+
+    // Expected row sets
+    final RowSet expectedRowSet = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(1, 101, "item101")
+      .addRow(1, 103, "item103")
+      .addRow(2, 201, "item201")
+      .addRow(2, 202, "item202")
+      .build();
+
+    final RowSet expectedRowSet1 = operatorFixture.rowSetBuilder(inputSchema2)
+      .addRow(1, "101", "item101")
+      .addRow(1, "103", "item103")
+      .addRow(2, "201", "item201")
+      .build();
+
+    expectedRowSets.add(expectedRowSet);
+    expectedRowSets.add(expectedRowSet1);
+
+    expectedOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    expectedOutcomes.add(RecordBatch.IterOutcome.OK);
+    expectedOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    expectedOutcomes.add(RecordBatch.IterOutcome.OK);
+
+    expectedRecordCounts.add(0);
+    expectedRecordCounts.add(expectedRowSet.rowCount());
+    expectedRecordCounts.add(0);
+    expectedRecordCounts.add(expectedRowSet1.rowCount());
+
+    testSVRemoverCommon();
+  }
+}