You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@drill.apache.org by GitBox <gi...@apache.org> on 2018/07/13 03:45:14 UTC

[GitHub] sohami closed pull request #1374: DRILL-6542 : IndexOutOfBoundsException for multilevel lateral queries…

sohami closed pull request #1374: DRILL-6542 : IndexOutOfBoundsException for multilevel lateral queries…
URL: https://github.com/apache/drill/pull/1374
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/Unnest.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/Unnest.java
index 77a2ffa4d92..1a042b4aad6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/Unnest.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/Unnest.java
@@ -64,4 +64,6 @@ void setup(FragmentContext context, RecordBatch incoming, RecordBatch outgoing,
    * time a new batch comes in.
    */
   void resetGroupIndex();
+
+  void close();
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestImpl.java
index ffc64f92373..1d3b8f236cd 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestImpl.java
@@ -25,6 +25,7 @@
 import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.record.TransferPair;
+import org.apache.drill.exec.vector.SchemaChangeCallBack;
 import org.apache.drill.exec.vector.ValueVector;
 import org.apache.drill.exec.vector.complex.RepeatedValueVector;
 import org.slf4j.Logger;
@@ -51,6 +52,7 @@
   private SelectionVectorMode svMode;
   private RepeatedValueVector fieldToUnnest;
   private RepeatedValueVector.RepeatedAccessor accessor;
+  private RecordBatch outgoing;
 
   /**
    * The output batch limit starts at OUTPUT_ROW_COUNT, but may be decreased
@@ -97,8 +99,16 @@ public final int unnestRecords(final int recordCount) {
 
     logger.debug("Unnest: currentRecord: {}, innerValueCount: {}, record count: {}, output limit: {}", innerValueCount,
         recordCount, outputLimit);
+    final SchemaChangeCallBack callBack = new SchemaChangeCallBack();
     for (TransferPair t : transfers) {
       t.splitAndTransfer(innerValueIndex, count);
+
+      // Get the corresponding ValueVector in output container and transfer the data
+      final ValueVector vectorWithData = t.getTo();
+      final ValueVector outputVector = outgoing.getContainer().addOrGet(vectorWithData.getField(), callBack);
+      Preconditions.checkState(!callBack.getSchemaChangedAndReset(), "Outgoing container doesn't have " +
+        "expected ValueVector of type %s, present in TransferPair of unnest field", vectorWithData.getClass());
+      vectorWithData.makeTransferPair(outputVector).transfer();
     }
     innerValueIndex += count;
     return count;
@@ -110,6 +120,7 @@ public final void setup(FragmentContext context, RecordBatch incoming, RecordBat
       List<TransferPair> transfers, LateralContract lateral) throws SchemaChangeException {
 
     this.svMode = incoming.getSchema().getSelectionVectorMode();
+    this.outgoing = outgoing;
     if (svMode == NONE) {
       this.transfers = ImmutableList.copyOf(transfers);
       this.lateral = lateral;
@@ -123,4 +134,13 @@ public void resetGroupIndex() {
     this.innerValueIndex = 0;
   }
 
+  @Override
+  public void close() {
+    if (transfers != null) {
+      for (TransferPair tp : transfers) {
+        tp.getTo().close();
+      }
+      transfers = null;
+    }
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java
index 3ef547c5f46..ed772fe4fba 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java
@@ -24,6 +24,7 @@
 import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.exception.OutOfMemoryException;
 import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.expr.TypeHelper;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.ops.MetricDef;
 import org.apache.drill.exec.physical.config.UnnestPOP;
@@ -48,7 +49,7 @@
 public class UnnestRecordBatch extends AbstractTableFunctionRecordBatch<UnnestPOP> {
   private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(UnnestRecordBatch.class);
 
-  private Unnest unnest;
+  private Unnest unnest = new UnnestImpl();
   private boolean hasRemainder = false; // set to true if there is data left over for the current row AND if we want
                                         // to keep processing it. Kill may be called by a limit in a subquery that
                                         // requires us to stop processing thecurrent row, but not stop processing
@@ -226,8 +227,23 @@ public IterOutcome innerNext() {
             return IterOutcome.STOP;
           }
           return OK_NEW_SCHEMA;
-        }
-        // else
+        } else { // Unnest field schema didn't change but new left empty/nonempty batch might come with OK_NEW_SCHEMA
+          try {
+            // Even though there is no schema change for the unnest field, the reference to the unnest field's
+            // ValueVector must have changed, hence we should just refresh the transfer pairs and keep the output
+            // vector same as before. When a new left batch is received with a schema change but is empty, Lateral
+            // will not call next on unnest and will change its left outcome to OK, whereas for a non-empty batch
+            // next will be called on unnest by Lateral. Hence UNNEST cannot rely on Lateral's current outcome to
+            // set up the transfer pair; it should do so for each new left incoming batch.
+            resetUnnestTransferPair();
+            container.zeroVectors();
+          } catch (SchemaChangeException ex) {
+            kill(false);
+            logger.error("Failure during query", ex);
+            context.getExecutorState().fail(ex);
+            return IterOutcome.STOP;
+          }
+        } // else
         unnest.resetGroupIndex();
       }
       return doWork();
@@ -345,26 +361,27 @@ protected IterOutcome doWork() {
     return tp;
   }
 
-  @Override
-  protected boolean setupNewSchema() throws SchemaChangeException {
-    Preconditions.checkNotNull(lateral);
-    container.clear();
-    recordCount = 0;
+  private TransferPair resetUnnestTransferPair() throws SchemaChangeException {
     final List<TransferPair> transfers = Lists.newArrayList();
-
     final FieldReference fieldReference = new FieldReference(popConfig.getColumn());
-
     final TransferPair transferPair = getUnnestFieldTransferPair(fieldReference);
-
-    final ValueVector unnestVector = transferPair.getTo();
     transfers.add(transferPair);
-    container.add(unnestVector);
     logger.debug("Added transfer for unnest expression.");
-    container.buildSchema(SelectionVectorMode.NONE);
-
-    this.unnest = new UnnestImpl();
+    unnest.close();
     unnest.setup(context, incoming, this, transfers, lateral);
     setUnnestVector();
+    return transferPair;
+  }
+
+  @Override
+  protected boolean setupNewSchema() throws SchemaChangeException {
+    Preconditions.checkNotNull(lateral);
+    container.clear();
+    recordCount = 0;
+    unnest = new UnnestImpl();
+    final TransferPair tp = resetUnnestTransferPair();
+    container.add(TypeHelper.getNewVector(tp.getTo().getField(), oContext.getAllocator()));
+    container.buildSchema(SelectionVectorMode.NONE);
     return true;
   }
 
@@ -420,6 +437,7 @@ private void updateStats() {
   @Override
   public void close() {
     updateStats();
+    unnest.close();
     super.close();
   }
 
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/lateraljoin/TestE2EUnnestAndLateral.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/lateraljoin/TestE2EUnnestAndLateral.java
index c57093c5605..e386e5a9722 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/lateraljoin/TestE2EUnnestAndLateral.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/lateraljoin/TestE2EUnnestAndLateral.java
@@ -370,6 +370,37 @@ public void testSchemaChangeOnNonUnnestColumn() throws Exception {
     }
   }
 
+  /**
+   * This test differs from {@link TestE2EUnnestAndLateral#testSchemaChangeOnNonUnnestColumn()} because, in the
+   * multilevel case, when the first Lateral sees a schema change it creates a new batch with new vector references.
+   * Hence the second Lateral receives a new incoming batch with new vector references and an OK_NEW_SCHEMA outcome.
+   * Even though the schema change is on a non-unnest column, the second Unnest has to set up its transfer pairs
+   * again, since the vector reference for its unnest field has changed.
+   * In the other test there is only one Lateral followed by a Scan, so an incoming batch with a schema change is
+   * handled by the Scan in such a way that only the vector of the affected column is updated. Hence, in that case,
+   * the vector corresponding to the unnest field is not affected and the query works fine.
+   * @throws Exception
+   */
+  @Test
+  public void testSchemaChangeOnNonUnnestColumn_InMultilevelCase() throws Exception {
+
+    try {
+      dirTestWatcher.copyResourceToRoot(Paths.get("lateraljoin", "multipleFiles", schemaChangeFile_1));
+      String sql = "SELECT customer.c_custkey, customer.c_name, customer.c_nationkey, orders.orderkey, " +
+        "orders.totalprice, olineitems.l_partkey, olineitems.l_linenumber, olineitems.l_quantity " +
+        "FROM dfs.`lateraljoin/multipleFiles` customer, " +
+        "LATERAL (SELECT t1.o.o_orderkey as orderkey, t1.o.o_totalprice as totalprice, t1.o.o_lineitems as lineitems " +
+        "FROM UNNEST(customer.c_orders) t1(o)) orders, " +
+        "LATERAL (SELECT t2.l.l_partkey as l_partkey, t2.l.l_linenumber as l_linenumber, t2.l.l_quantity as l_quantity " +
+        "FROM UNNEST(orders.lineitems) t2(l)) olineitems";
+      test(sql);
+    } catch (Exception ex) {
+      fail();
+    } finally {
+      dirTestWatcher.removeFileFromRoot(Paths.get("lateraljoin", "multipleFiles", schemaChangeFile_1));
+    }
+  }
+
   @Test
   public void testSchemaChangeOnUnnestColumn() throws Exception {
     try {
@@ -386,6 +417,26 @@ public void testSchemaChangeOnUnnestColumn() throws Exception {
     }
   }
 
+  @Test
+  public void testSchemaChangeOnUnnestColumn_InMultilevelCase() throws Exception {
+    try {
+      dirTestWatcher.copyResourceToRoot(Paths.get("lateraljoin", "multipleFiles", schemaChangeFile_2));
+
+      String sql = "SELECT customer.c_custkey, customer.c_name, customer.c_nationkey, orders.orderkey, " +
+        "orders.totalprice, orders.spriority, olineitems.l_partkey, olineitems.l_linenumber, olineitems.l_quantity " +
+        "FROM dfs.`lateraljoin/multipleFiles` customer, " +
+        "LATERAL (SELECT t1.o.o_orderkey as orderkey, t1.o.o_totalprice as totalprice, t1.o.o_lineitems as lineitems," +
+        " t1.o.o_shippriority as spriority FROM UNNEST(customer.c_orders) t1(o)) orders, " +
+        "LATERAL (SELECT t2.l.l_partkey as l_partkey, t2.l.l_linenumber as l_linenumber, t2.l.l_quantity as l_quantity " +
+        "FROM UNNEST(orders.lineitems) t2(l)) olineitems";
+      test(sql);
+    } catch (Exception ex) {
+      fail();
+    } finally {
+      dirTestWatcher.removeFileFromRoot(Paths.get("lateraljoin", "multipleFiles", schemaChangeFile_2));
+    }
+  }
+
   @Test
   public void testSchemaChangeOnMultipleColumns() throws Exception {
     try {


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services