You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by so...@apache.org on 2018/07/13 03:45:04 UTC
[drill] 12/13: DRILL-6542 : IndexOutOfBoundsException for
multilevel lateral queries with schema changed partitioned complex data
This is an automated email from the ASF dual-hosted git repository.
sorabh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git
commit 4168e1e84d57b15d7667f7a768a0a47a577d0e79
Author: Sorabh Hamirwasia <sh...@maprtech.com>
AuthorDate: Mon Jul 9 17:58:08 2018 -0700
DRILL-6542 : IndexOutOfBoundsException for multilevel lateral queries with schema changed partitioned complex data
closes #1374
---
.../drill/exec/physical/impl/unnest/Unnest.java | 2 +
.../exec/physical/impl/unnest/UnnestImpl.java | 20 +++++++++
.../physical/impl/unnest/UnnestRecordBatch.java | 50 ++++++++++++++-------
.../impl/lateraljoin/TestE2EUnnestAndLateral.java | 51 ++++++++++++++++++++++
4 files changed, 107 insertions(+), 16 deletions(-)
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/Unnest.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/Unnest.java
index 77a2ffa..1a042b4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/Unnest.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/Unnest.java
@@ -64,4 +64,6 @@ public interface Unnest {
* time a new batch comes in.
*/
void resetGroupIndex();
+
+ void close();
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestImpl.java
index ffc64f9..1d3b8f2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestImpl.java
@@ -25,6 +25,7 @@ import org.apache.drill.exec.physical.base.LateralContract;
import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.record.TransferPair;
+import org.apache.drill.exec.vector.SchemaChangeCallBack;
import org.apache.drill.exec.vector.ValueVector;
import org.apache.drill.exec.vector.complex.RepeatedValueVector;
import org.slf4j.Logger;
@@ -51,6 +52,7 @@ public class UnnestImpl implements Unnest {
private SelectionVectorMode svMode;
private RepeatedValueVector fieldToUnnest;
private RepeatedValueVector.RepeatedAccessor accessor;
+ private RecordBatch outgoing;
/**
* The output batch limit starts at OUTPUT_ROW_COUNT, but may be decreased
@@ -97,8 +99,16 @@ public class UnnestImpl implements Unnest {
logger.debug("Unnest: currentRecord: {}, innerValueCount: {}, record count: {}, output limit: {}", innerValueCount,
recordCount, outputLimit);
+ final SchemaChangeCallBack callBack = new SchemaChangeCallBack();
for (TransferPair t : transfers) {
t.splitAndTransfer(innerValueIndex, count);
+
+ // Get the corresponding ValueVector in output container and transfer the data
+ final ValueVector vectorWithData = t.getTo();
+ final ValueVector outputVector = outgoing.getContainer().addOrGet(vectorWithData.getField(), callBack);
+ Preconditions.checkState(!callBack.getSchemaChangedAndReset(), "Outgoing container doesn't have " +
+ "expected ValueVector of type %s, present in TransferPair of unnest field", vectorWithData.getClass());
+ vectorWithData.makeTransferPair(outputVector).transfer();
}
innerValueIndex += count;
return count;
@@ -110,6 +120,7 @@ public class UnnestImpl implements Unnest {
List<TransferPair> transfers, LateralContract lateral) throws SchemaChangeException {
this.svMode = incoming.getSchema().getSelectionVectorMode();
+ this.outgoing = outgoing;
if (svMode == NONE) {
this.transfers = ImmutableList.copyOf(transfers);
this.lateral = lateral;
@@ -123,4 +134,13 @@ public class UnnestImpl implements Unnest {
this.innerValueIndex = 0;
}
+ @Override
+ public void close() {
+ if (transfers != null) {
+ for (TransferPair tp : transfers) {
+ tp.getTo().close();
+ }
+ transfers = null;
+ }
+ }
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java
index 9c1e702..d985423 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java
@@ -24,6 +24,7 @@ import org.apache.drill.common.expression.FieldReference;
import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.exception.OutOfMemoryException;
import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.expr.TypeHelper;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.ops.MetricDef;
import org.apache.drill.exec.physical.config.UnnestPOP;
@@ -48,7 +49,7 @@ import static org.apache.drill.exec.record.RecordBatch.IterOutcome.OK_NEW_SCHEMA
public class UnnestRecordBatch extends AbstractTableFunctionRecordBatch<UnnestPOP> {
private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(UnnestRecordBatch.class);
- private Unnest unnest;
+ private Unnest unnest = new UnnestImpl();
private boolean hasNewSchema = false; // set to true if a new schema was encountered and an empty batch was
// sent. The next iteration, we need to make sure the record batch sizer
// is updated before we process the actual data.
@@ -234,8 +235,23 @@ public class UnnestRecordBatch extends AbstractTableFunctionRecordBatch<UnnestPO
return IterOutcome.STOP;
}
return OK_NEW_SCHEMA;
- }
- // else
+ } else { // Unnest field schema didn't change but a new left empty/nonempty batch might come with OK_NEW_SCHEMA
+ try {
+ // Even though the unnest field's schema did not change, the reference to the unnest field's
+ // ValueVector may have changed, so we just refresh the transfer pairs and keep the output vector
+ // the same as before. When a new left batch is received with a schema change but is empty, Lateral
+ // will not call next on unnest and will change its left outcome to OK, whereas for a non-empty batch
+ // Lateral will call next on unnest. Hence UNNEST cannot rely on Lateral's current outcome to set up the
+ // transfer pair; it must do so for each new incoming left batch.
+ resetUnnestTransferPair();
+ container.zeroVectors();
+ } catch (SchemaChangeException ex) {
+ kill(false);
+ logger.error("Failure during query", ex);
+ context.getExecutorState().fail(ex);
+ return IterOutcome.STOP;
+ }
+ } // else
unnest.resetGroupIndex();
memoryManager.update();
}
@@ -353,26 +369,27 @@ public class UnnestRecordBatch extends AbstractTableFunctionRecordBatch<UnnestPO
return tp;
}
- @Override
- protected boolean setupNewSchema() throws SchemaChangeException {
- Preconditions.checkNotNull(lateral);
- container.clear();
- recordCount = 0;
+ private TransferPair resetUnnestTransferPair() throws SchemaChangeException {
final List<TransferPair> transfers = Lists.newArrayList();
-
final FieldReference fieldReference = new FieldReference(popConfig.getColumn());
-
final TransferPair transferPair = getUnnestFieldTransferPair(fieldReference);
-
- final ValueVector unnestVector = transferPair.getTo();
transfers.add(transferPair);
- container.add(unnestVector);
logger.debug("Added transfer for unnest expression.");
- container.buildSchema(SelectionVectorMode.NONE);
-
- this.unnest = new UnnestImpl();
+ unnest.close();
unnest.setup(context, incoming, this, transfers, lateral);
setUnnestVector();
+ return transferPair;
+ }
+
+ @Override
+ protected boolean setupNewSchema() throws SchemaChangeException {
+ Preconditions.checkNotNull(lateral);
+ container.clear();
+ recordCount = 0;
+ unnest = new UnnestImpl();
+ final TransferPair tp = resetUnnestTransferPair();
+ container.add(TypeHelper.getNewVector(tp.getTo().getField(), oContext.getAllocator()));
+ container.buildSchema(SelectionVectorMode.NONE);
return true;
}
@@ -428,6 +445,7 @@ public class UnnestRecordBatch extends AbstractTableFunctionRecordBatch<UnnestPO
@Override
public void close() {
updateStats();
+ unnest.close();
super.close();
}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/lateraljoin/TestE2EUnnestAndLateral.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/lateraljoin/TestE2EUnnestAndLateral.java
index 17a9d33..394e732 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/lateraljoin/TestE2EUnnestAndLateral.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/lateraljoin/TestE2EUnnestAndLateral.java
@@ -370,6 +370,37 @@ public class TestE2EUnnestAndLateral extends ClusterTest {
}
}
+ /**
+ * This test differs from {@link TestE2EUnnestAndLateral#testSchemaChangeOnNonUnnestColumn()} because, in the
+ * multilevel case, when the first Lateral sees a schema change it creates a new batch with new vector references. Hence
+ * the second Lateral receives a new incoming batch with new vector references and an OK_NEW_SCHEMA outcome. Even
+ * though the schema change is on a non-unnest column, the second Unnest has to set up its transfer pairs again, since
+ * the vector reference for the unnest field has changed for the second Unnest.
+ * In the other test, since there is only one Lateral, followed by a Scan, the Lateral's incoming batch with the
+ * schema change is handled by the Scan in such a way that only the vector of the affected column is updated. Hence in
+ * that case the vector corresponding to the unnest field is not affected and the query works fine.
+ * @throws Exception
+ */
+ @Test
+ public void testSchemaChangeOnNonUnnestColumn_InMultilevelCase() throws Exception {
+
+ try {
+ dirTestWatcher.copyResourceToRoot(Paths.get("lateraljoin", "multipleFiles", schemaChangeFile_1));
+ String sql = "SELECT customer.c_custkey, customer.c_name, customer.c_nationkey, orders.orderkey, " +
+ "orders.totalprice, olineitems.l_partkey, olineitems.l_linenumber, olineitems.l_quantity " +
+ "FROM dfs.`lateraljoin/multipleFiles` customer, " +
+ "LATERAL (SELECT t1.o.o_orderkey as orderkey, t1.o.o_totalprice as totalprice, t1.o.o_lineitems as lineitems " +
+ "FROM UNNEST(customer.c_orders) t1(o)) orders, " +
+ "LATERAL (SELECT t2.l.l_partkey as l_partkey, t2.l.l_linenumber as l_linenumber, t2.l.l_quantity as l_quantity " +
+ "FROM UNNEST(orders.lineitems) t2(l)) olineitems";
+ test(sql);
+ } catch (Exception ex) {
+ fail();
+ } finally {
+ dirTestWatcher.removeFileFromRoot(Paths.get("lateraljoin", "multipleFiles", schemaChangeFile_1));
+ }
+ }
+
@Test
public void testSchemaChangeOnUnnestColumn() throws Exception {
try {
@@ -387,6 +418,26 @@ public class TestE2EUnnestAndLateral extends ClusterTest {
}
@Test
+ public void testSchemaChangeOnUnnestColumn_InMultilevelCase() throws Exception {
+ try {
+ dirTestWatcher.copyResourceToRoot(Paths.get("lateraljoin", "multipleFiles", schemaChangeFile_2));
+
+ String sql = "SELECT customer.c_custkey, customer.c_name, customer.c_nationkey, orders.orderkey, " +
+ "orders.totalprice, orders.spriority, olineitems.l_partkey, olineitems.l_linenumber, olineitems.l_quantity " +
+ "FROM dfs.`lateraljoin/multipleFiles` customer, " +
+ "LATERAL (SELECT t1.o.o_orderkey as orderkey, t1.o.o_totalprice as totalprice, t1.o.o_lineitems as lineitems," +
+ " t1.o.o_shippriority as spriority FROM UNNEST(customer.c_orders) t1(o)) orders, " +
+ "LATERAL (SELECT t2.l.l_partkey as l_partkey, t2.l.l_linenumber as l_linenumber, t2.l.l_quantity as l_quantity " +
+ "FROM UNNEST(orders.lineitems) t2(l)) olineitems";
+ test(sql);
+ } catch (Exception ex) {
+ fail();
+ } finally {
+ dirTestWatcher.removeFileFromRoot(Paths.get("lateraljoin", "multipleFiles", schemaChangeFile_2));
+ }
+ }
+
+ @Test
public void testSchemaChangeOnMultipleColumns() throws Exception {
try {
dirTestWatcher.copyResourceToRoot(Paths.get("lateraljoin", "multipleFiles", schemaChangeFile_3));