Posted to commits@drill.apache.org by cg...@apache.org on 2023/02/12 20:04:32 UTC
[drill] branch master updated: DRILL-4232: Support for EXCEPT and INTERSECT set operator (#2599)
This is an automated email from the ASF dual-hosted git repository.
cgivre pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git
The following commit(s) were added to refs/heads/master by this push:
new c38091fdd6 DRILL-4232: Support for EXCEPT and INTERSECT set operator (#2599)
c38091fdd6 is described below
commit c38091fdd66a222657b3c617aaab834d1594570d
Author: leon <32...@qq.com>
AuthorDate: Mon Feb 13 04:04:25 2023 +0800
DRILL-4232: Support for EXCEPT and INTERSECT set operator (#2599)
---
.../java/org/apache/drill/exec/ExecConstants.java | 4 +
.../physical/base/AbstractPhysicalVisitor.java | 6 +
.../drill/exec/physical/base/PhysicalVisitor.java | 2 +
.../apache/drill/exec/physical/config/SetOp.java | 69 +
.../exec/physical/impl/common/HashPartition.java | 12 +
.../drill/exec/physical/impl/common/HashTable.java | 20 +
.../exec/physical/impl/common/HashTableConfig.java | 18 +-
.../physical/impl/common/HashTableTemplate.java | 60 +-
...tch.java => AbstractHashBinaryRecordBatch.java} | 396 ++---
.../exec/physical/impl/join/HashJoinBatch.java | 1590 +-------------------
.../physical/impl/join/HashJoinProbeTemplate.java | 460 +-----
.../impl/join/{HashJoinProbe.java => Probe.java} | 29 +-
...shJoinProbeTemplate.java => ProbeTemplate.java} | 237 +--
.../impl/setop/HashSetOpProbeTemplate.java | 99 ++
.../physical/impl/setop/HashSetOpRecordBatch.java | 139 ++
.../physical/impl/setop/SetOpBatchCreator.java | 36 +
.../apache/drill/exec/planner/PlannerPhase.java | 6 +
.../drill/exec/planner/common/DrillSetOpRel.java | 35 +
.../exec/planner/common/DrillUnionRelBase.java | 58 -
.../drill/exec/planner/logical/DrillExceptRel.java | 98 ++
.../{DrillUnionRel.java => DrillIntersectRel.java} | 64 +-
.../exec/planner/logical/DrillRelFactories.java | 23 +
.../drill/exec/planner/logical/DrillSetOpRule.java | 64 +
.../exec/planner/logical/DrillUnionAllRule.java | 2 +-
.../drill/exec/planner/logical/DrillUnionRel.java | 19 +-
.../exec/planner/logical/PreProcessLogicalRel.java | 31 +
.../exec/planner/logical/ScanFieldDeterminer.java | 18 +
.../physical/{UnionAllPrel.java => SetOpPrel.java} | 77 +-
.../drill/exec/planner/physical/SetOpPrule.java | 242 +++
.../drill/exec/planner/physical/UnionAllPrel.java | 9 +-
.../drill/exec/planner/physical/UnionAllPrule.java | 3 +-
.../exec/planner/physical/UnionDistinctPrel.java | 9 +-
.../exec/planner/physical/UnionDistinctPrule.java | 3 +-
.../drill/exec/planner/physical/UnionPrel.java | 20 +-
.../physical/visitor/FinalColumnReorderer.java | 11 +-
.../planner/sql/handlers/FindLimit0Visitor.java | 6 +-
.../sql/parser/UnsupportedOperatorsVisitor.java | 10 +-
.../exec/planner/torel/ConversionContext.java | 14 +
.../exec/server/options/SystemOptionManager.java | 1 +
.../drill/exec/store/plan/rel/PluginUnionRel.java | 10 +-
.../java-exec/src/main/resources/drill-module.conf | 1 +
.../apache/drill/TestDisabledFunctionality.java | 36 -
.../src/test/java/org/apache/drill/TestSetOp.java | 1182 +++++++++++++++
.../java/org/apache/drill/test/QueryBuilder.java | 13 +-
.../apache/drill/common/logical/data/Except.java | 87 ++
.../drill/common/logical/data/Intersect.java | 87 ++
.../data/visitors/AbstractLogicalVisitor.java | 12 +
.../logical/data/visitors/LogicalVisitor.java | 4 +
48 files changed, 2837 insertions(+), 2595 deletions(-)
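For context, EXCEPT and INTERSECT implement multiset difference and intersection over query results; with ALL, duplicate rows survive up to the matching count. The patch realizes this with a hash table that counts rows per key (see the HashTableTemplate changes below). A minimal, self-contained Java sketch of the required semantics -- illustrative only, not Drill code:

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    // Illustrative only: the multiset semantics the new operator must produce.
    public class SetOpSemantics {

      // left EXCEPT ALL right: each right-side copy cancels one left-side copy.
      static <T> List<T> exceptAll(List<T> left, List<T> right) {
        Map<T, Integer> remaining = new HashMap<>();
        for (T r : right) {
          remaining.merge(r, 1, Integer::sum);   // "build" side: count per key
        }
        List<T> out = new ArrayList<>();
        for (T l : left) {                       // "probe" side
          Integer c = remaining.get(l);
          if (c == null || c == 0) {
            out.add(l);                          // no copies left to cancel
          } else {
            remaining.put(l, c - 1);             // consume one matching copy
          }
        }
        return out;
      }

      // left INTERSECT ALL right: emit a left row while matching copies remain.
      static <T> List<T> intersectAll(List<T> left, List<T> right) {
        Map<T, Integer> remaining = new HashMap<>();
        for (T r : right) {
          remaining.merge(r, 1, Integer::sum);
        }
        List<T> out = new ArrayList<>();
        for (T l : left) {
          Integer c = remaining.get(l);
          if (c != null && c > 0) {
            out.add(l);
            remaining.put(l, c - 1);
          }
        }
        return out;
      }
    }

The non-ALL (distinct) variants additionally deduplicate, which is where the aggregation-placement option introduced in ExecConstants below comes in.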
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
index 696aa804a5..73aac06216 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
@@ -600,6 +600,10 @@ public final class ExecConstants {
public static final BooleanValidator ENABLE_UNION_TYPE = new BooleanValidator(ENABLE_UNION_TYPE_KEY,
new OptionDescription("Enable support for Avro union type."));
+ public static final String EXCEPT_ADD_AGG_BELOW_KEY = "exec.except_add_agg_below";
+ public static final BooleanValidator EXCEPT_ADD_AGG_BELOW = new BooleanValidator(EXCEPT_ADD_AGG_BELOW_KEY,
+ new OptionDescription("Add agg below setop for left input when doing except, otherwise above setop"));
+
// Kafka plugin related options.
public static final String KAFKA_ALL_TEXT_MODE = "store.kafka.all_text_mode";
public static final OptionValidator KAFKA_READER_ALL_TEXT_MODE_VALIDATOR = new BooleanValidator(KAFKA_ALL_TEXT_MODE,
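The new exec.except_add_agg_below option is a plan-time toggle: the aggregation that deduplicates the left input of an EXCEPT can sit either below the set op (aggregate first, then subtract) or above it. A hedged sketch of reading such an option, assuming Drill's usual OptionSet accessors; 'context' stands for the fragment/planning context:

    // Sketch only; not taken from this patch.
    boolean aggBelow = context.getOptions()
        .getBoolean(ExecConstants.EXCEPT_ADD_AGG_BELOW_KEY);
    // true  -> assumed plan shape: SetOp(Agg(left), right)
    // false -> assumed plan shape: Agg(SetOp(left, right))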
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java
index ea2221abaf..e58f5ea922 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java
@@ -33,6 +33,7 @@ import org.apache.drill.exec.physical.config.Project;
import org.apache.drill.exec.physical.config.RangePartitionSender;
import org.apache.drill.exec.physical.config.RowKeyJoinPOP;
import org.apache.drill.exec.physical.config.Screen;
+import org.apache.drill.exec.physical.config.SetOp;
import org.apache.drill.exec.physical.config.SingleSender;
import org.apache.drill.exec.physical.config.Sort;
import org.apache.drill.exec.physical.config.StatisticsAggregate;
@@ -59,6 +60,11 @@ public abstract class AbstractPhysicalVisitor<T, X, E extends Throwable> impleme
return visitOp(union, value);
}
+ @Override
+ public T visitSetOp(SetOp setOp, X value) throws E {
+ return visitOp(setOp, value);
+ }
+
@Override
public T visitWriter(Writer writer, X value) throws E {
return visitOp(writer, value);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java
index d1b1479bb7..dee1203ac3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java
@@ -33,6 +33,7 @@ import org.apache.drill.exec.physical.config.Project;
import org.apache.drill.exec.physical.config.RangePartitionSender;
import org.apache.drill.exec.physical.config.RowKeyJoinPOP;
import org.apache.drill.exec.physical.config.Screen;
+import org.apache.drill.exec.physical.config.SetOp;
import org.apache.drill.exec.physical.config.SingleSender;
import org.apache.drill.exec.physical.config.Sort;
import org.apache.drill.exec.physical.config.StatisticsAggregate;
@@ -63,6 +64,7 @@ public interface PhysicalVisitor<RETURN, EXTRA, EXCEP extends Throwable> {
public RETURN visitStore(Store store, EXTRA value) throws EXCEP;
public RETURN visitFilter(Filter filter, EXTRA value) throws EXCEP;
public RETURN visitUnion(UnionAll union, EXTRA value) throws EXCEP;
+ public RETURN visitSetOp(SetOp setOp, EXTRA value) throws EXCEP;
public RETURN visitProject(Project project, EXTRA value) throws EXCEP;
public RETURN visitTrace(Trace trace, EXTRA value) throws EXCEP;
public RETURN visitSort(Sort sort, EXTRA value) throws EXCEP;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/SetOp.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/SetOp.java
new file mode 100644
index 0000000000..27f77df28f
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/SetOp.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.config;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.drill.exec.ops.QueryContext;
+import org.apache.drill.exec.physical.base.AbstractMultiple;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.PhysicalVisitor;
+
+import java.util.List;
+
+@JsonTypeName("set-op")
+public class SetOp extends AbstractMultiple {
+ public SqlKind kind;
+ public boolean all;
+
+ @JsonCreator
+ public SetOp(@JsonProperty("children") List<PhysicalOperator> children, @JsonProperty("sqlKind") SqlKind kind, @JsonProperty("all") boolean all) {
+ super(children);
+ this.kind = kind;
+ this.all = all;
+ }
+
+ public SqlKind getKind() {
+ return kind;
+ }
+
+ public boolean isAll() {
+ return all;
+ }
+
+ @Override
+ public <T, X, E extends Throwable> T accept(PhysicalVisitor<T, X, E> physicalVisitor, X value) throws E {
+ return physicalVisitor.visitSetOp(this, value);
+ }
+
+ @Override
+ public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) {
+ return new SetOp(children, kind, all);
+ }
+
+ @Override
+ public String getOperatorType() {
+ return kind.name() + (all ? "_ALL" : "");
+ }
+
+ @Override @JsonIgnore
+ public boolean isBufferedOperator(QueryContext queryContext) { return true; }
+}
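Given the constructor and getOperatorType() above, a hedged sketch of how the planner might instantiate this config; leftChild/rightChild are hypothetical PhysicalOperator inputs produced elsewhere in the plan (requires java.util.Arrays):

    // Sketch: constructing the physical set-op config.
    List<PhysicalOperator> inputs = Arrays.asList(leftChild, rightChild);
    SetOp exceptAll = new SetOp(inputs, SqlKind.EXCEPT, true);     // operator type "EXCEPT_ALL"
    SetOp intersect = new SetOp(inputs, SqlKind.INTERSECT, false); // operator type "INTERSECT"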
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashPartition.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashPartition.java
index 52fe0a8d59..2b21831f34 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashPartition.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashPartition.java
@@ -386,6 +386,18 @@ public class HashPartition implements HashJoinMemoryCalculator.PartitionStat {
return hashTable.probeForKey(recordsProcessed, hashCode);
}
+ public int getRecordNumForKey(int currentIndex) {
+ return hashTable.getRecordNumForKey(currentIndex);
+ }
+
+ public void setRecordNumForKey(int currentIndex, int num) {
+ hashTable.setRecordNumForKey(currentIndex, num);
+ }
+
+ public void decreaseRecordNumForKey(int currentIndex) {
+ hashTable.decreaseRecordNumForKey(currentIndex);
+ }
+
public Pair<Integer, Boolean> getStartIndex(int probeIndex) {
/* The current probe record has a key that matches. Get the index
* of the first row in the build side that matches the current key
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTable.java
index 1d9e2679ef..cdc4febbf1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTable.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTable.java
@@ -98,6 +98,26 @@ public interface HashTable {
*/
int probeForKey(int incomingRowIdx, int hashCode) throws SchemaChangeException;
+ /**
+ * @param currentIndex The composite index of the key in the hash table (index of BatchHolder and record in BatchHolder).
+ * @return Returns -1 if the count of records for a specific key is not computed. Otherwise returns
+ * the count of records for a specific key.
+ */
+ int getRecordNumForKey(int currentIndex);
+
+ /**
+ * Set the count of records for a specific key to num.
+ * @param currentIndex The composite index of the key in the hash table (index of BatchHolder and record in BatchHolder).
+ * @param num The count of records for a specific key to be set.
+ */
+ void setRecordNumForKey(int currentIndex, int num);
+
+ /**
+ * Decrease the count of records for a specific key by one.
+ * @param currentIndex The composite index of the key in the hash table (index of BatchHolder and record in BatchHolder).
+ */
+ void decreaseRecordNumForKey(int currentIndex);
+
void getStats(HashTableStats stats);
int size();
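The composite index mentioned in these javadocs packs the BatchHolder index into the bits above 16 and the row index within that holder into the low 16 bits; HashTableTemplate below decodes it with (currentIndex >>> 16) & BATCH_MASK and currentIndex & BATCH_MASK. A small sketch of the packing, assuming BATCH_MASK is 0xFFFF:

    // Assumes 16-bit in-batch indexes (BATCH_MASK == 0xFFFF).
    final class CompositeIndex {
      static int compose(int batchIdx, int idxWithinBatch) {
        return (batchIdx << 16) | (idxWithinBatch & 0xFFFF);
      }
      static int batchIndex(int compositeIdx) {
        return (compositeIdx >>> 16) & 0xFFFF; // which BatchHolder
      }
      static int indexWithinBatch(int compositeIdx) {
        return compositeIdx & 0xFFFF;          // which row in that holder
      }
    }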
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableConfig.java
index fe329cc5e5..aaa2c5fcfb 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableConfig.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableConfig.java
@@ -36,6 +36,7 @@ public class HashTableConfig {
private final List<NamedExpression> keyExprsProbe;
private final List<Comparator> comparators;
private final int joinControl;
+ private final boolean computeKeyNum;
public HashTableConfig(
int initialCapacity,
@@ -44,7 +45,7 @@ public class HashTableConfig {
List<NamedExpression> keyExprsProbe,
List<Comparator> comparators
) {
- this(initialCapacity, false, loadFactor, keyExprsBuild, keyExprsProbe, comparators, JoinControl.DEFAULT);
+ this(initialCapacity, false, loadFactor, keyExprsBuild, keyExprsProbe, comparators, JoinControl.DEFAULT, false);
}
@JsonCreator
@@ -54,7 +55,7 @@ public class HashTableConfig {
@JsonProperty("keyExprsProbe") List<NamedExpression> keyExprsProbe,
@JsonProperty("comparators") List<Comparator> comparators,
@JsonProperty("joinControl") int joinControl) {
- this(initialCapacity, false, loadFactor, keyExprsBuild, keyExprsProbe, comparators, joinControl);
+ this(initialCapacity, false, loadFactor, keyExprsBuild, keyExprsProbe, comparators, joinControl, false);
}
public HashTableConfig(
@@ -65,7 +66,7 @@ public class HashTableConfig {
List<NamedExpression> keyExprsProbe,
List<Comparator> comparators
) {
- this(initialCapacity, initialSizeIsFinal, loadFactor, keyExprsBuild, keyExprsProbe, comparators, JoinControl.DEFAULT);
+ this(initialCapacity, initialSizeIsFinal, loadFactor, keyExprsBuild, keyExprsProbe, comparators, JoinControl.DEFAULT, false);
}
@JsonCreator
@@ -75,7 +76,8 @@ public class HashTableConfig {
@JsonProperty("keyExprsBuild") List<NamedExpression> keyExprsBuild,
@JsonProperty("keyExprsProbe") List<NamedExpression> keyExprsProbe,
@JsonProperty("comparators") List<Comparator> comparators,
- @JsonProperty("joinControl") int joinControl
+ @JsonProperty("joinControl") int joinControl,
+ @JsonProperty("computeKeyNum") boolean computeKeyNum
) {
this.initialCapacity = initialCapacity;
this.initialSizeIsFinal = initialSizeIsFinal;
@@ -84,6 +86,7 @@ public class HashTableConfig {
this.keyExprsProbe = keyExprsProbe;
this.comparators = comparators;
this.joinControl = joinControl;
+ this.computeKeyNum = computeKeyNum;
}
public HashTableConfig withInitialCapacity(int initialCapacity) {
@@ -93,7 +96,8 @@ public class HashTableConfig {
keyExprsBuild,
keyExprsProbe,
comparators,
- JoinControl.DEFAULT
+ JoinControl.DEFAULT,
+ computeKeyNum
);
}
@@ -124,4 +128,8 @@ public class HashTableConfig {
public int getJoinControl() {
return joinControl;
}
+
+ public boolean isComputeKeyNum() {
+ return computeKeyNum;
+ }
}
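With the extra flag, a caller that needs per-key counts (such as the new set-op batch) opts in through the long constructor. A hedged sketch; the capacity, expression lists, and comparators are placeholders for the real build/probe key setup:

    HashTableConfig htConfig = new HashTableConfig(
        initialCapacity,               // e.g. the MIN_HASH_TABLE_SIZE option
        true,                          // initialSizeIsFinal
        HashTable.DEFAULT_LOAD_FACTOR,
        keyExprsBuild,
        keyExprsProbe,
        comparators,
        JoinControl.DEFAULT,
        true);                         // computeKeyNum: allocate the per-key counters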
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java
index 7d9e596148..da6e5e0861 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java
@@ -132,7 +132,7 @@ public abstract class HashTableTemplate implements HashTable {
// Array of 'link' values
private IntVector links;
-
+ private IntVector nums = null;
// Array of hash values - this is useful when resizing the hash table
private IntVector hashValues;
@@ -178,6 +178,9 @@ public abstract class HashTableTemplate implements HashTable {
links = allocMetadataVector(newBatchHolderSize, EMPTY_SLOT);
hashValues = allocMetadataVector(newBatchHolderSize, 0);
+ if (htConfig.isComputeKeyNum()) {
+ nums = allocMetadataVector(newBatchHolderSize, 0);
+ }
success = true;
} finally {
if (!success) {
@@ -185,6 +188,9 @@ public abstract class HashTableTemplate implements HashTable {
if (links != null) {
links.clear();
}
+ if (nums != null) {
+ nums.clear();
+ }
}
}
}
@@ -199,6 +205,12 @@ public abstract class HashTableTemplate implements HashTable {
}
links.getMutator().setValueCount(size);
hashValues.getMutator().setValueCount(size);
+ if (nums != null) {
+ for (int i = 0; i < size; i++) {
+ nums.getMutator().set(i, 0);
+ }
+ nums.getMutator().setValueCount(size);
+ }
}
protected void setup() throws SchemaChangeException {
@@ -242,6 +254,32 @@ public abstract class HashTableTemplate implements HashTable {
return links.getAccessor().get(currentIndex & BATCH_MASK);
}
+ private void increaseRecordNumForKey(int idxWithinBatch) {
+ if (nums != null) {
+ nums.getMutator().set(idxWithinBatch, nums.getAccessor().get(idxWithinBatch) + 1);
+ }
+ }
+
+ private void decreaseRecordNumForKey(int idxWithinBatch) {
+ if (nums != null) {
+ nums.getMutator().set(idxWithinBatch, nums.getAccessor().get(idxWithinBatch) - 1);
+ }
+ }
+
+ private int getRecordNumForKey(int idxWithinBatch) {
+ if (nums != null) {
+ return nums.getAccessor().get(idxWithinBatch);
+ } else {
+ return -1;
+ }
+ }
+
+ private void setNum(int idxWithinBatch, int num) {
+ if (nums != null) {
+ nums.getMutator().set(idxWithinBatch, num);
+ }
+ }
+
// Insert a new <key1, key2...keyN> entry coming from the incoming batch into the hash table
// container at the specified index
private void insertEntry(int incomingRowIdx, int currentIdx, int hashValue, BatchHolder lastEntryBatch, int lastEntryIdxWithinBatch) throws SchemaChangeException {
@@ -259,6 +297,9 @@ public abstract class HashTableTemplate implements HashTable {
// will point to a null (empty) slot
links.getMutator().set(currentIdxWithinBatch, EMPTY_SLOT);
hashValues.getMutator().set(currentIdxWithinBatch, hashValue);
+ if (nums != null) {
+ nums.getMutator().set(currentIdxWithinBatch, 1);
+ }
maxOccupiedIdx = Math.max(maxOccupiedIdx, currentIdxWithinBatch);
@@ -400,6 +441,7 @@ public abstract class HashTableTemplate implements HashTable {
htContainer.clear();
if ( links != null ) { links.clear(); }
if ( hashValues != null ) { hashValues.clear(); }
+ if ( nums != null ) { nums.clear(); }
}
// Only used for internal debugging. Get the value vector at a particular index from the htContainer.
@@ -686,6 +728,7 @@ public abstract class HashTableTemplate implements HashTable {
if (lastEntryBatch.isKeyMatch(incomingRowIdx, currentIndex, false)) {
htIdxHolder.value = currentIndex;
+ lastEntryBatch.increaseRecordNumForKey(lastEntryIdxWithinBatch);
return PutStatus.KEY_PRESENT;
}
}
@@ -767,6 +810,21 @@ public abstract class HashTableTemplate implements HashTable {
return -1;
}
+ public int getRecordNumForKey(int currentIndex) {
+ BatchHolder bh = batchHolders.get((currentIndex >>> 16) & BATCH_MASK);
+ return bh.getRecordNumForKey(currentIndex & BATCH_MASK);
+ }
+
+ public void setRecordNumForKey(int currentIndex, int num) {
+ BatchHolder bh = batchHolders.get((currentIndex >>> 16) & BATCH_MASK);
+ bh.setNum(currentIndex & BATCH_MASK, num);
+ }
+
+ public void decreaseRecordNumForKey(int currentIndex) {
+ BatchHolder bh = batchHolders.get((currentIndex >>> 16) & BATCH_MASK);
+ bh.decreaseRecordNumForKey(currentIndex & BATCH_MASK);
+ }
+
// Add a new BatchHolder to the list of batch holders if needed. This is based on the supplied
// currentIdx; since each BatchHolder can hold up to BATCH_SIZE entries, if the currentIdx exceeds
// the capacity, we will add a new BatchHolder. Return true if a new batch was added.
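Build-side inserts set a key's count to 1 and bump it on each KEY_PRESENT hit; the probe side can then consume those counts. The HashSetOpProbeTemplate itself is not shown in this hunk, so the following is only a plausible sketch of how the counters could drive set-op semantics during probing (outputRow() is a hypothetical emit helper):

    int compositeIdx = partition.probeForKey(recordsProcessed, hashCode);
    if (compositeIdx != -1 && partition.getRecordNumForKey(compositeIdx) > 0) {
      // e.g. INTERSECT ALL: emit the probe row and consume one matching copy
      partition.decreaseRecordNumForKey(compositeIdx);
      outputRow();
    } else if (compositeIdx == -1) {
      // e.g. EXCEPT: a probe row with no build-side match survives
    }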
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/AbstractHashBinaryRecordBatch.java
similarity index 81%
copy from exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
copy to exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/AbstractHashBinaryRecordBatch.java
index a08e4308cd..bf9e3eee8f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/AbstractHashBinaryRecordBatch.java
@@ -17,31 +17,15 @@
*/
package org.apache.drill.exec.physical.impl.join;
-import static org.apache.drill.exec.record.RecordBatch.IterOutcome.EMIT;
-import static org.apache.drill.exec.record.RecordBatch.IterOutcome.NONE;
-import static org.apache.drill.exec.record.RecordBatch.IterOutcome.OK_NEW_SCHEMA;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.calcite.rel.core.JoinRelType;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.mutable.MutableBoolean;
-import org.apache.commons.lang3.tuple.Pair;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.common.expression.ErrorCollector;
import org.apache.drill.common.expression.ErrorCollectorImpl;
import org.apache.drill.common.expression.ExpressionPosition;
-import org.apache.drill.common.expression.FieldReference;
import org.apache.drill.common.expression.LogicalExpression;
import org.apache.drill.common.expression.PathSegment;
import org.apache.drill.common.expression.SchemaPath;
-import org.apache.drill.common.logical.data.JoinCondition;
import org.apache.drill.common.logical.data.NamedExpression;
import org.apache.drill.common.types.TypeProtos;
import org.apache.drill.common.types.TypeProtos.DataMode;
@@ -58,18 +42,16 @@ import org.apache.drill.exec.ops.ExecutorFragmentContext;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.ops.MetricDef;
import org.apache.drill.exec.physical.base.AbstractBase;
-import org.apache.drill.exec.physical.config.HashJoinPOP;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.physical.impl.aggregate.SpilledRecordBatch;
import org.apache.drill.exec.physical.impl.common.AbstractSpilledPartitionMetadata;
import org.apache.drill.exec.physical.impl.common.ChainedHashTable;
-import org.apache.drill.exec.physical.impl.common.Comparator;
import org.apache.drill.exec.physical.impl.common.HashPartition;
import org.apache.drill.exec.physical.impl.common.HashTable;
import org.apache.drill.exec.physical.impl.common.HashTableConfig;
import org.apache.drill.exec.physical.impl.common.HashTableStats;
import org.apache.drill.exec.physical.impl.common.SpilledState;
import org.apache.drill.exec.physical.impl.spill.SpillSet;
-import org.apache.drill.exec.planner.common.JoinControl;
import org.apache.drill.exec.record.AbstractBinaryRecordBatch;
import org.apache.drill.exec.record.BatchSchema;
import org.apache.drill.exec.record.JoinBatchMemoryManager;
@@ -77,7 +59,6 @@ import org.apache.drill.exec.record.MaterializedField;
import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.record.TypedFieldId;
import org.apache.drill.exec.record.VectorAccessibleUtilities;
-import org.apache.drill.exec.record.VectorContainer;
import org.apache.drill.exec.record.VectorWrapper;
import org.apache.drill.exec.util.record.RecordBatchStats;
import org.apache.drill.exec.util.record.RecordBatchStats.RecordBatchIOType;
@@ -89,26 +70,37 @@ import org.apache.drill.exec.work.filter.BloomFilterDef;
import org.apache.drill.exec.work.filter.RuntimeFilterDef;
import org.apache.drill.exec.work.filter.RuntimeFilterReporter;
import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
-import org.apache.drill.shaded.guava.com.google.common.collect.Iterables;
import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
import org.apache.drill.shaded.guava.com.google.common.collect.Sets;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.drill.exec.record.RecordBatch.IterOutcome.EMIT;
+import static org.apache.drill.exec.record.RecordBatch.IterOutcome.NONE;
+import static org.apache.drill.exec.record.RecordBatch.IterOutcome.OK_NEW_SCHEMA;
+
/**
- * Implements the runtime execution for the Hash-Join operator supporting INNER,
- * LEFT OUTER, RIGHT OUTER, and FULL OUTER joins
+ * Base class for the runtime execution implementation of the Hash-Join and
+ * Hash-SetOp operators
* <p>
* This implementation splits the incoming Build side rows into multiple
* Partitions, thus allowing spilling of some of these partitions to disk if
* memory gets tight. Each partition is implemented as a {@link HashPartition}.
- * After the build phase is over, in the most general case, some of the
- * partitions were spilled, and the others are in memory. Each of the partitions
+ * After the build phase is over, in the most general case, some partitions
+ * were spilled, and the others are in memory. Each of the partitions
* in memory would get a {@link HashTable} built.
* <p>
* Next the Probe side is read, and each row is key matched with a Build
* partition. If that partition is in memory, then the key is used to probe and
- * perform the join, and the results are added to the outgoing batch. But if
+ * perform the operation, and the results are added to the outgoing batch. But if
* that build side partition was spilled, then the matching Probe side partition
* is spilled as well.
* <p>
@@ -125,39 +117,35 @@ import org.slf4j.LoggerFactory;
* greater) is a waste, indicating that the number of partitions chosen was too
* small.
*/
-public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP>
- implements RowKeyJoin {
- private static final Logger logger = LoggerFactory
- .getLogger(HashJoinBatch.class);
+public abstract class AbstractHashBinaryRecordBatch<T extends PhysicalOperator> extends AbstractBinaryRecordBatch<T> {
+ protected final Logger logger = LoggerFactory.getLogger(this.getClass());
+
+ protected boolean semiJoin;
+ protected boolean joinIsLeftOrFull;
+ protected boolean joinIsRightOrFull;
+ // Whether this HashJoin is used for a row-key based join
+ protected boolean isRowKeyJoin;
+ protected boolean enableRuntimeFilter;
+ protected RuntimeFilterDef runtimeFilterDef = null;
+ protected List<NamedExpression> rightExpr = Lists.newArrayList();
/**
- * The maximum number of records within each internal batch.
+ * Names of the join columns. These names are used in order to help estimate
+ * the size of the {@link HashTable}s.
*/
- private final int RECORDS_PER_BATCH; // internal batches
-
- // Join type, INNER, LEFT, RIGHT or OUTER
- private final JoinRelType joinType;
- private final boolean semiJoin;
- private final boolean joinIsLeftOrFull;
- private final boolean joinIsRightOrFull;
- private boolean skipHashTableBuild; // when outer side is empty, and the join
- // is inner or left (see DRILL-6755)
-
- // Join conditions
- private final List<JoinCondition> conditions;
-
- private RowKeyJoin.RowKeyJoinState rkJoinState = RowKeyJoin.RowKeyJoinState.INITIAL;
+ protected Set<String> buildJoinColumns = Sets.newHashSet();
- // Runtime generated class implementing HashJoinProbe interface
- private HashJoinProbe hashJoinProbe = null;
-
- private final List<NamedExpression> rightExpr;
+ protected boolean skipHashTableBuild; // when outer side is empty, and the join
+ // is inner or left (see DRILL-6755)
/**
- * Names of the join columns. This names are used in order to help estimate
- * the size of the {@link HashTable}s.
+ * The maximum number of records within each internal batch.
*/
- private final Set<String> buildJoinColumns;
+ protected final int RECORDS_PER_BATCH; // internal batches
+ protected RowKeyJoin.RowKeyJoinState rkJoinState = RowKeyJoin.RowKeyJoinState.INITIAL;
+
+ // Runtime generated class implementing Probe interface
+ protected Probe probe = null;
// Fields used for partitioning
/**
@@ -165,87 +153,81 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP>
* option and set in
* {@link #partitionNumTuning(int, HashJoinMemoryCalculator.BuildSidePartitioning)}.
*/
- private int numPartitions = 1; // must be 2 to the power of bitsInMask
+ protected int numPartitions = 1; // must be 2 to the power of bitsInMask
/**
* The master class used to generate {@link HashTable}s.
*/
- private ChainedHashTable baseHashTable;
- private final MutableBoolean buildSideIsEmpty = new MutableBoolean(false);
- private final MutableBoolean probeSideIsEmpty = new MutableBoolean(false);
- private boolean canSpill = true;
- private boolean wasKilled; // a kill was received, may need to clean spilled
+ protected ChainedHashTable baseHashTable;
+ protected final MutableBoolean buildSideIsEmpty = new MutableBoolean(false);
+ protected final MutableBoolean probeSideIsEmpty = new MutableBoolean(false);
+ protected boolean canSpill = true;
+ protected boolean wasKilled; // a kill was received, may need to clean spilled
// partns
/**
* This array holds the currently active {@link HashPartition}s.
*/
- HashPartition partitions[];
+ protected HashPartition[] partitions;
// Number of records in the output container
- private int outputRecords;
+ protected int outputRecords;
// Schema of the build side
- private BatchSchema buildSchema;
+ protected BatchSchema buildSchema;
// Schema of the probe side
- private BatchSchema probeSchema;
-
- // Whether this HashJoin is used for a row-key based join
- private final boolean isRowKeyJoin;
-
- private final JoinControl joinControl;
+ protected BatchSchema probeSchema;
// An iterator over the build side hash table (only applicable for row-key
// joins)
- private boolean buildComplete;
+ protected boolean buildComplete;
// indicates if we have previously returned an output batch
- private boolean firstOutputBatch = true;
+ protected boolean firstOutputBatch = true;
- private int rightHVColPosition;
- private final BufferAllocator allocator;
+ protected int rightHVColPosition;
+ protected final BufferAllocator allocator;
// Local fields for left/right incoming - may be replaced when reading from
// spilled
- private RecordBatch buildBatch;
- private RecordBatch probeBatch;
+ protected RecordBatch buildBatch;
+ protected RecordBatch probeBatch;
/**
* Flag indicating whether or not the first data holding build batch needs to
* be fetched.
*/
- private final MutableBoolean prefetchedBuild = new MutableBoolean(false);
+ protected final MutableBoolean prefetchedBuild = new MutableBoolean(false);
/**
* Flag indicating whether or not the first data holding probe batch needs to
* be fetched.
*/
- private final MutableBoolean prefetchedProbe = new MutableBoolean(false);
+ protected final MutableBoolean prefetchedProbe = new MutableBoolean(false);
// For handling spilling
- private final SpillSet spillSet;
- HashJoinPOP popConfig;
+ protected final SpillSet spillSet;
+ T popConfig;
- private final int originalPartition = -1; // the partition a secondary reads
+ protected final int originalPartition = -1; // the partition a secondary reads
// from
IntVector read_right_HV_vector; // HV vector that was read from the spilled
// batch
- private final int maxBatchesInMemory;
- private final List<String> probeFields = new ArrayList<>(); // keep the same
+ protected final int maxBatchesInMemory;
+ protected final List<String> probeFields = new ArrayList<>(); // keep the same
// sequence with
// the
// bloomFilters
- private boolean enableRuntimeFilter;
- private RuntimeFilterReporter runtimeFilterReporter;
- private ValueVectorHashHelper.Hash64 hash64;
- private final Map<BloomFilter, Integer> bloomFilter2buildId = new HashMap<>();
- private final Map<BloomFilterDef, Integer> bloomFilterDef2buildId = new HashMap<>();
- private final List<BloomFilter> bloomFilters = new ArrayList<>();
- private boolean bloomFiltersGenerated;
+ protected RuntimeFilterReporter runtimeFilterReporter;
+ protected ValueVectorHashHelper.Hash64 hash64;
+ protected final Map<BloomFilter, Integer> bloomFilter2buildId = new HashMap<>();
+ protected final Map<BloomFilterDef, Integer> bloomFilterDef2buildId = new HashMap<>();
+ protected final List<BloomFilter> bloomFilters = new ArrayList<>();
+ protected boolean bloomFiltersGenerated;
/**
* This holds information about the spilled partitions for the build and probe
* side.
*/
- public static class HashJoinSpilledPartition
+ public static class SpilledPartition
extends AbstractSpilledPartitionMetadata {
private final int innerSpilledBatches;
private final String innerSpillFile;
@@ -253,7 +235,7 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP>
private String outerSpillFile;
private boolean updatedOuter;
- public HashJoinSpilledPartition(int cycle, int originPartition,
+ public SpilledPartition(int cycle, int originPartition,
int prevOriginPartition, int innerSpilledBatches,
String innerSpillFile) {
super(cycle, originPartition, prevOriginPartition);
@@ -297,20 +279,20 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP>
}
}
- public class HashJoinUpdater implements SpilledState.Updater {
+ public class Updater implements SpilledState.Updater {
@Override
public void cleanup() {
- HashJoinBatch.this.cleanup();
+ AbstractHashBinaryRecordBatch.this.cleanup();
}
@Override
public String getFailureMessage() {
- return "Hash-Join can not partition the inner data any further (probably due to too many join-key duplicates).";
+ return this.getClass().getName() + " cannot partition the inner data any further (probably due to too many join-key duplicates).";
}
@Override
public long getMemLimit() {
- return HashJoinBatch.this.allocator.getLimit();
+ return AbstractHashBinaryRecordBatch.this.allocator.getLimit();
}
@Override
@@ -322,9 +304,9 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP>
/**
* Queue of spilled partitions to process.
*/
- private final SpilledState<HashJoinSpilledPartition> spilledState = new SpilledState<>();
- private final HashJoinUpdater spilledStateUpdater = new HashJoinUpdater();
- private HashJoinSpilledPartition spilledInners[]; // for the outer to find the
+ protected final SpilledState<SpilledPartition> spilledState = new SpilledState<>();
+ private final Updater spilledStateUpdater = new Updater();
+ protected SpilledPartition[] spilledInners; // for the outer to find the
// partition
public enum Metric implements MetricDef {
@@ -382,7 +364,7 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP>
setupHashTable();
}
- hashJoinProbe = setupHashJoinProbe();
+ probe = createProbe();
}
// If we have a valid schema, this will build a valid container.
@@ -408,7 +390,7 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP>
}
/**
- * Prefetches the first build side data holding batch.
+ * Prefetches the first probe side data holding batch.
*/
private void prefetchFirstProbeBatch() {
leftUpstream = prefetchFirstBatch(leftUpstream, prefetchedProbe,
@@ -570,8 +552,7 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP>
if (isRowKeyJoin) {
// discard the first left batch which was fetched by buildSchema, and
- // get the new
- // one based on rowkey join
+ // get the new one based on rowkey join
leftUpstream = next(left);
}
@@ -599,20 +580,15 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP>
// is data
if (state == BatchState.FIRST) {
- // Initialize various settings for the probe side
- hashJoinProbe.setupHashJoinProbe(probeBatch, this, joinType,
- semiJoin, leftUpstream, partitions, spilledState.getCycle(),
- container, spilledInners, buildSideIsEmpty.booleanValue(),
- numPartitions, rightHVColPosition);
+ setupProbe();
}
// Allocate the memory for the vectors in the output container
batchMemoryManager.allocateVectors(container);
- hashJoinProbe
- .setTargetOutputCount(batchMemoryManager.getOutputRowCount());
+ probe.setTargetOutputCount(batchMemoryManager.getOutputRowCount());
- outputRecords = hashJoinProbe.probeAndProject();
+ outputRecords = probe.probeAndProject();
container.setValueCount(outputRecords);
@@ -647,7 +623,7 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP>
// skipping; see "continue" below
// Get the next (previously) spilled partition to handle as incoming
- HashJoinSpilledPartition currSp = spilledState
+ SpilledPartition currSp = spilledState
.getNextSpilledPartition();
// If the outer is empty (and it's not a right/full join) - try the
@@ -680,7 +656,7 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP>
probeBatch = left; // if no outer batch then reuse left - needed
// for updateIncoming()
leftUpstream = IterOutcome.NONE;
- hashJoinProbe.changeToFinalProbeState();
+ probe.changeToFinalProbeState();
}
spilledState.updateCycle(stats, currSp, spilledStateUpdater);
@@ -746,43 +722,11 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP>
}
private void setupHashTable() {
- List<Comparator> comparators = Lists
- .newArrayListWithExpectedSize(conditions.size());
- conditions.forEach(cond -> comparators
- .add(JoinUtils.checkAndReturnSupportedJoinComparator(cond)));
-
if (skipHashTableBuild) {
return;
}
- // Setup the hash table configuration object
- List<NamedExpression> leftExpr = new ArrayList<>(conditions.size());
-
- // Create named expressions from the conditions
- for (int i = 0; i < conditions.size(); i++) {
- leftExpr.add(new NamedExpression(conditions.get(i).getLeft(),
- new FieldReference("probe_side_" + i)));
- }
-
- // Set the left named expression to be null if the probe batch is empty.
- if (leftUpstream != IterOutcome.OK_NEW_SCHEMA
- && leftUpstream != IterOutcome.OK) {
- leftExpr = null;
- } else {
- if (probeBatch.getSchema()
- .getSelectionVectorMode() != BatchSchema.SelectionVectorMode.NONE) {
- throw UserException.internalError(null).message(
- "Hash join does not support probe batch with selection vectors.")
- .addContext("Probe batch has selection mode",
- (probeBatch.getSchema().getSelectionVectorMode()).toString())
- .build(logger);
- }
- }
-
- HashTableConfig htConfig = new HashTableConfig(
- (int) context.getOptions().getOption(ExecConstants.MIN_HASH_TABLE_SIZE),
- true, HashTable.DEFAULT_LOAD_FACTOR, rightExpr, leftExpr, comparators,
- joinControl.asInt());
+ HashTableConfig htConfig = buildHashTableConfig();
// Create the chained hash table
baseHashTable = new ChainedHashTable(htConfig, context, allocator,
@@ -826,7 +770,6 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP>
enableRuntimeFilter = false;
return;
}
- RuntimeFilterDef runtimeFilterDef = popConfig.getRuntimeFilterDef();
List<BloomFilterDef> bloomFilterDefs = runtimeFilterDef
.getBloomFilterDefs();
for (BloomFilterDef bloomFilterDef : bloomFilterDefs) {
@@ -843,7 +786,7 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP>
}
if (missingField) {
logger.info(
- "As some build side join key fields not found, runtime filter was disabled");
+ "As some build side key fields not found, runtime filter was disabled");
enableRuntimeFilter = false;
return;
}
@@ -886,7 +829,7 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP>
spilledState.getCycle(), numPartitions);
}
- spilledInners = new HashJoinSpilledPartition[numPartitions];
+ spilledInners = new SpilledPartition[numPartitions];
}
@@ -900,7 +843,6 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP>
}
runtimeFilterReporter = new RuntimeFilterReporter(
(ExecutorFragmentContext) context);
- RuntimeFilterDef runtimeFilterDef = popConfig.getRuntimeFilterDef();
// RuntimeFilter is not a necessary part of a HashJoin operator, only the
// query which satisfy the
// RuntimeFilterRouter's judgement will have the RuntimeFilterDef.
@@ -922,7 +864,7 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP>
}
/**
- * Tunes the number of partitions used by {@link HashJoinBatch}. If it is not
+ * Tunes the number of partitions used by {@link AbstractHashBinaryRecordBatch}. If it is not
* possible to spill it gives up and reverts to unbounded in memory operation.
*/
private HashJoinMemoryCalculator.BuildSidePartitioning partitionNumTuning(
@@ -1014,7 +956,7 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP>
* @return Returns an
* {@link org.apache.drill.exec.record.RecordBatch.IterOutcome} if a
* termination condition is reached. Otherwise returns null.
- * @throws SchemaChangeException
+ * @throws SchemaChangeException schema change exception
*/
public IterOutcome executeBuildPhase() throws SchemaChangeException {
if (buildSideIsEmpty.booleanValue()) {
@@ -1084,7 +1026,7 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP>
case OK_NEW_SCHEMA:
if (!buildSchema.equals(buildBatch.getSchema())) {
throw SchemaChangeException.schemaChanged(
- "Hash join does not support schema changes in build side.",
+ this.getClass().getSimpleName() + " does not support schema changes in the build side.",
buildSchema, buildBatch.getSchema());
}
for (HashPartition partn : partitions) {
@@ -1127,10 +1069,6 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP>
// HV column
int currPart = hashCode & spilledState.getPartitionMask();
hashCode >>>= spilledState.getBitsInMask();
- // semi-join skips join-key-duplicate rows
- if (semiJoin) {
-
- }
// Append the new inner row to the appropriate partition; spill (that
// partition) if needed
partitions[currPart].appendInnerRow(buildBatch.getContainer(), ind,
@@ -1153,7 +1091,7 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP>
if (bloomFilter2buildId.size() > 0) {
int hashJoinOpId = this.popConfig.getOperatorId();
runtimeFilterReporter.sendOut(bloomFilters, probeFields,
- this.popConfig.getRuntimeFilterDef(), hashJoinOpId);
+ runtimeFilterDef, hashJoinOpId);
}
}
@@ -1213,7 +1151,7 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP>
for (HashPartition partn : partitions) {
if (partn.isSpilled()) {
- HashJoinSpilledPartition sp = new HashJoinSpilledPartition(
+ SpilledPartition sp = new SpilledPartition(
spilledState.getCycle(), partn.getPartitionNum(), originalPartition,
partn.getPartitionBatchesCount(), partn.getSpillFile());
@@ -1294,48 +1232,20 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP>
/**
* The constructor
*
- * @param popConfig
- * @param context
- * @param left
- * -- probe/outer side incoming input
- * @param right
- * -- build/iner side incoming input
- * @throws OutOfMemoryException
+ * @param popConfig T
+ * @param context FragmentContext
+ * @param left probe/outer side incoming input
+ * @param right build/inner side incoming input
+ * @throws OutOfMemoryException out of memory exception
*/
- public HashJoinBatch(HashJoinPOP popConfig, FragmentContext context,
+ public AbstractHashBinaryRecordBatch(T popConfig, FragmentContext context,
RecordBatch left, /* Probe side record batch */
RecordBatch right /* Build side record batch */
) throws OutOfMemoryException {
super(popConfig, context, true, left, right);
this.buildBatch = right;
this.probeBatch = left;
- joinType = popConfig.getJoinType();
- semiJoin = popConfig.isSemiJoin();
- joinIsLeftOrFull = joinType == JoinRelType.LEFT
- || joinType == JoinRelType.FULL;
- joinIsRightOrFull = joinType == JoinRelType.RIGHT
- || joinType == JoinRelType.FULL;
- conditions = popConfig.getConditions();
this.popConfig = popConfig;
- this.isRowKeyJoin = popConfig.isRowKeyJoin();
- this.joinControl = new JoinControl(popConfig.getJoinControl());
-
- rightExpr = new ArrayList<>(conditions.size());
- buildJoinColumns = Sets.newHashSet();
- for (int i = 0; i < conditions.size(); i++) {
- SchemaPath rightPath = (SchemaPath) conditions.get(i).getRight();
- }
-
- for (int i = 0; i < conditions.size(); i++) {
- SchemaPath rightPath = (SchemaPath) conditions.get(i).getRight();
- PathSegment.NameSegment nameSegment = (PathSegment.NameSegment) rightPath
- .getLastSegment();
- buildJoinColumns.add(nameSegment.getPath());
- String refName = "build_side_" + i;
- rightExpr.add(new NamedExpression(conditions.get(i).getRight(),
- new FieldReference(refName)));
- }
-
this.allocator = oContext.getAllocator();
numPartitions = (int) context.getOptions()
@@ -1386,14 +1296,10 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP>
RecordBatchStats.printConfiguredBatchSize(getRecordBatchStatsContext(),
configuredBatchSize);
-
- enableRuntimeFilter = context.getOptions()
- .getOption(ExecConstants.HASHJOIN_ENABLE_RUNTIME_FILTER)
- && popConfig.getRuntimeFilterDef() != null;
}
/**
- * This method is called when {@link HashJoinBatch} closes. It cleans up left
+ * This method is called when {@link AbstractHashBinaryRecordBatch} closes. It cleans up left
* over spilled files that are in the spill queue, and closes the spillSet.
*/
private void cleanup() {
@@ -1411,7 +1317,7 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP>
// delete any spill file left in unread spilled partitions
while (!spilledState.isEmpty()) {
- HashJoinSpilledPartition sp = spilledState.getNextSpilledPartition();
+ SpilledPartition sp = spilledState.getNextSpilledPartition();
try {
spillSet.delete(sp.innerSpillFile);
} catch (IOException e) {
@@ -1488,55 +1394,6 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP>
stats.setLongStat(Metric.SPILLED_PARTITIONS, numSpilled);
}
- /**
- * Get the hash table iterator that is created for the build side of the hash
- * join if this hash join was instantiated as a row-key join.
- *
- * @return hash table iterator or null if this hash join was not a row-key
- * join or if it was a row-key join but the build has not yet
- * completed.
- */
- @Override
- public Pair<ValueVector, Integer> nextRowKeyBatch() {
- if (buildComplete) {
- // partition 0 because Row Key Join has only a single partition - no
- // spilling
- Pair<VectorContainer, Integer> pp = partitions[0].nextBatch();
- if (pp != null) {
- VectorWrapper<?> vw = Iterables.get(pp.getLeft(), 0);
- ValueVector vv = vw.getValueVector();
- return Pair.of(vv, pp.getRight());
- }
- } else if (partitions == null && firstOutputBatch) { // if there is data
- // coming to
- // right(build) side in
- // build Schema stage,
- // use it.
- firstOutputBatch = false;
- if (right.getRecordCount() > 0) {
- VectorWrapper<?> vw = Iterables.get(right, 0);
- ValueVector vv = vw.getValueVector();
- return Pair.of(vv, right.getRecordCount() - 1);
- }
- }
- return null;
- }
-
- @Override // implement RowKeyJoin interface
- public boolean hasRowKeyBatch() {
- return buildComplete;
- }
-
- @Override // implement RowKeyJoin interface
- public BatchState getBatchState() {
- return state;
- }
-
- @Override // implement RowKeyJoin interface
- public void setBatchState(BatchState newState) {
- state = newState;
- }
-
@Override
protected void cancelIncoming() {
wasKilled = true;
@@ -1545,44 +1402,34 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP>
}
public void updateMetrics() {
- stats.setLongStat(HashJoinBatch.Metric.LEFT_INPUT_BATCH_COUNT,
+ stats.setLongStat(AbstractHashBinaryRecordBatch.Metric.LEFT_INPUT_BATCH_COUNT,
batchMemoryManager.getNumIncomingBatches(LEFT_INDEX));
- stats.setLongStat(HashJoinBatch.Metric.LEFT_AVG_INPUT_BATCH_BYTES,
+ stats.setLongStat(AbstractHashBinaryRecordBatch.Metric.LEFT_AVG_INPUT_BATCH_BYTES,
batchMemoryManager.getAvgInputBatchSize(LEFT_INDEX));
- stats.setLongStat(HashJoinBatch.Metric.LEFT_AVG_INPUT_ROW_BYTES,
+ stats.setLongStat(AbstractHashBinaryRecordBatch.Metric.LEFT_AVG_INPUT_ROW_BYTES,
batchMemoryManager.getAvgInputRowWidth(LEFT_INDEX));
- stats.setLongStat(HashJoinBatch.Metric.LEFT_INPUT_RECORD_COUNT,
+ stats.setLongStat(AbstractHashBinaryRecordBatch.Metric.LEFT_INPUT_RECORD_COUNT,
batchMemoryManager.getTotalInputRecords(LEFT_INDEX));
- stats.setLongStat(HashJoinBatch.Metric.RIGHT_INPUT_BATCH_COUNT,
+ stats.setLongStat(AbstractHashBinaryRecordBatch.Metric.RIGHT_INPUT_BATCH_COUNT,
batchMemoryManager.getNumIncomingBatches(RIGHT_INDEX));
- stats.setLongStat(HashJoinBatch.Metric.RIGHT_AVG_INPUT_BATCH_BYTES,
+ stats.setLongStat(AbstractHashBinaryRecordBatch.Metric.RIGHT_AVG_INPUT_BATCH_BYTES,
batchMemoryManager.getAvgInputBatchSize(RIGHT_INDEX));
- stats.setLongStat(HashJoinBatch.Metric.RIGHT_AVG_INPUT_ROW_BYTES,
+ stats.setLongStat(AbstractHashBinaryRecordBatch.Metric.RIGHT_AVG_INPUT_ROW_BYTES,
batchMemoryManager.getAvgInputRowWidth(RIGHT_INDEX));
- stats.setLongStat(HashJoinBatch.Metric.RIGHT_INPUT_RECORD_COUNT,
+ stats.setLongStat(AbstractHashBinaryRecordBatch.Metric.RIGHT_INPUT_RECORD_COUNT,
batchMemoryManager.getTotalInputRecords(RIGHT_INDEX));
- stats.setLongStat(HashJoinBatch.Metric.OUTPUT_BATCH_COUNT,
+ stats.setLongStat(AbstractHashBinaryRecordBatch.Metric.OUTPUT_BATCH_COUNT,
batchMemoryManager.getNumOutgoingBatches());
- stats.setLongStat(HashJoinBatch.Metric.AVG_OUTPUT_BATCH_BYTES,
+ stats.setLongStat(AbstractHashBinaryRecordBatch.Metric.AVG_OUTPUT_BATCH_BYTES,
batchMemoryManager.getAvgOutputBatchSize());
- stats.setLongStat(HashJoinBatch.Metric.AVG_OUTPUT_ROW_BYTES,
+ stats.setLongStat(AbstractHashBinaryRecordBatch.Metric.AVG_OUTPUT_ROW_BYTES,
batchMemoryManager.getAvgOutputRowWidth());
- stats.setLongStat(HashJoinBatch.Metric.OUTPUT_RECORD_COUNT,
+ stats.setLongStat(AbstractHashBinaryRecordBatch.Metric.OUTPUT_RECORD_COUNT,
batchMemoryManager.getTotalOutputRecords());
}
- @Override
- public void setRowKeyJoinState(RowKeyJoin.RowKeyJoinState newState) {
- this.rkJoinState = newState;
- }
-
- @Override
- public RowKeyJoin.RowKeyJoinState getRowKeyJoinState() {
- return rkJoinState;
- }
-
@Override
public void close() {
if (!spilledState.isFirstCycle()) { // spilling happened
@@ -1628,17 +1475,8 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP>
super.close();
}
- public HashJoinProbe setupHashJoinProbe() {
- // No real code generation !!
- return new HashJoinProbeTemplate();
- }
+ public abstract Probe createProbe();
+ public abstract void setupProbe() throws SchemaChangeException;
- @Override
- public void dump() {
- logger.error(
- "HashJoinBatch[container={}, left={}, right={}, leftOutcome={}, rightOutcome={}, joinType={}, hashJoinProbe={},"
- + " rightExpr={}, canSpill={}, buildSchema={}, probeSchema={}]",
- container, left, right, leftUpstream, rightUpstream, joinType,
- hashJoinProbe, rightExpr, canSpill, buildSchema, probeSchema);
- }
+ protected abstract HashTableConfig buildHashTableConfig();
}
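The refactoring is a textbook template method: the shared build/spill/probe driver lives in AbstractHashBinaryRecordBatch, and each concrete batch supplies createProbe(), setupProbe(), and buildHashTableConfig(). A condensed sketch of a subclass under those signatures; bodies are illustrative, and the real implementations are HashJoinBatch (below) and HashSetOpRecordBatch:

    public class HashSetOpBatchSketch extends AbstractHashBinaryRecordBatch<SetOp> {

      public HashSetOpBatchSketch(SetOp popConfig, FragmentContext context,
          RecordBatch left, RecordBatch right) throws OutOfMemoryException {
        super(popConfig, context, left, right);
      }

      @Override
      public Probe createProbe() {
        return new HashSetOpProbeTemplate(); // plain instantiation, no codegen
      }

      @Override
      public void setupProbe() throws SchemaChangeException {
        // wire the probe with partitions, spill cycle, outgoing container, ...
      }

      @Override
      protected HashTableConfig buildHashTableConfig() {
        // placeholders: a set op keys on all columns and requests per-key counts
        return new HashTableConfig(initialCapacity, true,
            HashTable.DEFAULT_LOAD_FACTOR, keyExprsBuild, keyExprsProbe,
            comparators, JoinControl.DEFAULT, true /* computeKeyNum */);
      }
    }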
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
index a08e4308cd..60de29091f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
@@ -17,743 +17,106 @@
*/
package org.apache.drill.exec.physical.impl.join;
-import static org.apache.drill.exec.record.RecordBatch.IterOutcome.EMIT;
-import static org.apache.drill.exec.record.RecordBatch.IterOutcome.NONE;
-import static org.apache.drill.exec.record.RecordBatch.IterOutcome.OK_NEW_SCHEMA;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
import org.apache.calcite.rel.core.JoinRelType;
-import org.apache.commons.io.FileUtils;
-import org.apache.commons.lang3.mutable.MutableBoolean;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.drill.common.exceptions.UserException;
-import org.apache.drill.common.expression.ErrorCollector;
-import org.apache.drill.common.expression.ErrorCollectorImpl;
-import org.apache.drill.common.expression.ExpressionPosition;
import org.apache.drill.common.expression.FieldReference;
-import org.apache.drill.common.expression.LogicalExpression;
import org.apache.drill.common.expression.PathSegment;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.logical.data.JoinCondition;
import org.apache.drill.common.logical.data.NamedExpression;
-import org.apache.drill.common.types.TypeProtos;
-import org.apache.drill.common.types.TypeProtos.DataMode;
-import org.apache.drill.common.types.TypeProtos.MajorType;
-import org.apache.drill.common.types.Types;
import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.exception.OutOfMemoryException;
import org.apache.drill.exec.exception.SchemaChangeException;
-import org.apache.drill.exec.expr.ExpressionTreeMaterializer;
-import org.apache.drill.exec.expr.fn.impl.ValueVectorHashHelper;
-import org.apache.drill.exec.memory.BaseAllocator;
-import org.apache.drill.exec.memory.BufferAllocator;
-import org.apache.drill.exec.ops.ExecutorFragmentContext;
import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.ops.MetricDef;
-import org.apache.drill.exec.physical.base.AbstractBase;
import org.apache.drill.exec.physical.config.HashJoinPOP;
-import org.apache.drill.exec.physical.impl.aggregate.SpilledRecordBatch;
-import org.apache.drill.exec.physical.impl.common.AbstractSpilledPartitionMetadata;
-import org.apache.drill.exec.physical.impl.common.ChainedHashTable;
import org.apache.drill.exec.physical.impl.common.Comparator;
-import org.apache.drill.exec.physical.impl.common.HashPartition;
import org.apache.drill.exec.physical.impl.common.HashTable;
import org.apache.drill.exec.physical.impl.common.HashTableConfig;
-import org.apache.drill.exec.physical.impl.common.HashTableStats;
-import org.apache.drill.exec.physical.impl.common.SpilledState;
-import org.apache.drill.exec.physical.impl.spill.SpillSet;
import org.apache.drill.exec.planner.common.JoinControl;
-import org.apache.drill.exec.record.AbstractBinaryRecordBatch;
import org.apache.drill.exec.record.BatchSchema;
-import org.apache.drill.exec.record.JoinBatchMemoryManager;
-import org.apache.drill.exec.record.MaterializedField;
import org.apache.drill.exec.record.RecordBatch;
-import org.apache.drill.exec.record.TypedFieldId;
-import org.apache.drill.exec.record.VectorAccessibleUtilities;
import org.apache.drill.exec.record.VectorContainer;
import org.apache.drill.exec.record.VectorWrapper;
-import org.apache.drill.exec.util.record.RecordBatchStats;
-import org.apache.drill.exec.util.record.RecordBatchStats.RecordBatchIOType;
-import org.apache.drill.exec.vector.IntVector;
import org.apache.drill.exec.vector.ValueVector;
-import org.apache.drill.exec.vector.complex.AbstractContainerVector;
-import org.apache.drill.exec.work.filter.BloomFilter;
-import org.apache.drill.exec.work.filter.BloomFilterDef;
-import org.apache.drill.exec.work.filter.RuntimeFilterDef;
-import org.apache.drill.exec.work.filter.RuntimeFilterReporter;
-import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
import org.apache.drill.shaded.guava.com.google.common.collect.Iterables;
import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
-import org.apache.drill.shaded.guava.com.google.common.collect.Sets;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
/**
* Implements the runtime execution for the Hash-Join operator supporting INNER,
* LEFT OUTER, RIGHT OUTER, and FULL OUTER joins
- * <p>
- * This implementation splits the incoming Build side rows into multiple
- * Partitions, thus allowing spilling of some of these partitions to disk if
- * memory gets tight. Each partition is implemented as a {@link HashPartition}.
- * After the build phase is over, in the most general case, some of the
- * partitions were spilled, and the others are in memory. Each of the partitions
- * in memory would get a {@link HashTable} built.
- * <p>
- * Next the Probe side is read, and each row is key matched with a Build
- * partition. If that partition is in memory, then the key is used to probe and
- * perform the join, and the results are added to the outgoing batch. But if
- * that build side partition was spilled, then the matching Probe size partition
- * is spilled as well.
- * <p>
- * After all the Probe side was processed, we are left with pairs of spilled
- * partitions. Then each pair is processed individually (that Build partition
- * should be smaller than the original, hence likely fit whole into memory to
- * allow probing; if not -- see below).
- * <p>
- * Processing of each spilled pair is exactly like processing the original
- * Build/Probe incomings (in fact, the {@link #innerNext()} method calls
- * itself recursively). Thus the spilled build partition is read and divided
- * into new partitions, which in turn may spill again (and again...). The code
- * tracks these spilling "cycles". Normally any such repeat (i.e. a cycle of 2
- * or greater) is wasteful, indicating that the number of partitions chosen was
- * too small (see the sketch below).
*/
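A minimal, JDK-only sketch of the cycle described above; every name here is
illustrative rather than Drill's API. Rows are partitioned during the build,
spilled build/probe pairs are queued, and each queued pair is later processed
exactly like a fresh pair of incomings:

    import java.util.ArrayDeque;
    import java.util.Deque;

    // Illustrative only: models the recursive build/probe/spill cycle.
    class SpillCycleSketch {
      record SpilledPair(int cycle) {}

      static final Deque<SpilledPair> spilledQueue = new ArrayDeque<>();

      // Plays the role of one innerNext() invocation: build, probe, then
      // treat each spilled pair as a fresh input at the next cycle depth.
      static void process(int cycle) {
        // ... build phase: hash rows into partitions, spilling some ...
        // ... probe phase: match probe rows against in-memory partitions ...
        while (!spilledQueue.isEmpty()) {
          SpilledPair pair = spilledQueue.pop();
          process(pair.cycle()); // mirrors innerNext() calling itself
        }
      }

      public static void main(String[] args) {
        spilledQueue.push(new SpilledPair(1)); // pretend one partition spilled
        process(0);
        System.out.println("all spill cycles drained");
      }
    }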
-public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP>
+public class HashJoinBatch extends AbstractHashBinaryRecordBatch<HashJoinPOP>
implements RowKeyJoin {
- private static final Logger logger = LoggerFactory
- .getLogger(HashJoinBatch.class);
-
- /**
- * The maximum number of records within each internal batch.
- */
- private final int RECORDS_PER_BATCH; // internal batches
-
// Join type, INNER, LEFT, RIGHT or OUTER
private final JoinRelType joinType;
- private final boolean semiJoin;
- private final boolean joinIsLeftOrFull;
- private final boolean joinIsRightOrFull;
- private boolean skipHashTableBuild; // when outer side is empty, and the join
- // is inner or left (see DRILL-6755)
-
// Join conditions
private final List<JoinCondition> conditions;
-
- private RowKeyJoin.RowKeyJoinState rkJoinState = RowKeyJoin.RowKeyJoinState.INITIAL;
-
- // Runtime generated class implementing HashJoinProbe interface
- private HashJoinProbe hashJoinProbe = null;
-
- private final List<NamedExpression> rightExpr;
-
- /**
- * Names of the join columns. These names are used to help estimate
- * the size of the {@link HashTable}s.
- */
- private final Set<String> buildJoinColumns;
-
- // Fields used for partitioning
- /**
- * The number of {@link HashPartition}s. This is configured via a system
- * option and set in
- * {@link #partitionNumTuning(int, HashJoinMemoryCalculator.BuildSidePartitioning)}.
- */
- private int numPartitions = 1; // must be 2 to the power of bitsInMask
-
- /**
- * The master class used to generate {@link HashTable}s.
- */
- private ChainedHashTable baseHashTable;
- private final MutableBoolean buildSideIsEmpty = new MutableBoolean(false);
- private final MutableBoolean probeSideIsEmpty = new MutableBoolean(false);
- private boolean canSpill = true;
- private boolean wasKilled; // a kill was received, may need to clean spilled
- // partns
-
- /**
- * This array holds the currently active {@link HashPartition}s.
- */
- HashPartition partitions[];
-
- // Number of records in the output container
- private int outputRecords;
-
- // Schema of the build side
- private BatchSchema buildSchema;
- // Schema of the probe side
- private BatchSchema probeSchema;
-
- // Whether this HashJoin is used for a row-key based join
- private final boolean isRowKeyJoin;
-
private final JoinControl joinControl;
- // An iterator over the build side hash table (only applicable for row-key
- // joins)
- private boolean buildComplete;
-
- // indicates if we have previously returned an output batch
- private boolean firstOutputBatch = true;
-
- private int rightHVColPosition;
- private final BufferAllocator allocator;
- // Local fields for left/right incoming - may be replaced when reading from
- // spilled
- private RecordBatch buildBatch;
- private RecordBatch probeBatch;
-
- /**
- * Flag indicating whether or not the first data holding build batch needs to
- * be fetched.
- */
- private final MutableBoolean prefetchedBuild = new MutableBoolean(false);
- /**
- * Flag indicating whether or not the first data holding probe batch needs to
- * be fetched.
- */
- private final MutableBoolean prefetchedProbe = new MutableBoolean(false);
-
- // For handling spilling
- private final SpillSet spillSet;
- HashJoinPOP popConfig;
-
- private final int originalPartition = -1; // the partition a secondary reads
- // from
- IntVector read_right_HV_vector; // HV vector that was read from the spilled
- // batch
- private final int maxBatchesInMemory;
- private final List<String> probeFields = new ArrayList<>(); // keep the same
- // sequence with
- // the
- // bloomFilters
- private boolean enableRuntimeFilter;
- private RuntimeFilterReporter runtimeFilterReporter;
- private ValueVectorHashHelper.Hash64 hash64;
- private final Map<BloomFilter, Integer> bloomFilter2buildId = new HashMap<>();
- private final Map<BloomFilterDef, Integer> bloomFilterDef2buildId = new HashMap<>();
- private final List<BloomFilter> bloomFilters = new ArrayList<>();
- private boolean bloomFiltersGenerated;
-
/**
- * This holds information about the spilled partitions for the build and probe
- * side.
+ * The constructor
+ *
+ * @param popConfig HashJoinPOP
+ * @param context FragmentContext
+ * @param left probe/outer side incoming input
+ * @param right build/inner side incoming input
+ * @throws OutOfMemoryException out of memory exception
*/
- public static class HashJoinSpilledPartition
- extends AbstractSpilledPartitionMetadata {
- private final int innerSpilledBatches;
- private final String innerSpillFile;
- private int outerSpilledBatches;
- private String outerSpillFile;
- private boolean updatedOuter;
-
- public HashJoinSpilledPartition(int cycle, int originPartition,
- int prevOriginPartition, int innerSpilledBatches,
- String innerSpillFile) {
- super(cycle, originPartition, prevOriginPartition);
-
- this.innerSpilledBatches = innerSpilledBatches;
- this.innerSpillFile = innerSpillFile;
- }
-
- public int getInnerSpilledBatches() {
- return innerSpilledBatches;
- }
-
- public String getInnerSpillFile() {
- return innerSpillFile;
- }
-
- public int getOuterSpilledBatches() {
- Preconditions.checkState(updatedOuter);
- return outerSpilledBatches;
- }
-
- public String getOuterSpillFile() {
- Preconditions.checkState(updatedOuter);
- return outerSpillFile;
- }
-
- public void updateOuter(int outerSpilledBatches, String outerSpillFile) {
- Preconditions.checkState(!updatedOuter);
- updatedOuter = true;
-
- this.outerSpilledBatches = outerSpilledBatches;
- this.outerSpillFile = outerSpillFile;
- }
-
- @Override
- public String makeDebugString() {
- return String.format(
- "Start reading spilled partition %d (prev %d) from cycle %d (with %d-%d batches).",
- this.getOriginPartition(), this.getPrevOriginPartition(),
- this.getCycle(), outerSpilledBatches, innerSpilledBatches);
- }
- }
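The removed metadata class above fills its inner-side fields at construction
time and its outer-side fields later, guarding both orders with Preconditions
checks. A stand-alone sketch of that write-once pattern, using plain JDK
exceptions and hypothetical names:

    // Illustrative write-once holder: the outer side may be recorded exactly
    // once, and must be recorded before it is read.
    class WriteOnceOuter {
      private int outerSpilledBatches;
      private boolean updatedOuter;

      void updateOuter(int outerSpilledBatches) {
        if (updatedOuter) {
          throw new IllegalStateException("outer side already recorded");
        }
        this.outerSpilledBatches = outerSpilledBatches;
        updatedOuter = true;
      }

      int getOuterSpilledBatches() {
        if (!updatedOuter) {
          throw new IllegalStateException("outer side not yet recorded");
        }
        return outerSpilledBatches;
      }
    }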
-
- public class HashJoinUpdater implements SpilledState.Updater {
- @Override
- public void cleanup() {
- HashJoinBatch.this.cleanup();
- }
+ public HashJoinBatch(HashJoinPOP popConfig, FragmentContext context,
+ RecordBatch left, /* Probe side record batch */
+ RecordBatch right /* Build side record batch */
+ ) throws OutOfMemoryException {
+ super(popConfig, context, left, right);
+ joinType = popConfig.getJoinType();
+ conditions = popConfig.getConditions();
+ this.joinControl = new JoinControl(popConfig.getJoinControl());
- @Override
- public String getFailureMessage() {
- return "Hash-Join can not partition the inner data any further (probably due to too many join-key duplicates).";
- }
+ semiJoin = popConfig.isSemiJoin();
+ joinIsLeftOrFull = joinType == JoinRelType.LEFT
+ || joinType == JoinRelType.FULL;
+ joinIsRightOrFull = joinType == JoinRelType.RIGHT
+ || joinType == JoinRelType.FULL;
+ this.isRowKeyJoin = popConfig.isRowKeyJoin();
- @Override
- public long getMemLimit() {
- return HashJoinBatch.this.allocator.getLimit();
+ for (int i = 0; i < conditions.size(); i++) {
+ SchemaPath rightPath = (SchemaPath) conditions.get(i).getRight();
+ PathSegment.NameSegment nameSegment = (PathSegment.NameSegment) rightPath
+ .getLastSegment();
+ buildJoinColumns.add(nameSegment.getPath());
+ String refName = "build_side_" + i;
+ rightExpr.add(new NamedExpression(conditions.get(i).getRight(),
+ new FieldReference(refName)));
}
- @Override
- public boolean hasPartitionLimit() {
- return true;
- }
- }
-
- /**
- * Queue of spilled partitions to process.
- */
- private final SpilledState<HashJoinSpilledPartition> spilledState = new SpilledState<>();
- private final HashJoinUpdater spilledStateUpdater = new HashJoinUpdater();
- private HashJoinSpilledPartition spilledInners[]; // for the outer to find the
- // partition
-
- public enum Metric implements MetricDef {
- NUM_BUCKETS, NUM_ENTRIES, NUM_RESIZING, RESIZING_TIME_MS, NUM_PARTITIONS,
- // number of original partitions spilled to disk
- SPILLED_PARTITIONS,
- SPILL_MB, // Number of MB of data spilled to disk. This amount is first
- // written,
- // then later re-read. So, disk I/O is twice this amount.
- SPILL_CYCLE, // 0 - no spill, 1 - spill, 2 - SECONDARY, 3 - TERTIARY
- LEFT_INPUT_BATCH_COUNT, LEFT_AVG_INPUT_BATCH_BYTES, LEFT_AVG_INPUT_ROW_BYTES,
- LEFT_INPUT_RECORD_COUNT, RIGHT_INPUT_BATCH_COUNT, RIGHT_AVG_INPUT_BATCH_BYTES,
- RIGHT_AVG_INPUT_ROW_BYTES, RIGHT_INPUT_RECORD_COUNT, OUTPUT_BATCH_COUNT,
- AVG_OUTPUT_BATCH_BYTES, AVG_OUTPUT_ROW_BYTES, OUTPUT_RECORD_COUNT;
-
- // duplicated for hash agg
-
- @Override
- public int metricId() {
- return ordinal();
- }
+ runtimeFilterDef = popConfig.getRuntimeFilterDef();
+ enableRuntimeFilter = context.getOptions().getOption(ExecConstants.HASHJOIN_ENABLE_RUNTIME_FILTER)
+ && runtimeFilterDef != null;
}
@Override
- public int getRecordCount() {
- return outputRecords;
+ public Probe createProbe() {
+ // No runtime code generation needed here
+ return new HashJoinProbeTemplate();
}
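Unlike operator internals that Drill compiles at run time from a template,
the probe object here is instantiated directly. A small sketch of that design
choice, with hypothetical names standing in for the real interfaces:

    // Illustrative: the factory returns a hand-written implementation
    // instead of compiling a generated subclass at run time.
    interface ProbeSketch {
      int probeAndProject();
    }

    class ProbeTemplateSketch implements ProbeSketch {
      @Override
      public int probeAndProject() {
        return 0; // the real operator matches probe rows and copies results
      }
    }

    class ProbeFactorySketch {
      ProbeSketch createProbe() {
        return new ProbeTemplateSketch(); // no code generation involved
      }
    }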
@Override
- protected void buildSchema() {
- // We must first get the schemas from upstream operators before we can build
- // our schema.
- boolean validSchema = prefetchFirstBatchFromBothSides();
-
- if (validSchema) {
- // We are able to construct a valid schema from the upstream data.
- // Setting the state here makes sure AbstractRecordBatch returns
- // OK_NEW_SCHEMA
- state = BatchState.BUILD_SCHEMA;
-
- if (leftUpstream == OK_NEW_SCHEMA) {
- probeSchema = left.getSchema();
- }
-
- if (rightUpstream == OK_NEW_SCHEMA) {
- buildSchema = right.getSchema();
- // position of the new "column" for keeping the hash values
- // (after the real columns)
- rightHVColPosition = right.getContainer().getNumberOfColumns();
- // In special cases, when the probe side is empty and the join is
- // inner or left, there is no need for a hash table
- skipHashTableBuild = leftUpstream == IterOutcome.NONE
- && !joinIsRightOrFull;
- // We only need the hash tables if we have data on the build side.
- setupHashTable();
- }
-
- hashJoinProbe = setupHashJoinProbe();
- }
-
- // If we have a valid schema, this will build a valid container.
- // If we were unable to obtain a valid schema,
- // we still need to build a dummy schema. This code handles both cases for
- // us.
- setupOutputContainerSchema();
- container.buildSchema(BatchSchema.SelectionVectorMode.NONE);
- container.setEmpty();
- }
-
- /**
- * Prefetches the first build side data holding batch.
- */
- private void prefetchFirstBuildBatch() {
- rightUpstream = prefetchFirstBatch(rightUpstream, prefetchedBuild,
- buildSideIsEmpty, RIGHT_INDEX, buildBatch, () -> {
- batchMemoryManager.update(RIGHT_INDEX, 0, true);
- RecordBatchStats.logRecordBatchStats(RecordBatchIOType.INPUT_RIGHT,
- batchMemoryManager.getRecordBatchSizer(RIGHT_INDEX),
- getRecordBatchStatsContext());
- });
- }
-
- /**
- * Prefetches the first probe side data holding batch.
- */
- private void prefetchFirstProbeBatch() {
- leftUpstream = prefetchFirstBatch(leftUpstream, prefetchedProbe,
- probeSideIsEmpty, LEFT_INDEX, probeBatch, () -> {
- batchMemoryManager.update(LEFT_INDEX, 0);
- RecordBatchStats.logRecordBatchStats(RecordBatchIOType.INPUT_LEFT,
- batchMemoryManager.getRecordBatchSizer(LEFT_INDEX),
- getRecordBatchStatsContext());
- });
- }
-
- /**
- * Used to fetch the first data holding batch from either the build or probe
- * side.
- *
- * @param outcome
- * The current upstream outcome for either the build or probe side.
- * @param prefetched
- * A flag indicating if we have already done a prefetch of the first
- * data holding batch for the probe or build side.
- * @param isEmpty
- * A flag indicating if the probe or build side is empty.
- * @param index
- * The upstream index of the probe or build batch.
- * @param batch
- * The probe or build batch itself.
- * @param memoryManagerUpdate
- * A lambda function to execute the memory manager update for the
- * probe or build batch.
- * @return The current
- * {@link org.apache.drill.exec.record.RecordBatch.IterOutcome}.
- */
- private IterOutcome prefetchFirstBatch(IterOutcome outcome,
- MutableBoolean prefetched, MutableBoolean isEmpty, int index,
- RecordBatch batch, Runnable memoryManagerUpdate) {
- if (prefetched.booleanValue()) {
- // We have already prefetched the first data holding batch
- return outcome;
- }
-
- // If we didn't retrieve our first data holding batch, we need to do it now.
- prefetched.setValue(true);
-
- if (outcome != IterOutcome.NONE) {
- // We can only get data if there is data available
- outcome = sniffNonEmptyBatch(outcome, index, batch);
- }
-
- isEmpty.setValue(outcome == IterOutcome.NONE); // If we received NONE there
- // is no data.
-
- // Got our first batch(es)
- if (spilledState.isFirstCycle()) {
- // Only collect stats for the first cycle
- memoryManagerUpdate.run();
- }
- state = BatchState.FIRST;
- return outcome;
- }
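The prefetch logic above runs at most once per side and skips past empty
batches until it finds the first data-holding one. A compact, JDK-only toy of
the same idea (names are hypothetical; an Iterator of row counts stands in
for the upstream operator):

    import java.util.Iterator;

    // Illustrative prefetch: fetch the first non-empty "batch" exactly once.
    class PrefetchSketch {
      private boolean prefetched; // mirrors the prefetchedBuild/Probe flags
      private boolean empty;      // mirrors the build/probeSideIsEmpty flags

      Integer prefetchFirst(Iterator<Integer> rowCounts) {
        if (prefetched) {
          return null;            // already done; keep the earlier outcome
        }
        prefetched = true;
        while (rowCounts.hasNext()) {
          int count = rowCounts.next();
          if (count != 0) {
            return count;         // first data-holding batch found
          }                       // otherwise sniff past the empty batch
        }
        empty = true;             // upstream finished without producing data
        return null;
      }
    }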
-
- /**
- * Currently in order to accurately predict memory usage for spilling, the
- * first non-empty build or probe side batch is needed. This method fetches
- * the first non-empty batch from the probe or build side.
- *
- * @param curr
- * The current outcome.
- * @param inputIndex
- * Index specifying whether to work with the probe or build input.
- * @param recordBatch
- * The probe or build record batch.
- * @return The {@link org.apache.drill.exec.record.RecordBatch.IterOutcome}
- * for the left or right record batch.
- */
- private IterOutcome sniffNonEmptyBatch(IterOutcome curr, int inputIndex,
- RecordBatch recordBatch) {
- while (true) {
- if (recordBatch.getRecordCount() != 0) {
- return curr;
- }
-
- curr = next(inputIndex, recordBatch);
-
- switch (curr) {
- case OK:
- // We got a data batch
- break;
- case NOT_YET:
- // We need to try again
- break;
- case EMIT:
- throw new UnsupportedOperationException("We do not support " + EMIT);
- default:
- // Other cases are termination conditions
- return curr;
- }
- }
- }
-
- /**
- * Determines the memory calculator to use. If maxNumBatches is configured
- * simple batch counting is used to spill. Otherwise memory calculations are
- * used to determine when to spill.
- *
- * @return The memory calculator to use.
- */
- public HashJoinMemoryCalculator getCalculatorImpl() {
- if (maxBatchesInMemory == 0) {
- double safetyFactor = context.getOptions()
- .getDouble(ExecConstants.HASHJOIN_SAFETY_FACTOR_KEY);
- double fragmentationFactor = context.getOptions()
- .getDouble(ExecConstants.HASHJOIN_FRAGMENTATION_FACTOR_KEY);
- double hashTableDoublingFactor = context.getOptions()
- .getDouble(ExecConstants.HASHJOIN_HASH_DOUBLE_FACTOR_KEY);
- String hashTableCalculatorType = context.getOptions()
- .getString(ExecConstants.HASHJOIN_HASHTABLE_CALC_TYPE_KEY);
-
- return new HashJoinMemoryCalculatorImpl(safetyFactor, fragmentationFactor,
- hashTableDoublingFactor, hashTableCalculatorType, semiJoin);
- } else {
- return new HashJoinMechanicalMemoryCalculator(maxBatchesInMemory);
- }
+ public void setupProbe() throws SchemaChangeException {
+ probe.setup(probeBatch, this, joinType,
+ semiJoin, leftUpstream, partitions, spilledState.getCycle(),
+ container, spilledInners, buildSideIsEmpty.booleanValue(),
+ numPartitions, rightHVColPosition);
}
@Override
- public IterOutcome innerNext() {
- if (wasKilled) {
- // We have received a cancel signal. We need to stop processing.
- cleanup();
- return IterOutcome.NONE;
- }
-
- prefetchFirstBuildBatch();
-
- if (rightUpstream.isError()) {
- // A termination condition was reached while prefetching the first build
- // side data holding batch.
- // We need to terminate.
- return rightUpstream;
- }
-
- try {
- /*
- * If we are here for the first time, execute the build phase of the hash
- * join and setup the run time generated class for the probe side
- */
- if (state == BatchState.FIRST) {
- // Build the hash table, using the build side record batches.
- IterOutcome buildExecuteTermination = executeBuildPhase();
-
- if (buildExecuteTermination != null) {
- // A termination condition was reached while executing the build
- // phase.
- // We need to terminate.
- return buildExecuteTermination;
- }
-
- buildComplete = true;
-
- if (isRowKeyJoin) {
- // discard the first left batch which was fetched by buildSchema, and
- // get the new
- // one based on rowkey join
- leftUpstream = next(left);
- }
-
- // Update the hash table related stats for the operator
- updateStats();
- }
-
- // Try to probe and project, or recursively handle a spilled partition
- if (!buildSideIsEmpty.booleanValue() || // If there are build-side rows
- joinIsLeftOrFull) { // or if this is a left/full outer join
-
- prefetchFirstProbeBatch();
-
- if (leftUpstream.isError()
- || (leftUpstream == NONE && !joinIsRightOrFull)) {
- // A termination condition was reached while prefetching the first
- // probe side data holding batch.
- // We need to terminate.
- return leftUpstream;
- }
-
- if (!buildSideIsEmpty.booleanValue()
- || !probeSideIsEmpty.booleanValue()) {
- // Only allocate outgoing vectors and execute probing logic if there
- // is data
-
- if (state == BatchState.FIRST) {
- // Initialize various settings for the probe side
- hashJoinProbe.setupHashJoinProbe(probeBatch, this, joinType,
- semiJoin, leftUpstream, partitions, spilledState.getCycle(),
- container, spilledInners, buildSideIsEmpty.booleanValue(),
- numPartitions, rightHVColPosition);
- }
-
- // Allocate the memory for the vectors in the output container
- batchMemoryManager.allocateVectors(container);
-
- hashJoinProbe
- .setTargetOutputCount(batchMemoryManager.getOutputRowCount());
-
- outputRecords = hashJoinProbe.probeAndProject();
-
- container.setValueCount(outputRecords);
-
- batchMemoryManager.updateOutgoingStats(outputRecords);
- RecordBatchStats.logRecordBatchStats(RecordBatchIOType.OUTPUT, this,
- getRecordBatchStatsContext());
-
- /*
- * We are here because of one of the following:
- * 1. We completed processing all the records and we are done.
- * 2. We filled the outgoing batch to the maximum and need to return upstream.
- * In either case, build the output container's schema and return.
- */
- if (outputRecords > 0 || state == BatchState.FIRST) {
- state = BatchState.NOT_FIRST;
-
- return IterOutcome.OK;
- }
- }
-
- // Free all partitions' in-memory data structures
- // (In case need to start processing spilled partitions)
- for (HashPartition partn : partitions) {
- partn.cleanup(false); // clean, but do not delete the spill files !!
- }
-
- //
- // (recursively) Handle the spilled partitions, if any
- //
- if (!buildSideIsEmpty.booleanValue()) {
- while (!spilledState.isEmpty()) { // "while" is only used for
- // skipping; see "continue" below
-
- // Get the next (previously) spilled partition to handle as incoming
- HashJoinSpilledPartition currSp = spilledState
- .getNextSpilledPartition();
-
- // If the outer is empty (and it's not a right/full join) - try the
- // next spilled partition
- if (currSp.outerSpilledBatches == 0 && !joinIsRightOrFull) {
- continue;
- }
-
- // Create a BUILD-side "incoming" out of the inner spill file of
- // that partition
- buildBatch = new SpilledRecordBatch(currSp.innerSpillFile,
- currSp.innerSpilledBatches, context, buildSchema, oContext,
- spillSet);
- // The above ctor call also got the first batch; need to update the
- // outcome
- rightUpstream = ((SpilledRecordBatch) buildBatch)
- .getInitialOutcome();
-
- if (currSp.outerSpilledBatches > 0) {
- // Create a PROBE-side "incoming" out of the outer spill file of
- // that partition
- probeBatch = new SpilledRecordBatch(currSp.outerSpillFile,
- currSp.outerSpilledBatches, context, probeSchema, oContext,
- spillSet);
- // The above ctor call also got the first batch; need to update
- // the outcome
- leftUpstream = ((SpilledRecordBatch) probeBatch)
- .getInitialOutcome();
- } else {
- probeBatch = left; // if no outer batch then reuse left - needed
- // for updateIncoming()
- leftUpstream = IterOutcome.NONE;
- hashJoinProbe.changeToFinalProbeState();
- }
-
- spilledState.updateCycle(stats, currSp, spilledStateUpdater);
- state = BatchState.FIRST; // TODO need to determine if this is still
- // necessary since
- // prefetchFirstBatchFromBothSides sets
- // this
-
- prefetchedBuild.setValue(false);
- prefetchedProbe.setValue(false);
-
- return innerNext(); // start processing the next spilled partition
- // "recursively"
- }
- }
-
- } else {
- // Our build side is empty, we won't have any matches, clear the probe
- // side
- killAndDrainLeftUpstream();
- }
-
- // No more output records, clean up and return
- state = BatchState.DONE;
-
- cleanup();
-
- return IterOutcome.NONE;
- } catch (SchemaChangeException e) {
- throw UserException.schemaChangeError(e).build(logger);
- }
- }
-
- /**
- * In case an upstream data is no longer needed, send a kill and flush any
- * remaining batch
- *
- * @param batch
- * probe or build batch
- * @param upstream
- * which upstream
- * @param isLeft
- * is it the left or right
- */
- private void killAndDrainUpstream(RecordBatch batch, IterOutcome upstream,
- boolean isLeft) {
- batch.cancel();
- while (upstream == IterOutcome.OK_NEW_SCHEMA
- || upstream == IterOutcome.OK) {
- VectorAccessibleUtilities.clear(batch);
- upstream = next(
- isLeft ? HashJoinHelper.LEFT_INPUT : HashJoinHelper.RIGHT_INPUT,
- batch);
- }
- }
-
- private void killAndDrainLeftUpstream() {
- killAndDrainUpstream(probeBatch, leftUpstream, true);
- }
-
- private void killAndDrainRightUpstream() {
- killAndDrainUpstream(buildBatch, rightUpstream, false);
- }
-
- private void setupHashTable() {
+ protected HashTableConfig buildHashTableConfig() {
List<Comparator> comparators = Lists
- .newArrayListWithExpectedSize(conditions.size());
+ .newArrayListWithExpectedSize(conditions.size());
conditions.forEach(cond -> comparators
- .add(JoinUtils.checkAndReturnSupportedJoinComparator(cond)));
-
- if (skipHashTableBuild) {
- return;
- }
+ .add(JoinUtils.checkAndReturnSupportedJoinComparator(cond)));
// Setup the hash table configuration object
List<NamedExpression> leftExpr = new ArrayList<>(conditions.size());
@@ -761,731 +124,62 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP>
// Create named expressions from the conditions
for (int i = 0; i < conditions.size(); i++) {
leftExpr.add(new NamedExpression(conditions.get(i).getLeft(),
- new FieldReference("probe_side_" + i)));
+ new FieldReference("probe_side_" + i)));
}
// Set the left named expression to be null if the probe batch is empty.
if (leftUpstream != IterOutcome.OK_NEW_SCHEMA
- && leftUpstream != IterOutcome.OK) {
+ && leftUpstream != IterOutcome.OK) {
leftExpr = null;
} else {
if (probeBatch.getSchema()
- .getSelectionVectorMode() != BatchSchema.SelectionVectorMode.NONE) {
+ .getSelectionVectorMode() != BatchSchema.SelectionVectorMode.NONE) {
throw UserException.internalError(null).message(
"Hash join does not support probe batch with selection vectors.")
- .addContext("Probe batch has selection mode",
- (probeBatch.getSchema().getSelectionVectorMode()).toString())
- .build(logger);
- }
- }
-
- HashTableConfig htConfig = new HashTableConfig(
- (int) context.getOptions().getOption(ExecConstants.MIN_HASH_TABLE_SIZE),
- true, HashTable.DEFAULT_LOAD_FACTOR, rightExpr, leftExpr, comparators,
- joinControl.asInt());
-
- // Create the chained hash table
- baseHashTable = new ChainedHashTable(htConfig, context, allocator,
- buildBatch, probeBatch, null);
- if (enableRuntimeFilter) {
- setupHash64(htConfig);
- }
- }
-
- private void setupHash64(HashTableConfig htConfig) {
- LogicalExpression[] keyExprsBuild = new LogicalExpression[htConfig
- .getKeyExprsBuild().size()];
- ErrorCollector collector = new ErrorCollectorImpl();
- int i = 0;
- for (NamedExpression ne : htConfig.getKeyExprsBuild()) {
- LogicalExpression expr = ExpressionTreeMaterializer.materialize(
- ne.getExpr(), buildBatch, collector, context.getFunctionRegistry());
- collector.reportErrors(logger);
- if (expr == null) {
- continue;
- }
- keyExprsBuild[i] = expr;
- i++;
- }
- i = 0;
- boolean missingField = false;
- TypedFieldId[] buildSideTypeFieldIds = new TypedFieldId[keyExprsBuild.length];
- for (NamedExpression ne : htConfig.getKeyExprsBuild()) {
- SchemaPath schemaPath = (SchemaPath) ne.getExpr();
- TypedFieldId typedFieldId = buildBatch.getValueVectorId(schemaPath);
- if (typedFieldId == null) {
- missingField = true;
- break;
- }
- buildSideTypeFieldIds[i] = typedFieldId;
- i++;
- }
- if (missingField) {
- logger.info(
- "As some build side key fields not found, runtime filter was disabled");
- enableRuntimeFilter = false;
- return;
- }
- RuntimeFilterDef runtimeFilterDef = popConfig.getRuntimeFilterDef();
- List<BloomFilterDef> bloomFilterDefs = runtimeFilterDef
- .getBloomFilterDefs();
- for (BloomFilterDef bloomFilterDef : bloomFilterDefs) {
- String buildField = bloomFilterDef.getBuildField();
- SchemaPath schemaPath = new SchemaPath(
- new PathSegment.NameSegment(buildField), ExpressionPosition.UNKNOWN);
- TypedFieldId typedFieldId = buildBatch.getValueVectorId(schemaPath);
- if (typedFieldId == null) {
- missingField = true;
- break;
- }
- int fieldId = typedFieldId.getFieldIds()[0];
- bloomFilterDef2buildId.put(bloomFilterDef, fieldId);
- }
- if (missingField) {
- logger.info(
- "As some build side join key fields not found, runtime filter was disabled");
- enableRuntimeFilter = false;
- return;
- }
- ValueVectorHashHelper hashHelper = new ValueVectorHashHelper(buildBatch,
- context);
- try {
- hash64 = hashHelper.getHash64(keyExprsBuild, buildSideTypeFieldIds);
- } catch (Exception e) {
- throw UserException.internalError(e)
- .message("Failed to construct a field's hash64 dynamic codes")
+ .addContext("Probe batch has selection mode",
+ (probeBatch.getSchema().getSelectionVectorMode()).toString())
.build(logger);
- }
- }
-
- /**
- * Call only after num partitions is known
- */
- private void delayedSetup() {
- //
- // Find out the estimated max batch size, etc
- // and compute the max numPartitions possible
- // See partitionNumTuning()
- //
-
- spilledState.initialize(numPartitions);
- // Create array for the partitions
- partitions = new HashPartition[numPartitions];
- }
-
- /**
- * Initialize fields (that may be reused when reading spilled partitions)
- */
- private void initializeBuild() {
- baseHashTable.updateIncoming(buildBatch, probeBatch); // in case we process
- // the spilled files
- // Recreate the partitions every time build is initialized
- for (int part = 0; part < numPartitions; part++) {
- partitions[part] = new HashPartition(context, allocator, baseHashTable,
- buildBatch, probeBatch, semiJoin, RECORDS_PER_BATCH, spillSet, part,
- spilledState.getCycle(), numPartitions);
- }
-
- spilledInners = new HashJoinSpilledPartition[numPartitions];
-
- }
-
- /**
- * Note: This method cannot be called again as part of a recursive call of
- * executeBuildPhase() to handle spilled build partitions.
- */
- private void initializeRuntimeFilter() {
- if (!enableRuntimeFilter || bloomFiltersGenerated) {
- return;
- }
- runtimeFilterReporter = new RuntimeFilterReporter(
- (ExecutorFragmentContext) context);
- RuntimeFilterDef runtimeFilterDef = popConfig.getRuntimeFilterDef();
- // A RuntimeFilter is not a necessary part of a HashJoin operator; only
- // queries that satisfy the RuntimeFilterRouter's judgement will have a
- // RuntimeFilterDef.
- if (runtimeFilterDef != null) {
- List<BloomFilterDef> bloomFilterDefs = runtimeFilterDef
- .getBloomFilterDefs();
- for (BloomFilterDef bloomFilterDef : bloomFilterDefs) {
- int buildFieldId = bloomFilterDef2buildId.get(bloomFilterDef);
- int numBytes = bloomFilterDef.getNumBytes();
- String probeField = bloomFilterDef.getProbeField();
- probeFields.add(probeField);
- BloomFilter bloomFilter = new BloomFilter(numBytes,
- context.getAllocator());
- bloomFilters.add(bloomFilter);
- bloomFilter2buildId.put(bloomFilter, buildFieldId);
}
}
- bloomFiltersGenerated = true;
- }
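What each generated filter does with a build-side key is conceptually simple:
hash the key to 64 bits and set a few bits in a shared bitmap, so the probe
side can later skip rows whose keys cannot possibly match. A generic,
self-contained Bloom-filter sketch (this is not Drill's BloomFilter class;
the sizing and the two derived probe positions are illustrative):

    // Minimal Bloom filter over 64-bit hashes with two derived positions.
    class BloomSketch {
      private final long[] bits;
      private final int mask; // total bit count minus one; power-of-two sizing

      BloomSketch(int numLongs) { // numLongs must be a power of two
        bits = new long[numLongs];
        mask = numLongs * 64 - 1;
      }

      void insert(long hash64) {
        int p1 = (int) (hash64 & mask);          // low bits pick position 1
        int p2 = (int) ((hash64 >>> 32) & mask); // high bits pick position 2
        bits[p1 >>> 6] |= 1L << (p1 & 63);
        bits[p2 >>> 6] |= 1L << (p2 & 63);
      }

      boolean mightContain(long hash64) {
        int p1 = (int) (hash64 & mask);
        int p2 = (int) ((hash64 >>> 32) & mask);
        return (bits[p1 >>> 6] & (1L << (p1 & 63))) != 0
            && (bits[p2 >>> 6] & (1L << (p2 & 63))) != 0; // no false negatives
      }
    }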
- /**
- * Tunes the number of partitions used by {@link HashJoinBatch}. If it is not
- * possible to spill, it gives up and reverts to unbounded in-memory operation.
- */
- private HashJoinMemoryCalculator.BuildSidePartitioning partitionNumTuning(
- int maxBatchSize,
- HashJoinMemoryCalculator.BuildSidePartitioning buildCalc) {
- // Get auto tuning result
- numPartitions = buildCalc.getNumPartitions();
-
- if (logger.isDebugEnabled()) {
- logger.debug(buildCalc.makeDebugString());
- }
-
- if (buildCalc.getMaxReservedMemory() > allocator.getLimit()) {
- // We don't have enough memory to do any spilling. Give up and do no
- // spilling and have no limits
-
- // TODO dirty hack to prevent regressions. Remove this once batch sizing
- // is implemented.
- // We don't have enough memory to do partitioning, we have to do
- // everything in memory
- String message = String.format(
- "When using the minimum number of partitions %d we require %s memory but only have %s available. "
- + "Forcing legacy behavior of using unbounded memory in order to prevent regressions.",
- numPartitions,
- FileUtils.byteCountToDisplaySize(buildCalc.getMaxReservedMemory()),
- FileUtils.byteCountToDisplaySize(allocator.getLimit()));
- logger.warn(message);
-
- // create a Noop memory calculator
- HashJoinMemoryCalculator calc = getCalculatorImpl();
- calc.initialize(false);
- buildCalc = calc.next();
-
- buildCalc.initialize(true, true, // TODO Fix after growing hash values bug
- // fixed
- buildBatch, probeBatch, buildJoinColumns,
- leftUpstream == IterOutcome.NONE, // probeEmpty
- allocator.getLimit(), numPartitions, RECORDS_PER_BATCH,
- RECORDS_PER_BATCH, maxBatchSize, maxBatchSize,
- batchMemoryManager.getOutputBatchSize(),
- HashTable.DEFAULT_LOAD_FACTOR);
-
- disableSpilling(null);
- }
-
- return buildCalc;
- }
-
- /**
- * Disable spilling - use only a single partition and set the memory limit to
- * the max ( 10GB )
- *
- * @param reason
- * If not null - log this as warning, else check fallback setting to
- * either warn or fail.
- */
- private void disableSpilling(String reason) {
- // Fail, or just issue a warning if a reason was given, or a fallback option
- // is enabled
- if (reason == null) {
- boolean fallbackEnabled = context.getOptions()
- .getBoolean(ExecConstants.HASHJOIN_FALLBACK_ENABLED_KEY);
- if (fallbackEnabled) {
- logger.warn(
- "Spilling is disabled - not enough memory available for internal partitioning. Falling back"
- + " to use unbounded memory");
- } else {
- throw UserException.resourceError().message(String.format(
- "Not enough memory for internal partitioning and fallback mechanism for "
- + "HashJoin to use unbounded memory is disabled.\n" +
- "Either enable fallback option %s using ALTER "
- + "SESSION/SYSTEM command or increase the memory limit for the Drillbit",
- ExecConstants.HASHJOIN_FALLBACK_ENABLED_KEY)).build(logger);
- }
- } else {
- logger.warn(reason);
- }
-
- numPartitions = 1; // We are only using one partition
- canSpill = false; // We cannot spill
- allocator.setLimit(AbstractBase.MAX_ALLOCATION); // Violate framework and
- // force unbounded memory
+ return new HashTableConfig(
+ (int) context.getOptions().getOption(ExecConstants.MIN_HASH_TABLE_SIZE),
+ true, HashTable.DEFAULT_LOAD_FACTOR, rightExpr, leftExpr, comparators,
+ joinControl.asInt(), false);
}
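For intuition about the sizing knobs passed to HashTableConfig above: the
option behind MIN_HASH_TABLE_SIZE seeds the initial bucket count, and a hash
table is typically resized once its occupancy crosses capacity times the load
factor. The concrete numbers below (65536 buckets, a 0.75 load factor, one
million build keys) are assumptions for illustration, not values read from
Drill's defaults:

    // Worked example of hash-table growth under a load factor.
    class LoadFactorSketch {
      public static void main(String[] args) {
        int capacity = 65_536;      // assumed initial bucket count
        double loadFactor = 0.75;   // assumed load factor
        long entries = 1_000_000;   // build-side keys to insert

        int doublings = 0;
        while (entries > (long) (capacity * loadFactor)) {
          capacity <<= 1;           // double when the threshold is crossed
          doublings++;
        }
        // 65536 grows to 2097152 buckets after 5 doublings for 1M entries.
        System.out.printf("capacity=%d after %d doublings%n", capacity, doublings);
      }
    }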
- /**
- * Execute the BUILD phase; first read incoming and split rows into
- * partitions; may decide to spill some of the partitions
- *
- * @return Returns an
- * {@link org.apache.drill.exec.record.RecordBatch.IterOutcome} if a
- * termination condition is reached. Otherwise returns null.
- * @throws SchemaChangeException
- */
- public IterOutcome executeBuildPhase() throws SchemaChangeException {
- if (buildSideIsEmpty.booleanValue()) {
- // empty right
- return null;
- }
-
- if (skipHashTableBuild) { // No hash table needed - then consume all the
- // right upstream
- killAndDrainRightUpstream();
- return null;
- }
-
- HashJoinMemoryCalculator.BuildSidePartitioning buildCalc;
-
- {
- // Initializing build calculator
- // Limit scope of these variables to this block
- int maxBatchSize = spilledState.isFirstCycle()
- ? RecordBatch.MAX_BATCH_ROW_COUNT
- : RECORDS_PER_BATCH;
- boolean doMemoryCalculation = canSpill
- && !probeSideIsEmpty.booleanValue();
- HashJoinMemoryCalculator calc = getCalculatorImpl();
-
- calc.initialize(doMemoryCalculation);
- buildCalc = calc.next();
-
- buildCalc.initialize(spilledState.isFirstCycle(), true, // TODO Fix after
- // growing hash
- // values bug
- // fixed
- buildBatch, probeBatch, buildJoinColumns,
- probeSideIsEmpty.booleanValue(), allocator.getLimit(), numPartitions,
- RECORDS_PER_BATCH, RECORDS_PER_BATCH, maxBatchSize, maxBatchSize,
- batchMemoryManager.getOutputBatchSize(),
- HashTable.DEFAULT_LOAD_FACTOR);
-
- if (spilledState.isFirstCycle() && doMemoryCalculation) {
- // Do auto tuning
- buildCalc = partitionNumTuning(maxBatchSize, buildCalc);
- }
- }
-
- if (spilledState.isFirstCycle()) {
- // Do initial setup only on the first cycle
- delayedSetup();
- }
-
- initializeBuild();
-
- initializeRuntimeFilter();
-
- // Make the calculator aware of our partitions
- HashJoinMemoryCalculator.PartitionStatSet partitionStatSet = new HashJoinMemoryCalculator.PartitionStatSet(
- partitions);
- buildCalc.setPartitionStatSet(partitionStatSet);
-
- boolean moreData = true;
- while (moreData) {
- switch (rightUpstream) {
- case NONE:
- case NOT_YET:
- moreData = false;
- continue;
-
- case OK_NEW_SCHEMA:
- if (!buildSchema.equals(buildBatch.getSchema())) {
- throw SchemaChangeException.schemaChanged(
- "Hash join does not support schema changes in build side.",
- buildSchema, buildBatch.getSchema());
- }
- for (HashPartition partn : partitions) {
- partn.updateBatches();
- }
- // Fall through
- case OK:
- batchMemoryManager.update(buildBatch, RIGHT_INDEX, 0, true);
- int currentRecordCount = buildBatch.getRecordCount();
- // create runtime filter
- if (spilledState.isFirstCycle() && enableRuntimeFilter) {
- // create runtime filter and send out async
- for (BloomFilter bloomFilter : bloomFilter2buildId.keySet()) {
- int fieldId = bloomFilter2buildId.get(bloomFilter);
- for (int ind = 0; ind < currentRecordCount; ind++) {
- long hashCode = hash64.hash64Code(ind, 0, fieldId);
- bloomFilter.insert(hashCode);
- }
- }
- }
- // Special treatment (when no spill, and single partition) -- use the
- // incoming vectors as they are (no row copy)
- if (numPartitions == 1) {
- partitions[0].appendBatch(buildBatch);
- break;
- }
-
- if (!spilledState.isFirstCycle()) {
- read_right_HV_vector = (IntVector) buildBatch.getContainer()
- .getLast();
- }
-
- // For every record in the build batch, hash the key columns and keep
- // the result
- for (int ind = 0; ind < currentRecordCount; ind++) {
- int hashCode = spilledState.isFirstCycle()
- ? partitions[0].getBuildHashCode(ind)
- : read_right_HV_vector.getAccessor().get(ind); // get the hash
- // value from the
- // HV column
- int currPart = hashCode & spilledState.getPartitionMask();
- hashCode >>>= spilledState.getBitsInMask();
- // semi-join skips join-key-duplicate rows
- if (semiJoin) {
-
- }
- // Append the new inner row to the appropriate partition; spill (that
- // partition) if needed
- partitions[currPart].appendInnerRow(buildBatch.getContainer(), ind,
- hashCode, buildCalc);
- }
-
- if (read_right_HV_vector != null) {
- read_right_HV_vector.clear();
- read_right_HV_vector = null;
- }
- break;
- default:
- throw new IllegalStateException(rightUpstream.name());
- }
- // Get the next incoming record batch
- rightUpstream = next(HashJoinHelper.RIGHT_INPUT, buildBatch);
- }
-
- if (spilledState.isFirstCycle() && enableRuntimeFilter) {
- if (bloomFilter2buildId.size() > 0) {
- int hashJoinOpId = this.popConfig.getOperatorId();
- runtimeFilterReporter.sendOut(bloomFilters, probeFields,
- this.popConfig.getRuntimeFilterDef(), hashJoinOpId);
- }
- }
-
- // Move the remaining current batches into their temp lists, or spill
- // them if the partition is spilled. Add the spilled partitions into
- // the spilled partitions list
- if (numPartitions > 1) { // a single partition needs no completion
- for (HashPartition partn : partitions) {
- partn.completeAnInnerBatch(false, partn.isSpilled());
- }
- }
-
- prefetchFirstProbeBatch();
-
- if (leftUpstream.isError()) {
-      // A termination condition was reached while prefetching the first probe
-      // side data holding batch.
- // We need to terminate.
- return leftUpstream;
- }
-
- HashJoinMemoryCalculator.PostBuildCalculations postBuildCalc = buildCalc
- .next();
- postBuildCalc.initialize(probeSideIsEmpty.booleanValue()); // probeEmpty
-
- // Traverse all the in-memory partitions' incoming batches, and build their
- // hash tables
-
- for (int index = 0; index < partitions.length; index++) {
- HashPartition partn = partitions[index];
-
- if (partn.isSpilled()) {
- // Don't build hash tables for spilled partitions
- continue;
- }
-
- try {
- if (postBuildCalc.shouldSpill()) {
- // Spill this partition if we need to make room
- partn.spillThisPartition();
- } else {
- // Only build hash tables for partitions that are not spilled
- partn.buildContainersHashTableAndHelper();
- }
- } catch (OutOfMemoryException e) {
- String message = "Failed building hash table on partition " + index
- + ":\n" + makeDebugString() + "\n"
- + postBuildCalc.makeDebugString();
- // Include debug info
- throw new OutOfMemoryException(message, e);
- }
- }
-
- if (logger.isDebugEnabled()) {
- logger.debug(postBuildCalc.makeDebugString());
- }
-
- for (HashPartition partn : partitions) {
- if (partn.isSpilled()) {
- HashJoinSpilledPartition sp = new HashJoinSpilledPartition(
- spilledState.getCycle(), partn.getPartitionNum(), originalPartition,
- partn.getPartitionBatchesCount(), partn.getSpillFile());
-
- spilledState.addPartition(sp);
- spilledInners[partn.getPartitionNum()] = sp; // for the outer to find
- // the SP later
- partn.closeWriter();
-
- partn.updateProbeRecordsPerBatch(
- postBuildCalc.getProbeRecordsPerBatch());
- }
- }
-
- return null;
- }
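The routing step inside the build loop above (mask off the low hash bits to
pick a partition, then shift them away) is easiest to see with concrete
numbers; with 32 partitions the mask is 0x1F and each spill cycle consumes 5
fresh bits of the hash:

    // Worked example: low hash bits pick the partition; the remaining bits
    // are kept so deeper spill cycles repartition on unused bits.
    class PartitionMaskSketch {
      public static void main(String[] args) {
        int numPartitions = 32;                           // a power of two
        int partitionMask = numPartitions - 1;            // 32 -> 0x1F
        int bitsInMask = Integer.bitCount(partitionMask); // 0x1F -> 5

        int hashCode = 0b1011_01101;              // 365, an example hash value
        int partition = hashCode & partitionMask; // 365 & 31 = 13
        hashCode >>>= bitsInMask;                 // 365 >>> 5 = 11

        System.out.printf("partition=%d, remaining hash=%d%n", partition, hashCode);
      }
    }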
-
- private void setupOutputContainerSchema() {
-
- if (buildSchema != null && !semiJoin) {
- for (MaterializedField field : buildSchema) {
- MajorType inputType = field.getType();
- MajorType outputType;
- // If left or full outer join, then the output type must be nullable.
- // However, map types are
- // not nullable so we must exclude them from the check below (see
- // DRILL-2197).
- if (joinIsLeftOrFull && inputType.getMode() == DataMode.REQUIRED
- && inputType.getMinorType() != TypeProtos.MinorType.MAP) {
- outputType = Types.overrideMode(inputType, DataMode.OPTIONAL);
- } else {
- outputType = inputType;
- }
-
- // make sure to project field with children for children to show up in
- // the schema
- MaterializedField projected = field.withType(outputType);
- // Add the vector to our output container
- container.addOrGet(projected);
- }
- }
-
- if (probeSchema != null) { // a probe schema was seen (even though the probe
- // may have had no rows)
- for (VectorWrapper<?> vv : probeBatch) {
- MajorType inputType = vv.getField().getType();
- MajorType outputType;
-
- // If right or full outer join then the output type should be optional.
- // However, map types are
- // not nullable so we must exclude them from the check below (see
- // DRILL-2771, DRILL-2197).
- if (joinIsRightOrFull && inputType.getMode() == DataMode.REQUIRED
- && inputType.getMinorType() != TypeProtos.MinorType.MAP) {
- outputType = Types.overrideMode(inputType, DataMode.OPTIONAL);
- } else {
- outputType = inputType;
- }
-
- ValueVector v = container.addOrGet(
- MaterializedField.create(vv.getField().getName(), outputType));
- if (v instanceof AbstractContainerVector) {
- vv.getValueVector().makeTransferPair(v);
- v.clear();
- }
- }
- }
-
+ @Override
+ public void dump() {
+ logger.error(
+ "HashJoinBatch[container={}, left={}, right={}, leftOutcome={}, rightOutcome={}, joinType={}, hashJoinProbe={},"
+ + " rightExpr={}, canSpill={}, buildSchema={}, probeSchema={}]",
+ container, left, right, leftUpstream, rightUpstream, joinType,
+ probe, rightExpr, canSpill, buildSchema, probeSchema);
}
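The schema rule being removed above (and now inherited) is worth restating: a
REQUIRED column from a side that may produce no match must become OPTIONAL,
that is nullable, in the output, except for map columns, which are not
nullable (DRILL-2197, DRILL-2771). A hedged sketch with simplified stand-in
types rather than Drill's DataMode/MinorType:

    // Simplified stand-in for the output-mode decision.
    class OutputModeSketch {
      enum Mode { REQUIRED, OPTIONAL }

      static Mode outputMode(Mode inputMode, boolean sideMayBeUnmatched, boolean isMap) {
        if (sideMayBeUnmatched && inputMode == Mode.REQUIRED && !isMap) {
          return Mode.OPTIONAL; // unmatched rows need a null on this side
        }
        return inputMode;       // inner joins and maps keep the input mode
      }

      public static void main(String[] args) {
        // A REQUIRED build-side column under a LEFT OUTER join becomes OPTIONAL.
        System.out.println(outputMode(Mode.REQUIRED, true, false));  // OPTIONAL
        // The same column under an INNER join stays REQUIRED.
        System.out.println(outputMode(Mode.REQUIRED, false, false)); // REQUIRED
      }
    }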
- // (After the inner side was read whole) - Has that inner partition spilled
- public boolean isSpilledInner(int part) {
- if (spilledInners == null) {
- return false;
- } // empty inner
- return spilledInners[part] != null;
+ @Override // implement RowKeyJoin interface
+ public boolean hasRowKeyBatch() {
+ return buildComplete;
}
- /**
- * The constructor
- *
- * @param popConfig
- * @param context
- * @param left
- * -- probe/outer side incoming input
- * @param right
- * -- build/inner side incoming input
- * @throws OutOfMemoryException
- */
- public HashJoinBatch(HashJoinPOP popConfig, FragmentContext context,
- RecordBatch left, /* Probe side record batch */
- RecordBatch right /* Build side record batch */
- ) throws OutOfMemoryException {
- super(popConfig, context, true, left, right);
- this.buildBatch = right;
- this.probeBatch = left;
- joinType = popConfig.getJoinType();
- semiJoin = popConfig.isSemiJoin();
- joinIsLeftOrFull = joinType == JoinRelType.LEFT
- || joinType == JoinRelType.FULL;
- joinIsRightOrFull = joinType == JoinRelType.RIGHT
- || joinType == JoinRelType.FULL;
- conditions = popConfig.getConditions();
- this.popConfig = popConfig;
- this.isRowKeyJoin = popConfig.isRowKeyJoin();
- this.joinControl = new JoinControl(popConfig.getJoinControl());
-
- rightExpr = new ArrayList<>(conditions.size());
- buildJoinColumns = Sets.newHashSet();
- for (int i = 0; i < conditions.size(); i++) {
- SchemaPath rightPath = (SchemaPath) conditions.get(i).getRight();
- }
-
- for (int i = 0; i < conditions.size(); i++) {
- SchemaPath rightPath = (SchemaPath) conditions.get(i).getRight();
- PathSegment.NameSegment nameSegment = (PathSegment.NameSegment) rightPath
- .getLastSegment();
- buildJoinColumns.add(nameSegment.getPath());
- String refName = "build_side_" + i;
- rightExpr.add(new NamedExpression(conditions.get(i).getRight(),
- new FieldReference(refName)));
- }
-
- this.allocator = oContext.getAllocator();
-
- numPartitions = (int) context.getOptions()
- .getOption(ExecConstants.HASHJOIN_NUM_PARTITIONS_VALIDATOR);
- if (numPartitions == 1) { //
- disableSpilling(
- "Spilling is disabled due to configuration setting of num_partitions to 1");
- }
-
- numPartitions = BaseAllocator.nextPowerOfTwo(numPartitions); // in case not
- // a power of 2
-
- long memLimit = context.getOptions()
- .getOption(ExecConstants.HASHJOIN_MAX_MEMORY_VALIDATOR);
-
- if (memLimit != 0) {
- allocator.setLimit(memLimit);
- }
-
- RECORDS_PER_BATCH = (int) context.getOptions()
- .getOption(ExecConstants.HASHJOIN_NUM_ROWS_IN_BATCH_VALIDATOR);
- maxBatchesInMemory = (int) context.getOptions()
- .getOption(ExecConstants.HASHJOIN_MAX_BATCHES_IN_MEMORY_VALIDATOR);
-
- logger.info("Memory limit {} bytes",
- FileUtils.byteCountToDisplaySize(allocator.getLimit()));
- spillSet = new SpillSet(context, popConfig);
-
- // Create empty partitions (in the ctor - covers the case where right side
- // is empty)
- partitions = new HashPartition[0];
-
- // get the output batch size from config.
- int configuredBatchSize = (int) context.getOptions()
- .getOption(ExecConstants.OUTPUT_BATCH_SIZE_VALIDATOR);
- double avail_mem_factor = context.getOptions()
- .getOption(ExecConstants.OUTPUT_BATCH_SIZE_AVAIL_MEM_FACTOR_VALIDATOR);
- int outputBatchSize = Math.min(configuredBatchSize,
- Integer.highestOneBit((int) (allocator.getLimit() * avail_mem_factor)));
-
- RecordBatchStats.logRecordBatchStats(getRecordBatchStatsContext(),
- "configured output batch size: %d, allocated memory %d, avail mem factor %f, output batch size: %d",
- configuredBatchSize, allocator.getLimit(), avail_mem_factor,
- outputBatchSize);
-
- batchMemoryManager = new JoinBatchMemoryManager(outputBatchSize, left,
- right, new HashSet<>());
-
- RecordBatchStats.printConfiguredBatchSize(getRecordBatchStatsContext(),
- configuredBatchSize);
-
- enableRuntimeFilter = context.getOptions()
- .getOption(ExecConstants.HASHJOIN_ENABLE_RUNTIME_FILTER)
- && popConfig.getRuntimeFilterDef() != null;
+ @Override // implement RowKeyJoin interface
+ public BatchState getBatchState() {
+ return state;
}
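The removed constructor code above caps the output batch size at the largest
power of two that fits within a fraction of the allocator limit. With assumed
inputs (a 100 MB limit, a 0.1 factor, and a 16 MB configured batch size) the
cap works out as follows:

    // Worked example of the output-batch-size cap from the constructor.
    class OutputBatchSizeSketch {
      public static void main(String[] args) {
        long allocatorLimit = 100L * 1024 * 1024;   // assumed memory limit
        double availMemFactor = 0.1;                // assumed memory fraction
        int configuredBatchSize = 16 * 1024 * 1024; // assumed configured size

        int powerOfTwoCap =
            Integer.highestOneBit((int) (allocatorLimit * availMemFactor));
        int outputBatchSize = Math.min(configuredBatchSize, powerOfTwoCap);

        // 10,485,760 bytes available -> highest one bit is 8,388,608 (2^23),
        // so the 16 MB configured size is reduced to 8 MB.
        System.out.println(outputBatchSize); // 8388608
      }
    }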
- /**
- * This method is called when {@link HashJoinBatch} closes. It cleans up
- * leftover spilled files that are in the spill queue, and closes the spillSet.
- */
- private void cleanup() {
- if (buildSideIsEmpty.booleanValue()) {
- return;
- } // not set up; nothing to clean
- if (spillSet.getWriteBytes() > 0) {
- stats.setLongStat(Metric.SPILL_MB, // update stats - total MB spilled
- (int) Math.round(spillSet.getWriteBytes() / 1024.0D / 1024.0));
- }
- // clean (and deallocate) each partition, and delete its spill file
- for (HashPartition partn : partitions) {
- partn.close();
- }
-
- // delete any spill file left in unread spilled partitions
- while (!spilledState.isEmpty()) {
- HashJoinSpilledPartition sp = spilledState.getNextSpilledPartition();
- try {
- spillSet.delete(sp.innerSpillFile);
- } catch (IOException e) {
- logger.warn("Cleanup: Failed to delete spill file {}",
- sp.innerSpillFile);
- }
- try { // outer file is added later; may be null if cleaning prematurely
- if (sp.outerSpillFile != null) {
- spillSet.delete(sp.outerSpillFile);
- }
- } catch (IOException e) {
- logger.warn("Cleanup: Failed to delete spill file {}",
- sp.outerSpillFile);
- }
- }
- // Delete the currently handled (if any) spilled files
- spillSet.close(); // delete the spill directory(ies)
+ @Override // implement RowKeyJoin interface
+ public void setBatchState(BatchState newState) {
+ state = newState;
}
- /**
- * This creates a string that summarizes the memory usage of the operator.
- *
- * @return A memory dump string.
- */
- public String makeDebugString() {
- StringBuilder sb = new StringBuilder();
-
- for (int partitionIndex = 0; partitionIndex < partitions.length; partitionIndex++) {
- String partitionPrefix = "Partition " + partitionIndex + ": ";
- HashPartition hashPartition = partitions[partitionIndex];
- sb.append(partitionPrefix).append(hashPartition.makeDebugString())
- .append("\n");
- }
-
- return sb.toString();
+ @Override
+ public void setRowKeyJoinState(RowKeyJoin.RowKeyJoinState newState) {
+ this.rkJoinState = newState;
}
- /**
- * Updates the {@link HashTable} and spilling stats after the original build
- * side is processed.
- *
- * Note: this does not update all the stats. The cycleNum is updated
- * dynamically in {@link #innerNext()} and the total bytes written is updated
- * at close time in {@link #cleanup()}.
- */
- private void updateStats() {
- if (buildSideIsEmpty.booleanValue()) {
- return;
- } // no stats when the right side is empty
- if (!spilledState.isFirstCycle()) {
- return;
- } // These stats are only for before processing spilled files
-
- HashTableStats htStats = new HashTableStats();
- long numSpilled = 0;
- HashTableStats newStats = new HashTableStats();
- // sum the stats from all the partitions
- for (HashPartition partn : partitions) {
- if (partn.isSpilled()) {
- numSpilled++;
- }
- partn.getStats(newStats);
- htStats.addStats(newStats);
- }
-
- stats.setLongStat(Metric.NUM_BUCKETS, htStats.numBuckets);
- stats.setLongStat(Metric.NUM_ENTRIES, htStats.numEntries);
- stats.setLongStat(Metric.NUM_RESIZING, htStats.numResizing);
- stats.setLongStat(Metric.RESIZING_TIME_MS, htStats.resizingTime);
- stats.setLongStat(Metric.NUM_PARTITIONS, numPartitions);
- stats.setLongStat(Metric.SPILL_CYCLE, spilledState.getCycle()); // Put 0 in
- // case no
- // spill
- stats.setLongStat(Metric.SPILLED_PARTITIONS, numSpilled);
+ @Override
+ public RowKeyJoin.RowKeyJoinState getRowKeyJoinState() {
+ return rkJoinState;
}
/**
@@ -1508,10 +202,10 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP>
return Pair.of(vv, pp.getRight());
}
} else if (partitions == null && firstOutputBatch) { // if there is data
- // coming to
- // right(build) side in
- // build Schema stage,
- // use it.
+ // coming to
+ // right(build) side in
+ // build Schema stage,
+ // use it.
firstOutputBatch = false;
if (right.getRecordCount() > 0) {
VectorWrapper<?> vw = Iterables.get(right, 0);
@@ -1521,124 +215,4 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP>
}
return null;
}
-
- @Override // implement RowKeyJoin interface
- public boolean hasRowKeyBatch() {
- return buildComplete;
- }
-
- @Override // implement RowKeyJoin interface
- public BatchState getBatchState() {
- return state;
- }
-
- @Override // implement RowKeyJoin interface
- public void setBatchState(BatchState newState) {
- state = newState;
- }
-
- @Override
- protected void cancelIncoming() {
- wasKilled = true;
- probeBatch.cancel();
- buildBatch.cancel();
- }
-
- public void updateMetrics() {
- stats.setLongStat(HashJoinBatch.Metric.LEFT_INPUT_BATCH_COUNT,
- batchMemoryManager.getNumIncomingBatches(LEFT_INDEX));
- stats.setLongStat(HashJoinBatch.Metric.LEFT_AVG_INPUT_BATCH_BYTES,
- batchMemoryManager.getAvgInputBatchSize(LEFT_INDEX));
- stats.setLongStat(HashJoinBatch.Metric.LEFT_AVG_INPUT_ROW_BYTES,
- batchMemoryManager.getAvgInputRowWidth(LEFT_INDEX));
- stats.setLongStat(HashJoinBatch.Metric.LEFT_INPUT_RECORD_COUNT,
- batchMemoryManager.getTotalInputRecords(LEFT_INDEX));
-
- stats.setLongStat(HashJoinBatch.Metric.RIGHT_INPUT_BATCH_COUNT,
- batchMemoryManager.getNumIncomingBatches(RIGHT_INDEX));
- stats.setLongStat(HashJoinBatch.Metric.RIGHT_AVG_INPUT_BATCH_BYTES,
- batchMemoryManager.getAvgInputBatchSize(RIGHT_INDEX));
- stats.setLongStat(HashJoinBatch.Metric.RIGHT_AVG_INPUT_ROW_BYTES,
- batchMemoryManager.getAvgInputRowWidth(RIGHT_INDEX));
- stats.setLongStat(HashJoinBatch.Metric.RIGHT_INPUT_RECORD_COUNT,
- batchMemoryManager.getTotalInputRecords(RIGHT_INDEX));
-
- stats.setLongStat(HashJoinBatch.Metric.OUTPUT_BATCH_COUNT,
- batchMemoryManager.getNumOutgoingBatches());
- stats.setLongStat(HashJoinBatch.Metric.AVG_OUTPUT_BATCH_BYTES,
- batchMemoryManager.getAvgOutputBatchSize());
- stats.setLongStat(HashJoinBatch.Metric.AVG_OUTPUT_ROW_BYTES,
- batchMemoryManager.getAvgOutputRowWidth());
- stats.setLongStat(HashJoinBatch.Metric.OUTPUT_RECORD_COUNT,
- batchMemoryManager.getTotalOutputRecords());
- }
-
- @Override
- public void setRowKeyJoinState(RowKeyJoin.RowKeyJoinState newState) {
- this.rkJoinState = newState;
- }
-
- @Override
- public RowKeyJoin.RowKeyJoinState getRowKeyJoinState() {
- return rkJoinState;
- }
-
- @Override
- public void close() {
- if (!spilledState.isFirstCycle()) { // spilling happened
- // In case closing due to cancellation, BaseRootExec.close() does not
- // close the open
- // SpilledRecordBatch "scanners" as it only knows about the original
- // left/right ops.
- // TODO: Code that was here didn't actually close the "scanners"
- }
-
- updateMetrics();
-
- RecordBatchStats.logRecordBatchStats(getRecordBatchStatsContext(),
- "incoming aggregate left: batch count : %d, avg bytes : %d, avg row bytes : %d, record count : %d",
- batchMemoryManager
- .getNumIncomingBatches(JoinBatchMemoryManager.LEFT_INDEX),
- batchMemoryManager
- .getAvgInputBatchSize(JoinBatchMemoryManager.LEFT_INDEX),
- batchMemoryManager
- .getAvgInputRowWidth(JoinBatchMemoryManager.LEFT_INDEX),
- batchMemoryManager
- .getTotalInputRecords(JoinBatchMemoryManager.LEFT_INDEX));
-
- RecordBatchStats.logRecordBatchStats(getRecordBatchStatsContext(),
- "incoming aggregate right: batch count : %d, avg bytes : %d, avg row bytes : %d, record count : %d",
- batchMemoryManager
- .getNumIncomingBatches(JoinBatchMemoryManager.RIGHT_INDEX),
- batchMemoryManager
- .getAvgInputBatchSize(JoinBatchMemoryManager.RIGHT_INDEX),
- batchMemoryManager
- .getAvgInputRowWidth(JoinBatchMemoryManager.RIGHT_INDEX),
- batchMemoryManager
- .getTotalInputRecords(JoinBatchMemoryManager.RIGHT_INDEX));
-
- RecordBatchStats.logRecordBatchStats(getRecordBatchStatsContext(),
- "outgoing aggregate: batch count : %d, avg bytes : %d, avg row bytes : %d, record count : %d",
- batchMemoryManager.getNumOutgoingBatches(),
- batchMemoryManager.getAvgOutputBatchSize(),
- batchMemoryManager.getAvgOutputRowWidth(),
- batchMemoryManager.getTotalOutputRecords());
-
- cleanup();
- super.close();
- }
-
- public HashJoinProbe setupHashJoinProbe() {
- // No real code generation !!
- return new HashJoinProbeTemplate();
- }
-
- @Override
- public void dump() {
- logger.error(
- "HashJoinBatch[container={}, left={}, right={}, leftOutcome={}, rightOutcome={}, joinType={}, hashJoinProbe={},"
- + " rightExpr={}, canSpill={}, buildSchema={}, probeSchema={}]",
- container, left, right, leftUpstream, rightUpstream, joinType,
- hashJoinProbe, rightExpr, canSpill, buildSchema, probeSchema);
- }
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java
index 79956a4df9..43e221a246 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java
@@ -17,441 +17,105 @@
*/
package org.apache.drill.exec.physical.impl.join;
-import java.util.ArrayList;
-
-import com.carrotsearch.hppc.IntArrayList;
+import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.physical.config.HashJoinPOP;
import org.apache.drill.exec.physical.impl.common.HashPartition;
import org.apache.drill.exec.planner.common.JoinControl;
-import org.apache.drill.exec.record.BatchSchema;
import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.record.RecordBatch.IterOutcome;
import org.apache.drill.exec.record.VectorContainer;
-import org.apache.drill.exec.record.VectorWrapper;
-import org.apache.calcite.rel.core.JoinRelType;
-import org.apache.commons.lang3.tuple.Pair;
-import org.apache.drill.exec.vector.IntVector;
-import org.apache.drill.exec.vector.ValueVector;
-
-import static org.apache.drill.exec.record.JoinBatchMemoryManager.LEFT_INDEX;
-
-public class HashJoinProbeTemplate implements HashJoinProbe {
-
- VectorContainer container; // the outgoing container
-
- // Probe side record batch
- private RecordBatch probeBatch;
-
- private BatchSchema probeSchema;
+public class HashJoinProbeTemplate extends ProbeTemplate<HashJoinPOP> {
// Join type, INNER, LEFT, RIGHT or OUTER
private JoinRelType joinType;
-
// joinControl determines how to handle INTERSECT_DISTINCT vs. INTERSECT_ALL
private JoinControl joinControl;
-
- private HashJoinBatch outgoingJoinBatch;
-
- // Number of records to process on the probe side
- private int recordsToProcess;
-
- // Number of records processed on the probe side
- private int recordsProcessed;
-
- // Number of records in the output container
- private int outputRecords;
-
- // Indicate if we should drain the next record from the probe side
- private boolean getNextRecord = true;
-
- // Contains both batch idx and record idx of the matching record in the build side
- private int currentCompositeIdx = -1;
-
- // Current state the hash join algorithm is in
- private ProbeState probeState = ProbeState.PROBE_PROJECT;
-
- // For outer or right joins, this is a list of unmatched records that needs to be projected
- private IntArrayList unmatchedBuildIndexes;
-
- private HashPartition partitions[];
-
- // While probing duplicates, retain current build-side partition in case need to continue
- // probing later on the same chain of duplicates
- private HashPartition currPartition;
-
- private int currRightPartition; // for returning RIGHT/FULL
- IntVector read_left_HV_vector; // HV vector that was read from the spilled batch
- private int cycleNum; // 1-primary, 2-secondary, 3-tertiary, etc.
- private HashJoinBatch.HashJoinSpilledPartition spilledInners[]; // for the outer to find the partition
- private boolean buildSideIsEmpty = true;
- private int numPartitions = 1; // must be 2 to the power of bitsInMask
- private int partitionMask; // numPartitions - 1
- private int bitsInMask; // number of bits in the MASK
- private int numberOfBuildSideColumns;
- private int targetOutputRecords;
private boolean semiJoin;
- @Override
- public void setTargetOutputCount(int targetOutputRecords) {
- this.targetOutputRecords = targetOutputRecords;
- }
-
- @Override
- public int getOutputCount() {
- return outputRecords;
- }
-
/**
* Setup the Hash Join Probe object
- *
- * @param probeBatch
- * @param outgoing
- * @param joinRelType
- * @param semiJoin
- * @param leftStartState
- * @param partitions
- * @param cycleNum
- * @param container
- * @param spilledInners
- * @param buildSideIsEmpty
- * @param numPartitions
- * @param rightHVColPosition
*/
@Override
- public void setupHashJoinProbe(RecordBatch probeBatch, HashJoinBatch outgoing, JoinRelType joinRelType, boolean semiJoin,
+ public void setup(RecordBatch probeBatch, HashJoinBatch outgoing, JoinRelType joinRelType, boolean semiJoin,
IterOutcome leftStartState, HashPartition[] partitions, int cycleNum,
- VectorContainer container, HashJoinBatch.HashJoinSpilledPartition[] spilledInners,
- boolean buildSideIsEmpty, int numPartitions, int rightHVColPosition) {
- this.container = container;
- this.spilledInners = spilledInners;
- this.probeBatch = probeBatch;
- this.probeSchema = probeBatch.getSchema();
+ VectorContainer container, AbstractHashBinaryRecordBatch.SpilledPartition[] spilledInners,
+ boolean buildSideIsEmpty, int numPartitions, int rightHVColPosition) throws SchemaChangeException {
+ super.setup(probeBatch, leftStartState, partitions, cycleNum, container, spilledInners,
+ buildSideIsEmpty, numPartitions);
+ this.outgoingBatch = outgoing;
this.joinType = joinRelType;
- this.outgoingJoinBatch = outgoing;
- this.partitions = partitions;
- this.cycleNum = cycleNum;
- this.buildSideIsEmpty = buildSideIsEmpty;
- this.numPartitions = numPartitions;
- this.numberOfBuildSideColumns = semiJoin ? 0 : rightHVColPosition; // position (0 based) of added column == #columns
+ this.joinControl = new JoinControl(outgoing.getPopConfig().getJoinControl());
this.semiJoin = semiJoin;
-
- partitionMask = numPartitions - 1; // e.g. 32 --> 0x1F
- bitsInMask = Integer.bitCount(partitionMask); // e.g. 0x1F -> 5
- joinControl = new JoinControl(outgoingJoinBatch.getPopConfig().getJoinControl());
-
- probeState = ProbeState.PROBE_PROJECT;
- this.recordsToProcess = 0;
- this.recordsProcessed = 0;
-
- // A special case - if the left was an empty file
- if (leftStartState == IterOutcome.NONE){
- changeToFinalProbeState();
- } else {
- this.recordsToProcess = probeBatch.getRecordCount();
- }
-
- // for those outer partitions that need spilling (cause their matching inners spilled)
- // initialize those partitions' current batches and hash-value vectors
- for (HashPartition partn : this.partitions) {
- partn.allocateNewCurrentBatchAndHV();
- }
-
- currRightPartition = 0; // In case it's a Right/Full outer join
-
- // Initialize the HV vector for the first (already read) left batch
- if (this.cycleNum > 0) {
- if (read_left_HV_vector != null) { read_left_HV_vector.clear();}
- if (leftStartState != IterOutcome.NONE) { // Skip when outer spill was empty
- read_left_HV_vector = (IntVector) probeBatch.getContainer().getLast();
- }
- }
- }
-
- /**
- * Append the given build side row into the outgoing container
- * @param buildSrcContainer The container for the right/inner side
- * @param buildSrcIndex build side index
- */
- private void appendBuild(VectorContainer buildSrcContainer, int buildSrcIndex) {
- for (int vectorIndex = 0; vectorIndex < numberOfBuildSideColumns; vectorIndex++) {
- ValueVector destVector = container.getValueVector(vectorIndex).getValueVector();
- ValueVector srcVector = buildSrcContainer.getValueVector(vectorIndex).getValueVector();
- destVector.copyEntry(container.getRecordCount(), srcVector, buildSrcIndex);
- }
- }
-
- /**
- * Append the given probe side row into the outgoing container, following the build side part
- * @param probeSrcContainer The container for the left/outer side
- * @param probeSrcIndex probe side index
- */
- private void appendProbe(VectorContainer probeSrcContainer, int probeSrcIndex) {
- for (int vectorIndex = numberOfBuildSideColumns; vectorIndex < container.getNumberOfColumns(); vectorIndex++) {
- ValueVector destVector = container.getValueVector(vectorIndex).getValueVector();
- ValueVector srcVector = probeSrcContainer.getValueVector(vectorIndex - numberOfBuildSideColumns).getValueVector();
- destVector.copyEntry(container.getRecordCount(), srcVector, probeSrcIndex);
- }
- }
-
- /**
- * A special version of the VectorContainer's appendRow for the HashJoin; (following a probe) it
- * copies the build and probe sides into the outgoing container. (It uses a composite
- * index for the build side). If any of the build/probe source containers is null, then that side
- * is not appended (effectively outputing nulls for that side's columns).
- * @param buildSrcContainers The containers list for the right/inner side
- * @param compositeBuildSrcIndex Composite build index
- * @param probeSrcContainer The single container for the left/outer side
- * @param probeSrcIndex Index in the outer container
- * @return Number of rows in this container (after the append)
- */
- private int outputRow(ArrayList<VectorContainer> buildSrcContainers, int compositeBuildSrcIndex,
- VectorContainer probeSrcContainer, int probeSrcIndex) {
-
- if (buildSrcContainers != null) {
- int buildBatchIndex = compositeBuildSrcIndex >>> 16;
- int buildOffset = compositeBuildSrcIndex & 65535;
- appendBuild(buildSrcContainers.get(buildBatchIndex), buildOffset);
- }
- if (probeSrcContainer != null) {
- appendProbe(probeSrcContainer, probeSrcIndex);
- }
- return container.incRecordCount();
+ this.numberOfBuildSideColumns = semiJoin ? 0 : rightHVColPosition; // position (0 based) of added column == #columns
}
- /**
- * After the "inner" probe phase, finish up a Right (of Full) Join by projecting the unmatched rows of the build side
- * @param currBuildPart Which partition
- */
- private void executeProjectRightPhase(int currBuildPart) {
- while (outputRecords < targetOutputRecords && recordsProcessed < recordsToProcess) {
- outputRecords =
- outputRow(partitions[currBuildPart].getContainers(), unmatchedBuildIndexes.get(recordsProcessed),
- null /* no probeBatch */, 0 /* no probe index */);
+ @Override
+ protected void handleProbeResult(int probeIndex) {
+ if (semiJoin) {
+ if (probeIndex != -1) {
+ // output the probe side only
+ outputRecords =
+ outputRow(null, 0, probeBatch.getContainer(), recordsProcessed);
+ }
recordsProcessed++;
+ return; // no build-side duplicates, go on to the next probe-side row
}
- }
-
- private void executeProbePhase() throws SchemaChangeException {
-
- while (outputRecords < targetOutputRecords && probeState != ProbeState.DONE && probeState != ProbeState.PROJECT_RIGHT) {
-
- // Check if we have processed all records in this batch we need to invoke next
- if (recordsProcessed == recordsToProcess) {
-
- // Done processing all records in the previous batch, clean up!
- for (VectorWrapper<?> wrapper : probeBatch) {
- wrapper.getValueVector().clear();
- }
-
- IterOutcome leftUpstream = outgoingJoinBatch.next(HashJoinHelper.LEFT_INPUT, probeBatch);
-
- switch (leftUpstream) {
- case NONE:
- case NOT_YET:
- recordsProcessed = 0;
- recordsToProcess = 0;
- changeToFinalProbeState();
- // in case some outer partitions were spilled, need to spill their last batches
- for (HashPartition partn : partitions) {
- if (! partn.isSpilled()) { continue; } // skip non-spilled
- partn.completeAnOuterBatch(false);
- // update the partition's spill record with the outer side
- HashJoinBatch.HashJoinSpilledPartition sp = spilledInners[partn.getPartitionNum()];
- sp.updateOuter(partn.getPartitionBatchesCount(), partn.getSpillFile());
- partn.closeWriter();
- }
+ if (probeIndex != -1) {
+ /* The current probe record has a key that matches. Get the index
+ * of the first row in the build side that matches the current key
+ * (and record this match in the bitmap, in case of a FULL/RIGHT join)
+ */
+ Pair<Integer, Boolean> matchStatus = currPartition.getStartIndex(probeIndex);
- continue;
+ boolean matchExists = matchStatus.getRight();
- case OK_NEW_SCHEMA:
- if (probeBatch.getSchema().equals(probeSchema)) {
- for (HashPartition partn : partitions) { partn.updateBatches(); }
-
- } else {
- throw SchemaChangeException.schemaChanged("Hash join does not support schema changes in probe side.",
- probeSchema,
- probeBatch.getSchema());
- }
- case OK:
- outgoingJoinBatch.getBatchMemoryManager().update(probeBatch, LEFT_INDEX,outputRecords);
- setTargetOutputCount(outgoingJoinBatch.getBatchMemoryManager().getCurrentOutgoingMaxRowCount()); // calculated by update()
- recordsToProcess = probeBatch.getRecordCount();
- recordsProcessed = 0;
- // If we received an empty batch do nothing
- if (recordsToProcess == 0) {
- continue;
- }
- if (cycleNum > 0) {
- read_left_HV_vector = (IntVector) probeBatch.getContainer().getLast(); // Needed ?
- }
- break;
- default:
- }
+ if (joinControl.isIntersectDistinct() && matchExists) {
+ // since it is intersect distinct and we already have one record matched, move to next probe row
+ recordsProcessed++;
+ return;
}
- int probeIndex = -1;
- // Check if we need to drain the next row in the probe side
- if (getNextRecord) {
- if (!buildSideIsEmpty) {
- int hashCode = (cycleNum == 0) ?
- partitions[0].getProbeHashCode(recordsProcessed)
- : read_left_HV_vector.getAccessor().get(recordsProcessed);
- int currBuildPart = hashCode & partitionMask;
- hashCode >>>= bitsInMask;
-
- // Set and keep the current partition (may be used again on subsequent probe calls as
- // inner rows of duplicate key are processed)
- currPartition = partitions[currBuildPart]; // inner if not spilled, else outer
-
- // If the matching inner partition was spilled
- if (outgoingJoinBatch.isSpilledInner(currBuildPart)) {
- // add this row to its outer partition (may cause a spill, when the batch is full)
-
- currPartition.appendOuterRow(hashCode, recordsProcessed);
-
- recordsProcessed++; // done with this outer record
- continue; // on to the next outer record
- }
-
- probeIndex = currPartition.probeForKey(recordsProcessed, hashCode);
-
- }
-
- if (semiJoin) {
- if (probeIndex != -1) {
- // output the probe side only
- outputRecords =
- outputRow(null, 0, probeBatch.getContainer(), recordsProcessed);
- }
- recordsProcessed++;
- continue; // no build-side duplicates, go on to the next probe-side row
- }
-
- if (probeIndex != -1) {
-
- /* The current probe record has a key that matches. Get the index
- * of the first row in the build side that matches the current key
- * (and record this match in the bitmap, in case of a FULL/RIGHT join)
- */
- Pair<Integer, Boolean> matchStatus = currPartition.getStartIndex(probeIndex);
-
- boolean matchExists = matchStatus.getRight();
-
- if (joinControl.isIntersectDistinct() && matchExists) {
- // since it is intersect distinct and we already have one record matched, move to next probe row
- recordsProcessed++;
- continue;
- }
-
- currentCompositeIdx = matchStatus.getLeft();
-
- outputRecords =
- outputRow(currPartition.getContainers(), currentCompositeIdx,
- probeBatch.getContainer(), recordsProcessed);
+ currentCompositeIdx = matchStatus.getLeft();
- /* Projected single row from the build side with matching key but there
- * may be more rows with the same key. Check if that's the case as long as
- * we are not doing intersect distinct since it only cares about
- * distinct values.
- */
- currentCompositeIdx = joinControl.isIntersectDistinct() ? -1 :
- currPartition.getNextIndex(currentCompositeIdx);
-
- if (currentCompositeIdx == -1) {
- /* We only had one row in the build side that matched the current key
- * from the probe side. Drain the next row in the probe side.
- */
- recordsProcessed++;
- } else {
- /* There is more than one row with the same key on the build side
- * don't drain more records from the probe side till we have projected
- * all the rows with this key
- */
- getNextRecord = false;
- }
- } else { // No matching key
-
- // If we have a left outer join, project the outer side
- if (joinType == JoinRelType.LEFT || joinType == JoinRelType.FULL) {
-
- outputRecords = // output only the probe side (the build side would be all nulls)
- outputRow(null, 0, probeBatch.getContainer(), recordsProcessed);
- }
- recordsProcessed++;
- }
+ outputRecords =
+ outputRow(currPartition.getContainers(), currentCompositeIdx,
+ probeBatch.getContainer(), recordsProcessed);
+
+ /* Projected single row from the build side with matching key but there
+ * may be more rows with the same key. Check if that's the case as long as
+ * we are not doing intersect distinct since it only cares about
+ * distinct values.
+ */
+ currentCompositeIdx = joinControl.isIntersectDistinct() ? -1 :
+ currPartition.getNextIndex(currentCompositeIdx);
+
+ if (currentCompositeIdx == -1) {
+ /* We only had one row in the build side that matched the current key
+ * from the probe side. Drain the next row in the probe side.
+ */
+ recordsProcessed++;
+ } else {
+ /* There is more than one row with the same key on the build side;
+ * don't drain more records from the probe side till we have projected
+ * all the rows with this key
+ */
+ getNextRecord = false;
}
- else { // match the next inner row with the same key
-
- currPartition.setRecordMatched(currentCompositeIdx);
-
- outputRecords =
- outputRow(currPartition.getContainers(), currentCompositeIdx,
- probeBatch.getContainer(), recordsProcessed);
+ } else { // No matching key
- currentCompositeIdx = currPartition.getNextIndex(currentCompositeIdx);
+ // If we have a left outer join, project the outer side
+ if (joinType == JoinRelType.LEFT || joinType == JoinRelType.FULL) {
- if (currentCompositeIdx == -1) {
- // We don't have any more rows matching the current key on the build side, move on to the next probe row
- getNextRecord = true;
- recordsProcessed++;
- }
+ outputRecords = // output only the probe side (the build side would be all nulls)
+ outputRow(null, 0, probeBatch.getContainer(), recordsProcessed);
}
+ recordsProcessed++;
}
}
- /**
- * Perform the probe, till the outgoing is full, or no more rows to probe.
- * Performs the inner or left-outer join while there are left rows,
- * when done, continue with right-outer, if appropriate.
- * @return Num of output records
- * @throws SchemaChangeException
- */
- @Override
- public int probeAndProject() throws SchemaChangeException {
-
- outputRecords = 0;
-
- // When handling spilled partitions, the state becomes DONE at the end of each partition
- if (probeState == ProbeState.DONE) {
- return outputRecords; // that is zero
- }
-
- if (probeState == ProbeState.PROBE_PROJECT) {
- executeProbePhase();
- }
-
- if (probeState == ProbeState.PROJECT_RIGHT) {
- // Inner probe is done; now we are here because we still have a RIGHT OUTER (or a FULL) join
-
- do {
-
- if (unmatchedBuildIndexes == null) { // first time for this partition ?
- if (buildSideIsEmpty) { return outputRecords; } // in case of an empty right
- // Get this partition's list of build indexes that didn't match any record on the probe side
- unmatchedBuildIndexes = partitions[currRightPartition].getNextUnmatchedIndex();
- recordsProcessed = 0;
- recordsToProcess = unmatchedBuildIndexes.size();
- }
-
- // Project the list of unmatched records on the build side
- executeProjectRightPhase(currRightPartition);
-
- if (recordsProcessed < recordsToProcess) { // more records in this partition?
- return outputRecords; // outgoing is full; report and come back later
- } else {
- currRightPartition++; // on to the next right partition
- unmatchedBuildIndexes = null;
- }
-
- } while (currRightPartition < numPartitions);
-
- probeState = ProbeState.DONE; // last right partition was handled; we are done now
- }
-
- return outputRecords;
- }
-
@Override
public void changeToFinalProbeState() {
// We are done with the (left) probe phase.
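
The semi-join branch in the refactored handleProbeResult above emits each matching probe row once and never walks build-side duplicates. As a rough illustration (plain Java collections and hypothetical names, not part of this commit), the fast path behaves like:

import java.util.List;
import java.util.Set;

public class SemiJoinSketch {
  public static void main(String[] args) {
    Set<Integer> buildKeys = Set.of(1, 2);          // keys present on the build side
    List<Integer> probeRows = List.of(1, 1, 3, 2);  // probe-side key per row

    for (int key : probeRows) {
      if (buildKeys.contains(key)) {
        // output the probe side only; build columns are never copied
        System.out.println("emit probe row with key " + key);
      }
      // no inner loop over matching build rows: for a semi-join,
      // build-side duplicates are irrelevant
    }
  }
}
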
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbe.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/Probe.java
similarity index 64%
rename from exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbe.java
rename to exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/Probe.java
index 490eba4e33..b5acb89177 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbe.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/Probe.java
@@ -17,16 +17,15 @@
*/
package org.apache.drill.exec.physical.impl.join;
-import org.apache.drill.exec.compile.TemplateClassDefinition;
+import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.calcite.sql.SqlKind;
import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.physical.impl.common.HashPartition;
+import org.apache.drill.exec.physical.impl.setop.HashSetOpRecordBatch;
import org.apache.drill.exec.record.RecordBatch;
-import org.apache.calcite.rel.core.JoinRelType;
import org.apache.drill.exec.record.VectorContainer;
-public interface HashJoinProbe {
- TemplateClassDefinition<HashJoinProbe> TEMPLATE_DEFINITION = new TemplateClassDefinition<HashJoinProbe>(HashJoinProbe.class, HashJoinProbeTemplate.class);
-
+public interface Probe {
/* The probe side of the hash join can be in the following two states
* 1. PROBE_PROJECT: Inner join case, we probe our hash table to see if we have a
* key match and if we do we project the record
@@ -39,11 +38,21 @@ public interface HashJoinProbe {
PROBE_PROJECT, PROJECT_RIGHT, DONE
}
- void setupHashJoinProbe(RecordBatch probeBatch, HashJoinBatch outgoing, JoinRelType joinRelType, boolean semiJoin,
- RecordBatch.IterOutcome leftStartState, HashPartition[] partitions, int cycleNum,
- VectorContainer container, HashJoinBatch.HashJoinSpilledPartition[] spilledInners,
- boolean buildSideIsEmpty, int numPartitions, int rightHVColPosition);
- int probeAndProject() throws SchemaChangeException;
+ default void setup(RecordBatch probeBatch, HashJoinBatch outgoing, JoinRelType joinRelType, boolean semiJoin,
+ RecordBatch.IterOutcome leftStartState, HashPartition[] partitions, int cycleNum,
+ VectorContainer container, AbstractHashBinaryRecordBatch.SpilledPartition[] spilledInners,
+ boolean buildSideIsEmpty, int numPartitions, int rightHVColPosition) throws SchemaChangeException {
+ throw new UnsupportedOperationException();
+ }
+
+ default void setup(RecordBatch probeBatch, HashSetOpRecordBatch outgoing, SqlKind opType, boolean isAll,
+ RecordBatch.IterOutcome leftStartState, HashPartition[] partitions, int cycleNum,
+ VectorContainer container, AbstractHashBinaryRecordBatch.SpilledPartition[] spilledInners,
+ boolean buildSideIsEmpty, int numPartitions) throws SchemaChangeException {
+ throw new UnsupportedOperationException();
+ }
+
+ int probeAndProject() throws SchemaChangeException;
void changeToFinalProbeState();
void setTargetOutputCount(int targetOutputCount);
int getOutputCount();
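
The renamed Probe interface above carries two setup overloads, each backed by a throwing default, so HashJoinProbeTemplate and HashSetOpProbeTemplate each override only the variant that applies to them. A minimal sketch of that pattern (simplified, hypothetical types; not Drill code):

public class ProbeDefaultSketch {
  interface Worker {
    // Each overload defaults to "unsupported"; an implementor picks one.
    default void setup(String joinConfig) { throw new UnsupportedOperationException(); }
    default void setup(int setOpConfig) { throw new UnsupportedOperationException(); }
    void run();
  }

  static class SetOpWorker implements Worker {
    private int config;
    @Override public void setup(int setOpConfig) { this.config = setOpConfig; }
    @Override public void run() { System.out.println("running with config " + config); }
  }

  public static void main(String[] args) {
    Worker w = new SetOpWorker();
    w.setup(42);   // the supported overload
    w.run();
    // w.setup("join") would throw UnsupportedOperationException
  }
}
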
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/ProbeTemplate.java
similarity index 63%
copy from exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java
copy to exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/ProbeTemplate.java
index 79956a4df9..21101b5b51 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/ProbeTemplate.java
@@ -17,127 +17,84 @@
*/
package org.apache.drill.exec.physical.impl.join;
-import java.util.ArrayList;
-
import com.carrotsearch.hppc.IntArrayList;
import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.physical.impl.common.HashPartition;
-import org.apache.drill.exec.planner.common.JoinControl;
import org.apache.drill.exec.record.BatchSchema;
import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.record.RecordBatch.IterOutcome;
import org.apache.drill.exec.record.VectorContainer;
import org.apache.drill.exec.record.VectorWrapper;
-import org.apache.calcite.rel.core.JoinRelType;
-import org.apache.commons.lang3.tuple.Pair;
import org.apache.drill.exec.vector.IntVector;
import org.apache.drill.exec.vector.ValueVector;
+import java.util.ArrayList;
+
import static org.apache.drill.exec.record.JoinBatchMemoryManager.LEFT_INDEX;
-public class HashJoinProbeTemplate implements HashJoinProbe {
+public abstract class ProbeTemplate<T extends PhysicalOperator> implements Probe {
- VectorContainer container; // the outgoing container
+ protected VectorContainer container; // the outgoing container
// Probe side record batch
- private RecordBatch probeBatch;
+ protected RecordBatch probeBatch;
- private BatchSchema probeSchema;
-
- // Join type, INNER, LEFT, RIGHT or OUTER
- private JoinRelType joinType;
-
- // joinControl determines how to handle INTERSECT_DISTINCT vs. INTERSECT_ALL
- private JoinControl joinControl;
-
- private HashJoinBatch outgoingJoinBatch;
+ protected BatchSchema probeSchema;
// Number of records to process on the probe side
- private int recordsToProcess;
+ protected int recordsToProcess;
// Number of records processed on the probe side
- private int recordsProcessed;
+ protected int recordsProcessed;
// Number of records in the output container
- private int outputRecords;
+ protected int outputRecords;
// Indicate if we should drain the next record from the probe side
- private boolean getNextRecord = true;
+ protected boolean getNextRecord = true;
// Contains both batch idx and record idx of the matching record in the build side
- private int currentCompositeIdx = -1;
+ protected int currentCompositeIdx = -1;
// Current state the hash join algorithm is in
- private ProbeState probeState = ProbeState.PROBE_PROJECT;
+ protected ProbeState probeState = ProbeState.PROBE_PROJECT;
// For outer or right joins, this is a list of unmatched records that need to be projected
- private IntArrayList unmatchedBuildIndexes;
+ protected IntArrayList unmatchedBuildIndexes;
- private HashPartition partitions[];
+ protected HashPartition[] partitions;
// While probing duplicates, retain current build-side partition in case need to continue
// probing later on the same chain of duplicates
- private HashPartition currPartition;
-
- private int currRightPartition; // for returning RIGHT/FULL
+ protected HashPartition currPartition;
+ protected int currRightPartition; // for returning RIGHT/FULL
IntVector read_left_HV_vector; // HV vector that was read from the spilled batch
- private int cycleNum; // 1-primary, 2-secondary, 3-tertiary, etc.
- private HashJoinBatch.HashJoinSpilledPartition spilledInners[]; // for the outer to find the partition
- private boolean buildSideIsEmpty = true;
- private int numPartitions = 1; // must be 2 to the power of bitsInMask
- private int partitionMask; // numPartitions - 1
- private int bitsInMask; // number of bits in the MASK
- private int numberOfBuildSideColumns;
- private int targetOutputRecords;
- private boolean semiJoin;
-
- @Override
- public void setTargetOutputCount(int targetOutputRecords) {
- this.targetOutputRecords = targetOutputRecords;
- }
-
- @Override
- public int getOutputCount() {
- return outputRecords;
- }
-
- /**
- * Setup the Hash Join Probe object
- *
- * @param probeBatch
- * @param outgoing
- * @param joinRelType
- * @param semiJoin
- * @param leftStartState
- * @param partitions
- * @param cycleNum
- * @param container
- * @param spilledInners
- * @param buildSideIsEmpty
- * @param numPartitions
- * @param rightHVColPosition
- */
- @Override
- public void setupHashJoinProbe(RecordBatch probeBatch, HashJoinBatch outgoing, JoinRelType joinRelType, boolean semiJoin,
- IterOutcome leftStartState, HashPartition[] partitions, int cycleNum,
- VectorContainer container, HashJoinBatch.HashJoinSpilledPartition[] spilledInners,
- boolean buildSideIsEmpty, int numPartitions, int rightHVColPosition) {
+ protected int cycleNum; // 0-primary, 1-secondary, 2-tertiary, etc.
+ protected AbstractHashBinaryRecordBatch.SpilledPartition[] spilledInners; // for the outer to find the partition
+ protected boolean buildSideIsEmpty = true;
+ protected int numPartitions = 1; // must be 2 to the power of bitsInMask
+ protected int partitionMask; // numPartitions - 1
+ protected int bitsInMask; // number of bits in the MASK
+ protected int numberOfBuildSideColumns;
+ protected int targetOutputRecords;
+ protected AbstractHashBinaryRecordBatch<T> outgoingBatch;
+
+ protected void setup(RecordBatch probeBatch, IterOutcome leftStartState,
+ HashPartition[] partitions, int cycleNum,
+ VectorContainer container, AbstractHashBinaryRecordBatch.SpilledPartition[] spilledInners,
+ boolean buildSideIsEmpty, int numPartitions) throws SchemaChangeException {
this.container = container;
this.spilledInners = spilledInners;
this.probeBatch = probeBatch;
this.probeSchema = probeBatch.getSchema();
- this.joinType = joinRelType;
- this.outgoingJoinBatch = outgoing;
this.partitions = partitions;
this.cycleNum = cycleNum;
this.buildSideIsEmpty = buildSideIsEmpty;
this.numPartitions = numPartitions;
- this.numberOfBuildSideColumns = semiJoin ? 0 : rightHVColPosition; // position (0 based) of added column == #columns
- this.semiJoin = semiJoin;
partitionMask = numPartitions - 1; // e.g. 32 --> 0x1F
bitsInMask = Integer.bitCount(partitionMask); // e.g. 0x1F -> 5
- joinControl = new JoinControl(outgoingJoinBatch.getPopConfig().getJoinControl());
probeState = ProbeState.PROBE_PROJECT;
this.recordsToProcess = 0;
@@ -156,6 +113,13 @@ public class HashJoinProbeTemplate implements HashJoinProbe {
partn.allocateNewCurrentBatchAndHV();
}
+ // The container is cleared after prefetchFirstProbeBatch in the case of AggRecordBatch;
+ // after that, only the partitions holding a batch update their value-vector references in the hash table.
+ // Update the first partition here if it has not been updated yet; it is needed when computing the probe hash code.
+ if (this.cycleNum == 0 && partitions.length > 0 && partitions[0].getPartitionBatchesCount() == 0) {
+ partitions[0].updateBatches();
+ }
+
currRightPartition = 0; // In case it's a Right/Full outer join
// Initialize the HV vector for the first (already read) left batch
@@ -167,6 +131,16 @@ public class HashJoinProbeTemplate implements HashJoinProbe {
}
}
+ @Override
+ public void setTargetOutputCount(int targetOutputRecords) {
+ this.targetOutputRecords = targetOutputRecords;
+ }
+
+ @Override
+ public int getOutputCount() {
+ return outputRecords;
+ }
+
/**
* Append the given build side row into the outgoing container
* @param buildSrcContainer The container for the right/inner side
@@ -204,7 +178,7 @@ public class HashJoinProbeTemplate implements HashJoinProbe {
* @param probeSrcIndex Index in the outer container
* @return Number of rows in this container (after the append)
*/
- private int outputRow(ArrayList<VectorContainer> buildSrcContainers, int compositeBuildSrcIndex,
+ protected int outputRow(ArrayList<VectorContainer> buildSrcContainers, int compositeBuildSrcIndex,
VectorContainer probeSrcContainer, int probeSrcIndex) {
if (buildSrcContainers != null) {
@@ -222,7 +196,7 @@ public class HashJoinProbeTemplate implements HashJoinProbe {
* After the "inner" probe phase, finish up a Right (of Full) Join by projecting the unmatched rows of the build side
* @param currBuildPart Which partition
*/
- private void executeProjectRightPhase(int currBuildPart) {
+ protected void executeProjectRightPhase(int currBuildPart) {
while (outputRecords < targetOutputRecords && recordsProcessed < recordsToProcess) {
outputRecords =
outputRow(partitions[currBuildPart].getContainers(), unmatchedBuildIndexes.get(recordsProcessed),
@@ -243,7 +217,7 @@ public class HashJoinProbeTemplate implements HashJoinProbe {
wrapper.getValueVector().clear();
}
- IterOutcome leftUpstream = outgoingJoinBatch.next(HashJoinHelper.LEFT_INPUT, probeBatch);
+ IterOutcome leftUpstream = outgoingBatch.next(HashJoinHelper.LEFT_INPUT, probeBatch);
switch (leftUpstream) {
case NONE:
@@ -256,7 +230,7 @@ public class HashJoinProbeTemplate implements HashJoinProbe {
if (! partn.isSpilled()) { continue; } // skip non-spilled
partn.completeAnOuterBatch(false);
// update the partition's spill record with the outer side
- HashJoinBatch.HashJoinSpilledPartition sp = spilledInners[partn.getPartitionNum()];
+ AbstractHashBinaryRecordBatch.SpilledPartition sp = spilledInners[partn.getPartitionNum()];
sp.updateOuter(partn.getPartitionBatchesCount(), partn.getSpillFile());
partn.closeWriter();
@@ -269,13 +243,13 @@ public class HashJoinProbeTemplate implements HashJoinProbe {
for (HashPartition partn : partitions) { partn.updateBatches(); }
} else {
- throw SchemaChangeException.schemaChanged("Hash join does not support schema changes in probe side.",
- probeSchema,
- probeBatch.getSchema());
+ throw SchemaChangeException.schemaChanged(
+ this.getClass().getSimpleName() + " does not support schema changes in probe side.",
+ probeSchema, probeBatch.getSchema());
}
case OK:
- outgoingJoinBatch.getBatchMemoryManager().update(probeBatch, LEFT_INDEX,outputRecords);
- setTargetOutputCount(outgoingJoinBatch.getBatchMemoryManager().getCurrentOutgoingMaxRowCount()); // calculated by update()
+ outgoingBatch.getBatchMemoryManager().update(probeBatch, LEFT_INDEX, outputRecords);
+ setTargetOutputCount(outgoingBatch.getBatchMemoryManager().getCurrentOutgoingMaxRowCount()); // calculated by update()
recordsToProcess = probeBatch.getRecordCount();
recordsProcessed = 0;
// If we received an empty batch do nothing
@@ -305,7 +279,7 @@ public class HashJoinProbeTemplate implements HashJoinProbe {
currPartition = partitions[currBuildPart]; // inner if not spilled, else outer
// If the matching inner partition was spilled
- if (outgoingJoinBatch.isSpilledInner(currBuildPart)) {
+ if (outgoingBatch.isSpilledInner(currBuildPart)) {
// add this row to its outer partition (may cause a spill, when the batch is full)
currPartition.appendOuterRow(hashCode, recordsProcessed);
@@ -318,70 +292,8 @@ public class HashJoinProbeTemplate implements HashJoinProbe {
}
- if (semiJoin) {
- if (probeIndex != -1) {
- // output the probe side only
- outputRecords =
- outputRow(null, 0, probeBatch.getContainer(), recordsProcessed);
- }
- recordsProcessed++;
- continue; // no build-side duplicates, go on to the next probe-side row
- }
-
- if (probeIndex != -1) {
-
- /* The current probe record has a key that matches. Get the index
- * of the first row in the build side that matches the current key
- * (and record this match in the bitmap, in case of a FULL/RIGHT join)
- */
- Pair<Integer, Boolean> matchStatus = currPartition.getStartIndex(probeIndex);
-
- boolean matchExists = matchStatus.getRight();
-
- if (joinControl.isIntersectDistinct() && matchExists) {
- // since it is intersect distinct and we already have one record matched, move to next probe row
- recordsProcessed++;
- continue;
- }
-
- currentCompositeIdx = matchStatus.getLeft();
-
- outputRecords =
- outputRow(currPartition.getContainers(), currentCompositeIdx,
- probeBatch.getContainer(), recordsProcessed);
-
- /* Projected single row from the build side with matching key but there
- * may be more rows with the same key. Check if that's the case as long as
- * we are not doing intersect distinct since it only cares about
- * distinct values.
- */
- currentCompositeIdx = joinControl.isIntersectDistinct() ? -1 :
- currPartition.getNextIndex(currentCompositeIdx);
-
- if (currentCompositeIdx == -1) {
- /* We only had one row in the build side that matched the current key
- * from the probe side. Drain the next row in the probe side.
- */
- recordsProcessed++;
- } else {
- /* There is more than one row with the same key on the build side
- * don't drain more records from the probe side till we have projected
- * all the rows with this key
- */
- getNextRecord = false;
- }
- } else { // No matching key
-
- // If we have a left outer join, project the outer side
- if (joinType == JoinRelType.LEFT || joinType == JoinRelType.FULL) {
-
- outputRecords = // output only the probe side (the build side would be all nulls)
- outputRow(null, 0, probeBatch.getContainer(), recordsProcessed);
- }
- recordsProcessed++;
- }
- }
- else { // match the next inner row with the same key
+ handleProbeResult(probeIndex);
+ } else { // match the next inner row with the same key
currPartition.setRecordMatched(currentCompositeIdx);
@@ -405,7 +317,7 @@ public class HashJoinProbeTemplate implements HashJoinProbe {
* Performs the inner or left-outer join while there are left rows,
* when done, continue with right-outer, if appropriate.
* @return Num of output records
- * @throws SchemaChangeException
+ * @throws SchemaChangeException if the probe-side schema changes
*/
@Override
public int probeAndProject() throws SchemaChangeException {
@@ -423,9 +335,7 @@ public class HashJoinProbeTemplate implements HashJoinProbe {
if (probeState == ProbeState.PROJECT_RIGHT) {
// Inner probe is done; now we are here because we still have a RIGHT OUTER (or a FULL) join
-
do {
-
if (unmatchedBuildIndexes == null) { // first time for this partition ?
if (buildSideIsEmpty) { return outputRecords; } // in case of an empty right
// Get this partition's list of build indexes that didn't match any record on the probe side
@@ -444,33 +354,12 @@ public class HashJoinProbeTemplate implements HashJoinProbe {
unmatchedBuildIndexes = null;
}
- } while (currRightPartition < numPartitions);
+ } while (currRightPartition < numPartitions);
probeState = ProbeState.DONE; // last right partition was handled; we are done now
}
-
return outputRecords;
}
- @Override
- public void changeToFinalProbeState() {
- // We are done with the (left) probe phase.
- // If it's a RIGHT or a FULL join then need to get the unmatched indexes from the build side
- probeState =
- (joinType == JoinRelType.RIGHT || joinType == JoinRelType.FULL) ? ProbeState.PROJECT_RIGHT :
- ProbeState.DONE; // else we're done
- }
-
- @Override
- public String toString() {
- return "HashJoinProbeTemplate[container=" + container
- + ", probeSchema=" + probeSchema
- + ", joinType=" + joinType
- + ", recordsToProcess=" + recordsToProcess
- + ", recordsProcessed=" + recordsProcessed
- + ", outputRecords=" + outputRecords
- + ", probeState=" + probeState
- + ", unmatchedBuildIndexes=" + unmatchedBuildIndexes
- + "]";
- }
+ protected abstract void handleProbeResult(int probeIndex);
}
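
The partitionMask / bitsInMask fields retained in ProbeTemplate split each 32-bit hash value between partition routing and the in-partition hash table, which is why numPartitions must be a power of two. A standalone illustration (example values only, not Drill code):

public class PartitionMaskSketch {
  public static void main(String[] args) {
    int numPartitions = 16;                            // must be a power of two
    int partitionMask = numPartitions - 1;             // 16 --> 0x0F
    int bitsInMask = Integer.bitCount(partitionMask);  // 0x0F --> 4

    int hashCode = 0xBEEF;                             // some row's hash value
    int partition = hashCode & partitionMask;          // low bits pick the partition
    int tableHash = hashCode >>> bitsInMask;           // remaining bits feed the hash table

    System.out.printf("partition=%d tableHash=0x%X%n", partition, tableHash); // partition=15 tableHash=0xBEE
  }
}
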
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/setop/HashSetOpProbeTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/setop/HashSetOpProbeTemplate.java
new file mode 100644
index 0000000000..19e9c0daed
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/setop/HashSetOpProbeTemplate.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.setop;
+
+import org.apache.calcite.sql.SqlKind;
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.physical.config.SetOp;
+import org.apache.drill.exec.physical.impl.common.HashPartition;
+import org.apache.drill.exec.physical.impl.join.AbstractHashBinaryRecordBatch;
+import org.apache.drill.exec.physical.impl.join.ProbeTemplate;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.RecordBatch.IterOutcome;
+import org.apache.drill.exec.record.VectorContainer;
+
+public class HashSetOpProbeTemplate extends ProbeTemplate<SetOp> {
+ private SqlKind opType;
+ private boolean isAll;
+
+ /**
+ * Set up the Hash Set Op Probe object
+ */
+ @Override
+ public void setup(RecordBatch probeBatch, HashSetOpRecordBatch outgoing, SqlKind opType, boolean isAll,
+ IterOutcome leftStartState, HashPartition[] partitions, int cycleNum,
+ VectorContainer container, AbstractHashBinaryRecordBatch.SpilledPartition[] spilledInners,
+ boolean buildSideIsEmpty, int numPartitions) throws SchemaChangeException {
+ super.setup(probeBatch, leftStartState, partitions, cycleNum, container, spilledInners,
+ buildSideIsEmpty, numPartitions);
+ this.outgoingBatch = outgoing;
+ this.opType = opType;
+ this.isAll = isAll;
+ this.numberOfBuildSideColumns = 0;
+ }
+
+ @Override
+ protected void handleProbeResult(int probeIndex) {
+ switch (opType) {
+ case INTERSECT:
+ if (probeIndex != -1 && currPartition.getRecordNumForKey(probeIndex) > 0) {
+ if (isAll) {
+ currPartition.decreaseRecordNumForKey(probeIndex);
+ } else {
+ currPartition.setRecordNumForKey(probeIndex, 0);
+ }
+ outputRecords =
+ outputRow(null, 0, probeBatch.getContainer(), recordsProcessed);
+ }
+ break;
+ case EXCEPT:
+ if (isAll) {
+ if (probeIndex == -1 || currPartition.getRecordNumForKey(probeIndex) == 0) {
+ outputRecords =
+ outputRow(null, 0, probeBatch.getContainer(), recordsProcessed);
+ } else {
+ currPartition.decreaseRecordNumForKey(probeIndex);
+ }
+ } else if (probeIndex == -1) {
+ outputRecords =
+ outputRow(null, 0, probeBatch.getContainer(), recordsProcessed);
+ }
+ break;
+ default:
+ break;
+ }
+ recordsProcessed++;
+ }
+
+ @Override
+ public void changeToFinalProbeState() {
+ probeState = ProbeState.DONE;
+ }
+
+ @Override
+ public String toString() {
+ return "HashSetOpProbeTemplate[container=" + container
+ + ", probeSchema=" + probeSchema
+ + ", opType=" + opType
+ + ", isAll=" + isAll
+ + ", recordsToProcess=" + recordsToProcess
+ + ", recordsProcessed=" + recordsProcessed
+ + ", outputRecords=" + outputRecords
+ + ", probeState=" + probeState
+ + "]";
+ }
+}
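
handleProbeResult above drives all four set operations off a per-key record count kept on the build side: INTERSECT emits while matches remain (ALL decrements the count, DISTINCT zeroes it), and EXCEPT emits once no match remains. Restated over a plain HashMap (illustration only, not the hash-table code):

import java.util.HashMap;
import java.util.Map;

public class SetOpCountSketch {
  public static void main(String[] args) {
    int[] build = {1, 1, 2};        // right/inner input
    int[] probe = {1, 1, 1, 2, 3};  // left/outer input

    Map<Integer, Integer> counts = new HashMap<>();
    for (int k : build) counts.merge(k, 1, Integer::sum);

    // EXCEPT ALL: emit a probe row only once its key's build count is used up
    Map<Integer, Integer> c = new HashMap<>(counts);
    StringBuilder exceptAll = new StringBuilder();
    for (int k : probe) {
      Integer n = c.get(k);
      if (n == null || n == 0) exceptAll.append(k).append(' ');
      else c.put(k, n - 1);         // consume one matching build row
    }
    System.out.println("EXCEPT ALL: " + exceptAll);       // 1 3

    // INTERSECT ALL: emit a probe row while its key's build count lasts
    c = new HashMap<>(counts);
    StringBuilder intersectAll = new StringBuilder();
    for (int k : probe) {
      Integer n = c.get(k);
      if (n != null && n > 0) { c.put(k, n - 1); intersectAll.append(k).append(' '); }
    }
    System.out.println("INTERSECT ALL: " + intersectAll); // 1 1 2
  }
}
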
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/setop/HashSetOpRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/setop/HashSetOpRecordBatch.java
new file mode 100644
index 0000000000..516788d76f
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/setop/HashSetOpRecordBatch.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.setop;
+
+import org.apache.calcite.sql.SqlKind;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.expression.FieldReference;
+import org.apache.drill.common.logical.data.NamedExpression;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.exception.OutOfMemoryException;
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.config.SetOp;
+import org.apache.drill.exec.physical.impl.common.Comparator;
+import org.apache.drill.exec.physical.impl.common.HashTable;
+import org.apache.drill.exec.physical.impl.common.HashTableConfig;
+import org.apache.drill.exec.physical.impl.join.AbstractHashBinaryRecordBatch;
+import org.apache.drill.exec.physical.impl.join.Probe;
+import org.apache.drill.exec.planner.common.JoinControl;
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
+
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * Implements the runtime execution for the Hash-SetOp operator supporting EXCEPT,
+ * EXCEPT ALL, INTERSECT, and INTERSECT ALL
+ */
+public class HashSetOpRecordBatch extends AbstractHashBinaryRecordBatch<SetOp> {
+ private final SqlKind opType;
+ private final boolean isAll;
+
+ /**
+ * The constructor
+ *
+ * @param popConfig SetOp
+ * @param context FragmentContext
+ * @param left probe/outer side incoming input
+ * @param right build/inner side incoming input
+ * @throws OutOfMemoryException out of memory exception
+ */
+ public HashSetOpRecordBatch(SetOp popConfig, FragmentContext context,
+ RecordBatch left, RecordBatch right)
+ throws OutOfMemoryException {
+ super(popConfig, context, left, right);
+ this.opType = popConfig.getKind();
+ this.isAll = popConfig.isAll();
+ this.semiJoin = true;
+ this.joinIsLeftOrFull = true;
+ this.joinIsRightOrFull = false;
+ this.isRowKeyJoin = false;
+ this.enableRuntimeFilter = false;
+ }
+
+ private void buildCompareExpression(List<NamedExpression> leftExpr, List<Comparator> comparators) {
+ Iterator<MaterializedField> iterator = probeSchema.iterator();
+ int i = 0;
+ while (iterator.hasNext()) {
+ MaterializedField field = iterator.next();
+ String refName = "probe_side_" + i;
+ leftExpr.add(new NamedExpression(new FieldReference(field.getName()), new FieldReference(refName)));
+ i++;
+ }
+
+ iterator = buildSchema.iterator();
+ i = 0;
+ while (iterator.hasNext()) {
+ MaterializedField field = iterator.next();
+ String refName = "build_side_" + i;
+ rightExpr.add(new NamedExpression(new FieldReference(field.getName()), new FieldReference(refName)));
+ buildJoinColumns.add(field.getName());
+ i++;
+ }
+ leftExpr.forEach(e -> comparators.add(Comparator.EQUALS));
+ }
+
+ @Override
+ protected HashTableConfig buildHashTableConfig() {
+ if (leftUpstream == IterOutcome.OK_NEW_SCHEMA
+ || leftUpstream == IterOutcome.OK) {
+ if (probeBatch.getSchema()
+ .getSelectionVectorMode() != BatchSchema.SelectionVectorMode.NONE) {
+ throw UserException.internalError(null).message(
+ "Hash SetOp does not support probe batch with selection vectors.")
+ .addContext("Probe batch has selection mode",
+ (probeBatch.getSchema().getSelectionVectorMode()).toString())
+ .build(logger);
+ }
+ }
+
+ List<NamedExpression> leftExpr = Lists.newArrayListWithExpectedSize(probeSchema.getFieldCount());
+ List<Comparator> comparators = Lists.newArrayListWithExpectedSize(probeSchema.getFieldCount());
+ buildCompareExpression(leftExpr, comparators);
+ return new HashTableConfig(
+ (int) context.getOptions().getOption(ExecConstants.MIN_HASH_TABLE_SIZE),
+ true, HashTable.DEFAULT_LOAD_FACTOR, rightExpr, leftExpr, comparators,
+ JoinControl.DEFAULT, true);
+ }
+
+ @Override
+ public Probe createProbe() {
+ // No real code generation is needed here; return the probe template directly.
+ return new HashSetOpProbeTemplate();
+ }
+
+ @Override
+ public void setupProbe() throws SchemaChangeException {
+ probe.setup(probeBatch, this, opType,
+ isAll, leftUpstream, partitions, spilledState.getCycle(),
+ container, spilledInners, buildSideIsEmpty.booleanValue(),
+ numPartitions);
+ }
+
+ @Override
+ public void dump() {
+ logger.error(
+ "HashSetOpRecordBatch[container={}, left={}, right={}, leftOutcome={}, rightOutcome={}, " +
+ "SetOpType={}, IsAll={}, hashSetOpProbe={}, canSpill={}, buildSchema={}, probeSchema={}]",
+ container, left, right, leftUpstream, rightUpstream, opType, isAll, probe, canSpill, buildSchema, probeSchema);
+ }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/setop/SetOpBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/setop/SetOpBatchCreator.java
new file mode 100644
index 0000000000..58338eb60e
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/setop/SetOpBatchCreator.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.setop;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.exec.ops.ExecutorFragmentContext;
+import org.apache.drill.exec.physical.config.SetOp;
+import org.apache.drill.exec.physical.impl.BatchCreator;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
+
+import java.util.List;
+
+public class SetOpBatchCreator implements BatchCreator<SetOp> {
+ @Override
+ public HashSetOpRecordBatch getBatch(ExecutorFragmentContext context, SetOp config, List<RecordBatch> children)
+ throws ExecutionSetupException {
+ Preconditions.checkArgument(children.size() == 2);
+ return new HashSetOpRecordBatch(config, context, children.get(0), children.get(1));
+ }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PlannerPhase.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PlannerPhase.java
index 00ff1fc704..2c2215eacd 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PlannerPhase.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PlannerPhase.java
@@ -19,12 +19,14 @@ package org.apache.drill.exec.planner;
import org.apache.drill.exec.planner.logical.ConvertMetadataAggregateToDirectScanRule;
import org.apache.drill.exec.planner.logical.DrillDistinctJoinToSemiJoinRule;
+import org.apache.drill.exec.planner.logical.DrillSetOpRule;
import org.apache.drill.exec.planner.logical.DrillReduceExpressionsRule;
import org.apache.drill.exec.planner.logical.DrillTableModifyRule;
import org.apache.drill.exec.planner.physical.MetadataAggPrule;
import org.apache.drill.exec.planner.physical.MetadataControllerPrule;
import org.apache.drill.exec.planner.physical.MetadataHandlerPrule;
import org.apache.drill.exec.planner.physical.TableModifyPrule;
+import org.apache.drill.exec.planner.physical.SetOpPrule;
import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableSet;
import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableSet.Builder;
import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
@@ -396,6 +398,7 @@ public enum PlannerPhase {
*/
ImmutableSet.Builder<RelOptRule> basicRules = ImmutableSet.<RelOptRule>builder()
.addAll(staticRuleSet)
+ .addAll(DrillSetOpRule.INSTANCES)
.add(
DrillMergeProjectRule.getInstance(true, RelFactories.DEFAULT_PROJECT_FACTORY,
optimizerRulesContext.getFunctionRegistry())
@@ -551,6 +554,9 @@ public enum PlannerPhase {
ruleList.add(DrillPushLimitToScanRule.LIMIT_ON_SCAN);
ruleList.add(TableModifyPrule.INSTANCE);
+ ruleList.addAll(SetOpPrule.DIST_INSTANCES);
+ ruleList.addAll(SetOpPrule.BROADCAST_INSTANCES);
+
if (ps.isHashAggEnabled()) {
ruleList.add(HashAggPrule.INSTANCE);
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillSetOpRel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillSetOpRel.java
new file mode 100644
index 0000000000..29b87b291e
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillSetOpRel.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.common;
+
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.type.RelDataType;
+
+import java.util.List;
+
+public interface DrillSetOpRel {
+ default boolean isCompatible(RelDataType setOpType, List<RelNode> inputs) {
+ for (RelNode input : inputs) {
+ if (!DrillRelOptUtil.areRowTypesCompatible(
+ input.getRowType(), setOpType, false, true)) {
+ return false;
+ }
+ }
+ return true;
+ }
+}
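
DrillSetOpRel.isCompatible above requires every input's row type to line up with the set operation's row type, delegating the field-by-field comparison to DrillRelOptUtil.areRowTypesCompatible (names ignored, substring type matches allowed). The shape of the check, restated over plain lists (illustration only, with simple equality standing in for the real type comparison):

import java.util.List;

public class RowTypeCompatSketch {
  // Stand-in for areRowTypesCompatible: here, plain equality of type names.
  static boolean isCompatible(List<String> setOpType, List<List<String>> inputs) {
    for (List<String> input : inputs) {
      if (!input.equals(setOpType)) {
        return false;
      }
    }
    return true;
  }

  public static void main(String[] args) {
    List<String> rowType = List.of("INT", "VARCHAR");
    System.out.println(isCompatible(rowType,
        List.of(List.of("INT", "VARCHAR"), List.of("INT", "VARCHAR")))); // true
    System.out.println(isCompatible(rowType,
        List.of(List.of("INT", "VARCHAR"), List.of("INT", "BOOLEAN")))); // false
  }
}
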
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillUnionRelBase.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillUnionRelBase.java
deleted file mode 100644
index 968cf54eec..0000000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillUnionRelBase.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.planner.common;
-
-import java.util.List;
-
-import org.apache.calcite.rel.InvalidRelException;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.core.Union;
-import org.apache.calcite.plan.RelOptCluster;
-import org.apache.calcite.plan.RelTraitSet;
-import org.apache.calcite.rel.type.RelDataType;
-
-/**
- * Base class for logical and physical Union implemented in Drill
- */
-public abstract class DrillUnionRelBase extends Union implements DrillRelNode {
-
- public DrillUnionRelBase(RelOptCluster cluster, RelTraitSet traits,
- List<RelNode> inputs, boolean all, boolean checkCompatibility) throws InvalidRelException {
- super(cluster, traits, inputs, all);
- if (checkCompatibility &&
- !this.isCompatible(false /* don't compare names */, true /* allow substrings */)) {
- throw new InvalidRelException("Input row types of the Union are not compatible.");
- }
- }
-
- public boolean isCompatible(boolean compareNames, boolean allowSubstring) {
- RelDataType unionType = getRowType();
- for (RelNode input : getInputs()) {
- if (! DrillRelOptUtil.areRowTypesCompatible(
- input.getRowType(), unionType, compareNames, allowSubstring)) {
- return false;
- }
- }
- return true;
- }
-
- public boolean isDistinct() {
- return !this.all;
- }
-
-}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillExceptRel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillExceptRel.java
new file mode 100644
index 0000000000..38173bf4d7
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillExceptRel.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.logical;
+
+import org.apache.calcite.linq4j.Ord;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.InvalidRelException;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Minus;
+import org.apache.calcite.util.trace.CalciteTrace;
+import org.apache.drill.common.logical.data.Except;
+import org.apache.drill.common.logical.data.LogicalOperator;
+import org.apache.drill.exec.planner.common.DrillSetOpRel;
+import org.apache.drill.exec.planner.torel.ConversionContext;
+import org.slf4j.Logger;
+
+import java.util.List;
+
+/**
+ * Minus implemented in Drill.
+ */
+public class DrillExceptRel extends Minus implements DrillRel, DrillSetOpRel {
+ private static final Logger tracer = CalciteTrace.getPlannerTracer();
+
+ public DrillExceptRel(RelOptCluster cluster, RelTraitSet traits,
+ List<RelNode> inputs, boolean all, boolean checkCompatibility) throws InvalidRelException {
+ super(cluster, traits, inputs, all);
+ if (checkCompatibility && !this.isCompatible(getRowType(), getInputs())) {
+ throw new InvalidRelException("Input row types of the Except are not compatible.");
+ }
+ }
+
+ public static DrillExceptRel create(List<RelNode> inputs, boolean all) {
+ try {
+ return new DrillExceptRel(inputs.get(0).getCluster(), inputs.get(0).getTraitSet(), inputs, all, true);
+ } catch (InvalidRelException e) {
+ tracer.warn(e.toString());
+ return null;
+ }
+ }
+
+ @Override
+ public DrillExceptRel copy(RelTraitSet traitSet, List<RelNode> inputs,
+ boolean all) {
+ try {
+ return new DrillExceptRel(getCluster(), traitSet, inputs, all, false);
+ } catch (InvalidRelException e) {
+ throw new AssertionError(e);
+ }
+ }
+
+ public DrillExceptRel copy(List<RelNode> inputs) {
+ try {
+ return new DrillExceptRel(getCluster(), traitSet, inputs, all, false);
+ } catch (InvalidRelException e) {
+ throw new AssertionError(e);
+ }
+ }
+
+ public DrillExceptRel copy() {
+ try {
+ return new DrillExceptRel(getCluster(), traitSet, inputs, all, false);
+ } catch (InvalidRelException e) {
+ throw new AssertionError(e);
+ }
+ }
+
+ public static DrillExceptRel convert(Except except, ConversionContext context) throws InvalidRelException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public LogicalOperator implement(DrillImplementor implementor) {
+ Except.Builder builder = Except.builder();
+ for (Ord<RelNode> input : Ord.zip(inputs)) {
+ builder.addInput(implementor.visitChild(this, input.i, input.e));
+ }
+ builder.setDistinct(!all);
+ return builder.build();
+ }
+
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillUnionRel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillIntersectRel.java
similarity index 52%
copy from exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillUnionRel.java
copy to exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillIntersectRel.java
index a5a6e0330e..10dc50a1a7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillUnionRel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillIntersectRel.java
@@ -17,52 +17,61 @@
*/
package org.apache.drill.exec.planner.logical;
-import java.util.List;
-
import org.apache.calcite.linq4j.Ord;
-
-import org.apache.calcite.rel.metadata.RelMetadataQuery;
-import org.apache.drill.common.logical.data.LogicalOperator;
-import org.apache.drill.common.logical.data.Union;
-import org.apache.drill.exec.planner.common.DrillUnionRelBase;
-import org.apache.drill.exec.planner.torel.ConversionContext;
-import org.apache.calcite.rel.InvalidRelException;
-import org.apache.calcite.rel.RelNode;
import org.apache.calcite.plan.RelOptCluster;
-import org.apache.calcite.plan.RelOptCost;
-import org.apache.calcite.plan.RelOptPlanner;
import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.InvalidRelException;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Intersect;
+import org.apache.calcite.util.trace.CalciteTrace;
+import org.apache.drill.common.logical.data.LogicalOperator;
+import org.apache.drill.exec.planner.common.DrillSetOpRel;
+import org.apache.drill.exec.planner.torel.ConversionContext;
+import org.slf4j.Logger;
+
+import java.util.List;
/**
- * Union implemented in Drill.
+ * Intersect implemented in Drill.
*/
-public class DrillUnionRel extends DrillUnionRelBase implements DrillRel {
- /** Creates a DrillUnionRel. */
- public DrillUnionRel(RelOptCluster cluster, RelTraitSet traits,
- List<RelNode> inputs, boolean all, boolean checkCompatibility) throws InvalidRelException {
- super(cluster, traits, inputs, all, checkCompatibility);
+public class DrillIntersectRel extends Intersect implements DrillRel, DrillSetOpRel {
+ private static final Logger tracer = CalciteTrace.getPlannerTracer();
+
+ public DrillIntersectRel(RelOptCluster cluster, RelTraitSet traits,
+ List<RelNode> inputs, boolean all, boolean checkCompatibility) throws InvalidRelException {
+ super(cluster, traits, inputs, all);
+ if (checkCompatibility && !this.isCompatible(getRowType(), getInputs())) {
+ throw new InvalidRelException("Input row types of the Intersect are not compatible.");
+ }
+ }
+
+ public static DrillIntersectRel create(List<RelNode> inputs, boolean all) {
+ try {
+ return new DrillIntersectRel(inputs.get(0).getCluster(), inputs.get(0).getTraitSet(), inputs, all, true);
+ } catch (InvalidRelException e) {
+ tracer.warn(e.toString());
+ return null;
+ }
}
@Override
- public DrillUnionRel copy(RelTraitSet traitSet, List<RelNode> inputs,
- boolean all) {
+ public DrillIntersectRel copy(RelTraitSet traitSet, List<RelNode> inputs,
+ boolean all) {
try {
- return new DrillUnionRel(getCluster(), traitSet, inputs, all,
+ return new DrillIntersectRel(getCluster(), traitSet, inputs, all,
false /* don't check compatibility during copy */);
} catch (InvalidRelException e) {
throw new AssertionError(e);
}
}
- @Override
- public RelOptCost computeSelfCost(RelOptPlanner planner, RelMetadataQuery mq) {
- // divide cost by two to ensure cheaper than EnumerableDrillRel
- return super.computeSelfCost(planner, mq).multiplyBy(.5);
+ public static DrillIntersectRel convert(org.apache.drill.common.logical.data.Intersect intersect, ConversionContext context) throws InvalidRelException {
+ throw new UnsupportedOperationException();
}
@Override
public LogicalOperator implement(DrillImplementor implementor) {
- Union.Builder builder = Union.builder();
+ org.apache.drill.common.logical.data.Intersect.Builder builder = org.apache.drill.common.logical.data.Intersect.builder();
for (Ord<RelNode> input : Ord.zip(inputs)) {
builder.addInput(implementor.visitChild(this, input.i, input.e));
}
@@ -70,7 +79,4 @@ public class DrillUnionRel extends DrillUnionRelBase implements DrillRel {
return builder.build();
}
- public static DrillUnionRel convert(Union union, ConversionContext context) throws InvalidRelException{
- throw new UnsupportedOperationException();
- }
}
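
The intersect counterpart mirrors the pattern above; under INTERSECT ALL each value survives min(leftCount, rightCount) times, while plain INTERSECT keeps distinct matches only. A standalone sketch (hypothetical class name, not Drill code):

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class IntersectSemanticsDemo {
      public static void main(String[] args) {
        List<Integer> left = List.of(4, 2, 4, 1, 3, 2, 1);
        List<Integer> right = List.of(1, 1, 2, 3);

        // Count each right-side value up front.
        Map<Integer, Integer> rightCounts = new HashMap<>();
        for (Integer r : right) {
          rightCounts.merge(r, 1, Integer::sum);
        }

        // INTERSECT ALL: emit a left row while its value still has right-side budget.
        List<Integer> intersectAll = new ArrayList<>();
        for (Integer l : left) {
          if (rightCounts.getOrDefault(l, 0) > 0) {
            rightCounts.merge(l, -1, Integer::sum);
            intersectAll.add(l);
          }
        }
        System.out.println(intersectAll); // [2, 1, 3, 1]
      }
    }
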
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRelFactories.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRelFactories.java
index ce3d3d7de1..f401ba76bd 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRelFactories.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRelFactories.java
@@ -29,6 +29,7 @@ import org.apache.calcite.rel.hint.RelHint;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.rex.RexUtil;
+import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.tools.RelBuilderFactory;
import org.apache.calcite.util.ImmutableBitSet;
import org.apache.drill.exec.planner.DrillRelBuilder;
@@ -52,6 +53,9 @@ import static org.apache.drill.exec.planner.logical.DrillRel.DRILL_LOGICAL;
*/
public class DrillRelFactories {
+ public static final RelFactories.SetOpFactory DRILL_LOGICAL_SET_OP_FACTORY =
+ new DrillSetOpFactoryImpl();
+
public static final RelFactories.ProjectFactory DRILL_LOGICAL_PROJECT_FACTORY =
new DrillProjectFactoryImpl();
@@ -89,6 +93,25 @@ public class DrillRelFactories {
DEFAULT_VALUES_FACTORY,
DEFAULT_TABLE_SCAN_FACTORY));
+ /**
+ * Implementation of {@link RelFactories.SetOpFactory} that returns
+ * a vanilla {@link DrillExceptRel} or {@link DrillIntersectRel},
+ * depending on the kind of set operation (EXCEPT or INTERSECT).
+ */
+ private static class DrillSetOpFactoryImpl implements RelFactories.SetOpFactory {
+ @Override
+ public RelNode createSetOp(SqlKind kind, List<RelNode> inputs, boolean all) {
+ switch (kind) {
+ case EXCEPT:
+ return DrillExceptRel.create(inputs, all);
+ case INTERSECT:
+ return DrillIntersectRel.create(inputs, all);
+ default:
+ throw new AssertionError("unsupported set op: " + kind);
+ }
+ }
+ }
+
/**
* Implementation of {@link RelFactories.ProjectFactory} that returns a vanilla
* {@link DrillProjectRel}.
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillSetOpRule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillSetOpRule.java
new file mode 100644
index 0000000000..6ac15c8e7b
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillSetOpRule.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.logical;
+
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelOptRuleOperand;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.RelFactories.SetOpFactory;
+import org.apache.calcite.rel.core.SetOp;
+import org.apache.calcite.rel.logical.LogicalIntersect;
+import org.apache.calcite.rel.logical.LogicalMinus;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Rule that converts {@link LogicalIntersect} or {@link LogicalMinus} to
+ * {@link DrillIntersectRel} or {@link DrillExceptRel}.
+ */
+public class DrillSetOpRule extends RelOptRule {
+ private final SetOpFactory setOpFactory;
+ public static final List<RelOptRule> INSTANCES = Arrays.asList(
+ new DrillSetOpRule(RelOptHelper.any(LogicalIntersect.class, Convention.NONE), "DrillIntersectRelRule", DrillRelFactories.DRILL_LOGICAL_SET_OP_FACTORY),
+ new DrillSetOpRule(RelOptHelper.any(LogicalMinus.class, Convention.NONE), "DrillExceptRelRule", DrillRelFactories.DRILL_LOGICAL_SET_OP_FACTORY)
+ );
+
+ public DrillSetOpRule(RelOptRuleOperand operand, String description, SetOpFactory setOpFactory) {
+ super(operand, description);
+ this.setOpFactory = setOpFactory;
+ }
+
+ @Override
+ public void onMatch(RelOptRuleCall call) {
+ final SetOp setOp = call.rel(0);
+ final List<RelNode> convertedInputs = new ArrayList<>();
+ for (RelNode input : setOp.getInputs()) {
+ RelNode convertedInput = convert(input, input.getTraitSet().plus(DrillRel.DRILL_LOGICAL).simplify());
+ convertedInputs.add(convertedInput);
+ }
+ RelNode newRelNode = this.setOpFactory.createSetOp(setOp.kind, convertedInputs, setOp.all);
+ if (newRelNode != null) {
+ call.transformTo(newRelNode);
+ }
+ }
+
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillUnionAllRule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillUnionAllRule.java
index 69e9452d7c..a35d320bc2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillUnionAllRule.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillUnionAllRule.java
@@ -31,7 +31,7 @@ import org.apache.calcite.util.trace.CalciteTrace;
import org.slf4j.Logger;
/**
- * Rule that converts a {@link LogicalUnion} to a {@link DrillUnionRelBase}, implemented by a "union" operation.
+ * Rule that converts a {@link LogicalUnion} to a {@link DrillUnionRel}, implemented by a "union" operation.
*/
public class DrillUnionAllRule extends RelOptRule {
public static final RelOptRule INSTANCE = new DrillUnionAllRule();
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillUnionRel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillUnionRel.java
index a5a6e0330e..263266e1ff 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillUnionRel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillUnionRel.java
@@ -21,10 +21,10 @@ import java.util.List;
import org.apache.calcite.linq4j.Ord;
+import org.apache.calcite.rel.core.Union;
import org.apache.calcite.rel.metadata.RelMetadataQuery;
import org.apache.drill.common.logical.data.LogicalOperator;
-import org.apache.drill.common.logical.data.Union;
-import org.apache.drill.exec.planner.common.DrillUnionRelBase;
+import org.apache.drill.exec.planner.common.DrillSetOpRel;
import org.apache.drill.exec.planner.torel.ConversionContext;
import org.apache.calcite.rel.InvalidRelException;
import org.apache.calcite.rel.RelNode;
@@ -36,11 +36,14 @@ import org.apache.calcite.plan.RelTraitSet;
/**
* Union implemented in Drill.
*/
-public class DrillUnionRel extends DrillUnionRelBase implements DrillRel {
+public class DrillUnionRel extends Union implements DrillRel, DrillSetOpRel {
/** Creates a DrillUnionRel. */
public DrillUnionRel(RelOptCluster cluster, RelTraitSet traits,
List<RelNode> inputs, boolean all, boolean checkCompatibility) throws InvalidRelException {
- super(cluster, traits, inputs, all, checkCompatibility);
+ super(cluster, traits, inputs, all);
+ if (checkCompatibility && !this.isCompatible(getRowType(), getInputs())) {
+ throw new InvalidRelException("Input row types of the Union are not compatible.");
+ }
}
@Override
@@ -62,7 +65,7 @@ public class DrillUnionRel extends DrillUnionRelBase implements DrillRel {
@Override
public LogicalOperator implement(DrillImplementor implementor) {
- Union.Builder builder = Union.builder();
+ org.apache.drill.common.logical.data.Union.Builder builder = org.apache.drill.common.logical.data.Union.builder();
for (Ord<RelNode> input : Ord.zip(inputs)) {
builder.addInput(implementor.visitChild(this, input.i, input.e));
}
@@ -70,7 +73,11 @@ public class DrillUnionRel extends DrillUnionRelBase implements DrillRel {
return builder.build();
}
- public static DrillUnionRel convert(Union union, ConversionContext context) throws InvalidRelException{
+ public static DrillUnionRel convert(org.apache.drill.common.logical.data.Union union, ConversionContext context) throws InvalidRelException {
throw new UnsupportedOperationException();
}
+
+ public boolean isDistinct() {
+ return !this.all;
+ }
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/PreProcessLogicalRel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/PreProcessLogicalRel.java
index b87350d6dd..4cdff4feac 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/PreProcessLogicalRel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/PreProcessLogicalRel.java
@@ -20,6 +20,8 @@ package org.apache.drill.exec.planner.logical;
import java.util.ArrayList;
import java.util.List;
+import org.apache.calcite.rel.logical.LogicalIntersect;
+import org.apache.calcite.rel.logical.LogicalMinus;
import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
import org.apache.calcite.rel.logical.LogicalFilter;
import org.apache.calcite.rel.logical.LogicalJoin;
@@ -199,6 +201,35 @@ public class PreProcessLogicalRel extends RelShuttleImpl {
return visitChildren(union);
}
+ @Override
+ public RelNode visit(LogicalIntersect intersect) {
+ for (RelNode child : intersect.getInputs()) {
+ for (RelDataTypeField dataField : child.getRowType().getFieldList()) {
+ if (dataField.getName().contains(SchemaPath.DYNAMIC_STAR)) {
+ unsupportedOperatorCollector.setException(SqlUnsupportedException.ExceptionType.RELATIONAL,
+ "Intersect(All) over schema-less tables must specify the columns explicitly\n");
+ throw new UnsupportedOperationException();
+ }
+ }
+ }
+ return visitChildren(intersect);
+ }
+
+ @Override
+ public RelNode visit(LogicalMinus minus) {
+ for (RelNode child : minus.getInputs()) {
+ for (RelDataTypeField dataField : child.getRowType().getFieldList()) {
+ if (dataField.getName().contains(SchemaPath.DYNAMIC_STAR)) {
+ unsupportedOperatorCollector.setException(SqlUnsupportedException.ExceptionType.RELATIONAL,
+ "Except(All) over schema-less tables must specify the columns explicitly\n");
+ throw new UnsupportedOperationException();
+ }
+ }
+ }
+
+ return visitChildren(minus);
+ }
+
private UserException getConvertFunctionInvalidTypeException(final RexCall function) {
// Caused by user entering a value with a numeric type
final String functionName = function.getOperator().getName();
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/ScanFieldDeterminer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/ScanFieldDeterminer.java
index c3c271eab2..29f26c9305 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/ScanFieldDeterminer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/ScanFieldDeterminer.java
@@ -23,6 +23,8 @@ import java.util.Set;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.logical.LogicalPlan;
+import org.apache.drill.common.logical.data.Except;
+import org.apache.drill.common.logical.data.Intersect;
import org.apache.drill.common.logical.data.Values;
import org.apache.drill.common.logical.data.Filter;
import org.apache.drill.common.logical.data.GroupingAggregate;
@@ -187,4 +189,20 @@ public class ScanFieldDeterminer extends AbstractLogicalVisitor<Void, ScanFieldD
}
return null;
}
+
+ @Override
+ public Void visitExcept(Except except, FieldList value) {
+ for (LogicalOperator o : except.getInputs()) {
+ o.accept(this, value.clone());
+ }
+ return null;
+ }
+
+ @Override
+ public Void visitIntersect(Intersect intersect, FieldList value) {
+ for (LogicalOperator o : intersect.getInputs()) {
+ o.accept(this, value.clone());
+ }
+ return null;
+ }
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/UnionAllPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SetOpPrel.java
similarity index 55%
copy from exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/UnionAllPrel.java
copy to exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SetOpPrel.java
index 4bdecea4ee..70d4fc6669 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/UnionAllPrel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SetOpPrel.java
@@ -17,60 +17,41 @@
*/
package org.apache.drill.exec.planner.physical;
-import java.io.IOException;
-import java.util.List;
-
-import org.apache.drill.exec.physical.base.PhysicalOperator;
-import org.apache.drill.exec.physical.config.UnionAll;
-import org.apache.drill.exec.planner.cost.DrillCostBase;
-import org.apache.drill.exec.planner.cost.DrillCostBase.DrillCostFactory;
-import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
-
-import org.apache.calcite.rel.InvalidRelException;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.metadata.RelMetadataQuery;
import org.apache.calcite.plan.RelOptCluster;
-import org.apache.calcite.plan.RelOptCost;
-import org.apache.calcite.plan.RelOptPlanner;
import org.apache.calcite.plan.RelTraitSet;
-import org.apache.calcite.rel.core.Union;
-
+import org.apache.calcite.rel.InvalidRelException;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.RelWriter;
+import org.apache.calcite.rel.core.SetOp;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.planner.physical.visitor.PrelVisitor;
+import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
-public class UnionAllPrel extends UnionPrel {
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
- public UnionAllPrel(RelOptCluster cluster, RelTraitSet traits, List<RelNode> inputs,
- boolean checkCompatibility) throws InvalidRelException {
- super(cluster, traits, inputs, true /* all */, checkCompatibility);
+public class SetOpPrel extends SetOp implements Prel {
+ public SetOpPrel(RelOptCluster cluster, RelTraitSet traits, List<RelNode> inputs, SqlKind kind,
+ boolean all) throws InvalidRelException {
+ super(cluster, traits, inputs, kind, all);
}
+ @Override
+ public RelWriter explainTerms(RelWriter pw) {
+ return super.explainTerms(pw).item("kind", kind);
+ }
- @Override
- public Union copy(RelTraitSet traitSet, List<RelNode> inputs, boolean all) {
+ @Override
+ public SetOpPrel copy(RelTraitSet traitSet, List<RelNode> inputs, boolean all) {
try {
- return new UnionAllPrel(this.getCluster(), traitSet, inputs,
- false /* don't check compatibility during copy */);
+ return new SetOpPrel(this.getCluster(), traitSet, inputs, kind, all);
}catch (InvalidRelException e) {
throw new AssertionError(e);
}
}
- @Override
- public RelOptCost computeSelfCost(RelOptPlanner planner, RelMetadataQuery mq) {
- if(PrelUtil.getSettings(getCluster()).useDefaultCosting()) {
- return super.computeSelfCost(planner, mq).multiplyBy(.1);
- }
- double totalInputRowCount = 0;
- for (int i = 0; i < this.getInputs().size(); i++) {
- totalInputRowCount += mq.getRowCount(this.getInputs().get(i));
- }
-
- double cpuCost = totalInputRowCount * DrillCostBase.BASE_CPU_COST;
- DrillCostFactory costFactory = (DrillCostFactory)planner.getCostFactory();
- return costFactory.makeCost(totalInputRowCount, cpuCost, 0, 0);
- }
-
@Override
public PhysicalOperator getPhysicalOperator(PhysicalPlanCreator creator) throws IOException {
List<PhysicalOperator> inputPops = Lists.newArrayList();
@@ -79,8 +60,8 @@ public class UnionAllPrel extends UnionPrel {
inputPops.add( ((Prel)this.getInputs().get(i)).getPhysicalOperator(creator));
}
- UnionAll unionall = new UnionAll(inputPops);
- return creator.addMetadata(this, unionall);
+ org.apache.drill.exec.physical.config.SetOp setOp = new org.apache.drill.exec.physical.config.SetOp(inputPops, kind, all);
+ return creator.addMetadata(this, setOp);
}
@Override
@@ -93,4 +74,18 @@ public class UnionAllPrel extends UnionPrel {
return SelectionVectorMode.NONE;
}
+ @Override
+ public <T, X, E extends Throwable> T accept(PrelVisitor<T, X, E> logicalVisitor, X value) throws E {
+ return logicalVisitor.visitPrel(this, value);
+ }
+
+ @Override
+ public Iterator<Prel> iterator() {
+ return PrelUtil.iter(this.getInputs());
+ }
+
+ @Override
+ public boolean needsFinalColumnReordering() {
+ return false;
+ }
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SetOpPrule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SetOpPrule.java
new file mode 100644
index 0000000000..58614bf9e2
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SetOpPrule.java
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.physical;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelOptRuleOperand;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.plan.volcano.RelSubset;
+import org.apache.calcite.rel.InvalidRelException;
+import org.apache.calcite.rel.RelCollation;
+import org.apache.calcite.rel.RelCollations;
+import org.apache.calcite.rel.RelFieldCollation;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.SetOp;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.util.BitSets;
+import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.calcite.util.trace.CalciteTrace;
+import org.apache.drill.exec.planner.logical.DrillExceptRel;
+import org.apache.drill.exec.planner.logical.DrillIntersectRel;
+import org.apache.drill.exec.planner.logical.RelOptHelper;
+import org.apache.drill.exec.planner.physical.DrillDistributionTrait.DistributionField;
+import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList;
+import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
+import org.slf4j.Logger;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Set;
+
+import static org.apache.drill.exec.ExecConstants.EXCEPT_ADD_AGG_BELOW;
+import static org.apache.drill.exec.planner.physical.AggPruleBase.remapGroupSet;
+
+public class SetOpPrule extends Prule {
+ public static final List<RelOptRule> DIST_INSTANCES = Arrays.asList(
+ new SetOpPrule(RelOptHelper.any(DrillExceptRel.class), "Prel.HashExceptDistPrule", true),
+ new SetOpPrule(RelOptHelper.any(DrillIntersectRel.class), "Prel.HashIntersectDistPrule", true));
+ public static final List<RelOptRule> BROADCAST_INSTANCES = Arrays.asList(
+ new SetOpPrule(RelOptHelper.any(DrillExceptRel.class), "Prel.HashExceptBroadcastPrule", false),
+ new SetOpPrule(RelOptHelper.any(DrillIntersectRel.class), "Prel.HashIntersectBroadcastPrule", false));
+ protected static final Logger tracer = CalciteTrace.getPlannerTracer();
+ private final boolean isDist;
+
+ private SetOpPrule(RelOptRuleOperand operand, String description, boolean isDist) {
+ super(operand, description);
+ this.isDist = isDist;
+ }
+
+ @Override
+ public void onMatch(RelOptRuleCall call) {
+ final SetOp setOp = call.rel(0);
+ Preconditions.checkArgument(setOp.getInputs().size() == 2, "A set op must have exactly two inputs.");
+
+ try {
+ if (isDist) {
+ createDistBothPlan(call);
+ } else {
+ if (checkBroadcastConditions(setOp.getCluster(), setOp.getInput(0), setOp.getInput(1))) {
+ createBroadcastPlan(call);
+ }
+ }
+ } catch (InvalidRelException e) {
+ tracer.warn(e.toString());
+ }
+ }
+
+ private void createDistBothPlan(RelOptRuleCall call)
+ throws InvalidRelException {
+ int i = 0;
+ int fieldCount = call.rel(0).getInput(0).getRowType().getFieldCount();
+ List<DistributionField> distFields = Lists.newArrayList();
+ while (i < fieldCount) {
+ distFields.add(new DistributionField(i));
+ i++;
+ }
+ DrillDistributionTrait distributionTrait = new DrillDistributionTrait(DrillDistributionTrait.DistributionType.HASH_DISTRIBUTED,
+ distFields);
+ createPlan(call, distributionTrait);
+
+ if (!PrelUtil.getPlannerSettings(call.getPlanner()).isHashSingleKey()) {
+ return;
+ }
+ if (fieldCount > 1) {
+ for (int j = 0; j < fieldCount; j++) {
+ distributionTrait = new DrillDistributionTrait(DrillDistributionTrait.DistributionType.HASH_DISTRIBUTED,
+ ImmutableList.of(new DistributionField(j)));
+ createPlan(call, distributionTrait);
+ }
+ }
+ }
+
+ private boolean checkBroadcastConditions(RelOptCluster cluster, RelNode left, RelNode right) {
+ double estimatedRightRowCount = RelMetadataQuery.instance().getRowCount(right);
+ return estimatedRightRowCount < PrelUtil.getSettings(cluster).getBroadcastThreshold()
+ && !DrillDistributionTrait.SINGLETON.equals(left.getTraitSet().getTrait(DrillDistributionTraitDef.INSTANCE));
+ }
+
+ private void createBroadcastPlan(final RelOptRuleCall call) throws InvalidRelException {
+ DrillDistributionTrait distBroadcastRight = new DrillDistributionTrait(DrillDistributionTrait.DistributionType.BROADCAST_DISTRIBUTED);
+ createPlan(call, distBroadcastRight);
+ }
+
+ private void createPlan(final RelOptRuleCall call, DrillDistributionTrait setOpTrait) throws InvalidRelException {
+ if (needAddAgg(call.rel(0))) {
+ ImmutableBitSet groupSet = ImmutableBitSet.range(0, call.rel(0).getInput(0).getRowType().getFieldList().size());
+
+ // hashAgg: hash distribute on all grouping keys
+ DrillDistributionTrait distOnAllKeys =
+ new DrillDistributionTrait(DrillDistributionTrait.DistributionType.HASH_DISTRIBUTED,
+ ImmutableList.copyOf(getDistributionField(groupSet, true /* get all grouping keys */)));
+ RelTraitSet aggTraits = call.getPlanner().emptyTraitSet().plus(Prel.DRILL_PHYSICAL).plus(distOnAllKeys);
+ createTransformRequest(call, aggTraits, setOpTrait, null);
+
+ // hashAgg: hash distribute on single grouping key
+ DrillDistributionTrait distOnOneKey =
+ new DrillDistributionTrait(DrillDistributionTrait.DistributionType.HASH_DISTRIBUTED,
+ ImmutableList.copyOf(getDistributionField(groupSet, false /* get single grouping key */)));
+ aggTraits = call.getPlanner().emptyTraitSet().plus(Prel.DRILL_PHYSICAL).plus(distOnOneKey);
+ createTransformRequest(call, aggTraits, setOpTrait, null);
+
+ // streamAgg: hash distribute on all grouping keys
+ final RelCollation collation = getCollation(groupSet);
+ aggTraits = call.getPlanner().emptyTraitSet().plus(Prel.DRILL_PHYSICAL).plus(collation).plus(distOnAllKeys);
+ createTransformRequest(call, aggTraits, setOpTrait, collation);
+ } else {
+ call.transformTo(buildSetOpPrel(call, null, setOpTrait));
+ }
+ }
+
+ private boolean needAddAgg(SetOp setOp) {
+ if (setOp.all || !(setOp instanceof DrillExceptRel)) {
+ return false;
+ }
+ Set<ImmutableBitSet> uniqueKeys = setOp.getCluster().getMetadataQuery().getUniqueKeys(((RelSubset)setOp.getInput(0)).getBestOrOriginal());
+ if (uniqueKeys == null) {
+ return true;
+ }
+ return uniqueKeys.size() < setOp.getRowType().getFieldCount();
+ }
+
+ private void createTransformRequest(RelOptRuleCall call, RelTraitSet aggTraits, DrillDistributionTrait setOpTrait, RelCollation collation) throws InvalidRelException {
+ boolean addAggBelow = PrelUtil.getPlannerSettings(call.getPlanner()).getOptions().getOption(EXCEPT_ADD_AGG_BELOW);
+ RelNode outputRel;
+ if (addAggBelow) {
+ AggPrelBase newAgg = buildAggPrel(call, call.rel(0).getInput(0), aggTraits, collation);
+ outputRel = buildSetOpPrel(call, newAgg, setOpTrait);
+ } else {
+ SetOpPrel setOpPrel = buildSetOpPrel(call, null, setOpTrait);
+ outputRel = buildAggPrel(call, setOpPrel, aggTraits, collation);
+ }
+ call.transformTo(outputRel);
+ }
+
+ private AggPrelBase buildAggPrel(RelOptRuleCall call, RelNode input, RelTraitSet aggTraits, RelCollation collation) throws InvalidRelException {
+ final DrillExceptRel drillExceptRel = call.rel(0);
+ ImmutableBitSet groupSet = ImmutableBitSet.range(0, drillExceptRel.getInput(0).getRowType().getFieldList().size());
+ if (collation != null) {
+ final RelNode convertedInput = convert(input, aggTraits);
+ return new StreamAggPrel(
+ drillExceptRel.getCluster(),
+ aggTraits,
+ convertedInput,
+ groupSet,
+ ImmutableList.of(),
+ ImmutableList.of(),
+ AggPrelBase.OperatorPhase.PHASE_1of1);
+ } else {
+ RelNode convertedInput = convert(input, PrelUtil.fixTraits(call, aggTraits));
+ return new HashAggPrel(
+ drillExceptRel.getCluster(),
+ aggTraits,
+ convertedInput,
+ groupSet,
+ ImmutableList.of(),
+ ImmutableList.of(),
+ AggPrelBase.OperatorPhase.PHASE_1of1);
+ }
+ }
+
+ private SetOpPrel buildSetOpPrel(RelOptRuleCall call, RelNode convertedLeft, DrillDistributionTrait setOpTrait) throws InvalidRelException {
+ final SetOp setOp = call.rel(0);
+ final RelNode right = setOp.getInput(1);
+ RelTraitSet traitsRight = right.getTraitSet().plus(Prel.DRILL_PHYSICAL).plus(setOpTrait);
+ final RelNode convertedRight = convert(right, traitsRight);
+
+ if (convertedLeft == null) {
+ final RelNode left = setOp.getInput(0);
+ RelTraitSet traitsLeft = left.getTraitSet().plus(Prel.DRILL_PHYSICAL);
+ if (DrillDistributionTrait.DistributionType.HASH_DISTRIBUTED.equals(setOpTrait.getType())) {
+ traitsLeft = traitsLeft.plus(setOpTrait); // RelTraitSet is immutable; plus() returns a new trait set
+ }
+ convertedLeft = convert(left, traitsLeft);
+ }
+ final RelTraitSet traitSet = PrelUtil.removeCollation(convertedLeft.getTraitSet(), call);
+ return new SetOpPrel(convertedLeft.getCluster(), traitSet, ImmutableList.of(convertedLeft, convertedRight), setOp.kind, setOp.all);
+ }
+
+ private List<DistributionField> getDistributionField(ImmutableBitSet groupSet, boolean allFields) {
+ List<DistributionField> groupByFields = Lists.newArrayList();
+
+ for (int group : remapGroupSet(groupSet)) {
+ DistributionField field = new DistributionField(group);
+ groupByFields.add(field);
+
+ if (!allFields && groupByFields.size() == 1) {
+ // TODO: if we are only interested in one grouping field, pick the first one for now,
+ // but once we have num distinct values (NDV) statistics, we should pick the one
+ // with the highest NDV.
+ break;
+ }
+ }
+
+ return groupByFields;
+ }
+
+ private RelCollation getCollation(ImmutableBitSet groupSet) {
+
+ List<RelFieldCollation> fields = Lists.newArrayList();
+ for (int group : BitSets.toIter(groupSet)) {
+ fields.add(new RelFieldCollation(group));
+ }
+ return RelCollations.of(fields);
+ }
+}
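
A note on the distribution choices above: createDistBothPlan hash-distributes both inputs on every column, so rows that are equal on all columns land on the same minor fragment and the set-op match can run locally; the broadcast variant instead ships the whole right side when its estimated row count is under the broadcast threshold. A standalone sketch of the co-location argument (the partition function is hypothetical, not Drill's hash):

    import java.util.List;
    import java.util.Objects;

    public class SetOpDistributionDemo {
      // Hypothetical stand-in for a hash partitioner keyed on all columns of a row.
      static int partition(List<Object> row, int fragments) {
        return Math.floorMod(Objects.hash(row.toArray()), fragments);
      }

      public static void main(String[] args) {
        int fragments = 4;
        List<Object> leftRow = List.of(1, "a");
        List<Object> rightRow = List.of(1, "a"); // equal on every column
        // Equal rows hash identically, so after hash distribution the
        // EXCEPT/INTERSECT probe never needs rows from another fragment.
        System.out.println(partition(leftRow, fragments)
            == partition(rightRow, fragments)); // true
      }
    }
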
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/UnionAllPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/UnionAllPrel.java
index 4bdecea4ee..77b594e968 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/UnionAllPrel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/UnionAllPrel.java
@@ -39,9 +39,9 @@ import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
public class UnionAllPrel extends UnionPrel {
- public UnionAllPrel(RelOptCluster cluster, RelTraitSet traits, List<RelNode> inputs,
- boolean checkCompatibility) throws InvalidRelException {
- super(cluster, traits, inputs, true /* all */, checkCompatibility);
+ public UnionAllPrel(RelOptCluster cluster, RelTraitSet traits, List<RelNode> inputs)
+ throws InvalidRelException {
+ super(cluster, traits, inputs, true /* all */);
}
@@ -49,8 +49,7 @@ public class UnionAllPrel extends UnionPrel {
@Override
public Union copy(RelTraitSet traitSet, List<RelNode> inputs, boolean all) {
try {
- return new UnionAllPrel(this.getCluster(), traitSet, inputs,
- false /* don't check compatibility during copy */);
+ return new UnionAllPrel(this.getCluster(), traitSet, inputs);
}catch (InvalidRelException e) {
throw new AssertionError(e);
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/UnionAllPrule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/UnionAllPrule.java
index 511ac8a434..0a3cdc6649 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/UnionAllPrule.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/UnionAllPrule.java
@@ -101,8 +101,7 @@ public class UnionAllPrule extends Prule {
Preconditions.checkArgument(convertedInputList.size() >= 2, "Union list must be at least two items.");
RelNode left = convertedInputList.get(0);
for (int i = 1; i < convertedInputList.size(); i++) {
- left = new UnionAllPrel(union.getCluster(), traits, ImmutableList.of(left, convertedInputList.get(i)),
- false /* compatibility already checked during logical phase */);
+ left = new UnionAllPrel(union.getCluster(), traits, ImmutableList.of(left, convertedInputList.get(i)));
}
call.transformTo(left);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/UnionDistinctPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/UnionDistinctPrel.java
index d58c9f9f8c..992950164b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/UnionDistinctPrel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/UnionDistinctPrel.java
@@ -38,17 +38,16 @@ import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
public class UnionDistinctPrel extends UnionPrel {
- public UnionDistinctPrel(RelOptCluster cluster, RelTraitSet traits, List<RelNode> inputs,
- boolean checkCompatibility) throws InvalidRelException {
- super(cluster, traits, inputs, false /* all = false */, checkCompatibility);
+ public UnionDistinctPrel(RelOptCluster cluster, RelTraitSet traits, List<RelNode> inputs)
+ throws InvalidRelException {
+ super(cluster, traits, inputs, false /* all = false */);
}
@Override
public Union copy(RelTraitSet traitSet, List<RelNode> inputs, boolean all) {
try {
- return new UnionDistinctPrel(this.getCluster(), traitSet, inputs,
- false /* don't check compatibility during copy */);
+ return new UnionDistinctPrel(this.getCluster(), traitSet, inputs);
}catch (InvalidRelException e) {
throw new AssertionError(e);
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/UnionDistinctPrule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/UnionDistinctPrule.java
index 5a088f5d5a..91a1021505 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/UnionDistinctPrule.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/UnionDistinctPrule.java
@@ -61,8 +61,7 @@ public class UnionDistinctPrule extends Prule {
traits = call.getPlanner().emptyTraitSet().plus(Prel.DRILL_PHYSICAL).plus(DrillDistributionTrait.SINGLETON);
UnionDistinctPrel unionDistinct =
- new UnionDistinctPrel(union.getCluster(), traits, convertedInputList,
- false /* compatibility already checked during logical phase */);
+ new UnionDistinctPrel(union.getCluster(), traits, convertedInputList);
call.transformTo(unionDistinct);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/UnionPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/UnionPrel.java
index 79d5611c80..6fbc4d37f2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/UnionPrel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/UnionPrel.java
@@ -17,21 +17,19 @@
*/
package org.apache.drill.exec.planner.physical;
-import java.util.Iterator;
-import java.util.List;
-
-import org.apache.drill.exec.planner.common.DrillUnionRelBase;
-import org.apache.drill.exec.planner.physical.visitor.PrelVisitor;
-import org.apache.calcite.rel.InvalidRelException;
-import org.apache.calcite.rel.RelNode;
import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Union;
+import org.apache.drill.exec.planner.physical.visitor.PrelVisitor;
+
+import java.util.Iterator;
+import java.util.List;
-public abstract class UnionPrel extends DrillUnionRelBase implements Prel{
+public abstract class UnionPrel extends Union implements Prel{
- public UnionPrel(RelOptCluster cluster, RelTraitSet traits, List<RelNode> inputs, boolean all,
- boolean checkCompatibility) throws InvalidRelException {
- super(cluster, traits, inputs, all, checkCompatibility);
+ public UnionPrel(RelOptCluster cluster, RelTraitSet traits, List<RelNode> inputs, boolean all) {
+ super(cluster, traits, inputs, all);
}
@Override
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/FinalColumnReorderer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/FinalColumnReorderer.java
index 144b62cf3e..b0debc8ef0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/FinalColumnReorderer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/FinalColumnReorderer.java
@@ -23,6 +23,7 @@ import java.util.List;
import org.apache.drill.exec.planner.physical.Prel;
import org.apache.drill.exec.planner.physical.ProjectPrel;
import org.apache.drill.exec.planner.physical.ScreenPrel;
+import org.apache.drill.exec.planner.physical.SetOpPrel;
import org.apache.drill.exec.planner.physical.UnionPrel;
import org.apache.drill.exec.planner.physical.WriterPrel;
import org.apache.calcite.rel.RelNode;
@@ -86,8 +87,8 @@ public class FinalColumnReorderer extends BasePrelVisitor<Prel, Void, RuntimeExc
@Override
public Prel visitPrel(Prel prel, Void value) throws RuntimeException {
- if(prel instanceof UnionPrel) {
- return addColumnOrderingBelowUnion(prel);
+ if(prel instanceof UnionPrel || prel instanceof SetOpPrel) {
+ return addColumnOrderingBelow(prel);
}
List<RelNode> children = Lists.newArrayList();
@@ -106,13 +107,13 @@ public class FinalColumnReorderer extends BasePrelVisitor<Prel, Void, RuntimeExc
}
}
- private Prel addColumnOrderingBelowUnion(Prel prel) {
+ private Prel addColumnOrderingBelow(Prel prel) {
List<RelNode> children = Lists.newArrayList();
for (Prel p : prel) {
Prel child = p.accept(this, null);
- boolean needProjectBelowUnion = !(p instanceof ProjectPrel);
- if(needProjectBelowUnion) {
+ boolean needProjectBelow = !(p instanceof ProjectPrel);
+ if(needProjectBelow) {
child = addTrivialOrderedProjectPrel(child, false);
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/FindLimit0Visitor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/FindLimit0Visitor.java
index 3671bc4fd3..362993eb0f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/FindLimit0Visitor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/FindLimit0Visitor.java
@@ -17,6 +17,7 @@
*/
package org.apache.drill.exec.planner.sql.handlers;
+import org.apache.drill.exec.planner.common.DrillSetOpRel;
import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableSet;
import com.fasterxml.jackson.annotation.JsonCreator;
@@ -65,7 +66,6 @@ import java.util.List;
import org.apache.calcite.rex.RexBuilder;
import org.apache.drill.exec.planner.common.DrillAggregateRelBase;
import org.apache.drill.exec.planner.common.DrillJoinRelBase;
-import org.apache.drill.exec.planner.common.DrillUnionRelBase;
import org.apache.drill.exec.util.Pointer;
import java.math.BigDecimal;
@@ -184,7 +184,7 @@ public class FindLimit0Visitor extends RelShuttleImpl {
final RelShuttle unsupportedOperationsVisitor = new RelShuttleImpl() {
@Override
public RelNode visit(RelNode other) {
- if (other instanceof DrillUnionRelBase) {
+ if (other instanceof DrillSetOpRel) {
isUnsupported.value = true;
return other;
} else if (other instanceof DrillProjectRelBase) {
@@ -259,7 +259,7 @@ public class FindLimit0Visitor extends RelShuttleImpl {
public RelNode visit(RelNode other) {
if (other instanceof DrillJoinRelBase ||
other instanceof DrillAggregateRelBase ||
- other instanceof DrillUnionRelBase) {
+ other instanceof DrillSetOpRel) {
return other;
}
if (other instanceof DrillLimitRel) {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/UnsupportedOperatorsVisitor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/UnsupportedOperatorsVisitor.java
index a8bdd1c7e3..8beefb7362 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/UnsupportedOperatorsVisitor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/UnsupportedOperatorsVisitor.java
@@ -236,14 +236,6 @@ public class UnsupportedOperatorsVisitor extends SqlShuttle {
}
}
- // Disable unsupported Intersect, Except
- if (sqlCall.getKind() == SqlKind.INTERSECT || sqlCall.getKind() == SqlKind.EXCEPT) {
- unsupportedOperatorCollector.setException(SqlUnsupportedException.ExceptionType.RELATIONAL,
- sqlCall.getOperator().getName() + " is not supported\n" +
- "See Apache Drill JIRA: DRILL-1921");
- throw new UnsupportedOperationException();
- }
-
// Disable unsupported JOINs
if (sqlCall.getKind() == SqlKind.JOIN) {
SqlJoin join = (SqlJoin) sqlCall;
@@ -555,4 +547,4 @@ public class UnsupportedOperatorsVisitor extends SqlShuttle {
}
}
}
-}
\ No newline at end of file
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/torel/ConversionContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/torel/ConversionContext.java
index 16b3ed8f9a..58e75df6e4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/torel/ConversionContext.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/torel/ConversionContext.java
@@ -25,8 +25,10 @@ import org.apache.calcite.rel.RelRoot;
import org.apache.calcite.rel.hint.RelHint;
import org.apache.drill.common.expression.LogicalExpression;
import org.apache.drill.common.logical.LogicalPlan;
+import org.apache.drill.common.logical.data.Except;
import org.apache.drill.common.logical.data.Filter;
import org.apache.drill.common.logical.data.GroupingAggregate;
+import org.apache.drill.common.logical.data.Intersect;
import org.apache.drill.common.logical.data.Join;
import org.apache.drill.common.logical.data.Limit;
import org.apache.drill.common.logical.data.LogicalOperator;
@@ -36,6 +38,8 @@ import org.apache.drill.common.logical.data.Scan;
import org.apache.drill.common.logical.data.Union;
import org.apache.drill.common.logical.data.visitors.AbstractLogicalVisitor;
import org.apache.drill.exec.planner.logical.DrillAggregateRel;
+import org.apache.drill.exec.planner.logical.DrillExceptRel;
+import org.apache.drill.exec.planner.logical.DrillIntersectRel;
import org.apache.drill.exec.planner.logical.DrillJoinRel;
import org.apache.drill.exec.planner.logical.DrillLimitRel;
import org.apache.drill.exec.planner.logical.DrillRel;
@@ -156,6 +160,16 @@ public class ConversionContext implements ToRelContext {
return DrillUnionRel.convert(union, context);
}
+ @Override
+ public RelNode visitExcept(Except except, ConversionContext context) throws InvalidRelException {
+ return DrillExceptRel.convert(except, context);
+ }
+
+ @Override
+ public RelNode visitIntersect(Intersect intersect, ConversionContext context) throws InvalidRelException {
+ return DrillIntersectRel.convert(intersect, context);
+ }
+
@Override
public RelNode visitGroupingAggregate(GroupingAggregate groupBy, ConversionContext context)
throws InvalidRelException {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
index 64658ae871..c409053fc5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
@@ -197,6 +197,7 @@ public class SystemOptionManager extends BaseOptionManager implements AutoClosea
new OptionDefinition(ExecConstants.JSON_READER_ESCAPE_ANY_CHAR_VALIDATOR),
new OptionDefinition(ExecConstants.STORE_TABLE_USE_SCHEMA_FILE_VALIDATOR),
new OptionDefinition(ExecConstants.ENABLE_UNION_TYPE),
+ new OptionDefinition(ExecConstants.EXCEPT_ADD_AGG_BELOW),
new OptionDefinition(ExecConstants.TEXT_ESTIMATED_ROW_SIZE),
new OptionDefinition(ExecConstants.TEXT_WRITER_ADD_HEADER_VALIDATOR),
new OptionDefinition(ExecConstants.TEXT_WRITER_FORCE_QUOTES_VALIDATOR),
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/PluginUnionRel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/PluginUnionRel.java
index 3d61ea5fba..a606d70f6b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/PluginUnionRel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/PluginUnionRel.java
@@ -24,9 +24,10 @@ import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.rel.InvalidRelException;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.core.SetOp;
+import org.apache.calcite.rel.core.Union;
import org.apache.calcite.rel.metadata.RelMetadataQuery;
import org.apache.drill.common.exceptions.DrillRuntimeException;
-import org.apache.drill.exec.planner.common.DrillUnionRelBase;
+import org.apache.drill.exec.planner.common.DrillSetOpRel;
import org.apache.drill.exec.store.plan.PluginImplementor;
import java.io.IOException;
@@ -35,11 +36,14 @@ import java.util.List;
/**
* Union implementation for Drill plugins.
*/
-public class PluginUnionRel extends DrillUnionRelBase implements PluginRel {
+public class PluginUnionRel extends Union implements PluginRel, DrillSetOpRel {
public PluginUnionRel(RelOptCluster cluster, RelTraitSet traits, List<RelNode> inputs,
boolean all, boolean checkCompatibility) throws InvalidRelException {
- super(cluster, traits, inputs, all, checkCompatibility);
+ super(cluster, traits, inputs, all);
+ if (checkCompatibility && !this.isCompatible(getRowType(), getInputs())) {
+ throw new InvalidRelException("Input row types of the Union are not compatible.");
+ }
}
@Override
diff --git a/exec/java-exec/src/main/resources/drill-module.conf b/exec/java-exec/src/main/resources/drill-module.conf
index 32203ca7a7..981e51951b 100644
--- a/exec/java-exec/src/main/resources/drill-module.conf
+++ b/exec/java-exec/src/main/resources/drill-module.conf
@@ -558,6 +558,7 @@ drill.exec.options: {
exec.bulk_load_table_list.bulk_size: 1000,
exec.enable_bulk_load_table_list: false,
exec.enable_union_type: false,
+ exec.except_add_agg_below: false,
exec.errors.verbose: false,
drill.exec.http.rest.errors.verbose: false,
exec.hashjoin.mem_limit: 0,
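
The new exec.except_add_agg_below option is session-settable like its neighbors. A fragment in the style of the TestSetOp test added below (the client handle and ExecConstants.EXCEPT_ADD_AGG_BELOW_KEY are used the same way there), equivalent to ALTER SESSION SET `exec.except_add_agg_below` = true:

    client.alterSession(ExecConstants.EXCEPT_ADD_AGG_BELOW_KEY, true);
    try {
      // EXCEPT queries now plan the de-duplicating aggregate below the set op.
    } finally {
      client.resetSession(ExecConstants.EXCEPT_ADD_AGG_BELOW_KEY);
    }
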
diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestDisabledFunctionality.java b/exec/java-exec/src/test/java/org/apache/drill/TestDisabledFunctionality.java
index be2f1f4f7a..b1649fc535 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/TestDisabledFunctionality.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/TestDisabledFunctionality.java
@@ -47,42 +47,6 @@ public class TestDisabledFunctionality extends BaseTestQuery {
throw ex;
}
- @Test(expected = UnsupportedRelOperatorException.class) // see DRILL-1921
- public void testDisabledIntersect() throws Exception {
- try {
- test("(select n_name as name from cp.`tpch/nation.parquet`) INTERSECT (select r_name as name from cp.`tpch/region.parquet`)");
- } catch(UserException ex) {
- throwAsUnsupportedException(ex);
- }
- }
-
- @Test(expected = UnsupportedRelOperatorException.class) // see DRILL-1921
- public void testDisabledIntersectALL() throws Exception {
- try {
- test("(select n_name as name from cp.`tpch/nation.parquet`) INTERSECT ALL (select r_name as name from cp.`tpch/region.parquet`)");
- } catch(UserException ex) {
- throwAsUnsupportedException(ex);
- }
- }
-
- @Test(expected = UnsupportedRelOperatorException.class) // see DRILL-1921
- public void testDisabledExceptALL() throws Exception {
- try {
- test("(select n_name as name from cp.`tpch/nation.parquet`) EXCEPT ALL (select r_name as name from cp.`tpch/region.parquet`)");
- } catch(UserException ex) {
- throwAsUnsupportedException(ex);
- }
- }
-
- @Test(expected = UnsupportedRelOperatorException.class) // see DRILL-1921
- public void testDisabledExcept() throws Exception {
- try {
- test("(select n_name as name from cp.`tpch/nation.parquet`) EXCEPT (select r_name as name from cp.`tpch/region.parquet`)");
- } catch(UserException ex) {
- throwAsUnsupportedException(ex);
- }
- }
-
@Test(expected = UnsupportedRelOperatorException.class) // see DRILL-1921
public void testDisabledNaturalJoin() throws Exception {
try {
diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestSetOp.java b/exec/java-exec/src/test/java/org/apache/drill/TestSetOp.java
new file mode 100644
index 0000000000..98d32605fd
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/TestSetOp.java
@@ -0,0 +1,1182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill;
+
+import org.apache.drill.exec.planner.physical.PlannerSettings;
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.BatchSchemaBuilder;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.apache.drill.exec.util.StoragePluginTestUtils;
+import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.drill.categories.OperatorTest;
+import org.apache.drill.categories.SqlTest;
+import org.apache.drill.categories.UnlikelyTest;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.test.ClusterFixture;
+import org.apache.drill.test.ClusterTest;
+import org.hamcrest.CoreMatchers;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileWriter;
+import java.nio.file.Paths;
+import java.util.List;
+
+@Category({SqlTest.class, OperatorTest.class})
+public class TestSetOp extends ClusterTest {
+ private static final String EMPTY_DIR_NAME = "empty_directory";
+ private static final String SLICE_TARGET_DEFAULT = "alter session reset `planner.slice_target`";
+
+ @BeforeClass
+ public static void setupTestFiles() throws Exception {
+ startCluster(ClusterFixture.builder(dirTestWatcher));
+ dirTestWatcher.copyResourceToRoot(Paths.get("multilevel", "parquet"));
+ dirTestWatcher.makeTestTmpSubDir(Paths.get(EMPTY_DIR_NAME));
+
+ // A tmp workspace with a default format defined for tests that need to
+ // query empty directories without encountering an error.
+ cluster.defineWorkspace(
+ StoragePluginTestUtils.DFS_PLUGIN_NAME,
+ "tmp_default_format",
+ dirTestWatcher.getDfsTestTmpDir().getAbsolutePath(),
+ "csvh"
+ );
+ }
+
+ @Test
+ public void testExceptionWithSchemaLessDataSource() {
+ String root = "/multilevel/csv/1994/Q1/orders_94_q1.csv";
+ try {
+ testBuilder()
+ .sqlQuery("select * from cp.`%s` intersect select * from cp.`%s`", root, root)
+ .unOrdered()
+ .baselineColumns("a", "b")
+ .baselineValues(1, 1)
+ .go();
+ Assert.fail("Missing expected exception on schema-less data source");
+ } catch (Exception ex) {
+ Assert.assertThat(ex.getMessage(), ex.getMessage(),
+ CoreMatchers.containsString("schema-less tables must specify the columns explicitly"));
+ }
+
+ try {
+ testBuilder()
+ .sqlQuery("select * from cp.`%s` except select * from cp.`%s`", root, root)
+ .unOrdered()
+ .baselineColumns("a", "b")
+ .baselineValues(1, 1)
+ .go();
+ Assert.fail("Missing expected exception on schema-less data source");
+ } catch (Exception ex) {
+ Assert.assertThat(ex.getMessage(), ex.getMessage(),
+ CoreMatchers.containsString("schema-less tables must specify the columns explicitly"));
+ }
+ }
+
+ @Test
+ public void testIntersect() throws Exception {
+ String query = "select * from (values(4,4), (2,2), (4,4), (1,1), (3,4), (2,2), (1,1)) t(a,b) intersect select * from (values(1,1), (1,1), (2,2), (3,3)) t(a,b)";
+ testBuilder()
+ .sqlQuery(query)
+ .unOrdered()
+ .baselineColumns("a", "b")
+ .baselineValues(2, 2)
+ .baselineValues(1, 1)
+ .build().run();
+
+ query = "select * from (values(4,4), (2,2), (4,4), (1,1), (3,4), (2,2), (1,1)) t(a,b) intersect all select * from (values(1,1), (1,1), (2,2), (3,3)) t(a,b)";
+ testBuilder()
+ .sqlQuery(query)
+ .unOrdered()
+ .baselineColumns("a", "b")
+ .baselineValues(2, 2)
+ .baselineValues(1, 1)
+ .baselineValues(1, 1)
+ .build().run();
+ }
+
+ @Test
+ public void testExcept() throws Exception {
+ String query = "select * from (values(4,4), (2,2), (4,4), (1,1), (3,4), (2,2), (1,1)) t(a,b) except select * from (values(1,1), (1,1), (2,2), (3,3)) t(a,b)";
+ String aggAbovePattern = ".*Screen.*Agg.*SetOp.*";
+ String aggBelowPattern = ".*SetOp.*Agg.*Values.*";
+
+ queryBuilder()
+ .sql(query)
+ .planMatcher()
+ .include(aggAbovePattern)
+ .exclude(aggBelowPattern)
+ .match(true);
+
+ testBuilder()
+ .sqlQuery(query)
+ .unOrdered()
+ .baselineColumns("a", "b")
+ .baselineValues(4, 4)
+ .baselineValues(3, 4)
+ .build().run();
+
+ try {
+ client.alterSession(ExecConstants.EXCEPT_ADD_AGG_BELOW_KEY, true);
+ query = "select * from (values(4,4), (2,2), (4,4), (1,1), (3,4), (2,2), (1,1)) t(a,b) except select a, b from (values(1,1), (1,1), (2,2), (3,3)) t(a,b)";
+ queryBuilder()
+ .sql(query)
+ .planMatcher()
+ .include(aggBelowPattern)
+ .exclude(aggAbovePattern)
+ .match(true);
+
+ testBuilder()
+ .sqlQuery(query)
+ .unOrdered()
+ .baselineColumns("a", "b")
+ .baselineValues(4, 4)
+ .baselineValues(3, 4)
+ .build().run();
+ } finally {
+ client.resetSession(ExecConstants.EXCEPT_ADD_AGG_BELOW_KEY);
+ }
+
+ query = "select * from (values(4,4), (2,2), (4,4), (1,1), (3,4), (2,2), (1,1)) t(a,b) except all select * from (values(1,1), (1,1), (2,2), (3,3)) t(a,b)";
+ testBuilder()
+ .sqlQuery(query)
+ .unOrdered()
+ .baselineColumns("a", "b")
+ .baselineValues(4, 4)
+ .baselineValues(4, 4)
+ .baselineValues(3, 4)
+ .baselineValues(2, 2)
+ .build().run();
+ }
+
+
+ @Test
+ public void testOverJoin() throws Exception {
+ String query =
+ "select n1.n_nationkey from cp.`tpch/nation.parquet` n1 inner join cp.`tpch/region.parquet` r1 on n1.n_regionkey = r1.r_regionkey where n1.n_nationkey in (1, 2, 3, 4) " +
+ "except " +
+ "select n2.n_nationkey from cp.`tpch/nation.parquet` n2 inner join cp.`tpch/region.parquet` r2 on n2.n_regionkey = r2.r_regionkey where n2.n_nationkey in (3, 4)";
+
+ testBuilder()
+ .sqlQuery(query)
+ .unOrdered()
+ .baselineColumns("n_nationkey")
+ .baselineValues(1)
+ .baselineValues(2)
+ .build().run();
+ }
+
+ @Test
+ public void testExceptOverAgg() throws Exception {
+ String query = "select n1.n_regionkey from cp.`tpch/nation.parquet` n1 group by n1.n_regionkey except " +
+ "select r1.r_regionkey from cp.`tpch/region.parquet` r1 where r1.r_regionkey in (0, 1) group by r1.r_regionkey";
+
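+ // Both inputs are already distinct thanks to their GROUP BY, so the planner
+ // should not add another Agg above the SetOp.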
+ String excludePattern = "Screen.*Agg.*SetOp";
+ queryBuilder()
+ .sql(query)
+ .planMatcher()
+ .exclude(excludePattern)
+ .match(true);
+
+ testBuilder()
+ .sqlQuery(query)
+ .unOrdered()
+ .baselineColumns("n_regionkey")
+ .baselineValues(2)
+ .baselineValues(3)
+ .baselineValues(4)
+ .build().run();
+ }
+
+ @Test
+ public void testChain() throws Exception {
+ String query = "select n_regionkey from cp.`tpch/nation.parquet` intersect " +
+ "select r_regionkey from cp.`tpch/region.parquet` intersect " +
+ "select n_nationkey from cp.`tpch/nation.parquet` where n_nationkey in (1,2) intersect " +
+ "select c_custkey from cp.`tpch/customer.parquet` where c_custkey < 5";
+
+ testBuilder()
+ .sqlQuery(query)
+ .unOrdered()
+ .baselineColumns("n_regionkey")
+ .baselineValues(1)
+ .baselineValues(2)
+ .build().run();
+ }
+
+ @Test
+ public void testSameColumn() throws Exception {
+ String query = "select n_nationkey, n_regionkey from cp.`tpch/nation.parquet` where n_regionkey = 1 intersect all select r_regionkey, r_regionkey from cp.`tpch/region.parquet` where r_regionkey = 1";
+ testBuilder()
+ .sqlQuery(query)
+ .unOrdered()
+ .baselineColumns("n_nationkey", "n_regionkey")
+ .baselineValues(1, 1)
+ .build().run();
+
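+ // nation holds five rows with n_regionkey = 1 and region supplies a single
+ // (1, 1) row, so EXCEPT ALL leaves four copies; the duplicate output column
+ // name is disambiguated with a trailing 0.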
+ query = "select n_regionkey, n_regionkey from cp.`tpch/nation.parquet` where n_regionkey = 1 except all select r_regionkey, r_regionkey from cp.`tpch/region.parquet` where r_regionkey = 1";
+ testBuilder()
+ .sqlQuery(query)
+ .unOrdered()
+ .baselineColumns("n_regionkey", "n_regionkey0")
+ .baselineValues(1, 1)
+ .baselineValues(1, 1)
+ .baselineValues(1, 1)
+ .baselineValues(1, 1)
+ .build().run();
+ }
+
+ @Test
+ public void testTwoStringColumns() throws Exception {
+ String query = "select r_comment, r_regionkey from cp.`tpch/region.parquet` except select n_name, n_nationkey from cp.`tpch/nation.parquet`";
+
+ testBuilder()
+ .sqlQuery(query)
+ .unOrdered()
+ .baselineColumns("r_comment", "r_regionkey")
+ .baselineValues("lar deposits. blithely final packages cajole. regular waters are final requests. regular accounts are according to ", 0)
+ .baselineValues("hs use ironic, even requests. s", 1)
+ .baselineValues("ges. thinly even pinto beans ca", 2)
+ .baselineValues("ly final courts cajole furiously final excuse", 3)
+ .baselineValues("uickly special accounts cajole carefully blithely close requests. carefully final asymptotes haggle furiousl", 4)
+ .build().run();
+ }
+
+ @Test
+ public void testConstantLiterals() throws Exception {
+ String query = "(select 'CONST' as LiteralConstant, 1 as NumberConstant, n_nationkey from cp.`tpch/nation.parquet`) " +
+ "intersect " +
+ "(select 'CONST', 1, r_regionkey from cp.`tpch/region.parquet`)";
+
+ testBuilder()
+ .sqlQuery(query)
+ .unOrdered()
+ .baselineColumns("LiteralConstant", "NumberConstant", "n_nationkey")
+ .baselineValues("CONST", 1, 0)
+ .baselineValues("CONST", 1, 1)
+ .baselineValues("CONST", 1, 2)
+ .baselineValues("CONST", 1, 3)
+ .baselineValues("CONST", 1, 4)
+ .build().run();
+ }
+
+ @Test
+ @Category(UnlikelyTest.class)
+ public void testViewExpandableStar() throws Exception {
+ try {
+ run("use dfs.tmp");
+ run("create view nation_view as select n_nationkey, n_name from (values(4,'4'), (2,'2'), (4,'4'), (1,'1'), (3,'4'), (2,'2'), (1,'1')) t(n_nationkey, n_name)");
+ run("create view region_view as select r_regionkey, r_name from (values(1,'1'), (1,'1'), (2,'2'), (3,'3')) t(r_regionkey, r_name)");
+
+ String query1 = "(select * from dfs.tmp.`nation_view`) " +
+ "except " +
+ "(select * from dfs.tmp.`region_view`) ";
+
+ String query2 = "(select r_regionkey, r_name from (values(1,'1'), (1,'1'), (2,'2'), (3,'3')) t(r_regionkey, r_name)) " +
+ "intersect " +
+ "(select * from dfs.tmp.`nation_view`)";
+
+ testBuilder()
+ .sqlQuery(query1)
+ .unOrdered()
+ .baselineColumns("n_nationkey", "n_name")
+ .baselineValues(4, "4")
+ .baselineValues(3, "4")
+ .build().run();
+
+ testBuilder()
+ .sqlQuery(query2)
+ .unOrdered()
+ .baselineColumns("r_regionkey", "r_name")
+ .baselineValues(1, "1")
+ .baselineValues(2, "2")
+ .build().run();
+ } finally {
+ run("drop view if exists nation_view");
+ run("drop view if exists region_view");
+ }
+ }
+
+ @Test
+ public void testDiffDataTypesAndModes() throws Exception {
+ try {
+ run("use dfs.tmp");
+ run("create view nation_view as select n_nationkey, n_name from (values(4,'4'), (2,'2'), (4,'4'), (1,'1'), (3,'4'), (2,'2'), (1,'1')) t(n_nationkey, n_name)");
+ run("create view region_view as select r_regionkey, r_name from (values(1,'1'), (1,'1'), (2,'2'), (3,'3')) t(r_regionkey, r_name)");
+
+ String t1 = "(select r_regionkey, r_name from (values(1,'1'), (1,'1'), (2,'2'), (3,'3')) t(r_regionkey, r_name))";
+ String t2 = "(select * from nation_view)";
+ String t3 = "(select * from region_view)";
+ String t4 = "(select store_id, full_name from cp.`employee.json` limit 5)";
+
+ String query1 = t1 + " intersect all " + t2 + " intersect all " + t3 + " except all " + t4;
+
+ testBuilder()
+ .sqlQuery(query1)
+ .unOrdered()
+ .baselineColumns("r_regionkey", "r_name")
+ .baselineValues(1, "1")
+ .baselineValues(1, "1")
+ .baselineValues(2, "2")
+ .build().run();
+ } finally {
+ run("drop view if exists nation_view");
+ run("drop view if exists region_view");
+ }
+ }
+
+ @Test
+ @Category(UnlikelyTest.class)
+ public void testDistinctOverIntersectAllWithFullyQualifiedColumnNames() throws Exception {
+ String query = "select distinct sq.x1 " +
+ "from " +
+ "((select n_regionkey as a1 from cp.`tpch/nation.parquet`) " +
+ "intersect all " +
+ "(select r_regionkey as a2 from cp.`tpch/region.parquet`)) as sq(x1)";
+
+ testBuilder()
+ .sqlQuery(query)
+ .unOrdered()
+ .baselineColumns("x1")
+ .baselineValues(0)
+ .baselineValues(1)
+ .baselineValues(2)
+ .baselineValues(3)
+ .baselineValues(4)
+ .build().run();
+ }
+
+ @Test
+ @Category(UnlikelyTest.class)
+ public void testContainsColumnAndNumericConstant() throws Exception {
+ String query = "(select n_nationkey, n_regionkey, n_name from cp.`tpch/nation.parquet`) " +
+ "intersect " +
+ "(select 1, n_regionkey, 'ARGENTINA' from cp.`tpch/nation.parquet`)";
+
+ testBuilder()
+ .sqlQuery(query)
+ .unOrdered()
+ .baselineColumns("n_nationkey", "n_regionkey", "n_name")
+ .baselineValues(1, 1, "ARGENTINA")
+ .build().run();
+ }
+
+ @Test
+ @Category(UnlikelyTest.class)
+ public void testEmptySides() throws Exception {
+ String query1 = "(select n_nationkey, n_regionkey, n_name from cp.`tpch/nation.parquet` limit 0) " +
+ "intersect " +
+ "(select 1, n_regionkey, 'ARGENTINA' from cp.`tpch/nation.parquet`)";
+
+ String query2 = "(select n_nationkey, n_regionkey, n_name from cp.`tpch/nation.parquet` where n_nationkey = 1) " +
+ "except " +
+ "(select 1, n_regionkey, 'ARGENTINA' from cp.`tpch/nation.parquet` limit 0)";
+
+ testBuilder()
+ .sqlQuery(query1)
+ .unOrdered()
+ .baselineColumns("n_nationkey", "n_regionkey", "n_name")
+ .expectsEmptyResultSet()
+ .build().run();
+
+ testBuilder()
+ .sqlQuery(query2)
+ .unOrdered()
+ .baselineColumns("n_nationkey", "n_regionkey", "n_name")
+ .baselineValues(1, 1, "ARGENTINA")
+ .build().run();
+ }
+
+ @Test
+ @Category(UnlikelyTest.class)
+ public void testAggregationOnIntersectOperator() throws Exception {
+ String root = "/store/text/data/t.json";
+
+ testBuilder()
+ .sqlQuery("(select calc1, max(b1) as `max`, min(b1) as `min`, count(c1) as `count` " +
+ "from (select a1 + 10 as calc1, b1, c1 from cp.`%s` " +
+ "intersect all select a1 + 10 as diff1, b1 as diff2, c1 as diff3 from cp.`%s`) " +
+ "group by calc1 order by calc1)", root, root)
+ .ordered()
+ .baselineColumns("calc1", "max", "min", "count")
+ .baselineValues(10L, 2L, 1L, 5L)
+ .baselineValues(20L, 5L, 3L, 5L)
+ .build().run();
+
+ testBuilder()
+ .sqlQuery("(select calc1, min(b1) as `min`, max(b1) as `max`, count(c1) as `count` " +
+ "from (select a1 + 10 as calc1, b1, c1 from cp.`%s` " +
+ "intersect all select a1 + 10 as diff1, b1 as diff2, c1 as diff3 from cp.`%s`) " +
+ "group by calc1 order by calc1)", root, root)
+ .ordered()
+ .baselineColumns("calc1", "min", "max", "count")
+ .baselineValues(10L, 1L, 2L, 5L)
+ .baselineValues(20L, 3L, 5L, 5L)
+ .build().run();
+ }
+
+ @Test(expected = UserException.class)
+ public void testImplicitCastingFailure() throws Exception {
+ String rootInt = "/store/json/intData.json";
+ String rootBoolean = "/store/json/booleanData.json";
+
+ run("(select key from cp.`%s` " +
+ "intersect all " +
+ "select key from cp.`%s` )", rootInt, rootBoolean);
+ }
+
+ @Test
+ @Category(UnlikelyTest.class)
+ public void testDateAndTimestampJson() throws Exception {
+ String rootDate = "/store/json/dateData.json";
+ String rootTimestamp = "/store/json/timeStmpData.json";
+
+ testBuilder()
+ .sqlQuery("(select max(key) as key from cp.`%s` " +
+ "except all select key from cp.`%s`)", rootDate, rootTimpStmp)
+ .unOrdered()
+ .baselineColumns("key")
+ .baselineValues("2011-07-26")
+ .build().run();
+
+ testBuilder()
+ .sqlQuery("select key from cp.`%s` " +
+ "except select max(key) as key from cp.`%s`", rootTimpStmp, rootDate)
+ .unOrdered()
+ .baselineColumns("key")
+ .baselineValues("2015-03-26 19:04:55.542")
+ .baselineValues("2015-03-26 19:04:55.543")
+ .baselineValues("2015-03-26 19:04:55.544")
+ .build().run();
+ }
+
+ @Test
+ @Category(UnlikelyTest.class)
+ public void testOneInputContainsAggFunction() throws Exception {
+ String root = "/multilevel/csv/1994/Q1/orders_94_q1.csv";
+
+ testBuilder()
+ .sqlQuery("select * from ((select max(c1) as ct from (select columns[0] c1 from cp.`%s`)) \n" +
+ "intersect all (select columns[0] c2 from cp.`%s`)) order by ct limit 3", root, root)
+ .ordered()
+ .baselineColumns("ct")
+ .baselineValues("99")
+ .build().run();
+
+ testBuilder()
+ .sqlQuery("select * from ((select columns[0] ct from cp.`%s`)\n" +
+ "intersect all (select max(c1) as c2 from (select columns[0] c1 from cp.`%s`))) order by ct limit 3", root, root)
+ .ordered()
+ .baselineColumns("ct")
+ .baselineValues("99")
+ .build().run();
+
+ testBuilder()
+ .sqlQuery("select * from ((select max(c1) as ct from (select columns[0] c1 from cp.`%s`))\n" +
+ "intersect all (select max(c1) as c2 from (select columns[0] c1 from cp.`%s`))) order by ct", root, root)
+ .ordered()
+ .baselineColumns("ct")
+ .baselineValues("99")
+ .build().run();
+ }
+
+ @Test
+ @Category(UnlikelyTest.class)
+ public void testInputsGroupByOnCSV() throws Exception {
+ String root = "/multilevel/csv/1994/Q1/orders_94_q1.csv";
+
+ testBuilder()
+ .sqlQuery("select * from \n" +
+ "((select columns[0] as col0 from cp.`%s` t1 \n" +
+ "where t1.columns[0] = 66) \n" +
+ "intersect all \n" +
+ "(select columns[0] c2 from cp.`%s` t2 \n" +
+ "where t2.columns[0] is not null \n" +
+ "group by columns[0])) \n" +
+ "group by col0",
+ root, root)
+ .unOrdered()
+ .baselineColumns("col0")
+ .baselineValues("66")
+ .build().run();
+ }
+
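+ // INTERSECT applies to the entire left SELECT, so the left input is the
+ // single aggregate row count(c1) = 5, which also appears in the right
+ // input {1..5} once the planner reconciles the BIGINT and INT types.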
+ @Test
+ @Category(UnlikelyTest.class)
+ public void testDiffTypesAtPlanning() throws Exception {
+ testBuilder()
+ .sqlQuery("select count(c1) as ct from (select cast(r_regionkey as int) c1 from cp.`tpch/region.parquet`) " +
+ "intersect (select cast(r_regionkey as int) + 1 c2 from cp.`tpch/region.parquet`)")
+ .ordered()
+ .baselineColumns("ct")
+ .baselineValues((long) 5)
+ .build().run();
+ }
+
+ @Test
+ @Category(UnlikelyTest.class)
+ public void testRightEmptyJson() throws Exception {
+ String rootEmpty = "/project/pushdown/empty.json";
+ String rootSimple = "/store/json/booleanData.json";
+
+ testBuilder()
+ .sqlQuery("select key from cp.`%s` " +
+ "intersect all " +
+ "select key from cp.`%s`",
+ rootSimple,
+ rootEmpty)
+ .unOrdered()
+ .baselineColumns("key")
+ .expectsEmptyResultSet()
+ .build().run();
+
+ testBuilder()
+ .sqlQuery("select key from cp.`%s` " +
+ "except all " +
+ "select key from cp.`%s`",
+ rootSimple,
+ rootEmpty)
+ .unOrdered()
+ .baselineColumns("key")
+ .baselineValues(true)
+ .baselineValues(false)
+ .build().run();
+ }
+
+ @Test
+ public void testLeftEmptyJson() throws Exception {
+ final String rootEmpty = "/project/pushdown/empty.json";
+ final String rootSimple = "/store/json/booleanData.json";
+
+ testBuilder()
+ .sqlQuery("select key from cp.`%s` " +
+ "intersect all " +
+ "select key from cp.`%s`",
+ rootEmpty,
+ rootSimple)
+ .unOrdered()
+ .baselineColumns("key")
+ .expectsEmptyResultSet()
+ .build().run();
+
+ testBuilder()
+ .sqlQuery("select key from cp.`%s` " +
+ "except all " +
+ "select key from cp.`%s`",
+ rootEmpty,
+ rootSimple)
+ .unOrdered()
+ .baselineColumns("key")
+ .expectsEmptyResultSet()
+ .build().run();
+ }
+
+ @Test
+ public void testBothEmptyJson() throws Exception {
+ final String rootEmpty = "/project/pushdown/empty.json";
+
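+ // With no rows to sample on either side, the missing column falls back to
+ // Drill's default of nullable INT, which is all this test can assert.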
+ final List<Pair<SchemaPath, TypeProtos.MajorType>> expectedSchema = Lists.newArrayList();
+ final TypeProtos.MajorType majorType = TypeProtos.MajorType.newBuilder()
+ .setMinorType(TypeProtos.MinorType.INT)
+ .setMode(TypeProtos.DataMode.OPTIONAL)
+ .build();
+ expectedSchema.add(Pair.of(SchemaPath.getSimplePath("key"), majorType));
+
+ testBuilder()
+ .sqlQuery("select key from cp.`%s` " +
+ "intersect all " +
+ "select key from cp.`%s`",
+ rootEmpty,
+ rootEmpty)
+ .schemaBaseLine(expectedSchema)
+ .build()
+ .run();
+ }
+
+ @Test
+ public void testRightEmptyDataBatch() throws Exception {
+ String rootSimple = "/store/json/booleanData.json";
+
+ testBuilder()
+ .sqlQuery("select key from cp.`%s` " +
+ "except all " +
+ "select key from cp.`%s` where 1 = 0",
+ rootSimple,
+ rootSimple)
+ .unOrdered()
+ .baselineColumns("key")
+ .baselineValues(true)
+ .baselineValues(false)
+ .build().run();
+ }
+
+ @Test
+ public void testLeftEmptyDataBatch() throws Exception {
+ String rootSimple = "/store/json/booleanData.json";
+
+ testBuilder()
+ .sqlQuery("select key from cp.`%s` where 1 = 0 " +
+ "except all " +
+ "select key from cp.`%s`",
+ rootSimple,
+ rootSimple)
+ .unOrdered()
+ .baselineColumns("key")
+ .expectsEmptyResultSet()
+ .build()
+ .run();
+ }
+
+ @Test
+ public void testBothEmptyDataBatch() throws Exception {
+ String rootSimple = "/store/json/booleanData.json";
+
+ final List<Pair<SchemaPath, TypeProtos.MajorType>> expectedSchema = Lists.newArrayList();
+ final TypeProtos.MajorType majorType = TypeProtos.MajorType.newBuilder()
+ .setMinorType(TypeProtos.MinorType.BIT) // field "key" is boolean type
+ .setMode(TypeProtos.DataMode.OPTIONAL)
+ .build();
+ expectedSchema.add(Pair.of(SchemaPath.getSimplePath("key"), majorType));
+
+ testBuilder()
+ .sqlQuery("select key from cp.`%s` where 1 = 0 " +
+ "intersect all " +
+ "select key from cp.`%s` where 1 = 0",
+ rootSimple,
+ rootSimple)
+ .schemaBaseLine(expectedSchema)
+ .build()
+ .run();
+ }
+
+ @Test
+ @Category(UnlikelyTest.class)
+ public void testInListOnIntersect() throws Exception {
+ String query = "select n_nationkey \n" +
+ "from (select n1.n_nationkey from cp.`tpch/nation.parquet` n1 inner join cp.`tpch/region.parquet` r1 on n1.n_regionkey = r1.r_regionkey \n" +
+ "intersect \n" +
+ "select n2.n_nationkey from cp.`tpch/nation.parquet` n2 inner join cp.`tpch/region.parquet` r2 on n2.n_regionkey = r2.r_regionkey) \n" +
+ "where n_nationkey in (1, 2)";
+
+ // Validate the plan
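+ // The IN (1, 2) predicate is expected to be pushed below the SetOp, which
+ // is why a Filter appears in both join branches of the pattern.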
+ final String[] expectedPlan = {"Project.*\n" +
+ ".*SetOp\\(all=\\[false\\], kind=\\[INTERSECT\\]\\).*\n" +
+ ".*Project.*\n" +
+ ".*HashJoin.*\n" +
+ ".*SelectionVectorRemover.*\n" +
+ ".*Filter.*\n" +
+ ".*Scan.*columns=\\[`n_regionkey`, `n_nationkey`\\].*\n" +
+ ".*Scan.*columns=\\[`r_regionkey`\\].*\n" +
+ ".*Project.*\n" +
+ ".*HashJoin.*\n" +
+ ".*SelectionVectorRemover.*\n" +
+ ".*Filter.*\n" +
+ ".*Scan.*columns=\\[`n_regionkey`, `n_nationkey`\\].*\n" +
+ ".*Scan.*columns=\\[`r_regionkey`\\].*"};
+ queryBuilder()
+ .sql(query)
+ .planMatcher()
+ .include(expectedPlan)
+ .match();
+
+ // Validate the result
+ testBuilder()
+ .sqlQuery(query)
+ .ordered()
+ .baselineColumns("n_nationkey")
+ .baselineValues(1)
+ .baselineValues(2)
+ .build()
+ .run();
+ }
+
+ @Test
+ @Category(UnlikelyTest.class)
+ public void testIntersectWith() throws Exception {
+ final String query1 = "WITH year_total \n" +
+ " AS (SELECT c.r_regionkey customer_id,\n" +
+ " 1 year_total\n" +
+ " FROM cp.`tpch/region.parquet` c\n" +
+ " Intersect ALL \n" +
+ " SELECT c.r_regionkey customer_id, \n" +
+ " 1 year_total\n" +
+ " FROM cp.`tpch/region.parquet` c) \n" +
+ "SELECT count(t_s_secyear.customer_id) as ct \n" +
+ "FROM year_total t_s_firstyear, \n" +
+ " year_total t_s_secyear, \n" +
+ " year_total t_w_firstyear, \n" +
+ " year_total t_w_secyear \n" +
+ "WHERE t_s_secyear.customer_id = t_s_firstyear.customer_id \n" +
+ " AND t_s_firstyear.customer_id = t_w_secyear.customer_id \n" +
+ " AND t_s_firstyear.customer_id = t_w_firstyear.customer_id \n" +
+ " AND CASE \n" +
+ " WHEN t_w_firstyear.year_total > 0 THEN t_w_secyear.year_total \n" +
+ " ELSE NULL \n" +
+ " END > -1";
+
+ final String query2 = "WITH year_total \n" +
+ " AS (SELECT c.r_regionkey customer_id,\n" +
+ " 1 year_total\n" +
+ " FROM cp.`tpch/region.parquet` c\n" +
+ " Intersect ALL \n" +
+ " SELECT c.r_regionkey customer_id, \n" +
+ " 1 year_total\n" +
+ " FROM cp.`tpch/region.parquet` c) \n" +
+ "SELECT count(t_w_firstyear.customer_id) as ct \n" +
+ "FROM year_total t_w_firstyear, \n" +
+ " year_total t_w_secyear \n" +
+ "WHERE t_w_firstyear.year_total = t_w_secyear.year_total \n" +
+ " AND t_w_firstyear.year_total > 0 and t_w_secyear.year_total > 0";
+
+ final String query3 = "WITH year_total_1\n" +
+ " AS (SELECT c.r_regionkey customer_id,\n" +
+ " 1 year_total\n" +
+ " FROM cp.`tpch/region.parquet` c\n" +
+ " Intersect ALL \n" +
+ " SELECT c.r_regionkey customer_id, \n" +
+ " 1 year_total\n" +
+ " FROM cp.`tpch/region.parquet` c) \n" +
+ " , year_total_2\n" +
+ " AS (SELECT c.n_nationkey customer_id,\n" +
+ " 1 year_total\n" +
+ " FROM cp.`tpch/nation.parquet` c\n" +
+ " Intersect ALL \n" +
+ " SELECT c.n_nationkey customer_id, \n" +
+ " 1 year_total\n" +
+ " FROM cp.`tpch/nation.parquet` c) \n" +
+ " SELECT count(t_w_firstyear.customer_id) as ct\n" +
+ " FROM year_total_1 t_w_firstyear,\n" +
+ " year_total_2 t_w_secyear\n" +
+ " WHERE t_w_firstyear.year_total = t_w_secyear.year_total\n" +
+ " AND t_w_firstyear.year_total > 0 and t_w_secyear.year_total > 0";
+
+ final String query4 = "WITH year_total_1\n" +
+ " AS (SELECT c.n_regionkey customer_id,\n" +
+ " 1 year_total\n" +
+ " FROM cp.`tpch/nation.parquet` c\n" +
+ " Intersect ALL \n" +
+ " SELECT c.r_regionkey customer_id, \n" +
+ " 1 year_total\n" +
+ " FROM cp.`tpch/region.parquet` c), \n" +
+ " year_total_2\n" +
+ " AS (SELECT c.n_regionkey customer_id,\n" +
+ " 1 year_total\n" +
+ " FROM cp.`tpch/nation.parquet` c\n" +
+ " Intersect ALL \n" +
+ " SELECT c.r_regionkey customer_id, \n" +
+ " 1 year_total\n" +
+ " FROM cp.`tpch/region.parquet` c) \n" +
+ " SELECT count(t_w_firstyear.customer_id) as ct \n" +
+ " FROM year_total_1 t_w_firstyear,\n" +
+ " year_total_2 t_w_secyear\n" +
+ " WHERE t_w_firstyear.year_total = t_w_secyear.year_total\n" +
+ " AND t_w_firstyear.year_total > 0 and t_w_secyear.year_total > 0";
+
+ testBuilder()
+ .sqlQuery(query1)
+ .ordered()
+ .baselineColumns("ct")
+ .baselineValues((long) 5)
+ .build()
+ .run();
+
+ testBuilder()
+ .sqlQuery(query2)
+ .ordered()
+ .baselineColumns("ct")
+ .baselineValues((long) 25)
+ .build()
+ .run();
+
+ testBuilder()
+ .sqlQuery(query3)
+ .ordered()
+ .baselineColumns("ct")
+ .baselineValues((long) 125)
+ .build()
+ .run();
+
+ testBuilder()
+ .sqlQuery(query4)
+ .ordered()
+ .baselineColumns("ct")
+ .baselineValues((long) 25)
+ .build()
+ .run();
+ }
+
+ @Test
+ @Category(UnlikelyTest.class)
+ public void testFragmentNum() throws Exception {
+ final String l = "/multilevel/parquet/1994";
+ final String r = "/multilevel/parquet/1995";
+
+ final String query = String.format("SELECT o_custkey FROM dfs.`%s` \n" +
+ "Except All SELECT o_custkey FROM dfs.`%s`", l, r);
+
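+ // slice_target = 1 below forces the planner to parallelize, so an exchange
+ // is expected above the parallelized SetOp.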
+ // Validate the plan
+ final String[] expectedPlan = {"UnionExchange.*\n",
+ ".*SetOp"};
+
+ try {
+ client.alterSession(ExecConstants.SLICE_TARGET, 1);
+ queryBuilder()
+ .sql(query)
+ .planMatcher()
+ .include(expectedPlan)
+ .match();
+
+ testBuilder()
+ .optionSettingQueriesForBaseline(SLICE_TARGET_DEFAULT)
+ .unOrdered()
+ .sqlQuery(query)
+ .sqlBaselineQuery(query)
+ .build()
+ .run();
+ } finally {
+ client.resetSession(ExecConstants.SLICE_TARGET);
+ }
+ }
+
+ @Test
+ public void testGroupByOnSetOp() throws Exception {
+ final String l = "/multilevel/parquet/1994";
+ final String r = "/multilevel/parquet/1995";
+
+ final String query = String.format("Select o_custkey, count(*) as cnt from \n" +
+ " (SELECT o_custkey FROM dfs.`%s` \n" +
+ "Intersect All SELECT o_custkey FROM dfs.`%s`) \n" +
+ "group by o_custkey", l, r);
+
+ // Validate the plan
+ final String[] expectedPlan = {"(?s)UnionExchange.*StreamAgg.*Sort.*SetOp.*"};
+
+ try {
+ client.alterSession(ExecConstants.SLICE_TARGET, 1);
+ queryBuilder()
+ .sql(query)
+ .planMatcher()
+ .include(expectedPlan)
+ .match();
+
+ testBuilder()
+ .optionSettingQueriesForBaseline(SLICE_TARGET_DEFAULT)
+ .unOrdered()
+ .sqlQuery(query)
+ .sqlBaselineQuery(query)
+ .build()
+ .run();
+ } finally {
+ client.resetSession(ExecConstants.SLICE_TARGET);
+ }
+ }
+
+ @Test
+ public void testSetOpOnHashJoin() throws Exception {
+ final String l = "/multilevel/parquet/1994";
+ final String r = "/multilevel/parquet/1995";
+
+ final String query = String.format("SELECT o_custkey FROM \n" +
+ " (select o1.o_custkey from dfs.`%s` o1 inner join dfs.`%s` o2 on o1.o_orderkey = o2.o_custkey) \n" +
+ " Intersect All SELECT o_custkey FROM dfs.`%s` where o_custkey > 10", l, r, l);
+
+ // Validate the plan
+ final String[] expectedPlan = {"(?s)UnionExchange.*SetOp.*HashJoin.*"};
+
+ try {
+ client.alterSession(ExecConstants.SLICE_TARGET, 1);
+ queryBuilder()
+ .sql(query)
+ .planMatcher()
+ .include(expectedPlan)
+ .match();
+
+ testBuilder()
+ .optionSettingQueriesForBaseline(SLICE_TARGET_DEFAULT)
+ .unOrdered()
+ .sqlQuery(query)
+ .sqlBaselineQuery(query)
+ .build()
+ .run();
+ } finally {
+ client.resetSession(ExecConstants.SLICE_TARGET);
+ }
+ }
+
+ @Test
+ @Category(UnlikelyTest.class)
+ public void testLimitOneOnRightSide() throws Exception {
+ final String l = "/multilevel/parquet/1994";
+ final String r = "/multilevel/parquet/1995";
+
+ final String query = String.format("SELECT o_custkey FROM \n" +
+ " ((select o1.o_custkey from dfs.`%s` o1 inner join dfs.`%s` o2 on o1.o_orderkey = o2.o_custkey) \n" +
+ " Intersect All (SELECT o_custkey FROM dfs.`%s` limit 1))", l, r, l);
+
+ // Validate the plan
+ final String[] expectedPlan = {"(?s)UnionExchange.*SetOp.*HashJoin.*"};
+
+ try {
+ client.alterSession(ExecConstants.SLICE_TARGET, 1);
+ client.alterSession(PlannerSettings.UNIONALL_DISTRIBUTE_KEY, true);
+ queryBuilder()
+ .sql(query)
+ .planMatcher()
+ .include(expectedPlan)
+ .match();
+
+ testBuilder()
+ .optionSettingQueriesForBaseline(SLICE_TARGET_DEFAULT)
+ .unOrdered()
+ .sqlQuery(query)
+ .sqlBaselineQuery(query)
+ .build()
+ .run();
+ } finally {
+ client.resetSession(ExecConstants.SLICE_TARGET);
+ client.resetSession(PlannerSettings.UNIONALL_DISTRIBUTE_KEY);
+ }
+ }
+
+ @Test
+ @Category(UnlikelyTest.class)
+ public void testLimitOneOnLeftSide() throws Exception {
+ final String l = "/multilevel/parquet/1994";
+ final String r = "/multilevel/parquet/1995";
+
+ final String query = String.format("SELECT o_custkey FROM \n" +
+ " ((SELECT o_custkey FROM dfs.`%s` limit 1) \n" +
+ " intersect all \n" +
+ " (select o1.o_custkey from dfs.`%s` o1 inner join dfs.`%s` o2 on o1.o_orderkey = o2.o_custkey))", l, r, l);
+
+ // Validate the plan
+ final String[] expectedPlan = {"(?s)SetOp.*BroadcastExchange.*HashJoin.*"};
+
+ try {
+ client.alterSession(ExecConstants.SLICE_TARGET, 1);
+ client.alterSession(PlannerSettings.UNIONALL_DISTRIBUTE_KEY, true);
+ queryBuilder()
+ .sql(query)
+ .planMatcher()
+ .include(expectedPlan)
+ .match();
+
+ testBuilder()
+ .optionSettingQueriesForBaseline(SLICE_TARGET_DEFAULT)
+ .unOrdered()
+ .sqlQuery(query)
+ .sqlBaselineQuery(query)
+ .build()
+ .run();
+ } finally {
+ client.resetSession(ExecConstants.SLICE_TARGET);
+ client.resetSession(PlannerSettings.UNIONALL_DISTRIBUTE_KEY);
+ }
+ }
+
+ @Test
+ public void testIntersectAllWithValues() throws Exception {
+ testBuilder()
+ .sqlQuery("values('A') intersect all values('A')")
+ .unOrdered()
+ .baselineColumns("EXPR$0")
+ .baselineValues("A")
+ .go();
+ }
+
+ @Test
+ @Category(UnlikelyTest.class)
+ public void testFieldWithDots() throws Exception {
+ String fileName = "table.json";
+ try (BufferedWriter writer = new BufferedWriter(new FileWriter(new File(dirTestWatcher.getRootDir(), fileName)))) {
+ writer.write("{\"rk.q\": \"a\", \"m\": {\"a.b\":\"1\", \"a\":{\"b\":\"2\"}, \"c\":\"3\"}}");
+ }
+
+ testBuilder()
+ .sqlQuery("select * from (" +
+ "(select t.m.`a.b` as a,\n" +
+ "t.m.a.b as b,\n" +
+ "t.m['a.b'] as c,\n" +
+ "t.`rk.q` as e\n" +
+ "from dfs.`%1$s` t)\n" +
+ "intersect all\n" +
+ "(select t.m.`a.b` as a,\n" +
+ "t.m.a.b as b,\n" +
+ "t.m['a.b'] as c,\n" +
+ "t.`rk.q` as e\n" +
+ "from dfs.`%1$s` t))", fileName)
+ .unOrdered()
+ .baselineColumns("a", "b", "c", "e")
+ .baselineValues("1", "2", "1", "a")
+ .go();
+ }
+
+ @Test
+ public void testExceptAllRightEmptyDir() throws Exception {
+ String rootSimple = "/store/json/booleanData.json";
+
+ testBuilder()
+ .sqlQuery("SELECT key FROM cp.`%s` EXCEPT ALL SELECT key FROM dfs.tmp_default_format.`%s`",
+ rootSimple, EMPTY_DIR_NAME)
+ .unOrdered()
+ .baselineColumns("key")
+ .baselineValues(true)
+ .baselineValues(false)
+ .build()
+ .run();
+ }
+
+ @Test
+ public void testExceptAllLeftEmptyDir() throws Exception {
+ final String rootSimple = "/store/json/booleanData.json";
+
+ testBuilder()
+ .sqlQuery("SELECT key FROM dfs.tmp_default_format.`%s` EXCEPT ALL SELECT key FROM cp.`%s`",
+ EMPTY_DIR_NAME, rootSimple)
+ .unOrdered()
+ .baselineColumns("key")
+ .expectsEmptyResultSet()
+ .build()
+ .run();
+ }
+
+ @Test
+ public void testIntersectBothEmptyDirs() throws Exception {
+ SchemaBuilder schemaBuilder = new SchemaBuilder()
+ .addNullable("key", TypeProtos.MinorType.INT);
+ BatchSchema expectedSchema = new BatchSchemaBuilder()
+ .withSchemaBuilder(schemaBuilder)
+ .build();
+
+ testBuilder()
+ .sqlQuery("SELECT key FROM dfs.tmp_default_format.`%1$s` INTERSECT ALL SELECT key FROM dfs.tmp_default_format.`%1$s`", EMPTY_DIR_NAME)
+ .schemaBaseLine(expectedSchema)
+ .build()
+ .run();
+ }
+
+ @Test
+ public void testSetOpMiddleEmptyDir() throws Exception {
+ final String query = "(SELECT n_regionkey FROM cp.`tpch/nation.parquet` EXCEPT ALL " +
+ "SELECT missing_key FROM dfs.tmp_default_format.`%s`) intersect all SELECT r_regionkey FROM cp.`tpch/region.parquet`";
+
+ testBuilder()
+ .sqlQuery(query, EMPTY_DIR_NAME)
+ .unOrdered()
+ .baselineColumns("n_regionkey")
+ .baselineValues(0)
+ .baselineValues(1)
+ .baselineValues(2)
+ .baselineValues(3)
+ .baselineValues(4)
+ .build()
+ .run();
+ }
+
+ @Test
+ public void testComplexQueryWithSetOpAndEmptyDir() throws Exception {
+ final String rootSimple = "/store/json/booleanData.json";
+
+ testBuilder()
+ .sqlQuery("SELECT key FROM cp.`%2$s` INTERSECT ALL SELECT key FROM " +
+ "(SELECT key FROM cp.`%2$s` EXCEPT ALL SELECT key FROM dfs.tmp_default_format.`%1$s`)",
+ EMPTY_DIR_NAME, rootSimple)
+ .unOrdered()
+ .baselineColumns("key")
+ .baselineValues(true)
+ .baselineValues(false)
+ .build()
+ .run();
+ }
+
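+ // The LIMIT 1 on top lets the query finish after the first row, exercising
+ // early cancellation of the still-running set-op subtree.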
+ @Test
+ public void testIntersectCancellation() throws Exception {
+ String query = "WITH foo AS\n" +
+ " (SELECT 1 AS a FROM cp.`/tpch/nation.parquet`\n" +
+ " Intersect ALL\n" +
+ " SELECT 1 AS a FROM cp.`/tpch/nation.parquet`\n" +
+ " WHERE n_nationkey > (SELECT 1) )\n" +
+ "SELECT * FROM foo\n" +
+ "LIMIT 1";
+
+ testBuilder()
+ .sqlQuery(query)
+ .unOrdered()
+ .baselineColumns("a")
+ .baselineValues(1)
+ .build()
+ .run();
+ }
+
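+ // Each UNION ALL branch typically arrives as its own record batch, so the
+ // set op must build and probe across several incoming batches per side.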
+ @Test
+ public void testMultiBatch() throws Exception {
+ String query = "(select * from (values(1,1)) t(a,b) union all select * from (values(3,3)) t(a,b) union all select * from (values(5,5)) t(a,b)) intersect all " +
+ "(select * from (values(1,1)) t(a,b) union all select * from (values(3,3), (2,2)) t(a,b) union all select * from (values(6,6), (4,4), (5,5)) t(a,b)) ";
+ testBuilder()
+ .sqlQuery(query)
+ .unOrdered()
+ .baselineColumns("a", "b")
+ .baselineValues(5, 5)
+ .baselineValues(3, 3)
+ .baselineValues(1, 1)
+ .build().run();
+ }
+
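+ // The first branch on each side returns zero rows, so the operator must
+ // handle a leading empty batch before any real data arrives.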
+ @Test
+ public void testFirstEmptyBatch() throws Exception {
+ String query = "(select n_nationkey from cp.`tpch/nation.parquet` where n_nationkey < 0 union all select n_nationkey from cp.`tpch/nation.parquet` where n_nationkey < 5) intersect all " +
+ "(select n_nationkey from cp.`tpch/nation.parquet` where n_nationkey < 0 union all select n_nationkey from cp.`tpch/nation.parquet` where n_nationkey < 3)";
+ testBuilder()
+ .sqlQuery(query)
+ .unOrdered()
+ .baselineColumns("n_nationkey")
+ .baselineValues(0)
+ .baselineValues(2)
+ .baselineValues(1)
+ .build().run();
+ }
+
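+ // Rows are matched through a hash table, which cannot compare complex
+ // types, so this query must fail with the error asserted below.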
+ @Test
+ public void testUnsupportedComplexType() {
+ try {
+ String query = "select sia from cp.`complex/json/complex.json` intersect all select sia from cp.`complex/json/complex.json`";
+ testBuilder()
+ .sqlQuery(query)
+ .unOrdered()
+ .baselineColumns("sia")
+ .baselineValues("[1,11,101,1001]")
+ .baselineValues("[2,12,102,1002]")
+ .baselineValues("[3,13,103,1003]")
+ .build().run();
+ Assert.fail("Missing expected exception on complex type");
+ } catch (Exception ex) {
+ Assert.assertThat(ex.getMessage(),
+ CoreMatchers.containsString("Map, Array, Union or repeated scalar type should not be used in group by, order by or in a comparison operator"));
+ }
+ }
+}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/QueryBuilder.java b/exec/java-exec/src/test/java/org/apache/drill/test/QueryBuilder.java
index ec0471e255..996c5c4008 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/QueryBuilder.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/QueryBuilder.java
@@ -829,12 +829,17 @@ public class QueryBuilder {
* only if expected pattern was not matched or unexpected pattern was matched.
*/
public void match() {
- included.forEach(pattern -> match(pattern, true));
- excluded.forEach(pattern -> match(pattern, false));
+ included.forEach(pattern -> match(pattern, true, false));
+ excluded.forEach(pattern -> match(pattern, false, false));
}
- private void match(String patternString, boolean expectedResult) {
- Pattern pattern = Pattern.compile(patternString);
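+ /**
+ * Like {@link #match()}, but when {@code matchMultiLine} is set the patterns
+ * are compiled with {@link Pattern#DOTALL} so that '.' also matches line
+ * terminators in multi-line plan output.
+ */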
+ public void match(boolean matchMultiLine) {
+ included.forEach(pattern -> match(pattern, true, matchMultiLine));
+ excluded.forEach(pattern -> match(pattern, false, matchMultiLine));
+ }
+
+ private void match(String patternString, boolean expectedResult, boolean matchMultiLine) {
+ Pattern pattern = Pattern.compile(patternString, (matchMultiLine ? Pattern.DOTALL : 0));
Matcher matcher = pattern.matcher(plan);
String message = String.format("%s in plan: %s\n%s",
expectedResult ? EXPECTED_NOT_FOUND : UNEXPECTED_FOUND, patternString, plan);
diff --git a/logical/src/main/java/org/apache/drill/common/logical/data/Except.java b/logical/src/main/java/org/apache/drill/common/logical/data/Except.java
new file mode 100644
index 0000000000..b060a16054
--- /dev/null
+++ b/logical/src/main/java/org/apache/drill/common/logical/data/Except.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.common.logical.data;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.drill.common.logical.data.visitors.LogicalVisitor;
+import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
+
+import java.util.Iterator;
+import java.util.List;
+
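+/**
+ * Logical operator for the SQL EXCEPT set operator. When {@code distinct} is
+ * true, each qualifying row is emitted at most once (EXCEPT rather than
+ * EXCEPT ALL).
+ */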
+@JsonTypeName("Except")
+public class Except extends LogicalOperatorBase {
+ private final List<LogicalOperator> inputs;
+ private final boolean distinct;
+
+ @JsonCreator
+ public Except(@JsonProperty("inputs") List<LogicalOperator> inputs, @JsonProperty("distinct") Boolean distinct) {
+ this.inputs = inputs;
+ for (LogicalOperator o : inputs) {
+ o.registerAsSubscriber(this);
+ }
+ this.distinct = distinct == null ? false : distinct;
+ }
+
+ public List<LogicalOperator> getInputs() {
+ return inputs;
+ }
+
+ public boolean isDistinct() {
+ return distinct;
+ }
+
+ @Override
+ public <T, X, E extends Throwable> T accept(LogicalVisitor<T, X, E> logicalVisitor, X value) throws E {
+ return logicalVisitor.visitExcept(this, value);
+ }
+
+ @Override
+ public Iterator<LogicalOperator> iterator() {
+ return inputs.iterator();
+ }
+
+ public static Builder builder() {
+ return new Builder();
+ }
+
+ public static class Builder extends AbstractBuilder<Except> {
+ private final List<LogicalOperator> inputs = Lists.newArrayList();
+ private boolean distinct;
+
+ public Builder addInput(LogicalOperator o) {
+ inputs.add(o);
+ return this;
+ }
+
+ public Builder setDistinct(boolean distinct) {
+ this.distinct = distinct;
+ return this;
+ }
+
+ @Override
+ public Except build() {
+ return new Except(inputs, distinct);
+ }
+ }
+
+}
diff --git a/logical/src/main/java/org/apache/drill/common/logical/data/Intersect.java b/logical/src/main/java/org/apache/drill/common/logical/data/Intersect.java
new file mode 100644
index 0000000000..ec0a1df70e
--- /dev/null
+++ b/logical/src/main/java/org/apache/drill/common/logical/data/Intersect.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.common.logical.data;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.drill.common.logical.data.visitors.LogicalVisitor;
+import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
+
+import java.util.Iterator;
+import java.util.List;
+
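+/**
+ * Logical operator for the SQL INTERSECT set operator. When {@code distinct}
+ * is true, each common row is emitted at most once (INTERSECT rather than
+ * INTERSECT ALL).
+ */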
+@JsonTypeName("intersect")
+public class Intersect extends LogicalOperatorBase {
+ private final List<LogicalOperator> inputs;
+ private final boolean distinct;
+
+ @JsonCreator
+ public Intersect(@JsonProperty("inputs") List<LogicalOperator> inputs, @JsonProperty("distinct") Boolean distinct) {
+ this.inputs = inputs;
+ for (LogicalOperator o : inputs) {
+ o.registerAsSubscriber(this);
+ }
+ this.distinct = distinct == null ? false : distinct;
+ }
+
+ public List<LogicalOperator> getInputs() {
+ return inputs;
+ }
+
+ public boolean isDistinct() {
+ return distinct;
+ }
+
+ @Override
+ public <T, X, E extends Throwable> T accept(LogicalVisitor<T, X, E> logicalVisitor, X value) throws E {
+ return logicalVisitor.visitIntersect(this, value);
+ }
+
+ @Override
+ public Iterator<LogicalOperator> iterator() {
+ return inputs.iterator();
+ }
+
+ public static Builder builder() {
+ return new Builder();
+ }
+
+ public static class Builder extends AbstractBuilder<Intersect> {
+ private final List<LogicalOperator> inputs = Lists.newArrayList();
+ private boolean distinct;
+
+ public Builder addInput(LogicalOperator o) {
+ inputs.add(o);
+ return this;
+ }
+
+ public Builder setDistinct(boolean distinct) {
+ this.distinct = distinct;
+ return this;
+ }
+
+ @Override
+ public Intersect build() {
+ return new Intersect(inputs, distinct);
+ }
+ }
+
+}
diff --git a/logical/src/main/java/org/apache/drill/common/logical/data/visitors/AbstractLogicalVisitor.java b/logical/src/main/java/org/apache/drill/common/logical/data/visitors/AbstractLogicalVisitor.java
index c46dc43335..2f63b3f2f4 100644
--- a/logical/src/main/java/org/apache/drill/common/logical/data/visitors/AbstractLogicalVisitor.java
+++ b/logical/src/main/java/org/apache/drill/common/logical/data/visitors/AbstractLogicalVisitor.java
@@ -17,6 +17,8 @@
*/
package org.apache.drill.common.logical.data.visitors;
+import org.apache.drill.common.logical.data.Except;
+import org.apache.drill.common.logical.data.Intersect;
import org.apache.drill.common.logical.data.LateralJoin;
import org.apache.drill.common.logical.data.Unnest;
import org.apache.drill.common.logical.data.Analyze;
@@ -111,6 +113,16 @@ public abstract class AbstractLogicalVisitor<T, X, E extends Throwable> implemen
return visitOp(union, value);
}
+ @Override
+ public T visitExcept(Except except, X value) throws E {
+ return visitOp(except, value);
+ }
+
+ @Override
+ public T visitIntersect(Intersect intersect, X value) throws E {
+ return visitOp(intersect, value);
+ }
+
@Override
public T visitWindow(Window window, X value) throws E {
return visitOp(window, value);
diff --git a/logical/src/main/java/org/apache/drill/common/logical/data/visitors/LogicalVisitor.java b/logical/src/main/java/org/apache/drill/common/logical/data/visitors/LogicalVisitor.java
index ee9036c6e7..fee1595688 100644
--- a/logical/src/main/java/org/apache/drill/common/logical/data/visitors/LogicalVisitor.java
+++ b/logical/src/main/java/org/apache/drill/common/logical/data/visitors/LogicalVisitor.java
@@ -18,6 +18,8 @@
package org.apache.drill.common.logical.data.visitors;
+import org.apache.drill.common.logical.data.Except;
+import org.apache.drill.common.logical.data.Intersect;
import org.apache.drill.common.logical.data.LateralJoin;
import org.apache.drill.common.logical.data.Unnest;
import org.apache.drill.common.logical.data.Analyze;
@@ -62,6 +64,8 @@ public interface LogicalVisitor<RETURN, EXTRA, EXCEP extends Throwable> {
public RETURN visitRunningAggregate(RunningAggregate runningAggregate, EXTRA value) throws EXCEP;
public RETURN visitTransform(Transform transform, EXTRA value) throws EXCEP;
public RETURN visitUnion(Union union, EXTRA value) throws EXCEP;
+ public RETURN visitExcept(Except except, EXTRA value) throws EXCEP;
+ public RETURN visitIntersect(Intersect intersect, EXTRA value) throws EXCEP;
public RETURN visitWindow(Window window, EXTRA value) throws EXCEP;
public RETURN visitWriter(Writer writer, EXTRA value) throws EXCEP;