You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by so...@apache.org on 2018/07/13 03:45:04 UTC

[drill] 12/13: DRILL-6542 : IndexOutOfBoundsException for multilevel lateral queries with schema changed partitioned complex data

This is an automated email from the ASF dual-hosted git repository.

sorabh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git

commit 4168e1e84d57b15d7667f7a768a0a47a577d0e79
Author: Sorabh Hamirwasia <sh...@maprtech.com>
AuthorDate: Mon Jul 9 17:58:08 2018 -0700

    DRILL-6542 : IndexOutOfBoundsException for multilevel lateral queries with schema changed partitioned complex data
    
    closes #1374
---
 .../drill/exec/physical/impl/unnest/Unnest.java    |  2 +
 .../exec/physical/impl/unnest/UnnestImpl.java      | 20 +++++++++
 .../physical/impl/unnest/UnnestRecordBatch.java    | 50 ++++++++++++++-------
 .../impl/lateraljoin/TestE2EUnnestAndLateral.java  | 51 ++++++++++++++++++++++
 4 files changed, 107 insertions(+), 16 deletions(-)

diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/Unnest.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/Unnest.java
index 77a2ffa..1a042b4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/Unnest.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/Unnest.java
@@ -64,4 +64,6 @@ public interface Unnest {
    * time a new batch comes in.
    */
   void resetGroupIndex();
+
+  void close();
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestImpl.java
index ffc64f9..1d3b8f2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestImpl.java
@@ -25,6 +25,7 @@ import org.apache.drill.exec.physical.base.LateralContract;
 import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.record.TransferPair;
+import org.apache.drill.exec.vector.SchemaChangeCallBack;
 import org.apache.drill.exec.vector.ValueVector;
 import org.apache.drill.exec.vector.complex.RepeatedValueVector;
 import org.slf4j.Logger;
@@ -51,6 +52,7 @@ public class UnnestImpl implements Unnest {
   private SelectionVectorMode svMode;
   private RepeatedValueVector fieldToUnnest;
   private RepeatedValueVector.RepeatedAccessor accessor;
+  private RecordBatch outgoing;
 
   /**
    * The output batch limit starts at OUTPUT_ROW_COUNT, but may be decreased
@@ -97,8 +99,16 @@ public class UnnestImpl implements Unnest {
 
     logger.debug("Unnest: currentRecord: {}, innerValueCount: {}, record count: {}, output limit: {}", innerValueCount,
         recordCount, outputLimit);
+    final SchemaChangeCallBack callBack = new SchemaChangeCallBack();
     for (TransferPair t : transfers) {
       t.splitAndTransfer(innerValueIndex, count);
+
+      // Get the corresponding ValueVector in output container and transfer the data
+      final ValueVector vectorWithData = t.getTo();
+      final ValueVector outputVector = outgoing.getContainer().addOrGet(vectorWithData.getField(), callBack);
+      Preconditions.checkState(!callBack.getSchemaChangedAndReset(), "Outgoing container doesn't have " +
+        "expected ValueVector of type %s, present in TransferPair of unnest field", vectorWithData.getClass());
+      vectorWithData.makeTransferPair(outputVector).transfer();
     }
     innerValueIndex += count;
     return count;
@@ -110,6 +120,7 @@ public class UnnestImpl implements Unnest {
       List<TransferPair> transfers, LateralContract lateral) throws SchemaChangeException {
 
     this.svMode = incoming.getSchema().getSelectionVectorMode();
+    this.outgoing = outgoing;
     if (svMode == NONE) {
       this.transfers = ImmutableList.copyOf(transfers);
       this.lateral = lateral;
@@ -123,4 +134,13 @@ public class UnnestImpl implements Unnest {
     this.innerValueIndex = 0;
   }
 
+  @Override
+  public void close() {
+    if (transfers != null) {
+      for (TransferPair tp : transfers) {
+        tp.getTo().close();
+      }
+      transfers = null;
+    }
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java
index 9c1e702..d985423 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java
@@ -24,6 +24,7 @@ import org.apache.drill.common.expression.FieldReference;
 import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.exception.OutOfMemoryException;
 import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.expr.TypeHelper;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.ops.MetricDef;
 import org.apache.drill.exec.physical.config.UnnestPOP;
@@ -48,7 +49,7 @@ import static org.apache.drill.exec.record.RecordBatch.IterOutcome.OK_NEW_SCHEMA
 public class UnnestRecordBatch extends AbstractTableFunctionRecordBatch<UnnestPOP> {
   private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(UnnestRecordBatch.class);
 
-  private Unnest unnest;
+  private Unnest unnest = new UnnestImpl();
   private boolean hasNewSchema = false; // set to true if a new schema was encountered and an empty batch was
                                         // sent. The next iteration, we need to make sure the record batch sizer
                                         // is updated before we process the actual data.
@@ -234,8 +235,23 @@ public class UnnestRecordBatch extends AbstractTableFunctionRecordBatch<UnnestPO
             return IterOutcome.STOP;
           }
           return OK_NEW_SCHEMA;
-        }
-        // else
+        } else { // Unnest field schema didn't change, but a new left empty/nonempty batch might come with OK_NEW_SCHEMA
+          try {
+            // This means that even though there is no schema change for the unnest field, the reference of the unnest
+            // field ValueVector must have changed; hence we should just refresh the transfer pairs and keep the output
+            // vector same as before. When a new left batch is received with SchemaChange but is empty, Lateral will
+            // not call next on unnest and will change its left outcome to OK. Whereas for a non-empty batch, next will
+            // be called on unnest by Lateral. Hence UNNEST cannot rely on lateral's current outcome to set up the
+            // transfer pair. It should do so for every new left incoming batch.
+            resetUnnestTransferPair();
+            container.zeroVectors();
+          } catch (SchemaChangeException ex) {
+            kill(false);
+            logger.error("Failure during query", ex);
+            context.getExecutorState().fail(ex);
+            return IterOutcome.STOP;
+          }
+        } // else
         unnest.resetGroupIndex();
         memoryManager.update();
       }
@@ -353,26 +369,27 @@ public class UnnestRecordBatch extends AbstractTableFunctionRecordBatch<UnnestPO
     return tp;
   }
 
-  @Override
-  protected boolean setupNewSchema() throws SchemaChangeException {
-    Preconditions.checkNotNull(lateral);
-    container.clear();
-    recordCount = 0;
+  private TransferPair resetUnnestTransferPair() throws SchemaChangeException {
     final List<TransferPair> transfers = Lists.newArrayList();
-
     final FieldReference fieldReference = new FieldReference(popConfig.getColumn());
-
     final TransferPair transferPair = getUnnestFieldTransferPair(fieldReference);
-
-    final ValueVector unnestVector = transferPair.getTo();
     transfers.add(transferPair);
-    container.add(unnestVector);
     logger.debug("Added transfer for unnest expression.");
-    container.buildSchema(SelectionVectorMode.NONE);
-
-    this.unnest = new UnnestImpl();
+    unnest.close();
     unnest.setup(context, incoming, this, transfers, lateral);
     setUnnestVector();
+    return transferPair;
+  }
+
+  @Override
+  protected boolean setupNewSchema() throws SchemaChangeException {
+    Preconditions.checkNotNull(lateral);
+    container.clear();
+    recordCount = 0;
+    unnest = new UnnestImpl();
+    final TransferPair tp = resetUnnestTransferPair();
+    container.add(TypeHelper.getNewVector(tp.getTo().getField(), oContext.getAllocator()));
+    container.buildSchema(SelectionVectorMode.NONE);
     return true;
   }
 
@@ -428,6 +445,7 @@ public class UnnestRecordBatch extends AbstractTableFunctionRecordBatch<UnnestPO
   @Override
   public void close() {
     updateStats();
+    unnest.close();
     super.close();
   }
 
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/lateraljoin/TestE2EUnnestAndLateral.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/lateraljoin/TestE2EUnnestAndLateral.java
index 17a9d33..394e732 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/lateraljoin/TestE2EUnnestAndLateral.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/lateraljoin/TestE2EUnnestAndLateral.java
@@ -370,6 +370,37 @@ public class TestE2EUnnestAndLateral extends ClusterTest {
     }
   }
 
+  /**
+   * This test is different from {@link TestE2EUnnestAndLateral#testSchemaChangeOnNonUnnestColumn()} because in the
+   * multilevel case, when the first Lateral sees a schema change it creates a new batch with new vector references.
+   * Hence the second Lateral will receive a new incoming with new vector references with OK_NEW_SCHEMA outcome. Now,
+   * even though the schema change is for a non-unnest column, the second Unnest has to set up its transfer pairs again
+   * since the vector reference for the unnest field has changed for the second Unnest.
+   * Whereas in the other test, since there is only 1 Lateral followed by Scan, the incoming for Lateral which has a
+   * schema change will be handled by Scan in such a way that it only updates the vector of the affected column. Hence
+   * in this case the vector corresponding to the unnest field will not be affected and it will work fine.
+   * @throws Exception
+   */
+  @Test
+  public void testSchemaChangeOnNonUnnestColumn_InMultilevelCase() throws Exception {
+
+    try {
+      dirTestWatcher.copyResourceToRoot(Paths.get("lateraljoin", "multipleFiles", schemaChangeFile_1));
+      String sql = "SELECT customer.c_custkey, customer.c_name, customer.c_nationkey, orders.orderkey, " +
+        "orders.totalprice, olineitems.l_partkey, olineitems.l_linenumber, olineitems.l_quantity " +
+        "FROM dfs.`lateraljoin/multipleFiles` customer, " +
+        "LATERAL (SELECT t1.o.o_orderkey as orderkey, t1.o.o_totalprice as totalprice, t1.o.o_lineitems as lineitems " +
+        "FROM UNNEST(customer.c_orders) t1(o)) orders, " +
+        "LATERAL (SELECT t2.l.l_partkey as l_partkey, t2.l.l_linenumber as l_linenumber, t2.l.l_quantity as l_quantity " +
+        "FROM UNNEST(orders.lineitems) t2(l)) olineitems";
+      test(sql);
+    } catch (Exception ex) {
+      fail();
+    } finally {
+      dirTestWatcher.removeFileFromRoot(Paths.get("lateraljoin", "multipleFiles", schemaChangeFile_1));
+    }
+  }
+
   @Test
   public void testSchemaChangeOnUnnestColumn() throws Exception {
     try {
@@ -387,6 +418,26 @@ public class TestE2EUnnestAndLateral extends ClusterTest {
   }
 
   @Test
+  public void testSchemaChangeOnUnnestColumn_InMultilevelCase() throws Exception {
+    try {
+      dirTestWatcher.copyResourceToRoot(Paths.get("lateraljoin", "multipleFiles", schemaChangeFile_2));
+
+      String sql = "SELECT customer.c_custkey, customer.c_name, customer.c_nationkey, orders.orderkey, " +
+        "orders.totalprice, orders.spriority, olineitems.l_partkey, olineitems.l_linenumber, olineitems.l_quantity " +
+        "FROM dfs.`lateraljoin/multipleFiles` customer, " +
+        "LATERAL (SELECT t1.o.o_orderkey as orderkey, t1.o.o_totalprice as totalprice, t1.o.o_lineitems as lineitems," +
+        " t1.o.o_shippriority as spriority FROM UNNEST(customer.c_orders) t1(o)) orders, " +
+        "LATERAL (SELECT t2.l.l_partkey as l_partkey, t2.l.l_linenumber as l_linenumber, t2.l.l_quantity as l_quantity " +
+        "FROM UNNEST(orders.lineitems) t2(l)) olineitems";
+      test(sql);
+    } catch (Exception ex) {
+      fail();
+    } finally {
+      dirTestWatcher.removeFileFromRoot(Paths.get("lateraljoin", "multipleFiles", schemaChangeFile_2));
+    }
+  }
+
+  @Test
   public void testSchemaChangeOnMultipleColumns() throws Exception {
     try {
       dirTestWatcher.copyResourceToRoot(Paths.get("lateraljoin", "multipleFiles", schemaChangeFile_3));