You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by so...@apache.org on 2018/07/21 00:23:29 UTC
[drill] branch master updated: DRILL-6606: Fixed bug in HashJoin
that caused it not to return OK_NEW_SCHEMA in some cases.
This is an automated email from the ASF dual-hosted git repository.
sorabh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git
The following commit(s) were added to refs/heads/master by this push:
new 91b5e4d DRILL-6606: Fixed bug in HashJoin that caused it not to return OK_NEW_SCHEMA in some cases.
91b5e4d is described below
commit 91b5e4d0d84dae2c6af19f82df11bb4c493a6ce8
Author: Timothy Farkas <ti...@gmail.com>
AuthorDate: Fri Jul 20 17:23:26 2018 -0700
DRILL-6606: Fixed bug in HashJoin that caused it not to return OK_NEW_SCHEMA in some cases.
closes #1384
---
.../exec/physical/impl/join/HashJoinBatch.java | 175 +++++++++++++++------
.../physical/impl/join/TestHashJoinAdvanced.java | 37 +++++
2 files changed, 163 insertions(+), 49 deletions(-)
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
index 345d182..d4d4f92 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
@@ -149,6 +149,11 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
private RecordBatch buildBatch;
private RecordBatch probeBatch;
+ /**
+ * Flag indicating whether the first data-holding batches have already been fetched (set on the first call to innerNext()).
+ */
+ private boolean prefetched;
+
// For handling spilling
private SpillSet spillSet;
HashJoinPOP popConfig;
@@ -174,7 +179,7 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
/**
* Queue of spilled partitions to process.
*/
- private ArrayList<HJSpilledPartition> spilledPartitionsList;
+ private ArrayList<HJSpilledPartition> spilledPartitionsList = new ArrayList<>();
private HJSpilledPartition spilledInners[]; // for the outer to find the partition
public enum Metric implements MetricDef {
@@ -213,86 +218,145 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
@Override
protected void buildSchema() throws SchemaChangeException {
- if (! prefetchFirstBatchFromBothSides()) {
- return;
+ // We must first get the schemas from upstream operators before we can build
+ // our schema.
+ boolean validSchema = sniffNewSchemas();
+
+ if (validSchema) {
+ // We are able to construct a valid schema from the upstream data.
+ // Setting the state here makes sure AbstractRecordBatch returns OK_NEW_SCHEMA
+ state = BatchState.BUILD_SCHEMA;
+ } else {
+ verifyOutcomeToSetBatchState(leftUpstream, rightUpstream);
}
+ // If we have a valid schema, this will build a valid container. If we were unable to obtain a valid schema,
+ // we still need to build a dummy schema. This code handles both cases for us.
+ setupOutputContainerSchema();
+ container.buildSchema(BatchSchema.SelectionVectorMode.NONE);
+
// Initialize the hash join helper context
- if (rightUpstream != IterOutcome.NONE) {
+ if (rightUpstream == IterOutcome.OK_NEW_SCHEMA) {
+ // We only need the hash tables if we have data on the build side.
setupHashTable();
}
- setupOutputContainerSchema();
+
try {
hashJoinProbe = setupHashJoinProbe();
} catch (IOException | ClassTransformationException e) {
throw new SchemaChangeException(e);
}
-
- container.buildSchema(BatchSchema.SelectionVectorMode.NONE);
}
@Override
protected boolean prefetchFirstBatchFromBothSides() {
- leftUpstream = sniffNonEmptyBatch(0, left);
- rightUpstream = sniffNonEmptyBatch(1, right);
+ if (leftUpstream != IterOutcome.NONE) {
+ // We can only get data if there is data available
+ leftUpstream = sniffNonEmptyBatch(leftUpstream, LEFT_INDEX, left);
+ }
+
+ if (rightUpstream != IterOutcome.NONE) {
+ // We can only get data if there is data available
+ rightUpstream = sniffNonEmptyBatch(rightUpstream, RIGHT_INDEX, right);
+ }
+
+ buildSideIsEmpty = rightUpstream == IterOutcome.NONE;
- // For build side, use aggregate i.e. average row width across batches
- batchMemoryManager.update(LEFT_INDEX, 0);
- batchMemoryManager.update(RIGHT_INDEX, 0, true);
+ if (verifyOutcomeToSetBatchState(leftUpstream, rightUpstream)) {
+ // For build side, use aggregate i.e. average row width across batches
+ batchMemoryManager.update(LEFT_INDEX, 0);
+ batchMemoryManager.update(RIGHT_INDEX, 0, true);
- logger.debug("BATCH_STATS, incoming left: {}", batchMemoryManager.getRecordBatchSizer(LEFT_INDEX));
- logger.debug("BATCH_STATS, incoming right: {}", batchMemoryManager.getRecordBatchSizer(RIGHT_INDEX));
+ logger.debug("BATCH_STATS, incoming left: {}", batchMemoryManager.getRecordBatchSizer(LEFT_INDEX));
+ logger.debug("BATCH_STATS, incoming right: {}", batchMemoryManager.getRecordBatchSizer(RIGHT_INDEX));
- if (leftUpstream == IterOutcome.STOP || rightUpstream == IterOutcome.STOP) {
- state = BatchState.STOP;
+ // Got our first batch(es)
+ state = BatchState.FIRST;
+ return true;
+ } else {
return false;
}
+ }
- if (leftUpstream == IterOutcome.OUT_OF_MEMORY || rightUpstream == IterOutcome.OUT_OF_MEMORY) {
- state = BatchState.OUT_OF_MEMORY;
- return false;
+ /**
+ * Fetches the first outcome from each side (retrying on NOT_YET) and records the probe and build schemas when present.
+ * @return True only if both sides returned OK_NEW_SCHEMA. False if either side returned NONE or a termination outcome.
+ */
+ private boolean sniffNewSchemas() {
+ do {
+ // Ask for data until we get a valid result.
+ leftUpstream = next(LEFT_INDEX, left);
+ } while (leftUpstream == IterOutcome.NOT_YET);
+
+ boolean isValidLeft = false;
+
+ switch (leftUpstream) {
+ case OK_NEW_SCHEMA:
+ probeSchema = probeBatch.getSchema(); // falls through: a new schema is also a valid outcome
+ case NONE:
+ isValidLeft = true;
+ break;
+ case OK:
+ case EMIT:
+ throw new IllegalStateException("Unsupported outcome while building schema " + leftUpstream);
+ default:
+ // Termination condition: isValidLeft stays false
}
- if (checkForEarlyFinish(leftUpstream, rightUpstream)) {
- state = BatchState.DONE;
- return false;
+ do {
+ // Ask for data until we get a valid result.
+ rightUpstream = next(RIGHT_INDEX, right);
+ } while (rightUpstream == IterOutcome.NOT_YET);
+
+ boolean isValidRight = false;
+
+ switch (rightUpstream) {
+ case OK_NEW_SCHEMA:
+ // We need to have the schema of the build side even when the build side is empty
+ rightSchema = buildBatch.getSchema();
+ // position of the new "column" for keeping the hash values (after the real columns)
+ rightHVColPosition = buildBatch.getContainer().getNumberOfColumns(); // falls through: also a valid outcome
+ case NONE:
+ isValidRight = true;
+ break;
+ case OK:
+ case EMIT:
+ throw new IllegalStateException("Unsupported outcome while building schema " + rightUpstream);
+ default:
+ // Termination condition: isValidRight stays false
}
- state = BatchState.FIRST; // Got our first batches on both sides
- return true;
+ // Left and right sides must each return a valid response, and neither side may be NONE.
+ return (isValidLeft && isValidRight) &&
+ (leftUpstream != IterOutcome.NONE && rightUpstream != IterOutcome.NONE);
}
/**
* Currently in order to accurately predict memory usage for spilling, the first non-empty build side and probe side batches are needed. This method
* fetches the first non-empty batch from the left or right side.
+ * @param curr The current outcome.
* @param inputIndex Index specifying whether to work with the left or right input.
* @param recordBatch The left or right record batch.
* @return The {@link org.apache.drill.exec.record.RecordBatch.IterOutcome} for the left or right record batch.
*/
- private IterOutcome sniffNonEmptyBatch(int inputIndex, RecordBatch recordBatch) {
+ private IterOutcome sniffNonEmptyBatch(IterOutcome curr, int inputIndex, RecordBatch recordBatch) {
while (true) {
- IterOutcome outcome = next(inputIndex, recordBatch);
+ if (recordBatch.getRecordCount() != 0) {
+ return curr;
+ }
- switch (outcome) {
- case OK_NEW_SCHEMA:
- if ( inputIndex == 0 ) {
- // Indicate that a schema was seen (in case probe side is empty)
- probeSchema = probeBatch.getSchema();
- } else {
- // We need to have the schema of the build side even when the build side is empty
- rightSchema = buildBatch.getSchema();
- // position of the new "column" for keeping the hash values (after the real columns)
- rightHVColPosition = buildBatch.getContainer().getNumberOfColumns();
- // new schema can also contain records
- }
+ curr = next(inputIndex, recordBatch);
+
+ switch (curr) {
case OK:
- if (recordBatch.getRecordCount() == 0) {
- continue;
- }
- // We got a non empty batch
+ // We got a data batch
+ break;
+ case NOT_YET:
+ // We need to try again
+ break;
default:
// Other cases termination conditions
- return outcome;
+ return curr;
}
}
}
@@ -317,7 +381,25 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
@Override
public IterOutcome innerNext() {
- // In case incoming was killed before, just cleanup and return
+ if (!prefetched) {
+ // If we haven't retrieved our first data-holding batches yet, we need to do it now.
+ prefetched = true;
+ prefetchFirstBatchFromBothSides();
+
+ // Handle emitting the correct outcome for termination conditions
+ // Use the state set by prefetchFirstBatchFromBothSides to emit the correct termination outcome.
+ switch (state) {
+ case DONE:
+ return IterOutcome.NONE;
+ case STOP:
+ return IterOutcome.STOP;
+ case OUT_OF_MEMORY:
+ return IterOutcome.OUT_OF_MEMORY;
+ default:
+ // No termination condition so continue processing.
+ }
+ }
+
if ( wasKilled ) {
this.cleanup();
super.close();
@@ -504,13 +586,8 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
partitionMask = numPartitions - 1; // e.g. 32 --> 0x1F
bitsInMask = Integer.bitCount(partitionMask); // e.g. 0x1F -> 5
- // Create the FIFO list of spilled partitions (pairs - inner/outer)
- spilledPartitionsList = new ArrayList<>();
-
// Create array for the partitions
partitions = new HashPartition[numPartitions];
-
- buildSideIsEmpty = false;
}
/**
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoinAdvanced.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoinAdvanced.java
index 6893e8e..e093632 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoinAdvanced.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoinAdvanced.java
@@ -19,9 +19,16 @@ package org.apache.drill.exec.physical.impl.join;
import org.apache.drill.categories.OperatorTest;
import org.apache.drill.categories.UnlikelyTest;
+import org.apache.drill.common.types.TypeProtos;
import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.proto.UserBitShared;
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.RecordBatchLoader;
+import org.apache.drill.exec.rpc.user.QueryDataBatch;
import org.apache.drill.test.BaseTestQuery;
+import org.apache.drill.test.rowSet.schema.SchemaBuilder;
import org.junit.AfterClass;
+import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@@ -29,6 +36,7 @@ import java.io.BufferedWriter;
import java.io.File;
import java.io.FileWriter;
import java.nio.file.Paths;
+import java.util.List;
import java.util.regex.Pattern;
@Category(OperatorTest.class)
@@ -206,4 +214,33 @@ public class TestHashJoinAdvanced extends JoinTestBase {
final Pattern sortHashJoinPattern = Pattern.compile(".*Sort.*HashJoin", Pattern.DOTALL);
testPlanMatchingPatterns(query, new Pattern[]{sortHashJoinPattern}, null);
}
+
+ @Test // DRILL-6606
+ public void testJoinLimit0Schema() throws Exception {
+ String query = "SELECT l.l_quantity, l.l_shipdate, o.o_custkey\n" +
+ "FROM (SELECT * FROM cp.`tpch/lineitem.parquet` LIMIT 0) l\n" +
+ " JOIN (SELECT * FROM cp.`tpch/orders.parquet` LIMIT 0) o \n" +
+ " ON l.l_orderkey = o.o_orderkey\n";
+ final List<QueryDataBatch> dataBatches = client.runQuery(UserBitShared.QueryType.SQL, query);
+
+ Assert.assertEquals(1, dataBatches.size());
+
+ final QueryDataBatch queryDataBatch = dataBatches.get(0);
+ final RecordBatchLoader batchLoader = new RecordBatchLoader(getAllocator());
+
+ try {
+ batchLoader.load(queryDataBatch.getHeader().getDef(), queryDataBatch.getData());
+
+ final BatchSchema actualSchema = batchLoader.getSchema();
+ final BatchSchema expectedSchema = new SchemaBuilder()
+ .add("l_quantity", TypeProtos.MinorType.FLOAT8, TypeProtos.DataMode.REQUIRED)
+ .add("l_shipdate", TypeProtos.MinorType.DATE, TypeProtos.DataMode.REQUIRED)
+ .add("o_custkey", TypeProtos.MinorType.INT, TypeProtos.DataMode.REQUIRED)
+ .build();
+
+ Assert.assertTrue(expectedSchema.isEquivalent(actualSchema));
+ } finally {
+ batchLoader.clear();
+ }
+ }
}