Posted to commits@drill.apache.org by cg...@apache.org on 2023/02/12 20:04:32 UTC

[drill] branch master updated: DRILL-4232: Support for EXCEPT and INTERSECT set operator (#2599)

This is an automated email from the ASF dual-hosted git repository.

cgivre pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git


The following commit(s) were added to refs/heads/master by this push:
     new c38091fdd6 DRILL-4232: Support for EXCEPT and INTERSECT set operator (#2599)
c38091fdd6 is described below

commit c38091fdd66a222657b3c617aaab834d1594570d
Author: leon <32...@qq.com>
AuthorDate: Mon Feb 13 04:04:25 2023 +0800

    DRILL-4232: Support for EXCEPT and INTERSECT set operator (#2599)
---
 .../java/org/apache/drill/exec/ExecConstants.java  |    4 +
 .../physical/base/AbstractPhysicalVisitor.java     |    6 +
 .../drill/exec/physical/base/PhysicalVisitor.java  |    2 +
 .../apache/drill/exec/physical/config/SetOp.java   |   69 +
 .../exec/physical/impl/common/HashPartition.java   |   12 +
 .../drill/exec/physical/impl/common/HashTable.java |   20 +
 .../exec/physical/impl/common/HashTableConfig.java |   18 +-
 .../physical/impl/common/HashTableTemplate.java    |   60 +-
 ...tch.java => AbstractHashBinaryRecordBatch.java} |  396 ++---
 .../exec/physical/impl/join/HashJoinBatch.java     | 1590 +-------------------
 .../physical/impl/join/HashJoinProbeTemplate.java  |  460 +-----
 .../impl/join/{HashJoinProbe.java => Probe.java}   |   29 +-
 ...shJoinProbeTemplate.java => ProbeTemplate.java} |  237 +--
 .../impl/setop/HashSetOpProbeTemplate.java         |   99 ++
 .../physical/impl/setop/HashSetOpRecordBatch.java  |  139 ++
 .../physical/impl/setop/SetOpBatchCreator.java     |   36 +
 .../apache/drill/exec/planner/PlannerPhase.java    |    6 +
 .../drill/exec/planner/common/DrillSetOpRel.java   |   35 +
 .../exec/planner/common/DrillUnionRelBase.java     |   58 -
 .../drill/exec/planner/logical/DrillExceptRel.java |   98 ++
 .../{DrillUnionRel.java => DrillIntersectRel.java} |   64 +-
 .../exec/planner/logical/DrillRelFactories.java    |   23 +
 .../drill/exec/planner/logical/DrillSetOpRule.java |   64 +
 .../exec/planner/logical/DrillUnionAllRule.java    |    2 +-
 .../drill/exec/planner/logical/DrillUnionRel.java  |   19 +-
 .../exec/planner/logical/PreProcessLogicalRel.java |   31 +
 .../exec/planner/logical/ScanFieldDeterminer.java  |   18 +
 .../physical/{UnionAllPrel.java => SetOpPrel.java} |   77 +-
 .../drill/exec/planner/physical/SetOpPrule.java    |  242 +++
 .../drill/exec/planner/physical/UnionAllPrel.java  |    9 +-
 .../drill/exec/planner/physical/UnionAllPrule.java |    3 +-
 .../exec/planner/physical/UnionDistinctPrel.java   |    9 +-
 .../exec/planner/physical/UnionDistinctPrule.java  |    3 +-
 .../drill/exec/planner/physical/UnionPrel.java     |   20 +-
 .../physical/visitor/FinalColumnReorderer.java     |   11 +-
 .../planner/sql/handlers/FindLimit0Visitor.java    |    6 +-
 .../sql/parser/UnsupportedOperatorsVisitor.java    |   10 +-
 .../exec/planner/torel/ConversionContext.java      |   14 +
 .../exec/server/options/SystemOptionManager.java   |    1 +
 .../drill/exec/store/plan/rel/PluginUnionRel.java  |   10 +-
 .../java-exec/src/main/resources/drill-module.conf |    1 +
 .../apache/drill/TestDisabledFunctionality.java    |   36 -
 .../src/test/java/org/apache/drill/TestSetOp.java  | 1182 +++++++++++++++
 .../java/org/apache/drill/test/QueryBuilder.java   |   13 +-
 .../apache/drill/common/logical/data/Except.java   |   87 ++
 .../drill/common/logical/data/Intersect.java       |   87 ++
 .../data/visitors/AbstractLogicalVisitor.java      |   12 +
 .../logical/data/visitors/LogicalVisitor.java      |    4 +
 48 files changed, 2837 insertions(+), 2595 deletions(-)
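
For context, the SQL surface this change adds can be exercised as in the sketch below, driven through Drill's JDBC driver. This is a minimal, illustrative example: the connection URL and the dfs.tmp tables `a.json` and `b.json` are placeholders, not part of the commit.

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.ResultSet;
    import java.sql.Statement;

    public class SetOpSmokeTest {
      public static void main(String[] args) throws Exception {
        // Placeholder URL; point it at a running Drillbit in your environment.
        try (Connection conn = DriverManager.getConnection("jdbc:drill:drillbit=localhost");
             Statement stmt = conn.createStatement()) {

          // EXCEPT: rows of the first input that do not appear in the second (duplicates removed).
          try (ResultSet rs = stmt.executeQuery(
              "SELECT id FROM dfs.tmp.`a.json` EXCEPT SELECT id FROM dfs.tmp.`b.json`")) {
            while (rs.next()) {
              System.out.println("except: " + rs.getObject(1));
            }
          }

          // INTERSECT ALL: rows common to both inputs, keeping duplicate matches.
          try (ResultSet rs = stmt.executeQuery(
              "SELECT id FROM dfs.tmp.`a.json` INTERSECT ALL SELECT id FROM dfs.tmp.`b.json`")) {
            while (rs.next()) {
              System.out.println("intersect: " + rs.getObject(1));
            }
          }
        }
      }
    }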

diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
index 696aa804a5..73aac06216 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
@@ -600,6 +600,10 @@ public final class ExecConstants {
   public static final BooleanValidator ENABLE_UNION_TYPE = new BooleanValidator(ENABLE_UNION_TYPE_KEY,
       new OptionDescription("Enable support for Avro union type."));
 
+  public static final String EXCEPT_ADD_AGG_BELOW_KEY = "exec.except_add_agg_below";
+  public static final BooleanValidator EXCEPT_ADD_AGG_BELOW = new BooleanValidator(EXCEPT_ADD_AGG_BELOW_KEY,
+    new OptionDescription("Add the aggregation below the set op on the left input when executing EXCEPT; otherwise add it above the set op"));
+
   // Kafka plugin related options.
   public static final String KAFKA_ALL_TEXT_MODE = "store.kafka.all_text_mode";
   public static final OptionValidator KAFKA_READER_ALL_TEXT_MODE_VALIDATOR = new BooleanValidator(KAFKA_ALL_TEXT_MODE,
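
The new `exec.except_add_agg_below` option controls where the planner places the de-duplicating aggregation relative to the set op's left input. A minimal sketch of flipping it for the current session over JDBC follows; the option name comes from the hunk above, the connection handling is an illustrative assumption.

    // Sketch: toggle the new planner knob for the current session.
    // `conn` is assumed to be an open Drill JDBC connection.
    static void preferAggBelowExcept(java.sql.Connection conn) throws java.sql.SQLException {
      try (java.sql.Statement stmt = conn.createStatement()) {
        stmt.execute("ALTER SESSION SET `exec.except_add_agg_below` = true");
      }
    }
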
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java
index ea2221abaf..e58f5ea922 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java
@@ -33,6 +33,7 @@ import org.apache.drill.exec.physical.config.Project;
 import org.apache.drill.exec.physical.config.RangePartitionSender;
 import org.apache.drill.exec.physical.config.RowKeyJoinPOP;
 import org.apache.drill.exec.physical.config.Screen;
+import org.apache.drill.exec.physical.config.SetOp;
 import org.apache.drill.exec.physical.config.SingleSender;
 import org.apache.drill.exec.physical.config.Sort;
 import org.apache.drill.exec.physical.config.StatisticsAggregate;
@@ -59,6 +60,11 @@ public abstract class AbstractPhysicalVisitor<T, X, E extends Throwable> impleme
     return visitOp(union, value);
   }
 
+  @Override
+  public T visitSetOp(SetOp setOp, X value) throws E {
+    return visitOp(setOp, value);
+  }
+
   @Override
   public T visitWriter(Writer writer, X value) throws E {
     return visitOp(writer, value);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java
index d1b1479bb7..dee1203ac3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java
@@ -33,6 +33,7 @@ import org.apache.drill.exec.physical.config.Project;
 import org.apache.drill.exec.physical.config.RangePartitionSender;
 import org.apache.drill.exec.physical.config.RowKeyJoinPOP;
 import org.apache.drill.exec.physical.config.Screen;
+import org.apache.drill.exec.physical.config.SetOp;
 import org.apache.drill.exec.physical.config.SingleSender;
 import org.apache.drill.exec.physical.config.Sort;
 import org.apache.drill.exec.physical.config.StatisticsAggregate;
@@ -63,6 +64,7 @@ public interface PhysicalVisitor<RETURN, EXTRA, EXCEP extends Throwable> {
   public RETURN visitStore(Store store, EXTRA value) throws EXCEP;
   public RETURN visitFilter(Filter filter, EXTRA value) throws EXCEP;
   public RETURN visitUnion(UnionAll union, EXTRA value) throws EXCEP;
+  public RETURN visitSetOp(SetOp setOp, EXTRA value) throws EXCEP;
   public RETURN visitProject(Project project, EXTRA value) throws EXCEP;
   public RETURN visitTrace(Trace trace, EXTRA value) throws EXCEP;
   public RETURN visitSort(Sort sort, EXTRA value) throws EXCEP;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/SetOp.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/SetOp.java
new file mode 100644
index 0000000000..27f77df28f
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/SetOp.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.config;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.drill.exec.ops.QueryContext;
+import org.apache.drill.exec.physical.base.AbstractMultiple;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.PhysicalVisitor;
+
+import java.util.List;
+
+@JsonTypeName("set-op")
+public class SetOp extends AbstractMultiple {
+  public SqlKind kind;
+  public boolean all;
+
+  @JsonCreator
+  public SetOp(@JsonProperty("children") List<PhysicalOperator> children, @JsonProperty("sqlKind") SqlKind kind, @JsonProperty("all") boolean all) {
+    super(children);
+    this.kind = kind;
+    this.all = all;
+  }
+
+  public SqlKind getKind() {
+    return kind;
+  }
+
+  public boolean isAll() {
+    return all;
+  }
+
+  @Override
+  public <T, X, E extends Throwable> T accept(PhysicalVisitor<T, X, E> physicalVisitor, X value) throws E {
+    return physicalVisitor.visitSetOp(this, value);
+  }
+
+  @Override
+  public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) {
+    return new SetOp(children, kind, all);
+  }
+
+  @Override
+  public String getOperatorType() {
+    return kind.name() + (all ? "_ALL" : "");
+  }
+
+  @Override @JsonIgnore
+  public boolean isBufferedOperator(QueryContext queryContext) { return true; }
+}
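
A short sketch of how this new physical operator config is constructed and what its operator-type string looks like; leftChild and rightChild stand in for whatever physical operators the planner produces for the two inputs.

    // Placeholders: leftChild/rightChild are the physical operators for the two inputs.
    java.util.List<PhysicalOperator> children = java.util.Arrays.asList(leftChild, rightChild);

    SetOp exceptAll = new SetOp(children, SqlKind.EXCEPT, true);
    // getOperatorType() appends "_ALL" when the ALL flag is set
    assert "EXCEPT_ALL".equals(exceptAll.getOperatorType());

    SetOp intersect = new SetOp(children, SqlKind.INTERSECT, false);
    assert "INTERSECT".equals(intersect.getOperatorType());
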
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashPartition.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashPartition.java
index 52fe0a8d59..2b21831f34 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashPartition.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashPartition.java
@@ -386,6 +386,18 @@ public class HashPartition implements HashJoinMemoryCalculator.PartitionStat {
     return hashTable.probeForKey(recordsProcessed, hashCode);
   }
 
+  public int getRecordNumForKey(int currentIndex) {
+    return hashTable.getRecordNumForKey(currentIndex);
+  }
+
+  public void setRecordNumForKey(int currentIndex, int num) {
+    hashTable.setRecordNumForKey(currentIndex, num);
+  }
+
+  public void decreaseRecordNumForKey(int currentIndex) {
+    hashTable.decreaseRecordNumForKey(currentIndex);
+  }
+
   public Pair<Integer, Boolean> getStartIndex(int probeIndex) {
     /* The current probe record has a key that matches. Get the index
      * of the first row in the build side that matches the current key
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTable.java
index 1d9e2679ef..cdc4febbf1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTable.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTable.java
@@ -98,6 +98,26 @@ public interface HashTable {
    */
   int probeForKey(int incomingRowIdx, int hashCode) throws SchemaChangeException;
 
+  /**
+   * @param currentIndex The composite index of the key in the hash table (index of the BatchHolder and of the record within that BatchHolder).
+   * @return Returns -1 if the count of records for a specific key is not computed. Otherwise returns
+   * the count of records for a specific key.
+   */
+  int getRecordNumForKey(int currentIndex);
+
+  /**
+   * Set the count of records for a specific key to num.
+   * @param currentIndex The composite index of the key in the hash table (index of the BatchHolder and of the record within that BatchHolder).
+   * @param num The count of records for a specific key to be set.
+   */
+  void setRecordNumForKey(int currentIndex, int num);
+
+  /**
+   * Decrease the count of records for a specific key by one.
+   * @param currentIndex The composite index of the key in the hash table (index of the BatchHolder and of the record within that BatchHolder).
+   */
+  void decreaseRecordNumForKey(int currentIndex);
+
   void getStats(HashTableStats stats);
 
   int size();
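
The three methods above address hash table entries through a composite index. The small sketch below shows how that index is interpreted, mirroring the HashTableTemplate changes later in this diff; the 16-bit split and the BATCH_MASK constant are taken from the existing hash-table code.

    // Decode a composite key index (mirrors the HashTableTemplate hunks below).
    int batchIndex  = (currentIndex >>> 16) & HashTable.BATCH_MASK; // which BatchHolder
    int recordIndex = currentIndex & HashTable.BATCH_MASK;          // which row within it
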
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableConfig.java
index fe329cc5e5..aaa2c5fcfb 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableConfig.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableConfig.java
@@ -36,6 +36,7 @@ public class HashTableConfig  {
   private final List<NamedExpression> keyExprsProbe;
   private final List<Comparator> comparators;
   private final int joinControl;
+  private final boolean computeKeyNum;
 
   public HashTableConfig(
       int initialCapacity,
@@ -44,7 +45,7 @@ public class HashTableConfig  {
       List<NamedExpression> keyExprsProbe,
       List<Comparator> comparators
   ) {
-    this(initialCapacity, false, loadFactor, keyExprsBuild, keyExprsProbe, comparators, JoinControl.DEFAULT);
+    this(initialCapacity, false, loadFactor, keyExprsBuild, keyExprsProbe, comparators, JoinControl.DEFAULT, false);
   }
 
   @JsonCreator
@@ -54,7 +55,7 @@ public class HashTableConfig  {
                          @JsonProperty("keyExprsProbe") List<NamedExpression> keyExprsProbe,
                          @JsonProperty("comparators") List<Comparator> comparators,
                          @JsonProperty("joinControl") int joinControl) {
-    this(initialCapacity, false, loadFactor, keyExprsBuild, keyExprsProbe, comparators, joinControl);
+    this(initialCapacity, false, loadFactor, keyExprsBuild, keyExprsProbe, comparators, joinControl, false);
   }
 
   public HashTableConfig(
@@ -65,7 +66,7 @@ public class HashTableConfig  {
       List<NamedExpression> keyExprsProbe,
       List<Comparator> comparators
   ) {
-    this(initialCapacity, initialSizeIsFinal, loadFactor, keyExprsBuild, keyExprsProbe, comparators, JoinControl.DEFAULT);
+    this(initialCapacity, initialSizeIsFinal, loadFactor, keyExprsBuild, keyExprsProbe, comparators, JoinControl.DEFAULT, false);
   }
 
   @JsonCreator
@@ -75,7 +76,8 @@ public class HashTableConfig  {
                          @JsonProperty("keyExprsBuild") List<NamedExpression> keyExprsBuild,
                          @JsonProperty("keyExprsProbe") List<NamedExpression> keyExprsProbe,
                          @JsonProperty("comparators") List<Comparator> comparators,
-                         @JsonProperty("joinControl") int joinControl
+                         @JsonProperty("joinControl") int joinControl,
+                         @JsonProperty("computeKeyNum") boolean computeKeyNum
   ) {
     this.initialCapacity = initialCapacity;
     this.initialSizeIsFinal = initialSizeIsFinal;
@@ -84,6 +86,7 @@ public class HashTableConfig  {
     this.keyExprsProbe = keyExprsProbe;
     this.comparators = comparators;
     this.joinControl = joinControl;
+    this.computeKeyNum = computeKeyNum;
   }
 
   public HashTableConfig withInitialCapacity(int initialCapacity) {
@@ -93,7 +96,8 @@ public class HashTableConfig  {
       keyExprsBuild,
       keyExprsProbe,
       comparators,
-      JoinControl.DEFAULT
+      JoinControl.DEFAULT,
+      computeKeyNum
     );
   }
 
@@ -124,4 +128,8 @@ public class HashTableConfig  {
   public int getJoinControl() {
     return joinControl;
   }
+
+  public boolean isComputeKeyNum() {
+    return computeKeyNum;
+  }
 }
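
The extra flag threads a per-key counting request into the hash table. Below is a hedged sketch of a build side asking for it through the widened constructor; the key expressions, comparators and the `context` reference are placeholders for what the operator setup supplies.

    // Sketch only: keyExprsBuild, keyExprsProbe and comparators are placeholders.
    HashTableConfig htConfig = new HashTableConfig(
        (int) context.getOptions().getOption(ExecConstants.MIN_HASH_TABLE_SIZE),
        true,                           // initial size is final
        HashTable.DEFAULT_LOAD_FACTOR,
        keyExprsBuild,
        keyExprsProbe,
        comparators,
        JoinControl.DEFAULT,            // no special join semantics
        true);                          // computeKeyNum: allocate the per-key counters
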
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java
index 7d9e596148..da6e5e0861 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java
@@ -132,7 +132,7 @@ public abstract class HashTableTemplate implements HashTable {
 
     // Array of 'link' values
     private IntVector links;
-
+    private IntVector nums = null;
     // Array of hash values - this is useful when resizing the hash table
     private IntVector hashValues;
 
@@ -178,6 +178,9 @@ public abstract class HashTableTemplate implements HashTable {
 
         links = allocMetadataVector(newBatchHolderSize, EMPTY_SLOT);
         hashValues = allocMetadataVector(newBatchHolderSize, 0);
+        if (htConfig.isComputeKeyNum()) {
+          nums = allocMetadataVector(newBatchHolderSize, 0);
+        }
         success = true;
       } finally {
         if (!success) {
@@ -185,6 +188,9 @@ public abstract class HashTableTemplate implements HashTable {
           if (links != null) {
             links.clear();
           }
+          if (nums != null) {
+            nums.clear();
+          }
         }
       }
     }
@@ -199,6 +205,12 @@ public abstract class HashTableTemplate implements HashTable {
       }
       links.getMutator().setValueCount(size);
       hashValues.getMutator().setValueCount(size);
+      if (nums != null) {
+        for (int i = 0; i < size; i++) {
+          nums.getMutator().set(i, 0);
+        }
+        nums.getMutator().setValueCount(size);
+      }
     }
 
     protected void setup() throws SchemaChangeException {
@@ -242,6 +254,32 @@ public abstract class HashTableTemplate implements HashTable {
       return links.getAccessor().get(currentIndex & BATCH_MASK);
     }
 
+    private void increaseRecordNumForKey(int idxWithinBatch) {
+      if (nums != null) {
+        nums.getMutator().set(idxWithinBatch, nums.getAccessor().get(idxWithinBatch) + 1);
+      }
+    }
+
+    private void decreaseRecordNumForKey(int idxWithinBatch) {
+      if (nums != null) {
+        nums.getMutator().set(idxWithinBatch, nums.getAccessor().get(idxWithinBatch) - 1);
+      }
+    }
+
+    private int getRecordNumForKey(int idxWithinBatch) {
+      if (nums != null) {
+        return nums.getAccessor().get(idxWithinBatch);
+      } else {
+        return -1;
+      }
+    }
+
+    private void setNum(int idxWithinBatch, int num) {
+      if (nums != null) {
+        nums.getMutator().set(idxWithinBatch, num);
+      }
+    }
+
     // Insert a new <key1, key2...keyN> entry coming from the incoming batch into the hash table
     // container at the specified index
     private void insertEntry(int incomingRowIdx, int currentIdx, int hashValue, BatchHolder lastEntryBatch, int lastEntryIdxWithinBatch) throws SchemaChangeException {
@@ -259,6 +297,9 @@ public abstract class HashTableTemplate implements HashTable {
       // will point to a null (empty) slot
       links.getMutator().set(currentIdxWithinBatch, EMPTY_SLOT);
       hashValues.getMutator().set(currentIdxWithinBatch, hashValue);
+      if (nums != null) {
+        nums.getMutator().set(currentIdxWithinBatch, 1);
+      }
 
       maxOccupiedIdx = Math.max(maxOccupiedIdx, currentIdxWithinBatch);
 
@@ -400,6 +441,7 @@ public abstract class HashTableTemplate implements HashTable {
       htContainer.clear();
       if ( links != null ) { links.clear(); }
       if ( hashValues != null ) { hashValues.clear(); }
+      if ( nums != null ) { nums.clear(); }
     }
 
     // Only used for internal debugging. Get the value vector at a particular index from the htContainer.
@@ -686,6 +728,7 @@ public abstract class HashTableTemplate implements HashTable {
 
       if (lastEntryBatch.isKeyMatch(incomingRowIdx, currentIndex, false)) {
         htIdxHolder.value = currentIndex;
+        lastEntryBatch.increaseRecordNumForKey(lastEntryIdxWithinBatch);
         return PutStatus.KEY_PRESENT;
       }
     }
@@ -767,6 +810,21 @@ public abstract class HashTableTemplate implements HashTable {
     return -1;
   }
 
+  public int getRecordNumForKey(int currentIndex) {
+    BatchHolder bh = batchHolders.get((currentIndex >>> 16) & BATCH_MASK);
+    return bh.getRecordNumForKey(currentIndex & BATCH_MASK);
+  }
+
+  public void setRecordNumForKey(int currentIndex, int num) {
+    BatchHolder bh = batchHolders.get((currentIndex >>> 16) & BATCH_MASK);
+    bh.setNum(currentIndex & BATCH_MASK, num);
+  }
+
+  public void decreaseRecordNumForKey(int currentIndex) {
+    BatchHolder bh = batchHolders.get((currentIndex >>> 16) & BATCH_MASK);
+    bh.decreaseRecordNumForKey(currentIndex & BATCH_MASK);
+  }
+
   // Add a new BatchHolder to the list of batch holders if needed. This is based on the supplied
   // currentIdx; since each BatchHolder can hold up to BATCH_SIZE entries, if the currentIdx exceeds
   // the capacity, we will add a new BatchHolder. Return true if a new batch was added.
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/AbstractHashBinaryRecordBatch.java
similarity index 81%
copy from exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
copy to exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/AbstractHashBinaryRecordBatch.java
index a08e4308cd..bf9e3eee8f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/AbstractHashBinaryRecordBatch.java
@@ -17,31 +17,15 @@
  */
 package org.apache.drill.exec.physical.impl.join;
 
-import static org.apache.drill.exec.record.RecordBatch.IterOutcome.EMIT;
-import static org.apache.drill.exec.record.RecordBatch.IterOutcome.NONE;
-import static org.apache.drill.exec.record.RecordBatch.IterOutcome.OK_NEW_SCHEMA;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.calcite.rel.core.JoinRelType;
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang3.mutable.MutableBoolean;
-import org.apache.commons.lang3.tuple.Pair;
 import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.common.expression.ErrorCollector;
 import org.apache.drill.common.expression.ErrorCollectorImpl;
 import org.apache.drill.common.expression.ExpressionPosition;
-import org.apache.drill.common.expression.FieldReference;
 import org.apache.drill.common.expression.LogicalExpression;
 import org.apache.drill.common.expression.PathSegment;
 import org.apache.drill.common.expression.SchemaPath;
-import org.apache.drill.common.logical.data.JoinCondition;
 import org.apache.drill.common.logical.data.NamedExpression;
 import org.apache.drill.common.types.TypeProtos;
 import org.apache.drill.common.types.TypeProtos.DataMode;
@@ -58,18 +42,16 @@ import org.apache.drill.exec.ops.ExecutorFragmentContext;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.ops.MetricDef;
 import org.apache.drill.exec.physical.base.AbstractBase;
-import org.apache.drill.exec.physical.config.HashJoinPOP;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.physical.impl.aggregate.SpilledRecordBatch;
 import org.apache.drill.exec.physical.impl.common.AbstractSpilledPartitionMetadata;
 import org.apache.drill.exec.physical.impl.common.ChainedHashTable;
-import org.apache.drill.exec.physical.impl.common.Comparator;
 import org.apache.drill.exec.physical.impl.common.HashPartition;
 import org.apache.drill.exec.physical.impl.common.HashTable;
 import org.apache.drill.exec.physical.impl.common.HashTableConfig;
 import org.apache.drill.exec.physical.impl.common.HashTableStats;
 import org.apache.drill.exec.physical.impl.common.SpilledState;
 import org.apache.drill.exec.physical.impl.spill.SpillSet;
-import org.apache.drill.exec.planner.common.JoinControl;
 import org.apache.drill.exec.record.AbstractBinaryRecordBatch;
 import org.apache.drill.exec.record.BatchSchema;
 import org.apache.drill.exec.record.JoinBatchMemoryManager;
@@ -77,7 +59,6 @@ import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.record.TypedFieldId;
 import org.apache.drill.exec.record.VectorAccessibleUtilities;
-import org.apache.drill.exec.record.VectorContainer;
 import org.apache.drill.exec.record.VectorWrapper;
 import org.apache.drill.exec.util.record.RecordBatchStats;
 import org.apache.drill.exec.util.record.RecordBatchStats.RecordBatchIOType;
@@ -89,26 +70,37 @@ import org.apache.drill.exec.work.filter.BloomFilterDef;
 import org.apache.drill.exec.work.filter.RuntimeFilterDef;
 import org.apache.drill.exec.work.filter.RuntimeFilterReporter;
 import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
-import org.apache.drill.shaded.guava.com.google.common.collect.Iterables;
 import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
 import org.apache.drill.shaded.guava.com.google.common.collect.Sets;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.drill.exec.record.RecordBatch.IterOutcome.EMIT;
+import static org.apache.drill.exec.record.RecordBatch.IterOutcome.NONE;
+import static org.apache.drill.exec.record.RecordBatch.IterOutcome.OK_NEW_SCHEMA;
+
 /**
- * Implements the runtime execution for the Hash-Join operator supporting INNER,
- * LEFT OUTER, RIGHT OUTER, and FULL OUTER joins
+ * Base class for the runtime execution implementation of the Hash-Join and
+ * Hash-SetOp operators
  * <p>
  * This implementation splits the incoming Build side rows into multiple
  * Partitions, thus allowing spilling of some of these partitions to disk if
  * memory gets tight. Each partition is implemented as a {@link HashPartition}.
- * After the build phase is over, in the most general case, some of the
- * partitions were spilled, and the others are in memory. Each of the partitions
+ * After the build phase is over, in the most general case, some partitions
+ * were spilled, and the others are in memory. Each of the partitions
  * in memory would get a {@link HashTable} built.
  * <p>
  * Next the Probe side is read, and each row is key matched with a Build
  * partition. If that partition is in memory, then the key is used to probe and
- * perform the join, and the results are added to the outgoing batch. But if
+ * perform the operation, and the results are added to the outgoing batch. But if
  * that build side partition was spilled, then the matching Probe size partition
  * is spilled as well.
  * <p>
@@ -125,39 +117,35 @@ import org.slf4j.LoggerFactory;
  * greater) is a waste, indicating that the number of partitions chosen was too
  * small.
  */
-public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP>
-    implements RowKeyJoin {
-  private static final Logger logger = LoggerFactory
-      .getLogger(HashJoinBatch.class);
+public abstract class AbstractHashBinaryRecordBatch<T extends PhysicalOperator> extends AbstractBinaryRecordBatch<T> {
+  protected final Logger logger = LoggerFactory.getLogger(this.getClass());
+
+  protected boolean semiJoin;
+  protected boolean joinIsLeftOrFull;
+  protected boolean joinIsRightOrFull;
+  // Whether this HashJoin is used for a row-key based join
+  protected boolean isRowKeyJoin;
+  protected boolean enableRuntimeFilter;
+  protected RuntimeFilterDef runtimeFilterDef = null;
+  protected List<NamedExpression> rightExpr = Lists.newArrayList();
 
   /**
-   * The maximum number of records within each internal batch.
+   * Names of the join columns. These names are used to help estimate
+   * the size of the {@link HashTable}s.
    */
-  private final int RECORDS_PER_BATCH; // internal batches
-
-  // Join type, INNER, LEFT, RIGHT or OUTER
-  private final JoinRelType joinType;
-  private final boolean semiJoin;
-  private final boolean joinIsLeftOrFull;
-  private final boolean joinIsRightOrFull;
-  private boolean skipHashTableBuild; // when outer side is empty, and the join
-                                      // is inner or left (see DRILL-6755)
-
-  // Join conditions
-  private final List<JoinCondition> conditions;
-
-  private RowKeyJoin.RowKeyJoinState rkJoinState = RowKeyJoin.RowKeyJoinState.INITIAL;
+  protected Set<String> buildJoinColumns = Sets.newHashSet();
 
-  // Runtime generated class implementing HashJoinProbe interface
-  private HashJoinProbe hashJoinProbe = null;
-
-  private final List<NamedExpression> rightExpr;
+  protected boolean skipHashTableBuild; // when outer side is empty, and the join
+                                      // is inner or left (see DRILL-6755)
 
   /**
-   * Names of the join columns. This names are used in order to help estimate
-   * the size of the {@link HashTable}s.
+   * The maximum number of records within each internal batch.
    */
-  private final Set<String> buildJoinColumns;
+  protected final int RECORDS_PER_BATCH; // internal batches
+  protected RowKeyJoin.RowKeyJoinState rkJoinState = RowKeyJoin.RowKeyJoinState.INITIAL;
+
+  // Runtime generated class implementing Probe interface
+  protected Probe probe = null;
 
   // Fields used for partitioning
   /**
@@ -165,87 +153,81 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP>
    * option and set in
    * {@link #partitionNumTuning(int, HashJoinMemoryCalculator.BuildSidePartitioning)}.
    */
-  private int numPartitions = 1; // must be 2 to the power of bitsInMask
+  protected int numPartitions = 1; // must be 2 to the power of bitsInMask
 
   /**
    * The master class used to generate {@link HashTable}s.
    */
-  private ChainedHashTable baseHashTable;
-  private final MutableBoolean buildSideIsEmpty = new MutableBoolean(false);
-  private final MutableBoolean probeSideIsEmpty = new MutableBoolean(false);
-  private boolean canSpill = true;
-  private boolean wasKilled; // a kill was received, may need to clean spilled
+  protected ChainedHashTable baseHashTable;
+  protected final MutableBoolean buildSideIsEmpty = new MutableBoolean(false);
+  protected final MutableBoolean probeSideIsEmpty = new MutableBoolean(false);
+  protected boolean canSpill = true;
+  protected boolean wasKilled; // a kill was received, may need to clean spilled
                              // partns
 
   /**
    * This array holds the currently active {@link HashPartition}s.
    */
-  HashPartition partitions[];
+  protected HashPartition[] partitions;
 
   // Number of records in the output container
-  private int outputRecords;
+  protected int outputRecords;
 
   // Schema of the build side
-  private BatchSchema buildSchema;
+  protected BatchSchema buildSchema;
   // Schema of the probe side
-  private BatchSchema probeSchema;
-
-  // Whether this HashJoin is used for a row-key based join
-  private final boolean isRowKeyJoin;
-
-  private final JoinControl joinControl;
+  protected BatchSchema probeSchema;
 
   // An iterator over the build side hash table (only applicable for row-key
   // joins)
-  private boolean buildComplete;
+  protected boolean buildComplete;
 
   // indicates if we have previously returned an output batch
-  private boolean firstOutputBatch = true;
+  protected boolean firstOutputBatch = true;
 
-  private int rightHVColPosition;
-  private final BufferAllocator allocator;
+  protected int rightHVColPosition;
+  protected final BufferAllocator allocator;
   // Local fields for left/right incoming - may be replaced when reading from
   // spilled
-  private RecordBatch buildBatch;
-  private RecordBatch probeBatch;
+  protected RecordBatch buildBatch;
+  protected RecordBatch probeBatch;
 
   /**
    * Flag indicating whether or not the first data holding build batch needs to
    * be fetched.
    */
-  private final MutableBoolean prefetchedBuild = new MutableBoolean(false);
+  protected final MutableBoolean prefetchedBuild = new MutableBoolean(false);
   /**
    * Flag indicating whether or not the first data holding probe batch needs to
    * be fetched.
    */
-  private final MutableBoolean prefetchedProbe = new MutableBoolean(false);
+  protected final MutableBoolean prefetchedProbe = new MutableBoolean(false);
 
   // For handling spilling
-  private final SpillSet spillSet;
-  HashJoinPOP popConfig;
+  protected final SpillSet spillSet;
+  T popConfig;
 
-  private final int originalPartition = -1; // the partition a secondary reads
+  protected final int originalPartition = -1; // the partition a secondary reads
                                             // from
   IntVector read_right_HV_vector; // HV vector that was read from the spilled
                                   // batch
-  private final int maxBatchesInMemory;
-  private final List<String> probeFields = new ArrayList<>(); // keep the same
+  protected final int maxBatchesInMemory;
+  protected final List<String> probeFields = new ArrayList<>(); // keep the same
                                                               // sequence with
                                                               // the
                                                               // bloomFilters
-  private boolean enableRuntimeFilter;
-  private RuntimeFilterReporter runtimeFilterReporter;
-  private ValueVectorHashHelper.Hash64 hash64;
-  private final Map<BloomFilter, Integer> bloomFilter2buildId = new HashMap<>();
-  private final Map<BloomFilterDef, Integer> bloomFilterDef2buildId = new HashMap<>();
-  private final List<BloomFilter> bloomFilters = new ArrayList<>();
-  private boolean bloomFiltersGenerated;
+  protected RuntimeFilterReporter runtimeFilterReporter;
+  protected ValueVectorHashHelper.Hash64 hash64;
+  protected final Map<BloomFilter, Integer> bloomFilter2buildId = new HashMap<>();
+  protected final Map<BloomFilterDef, Integer> bloomFilterDef2buildId = new HashMap<>();
+  protected final List<BloomFilter> bloomFilters = new ArrayList<>();
+  protected boolean bloomFiltersGenerated;
 
   /**
    * This holds information about the spilled partitions for the build and probe
    * side.
    */
-  public static class HashJoinSpilledPartition
+  public static class SpilledPartition
       extends AbstractSpilledPartitionMetadata {
     private final int innerSpilledBatches;
     private final String innerSpillFile;
@@ -253,7 +235,7 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP>
     private String outerSpillFile;
     private boolean updatedOuter;
 
-    public HashJoinSpilledPartition(int cycle, int originPartition,
+    public SpilledPartition(int cycle, int originPartition,
         int prevOriginPartition, int innerSpilledBatches,
         String innerSpillFile) {
       super(cycle, originPartition, prevOriginPartition);
@@ -297,20 +279,20 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP>
     }
   }
 
-  public class HashJoinUpdater implements SpilledState.Updater {
+  public class Updater implements SpilledState.Updater {
     @Override
     public void cleanup() {
-      HashJoinBatch.this.cleanup();
+      AbstractHashBinaryRecordBatch.this.cleanup();
     }
 
     @Override
     public String getFailureMessage() {
-      return "Hash-Join can not partition the inner data any further (probably due to too many join-key duplicates).";
+      return this.getClass().getName() + " can not partition the inner data any further (probably due to too many join-key duplicates).";
     }
 
     @Override
     public long getMemLimit() {
-      return HashJoinBatch.this.allocator.getLimit();
+      return AbstractHashBinaryRecordBatch.this.allocator.getLimit();
     }
 
     @Override
@@ -322,9 +304,9 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP>
   /**
    * Queue of spilled partitions to process.
    */
-  private final SpilledState<HashJoinSpilledPartition> spilledState = new SpilledState<>();
-  private final HashJoinUpdater spilledStateUpdater = new HashJoinUpdater();
-  private HashJoinSpilledPartition spilledInners[]; // for the outer to find the
+  protected final SpilledState<SpilledPartition> spilledState = new SpilledState<>();
+  private final Updater spilledStateUpdater = new Updater();
+  protected SpilledPartition[] spilledInners; // for the outer to find the
                                                     // partition
 
   public enum Metric implements MetricDef {
@@ -382,7 +364,7 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP>
         setupHashTable();
       }
 
-      hashJoinProbe = setupHashJoinProbe();
+      probe = createProbe();
     }
 
     // If we have a valid schema, this will build a valid container.
@@ -408,7 +390,7 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP>
   }
 
   /**
-   * Prefetches the first build side data holding batch.
+   * Prefetches the first probe side data holding batch.
    */
   private void prefetchFirstProbeBatch() {
     leftUpstream = prefetchFirstBatch(leftUpstream, prefetchedProbe,
@@ -570,8 +552,7 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP>
 
         if (isRowKeyJoin) {
           // discard the first left batch which was fetched by buildSchema, and
-          // get the new
-          // one based on rowkey join
+          // get the new one based on rowkey join
           leftUpstream = next(left);
         }
 
@@ -599,20 +580,15 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP>
           // is data
 
           if (state == BatchState.FIRST) {
-            // Initialize various settings for the probe side
-            hashJoinProbe.setupHashJoinProbe(probeBatch, this, joinType,
-                semiJoin, leftUpstream, partitions, spilledState.getCycle(),
-                container, spilledInners, buildSideIsEmpty.booleanValue(),
-                numPartitions, rightHVColPosition);
+            setupProbe();
           }
 
           // Allocate the memory for the vectors in the output container
           batchMemoryManager.allocateVectors(container);
 
-          hashJoinProbe
-              .setTargetOutputCount(batchMemoryManager.getOutputRowCount());
+          probe.setTargetOutputCount(batchMemoryManager.getOutputRowCount());
 
-          outputRecords = hashJoinProbe.probeAndProject();
+          outputRecords = probe.probeAndProject();
 
           container.setValueCount(outputRecords);
 
@@ -647,7 +623,7 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP>
                                             // skipping; see "continue" below
 
             // Get the next (previously) spilled partition to handle as incoming
-            HashJoinSpilledPartition currSp = spilledState
+            SpilledPartition currSp = spilledState
                 .getNextSpilledPartition();
 
             // If the outer is empty (and it's not a right/full join) - try the
@@ -680,7 +656,7 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP>
               probeBatch = left; // if no outer batch then reuse left - needed
                                  // for updateIncoming()
               leftUpstream = IterOutcome.NONE;
-              hashJoinProbe.changeToFinalProbeState();
+              probe.changeToFinalProbeState();
             }
 
             spilledState.updateCycle(stats, currSp, spilledStateUpdater);
@@ -746,43 +722,11 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP>
   }
 
   private void setupHashTable() {
-    List<Comparator> comparators = Lists
-        .newArrayListWithExpectedSize(conditions.size());
-    conditions.forEach(cond -> comparators
-        .add(JoinUtils.checkAndReturnSupportedJoinComparator(cond)));
-
     if (skipHashTableBuild) {
       return;
     }
 
-    // Setup the hash table configuration object
-    List<NamedExpression> leftExpr = new ArrayList<>(conditions.size());
-
-    // Create named expressions from the conditions
-    for (int i = 0; i < conditions.size(); i++) {
-      leftExpr.add(new NamedExpression(conditions.get(i).getLeft(),
-          new FieldReference("probe_side_" + i)));
-    }
-
-    // Set the left named expression to be null if the probe batch is empty.
-    if (leftUpstream != IterOutcome.OK_NEW_SCHEMA
-        && leftUpstream != IterOutcome.OK) {
-      leftExpr = null;
-    } else {
-      if (probeBatch.getSchema()
-          .getSelectionVectorMode() != BatchSchema.SelectionVectorMode.NONE) {
-        throw UserException.internalError(null).message(
-            "Hash join does not support probe batch with selection vectors.")
-            .addContext("Probe batch has selection mode",
-                (probeBatch.getSchema().getSelectionVectorMode()).toString())
-            .build(logger);
-      }
-    }
-
-    HashTableConfig htConfig = new HashTableConfig(
-        (int) context.getOptions().getOption(ExecConstants.MIN_HASH_TABLE_SIZE),
-        true, HashTable.DEFAULT_LOAD_FACTOR, rightExpr, leftExpr, comparators,
-        joinControl.asInt());
+    HashTableConfig htConfig = buildHashTableConfig();
 
     // Create the chained hash table
     baseHashTable = new ChainedHashTable(htConfig, context, allocator,
@@ -826,7 +770,6 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP>
       enableRuntimeFilter = false;
       return;
     }
-    RuntimeFilterDef runtimeFilterDef = popConfig.getRuntimeFilterDef();
     List<BloomFilterDef> bloomFilterDefs = runtimeFilterDef
         .getBloomFilterDefs();
     for (BloomFilterDef bloomFilterDef : bloomFilterDefs) {
@@ -843,7 +786,7 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP>
     }
     if (missingField) {
       logger.info(
-          "As some build side join key fields not found, runtime filter was disabled");
+          "Some build side key fields were not found, so the runtime filter was disabled");
       enableRuntimeFilter = false;
       return;
     }
@@ -886,7 +829,7 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP>
           spilledState.getCycle(), numPartitions);
     }
 
-    spilledInners = new HashJoinSpilledPartition[numPartitions];
+    spilledInners = new SpilledPartition[numPartitions];
 
   }
 
@@ -900,7 +843,6 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP>
     }
     runtimeFilterReporter = new RuntimeFilterReporter(
         (ExecutorFragmentContext) context);
-    RuntimeFilterDef runtimeFilterDef = popConfig.getRuntimeFilterDef();
     // RuntimeFilter is not a necessary part of a HashJoin operator, only the
     // query which satisfy the
     // RuntimeFilterRouter's judgement will have the RuntimeFilterDef.
@@ -922,7 +864,7 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP>
   }
 
   /**
-   * Tunes the number of partitions used by {@link HashJoinBatch}. If it is not
+   * Tunes the number of partitions used by {@link AbstractHashBinaryRecordBatch}. If it is not
    * possible to spill it gives up and reverts to unbounded in memory operation.
    */
   private HashJoinMemoryCalculator.BuildSidePartitioning partitionNumTuning(
@@ -1014,7 +956,7 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP>
    * @return Returns an
    *         {@link org.apache.drill.exec.record.RecordBatch.IterOutcome} if a
    *         termination condition is reached. Otherwise returns null.
-   * @throws SchemaChangeException
+   * @throws SchemaChangeException schema change exception
    */
   public IterOutcome executeBuildPhase() throws SchemaChangeException {
     if (buildSideIsEmpty.booleanValue()) {
@@ -1084,7 +1026,7 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP>
       case OK_NEW_SCHEMA:
         if (!buildSchema.equals(buildBatch.getSchema())) {
           throw SchemaChangeException.schemaChanged(
-              "Hash join does not support schema changes in build side.",
+            this.getClass().getSimpleName() + " does not support schema changes in build side.",
               buildSchema, buildBatch.getSchema());
         }
         for (HashPartition partn : partitions) {
@@ -1127,10 +1069,6 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP>
                                                              // HV column
           int currPart = hashCode & spilledState.getPartitionMask();
           hashCode >>>= spilledState.getBitsInMask();
-          // semi-join skips join-key-duplicate rows
-          if (semiJoin) {
-
-          }
           // Append the new inner row to the appropriate partition; spill (that
           // partition) if needed
           partitions[currPart].appendInnerRow(buildBatch.getContainer(), ind,
@@ -1153,7 +1091,7 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP>
       if (bloomFilter2buildId.size() > 0) {
         int hashJoinOpId = this.popConfig.getOperatorId();
         runtimeFilterReporter.sendOut(bloomFilters, probeFields,
-            this.popConfig.getRuntimeFilterDef(), hashJoinOpId);
+          runtimeFilterDef, hashJoinOpId);
       }
     }
 
@@ -1213,7 +1151,7 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP>
 
     for (HashPartition partn : partitions) {
       if (partn.isSpilled()) {
-        HashJoinSpilledPartition sp = new HashJoinSpilledPartition(
+        SpilledPartition sp = new SpilledPartition(
             spilledState.getCycle(), partn.getPartitionNum(), originalPartition,
             partn.getPartitionBatchesCount(), partn.getSpillFile());
 
@@ -1294,48 +1232,20 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP>
   /**
    * The constructor
    *
-   * @param popConfig
-   * @param context
-   * @param left
-   *          -- probe/outer side incoming input
-   * @param right
-   *          -- build/iner side incoming input
-   * @throws OutOfMemoryException
+   * @param popConfig the physical operator configuration
+   * @param context the fragment context
+   * @param left probe/outer side incoming input
+   * @param right build/inner side incoming input
+   * @throws OutOfMemoryException out of memory exception
    */
-  public HashJoinBatch(HashJoinPOP popConfig, FragmentContext context,
+  public AbstractHashBinaryRecordBatch(T popConfig, FragmentContext context,
       RecordBatch left, /* Probe side record batch */
       RecordBatch right /* Build side record batch */
   ) throws OutOfMemoryException {
     super(popConfig, context, true, left, right);
     this.buildBatch = right;
     this.probeBatch = left;
-    joinType = popConfig.getJoinType();
-    semiJoin = popConfig.isSemiJoin();
-    joinIsLeftOrFull = joinType == JoinRelType.LEFT
-        || joinType == JoinRelType.FULL;
-    joinIsRightOrFull = joinType == JoinRelType.RIGHT
-        || joinType == JoinRelType.FULL;
-    conditions = popConfig.getConditions();
     this.popConfig = popConfig;
-    this.isRowKeyJoin = popConfig.isRowKeyJoin();
-    this.joinControl = new JoinControl(popConfig.getJoinControl());
-
-    rightExpr = new ArrayList<>(conditions.size());
-    buildJoinColumns = Sets.newHashSet();
-    for (int i = 0; i < conditions.size(); i++) {
-      SchemaPath rightPath = (SchemaPath) conditions.get(i).getRight();
-    }
-
-    for (int i = 0; i < conditions.size(); i++) {
-      SchemaPath rightPath = (SchemaPath) conditions.get(i).getRight();
-      PathSegment.NameSegment nameSegment = (PathSegment.NameSegment) rightPath
-          .getLastSegment();
-      buildJoinColumns.add(nameSegment.getPath());
-      String refName = "build_side_" + i;
-      rightExpr.add(new NamedExpression(conditions.get(i).getRight(),
-          new FieldReference(refName)));
-    }
-
     this.allocator = oContext.getAllocator();
 
     numPartitions = (int) context.getOptions()
@@ -1386,14 +1296,10 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP>
 
     RecordBatchStats.printConfiguredBatchSize(getRecordBatchStatsContext(),
         configuredBatchSize);
-
-    enableRuntimeFilter = context.getOptions()
-        .getOption(ExecConstants.HASHJOIN_ENABLE_RUNTIME_FILTER)
-        && popConfig.getRuntimeFilterDef() != null;
   }
 
   /**
-   * This method is called when {@link HashJoinBatch} closes. It cleans up left
+   * This method is called when {@link AbstractHashBinaryRecordBatch} closes. It cleans up left
    * over spilled files that are in the spill queue, and closes the spillSet.
    */
   private void cleanup() {
@@ -1411,7 +1317,7 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP>
 
     // delete any spill file left in unread spilled partitions
     while (!spilledState.isEmpty()) {
-      HashJoinSpilledPartition sp = spilledState.getNextSpilledPartition();
+      SpilledPartition sp = spilledState.getNextSpilledPartition();
       try {
         spillSet.delete(sp.innerSpillFile);
       } catch (IOException e) {
@@ -1488,55 +1394,6 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP>
     stats.setLongStat(Metric.SPILLED_PARTITIONS, numSpilled);
   }
 
-  /**
-   * Get the hash table iterator that is created for the build side of the hash
-   * join if this hash join was instantiated as a row-key join.
-   *
-   * @return hash table iterator or null if this hash join was not a row-key
-   *         join or if it was a row-key join but the build has not yet
-   *         completed.
-   */
-  @Override
-  public Pair<ValueVector, Integer> nextRowKeyBatch() {
-    if (buildComplete) {
-      // partition 0 because Row Key Join has only a single partition - no
-      // spilling
-      Pair<VectorContainer, Integer> pp = partitions[0].nextBatch();
-      if (pp != null) {
-        VectorWrapper<?> vw = Iterables.get(pp.getLeft(), 0);
-        ValueVector vv = vw.getValueVector();
-        return Pair.of(vv, pp.getRight());
-      }
-    } else if (partitions == null && firstOutputBatch) { // if there is data
-                                                         // coming to
-                                                         // right(build) side in
-                                                         // build Schema stage,
-                                                         // use it.
-      firstOutputBatch = false;
-      if (right.getRecordCount() > 0) {
-        VectorWrapper<?> vw = Iterables.get(right, 0);
-        ValueVector vv = vw.getValueVector();
-        return Pair.of(vv, right.getRecordCount() - 1);
-      }
-    }
-    return null;
-  }
-
-  @Override // implement RowKeyJoin interface
-  public boolean hasRowKeyBatch() {
-    return buildComplete;
-  }
-
-  @Override // implement RowKeyJoin interface
-  public BatchState getBatchState() {
-    return state;
-  }
-
-  @Override // implement RowKeyJoin interface
-  public void setBatchState(BatchState newState) {
-    state = newState;
-  }
-
   @Override
   protected void cancelIncoming() {
     wasKilled = true;
@@ -1545,44 +1402,34 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP>
   }
 
   public void updateMetrics() {
-    stats.setLongStat(HashJoinBatch.Metric.LEFT_INPUT_BATCH_COUNT,
+    stats.setLongStat(AbstractHashBinaryRecordBatch.Metric.LEFT_INPUT_BATCH_COUNT,
         batchMemoryManager.getNumIncomingBatches(LEFT_INDEX));
-    stats.setLongStat(HashJoinBatch.Metric.LEFT_AVG_INPUT_BATCH_BYTES,
+    stats.setLongStat(AbstractHashBinaryRecordBatch.Metric.LEFT_AVG_INPUT_BATCH_BYTES,
         batchMemoryManager.getAvgInputBatchSize(LEFT_INDEX));
-    stats.setLongStat(HashJoinBatch.Metric.LEFT_AVG_INPUT_ROW_BYTES,
+    stats.setLongStat(AbstractHashBinaryRecordBatch.Metric.LEFT_AVG_INPUT_ROW_BYTES,
         batchMemoryManager.getAvgInputRowWidth(LEFT_INDEX));
-    stats.setLongStat(HashJoinBatch.Metric.LEFT_INPUT_RECORD_COUNT,
+    stats.setLongStat(AbstractHashBinaryRecordBatch.Metric.LEFT_INPUT_RECORD_COUNT,
         batchMemoryManager.getTotalInputRecords(LEFT_INDEX));
 
-    stats.setLongStat(HashJoinBatch.Metric.RIGHT_INPUT_BATCH_COUNT,
+    stats.setLongStat(AbstractHashBinaryRecordBatch.Metric.RIGHT_INPUT_BATCH_COUNT,
         batchMemoryManager.getNumIncomingBatches(RIGHT_INDEX));
-    stats.setLongStat(HashJoinBatch.Metric.RIGHT_AVG_INPUT_BATCH_BYTES,
+    stats.setLongStat(AbstractHashBinaryRecordBatch.Metric.RIGHT_AVG_INPUT_BATCH_BYTES,
         batchMemoryManager.getAvgInputBatchSize(RIGHT_INDEX));
-    stats.setLongStat(HashJoinBatch.Metric.RIGHT_AVG_INPUT_ROW_BYTES,
+    stats.setLongStat(AbstractHashBinaryRecordBatch.Metric.RIGHT_AVG_INPUT_ROW_BYTES,
         batchMemoryManager.getAvgInputRowWidth(RIGHT_INDEX));
-    stats.setLongStat(HashJoinBatch.Metric.RIGHT_INPUT_RECORD_COUNT,
+    stats.setLongStat(AbstractHashBinaryRecordBatch.Metric.RIGHT_INPUT_RECORD_COUNT,
         batchMemoryManager.getTotalInputRecords(RIGHT_INDEX));
 
-    stats.setLongStat(HashJoinBatch.Metric.OUTPUT_BATCH_COUNT,
+    stats.setLongStat(AbstractHashBinaryRecordBatch.Metric.OUTPUT_BATCH_COUNT,
         batchMemoryManager.getNumOutgoingBatches());
-    stats.setLongStat(HashJoinBatch.Metric.AVG_OUTPUT_BATCH_BYTES,
+    stats.setLongStat(AbstractHashBinaryRecordBatch.Metric.AVG_OUTPUT_BATCH_BYTES,
         batchMemoryManager.getAvgOutputBatchSize());
-    stats.setLongStat(HashJoinBatch.Metric.AVG_OUTPUT_ROW_BYTES,
+    stats.setLongStat(AbstractHashBinaryRecordBatch.Metric.AVG_OUTPUT_ROW_BYTES,
         batchMemoryManager.getAvgOutputRowWidth());
-    stats.setLongStat(HashJoinBatch.Metric.OUTPUT_RECORD_COUNT,
+    stats.setLongStat(AbstractHashBinaryRecordBatch.Metric.OUTPUT_RECORD_COUNT,
         batchMemoryManager.getTotalOutputRecords());
   }
 
-  @Override
-  public void setRowKeyJoinState(RowKeyJoin.RowKeyJoinState newState) {
-    this.rkJoinState = newState;
-  }
-
-  @Override
-  public RowKeyJoin.RowKeyJoinState getRowKeyJoinState() {
-    return rkJoinState;
-  }
-
   @Override
   public void close() {
     if (!spilledState.isFirstCycle()) { // spilling happened
@@ -1628,17 +1475,8 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP>
     super.close();
   }
 
-  public HashJoinProbe setupHashJoinProbe() {
-    // No real code generation !!
-    return new HashJoinProbeTemplate();
-  }
+  public abstract Probe createProbe();
+  public abstract void setupProbe() throws SchemaChangeException;
 
-  @Override
-  public void dump() {
-    logger.error(
-        "HashJoinBatch[container={}, left={}, right={}, leftOutcome={}, rightOutcome={}, joinType={}, hashJoinProbe={},"
-            + " rightExpr={}, canSpill={}, buildSchema={}, probeSchema={}]",
-        container, left, right, leftUpstream, rightUpstream, joinType,
-        hashJoinProbe, rightExpr, canSpill, buildSchema, probeSchema);
-  }
+  protected abstract HashTableConfig buildHashTableConfig();
 }
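
With the shared partitioning and spilling machinery pulled into this base class, a concrete batch only has to supply the probe object, its setup, and the hash-table configuration. The fragment below sketches that contract; it is illustrative, not the committed HashSetOpRecordBatch, and the no-arg HashSetOpProbeTemplate constructor and the elided bodies are assumptions.

    // Sketch of the three hooks a concrete subclass supplies; the enclosing class,
    // constructor and remaining RecordBatch plumbing are elided.

    @Override
    public Probe createProbe() {
      // As with HashJoinBatch, no runtime code generation: a plain template instance
      // (a no-arg HashSetOpProbeTemplate constructor is assumed here).
      return new HashSetOpProbeTemplate();
    }

    @Override
    public void setupProbe() throws SchemaChangeException {
      // Point the probe at the probe-side batch, the partitions, the spilled-partition
      // info and the output container once the build phase has run.
    }

    @Override
    protected HashTableConfig buildHashTableConfig() {
      // Derive build/probe key expressions from the incoming schemas and pass
      // computeKeyNum = true so the hash table tracks per-key record counts.
      throw new UnsupportedOperationException("sketch only");
    }
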
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
index a08e4308cd..60de29091f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
@@ -17,743 +17,106 @@
  */
 package org.apache.drill.exec.physical.impl.join;
 
-import static org.apache.drill.exec.record.RecordBatch.IterOutcome.EMIT;
-import static org.apache.drill.exec.record.RecordBatch.IterOutcome.NONE;
-import static org.apache.drill.exec.record.RecordBatch.IterOutcome.OK_NEW_SCHEMA;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
 import org.apache.calcite.rel.core.JoinRelType;
-import org.apache.commons.io.FileUtils;
-import org.apache.commons.lang3.mutable.MutableBoolean;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.drill.common.exceptions.UserException;
-import org.apache.drill.common.expression.ErrorCollector;
-import org.apache.drill.common.expression.ErrorCollectorImpl;
-import org.apache.drill.common.expression.ExpressionPosition;
 import org.apache.drill.common.expression.FieldReference;
-import org.apache.drill.common.expression.LogicalExpression;
 import org.apache.drill.common.expression.PathSegment;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.logical.data.JoinCondition;
 import org.apache.drill.common.logical.data.NamedExpression;
-import org.apache.drill.common.types.TypeProtos;
-import org.apache.drill.common.types.TypeProtos.DataMode;
-import org.apache.drill.common.types.TypeProtos.MajorType;
-import org.apache.drill.common.types.Types;
 import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.exception.OutOfMemoryException;
 import org.apache.drill.exec.exception.SchemaChangeException;
-import org.apache.drill.exec.expr.ExpressionTreeMaterializer;
-import org.apache.drill.exec.expr.fn.impl.ValueVectorHashHelper;
-import org.apache.drill.exec.memory.BaseAllocator;
-import org.apache.drill.exec.memory.BufferAllocator;
-import org.apache.drill.exec.ops.ExecutorFragmentContext;
 import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.ops.MetricDef;
-import org.apache.drill.exec.physical.base.AbstractBase;
 import org.apache.drill.exec.physical.config.HashJoinPOP;
-import org.apache.drill.exec.physical.impl.aggregate.SpilledRecordBatch;
-import org.apache.drill.exec.physical.impl.common.AbstractSpilledPartitionMetadata;
-import org.apache.drill.exec.physical.impl.common.ChainedHashTable;
 import org.apache.drill.exec.physical.impl.common.Comparator;
-import org.apache.drill.exec.physical.impl.common.HashPartition;
 import org.apache.drill.exec.physical.impl.common.HashTable;
 import org.apache.drill.exec.physical.impl.common.HashTableConfig;
-import org.apache.drill.exec.physical.impl.common.HashTableStats;
-import org.apache.drill.exec.physical.impl.common.SpilledState;
-import org.apache.drill.exec.physical.impl.spill.SpillSet;
 import org.apache.drill.exec.planner.common.JoinControl;
-import org.apache.drill.exec.record.AbstractBinaryRecordBatch;
 import org.apache.drill.exec.record.BatchSchema;
-import org.apache.drill.exec.record.JoinBatchMemoryManager;
-import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.record.RecordBatch;
-import org.apache.drill.exec.record.TypedFieldId;
-import org.apache.drill.exec.record.VectorAccessibleUtilities;
 import org.apache.drill.exec.record.VectorContainer;
 import org.apache.drill.exec.record.VectorWrapper;
-import org.apache.drill.exec.util.record.RecordBatchStats;
-import org.apache.drill.exec.util.record.RecordBatchStats.RecordBatchIOType;
-import org.apache.drill.exec.vector.IntVector;
 import org.apache.drill.exec.vector.ValueVector;
-import org.apache.drill.exec.vector.complex.AbstractContainerVector;
-import org.apache.drill.exec.work.filter.BloomFilter;
-import org.apache.drill.exec.work.filter.BloomFilterDef;
-import org.apache.drill.exec.work.filter.RuntimeFilterDef;
-import org.apache.drill.exec.work.filter.RuntimeFilterReporter;
-import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
 import org.apache.drill.shaded.guava.com.google.common.collect.Iterables;
 import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
-import org.apache.drill.shaded.guava.com.google.common.collect.Sets;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
 
 /**
  * Implements the runtime execution for the Hash-Join operator supporting INNER,
  * LEFT OUTER, RIGHT OUTER, and FULL OUTER joins
- * <p>
- * This implementation splits the incoming Build side rows into multiple
- * Partitions, thus allowing spilling of some of these partitions to disk if
- * memory gets tight. Each partition is implemented as a {@link HashPartition}.
- * After the build phase is over, in the most general case, some of the
- * partitions were spilled, and the others are in memory. Each of the partitions
- * in memory would get a {@link HashTable} built.
- * <p>
- * Next the Probe side is read, and each row is key matched with a Build
- * partition. If that partition is in memory, then the key is used to probe and
- * perform the join, and the results are added to the outgoing batch. But if
- * that build side partition was spilled, then the matching Probe size partition
- * is spilled as well.
- * <p>
- * After all the Probe side was processed, we are left with pairs of spilled
- * partitions. Then each pair is processed individually (that Build partition
- * should be smaller than the original, hence likely fit whole into memory to
- * allow probing; if not -- see below).
- * <p>
- * Processing of each spilled pair is EXACTLY like processing the original
- * Build/Probe incomings. (As a fact, the {@link #innerNext()} method calls
- * itself recursively !!). Thus the spilled build partition is read and divided
- * into new partitions, which in turn may spill again (and again...). The code
- * tracks these spilling "cycles". Normally any such "again" (i.e. cycle of 2 or
- * greater) is a waste, indicating that the number of partitions chosen was too
- * small.
  */
-public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP>
+public class HashJoinBatch extends AbstractHashBinaryRecordBatch<HashJoinPOP>
     implements RowKeyJoin {
-  private static final Logger logger = LoggerFactory
-      .getLogger(HashJoinBatch.class);
-
-  /**
-   * The maximum number of records within each internal batch.
-   */
-  private final int RECORDS_PER_BATCH; // internal batches
-
   // Join type, INNER, LEFT, RIGHT or OUTER
   private final JoinRelType joinType;
-  private final boolean semiJoin;
-  private final boolean joinIsLeftOrFull;
-  private final boolean joinIsRightOrFull;
-  private boolean skipHashTableBuild; // when outer side is empty, and the join
-                                      // is inner or left (see DRILL-6755)
-
   // Join conditions
   private final List<JoinCondition> conditions;
-
-  private RowKeyJoin.RowKeyJoinState rkJoinState = RowKeyJoin.RowKeyJoinState.INITIAL;
-
-  // Runtime generated class implementing HashJoinProbe interface
-  private HashJoinProbe hashJoinProbe = null;
-
-  private final List<NamedExpression> rightExpr;
-
-  /**
-   * Names of the join columns. This names are used in order to help estimate
-   * the size of the {@link HashTable}s.
-   */
-  private final Set<String> buildJoinColumns;
-
-  // Fields used for partitioning
-  /**
-   * The number of {@link HashPartition}s. This is configured via a system
-   * option and set in
-   * {@link #partitionNumTuning(int, HashJoinMemoryCalculator.BuildSidePartitioning)}.
-   */
-  private int numPartitions = 1; // must be 2 to the power of bitsInMask
-
-  /**
-   * The master class used to generate {@link HashTable}s.
-   */
-  private ChainedHashTable baseHashTable;
-  private final MutableBoolean buildSideIsEmpty = new MutableBoolean(false);
-  private final MutableBoolean probeSideIsEmpty = new MutableBoolean(false);
-  private boolean canSpill = true;
-  private boolean wasKilled; // a kill was received, may need to clean spilled
-                             // partns
-
-  /**
-   * This array holds the currently active {@link HashPartition}s.
-   */
-  HashPartition partitions[];
-
-  // Number of records in the output container
-  private int outputRecords;
-
-  // Schema of the build side
-  private BatchSchema buildSchema;
-  // Schema of the probe side
-  private BatchSchema probeSchema;
-
-  // Whether this HashJoin is used for a row-key based join
-  private final boolean isRowKeyJoin;
-
   private final JoinControl joinControl;
 
-  // An iterator over the build side hash table (only applicable for row-key
-  // joins)
-  private boolean buildComplete;
-
-  // indicates if we have previously returned an output batch
-  private boolean firstOutputBatch = true;
-
-  private int rightHVColPosition;
-  private final BufferAllocator allocator;
-  // Local fields for left/right incoming - may be replaced when reading from
-  // spilled
-  private RecordBatch buildBatch;
-  private RecordBatch probeBatch;
-
-  /**
-   * Flag indicating whether or not the first data holding build batch needs to
-   * be fetched.
-   */
-  private final MutableBoolean prefetchedBuild = new MutableBoolean(false);
-  /**
-   * Flag indicating whether or not the first data holding probe batch needs to
-   * be fetched.
-   */
-  private final MutableBoolean prefetchedProbe = new MutableBoolean(false);
-
-  // For handling spilling
-  private final SpillSet spillSet;
-  HashJoinPOP popConfig;
-
-  private final int originalPartition = -1; // the partition a secondary reads
-                                            // from
-  IntVector read_right_HV_vector; // HV vector that was read from the spilled
-                                  // batch
-  private final int maxBatchesInMemory;
-  private final List<String> probeFields = new ArrayList<>(); // keep the same
-                                                              // sequence with
-                                                              // the
-                                                              // bloomFilters
-  private boolean enableRuntimeFilter;
-  private RuntimeFilterReporter runtimeFilterReporter;
-  private ValueVectorHashHelper.Hash64 hash64;
-  private final Map<BloomFilter, Integer> bloomFilter2buildId = new HashMap<>();
-  private final Map<BloomFilterDef, Integer> bloomFilterDef2buildId = new HashMap<>();
-  private final List<BloomFilter> bloomFilters = new ArrayList<>();
-  private boolean bloomFiltersGenerated;
-
   /**
-   * This holds information about the spilled partitions for the build and probe
-   * side.
+   * The constructor
+   *
+   * @param popConfig HashJoinPOP
+   * @param context FragmentContext
+   * @param left probe/outer side incoming input
+   * @param right build/inner side incoming input
+   * @throws OutOfMemoryException out of memory exception
    */
-  public static class HashJoinSpilledPartition
-      extends AbstractSpilledPartitionMetadata {
-    private final int innerSpilledBatches;
-    private final String innerSpillFile;
-    private int outerSpilledBatches;
-    private String outerSpillFile;
-    private boolean updatedOuter;
-
-    public HashJoinSpilledPartition(int cycle, int originPartition,
-        int prevOriginPartition, int innerSpilledBatches,
-        String innerSpillFile) {
-      super(cycle, originPartition, prevOriginPartition);
-
-      this.innerSpilledBatches = innerSpilledBatches;
-      this.innerSpillFile = innerSpillFile;
-    }
-
-    public int getInnerSpilledBatches() {
-      return innerSpilledBatches;
-    }
-
-    public String getInnerSpillFile() {
-      return innerSpillFile;
-    }
-
-    public int getOuterSpilledBatches() {
-      Preconditions.checkState(updatedOuter);
-      return outerSpilledBatches;
-    }
-
-    public String getOuterSpillFile() {
-      Preconditions.checkState(updatedOuter);
-      return outerSpillFile;
-    }
-
-    public void updateOuter(int outerSpilledBatches, String outerSpillFile) {
-      Preconditions.checkState(!updatedOuter);
-      updatedOuter = true;
-
-      this.outerSpilledBatches = outerSpilledBatches;
-      this.outerSpillFile = outerSpillFile;
-    }
-
-    @Override
-    public String makeDebugString() {
-      return String.format(
-          "Start reading spilled partition %d (prev %d) from cycle %d (with %d-%d batches).",
-          this.getOriginPartition(), this.getPrevOriginPartition(),
-          this.getCycle(), outerSpilledBatches, innerSpilledBatches);
-    }
-  }
-
-  public class HashJoinUpdater implements SpilledState.Updater {
-    @Override
-    public void cleanup() {
-      HashJoinBatch.this.cleanup();
-    }
+  public HashJoinBatch(HashJoinPOP popConfig, FragmentContext context,
+      RecordBatch left, /* Probe side record batch */
+      RecordBatch right /* Build side record batch */
+  ) throws OutOfMemoryException {
+    super(popConfig, context, left, right);
+    joinType = popConfig.getJoinType();
+    conditions = popConfig.getConditions();
+    this.joinControl = new JoinControl(popConfig.getJoinControl());
 
-    @Override
-    public String getFailureMessage() {
-      return "Hash-Join can not partition the inner data any further (probably due to too many join-key duplicates).";
-    }
+    semiJoin = popConfig.isSemiJoin();
+    joinIsLeftOrFull = joinType == JoinRelType.LEFT
+      || joinType == JoinRelType.FULL;
+    joinIsRightOrFull = joinType == JoinRelType.RIGHT
+      || joinType == JoinRelType.FULL;
+    this.isRowKeyJoin = popConfig.isRowKeyJoin();
 
-    @Override
-    public long getMemLimit() {
-      return HashJoinBatch.this.allocator.getLimit();
+    for (int i = 0; i < conditions.size(); i++) {
+      SchemaPath rightPath = (SchemaPath) conditions.get(i).getRight();
+      PathSegment.NameSegment nameSegment = (PathSegment.NameSegment) rightPath
+        .getLastSegment();
+      buildJoinColumns.add(nameSegment.getPath());
+      String refName = "build_side_" + i;
+      rightExpr.add(new NamedExpression(conditions.get(i).getRight(),
+        new FieldReference(refName)));
     }
 
-    @Override
-    public boolean hasPartitionLimit() {
-      return true;
-    }
-  }
-
-  /**
-   * Queue of spilled partitions to process.
-   */
-  private final SpilledState<HashJoinSpilledPartition> spilledState = new SpilledState<>();
-  private final HashJoinUpdater spilledStateUpdater = new HashJoinUpdater();
-  private HashJoinSpilledPartition spilledInners[]; // for the outer to find the
-                                                    // partition
-
-  public enum Metric implements MetricDef {
-    NUM_BUCKETS, NUM_ENTRIES, NUM_RESIZING, RESIZING_TIME_MS, NUM_PARTITIONS,
-    // number of original partitions spilled to disk
-    SPILLED_PARTITIONS,
-    SPILL_MB, // Number of MB of data spilled to disk. This amount is first
-              // written,
-              // then later re-read. So, disk I/O is twice this amount.
-    SPILL_CYCLE, // 0 - no spill, 1 - spill, 2 - SECONDARY, 3 - TERTIARY
-    LEFT_INPUT_BATCH_COUNT, LEFT_AVG_INPUT_BATCH_BYTES, LEFT_AVG_INPUT_ROW_BYTES,
-    LEFT_INPUT_RECORD_COUNT, RIGHT_INPUT_BATCH_COUNT, RIGHT_AVG_INPUT_BATCH_BYTES,
-    RIGHT_AVG_INPUT_ROW_BYTES, RIGHT_INPUT_RECORD_COUNT, OUTPUT_BATCH_COUNT,
-    AVG_OUTPUT_BATCH_BYTES, AVG_OUTPUT_ROW_BYTES, OUTPUT_RECORD_COUNT;
-
-    // duplicate for hash ag
-
-    @Override
-    public int metricId() {
-      return ordinal();
-    }
+    runtimeFilterDef = popConfig.getRuntimeFilterDef();
+    enableRuntimeFilter = context.getOptions().getOption(ExecConstants.HASHJOIN_ENABLE_RUNTIME_FILTER)
+      && runtimeFilterDef != null;
   }
 
   @Override
-  public int getRecordCount() {
-    return outputRecords;
+  public Probe createProbe() {
+    // No real code generation !!
+    return new HashJoinProbeTemplate();
   }
 
   @Override
-  protected void buildSchema() {
-    // We must first get the schemas from upstream operators before we can build
-    // our schema.
-    boolean validSchema = prefetchFirstBatchFromBothSides();
-
-    if (validSchema) {
-      // We are able to construct a valid schema from the upstream data.
-      // Setting the state here makes sure AbstractRecordBatch returns
-      // OK_NEW_SCHEMA
-      state = BatchState.BUILD_SCHEMA;
-
-      if (leftUpstream == OK_NEW_SCHEMA) {
-        probeSchema = left.getSchema();
-      }
-
-      if (rightUpstream == OK_NEW_SCHEMA) {
-        buildSchema = right.getSchema();
-        // position of the new "column" for keeping the hash values
-        // (after the real columns)
-        rightHVColPosition = right.getContainer().getNumberOfColumns();
-        // In special cases, when the probe side is empty, and
-        // inner/left join - no need for Hash Table
-        skipHashTableBuild = leftUpstream == IterOutcome.NONE
-            && !joinIsRightOrFull;
-        // We only need the hash tables if we have data on the build side.
-        setupHashTable();
-      }
-
-      hashJoinProbe = setupHashJoinProbe();
-    }
-
-    // If we have a valid schema, this will build a valid container.
-    // If we were unable to obtain a valid schema,
-    // we still need to build a dummy schema. This code handles both cases for
-    // us.
-    setupOutputContainerSchema();
-    container.buildSchema(BatchSchema.SelectionVectorMode.NONE);
-    container.setEmpty();
-  }
-
-  /**
-   * Prefetches the first build side data holding batch.
-   */
-  private void prefetchFirstBuildBatch() {
-    rightUpstream = prefetchFirstBatch(rightUpstream, prefetchedBuild,
-        buildSideIsEmpty, RIGHT_INDEX, buildBatch, () -> {
-          batchMemoryManager.update(RIGHT_INDEX, 0, true);
-          RecordBatchStats.logRecordBatchStats(RecordBatchIOType.INPUT_RIGHT,
-              batchMemoryManager.getRecordBatchSizer(RIGHT_INDEX),
-              getRecordBatchStatsContext());
-        });
-  }
-
-  /**
-   * Prefetches the first build side data holding batch.
-   */
-  private void prefetchFirstProbeBatch() {
-    leftUpstream = prefetchFirstBatch(leftUpstream, prefetchedProbe,
-        probeSideIsEmpty, LEFT_INDEX, probeBatch, () -> {
-          batchMemoryManager.update(LEFT_INDEX, 0);
-          RecordBatchStats.logRecordBatchStats(RecordBatchIOType.INPUT_LEFT,
-              batchMemoryManager.getRecordBatchSizer(LEFT_INDEX),
-              getRecordBatchStatsContext());
-        });
-  }
-
-  /**
-   * Used to fetch the first data holding batch from either the build or probe
-   * side.
-   *
-   * @param outcome
-   *          The current upstream outcome for either the build or probe side.
-   * @param prefetched
-   *          A flag indicating if we have already done a prefetch of the first
-   *          data holding batch for the probe or build side.
-   * @param isEmpty
-   *          A flag indicating if the probe or build side is empty.
-   * @param index
-   *          The upstream index of the probe or build batch.
-   * @param batch
-   *          The probe or build batch itself.
-   * @param memoryManagerUpdate
-   *          A lambda function to execute the memory manager update for the
-   *          probe or build batch.
-   * @return The current
-   *         {@link org.apache.drill.exec.record.RecordBatch.IterOutcome}.
-   */
-  private IterOutcome prefetchFirstBatch(IterOutcome outcome,
-      MutableBoolean prefetched, MutableBoolean isEmpty, int index,
-      RecordBatch batch, Runnable memoryManagerUpdate) {
-    if (prefetched.booleanValue()) {
-      // We have already prefetch the first data holding batch
-      return outcome;
-    }
-
-    // If we didn't retrieve our first data holding batch, we need to do it now.
-    prefetched.setValue(true);
-
-    if (outcome != IterOutcome.NONE) {
-      // We can only get data if there is data available
-      outcome = sniffNonEmptyBatch(outcome, index, batch);
-    }
-
-    isEmpty.setValue(outcome == IterOutcome.NONE); // If we received NONE there
-                                                   // is no data.
-
-    // Got our first batch(es)
-    if (spilledState.isFirstCycle()) {
-      // Only collect stats for the first cycle
-      memoryManagerUpdate.run();
-    }
-    state = BatchState.FIRST;
-    return outcome;
-  }
-
-  /**
-   * Currently in order to accurately predict memory usage for spilling, the
-   * first non-empty build or probe side batch is needed. This method fetches
-   * the first non-empty batch from the probe or build side.
-   *
-   * @param curr
-   *          The current outcome.
-   * @param inputIndex
-   *          Index specifying whether to work with the prorbe or build input.
-   * @param recordBatch
-   *          The probe or build record batch.
-   * @return The {@link org.apache.drill.exec.record.RecordBatch.IterOutcome}
-   *         for the left or right record batch.
-   */
-  private IterOutcome sniffNonEmptyBatch(IterOutcome curr, int inputIndex,
-      RecordBatch recordBatch) {
-    while (true) {
-      if (recordBatch.getRecordCount() != 0) {
-        return curr;
-      }
-
-      curr = next(inputIndex, recordBatch);
-
-      switch (curr) {
-      case OK:
-        // We got a data batch
-        break;
-      case NOT_YET:
-        // We need to try again
-        break;
-      case EMIT:
-        throw new UnsupportedOperationException("We do not support " + EMIT);
-      default:
-        // Other cases are termination conditions
-        return curr;
-      }
-    }
-  }
-
-  /**
-   * Determines the memory calculator to use. If maxNumBatches is configured
-   * simple batch counting is used to spill. Otherwise memory calculations are
-   * used to determine when to spill.
-   *
-   * @return The memory calculator to use.
-   */
-  public HashJoinMemoryCalculator getCalculatorImpl() {
-    if (maxBatchesInMemory == 0) {
-      double safetyFactor = context.getOptions()
-          .getDouble(ExecConstants.HASHJOIN_SAFETY_FACTOR_KEY);
-      double fragmentationFactor = context.getOptions()
-          .getDouble(ExecConstants.HASHJOIN_FRAGMENTATION_FACTOR_KEY);
-      double hashTableDoublingFactor = context.getOptions()
-          .getDouble(ExecConstants.HASHJOIN_HASH_DOUBLE_FACTOR_KEY);
-      String hashTableCalculatorType = context.getOptions()
-          .getString(ExecConstants.HASHJOIN_HASHTABLE_CALC_TYPE_KEY);
-
-      return new HashJoinMemoryCalculatorImpl(safetyFactor, fragmentationFactor,
-          hashTableDoublingFactor, hashTableCalculatorType, semiJoin);
-    } else {
-      return new HashJoinMechanicalMemoryCalculator(maxBatchesInMemory);
-    }
+  public void setupProbe() throws SchemaChangeException {
+    probe.setup(probeBatch, this, joinType,
+      semiJoin, leftUpstream, partitions, spilledState.getCycle(),
+      container, spilledInners, buildSideIsEmpty.booleanValue(),
+      numPartitions, rightHVColPosition);
   }
 
   @Override
-  public IterOutcome innerNext() {
-    if (wasKilled) {
-      // We have received a cancel signal. We need to stop processing.
-      cleanup();
-      return IterOutcome.NONE;
-    }
-
-    prefetchFirstBuildBatch();
-
-    if (rightUpstream.isError()) {
-      // A termination condition was reached while prefetching the first build
-      // side data holding batch.
-      // We need to terminate.
-      return rightUpstream;
-    }
-
-    try {
-      /*
-       * If we are here for the first time, execute the build phase of the hash
-       * join and setup the run time generated class for the probe side
-       */
-      if (state == BatchState.FIRST) {
-        // Build the hash table, using the build side record batches.
-        IterOutcome buildExecuteTermination = executeBuildPhase();
-
-        if (buildExecuteTermination != null) {
-          // A termination condition was reached while executing the build
-          // phase.
-          // We need to terminate.
-          return buildExecuteTermination;
-        }
-
-        buildComplete = true;
-
-        if (isRowKeyJoin) {
-          // discard the first left batch which was fetched by buildSchema, and
-          // get the new
-          // one based on rowkey join
-          leftUpstream = next(left);
-        }
-
-        // Update the hash table related stats for the operator
-        updateStats();
-      }
-
-      // Try to probe and project, or recursively handle a spilled partition
-      if (!buildSideIsEmpty.booleanValue() || // If there are build-side rows
-          joinIsLeftOrFull) { // or if this is a left/full outer join
-
-        prefetchFirstProbeBatch();
-
-        if (leftUpstream.isError()
-            || (leftUpstream == NONE && !joinIsRightOrFull)) {
-          // A termination condition was reached while prefetching the first
-          // probe side data holding batch.
-          // We need to terminate.
-          return leftUpstream;
-        }
-
-        if (!buildSideIsEmpty.booleanValue()
-            || !probeSideIsEmpty.booleanValue()) {
-          // Only allocate outgoing vectors and execute probing logic if there
-          // is data
-
-          if (state == BatchState.FIRST) {
-            // Initialize various settings for the probe side
-            hashJoinProbe.setupHashJoinProbe(probeBatch, this, joinType,
-                semiJoin, leftUpstream, partitions, spilledState.getCycle(),
-                container, spilledInners, buildSideIsEmpty.booleanValue(),
-                numPartitions, rightHVColPosition);
-          }
-
-          // Allocate the memory for the vectors in the output container
-          batchMemoryManager.allocateVectors(container);
-
-          hashJoinProbe
-              .setTargetOutputCount(batchMemoryManager.getOutputRowCount());
-
-          outputRecords = hashJoinProbe.probeAndProject();
-
-          container.setValueCount(outputRecords);
-
-          batchMemoryManager.updateOutgoingStats(outputRecords);
-          RecordBatchStats.logRecordBatchStats(RecordBatchIOType.OUTPUT, this,
-              getRecordBatchStatsContext());
-
-          /*
-           * We are here because of one the following 1. Completed processing of
-           * all the records and we are done 2. We've filled up the outgoing
-           * batch to the maximum and we need to return upstream Either case
-           * build the output container's schema and return
-           */
-          if (outputRecords > 0 || state == BatchState.FIRST) {
-            state = BatchState.NOT_FIRST;
-
-            return IterOutcome.OK;
-          }
-        }
-
-        // Free all partitions' in-memory data structures
-        // (In case need to start processing spilled partitions)
-        for (HashPartition partn : partitions) {
-          partn.cleanup(false); // clean, but do not delete the spill files !!
-        }
-
-        //
-        // (recursively) Handle the spilled partitions, if any
-        //
-        if (!buildSideIsEmpty.booleanValue()) {
-          while (!spilledState.isEmpty()) { // "while" is only used for
-                                            // skipping; see "continue" below
-
-            // Get the next (previously) spilled partition to handle as incoming
-            HashJoinSpilledPartition currSp = spilledState
-                .getNextSpilledPartition();
-
-            // If the outer is empty (and it's not a right/full join) - try the
-            // next spilled partition
-            if (currSp.outerSpilledBatches == 0 && !joinIsRightOrFull) {
-              continue;
-            }
-
-            // Create a BUILD-side "incoming" out of the inner spill file of
-            // that partition
-            buildBatch = new SpilledRecordBatch(currSp.innerSpillFile,
-                currSp.innerSpilledBatches, context, buildSchema, oContext,
-                spillSet);
-            // The above ctor call also got the first batch; need to update the
-            // outcome
-            rightUpstream = ((SpilledRecordBatch) buildBatch)
-                .getInitialOutcome();
-
-            if (currSp.outerSpilledBatches > 0) {
-              // Create a PROBE-side "incoming" out of the outer spill file of
-              // that partition
-              probeBatch = new SpilledRecordBatch(currSp.outerSpillFile,
-                  currSp.outerSpilledBatches, context, probeSchema, oContext,
-                  spillSet);
-              // The above ctor call also got the first batch; need to update
-              // the outcome
-              leftUpstream = ((SpilledRecordBatch) probeBatch)
-                  .getInitialOutcome();
-            } else {
-              probeBatch = left; // if no outer batch then reuse left - needed
-                                 // for updateIncoming()
-              leftUpstream = IterOutcome.NONE;
-              hashJoinProbe.changeToFinalProbeState();
-            }
-
-            spilledState.updateCycle(stats, currSp, spilledStateUpdater);
-            state = BatchState.FIRST; // TODO need to determine if this is still
-                                      // necessary since
-                                      // prefetchFirstBatchFromBothSides sets
-                                      // this
-
-            prefetchedBuild.setValue(false);
-            prefetchedProbe.setValue(false);
-
-            return innerNext(); // start processing the next spilled partition
-                                // "recursively"
-          }
-        }
-
-      } else {
-        // Our build side is empty, we won't have any matches, clear the probe
-        // side
-        killAndDrainLeftUpstream();
-      }
-
-      // No more output records, clean up and return
-      state = BatchState.DONE;
-
-      cleanup();
-
-      return IterOutcome.NONE;
-    } catch (SchemaChangeException e) {
-      throw UserException.schemaChangeError(e).build(logger);
-    }
-  }
-
-  /**
-   * In case an upstream data is no longer needed, send a kill and flush any
-   * remaining batch
-   *
-   * @param batch
-   *          probe or build batch
-   * @param upstream
-   *          which upstream
-   * @param isLeft
-   *          is it the left or right
-   */
-  private void killAndDrainUpstream(RecordBatch batch, IterOutcome upstream,
-      boolean isLeft) {
-    batch.cancel();
-    while (upstream == IterOutcome.OK_NEW_SCHEMA
-        || upstream == IterOutcome.OK) {
-      VectorAccessibleUtilities.clear(batch);
-      upstream = next(
-          isLeft ? HashJoinHelper.LEFT_INPUT : HashJoinHelper.RIGHT_INPUT,
-          batch);
-    }
-  }
-
-  private void killAndDrainLeftUpstream() {
-    killAndDrainUpstream(probeBatch, leftUpstream, true);
-  }
-
-  private void killAndDrainRightUpstream() {
-    killAndDrainUpstream(buildBatch, rightUpstream, false);
-  }
-
-  private void setupHashTable() {
+  protected HashTableConfig buildHashTableConfig() {
     List<Comparator> comparators = Lists
-        .newArrayListWithExpectedSize(conditions.size());
+      .newArrayListWithExpectedSize(conditions.size());
     conditions.forEach(cond -> comparators
-        .add(JoinUtils.checkAndReturnSupportedJoinComparator(cond)));
-
-    if (skipHashTableBuild) {
-      return;
-    }
+      .add(JoinUtils.checkAndReturnSupportedJoinComparator(cond)));
 
     // Setup the hash table configuration object
     List<NamedExpression> leftExpr = new ArrayList<>(conditions.size());
@@ -761,731 +124,62 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP>
     // Create named expressions from the conditions
     for (int i = 0; i < conditions.size(); i++) {
       leftExpr.add(new NamedExpression(conditions.get(i).getLeft(),
-          new FieldReference("probe_side_" + i)));
+        new FieldReference("probe_side_" + i)));
     }
 
     // Set the left named expression to be null if the probe batch is empty.
     if (leftUpstream != IterOutcome.OK_NEW_SCHEMA
-        && leftUpstream != IterOutcome.OK) {
+      && leftUpstream != IterOutcome.OK) {
       leftExpr = null;
     } else {
       if (probeBatch.getSchema()
-          .getSelectionVectorMode() != BatchSchema.SelectionVectorMode.NONE) {
+        .getSelectionVectorMode() != BatchSchema.SelectionVectorMode.NONE) {
         throw UserException.internalError(null).message(
             "Hash join does not support probe batch with selection vectors.")
-            .addContext("Probe batch has selection mode",
-                (probeBatch.getSchema().getSelectionVectorMode()).toString())
-            .build(logger);
-      }
-    }
-
-    HashTableConfig htConfig = new HashTableConfig(
-        (int) context.getOptions().getOption(ExecConstants.MIN_HASH_TABLE_SIZE),
-        true, HashTable.DEFAULT_LOAD_FACTOR, rightExpr, leftExpr, comparators,
-        joinControl.asInt());
-
-    // Create the chained hash table
-    baseHashTable = new ChainedHashTable(htConfig, context, allocator,
-        buildBatch, probeBatch, null);
-    if (enableRuntimeFilter) {
-      setupHash64(htConfig);
-    }
-  }
-
-  private void setupHash64(HashTableConfig htConfig) {
-    LogicalExpression[] keyExprsBuild = new LogicalExpression[htConfig
-        .getKeyExprsBuild().size()];
-    ErrorCollector collector = new ErrorCollectorImpl();
-    int i = 0;
-    for (NamedExpression ne : htConfig.getKeyExprsBuild()) {
-      LogicalExpression expr = ExpressionTreeMaterializer.materialize(
-          ne.getExpr(), buildBatch, collector, context.getFunctionRegistry());
-      collector.reportErrors(logger);
-      if (expr == null) {
-        continue;
-      }
-      keyExprsBuild[i] = expr;
-      i++;
-    }
-    i = 0;
-    boolean missingField = false;
-    TypedFieldId[] buildSideTypeFieldIds = new TypedFieldId[keyExprsBuild.length];
-    for (NamedExpression ne : htConfig.getKeyExprsBuild()) {
-      SchemaPath schemaPath = (SchemaPath) ne.getExpr();
-      TypedFieldId typedFieldId = buildBatch.getValueVectorId(schemaPath);
-      if (typedFieldId == null) {
-        missingField = true;
-        break;
-      }
-      buildSideTypeFieldIds[i] = typedFieldId;
-      i++;
-    }
-    if (missingField) {
-      logger.info(
-          "As some build side key fields not found, runtime filter was disabled");
-      enableRuntimeFilter = false;
-      return;
-    }
-    RuntimeFilterDef runtimeFilterDef = popConfig.getRuntimeFilterDef();
-    List<BloomFilterDef> bloomFilterDefs = runtimeFilterDef
-        .getBloomFilterDefs();
-    for (BloomFilterDef bloomFilterDef : bloomFilterDefs) {
-      String buildField = bloomFilterDef.getBuildField();
-      SchemaPath schemaPath = new SchemaPath(
-          new PathSegment.NameSegment(buildField), ExpressionPosition.UNKNOWN);
-      TypedFieldId typedFieldId = buildBatch.getValueVectorId(schemaPath);
-      if (typedFieldId == null) {
-        missingField = true;
-        break;
-      }
-      int fieldId = typedFieldId.getFieldIds()[0];
-      bloomFilterDef2buildId.put(bloomFilterDef, fieldId);
-    }
-    if (missingField) {
-      logger.info(
-          "As some build side join key fields not found, runtime filter was disabled");
-      enableRuntimeFilter = false;
-      return;
-    }
-    ValueVectorHashHelper hashHelper = new ValueVectorHashHelper(buildBatch,
-        context);
-    try {
-      hash64 = hashHelper.getHash64(keyExprsBuild, buildSideTypeFieldIds);
-    } catch (Exception e) {
-      throw UserException.internalError(e)
-          .message("Failed to construct a field's hash64 dynamic codes")
+          .addContext("Probe batch has selection mode",
+            (probeBatch.getSchema().getSelectionVectorMode()).toString())
           .build(logger);
-    }
-  }
-
-  /**
-   * Call only after num partitions is known
-   */
-  private void delayedSetup() {
-    //
-    // Find out the estimated max batch size, etc
-    // and compute the max numPartitions possible
-    // See partitionNumTuning()
-    //
-
-    spilledState.initialize(numPartitions);
-    // Create array for the partitions
-    partitions = new HashPartition[numPartitions];
-  }
-
-  /**
-   * Initialize fields (that may be reused when reading spilled partitions)
-   */
-  private void initializeBuild() {
-    baseHashTable.updateIncoming(buildBatch, probeBatch); // in case we process
-                                                          // the spilled files
-    // Recreate the partitions every time build is initialized
-    for (int part = 0; part < numPartitions; part++) {
-      partitions[part] = new HashPartition(context, allocator, baseHashTable,
-          buildBatch, probeBatch, semiJoin, RECORDS_PER_BATCH, spillSet, part,
-          spilledState.getCycle(), numPartitions);
-    }
-
-    spilledInners = new HashJoinSpilledPartition[numPartitions];
-
-  }
-
-  /**
-   * Note: This method can not be called again as part of recursive call of
-   * executeBuildPhase() to handle spilled build partitions.
-   */
-  private void initializeRuntimeFilter() {
-    if (!enableRuntimeFilter || bloomFiltersGenerated) {
-      return;
-    }
-    runtimeFilterReporter = new RuntimeFilterReporter(
-        (ExecutorFragmentContext) context);
-    RuntimeFilterDef runtimeFilterDef = popConfig.getRuntimeFilterDef();
-    // RuntimeFilter is not a necessary part of a HashJoin operator, only the
-    // query which satisfy the
-    // RuntimeFilterRouter's judgement will have the RuntimeFilterDef.
-    if (runtimeFilterDef != null) {
-      List<BloomFilterDef> bloomFilterDefs = runtimeFilterDef
-          .getBloomFilterDefs();
-      for (BloomFilterDef bloomFilterDef : bloomFilterDefs) {
-        int buildFieldId = bloomFilterDef2buildId.get(bloomFilterDef);
-        int numBytes = bloomFilterDef.getNumBytes();
-        String probeField = bloomFilterDef.getProbeField();
-        probeFields.add(probeField);
-        BloomFilter bloomFilter = new BloomFilter(numBytes,
-            context.getAllocator());
-        bloomFilters.add(bloomFilter);
-        bloomFilter2buildId.put(bloomFilter, buildFieldId);
       }
     }
-    bloomFiltersGenerated = true;
-  }
 
-  /**
-   * Tunes the number of partitions used by {@link HashJoinBatch}. If it is not
-   * possible to spill it gives up and reverts to unbounded in memory operation.
-   */
-  private HashJoinMemoryCalculator.BuildSidePartitioning partitionNumTuning(
-      int maxBatchSize,
-      HashJoinMemoryCalculator.BuildSidePartitioning buildCalc) {
-    // Get auto tuning result
-    numPartitions = buildCalc.getNumPartitions();
-
-    if (logger.isDebugEnabled()) {
-      logger.debug(buildCalc.makeDebugString());
-    }
-
-    if (buildCalc.getMaxReservedMemory() > allocator.getLimit()) {
-      // We don't have enough memory to do any spilling. Give up and do no
-      // spilling and have no limits
-
-      // TODO dirty hack to prevent regressions. Remove this once batch sizing
-      // is implemented.
-      // We don't have enough memory to do partitioning, we have to do
-      // everything in memory
-      String message = String.format(
-          "When using the minimum number of partitions %d we require %s memory but only have %s available. "
-              + "Forcing legacy behavior of using unbounded memory in order to prevent regressions.",
-          numPartitions,
-          FileUtils.byteCountToDisplaySize(buildCalc.getMaxReservedMemory()),
-          FileUtils.byteCountToDisplaySize(allocator.getLimit()));
-      logger.warn(message);
-
-      // create a Noop memory calculator
-      HashJoinMemoryCalculator calc = getCalculatorImpl();
-      calc.initialize(false);
-      buildCalc = calc.next();
-
-      buildCalc.initialize(true, true, // TODO Fix after growing hash values bug
-                                       // fixed
-          buildBatch, probeBatch, buildJoinColumns,
-          leftUpstream == IterOutcome.NONE, // probeEmpty
-          allocator.getLimit(), numPartitions, RECORDS_PER_BATCH,
-          RECORDS_PER_BATCH, maxBatchSize, maxBatchSize,
-          batchMemoryManager.getOutputBatchSize(),
-          HashTable.DEFAULT_LOAD_FACTOR);
-
-      disableSpilling(null);
-    }
-
-    return buildCalc;
-  }
-
-  /**
-   * Disable spilling - use only a single partition and set the memory limit to
-   * the max ( 10GB )
-   *
-   * @param reason
-   *          If not null - log this as warning, else check fallback setting to
-   *          either warn or fail.
-   */
-  private void disableSpilling(String reason) {
-    // Fail, or just issue a warning if a reason was given, or a fallback option
-    // is enabled
-    if (reason == null) {
-      boolean fallbackEnabled = context.getOptions()
-          .getBoolean(ExecConstants.HASHJOIN_FALLBACK_ENABLED_KEY);
-      if (fallbackEnabled) {
-        logger.warn(
-            "Spilling is disabled - not enough memory available for internal partitioning. Falling back"
-                + " to use unbounded memory");
-      } else {
-        throw UserException.resourceError().message(String.format(
-            "Not enough memory for internal partitioning and fallback mechanism for "
-                + "HashJoin to use unbounded memory is disabled.\n" +
-                "Either enable fallback option %s using ALTER "
-                + "SESSION/SYSTEM command or increase the memory limit for the Drillbit",
-            ExecConstants.HASHJOIN_FALLBACK_ENABLED_KEY)).build(logger);
-      }
-    } else {
-      logger.warn(reason);
-    }
-
-    numPartitions = 1; // We are only using one partition
-    canSpill = false; // We cannot spill
-    allocator.setLimit(AbstractBase.MAX_ALLOCATION); // Violate framework and
-                                                     // force unbounded memory
+    return new HashTableConfig(
+      (int) context.getOptions().getOption(ExecConstants.MIN_HASH_TABLE_SIZE),
+      true, HashTable.DEFAULT_LOAD_FACTOR, rightExpr, leftExpr, comparators,
+      joinControl.asInt(), false);
   }
 
-  /**
-   * Execute the BUILD phase; first read incoming and split rows into
-   * partitions; may decide to spill some of the partitions
-   *
-   * @return Returns an
-   *         {@link org.apache.drill.exec.record.RecordBatch.IterOutcome} if a
-   *         termination condition is reached. Otherwise returns null.
-   * @throws SchemaChangeException
-   */
-  public IterOutcome executeBuildPhase() throws SchemaChangeException {
-    if (buildSideIsEmpty.booleanValue()) {
-      // empty right
-      return null;
-    }
-
-    if (skipHashTableBuild) { // No hash table needed - then consume all the
-                              // right upstream
-      killAndDrainRightUpstream();
-      return null;
-    }
-
-    HashJoinMemoryCalculator.BuildSidePartitioning buildCalc;
-
-    {
-      // Initializing build calculator
-      // Limit scope of these variables to this block
-      int maxBatchSize = spilledState.isFirstCycle()
-          ? RecordBatch.MAX_BATCH_ROW_COUNT
-          : RECORDS_PER_BATCH;
-      boolean doMemoryCalculation = canSpill
-          && !probeSideIsEmpty.booleanValue();
-      HashJoinMemoryCalculator calc = getCalculatorImpl();
-
-      calc.initialize(doMemoryCalculation);
-      buildCalc = calc.next();
-
-      buildCalc.initialize(spilledState.isFirstCycle(), true, // TODO Fix after
-                                                              // growing hash
-                                                              // values bug
-                                                              // fixed
-          buildBatch, probeBatch, buildJoinColumns,
-          probeSideIsEmpty.booleanValue(), allocator.getLimit(), numPartitions,
-          RECORDS_PER_BATCH, RECORDS_PER_BATCH, maxBatchSize, maxBatchSize,
-          batchMemoryManager.getOutputBatchSize(),
-          HashTable.DEFAULT_LOAD_FACTOR);
-
-      if (spilledState.isFirstCycle() && doMemoryCalculation) {
-        // Do auto tuning
-        buildCalc = partitionNumTuning(maxBatchSize, buildCalc);
-      }
-    }
-
-    if (spilledState.isFirstCycle()) {
-      // Do initial setup only on the first cycle
-      delayedSetup();
-    }
-
-    initializeBuild();
-
-    initializeRuntimeFilter();
-
-    // Make the calculator aware of our partitions
-    HashJoinMemoryCalculator.PartitionStatSet partitionStatSet = new HashJoinMemoryCalculator.PartitionStatSet(
-        partitions);
-    buildCalc.setPartitionStatSet(partitionStatSet);
-
-    boolean moreData = true;
-    while (moreData) {
-      switch (rightUpstream) {
-      case NONE:
-      case NOT_YET:
-        moreData = false;
-        continue;
-
-      case OK_NEW_SCHEMA:
-        if (!buildSchema.equals(buildBatch.getSchema())) {
-          throw SchemaChangeException.schemaChanged(
-              "Hash join does not support schema changes in build side.",
-              buildSchema, buildBatch.getSchema());
-        }
-        for (HashPartition partn : partitions) {
-          partn.updateBatches();
-        }
-        // Fall through
-      case OK:
-        batchMemoryManager.update(buildBatch, RIGHT_INDEX, 0, true);
-        int currentRecordCount = buildBatch.getRecordCount();
-        // create runtime filter
-        if (spilledState.isFirstCycle() && enableRuntimeFilter) {
-          // create runtime filter and send out async
-          for (BloomFilter bloomFilter : bloomFilter2buildId.keySet()) {
-            int fieldId = bloomFilter2buildId.get(bloomFilter);
-            for (int ind = 0; ind < currentRecordCount; ind++) {
-              long hashCode = hash64.hash64Code(ind, 0, fieldId);
-              bloomFilter.insert(hashCode);
-            }
-          }
-        }
-        // Special treatment (when no spill, and single partition) -- use the
-        // incoming vectors as they are (no row copy)
-        if (numPartitions == 1) {
-          partitions[0].appendBatch(buildBatch);
-          break;
-        }
-
-        if (!spilledState.isFirstCycle()) {
-          read_right_HV_vector = (IntVector) buildBatch.getContainer()
-              .getLast();
-        }
-
-        // For every record in the build batch, hash the key columns and keep
-        // the result
-        for (int ind = 0; ind < currentRecordCount; ind++) {
-          int hashCode = spilledState.isFirstCycle()
-              ? partitions[0].getBuildHashCode(ind)
-              : read_right_HV_vector.getAccessor().get(ind); // get the hash
-                                                             // value from the
-                                                             // HV column
-          int currPart = hashCode & spilledState.getPartitionMask();
-          hashCode >>>= spilledState.getBitsInMask();
-          // semi-join skips join-key-duplicate rows
-          if (semiJoin) {
-
-          }
-          // Append the new inner row to the appropriate partition; spill (that
-          // partition) if needed
-          partitions[currPart].appendInnerRow(buildBatch.getContainer(), ind,
-              hashCode, buildCalc);
-        }
-
-        if (read_right_HV_vector != null) {
-          read_right_HV_vector.clear();
-          read_right_HV_vector = null;
-        }
-        break;
-      default:
-        throw new IllegalStateException(rightUpstream.name());
-      }
-      // Get the next incoming record batch
-      rightUpstream = next(HashJoinHelper.RIGHT_INPUT, buildBatch);
-    }
-
-    if (spilledState.isFirstCycle() && enableRuntimeFilter) {
-      if (bloomFilter2buildId.size() > 0) {
-        int hashJoinOpId = this.popConfig.getOperatorId();
-        runtimeFilterReporter.sendOut(bloomFilters, probeFields,
-            this.popConfig.getRuntimeFilterDef(), hashJoinOpId);
-      }
-    }
-
-    // Move the remaining current batches into their temp lists, or spill
-    // them if the partition is spilled. Add the spilled partitions into
-    // the spilled partitions list
-    if (numPartitions > 1) { // a single partition needs no completion
-      for (HashPartition partn : partitions) {
-        partn.completeAnInnerBatch(false, partn.isSpilled());
-      }
-    }
-
-    prefetchFirstProbeBatch();
-
-    if (leftUpstream.isError()) {
-      // A termination condition was reached while prefetching the first build
-      // side data holding batch.
-      // We need to terminate.
-      return leftUpstream;
-    }
-
-    HashJoinMemoryCalculator.PostBuildCalculations postBuildCalc = buildCalc
-        .next();
-    postBuildCalc.initialize(probeSideIsEmpty.booleanValue()); // probeEmpty
-
-    // Traverse all the in-memory partitions' incoming batches, and build their
-    // hash tables
-
-    for (int index = 0; index < partitions.length; index++) {
-      HashPartition partn = partitions[index];
-
-      if (partn.isSpilled()) {
-        // Don't build hash tables for spilled partitions
-        continue;
-      }
-
-      try {
-        if (postBuildCalc.shouldSpill()) {
-          // Spill this partition if we need to make room
-          partn.spillThisPartition();
-        } else {
-          // Only build hash tables for partitions that are not spilled
-          partn.buildContainersHashTableAndHelper();
-        }
-      } catch (OutOfMemoryException e) {
-        String message = "Failed building hash table on partition " + index
-            + ":\n" + makeDebugString() + "\n"
-            + postBuildCalc.makeDebugString();
-        // Include debug info
-        throw new OutOfMemoryException(message, e);
-      }
-    }
-
-    if (logger.isDebugEnabled()) {
-      logger.debug(postBuildCalc.makeDebugString());
-    }
-
-    for (HashPartition partn : partitions) {
-      if (partn.isSpilled()) {
-        HashJoinSpilledPartition sp = new HashJoinSpilledPartition(
-            spilledState.getCycle(), partn.getPartitionNum(), originalPartition,
-            partn.getPartitionBatchesCount(), partn.getSpillFile());
-
-        spilledState.addPartition(sp);
-        spilledInners[partn.getPartitionNum()] = sp; // for the outer to find
-                                                     // the SP later
-        partn.closeWriter();
-
-        partn.updateProbeRecordsPerBatch(
-            postBuildCalc.getProbeRecordsPerBatch());
-      }
-    }
-
-    return null;
-  }
-
-  private void setupOutputContainerSchema() {
-
-    if (buildSchema != null && !semiJoin) {
-      for (MaterializedField field : buildSchema) {
-        MajorType inputType = field.getType();
-        MajorType outputType;
-        // If left or full outer join, then the output type must be nullable.
-        // However, map types are
-        // not nullable so we must exclude them from the check below (see
-        // DRILL-2197).
-        if (joinIsLeftOrFull && inputType.getMode() == DataMode.REQUIRED
-            && inputType.getMinorType() != TypeProtos.MinorType.MAP) {
-          outputType = Types.overrideMode(inputType, DataMode.OPTIONAL);
-        } else {
-          outputType = inputType;
-        }
-
-        // make sure to project field with children for children to show up in
-        // the schema
-        MaterializedField projected = field.withType(outputType);
-        // Add the vector to our output container
-        container.addOrGet(projected);
-      }
-    }
-
-    if (probeSchema != null) { // a probe schema was seen (even though the probe
-                               // may had no rows)
-      for (VectorWrapper<?> vv : probeBatch) {
-        MajorType inputType = vv.getField().getType();
-        MajorType outputType;
-
-        // If right or full outer join then the output type should be optional.
-        // However, map types are
-        // not nullable so we must exclude them from the check below (see
-        // DRILL-2771, DRILL-2197).
-        if (joinIsRightOrFull && inputType.getMode() == DataMode.REQUIRED
-            && inputType.getMinorType() != TypeProtos.MinorType.MAP) {
-          outputType = Types.overrideMode(inputType, DataMode.OPTIONAL);
-        } else {
-          outputType = inputType;
-        }
-
-        ValueVector v = container.addOrGet(
-            MaterializedField.create(vv.getField().getName(), outputType));
-        if (v instanceof AbstractContainerVector) {
-          vv.getValueVector().makeTransferPair(v);
-          v.clear();
-        }
-      }
-    }
-
+  @Override
+  public void dump() {
+    logger.error(
+        "HashJoinBatch[container={}, left={}, right={}, leftOutcome={}, rightOutcome={}, joinType={}, hashJoinProbe={},"
+            + " rightExpr={}, canSpill={}, buildSchema={}, probeSchema={}]",
+        container, left, right, leftUpstream, rightUpstream, joinType,
+      probe, rightExpr, canSpill, buildSchema, probeSchema);
   }
 
-  // (After the inner side was read whole) - Has that inner partition spilled
-  public boolean isSpilledInner(int part) {
-    if (spilledInners == null) {
-      return false;
-    } // empty inner
-    return spilledInners[part] != null;
+  @Override // implement RowKeyJoin interface
+  public boolean hasRowKeyBatch() {
+    return buildComplete;
   }
 
-  /**
-   * The constructor
-   *
-   * @param popConfig
-   * @param context
-   * @param left
-   *          -- probe/outer side incoming input
-   * @param right
-   *          -- build/iner side incoming input
-   * @throws OutOfMemoryException
-   */
-  public HashJoinBatch(HashJoinPOP popConfig, FragmentContext context,
-      RecordBatch left, /* Probe side record batch */
-      RecordBatch right /* Build side record batch */
-  ) throws OutOfMemoryException {
-    super(popConfig, context, true, left, right);
-    this.buildBatch = right;
-    this.probeBatch = left;
-    joinType = popConfig.getJoinType();
-    semiJoin = popConfig.isSemiJoin();
-    joinIsLeftOrFull = joinType == JoinRelType.LEFT
-        || joinType == JoinRelType.FULL;
-    joinIsRightOrFull = joinType == JoinRelType.RIGHT
-        || joinType == JoinRelType.FULL;
-    conditions = popConfig.getConditions();
-    this.popConfig = popConfig;
-    this.isRowKeyJoin = popConfig.isRowKeyJoin();
-    this.joinControl = new JoinControl(popConfig.getJoinControl());
-
-    rightExpr = new ArrayList<>(conditions.size());
-    buildJoinColumns = Sets.newHashSet();
-    for (int i = 0; i < conditions.size(); i++) {
-      SchemaPath rightPath = (SchemaPath) conditions.get(i).getRight();
-    }
-
-    for (int i = 0; i < conditions.size(); i++) {
-      SchemaPath rightPath = (SchemaPath) conditions.get(i).getRight();
-      PathSegment.NameSegment nameSegment = (PathSegment.NameSegment) rightPath
-          .getLastSegment();
-      buildJoinColumns.add(nameSegment.getPath());
-      String refName = "build_side_" + i;
-      rightExpr.add(new NamedExpression(conditions.get(i).getRight(),
-          new FieldReference(refName)));
-    }
-
-    this.allocator = oContext.getAllocator();
-
-    numPartitions = (int) context.getOptions()
-        .getOption(ExecConstants.HASHJOIN_NUM_PARTITIONS_VALIDATOR);
-    if (numPartitions == 1) { //
-      disableSpilling(
-          "Spilling is disabled due to configuration setting of num_partitions to 1");
-    }
-
-    numPartitions = BaseAllocator.nextPowerOfTwo(numPartitions); // in case not
-                                                                 // a power of 2
-
-    long memLimit = context.getOptions()
-        .getOption(ExecConstants.HASHJOIN_MAX_MEMORY_VALIDATOR);
-
-    if (memLimit != 0) {
-      allocator.setLimit(memLimit);
-    }
-
-    RECORDS_PER_BATCH = (int) context.getOptions()
-        .getOption(ExecConstants.HASHJOIN_NUM_ROWS_IN_BATCH_VALIDATOR);
-    maxBatchesInMemory = (int) context.getOptions()
-        .getOption(ExecConstants.HASHJOIN_MAX_BATCHES_IN_MEMORY_VALIDATOR);
-
-    logger.info("Memory limit {} bytes",
-        FileUtils.byteCountToDisplaySize(allocator.getLimit()));
-    spillSet = new SpillSet(context, popConfig);
-
-    // Create empty partitions (in the ctor - covers the case where right side
-    // is empty)
-    partitions = new HashPartition[0];
-
-    // get the output batch size from config.
-    int configuredBatchSize = (int) context.getOptions()
-        .getOption(ExecConstants.OUTPUT_BATCH_SIZE_VALIDATOR);
-    double avail_mem_factor = context.getOptions()
-        .getOption(ExecConstants.OUTPUT_BATCH_SIZE_AVAIL_MEM_FACTOR_VALIDATOR);
-    int outputBatchSize = Math.min(configuredBatchSize,
-        Integer.highestOneBit((int) (allocator.getLimit() * avail_mem_factor)));
-
-    RecordBatchStats.logRecordBatchStats(getRecordBatchStatsContext(),
-        "configured output batch size: %d, allocated memory %d, avail mem factor %f, output batch size: %d",
-        configuredBatchSize, allocator.getLimit(), avail_mem_factor,
-        outputBatchSize);
-
-    batchMemoryManager = new JoinBatchMemoryManager(outputBatchSize, left,
-        right, new HashSet<>());
-
-    RecordBatchStats.printConfiguredBatchSize(getRecordBatchStatsContext(),
-        configuredBatchSize);
-
-    enableRuntimeFilter = context.getOptions()
-        .getOption(ExecConstants.HASHJOIN_ENABLE_RUNTIME_FILTER)
-        && popConfig.getRuntimeFilterDef() != null;
+  @Override // implement RowKeyJoin interface
+  public BatchState getBatchState() {
+    return state;
   }
 
-  /**
-   * This method is called when {@link HashJoinBatch} closes. It cleans up left
-   * over spilled files that are in the spill queue, and closes the spillSet.
-   */
-  private void cleanup() {
-    if (buildSideIsEmpty.booleanValue()) {
-      return;
-    } // not set up; nothing to clean
-    if (spillSet.getWriteBytes() > 0) {
-      stats.setLongStat(Metric.SPILL_MB, // update stats - total MB spilled
-          (int) Math.round(spillSet.getWriteBytes() / 1024.0D / 1024.0));
-    }
-    // clean (and deallocate) each partition, and delete its spill file
-    for (HashPartition partn : partitions) {
-      partn.close();
-    }
-
-    // delete any spill file left in unread spilled partitions
-    while (!spilledState.isEmpty()) {
-      HashJoinSpilledPartition sp = spilledState.getNextSpilledPartition();
-      try {
-        spillSet.delete(sp.innerSpillFile);
-      } catch (IOException e) {
-        logger.warn("Cleanup: Failed to delete spill file {}",
-            sp.innerSpillFile);
-      }
-      try { // outer file is added later; may be null if cleaning prematurely
-        if (sp.outerSpillFile != null) {
-          spillSet.delete(sp.outerSpillFile);
-        }
-      } catch (IOException e) {
-        logger.warn("Cleanup: Failed to delete spill file {}",
-            sp.outerSpillFile);
-      }
-    }
-    // Delete the currently handled (if any) spilled files
-    spillSet.close(); // delete the spill directory(ies)
+  @Override // implement RowKeyJoin interface
+  public void setBatchState(BatchState newState) {
+    state = newState;
   }
 
-  /**
-   * This creates a string that summarizes the memory usage of the operator.
-   *
-   * @return A memory dump string.
-   */
-  public String makeDebugString() {
-    StringBuilder sb = new StringBuilder();
-
-    for (int partitionIndex = 0; partitionIndex < partitions.length; partitionIndex++) {
-      String partitionPrefix = "Partition " + partitionIndex + ": ";
-      HashPartition hashPartition = partitions[partitionIndex];
-      sb.append(partitionPrefix).append(hashPartition.makeDebugString())
-          .append("\n");
-    }
-
-    return sb.toString();
+  @Override
+  public void setRowKeyJoinState(RowKeyJoin.RowKeyJoinState newState) {
+    this.rkJoinState = newState;
   }
 
-  /**
-   * Updates the {@link HashTable} and spilling stats after the original build
-   * side is processed.
-   *
-   * Note: this does not update all the stats. The cycleNum is updated
-   * dynamically in {@link #innerNext()} and the total bytes written is updated
-   * at close time in {@link #cleanup()}.
-   */
-  private void updateStats() {
-    if (buildSideIsEmpty.booleanValue()) {
-      return;
-    } // no stats when the right side is empty
-    if (!spilledState.isFirstCycle()) {
-      return;
-    } // These stats are only for before processing spilled files
-
-    HashTableStats htStats = new HashTableStats();
-    long numSpilled = 0;
-    HashTableStats newStats = new HashTableStats();
-    // sum the stats from all the partitions
-    for (HashPartition partn : partitions) {
-      if (partn.isSpilled()) {
-        numSpilled++;
-      }
-      partn.getStats(newStats);
-      htStats.addStats(newStats);
-    }
-
-    stats.setLongStat(Metric.NUM_BUCKETS, htStats.numBuckets);
-    stats.setLongStat(Metric.NUM_ENTRIES, htStats.numEntries);
-    stats.setLongStat(Metric.NUM_RESIZING, htStats.numResizing);
-    stats.setLongStat(Metric.RESIZING_TIME_MS, htStats.resizingTime);
-    stats.setLongStat(Metric.NUM_PARTITIONS, numPartitions);
-    stats.setLongStat(Metric.SPILL_CYCLE, spilledState.getCycle()); // Put 0 in
-                                                                    // case no
-                                                                    // spill
-    stats.setLongStat(Metric.SPILLED_PARTITIONS, numSpilled);
+  @Override
+  public RowKeyJoin.RowKeyJoinState getRowKeyJoinState() {
+    return rkJoinState;
   }
 
   /**
@@ -1508,10 +202,10 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP>
         return Pair.of(vv, pp.getRight());
       }
     } else if (partitions == null && firstOutputBatch) { // if there is data
-                                                         // coming to
-                                                         // right(build) side in
-                                                         // build Schema stage,
-                                                         // use it.
+      // coming to
+      // right(build) side in
+      // build Schema stage,
+      // use it.
       firstOutputBatch = false;
       if (right.getRecordCount() > 0) {
         VectorWrapper<?> vw = Iterables.get(right, 0);
@@ -1521,124 +215,4 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP>
     }
     return null;
   }
-
-  @Override // implement RowKeyJoin interface
-  public boolean hasRowKeyBatch() {
-    return buildComplete;
-  }
-
-  @Override // implement RowKeyJoin interface
-  public BatchState getBatchState() {
-    return state;
-  }
-
-  @Override // implement RowKeyJoin interface
-  public void setBatchState(BatchState newState) {
-    state = newState;
-  }
-
-  @Override
-  protected void cancelIncoming() {
-    wasKilled = true;
-    probeBatch.cancel();
-    buildBatch.cancel();
-  }
-
-  public void updateMetrics() {
-    stats.setLongStat(HashJoinBatch.Metric.LEFT_INPUT_BATCH_COUNT,
-        batchMemoryManager.getNumIncomingBatches(LEFT_INDEX));
-    stats.setLongStat(HashJoinBatch.Metric.LEFT_AVG_INPUT_BATCH_BYTES,
-        batchMemoryManager.getAvgInputBatchSize(LEFT_INDEX));
-    stats.setLongStat(HashJoinBatch.Metric.LEFT_AVG_INPUT_ROW_BYTES,
-        batchMemoryManager.getAvgInputRowWidth(LEFT_INDEX));
-    stats.setLongStat(HashJoinBatch.Metric.LEFT_INPUT_RECORD_COUNT,
-        batchMemoryManager.getTotalInputRecords(LEFT_INDEX));
-
-    stats.setLongStat(HashJoinBatch.Metric.RIGHT_INPUT_BATCH_COUNT,
-        batchMemoryManager.getNumIncomingBatches(RIGHT_INDEX));
-    stats.setLongStat(HashJoinBatch.Metric.RIGHT_AVG_INPUT_BATCH_BYTES,
-        batchMemoryManager.getAvgInputBatchSize(RIGHT_INDEX));
-    stats.setLongStat(HashJoinBatch.Metric.RIGHT_AVG_INPUT_ROW_BYTES,
-        batchMemoryManager.getAvgInputRowWidth(RIGHT_INDEX));
-    stats.setLongStat(HashJoinBatch.Metric.RIGHT_INPUT_RECORD_COUNT,
-        batchMemoryManager.getTotalInputRecords(RIGHT_INDEX));
-
-    stats.setLongStat(HashJoinBatch.Metric.OUTPUT_BATCH_COUNT,
-        batchMemoryManager.getNumOutgoingBatches());
-    stats.setLongStat(HashJoinBatch.Metric.AVG_OUTPUT_BATCH_BYTES,
-        batchMemoryManager.getAvgOutputBatchSize());
-    stats.setLongStat(HashJoinBatch.Metric.AVG_OUTPUT_ROW_BYTES,
-        batchMemoryManager.getAvgOutputRowWidth());
-    stats.setLongStat(HashJoinBatch.Metric.OUTPUT_RECORD_COUNT,
-        batchMemoryManager.getTotalOutputRecords());
-  }
-
-  @Override
-  public void setRowKeyJoinState(RowKeyJoin.RowKeyJoinState newState) {
-    this.rkJoinState = newState;
-  }
-
-  @Override
-  public RowKeyJoin.RowKeyJoinState getRowKeyJoinState() {
-    return rkJoinState;
-  }
-
-  @Override
-  public void close() {
-    if (!spilledState.isFirstCycle()) { // spilling happened
-      // In case closing due to cancellation, BaseRootExec.close() does not
-      // close the open
-      // SpilledRecordBatch "scanners" as it only knows about the original
-      // left/right ops.
-      // TODO: Code that was here didn't actually close the "scanners"
-    }
-
-    updateMetrics();
-
-    RecordBatchStats.logRecordBatchStats(getRecordBatchStatsContext(),
-        "incoming aggregate left: batch count : %d, avg bytes : %d,  avg row bytes : %d, record count : %d",
-        batchMemoryManager
-            .getNumIncomingBatches(JoinBatchMemoryManager.LEFT_INDEX),
-        batchMemoryManager
-            .getAvgInputBatchSize(JoinBatchMemoryManager.LEFT_INDEX),
-        batchMemoryManager
-            .getAvgInputRowWidth(JoinBatchMemoryManager.LEFT_INDEX),
-        batchMemoryManager
-            .getTotalInputRecords(JoinBatchMemoryManager.LEFT_INDEX));
-
-    RecordBatchStats.logRecordBatchStats(getRecordBatchStatsContext(),
-        "incoming aggregate right: batch count : %d, avg bytes : %d,  avg row bytes : %d, record count : %d",
-        batchMemoryManager
-            .getNumIncomingBatches(JoinBatchMemoryManager.RIGHT_INDEX),
-        batchMemoryManager
-            .getAvgInputBatchSize(JoinBatchMemoryManager.RIGHT_INDEX),
-        batchMemoryManager
-            .getAvgInputRowWidth(JoinBatchMemoryManager.RIGHT_INDEX),
-        batchMemoryManager
-            .getTotalInputRecords(JoinBatchMemoryManager.RIGHT_INDEX));
-
-    RecordBatchStats.logRecordBatchStats(getRecordBatchStatsContext(),
-        "outgoing aggregate: batch count : %d, avg bytes : %d,  avg row bytes : %d, record count : %d",
-        batchMemoryManager.getNumOutgoingBatches(),
-        batchMemoryManager.getAvgOutputBatchSize(),
-        batchMemoryManager.getAvgOutputRowWidth(),
-        batchMemoryManager.getTotalOutputRecords());
-
-    cleanup();
-    super.close();
-  }
-
-  public HashJoinProbe setupHashJoinProbe() {
-    // No real code generation !!
-    return new HashJoinProbeTemplate();
-  }
-
-  @Override
-  public void dump() {
-    logger.error(
-        "HashJoinBatch[container={}, left={}, right={}, leftOutcome={}, rightOutcome={}, joinType={}, hashJoinProbe={},"
-            + " rightExpr={}, canSpill={}, buildSchema={}, probeSchema={}]",
-        container, left, right, leftUpstream, rightUpstream, joinType,
-        hashJoinProbe, rightExpr, canSpill, buildSchema, probeSchema);
-  }
 }
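
The constructor logic retained above caps the outgoing batch size at the smaller of the configured size and the largest power of two that fits within avail_mem_factor of the allocator limit. As a minimal sketch of that arithmetic (the class name and the numbers below are illustrative only, not from the Drill source):

    // Illustrative sketch of the output-batch sizing arithmetic shown above.
    public class OutputBatchSizeSketch {
      // min(configured size, largest power of two within availMemFactor of the allocator limit)
      static int outputBatchSize(int configuredBatchSize, long allocatorLimit, double availMemFactor) {
        return Math.min(configuredBatchSize,
            Integer.highestOneBit((int) (allocatorLimit * availMemFactor)));
      }

      public static void main(String[] args) {
        // e.g. 16 MB configured, 100 MB allocator limit, factor 0.1 -> capped at 8 MB (8388608)
        System.out.println(outputBatchSize(16 * 1024 * 1024, 100L * 1024 * 1024, 0.1));
      }
    }
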
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java
index 79956a4df9..43e221a246 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java
@@ -17,441 +17,105 @@
  */
 package org.apache.drill.exec.physical.impl.join;
 
-import java.util.ArrayList;
-
-import com.carrotsearch.hppc.IntArrayList;
+import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.physical.config.HashJoinPOP;
 import org.apache.drill.exec.physical.impl.common.HashPartition;
 import org.apache.drill.exec.planner.common.JoinControl;
-import org.apache.drill.exec.record.BatchSchema;
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.record.RecordBatch.IterOutcome;
 import org.apache.drill.exec.record.VectorContainer;
-import org.apache.drill.exec.record.VectorWrapper;
-import org.apache.calcite.rel.core.JoinRelType;
-import org.apache.commons.lang3.tuple.Pair;
-import org.apache.drill.exec.vector.IntVector;
-import org.apache.drill.exec.vector.ValueVector;
-
-import static org.apache.drill.exec.record.JoinBatchMemoryManager.LEFT_INDEX;
-
-public class HashJoinProbeTemplate implements HashJoinProbe {
-
-  VectorContainer container; // the outgoing container
-
-  // Probe side record batch
-  private RecordBatch probeBatch;
-
-  private BatchSchema probeSchema;
 
+public class HashJoinProbeTemplate extends ProbeTemplate<HashJoinPOP> {
   // Join type, INNER, LEFT, RIGHT or OUTER
   private JoinRelType joinType;
-
   // joinControl determines how to handle INTERSECT_DISTINCT vs. INTERSECT_ALL
   private JoinControl joinControl;
-
-  private HashJoinBatch outgoingJoinBatch;
-
-  // Number of records to process on the probe side
-  private int recordsToProcess;
-
-  // Number of records processed on the probe side
-  private int recordsProcessed;
-
-  // Number of records in the output container
-  private int outputRecords;
-
-  // Indicate if we should drain the next record from the probe side
-  private boolean getNextRecord = true;
-
-  // Contains both batch idx and record idx of the matching record in the build side
-  private int currentCompositeIdx = -1;
-
-  // Current state the hash join algorithm is in
-  private ProbeState probeState = ProbeState.PROBE_PROJECT;
-
-  // For outer or right joins, this is a list of unmatched records that needs to be projected
-  private IntArrayList unmatchedBuildIndexes;
-
-  private  HashPartition partitions[];
-
-  // While probing duplicates, retain current build-side partition in case need to continue
-  // probing later on the same chain of duplicates
-  private HashPartition currPartition;
-
-  private int currRightPartition; // for returning RIGHT/FULL
-  IntVector read_left_HV_vector; // HV vector that was read from the spilled batch
-  private int cycleNum; // 1-primary, 2-secondary, 3-tertiary, etc.
-  private HashJoinBatch.HashJoinSpilledPartition spilledInners[]; // for the outer to find the partition
-  private boolean buildSideIsEmpty = true;
-  private int numPartitions = 1; // must be 2 to the power of bitsInMask
-  private int partitionMask; // numPartitions - 1
-  private int bitsInMask; // number of bits in the MASK
-  private int numberOfBuildSideColumns;
-  private int targetOutputRecords;
   private boolean semiJoin;
 
-  @Override
-  public void setTargetOutputCount(int targetOutputRecords) {
-    this.targetOutputRecords = targetOutputRecords;
-  }
-
-  @Override
-  public int getOutputCount() {
-    return outputRecords;
-  }
-
   /**
    *  Setup the Hash Join Probe object
-   *
-   * @param probeBatch
-   * @param outgoing
-   * @param joinRelType
-   * @param semiJoin
-   * @param leftStartState
-   * @param partitions
-   * @param cycleNum
-   * @param container
-   * @param spilledInners
-   * @param buildSideIsEmpty
-   * @param numPartitions
-   * @param rightHVColPosition
    */
   @Override
-  public void setupHashJoinProbe(RecordBatch probeBatch, HashJoinBatch outgoing, JoinRelType joinRelType, boolean semiJoin,
+  public void setup(RecordBatch probeBatch, HashJoinBatch outgoing, JoinRelType joinRelType, boolean semiJoin,
                                  IterOutcome leftStartState, HashPartition[] partitions, int cycleNum,
-                                 VectorContainer container, HashJoinBatch.HashJoinSpilledPartition[] spilledInners,
-                                 boolean buildSideIsEmpty, int numPartitions, int rightHVColPosition) {
-    this.container = container;
-    this.spilledInners = spilledInners;
-    this.probeBatch = probeBatch;
-    this.probeSchema = probeBatch.getSchema();
+                                 VectorContainer container, AbstractHashBinaryRecordBatch.SpilledPartition[] spilledInners,
+                                 boolean buildSideIsEmpty, int numPartitions, int rightHVColPosition) throws SchemaChangeException {
+    super.setup(probeBatch, leftStartState, partitions, cycleNum, container, spilledInners,
+      buildSideIsEmpty, numPartitions);
+    this.outgoingBatch = outgoing;
     this.joinType = joinRelType;
-    this.outgoingJoinBatch = outgoing;
-    this.partitions = partitions;
-    this.cycleNum = cycleNum;
-    this.buildSideIsEmpty = buildSideIsEmpty;
-    this.numPartitions = numPartitions;
-    this.numberOfBuildSideColumns = semiJoin ? 0 : rightHVColPosition; // position (0 based) of added column == #columns
+    this.joinControl = new JoinControl(outgoing.getPopConfig().getJoinControl());
     this.semiJoin = semiJoin;
-
-    partitionMask = numPartitions - 1; // e.g. 32 --> 0x1F
-    bitsInMask = Integer.bitCount(partitionMask); // e.g. 0x1F -> 5
-    joinControl = new JoinControl(outgoingJoinBatch.getPopConfig().getJoinControl());
-
-    probeState = ProbeState.PROBE_PROJECT;
-    this.recordsToProcess = 0;
-    this.recordsProcessed = 0;
-
-    // A special case - if the left was an empty file
-    if (leftStartState == IterOutcome.NONE){
-      changeToFinalProbeState();
-    } else {
-      this.recordsToProcess = probeBatch.getRecordCount();
-    }
-
-    // for those outer partitions that need spilling (cause their matching inners spilled)
-    // initialize those partitions' current batches and hash-value vectors
-    for (HashPartition partn : this.partitions) {
-      partn.allocateNewCurrentBatchAndHV();
-    }
-
-    currRightPartition = 0; // In case it's a Right/Full outer join
-
-    // Initialize the HV vector for the first (already read) left batch
-    if (this.cycleNum > 0) {
-      if (read_left_HV_vector != null) { read_left_HV_vector.clear();}
-      if (leftStartState != IterOutcome.NONE) { // Skip when outer spill was empty
-        read_left_HV_vector = (IntVector) probeBatch.getContainer().getLast();
-      }
-    }
-  }
-
-  /**
-   * Append the given build side row into the outgoing container
-   * @param buildSrcContainer The container for the right/inner side
-   * @param buildSrcIndex build side index
-   */
-  private void appendBuild(VectorContainer buildSrcContainer, int buildSrcIndex) {
-    for (int vectorIndex = 0; vectorIndex < numberOfBuildSideColumns; vectorIndex++) {
-      ValueVector destVector = container.getValueVector(vectorIndex).getValueVector();
-      ValueVector srcVector = buildSrcContainer.getValueVector(vectorIndex).getValueVector();
-      destVector.copyEntry(container.getRecordCount(), srcVector, buildSrcIndex);
-    }
-  }
-
-  /**
-   * Append the given probe side row into the outgoing container, following the build side part
-   * @param probeSrcContainer The container for the left/outer side
-   * @param probeSrcIndex probe side index
-   */
-  private void appendProbe(VectorContainer probeSrcContainer, int probeSrcIndex) {
-    for (int vectorIndex = numberOfBuildSideColumns; vectorIndex < container.getNumberOfColumns(); vectorIndex++) {
-      ValueVector destVector = container.getValueVector(vectorIndex).getValueVector();
-      ValueVector srcVector = probeSrcContainer.getValueVector(vectorIndex - numberOfBuildSideColumns).getValueVector();
-      destVector.copyEntry(container.getRecordCount(), srcVector, probeSrcIndex);
-    }
-  }
-
-  /**
-   *  A special version of the VectorContainer's appendRow for the HashJoin; (following a probe) it
-   *  copies the build and probe sides into the outgoing container. (It uses a composite
-   *  index for the build side). If any of the build/probe source containers is null, then that side
-   *  is not appended (effectively outputing nulls for that side's columns).
-   * @param buildSrcContainers The containers list for the right/inner side
-   * @param compositeBuildSrcIndex Composite build index
-   * @param probeSrcContainer The single container for the left/outer side
-   * @param probeSrcIndex Index in the outer container
-   * @return Number of rows in this container (after the append)
-   */
-  private int outputRow(ArrayList<VectorContainer> buildSrcContainers, int compositeBuildSrcIndex,
-                        VectorContainer probeSrcContainer, int probeSrcIndex) {
-
-    if (buildSrcContainers != null) {
-      int buildBatchIndex = compositeBuildSrcIndex >>> 16;
-      int buildOffset = compositeBuildSrcIndex & 65535;
-      appendBuild(buildSrcContainers.get(buildBatchIndex), buildOffset);
-    }
-    if (probeSrcContainer != null) {
-      appendProbe(probeSrcContainer, probeSrcIndex);
-    }
-    return container.incRecordCount();
+    this.numberOfBuildSideColumns = semiJoin ? 0 : rightHVColPosition; // position (0 based) of added column == #columns
   }
 
-  /**
-   * After the "inner" probe phase, finish up a Right (of Full) Join by projecting the unmatched rows of the build side
-   * @param currBuildPart Which partition
-   */
-  private void executeProjectRightPhase(int currBuildPart) {
-    while (outputRecords < targetOutputRecords && recordsProcessed < recordsToProcess) {
-      outputRecords =
-        outputRow(partitions[currBuildPart].getContainers(), unmatchedBuildIndexes.get(recordsProcessed),
-          null /* no probeBatch */, 0 /* no probe index */);
+  @Override
+  protected void handleProbeResult(int probeIndex) {
+    if (semiJoin) {
+      if (probeIndex != -1) {
+        // output the probe side only
+        outputRecords =
+          outputRow(null, 0, probeBatch.getContainer(), recordsProcessed);
+      }
       recordsProcessed++;
+      return; // no build-side duplicates, go on to the next probe-side row
     }
-  }
-
-  private void executeProbePhase() throws SchemaChangeException {
-
-    while (outputRecords < targetOutputRecords && probeState != ProbeState.DONE && probeState != ProbeState.PROJECT_RIGHT) {
-
-      // Check if we have processed all records in this batch we need to invoke next
-      if (recordsProcessed == recordsToProcess) {
-
-        // Done processing all records in the previous batch, clean up!
-        for (VectorWrapper<?> wrapper : probeBatch) {
-          wrapper.getValueVector().clear();
-        }
-
-        IterOutcome leftUpstream = outgoingJoinBatch.next(HashJoinHelper.LEFT_INPUT, probeBatch);
-
-        switch (leftUpstream) {
-          case NONE:
-          case NOT_YET:
-            recordsProcessed = 0;
-            recordsToProcess = 0;
-            changeToFinalProbeState();
-            // in case some outer partitions were spilled, need to spill their last batches
-            for (HashPartition partn : partitions) {
-              if (! partn.isSpilled()) { continue; } // skip non-spilled
-              partn.completeAnOuterBatch(false);
-              // update the partition's spill record with the outer side
-              HashJoinBatch.HashJoinSpilledPartition sp = spilledInners[partn.getPartitionNum()];
-              sp.updateOuter(partn.getPartitionBatchesCount(), partn.getSpillFile());
 
-              partn.closeWriter();
-            }
+    if (probeIndex != -1) {
+      /* The current probe record has a key that matches. Get the index
+       * of the first row in the build side that matches the current key
+       * (and record this match in the bitmap, in case of a FULL/RIGHT join)
+       */
+      Pair<Integer, Boolean> matchStatus = currPartition.getStartIndex(probeIndex);
 
-            continue;
+      boolean matchExists = matchStatus.getRight();
 
-          case OK_NEW_SCHEMA:
-            if (probeBatch.getSchema().equals(probeSchema)) {
-              for (HashPartition partn : partitions) { partn.updateBatches(); }
-
-            } else {
-              throw SchemaChangeException.schemaChanged("Hash join does not support schema changes in probe side.",
-                probeSchema,
-                probeBatch.getSchema());
-            }
-          case OK:
-            outgoingJoinBatch.getBatchMemoryManager().update(probeBatch, LEFT_INDEX,outputRecords);
-            setTargetOutputCount(outgoingJoinBatch.getBatchMemoryManager().getCurrentOutgoingMaxRowCount()); // calculated by update()
-            recordsToProcess = probeBatch.getRecordCount();
-            recordsProcessed = 0;
-            // If we received an empty batch do nothing
-            if (recordsToProcess == 0) {
-              continue;
-            }
-            if (cycleNum > 0) {
-              read_left_HV_vector = (IntVector) probeBatch.getContainer().getLast(); // Needed ?
-            }
-            break;
-          default:
-        }
+      if (joinControl.isIntersectDistinct() && matchExists) {
+        // since it is intersect distinct and we already have one record matched, move to next probe row
+        recordsProcessed++;
+        return;
       }
 
-      int probeIndex = -1;
-      // Check if we need to drain the next row in the probe side
-      if (getNextRecord) {
-        if (!buildSideIsEmpty) {
-          int hashCode = (cycleNum == 0) ?
-            partitions[0].getProbeHashCode(recordsProcessed)
-            : read_left_HV_vector.getAccessor().get(recordsProcessed);
-          int currBuildPart = hashCode & partitionMask;
-          hashCode >>>= bitsInMask;
-
-          // Set and keep the current partition (may be used again on subsequent probe calls as
-          // inner rows of duplicate key are processed)
-          currPartition = partitions[currBuildPart]; // inner if not spilled, else outer
-
-          // If the matching inner partition was spilled
-          if (outgoingJoinBatch.isSpilledInner(currBuildPart)) {
-            // add this row to its outer partition (may cause a spill, when the batch is full)
-
-            currPartition.appendOuterRow(hashCode, recordsProcessed);
-
-            recordsProcessed++; // done with this outer record
-            continue; // on to the next outer record
-          }
-
-          probeIndex = currPartition.probeForKey(recordsProcessed, hashCode);
-
-        }
-
-        if (semiJoin) {
-          if (probeIndex != -1) {
-            // output the probe side only
-            outputRecords =
-              outputRow(null, 0, probeBatch.getContainer(), recordsProcessed);
-          }
-          recordsProcessed++;
-          continue; // no build-side duplicates, go on to the next probe-side row
-        }
-
-        if (probeIndex != -1) {
-
-          /* The current probe record has a key that matches. Get the index
-           * of the first row in the build side that matches the current key
-           * (and record this match in the bitmap, in case of a FULL/RIGHT join)
-           */
-          Pair<Integer, Boolean> matchStatus = currPartition.getStartIndex(probeIndex);
-
-          boolean matchExists = matchStatus.getRight();
-
-          if (joinControl.isIntersectDistinct() && matchExists) {
-            // since it is intersect distinct and we already have one record matched, move to next probe row
-            recordsProcessed++;
-            continue;
-          }
-
-          currentCompositeIdx = matchStatus.getLeft();
-
-          outputRecords =
-            outputRow(currPartition.getContainers(), currentCompositeIdx,
-              probeBatch.getContainer(), recordsProcessed);
+      currentCompositeIdx = matchStatus.getLeft();
 
-          /* Projected single row from the build side with matching key but there
-           * may be more rows with the same key. Check if that's the case as long as
-           * we are not doing intersect distinct since it only cares about
-           * distinct values.
-           */
-          currentCompositeIdx = joinControl.isIntersectDistinct() ? -1 :
-            currPartition.getNextIndex(currentCompositeIdx);
-
-          if (currentCompositeIdx == -1) {
-            /* We only had one row in the build side that matched the current key
-             * from the probe side. Drain the next row in the probe side.
-             */
-            recordsProcessed++;
-          } else {
-            /* There is more than one row with the same key on the build side
-             * don't drain more records from the probe side till we have projected
-             * all the rows with this key
-             */
-            getNextRecord = false;
-          }
-        } else { // No matching key
-
-          // If we have a left outer join, project the outer side
-          if (joinType == JoinRelType.LEFT || joinType == JoinRelType.FULL) {
-
-            outputRecords = // output only the probe side (the build side would be all nulls)
-              outputRow(null, 0, probeBatch.getContainer(), recordsProcessed);
-          }
-          recordsProcessed++;
-        }
+      outputRecords =
+        outputRow(currPartition.getContainers(), currentCompositeIdx,
+          probeBatch.getContainer(), recordsProcessed);
+
+      /* Projected single row from the build side with matching key but there
+       * may be more rows with the same key. Check if that's the case as long as
+       * we are not doing intersect distinct since it only cares about
+       * distinct values.
+       */
+      currentCompositeIdx = joinControl.isIntersectDistinct() ? -1 :
+        currPartition.getNextIndex(currentCompositeIdx);
+
+      if (currentCompositeIdx == -1) {
+        /* We only had one row in the build side that matched the current key
+         * from the probe side. Drain the next row in the probe side.
+         */
+        recordsProcessed++;
+      } else {
+        /* There is more than one row with the same key on the build side
+         * don't drain more records from the probe side till we have projected
+         * all the rows with this key
+         */
+        getNextRecord = false;
       }
-      else { // match the next inner row with the same key
-
-        currPartition.setRecordMatched(currentCompositeIdx);
-
-        outputRecords =
-          outputRow(currPartition.getContainers(), currentCompositeIdx,
-            probeBatch.getContainer(), recordsProcessed);
+    } else { // No matching key
 
-        currentCompositeIdx = currPartition.getNextIndex(currentCompositeIdx);
+      // If we have a left outer join, project the outer side
+      if (joinType == JoinRelType.LEFT || joinType == JoinRelType.FULL) {
 
-        if (currentCompositeIdx == -1) {
-          // We don't have any more rows matching the current key on the build side, move on to the next probe row
-          getNextRecord = true;
-          recordsProcessed++;
-        }
+        outputRecords = // output only the probe side (the build side would be all nulls)
+          outputRow(null, 0, probeBatch.getContainer(), recordsProcessed);
       }
+      recordsProcessed++;
     }
   }
 
-  /**
-   *  Perform the probe, till the outgoing is full, or no more rows to probe.
-   *  Performs the inner or left-outer join while there are left rows,
-   *  when done, continue with right-outer, if appropriate.
-   * @return Num of output records
-   * @throws SchemaChangeException
-   */
-  @Override
-  public int probeAndProject() throws SchemaChangeException {
-
-    outputRecords = 0;
-
-    // When handling spilled partitions, the state becomes DONE at the end of each partition
-    if (probeState == ProbeState.DONE) {
-      return outputRecords; // that is zero
-    }
-
-    if (probeState == ProbeState.PROBE_PROJECT) {
-      executeProbePhase();
-    }
-
-    if (probeState == ProbeState.PROJECT_RIGHT) {
-      // Inner probe is done; now we are here because we still have a RIGHT OUTER (or a FULL) join
-
-      do {
-
-        if (unmatchedBuildIndexes == null) { // first time for this partition ?
-          if (buildSideIsEmpty) { return outputRecords; } // in case of an empty right
-          // Get this partition's list of build indexes that didn't match any record on the probe side
-          unmatchedBuildIndexes = partitions[currRightPartition].getNextUnmatchedIndex();
-          recordsProcessed = 0;
-          recordsToProcess = unmatchedBuildIndexes.size();
-        }
-
-        // Project the list of unmatched records on the build side
-        executeProjectRightPhase(currRightPartition);
-
-        if (recordsProcessed < recordsToProcess) { // more records in this partition?
-          return outputRecords;  // outgoing is full; report and come back later
-        } else {
-          currRightPartition++; // on to the next right partition
-          unmatchedBuildIndexes = null;
-        }
-
-      }   while (currRightPartition < numPartitions);
-
-      probeState = ProbeState.DONE; // last right partition was handled; we are done now
-    }
-
-    return outputRecords;
-  }
-
   @Override
   public void changeToFinalProbeState() {
     // We are done with the (left) probe phase.
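
The outputRow() logic that moves into ProbeTemplate keeps the composite build-side index: the upper 16 bits select the build batch and the lower 16 bits give the row offset within that batch. A small sketch of that packing, assuming the encoding is simply the inverse of the decode shown above (class and method names here are illustrative):

    // Illustrative sketch of the composite build-side index used by outputRow().
    public class CompositeIndexSketch {
      // Pack a build batch number and a row offset the way outputRow() unpacks them.
      static int compose(int buildBatchIndex, int buildOffset) {
        return (buildBatchIndex << 16) | (buildOffset & 65535);
      }

      public static void main(String[] args) {
        int composite = compose(3, 1200);       // row 1200 of build batch 3
        int buildBatchIndex = composite >>> 16; // 3, as decoded in outputRow()
        int buildOffset = composite & 65535;    // 1200
        System.out.println(buildBatchIndex + ", " + buildOffset);
      }
    }

This is also why build batches are limited to 65536 rows per batch index: both halves of the composite must fit in 16 bits.
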
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbe.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/Probe.java
similarity index 64%
rename from exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbe.java
rename to exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/Probe.java
index 490eba4e33..b5acb89177 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbe.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/Probe.java
@@ -17,16 +17,15 @@
  */
 package org.apache.drill.exec.physical.impl.join;
 
-import org.apache.drill.exec.compile.TemplateClassDefinition;
+import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.calcite.sql.SqlKind;
 import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.physical.impl.common.HashPartition;
+import org.apache.drill.exec.physical.impl.setop.HashSetOpRecordBatch;
 import org.apache.drill.exec.record.RecordBatch;
-import org.apache.calcite.rel.core.JoinRelType;
 import org.apache.drill.exec.record.VectorContainer;
 
-public interface HashJoinProbe {
-  TemplateClassDefinition<HashJoinProbe> TEMPLATE_DEFINITION = new TemplateClassDefinition<HashJoinProbe>(HashJoinProbe.class, HashJoinProbeTemplate.class);
-
+public interface Probe {
   /* The probe side of the hash join can be in the following two states
    * 1. PROBE_PROJECT: Inner join case, we probe our hash table to see if we have a
    *    key match and if we do we project the record
@@ -39,11 +38,21 @@ public interface HashJoinProbe {
     PROBE_PROJECT, PROJECT_RIGHT, DONE
   }
 
-  void setupHashJoinProbe(RecordBatch probeBatch, HashJoinBatch outgoing, JoinRelType joinRelType, boolean semiJoin,
-                          RecordBatch.IterOutcome leftStartState, HashPartition[] partitions, int cycleNum,
-                          VectorContainer container, HashJoinBatch.HashJoinSpilledPartition[] spilledInners,
-                          boolean buildSideIsEmpty, int numPartitions, int rightHVColPosition);
-  int  probeAndProject() throws SchemaChangeException;
+  default void setup(RecordBatch probeBatch, HashJoinBatch outgoing, JoinRelType joinRelType, boolean semiJoin,
+    RecordBatch.IterOutcome leftStartState, HashPartition[] partitions, int cycleNum,
+    VectorContainer container, AbstractHashBinaryRecordBatch.SpilledPartition[] spilledInners,
+    boolean buildSideIsEmpty, int numPartitions, int rightHVColPosition) throws SchemaChangeException {
+    throw new UnsupportedOperationException();
+  }
+
+  default void setup(RecordBatch probeBatch, HashSetOpRecordBatch outgoing, SqlKind opType, boolean isAll,
+    RecordBatch.IterOutcome leftStartState, HashPartition[] partitions, int cycleNum,
+    VectorContainer container, AbstractHashBinaryRecordBatch.SpilledPartition[] spilledInners,
+    boolean buildSideIsEmpty, int numPartitions) throws SchemaChangeException {
+    throw new UnsupportedOperationException();
+  }
+
+  int probeAndProject() throws SchemaChangeException;
   void changeToFinalProbeState();
   void setTargetOutputCount(int targetOutputCount);
   int getOutputCount();
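
The reworked Probe interface gives each setup() overload a default body that throws UnsupportedOperationException, so HashJoinProbeTemplate and HashSetOpProbeTemplate each override only the variant their operator actually calls. A stripped-down sketch of that pattern (names and parameter types are simplified placeholders, not the real signatures):

    // Simplified stand-in for the Probe interface: each setup variant defaults to
    // throwing, so a concrete probe only overrides the one its operator uses.
    interface ProbeSketch {
      default void setupForJoin(String joinType) {
        throw new UnsupportedOperationException();
      }
      default void setupForSetOp(String opType, boolean isAll) {
        throw new UnsupportedOperationException();
      }
      int probeAndProject();
    }

    class SetOpProbeSketch implements ProbeSketch {
      @Override
      public void setupForSetOp(String opType, boolean isAll) {
        // set-op specific initialization would go here
      }
      @Override
      public int probeAndProject() {
        return 0; // no rows in this toy example
      }
    }
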
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/ProbeTemplate.java
similarity index 63%
copy from exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java
copy to exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/ProbeTemplate.java
index 79956a4df9..21101b5b51 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/ProbeTemplate.java
@@ -17,127 +17,84 @@
  */
 package org.apache.drill.exec.physical.impl.join;
 
-import java.util.ArrayList;
-
 import com.carrotsearch.hppc.IntArrayList;
 import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.physical.impl.common.HashPartition;
-import org.apache.drill.exec.planner.common.JoinControl;
 import org.apache.drill.exec.record.BatchSchema;
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.record.RecordBatch.IterOutcome;
 import org.apache.drill.exec.record.VectorContainer;
 import org.apache.drill.exec.record.VectorWrapper;
-import org.apache.calcite.rel.core.JoinRelType;
-import org.apache.commons.lang3.tuple.Pair;
 import org.apache.drill.exec.vector.IntVector;
 import org.apache.drill.exec.vector.ValueVector;
 
+import java.util.ArrayList;
+
 import static org.apache.drill.exec.record.JoinBatchMemoryManager.LEFT_INDEX;
 
-public class HashJoinProbeTemplate implements HashJoinProbe {
+public abstract class ProbeTemplate<T extends PhysicalOperator> implements Probe {
 
-  VectorContainer container; // the outgoing container
+  protected VectorContainer container; // the outgoing container
 
   // Probe side record batch
-  private RecordBatch probeBatch;
+  protected RecordBatch probeBatch;
 
-  private BatchSchema probeSchema;
-
-  // Join type, INNER, LEFT, RIGHT or OUTER
-  private JoinRelType joinType;
-
-  // joinControl determines how to handle INTERSECT_DISTINCT vs. INTERSECT_ALL
-  private JoinControl joinControl;
-
-  private HashJoinBatch outgoingJoinBatch;
+  protected BatchSchema probeSchema;
 
   // Number of records to process on the probe side
-  private int recordsToProcess;
+  protected int recordsToProcess;
 
   // Number of records processed on the probe side
-  private int recordsProcessed;
+  protected int recordsProcessed;
 
   // Number of records in the output container
-  private int outputRecords;
+  protected int outputRecords;
 
   // Indicate if we should drain the next record from the probe side
-  private boolean getNextRecord = true;
+  protected boolean getNextRecord = true;
 
   // Contains both batch idx and record idx of the matching record in the build side
-  private int currentCompositeIdx = -1;
+  protected int currentCompositeIdx = -1;
 
   // Current state the hash join algorithm is in
-  private ProbeState probeState = ProbeState.PROBE_PROJECT;
+  protected ProbeState probeState = ProbeState.PROBE_PROJECT;
 
   // For outer or right joins, this is a list of unmatched records that needs to be projected
-  private IntArrayList unmatchedBuildIndexes;
+  protected IntArrayList unmatchedBuildIndexes;
 
-  private  HashPartition partitions[];
+  protected HashPartition[] partitions;
 
   // While probing duplicates, retain current build-side partition in case need to continue
   // probing later on the same chain of duplicates
-  private HashPartition currPartition;
-
-  private int currRightPartition; // for returning RIGHT/FULL
+  protected HashPartition currPartition;
+  protected int currRightPartition; // for returning RIGHT/FULL
   IntVector read_left_HV_vector; // HV vector that was read from the spilled batch
-  private int cycleNum; // 1-primary, 2-secondary, 3-tertiary, etc.
-  private HashJoinBatch.HashJoinSpilledPartition spilledInners[]; // for the outer to find the partition
-  private boolean buildSideIsEmpty = true;
-  private int numPartitions = 1; // must be 2 to the power of bitsInMask
-  private int partitionMask; // numPartitions - 1
-  private int bitsInMask; // number of bits in the MASK
-  private int numberOfBuildSideColumns;
-  private int targetOutputRecords;
-  private boolean semiJoin;
-
-  @Override
-  public void setTargetOutputCount(int targetOutputRecords) {
-    this.targetOutputRecords = targetOutputRecords;
-  }
-
-  @Override
-  public int getOutputCount() {
-    return outputRecords;
-  }
-
-  /**
-   *  Setup the Hash Join Probe object
-   *
-   * @param probeBatch
-   * @param outgoing
-   * @param joinRelType
-   * @param semiJoin
-   * @param leftStartState
-   * @param partitions
-   * @param cycleNum
-   * @param container
-   * @param spilledInners
-   * @param buildSideIsEmpty
-   * @param numPartitions
-   * @param rightHVColPosition
-   */
-  @Override
-  public void setupHashJoinProbe(RecordBatch probeBatch, HashJoinBatch outgoing, JoinRelType joinRelType, boolean semiJoin,
-                                 IterOutcome leftStartState, HashPartition[] partitions, int cycleNum,
-                                 VectorContainer container, HashJoinBatch.HashJoinSpilledPartition[] spilledInners,
-                                 boolean buildSideIsEmpty, int numPartitions, int rightHVColPosition) {
+  protected int cycleNum; // 1-primary, 2-secondary, 3-tertiary, etc.
+  protected AbstractHashBinaryRecordBatch.SpilledPartition[] spilledInners; // for the outer to find the partition
+  protected boolean buildSideIsEmpty = true;
+  protected int numPartitions = 1; // must be 2 to the power of bitsInMask
+  protected int partitionMask; // numPartitions - 1
+  protected int bitsInMask; // number of bits in the MASK
+  protected int numberOfBuildSideColumns;
+  protected int targetOutputRecords;
+  protected AbstractHashBinaryRecordBatch<T> outgoingBatch;
+
+  protected void setup(RecordBatch probeBatch, IterOutcome leftStartState,
+    HashPartition[] partitions, int cycleNum,
+    VectorContainer container, AbstractHashBinaryRecordBatch.SpilledPartition[] spilledInners,
+    boolean buildSideIsEmpty, int numPartitions) throws SchemaChangeException {
     this.container = container;
     this.spilledInners = spilledInners;
     this.probeBatch = probeBatch;
     this.probeSchema = probeBatch.getSchema();
-    this.joinType = joinRelType;
-    this.outgoingJoinBatch = outgoing;
     this.partitions = partitions;
     this.cycleNum = cycleNum;
     this.buildSideIsEmpty = buildSideIsEmpty;
     this.numPartitions = numPartitions;
-    this.numberOfBuildSideColumns = semiJoin ? 0 : rightHVColPosition; // position (0 based) of added column == #columns
-    this.semiJoin = semiJoin;
 
     partitionMask = numPartitions - 1; // e.g. 32 --> 0x1F
     bitsInMask = Integer.bitCount(partitionMask); // e.g. 0x1F -> 5
-    joinControl = new JoinControl(outgoingJoinBatch.getPopConfig().getJoinControl());
 
     probeState = ProbeState.PROBE_PROJECT;
     this.recordsToProcess = 0;
@@ -156,6 +113,13 @@ public class HashJoinProbeTemplate implements HashJoinProbe {
       partn.allocateNewCurrentBatchAndHV();
     }
 
+    // The container is cleared after prefetchFirstProbeBatch in the case of AggRecordBatch;
+    // after that, only partitions that hold a batch update their value-vector references in the hash table.
+    // Update the first partition here if that has not happened yet; it is used when computing the hash code.
+    if (this.cycleNum == 0 && partitions.length > 0 && partitions[0].getPartitionBatchesCount() == 0) {
+      partitions[0].updateBatches();
+    }
+
     currRightPartition = 0; // In case it's a Right/Full outer join
 
     // Initialize the HV vector for the first (already read) left batch
@@ -167,6 +131,16 @@ public class HashJoinProbeTemplate implements HashJoinProbe {
     }
   }
 
+  @Override
+  public void setTargetOutputCount(int targetOutputRecords) {
+    this.targetOutputRecords = targetOutputRecords;
+  }
+
+  @Override
+  public int getOutputCount() {
+    return outputRecords;
+  }
+
   /**
    * Append the given build side row into the outgoing container
    * @param buildSrcContainer The container for the right/inner side
@@ -204,7 +178,7 @@ public class HashJoinProbeTemplate implements HashJoinProbe {
    * @param probeSrcIndex Index in the outer container
    * @return Number of rows in this container (after the append)
    */
-  private int outputRow(ArrayList<VectorContainer> buildSrcContainers, int compositeBuildSrcIndex,
+  protected int outputRow(ArrayList<VectorContainer> buildSrcContainers, int compositeBuildSrcIndex,
                         VectorContainer probeSrcContainer, int probeSrcIndex) {
 
     if (buildSrcContainers != null) {
@@ -222,7 +196,7 @@ public class HashJoinProbeTemplate implements HashJoinProbe {
    * After the "inner" probe phase, finish up a Right (or Full) Join by projecting the unmatched rows of the build side
    * @param currBuildPart Which partition
    */
-  private void executeProjectRightPhase(int currBuildPart) {
+  protected void executeProjectRightPhase(int currBuildPart) {
     while (outputRecords < targetOutputRecords && recordsProcessed < recordsToProcess) {
       outputRecords =
         outputRow(partitions[currBuildPart].getContainers(), unmatchedBuildIndexes.get(recordsProcessed),
@@ -243,7 +217,7 @@ public class HashJoinProbeTemplate implements HashJoinProbe {
           wrapper.getValueVector().clear();
         }
 
-        IterOutcome leftUpstream = outgoingJoinBatch.next(HashJoinHelper.LEFT_INPUT, probeBatch);
+        IterOutcome leftUpstream = outgoingBatch.next(HashJoinHelper.LEFT_INPUT, probeBatch);
 
         switch (leftUpstream) {
           case NONE:
@@ -256,7 +230,7 @@ public class HashJoinProbeTemplate implements HashJoinProbe {
               if (! partn.isSpilled()) { continue; } // skip non-spilled
               partn.completeAnOuterBatch(false);
               // update the partition's spill record with the outer side
-              HashJoinBatch.HashJoinSpilledPartition sp = spilledInners[partn.getPartitionNum()];
+              AbstractHashBinaryRecordBatch.SpilledPartition sp = spilledInners[partn.getPartitionNum()];
               sp.updateOuter(partn.getPartitionBatchesCount(), partn.getSpillFile());
 
               partn.closeWriter();
@@ -269,13 +243,13 @@ public class HashJoinProbeTemplate implements HashJoinProbe {
               for (HashPartition partn : partitions) { partn.updateBatches(); }
 
             } else {
-              throw SchemaChangeException.schemaChanged("Hash join does not support schema changes in probe side.",
-                probeSchema,
-                probeBatch.getSchema());
+              throw SchemaChangeException.schemaChanged(
+                this.getClass().getSimpleName() + " does not support schema changes in probe side.",
+                probeSchema, probeBatch.getSchema());
             }
           case OK:
-            outgoingJoinBatch.getBatchMemoryManager().update(probeBatch, LEFT_INDEX,outputRecords);
-            setTargetOutputCount(outgoingJoinBatch.getBatchMemoryManager().getCurrentOutgoingMaxRowCount()); // calculated by update()
+            outgoingBatch.getBatchMemoryManager().update(probeBatch, LEFT_INDEX,outputRecords);
+            setTargetOutputCount(outgoingBatch.getBatchMemoryManager().getCurrentOutgoingMaxRowCount()); // calculated by update()
             recordsToProcess = probeBatch.getRecordCount();
             recordsProcessed = 0;
             // If we received an empty batch do nothing
@@ -305,7 +279,7 @@ public class HashJoinProbeTemplate implements HashJoinProbe {
           currPartition = partitions[currBuildPart]; // inner if not spilled, else outer
 
           // If the matching inner partition was spilled
-          if (outgoingJoinBatch.isSpilledInner(currBuildPart)) {
+          if (outgoingBatch.isSpilledInner(currBuildPart)) {
             // add this row to its outer partition (may cause a spill, when the batch is full)
 
             currPartition.appendOuterRow(hashCode, recordsProcessed);
@@ -318,70 +292,8 @@ public class HashJoinProbeTemplate implements HashJoinProbe {
 
         }
 
-        if (semiJoin) {
-          if (probeIndex != -1) {
-            // output the probe side only
-            outputRecords =
-              outputRow(null, 0, probeBatch.getContainer(), recordsProcessed);
-          }
-          recordsProcessed++;
-          continue; // no build-side duplicates, go on to the next probe-side row
-        }
-
-        if (probeIndex != -1) {
-
-          /* The current probe record has a key that matches. Get the index
-           * of the first row in the build side that matches the current key
-           * (and record this match in the bitmap, in case of a FULL/RIGHT join)
-           */
-          Pair<Integer, Boolean> matchStatus = currPartition.getStartIndex(probeIndex);
-
-          boolean matchExists = matchStatus.getRight();
-
-          if (joinControl.isIntersectDistinct() && matchExists) {
-            // since it is intersect distinct and we already have one record matched, move to next probe row
-            recordsProcessed++;
-            continue;
-          }
-
-          currentCompositeIdx = matchStatus.getLeft();
-
-          outputRecords =
-            outputRow(currPartition.getContainers(), currentCompositeIdx,
-              probeBatch.getContainer(), recordsProcessed);
-
-          /* Projected single row from the build side with matching key but there
-           * may be more rows with the same key. Check if that's the case as long as
-           * we are not doing intersect distinct since it only cares about
-           * distinct values.
-           */
-          currentCompositeIdx = joinControl.isIntersectDistinct() ? -1 :
-            currPartition.getNextIndex(currentCompositeIdx);
-
-          if (currentCompositeIdx == -1) {
-            /* We only had one row in the build side that matched the current key
-             * from the probe side. Drain the next row in the probe side.
-             */
-            recordsProcessed++;
-          } else {
-            /* There is more than one row with the same key on the build side
-             * don't drain more records from the probe side till we have projected
-             * all the rows with this key
-             */
-            getNextRecord = false;
-          }
-        } else { // No matching key
-
-          // If we have a left outer join, project the outer side
-          if (joinType == JoinRelType.LEFT || joinType == JoinRelType.FULL) {
-
-            outputRecords = // output only the probe side (the build side would be all nulls)
-              outputRow(null, 0, probeBatch.getContainer(), recordsProcessed);
-          }
-          recordsProcessed++;
-        }
-      }
-      else { // match the next inner row with the same key
+        handleProbeResult(probeIndex);
+      } else { // match the next inner row with the same key
 
         currPartition.setRecordMatched(currentCompositeIdx);
 
@@ -405,7 +317,7 @@ public class HashJoinProbeTemplate implements HashJoinProbe {
    *  Performs the inner or left-outer join while there are left rows,
    *  when done, continue with right-outer, if appropriate.
    * @return Num of output records
-   * @throws SchemaChangeException
+   * @throws SchemaChangeException SchemaChangeException
    */
   @Override
   public int probeAndProject() throws SchemaChangeException {
@@ -423,9 +335,7 @@ public class HashJoinProbeTemplate implements HashJoinProbe {
 
     if (probeState == ProbeState.PROJECT_RIGHT) {
       // Inner probe is done; now we are here because we still have a RIGHT OUTER (or a FULL) join
-
       do {
-
         if (unmatchedBuildIndexes == null) { // first time for this partition ?
           if (buildSideIsEmpty) { return outputRecords; } // in case of an empty right
           // Get this partition's list of build indexes that didn't match any record on the probe side
@@ -444,33 +354,12 @@ public class HashJoinProbeTemplate implements HashJoinProbe {
           unmatchedBuildIndexes = null;
         }
 
-      }   while (currRightPartition < numPartitions);
+      } while (currRightPartition < numPartitions);
 
       probeState = ProbeState.DONE; // last right partition was handled; we are done now
     }
-
     return outputRecords;
   }
 
-  @Override
-  public void changeToFinalProbeState() {
-    // We are done with the (left) probe phase.
-    // If it's a RIGHT or a FULL join then need to get the unmatched indexes from the build side
-    probeState =
-      (joinType == JoinRelType.RIGHT || joinType == JoinRelType.FULL) ? ProbeState.PROJECT_RIGHT :
-        ProbeState.DONE; // else we're done
-  }
-
-  @Override
-  public String toString() {
-    return "HashJoinProbeTemplate[container=" + container
-        + ", probeSchema=" + probeSchema
-        + ", joinType=" + joinType
-        + ", recordsToProcess=" + recordsToProcess
-        + ", recordsProcessed=" + recordsProcessed
-        + ", outputRecords=" + outputRecords
-        + ", probeState=" + probeState
-        + ", unmatchedBuildIndexes=" + unmatchedBuildIndexes
-        + "]";
-  }
+  protected abstract void handleProbeResult(int probeIndex);
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/setop/HashSetOpProbeTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/setop/HashSetOpProbeTemplate.java
new file mode 100644
index 0000000000..19e9c0daed
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/setop/HashSetOpProbeTemplate.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.setop;
+
+import org.apache.calcite.sql.SqlKind;
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.physical.config.SetOp;
+import org.apache.drill.exec.physical.impl.common.HashPartition;
+import org.apache.drill.exec.physical.impl.join.AbstractHashBinaryRecordBatch;
+import org.apache.drill.exec.physical.impl.join.ProbeTemplate;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.RecordBatch.IterOutcome;
+import org.apache.drill.exec.record.VectorContainer;
+
+public class HashSetOpProbeTemplate extends ProbeTemplate<SetOp> {
+  private SqlKind opType;
+  private boolean isAll;
+
+  /**
+   *  Setup the Hash Set Op Probe object
+   */
+  @Override
+  public void setup(RecordBatch probeBatch, HashSetOpRecordBatch outgoing, SqlKind opType, boolean isAll,
+                                 IterOutcome leftStartState, HashPartition[] partitions, int cycleNum,
+                                 VectorContainer container, AbstractHashBinaryRecordBatch.SpilledPartition[] spilledInners,
+                                 boolean buildSideIsEmpty, int numPartitions) throws SchemaChangeException {
+    super.setup(probeBatch, leftStartState, partitions, cycleNum, container, spilledInners,
+      buildSideIsEmpty, numPartitions);
+    this.outgoingBatch = outgoing;
+    this.opType = opType;
+    this.isAll = isAll;
+    this.numberOfBuildSideColumns = 0;
+  }
+
+  @Override
+  protected void handleProbeResult(int probeIndex) {
+    switch (opType) {
+      case INTERSECT:
+        if (probeIndex != -1 && currPartition.getRecordNumForKey(probeIndex) > 0) {
+          if (isAll) {
+            currPartition.decreaseRecordNumForKey(probeIndex);
+          } else {
+            currPartition.setRecordNumForKey(probeIndex, 0);
+          }
+          outputRecords =
+            outputRow(null, 0, probeBatch.getContainer(), recordsProcessed);
+        }
+        break;
+      case EXCEPT:
+        if (isAll) {
+          if (probeIndex == -1 || currPartition.getRecordNumForKey(probeIndex) == 0) {
+            outputRecords =
+              outputRow(null, 0, probeBatch.getContainer(), recordsProcessed);
+          } else {
+            currPartition.decreaseRecordNumForKey(probeIndex);
+          }
+        } else if (probeIndex == -1) {
+          outputRecords =
+            outputRow(null, 0, probeBatch.getContainer(), recordsProcessed);
+        }
+      default:
+        break;
+    }
+    recordsProcessed++;
+  }
+
+  @Override
+  public void changeToFinalProbeState() {
+    probeState = ProbeState.DONE;
+  }
+
+  @Override
+  public String toString() {
+    return "HashSetOpProbeTemplate[container=" + container
+        + ", probeSchema=" + probeSchema
+        + ", opType=" + opType
+        + ", isAll=" + isAll
+        + ", recordsToProcess=" + recordsToProcess
+        + ", recordsProcessed=" + recordsProcessed
+        + ", outputRecords=" + outputRecords
+        + ", probeState=" + probeState
+        + "]";
+  }
+}
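
As an illustration of the probe loop above (plain Java, no Drill classes; the class and method names are invented for the example), the build side can be viewed as a key -> remaining-count map, and each probe row is emitted or suppressed according to the operator and the ALL flag, mirroring what handleProbeResult does with getRecordNumForKey / decreaseRecordNumForKey / setRecordNumForKey:

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class SetOpSemanticsSketch {
  enum Op { INTERSECT, EXCEPT }

  // 'buildRows' plays the role of the hashed right input, 'probeRows' the
  // streamed left input. Keys stand in for whole rows.
  static List<String> probe(List<String> probeRows, List<String> buildRows, Op op, boolean all) {
    Map<String, Integer> counts = new HashMap<>();
    for (String row : buildRows) {
      counts.merge(row, 1, Integer::sum);          // build phase: count duplicates per key
    }
    List<String> out = new ArrayList<>();
    for (String row : probeRows) {
      Integer remaining = counts.get(row);         // probeIndex == -1  <=>  remaining == null
      switch (op) {
        case INTERSECT:
          if (remaining != null && remaining > 0) {
            if (all) {
              counts.put(row, remaining - 1);       // decreaseRecordNumForKey
            } else {
              counts.put(row, 0);                   // setRecordNumForKey(.., 0): emit the key once
            }
            out.add(row);
          }
          break;
        case EXCEPT:
          if (all) {
            if (remaining == null || remaining == 0) {
              out.add(row);                         // no unmatched build row left for this key
            } else {
              counts.put(row, remaining - 1);       // consume one matching build row
            }
          } else if (remaining == null) {
            out.add(row);                           // key never appears on the build side
          }
          break;
      }
    }
    return out;
  }

  public static void main(String[] args) {
    List<String> left = List.of("a", "a", "b", "c");
    List<String> right = List.of("a", "b", "b");
    System.out.println(probe(left, right, Op.INTERSECT, true));   // [a, b]
    System.out.println(probe(left, right, Op.INTERSECT, false));  // [a, b]
    System.out.println(probe(left, right, Op.EXCEPT, true));      // [a, c]
    System.out.println(probe(left, right, Op.EXCEPT, false));     // [c]
  }
}

Running the example against left = [a, a, b, c] and right = [a, b, b] prints [a, b], [a, b], [a, c] and [c], matching the SQL semantics of INTERSECT ALL, INTERSECT, EXCEPT ALL and EXCEPT respectively.
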
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/setop/HashSetOpRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/setop/HashSetOpRecordBatch.java
new file mode 100644
index 0000000000..516788d76f
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/setop/HashSetOpRecordBatch.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.setop;
+
+import org.apache.calcite.sql.SqlKind;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.expression.FieldReference;
+import org.apache.drill.common.logical.data.NamedExpression;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.exception.OutOfMemoryException;
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.config.SetOp;
+import org.apache.drill.exec.physical.impl.common.Comparator;
+import org.apache.drill.exec.physical.impl.common.HashTable;
+import org.apache.drill.exec.physical.impl.common.HashTableConfig;
+import org.apache.drill.exec.physical.impl.join.AbstractHashBinaryRecordBatch;
+import org.apache.drill.exec.physical.impl.join.Probe;
+import org.apache.drill.exec.planner.common.JoinControl;
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
+
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * Implements the runtime execution for the Hash-SetOp operator supporting EXCEPT,
+ * EXCEPT ALL, INTERSECT, and INTERSECT ALL
+ */
+public class HashSetOpRecordBatch extends AbstractHashBinaryRecordBatch<SetOp> {
+  private final SqlKind opType;
+  private final boolean isAll;
+
+  /**
+   * The constructor
+   *
+   * @param popConfig SetOp
+   * @param context FragmentContext
+   * @param left probe/outer side incoming input
+   * @param right build/inner side incoming input
+   * @throws OutOfMemoryException out of memory exception
+   */
+  public HashSetOpRecordBatch(SetOp popConfig, FragmentContext context,
+                              RecordBatch left, RecordBatch right)
+    throws OutOfMemoryException {
+    super(popConfig, context, left, right);
+    this.opType = popConfig.getKind();
+    this.isAll = popConfig.isAll();
+    this.semiJoin = true;
+    this.joinIsLeftOrFull = true;
+    this.joinIsRightOrFull = false;
+    this.isRowKeyJoin = false;
+    this.enableRuntimeFilter = false;
+  }
+
+  private void buildCompareExpression(List<NamedExpression> leftExpr, List<Comparator> comparators) {
+    Iterator<MaterializedField>  iterator = probeSchema.iterator();
+    int i = 0;
+    while (iterator.hasNext()) {
+      MaterializedField field = iterator.next();
+      String refName = "probe_side_" + i;
+      leftExpr.add(new NamedExpression(new FieldReference(field.getName()), new FieldReference(refName)));
+      i++;
+    }
+
+    iterator = buildSchema.iterator();
+    i = 0;
+    while (iterator.hasNext()) {
+      MaterializedField field = iterator.next();
+      String refName = "build_side_" + i;
+      rightExpr.add(new NamedExpression(new FieldReference(field.getName()), new FieldReference(refName)));
+      buildJoinColumns.add(field.getName());
+      i++;
+    }
+    leftExpr.forEach(e->comparators.add(Comparator.EQUALS));
+  }
+
+  @Override
+  protected HashTableConfig buildHashTableConfig() {
+    if (leftUpstream == IterOutcome.OK_NEW_SCHEMA
+      || leftUpstream == IterOutcome.OK) {
+      if (probeBatch.getSchema()
+        .getSelectionVectorMode() != BatchSchema.SelectionVectorMode.NONE) {
+        throw UserException.internalError(null).message(
+            "Hash SetOp does not support probe batch with selection vectors.")
+          .addContext("Probe batch has selection mode",
+            (probeBatch.getSchema().getSelectionVectorMode()).toString())
+          .build(logger);
+      }
+    }
+
+    List<NamedExpression> leftExpr = Lists.newArrayListWithExpectedSize(probeSchema.getFieldCount());
+    List<Comparator> comparators = Lists.newArrayListWithExpectedSize(probeSchema.getFieldCount());
+    buildCompareExpression(leftExpr, comparators);
+    return new HashTableConfig(
+      (int) context.getOptions().getOption(ExecConstants.MIN_HASH_TABLE_SIZE),
+      true, HashTable.DEFAULT_LOAD_FACTOR, rightExpr, leftExpr, comparators,
+      JoinControl.DEFAULT, true);
+  }
+
+  @Override
+  public Probe createProbe() {
+    // No real code generation !!
+    return new HashSetOpProbeTemplate();
+  }
+
+  @Override
+  public void setupProbe() throws SchemaChangeException {
+    probe.setup(probeBatch, this, opType,
+      isAll, leftUpstream, partitions, spilledState.getCycle(),
+      container, spilledInners, buildSideIsEmpty.booleanValue(),
+      numPartitions);
+  }
+
+  @Override
+  public void dump() {
+    logger.error(
+      "HashSetOpRecordBatch[container={}, left={}, right={}, leftOutcome={}, rightOutcome={}, " +
+        "SetOpType={}, IsAll={}, hashSetOpProbe={}, canSpill={}, buildSchema={}, probeSchema={}]",
+      container, left, right, leftUpstream, rightUpstream, opType, isAll, probe, canSpill, buildSchema, probeSchema);
+  }
+}
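
To make the key construction in buildCompareExpression concrete (illustration only; the two-column schema is made up), each probe column i is paired with build column i under the synthetic names probe_side_i / build_side_i and compared with Comparator.EQUALS, so the full row acts as the hash-table key:

import java.util.List;

// Illustrative only: prints the synthetic key names and comparators that
// buildCompareExpression() would produce for a hypothetical two-column
// set operation whose inputs both expose columns "id" and "name".
public class SetOpKeyMappingSketch {
  public static void main(String[] args) {
    List<String> columns = List.of("id", "name");
    for (int i = 0; i < columns.size(); i++) {
      System.out.printf("probe  %-4s -> probe_side_%d%n", columns.get(i), i);
      System.out.printf("build  %-4s -> build_side_%d  (Comparator.EQUALS)%n", columns.get(i), i);
    }
    // Every column participates in the hash-table key, so two rows match only
    // when they are equal on all columns of the set operation.
  }
}

Because the constructor sets semiJoin to true and the probe template sets numberOfBuildSideColumns to 0, only probe-side (left input) columns appear in the operator's output.
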
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/setop/SetOpBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/setop/SetOpBatchCreator.java
new file mode 100644
index 0000000000..58338eb60e
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/setop/SetOpBatchCreator.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.setop;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.exec.ops.ExecutorFragmentContext;
+import org.apache.drill.exec.physical.config.SetOp;
+import org.apache.drill.exec.physical.impl.BatchCreator;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
+
+import java.util.List;
+
+public class SetOpBatchCreator implements BatchCreator<SetOp> {
+  @Override
+  public HashSetOpRecordBatch getBatch(ExecutorFragmentContext context, SetOp config, List<RecordBatch> children)
+      throws ExecutionSetupException {
+    Preconditions.checkArgument(children.size() == 2);
+    return new HashSetOpRecordBatch(config, context, children.get(0), children.get(1));
+  }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PlannerPhase.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PlannerPhase.java
index 00ff1fc704..2c2215eacd 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PlannerPhase.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PlannerPhase.java
@@ -19,12 +19,14 @@ package org.apache.drill.exec.planner;
 
 import org.apache.drill.exec.planner.logical.ConvertMetadataAggregateToDirectScanRule;
 import org.apache.drill.exec.planner.logical.DrillDistinctJoinToSemiJoinRule;
+import org.apache.drill.exec.planner.logical.DrillSetOpRule;
 import org.apache.drill.exec.planner.logical.DrillReduceExpressionsRule;
 import org.apache.drill.exec.planner.logical.DrillTableModifyRule;
 import org.apache.drill.exec.planner.physical.MetadataAggPrule;
 import org.apache.drill.exec.planner.physical.MetadataControllerPrule;
 import org.apache.drill.exec.planner.physical.MetadataHandlerPrule;
 import org.apache.drill.exec.planner.physical.TableModifyPrule;
+import org.apache.drill.exec.planner.physical.SetOpPrule;
 import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableSet;
 import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableSet.Builder;
 import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
@@ -396,6 +398,7 @@ public enum PlannerPhase {
      */
     ImmutableSet.Builder<RelOptRule> basicRules = ImmutableSet.<RelOptRule>builder()
         .addAll(staticRuleSet)
+        .addAll(DrillSetOpRule.INSTANCES)
         .add(
             DrillMergeProjectRule.getInstance(true, RelFactories.DEFAULT_PROJECT_FACTORY,
                 optimizerRulesContext.getFunctionRegistry())
@@ -551,6 +554,9 @@ public enum PlannerPhase {
     ruleList.add(DrillPushLimitToScanRule.LIMIT_ON_SCAN);
     ruleList.add(TableModifyPrule.INSTANCE);
 
+    ruleList.addAll(SetOpPrule.DIST_INSTANCES);
+    ruleList.addAll(SetOpPrule.BROADCAST_INSTANCES);
+
     if (ps.isHashAggEnabled()) {
       ruleList.add(HashAggPrule.INSTANCE);
     }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillSetOpRel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillSetOpRel.java
new file mode 100644
index 0000000000..29b87b291e
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillSetOpRel.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.common;
+
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.type.RelDataType;
+
+import java.util.List;
+
+public interface DrillSetOpRel {
+  default boolean isCompatible(RelDataType setOpType, List<RelNode> inputs) {
+    for (RelNode input : inputs) {
+      if (!DrillRelOptUtil.areRowTypesCompatible(
+        input.getRowType(), setOpType, false, true)) {
+        return false;
+      }
+    }
+    return true;
+  }
+}
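
The compatibility check delegated to DrillRelOptUtil.areRowTypesCompatible(..., false /* don't compare names */, true /* allow substrings */) roughly requires the same column count and pair-wise compatible column types while ignoring column names. A simplified, standalone sketch of that idea (not the real Drill type rules; the Column record and sample schemas are illustrative):

import java.util.List;

// Simplified stand-in for the row-type compatibility check used by DrillSetOpRel:
// same column count, pair-wise compatible types, column names ignored.
public class RowTypeCompatibilitySketch {
  record Column(String name, String sqlType) {}

  static boolean compatible(List<Column> input, List<Column> setOpType) {
    if (input.size() != setOpType.size()) {
      return false;
    }
    for (int i = 0; i < input.size(); i++) {
      // The real code applies Drill's type rules; plain type-name equality stands in for them here.
      if (!input.get(i).sqlType().equals(setOpType.get(i).sqlType())) {
        return false;
      }
    }
    return true;
  }

  public static void main(String[] args) {
    List<Column> setOpType = List.of(new Column("id", "INT"), new Column("name", "VARCHAR"));
    List<Column> left = List.of(new Column("emp_id", "INT"), new Column("emp_name", "VARCHAR"));
    List<Column> right = List.of(new Column("id", "INT"), new Column("hired", "DATE"));
    System.out.println(compatible(left, setOpType));   // true: names differ, types line up
    System.out.println(compatible(right, setOpType));  // false: VARCHAR vs DATE in column 2
  }
}
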
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillUnionRelBase.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillUnionRelBase.java
deleted file mode 100644
index 968cf54eec..0000000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillUnionRelBase.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.planner.common;
-
-import java.util.List;
-
-import org.apache.calcite.rel.InvalidRelException;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.core.Union;
-import org.apache.calcite.plan.RelOptCluster;
-import org.apache.calcite.plan.RelTraitSet;
-import org.apache.calcite.rel.type.RelDataType;
-
-/**
- * Base class for logical and physical Union implemented in Drill
- */
-public abstract class DrillUnionRelBase extends Union implements DrillRelNode {
-
-  public DrillUnionRelBase(RelOptCluster cluster, RelTraitSet traits,
-      List<RelNode> inputs, boolean all, boolean checkCompatibility) throws InvalidRelException {
-    super(cluster, traits, inputs, all);
-    if (checkCompatibility &&
-        !this.isCompatible(false /* don't compare names */, true /* allow substrings */)) {
-      throw new InvalidRelException("Input row types of the Union are not compatible.");
-    }
-  }
-
-  public boolean isCompatible(boolean compareNames, boolean allowSubstring) {
-    RelDataType unionType = getRowType();
-    for (RelNode input : getInputs()) {
-      if (! DrillRelOptUtil.areRowTypesCompatible(
-          input.getRowType(), unionType, compareNames, allowSubstring)) {
-        return false;
-      }
-    }
-    return true;
-  }
-
-  public boolean isDistinct() {
-    return !this.all;
-  }
-
-}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillExceptRel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillExceptRel.java
new file mode 100644
index 0000000000..38173bf4d7
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillExceptRel.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.logical;
+
+import org.apache.calcite.linq4j.Ord;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.InvalidRelException;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Minus;
+import org.apache.calcite.util.trace.CalciteTrace;
+import org.apache.drill.common.logical.data.Except;
+import org.apache.drill.common.logical.data.LogicalOperator;
+import org.apache.drill.exec.planner.common.DrillSetOpRel;
+import org.apache.drill.exec.planner.torel.ConversionContext;
+import org.slf4j.Logger;
+
+import java.util.List;
+
+/**
+ * Minus implemented in Drill.
+ */
+public class DrillExceptRel extends Minus implements DrillRel, DrillSetOpRel {
+  private static final Logger tracer = CalciteTrace.getPlannerTracer();
+
+  public DrillExceptRel(RelOptCluster cluster, RelTraitSet traits,
+                       List<RelNode> inputs, boolean all, boolean checkCompatibility) throws InvalidRelException {
+    super(cluster, traits, inputs, all);
+    if (checkCompatibility && !this.isCompatible(getRowType(), getInputs())) {
+      throw new InvalidRelException("Input row types of the Except are not compatible.");
+    }
+  }
+
+  public static DrillExceptRel create(List<RelNode> inputs, boolean all) {
+    try {
+      return new DrillExceptRel(inputs.get(0).getCluster(), inputs.get(0).getTraitSet(), inputs, all, true);
+    } catch (InvalidRelException e) {
+      tracer.warn(e.toString());
+      return null;
+    }
+  }
+
+  @Override
+  public DrillExceptRel copy(RelTraitSet traitSet, List<RelNode> inputs,
+                            boolean all) {
+    try {
+      return new DrillExceptRel(getCluster(), traitSet, inputs, all, false);
+    } catch (InvalidRelException e) {
+      throw new AssertionError(e);
+    }
+  }
+
+  public DrillExceptRel copy(List<RelNode> inputs) {
+    try {
+      return new DrillExceptRel(getCluster(), traitSet, inputs, all, false);
+    } catch (InvalidRelException e) {
+      throw new AssertionError(e);
+    }
+  }
+
+  public DrillExceptRel copy() {
+    try {
+      return new DrillExceptRel(getCluster(), traitSet, inputs, all, false);
+    } catch (InvalidRelException e) {
+      throw new AssertionError(e);
+    }
+  }
+
+  public static DrillExceptRel convert(Except except, ConversionContext context) throws InvalidRelException{
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public LogicalOperator implement(DrillImplementor implementor) {
+    Except.Builder builder = Except.builder();
+    for (Ord<RelNode> input : Ord.zip(inputs)) {
+      builder.addInput(implementor.visitChild(this, input.i, input.e));
+    }
+    builder.setDistinct(!all);
+    return builder.build();
+  }
+
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillUnionRel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillIntersectRel.java
similarity index 52%
copy from exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillUnionRel.java
copy to exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillIntersectRel.java
index a5a6e0330e..10dc50a1a7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillUnionRel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillIntersectRel.java
@@ -17,52 +17,61 @@
  */
 package org.apache.drill.exec.planner.logical;
 
-import java.util.List;
-
 import org.apache.calcite.linq4j.Ord;
-
-import org.apache.calcite.rel.metadata.RelMetadataQuery;
-import org.apache.drill.common.logical.data.LogicalOperator;
-import org.apache.drill.common.logical.data.Union;
-import org.apache.drill.exec.planner.common.DrillUnionRelBase;
-import org.apache.drill.exec.planner.torel.ConversionContext;
-import org.apache.calcite.rel.InvalidRelException;
-import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.plan.RelOptCluster;
-import org.apache.calcite.plan.RelOptCost;
-import org.apache.calcite.plan.RelOptPlanner;
 import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.InvalidRelException;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Intersect;
+import org.apache.calcite.util.trace.CalciteTrace;
+import org.apache.drill.common.logical.data.LogicalOperator;
+import org.apache.drill.exec.planner.common.DrillSetOpRel;
+import org.apache.drill.exec.planner.torel.ConversionContext;
+import org.slf4j.Logger;
+
+import java.util.List;
 
 /**
- * Union implemented in Drill.
+ * Intersect implemented in Drill.
  */
-public class DrillUnionRel extends DrillUnionRelBase implements DrillRel {
-  /** Creates a DrillUnionRel. */
-  public DrillUnionRel(RelOptCluster cluster, RelTraitSet traits,
-      List<RelNode> inputs, boolean all, boolean checkCompatibility) throws InvalidRelException {
-    super(cluster, traits, inputs, all, checkCompatibility);
+public class DrillIntersectRel extends Intersect implements DrillRel, DrillSetOpRel {
+  private static final Logger tracer = CalciteTrace.getPlannerTracer();
+
+  public DrillIntersectRel(RelOptCluster cluster, RelTraitSet traits,
+                           List<RelNode> inputs, boolean all, boolean checkCompatibility) throws InvalidRelException {
+    super(cluster, traits, inputs, all);
+    if (checkCompatibility && !this.isCompatible(getRowType(), getInputs())) {
+      throw new InvalidRelException("Input row types of the Intersect are not compatible.");
+    }
+  }
+
+  public static DrillIntersectRel create(List<RelNode> inputs, boolean all) {
+    try {
+      return new DrillIntersectRel(inputs.get(0).getCluster(), inputs.get(0).getTraitSet(), inputs, all, true);
+    } catch (InvalidRelException e) {
+      tracer.warn(e.toString());
+      return null;
+    }
   }
 
   @Override
-  public DrillUnionRel copy(RelTraitSet traitSet, List<RelNode> inputs,
-      boolean all) {
+  public DrillIntersectRel copy(RelTraitSet traitSet, List<RelNode> inputs,
+                                boolean all) {
     try {
-      return new DrillUnionRel(getCluster(), traitSet, inputs, all,
+      return new DrillIntersectRel(getCluster(), traitSet, inputs, all,
           false /* don't check compatibility during copy */);
     } catch (InvalidRelException e) {
       throw new AssertionError(e);
     }
   }
 
-  @Override
-  public RelOptCost computeSelfCost(RelOptPlanner planner, RelMetadataQuery mq) {
-    // divide cost by two to ensure cheaper than EnumerableDrillRel
-    return super.computeSelfCost(planner, mq).multiplyBy(.5);
+  public static DrillIntersectRel convert(org.apache.drill.common.logical.data.Intersect intersect, ConversionContext context) throws InvalidRelException{
+    throw new UnsupportedOperationException();
   }
 
   @Override
   public LogicalOperator implement(DrillImplementor implementor) {
-    Union.Builder builder = Union.builder();
+    org.apache.drill.common.logical.data.Intersect.Builder builder = org.apache.drill.common.logical.data.Intersect.builder();
     for (Ord<RelNode> input : Ord.zip(inputs)) {
       builder.addInput(implementor.visitChild(this, input.i, input.e));
     }
@@ -70,7 +79,4 @@ public class DrillUnionRel extends DrillUnionRelBase implements DrillRel {
     return builder.build();
   }
 
-  public static DrillUnionRel convert(Union union, ConversionContext context) throws InvalidRelException{
-    throw new UnsupportedOperationException();
-  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRelFactories.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRelFactories.java
index ce3d3d7de1..f401ba76bd 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRelFactories.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRelFactories.java
@@ -29,6 +29,7 @@ import org.apache.calcite.rel.hint.RelHint;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rex.RexNode;
 import org.apache.calcite.rex.RexUtil;
+import org.apache.calcite.sql.SqlKind;
 import org.apache.calcite.tools.RelBuilderFactory;
 import org.apache.calcite.util.ImmutableBitSet;
 import org.apache.drill.exec.planner.DrillRelBuilder;
@@ -52,6 +53,9 @@ import static org.apache.drill.exec.planner.logical.DrillRel.DRILL_LOGICAL;
  */
 
 public class DrillRelFactories {
+  public static final RelFactories.SetOpFactory DRILL_LOGICAL_SET_OP_FACTORY =
+    new DrillSetOpFactoryImpl();
+
   public static final RelFactories.ProjectFactory DRILL_LOGICAL_PROJECT_FACTORY =
       new DrillProjectFactoryImpl();
 
@@ -89,6 +93,25 @@ public class DrillRelFactories {
               DEFAULT_VALUES_FACTORY,
               DEFAULT_TABLE_SCAN_FACTORY));
 
+  /**
+   * Implementation of {@link RelFactories.SetOpFactory} that returns
+   * a vanilla {@link DrillExceptRel} or {@link DrillIntersectRel}
+   * dependent on the particular kind of set operation (EXCEPT, INTERSECT)
+   */
+  private static class DrillSetOpFactoryImpl implements RelFactories.SetOpFactory {
+    @Override
+    public RelNode createSetOp(SqlKind kind, List<RelNode> inputs, boolean all) {
+      switch (kind) {
+      case EXCEPT:
+        return DrillExceptRel.create(inputs, all);
+      case INTERSECT:
+        return DrillIntersectRel.create(inputs, all);
+      default:
+        throw new AssertionError("unsupported set op: " + kind);
+      }
+    }
+  }
+
   /**
    * Implementation of {@link RelFactories.ProjectFactory} that returns a vanilla
    * {@link DrillProjectRel}.
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillSetOpRule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillSetOpRule.java
new file mode 100644
index 0000000000..6ac15c8e7b
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillSetOpRule.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.logical;
+
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelOptRuleOperand;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.RelFactories.SetOpFactory;
+import org.apache.calcite.rel.core.SetOp;
+import org.apache.calcite.rel.logical.LogicalIntersect;
+import org.apache.calcite.rel.logical.LogicalMinus;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Rule that converts {@link LogicalIntersect} or {@link LogicalMinus} to
+ * {@link DrillIntersectRel} or {@link DrillExceptRel}.
+ */
+public class DrillSetOpRule extends RelOptRule {
+  private final SetOpFactory setOpFactory;
+  public static final List<RelOptRule> INSTANCES = Arrays.asList(
+      new DrillSetOpRule(RelOptHelper.any(LogicalIntersect.class, Convention.NONE), "DrillIntersectRelRule", DrillRelFactories.DRILL_LOGICAL_SET_OP_FACTORY),
+      new DrillSetOpRule(RelOptHelper.any(LogicalMinus.class, Convention.NONE), "DrillExceptRelRule", DrillRelFactories.DRILL_LOGICAL_SET_OP_FACTORY)
+  );
+
+  public DrillSetOpRule(RelOptRuleOperand operand, String description, SetOpFactory setOpFactory) {
+    super(operand, description);
+    this.setOpFactory = setOpFactory;
+  }
+
+  @Override
+  public void onMatch(RelOptRuleCall call) {
+    final SetOp setOp = call.rel(0);
+    final List<RelNode> convertedInputs = new ArrayList<>();
+    for (RelNode input : setOp.getInputs()) {
+      RelNode convertedInput = convert(input, input.getTraitSet().plus(DrillRel.DRILL_LOGICAL).simplify());
+      convertedInputs.add(convertedInput);
+    }
+    RelNode newRelNode = this.setOpFactory.createSetOp(setOp.kind, convertedInputs, setOp.all);
+    if (newRelNode != null) {
+      call.transformTo(newRelNode);
+    }
+  }
+
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillUnionAllRule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillUnionAllRule.java
index 69e9452d7c..a35d320bc2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillUnionAllRule.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillUnionAllRule.java
@@ -31,7 +31,7 @@ import org.apache.calcite.util.trace.CalciteTrace;
 import org.slf4j.Logger;
 
 /**
- * Rule that converts a {@link LogicalUnion} to a {@link DrillUnionRelBase}, implemented by a "union" operation.
+ * Rule that converts a {@link LogicalUnion} to a {@link DrillUnionRel}, implemented by a "union" operation.
  */
 public class DrillUnionAllRule extends RelOptRule {
   public static final RelOptRule INSTANCE = new DrillUnionAllRule();
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillUnionRel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillUnionRel.java
index a5a6e0330e..263266e1ff 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillUnionRel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillUnionRel.java
@@ -21,10 +21,10 @@ import java.util.List;
 
 import org.apache.calcite.linq4j.Ord;
 
+import org.apache.calcite.rel.core.Union;
 import org.apache.calcite.rel.metadata.RelMetadataQuery;
 import org.apache.drill.common.logical.data.LogicalOperator;
-import org.apache.drill.common.logical.data.Union;
-import org.apache.drill.exec.planner.common.DrillUnionRelBase;
+import org.apache.drill.exec.planner.common.DrillSetOpRel;
 import org.apache.drill.exec.planner.torel.ConversionContext;
 import org.apache.calcite.rel.InvalidRelException;
 import org.apache.calcite.rel.RelNode;
@@ -36,11 +36,14 @@ import org.apache.calcite.plan.RelTraitSet;
 /**
  * Union implemented in Drill.
  */
-public class DrillUnionRel extends DrillUnionRelBase implements DrillRel {
+public class DrillUnionRel extends Union implements DrillRel, DrillSetOpRel {
   /** Creates a DrillUnionRel. */
   public DrillUnionRel(RelOptCluster cluster, RelTraitSet traits,
       List<RelNode> inputs, boolean all, boolean checkCompatibility) throws InvalidRelException {
-    super(cluster, traits, inputs, all, checkCompatibility);
+    super(cluster, traits, inputs, all);
+    if (checkCompatibility && !this.isCompatible(getRowType(), getInputs())) {
+      throw new InvalidRelException("Input row types of the Union are not compatible.");
+    }
   }
 
   @Override
@@ -62,7 +65,7 @@ public class DrillUnionRel extends DrillUnionRelBase implements DrillRel {
 
   @Override
   public LogicalOperator implement(DrillImplementor implementor) {
-    Union.Builder builder = Union.builder();
+    org.apache.drill.common.logical.data.Union.Builder builder = org.apache.drill.common.logical.data.Union.builder();
     for (Ord<RelNode> input : Ord.zip(inputs)) {
       builder.addInput(implementor.visitChild(this, input.i, input.e));
     }
@@ -70,7 +73,11 @@ public class DrillUnionRel extends DrillUnionRelBase implements DrillRel {
     return builder.build();
   }
 
-  public static DrillUnionRel convert(Union union, ConversionContext context) throws InvalidRelException{
+  public static DrillUnionRel convert(org.apache.drill.common.logical.data.Union union, ConversionContext context) throws InvalidRelException{
     throw new UnsupportedOperationException();
   }
+
+  public boolean isDistinct() {
+    return !this.all;
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/PreProcessLogicalRel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/PreProcessLogicalRel.java
index b87350d6dd..4cdff4feac 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/PreProcessLogicalRel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/PreProcessLogicalRel.java
@@ -20,6 +20,8 @@ package org.apache.drill.exec.planner.logical;
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.calcite.rel.logical.LogicalIntersect;
+import org.apache.calcite.rel.logical.LogicalMinus;
 import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
 import org.apache.calcite.rel.logical.LogicalFilter;
 import org.apache.calcite.rel.logical.LogicalJoin;
@@ -199,6 +201,35 @@ public class PreProcessLogicalRel extends RelShuttleImpl {
     return visitChildren(union);
   }
 
+  @Override
+  public RelNode visit(LogicalIntersect intersect) {
+    for (RelNode child : intersect.getInputs()) {
+      for (RelDataTypeField dataField : child.getRowType().getFieldList()) {
+        if (dataField.getName().contains(SchemaPath.DYNAMIC_STAR)) {
+          unsupportedOperatorCollector.setException(SqlUnsupportedException.ExceptionType.RELATIONAL,
+            "Intersect(All) over schema-less tables must specify the columns explicitly\n");
+          throw new UnsupportedOperationException();
+        }
+      }
+    }
+    return visitChildren(intersect);
+  }
+
+  @Override
+  public RelNode visit(LogicalMinus minus) {
+    for (RelNode child : minus.getInputs()) {
+      for (RelDataTypeField dataField : child.getRowType().getFieldList()) {
+        if (dataField.getName().contains(SchemaPath.DYNAMIC_STAR)) {
+          unsupportedOperatorCollector.setException(SqlUnsupportedException.ExceptionType.RELATIONAL,
+            "Except(All) over schema-less tables must specify the columns explicitly\n");
+          throw new UnsupportedOperationException();
+        }
+      }
+    }
+
+    return visitChildren(minus);
+  }
+
   private UserException getConvertFunctionInvalidTypeException(final RexCall function) {
     // Caused by user entering a value with a numeric type
     final String functionName = function.getOperator().getName();
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/ScanFieldDeterminer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/ScanFieldDeterminer.java
index c3c271eab2..29f26c9305 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/ScanFieldDeterminer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/ScanFieldDeterminer.java
@@ -23,6 +23,8 @@ import java.util.Set;
 
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.logical.LogicalPlan;
+import org.apache.drill.common.logical.data.Except;
+import org.apache.drill.common.logical.data.Intersect;
 import org.apache.drill.common.logical.data.Values;
 import org.apache.drill.common.logical.data.Filter;
 import org.apache.drill.common.logical.data.GroupingAggregate;
@@ -187,4 +189,20 @@ public class ScanFieldDeterminer extends AbstractLogicalVisitor<Void, ScanFieldD
     }
     return null;
   }
+
+  @Override
+  public Void visitExcept(Except except, FieldList value) {
+    for (LogicalOperator o : except.getInputs()) {
+      o.accept(this, value.clone());
+    }
+    return null;
+  }
+
+  @Override
+  public Void visitIntersect(Intersect intersect, FieldList value) {
+    for (LogicalOperator o : intersect.getInputs()) {
+      o.accept(this, value.clone());
+    }
+    return null;
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/UnionAllPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SetOpPrel.java
similarity index 55%
copy from exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/UnionAllPrel.java
copy to exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SetOpPrel.java
index 4bdecea4ee..70d4fc6669 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/UnionAllPrel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SetOpPrel.java
@@ -17,60 +17,41 @@
  */
 package org.apache.drill.exec.planner.physical;
 
-import java.io.IOException;
-import java.util.List;
-
-import org.apache.drill.exec.physical.base.PhysicalOperator;
-import org.apache.drill.exec.physical.config.UnionAll;
-import org.apache.drill.exec.planner.cost.DrillCostBase;
-import org.apache.drill.exec.planner.cost.DrillCostBase.DrillCostFactory;
-import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
-
-import org.apache.calcite.rel.InvalidRelException;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.metadata.RelMetadataQuery;
 import org.apache.calcite.plan.RelOptCluster;
-import org.apache.calcite.plan.RelOptCost;
-import org.apache.calcite.plan.RelOptPlanner;
 import org.apache.calcite.plan.RelTraitSet;
-import org.apache.calcite.rel.core.Union;
-
+import org.apache.calcite.rel.InvalidRelException;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.RelWriter;
+import org.apache.calcite.rel.core.SetOp;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.planner.physical.visitor.PrelVisitor;
+import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
 import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
 
-public class UnionAllPrel extends UnionPrel {
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
 
-  public UnionAllPrel(RelOptCluster cluster, RelTraitSet traits, List<RelNode> inputs,
-      boolean checkCompatibility) throws InvalidRelException {
-    super(cluster, traits, inputs, true /* all */, checkCompatibility);
+public class SetOpPrel extends SetOp implements Prel {
 
+  public SetOpPrel(RelOptCluster cluster, RelTraitSet traits, List<RelNode> inputs, SqlKind kind,
+                   boolean all) throws InvalidRelException {
+    super(cluster, traits, inputs, kind, all);
   }
 
+  @Override public RelWriter explainTerms(RelWriter pw) {
+    return super.explainTerms(pw).item("kind", kind);
+  }
 
-  @Override
-  public Union copy(RelTraitSet traitSet, List<RelNode> inputs, boolean all) {
+  public SetOpPrel copy(RelTraitSet traitSet, List<RelNode> inputs, boolean all) {
     try {
-      return new UnionAllPrel(this.getCluster(), traitSet, inputs,
-          false /* don't check compatibility during copy */);
+      return new SetOpPrel(this.getCluster(), traitSet, inputs, kind, all);
     }catch (InvalidRelException e) {
       throw new AssertionError(e);
     }
   }
 
-  @Override
-  public RelOptCost computeSelfCost(RelOptPlanner planner, RelMetadataQuery mq) {
-    if(PrelUtil.getSettings(getCluster()).useDefaultCosting()) {
-      return super.computeSelfCost(planner, mq).multiplyBy(.1);
-    }
-    double totalInputRowCount = 0;
-    for (int i = 0; i < this.getInputs().size(); i++) {
-      totalInputRowCount += mq.getRowCount(this.getInputs().get(i));
-    }
-
-    double cpuCost = totalInputRowCount * DrillCostBase.BASE_CPU_COST;
-    DrillCostFactory costFactory = (DrillCostFactory)planner.getCostFactory();
-    return costFactory.makeCost(totalInputRowCount, cpuCost, 0, 0);
-  }
-
   @Override
   public PhysicalOperator getPhysicalOperator(PhysicalPlanCreator creator) throws IOException {
     List<PhysicalOperator> inputPops = Lists.newArrayList();
@@ -79,8 +60,8 @@ public class UnionAllPrel extends UnionPrel {
       inputPops.add( ((Prel)this.getInputs().get(i)).getPhysicalOperator(creator));
     }
 
-    UnionAll unionall = new UnionAll(inputPops);
-    return creator.addMetadata(this, unionall);
+    org.apache.drill.exec.physical.config.SetOp setOp = new org.apache.drill.exec.physical.config.SetOp(inputPops, kind, all);
+    return creator.addMetadata(this, setOp);
   }
 
   @Override
@@ -93,4 +74,18 @@ public class UnionAllPrel extends UnionPrel {
     return SelectionVectorMode.NONE;
   }
 
+  @Override
+  public <T, X, E extends Throwable> T accept(PrelVisitor<T, X, E> logicalVisitor, X value) throws E {
+    return logicalVisitor.visitPrel(this, value);
+  }
+
+  @Override
+  public Iterator<Prel> iterator() {
+    return PrelUtil.iter(this.getInputs());
+  }
+
+  @Override
+  public boolean needsFinalColumnReordering() {
+    return false;
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SetOpPrule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SetOpPrule.java
new file mode 100644
index 0000000000..58614bf9e2
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SetOpPrule.java
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.physical;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelOptRuleOperand;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.plan.volcano.RelSubset;
+import org.apache.calcite.rel.InvalidRelException;
+import org.apache.calcite.rel.RelCollation;
+import org.apache.calcite.rel.RelCollations;
+import org.apache.calcite.rel.RelFieldCollation;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.SetOp;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.util.BitSets;
+import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.calcite.util.trace.CalciteTrace;
+import org.apache.drill.exec.planner.logical.DrillExceptRel;
+import org.apache.drill.exec.planner.logical.DrillIntersectRel;
+import org.apache.drill.exec.planner.logical.RelOptHelper;
+import org.apache.drill.exec.planner.physical.DrillDistributionTrait.DistributionField;
+import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList;
+import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
+import org.slf4j.Logger;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Set;
+
+import static org.apache.drill.exec.ExecConstants.EXCEPT_ADD_AGG_BELOW;
+import static org.apache.drill.exec.planner.physical.AggPruleBase.remapGroupSet;
+
+public class SetOpPrule extends Prule {
+  public static final List<RelOptRule> DIST_INSTANCES = Arrays.asList(
+    new SetOpPrule(RelOptHelper.any(DrillExceptRel.class), "Prel.HashExceptDistPrule", true),
+    new SetOpPrule(RelOptHelper.any(DrillIntersectRel.class), "Prel.HashIntersectDistPrule", true));
+  public static final List<RelOptRule> BROADCAST_INSTANCES = Arrays.asList(
+    new SetOpPrule(RelOptHelper.any(DrillExceptRel.class), "Prel.HashExceptBroadcastPrule", false),
+    new SetOpPrule(RelOptHelper.any(DrillIntersectRel.class), "Prel.HashIntersectBroadcastPrule", false));
+  protected static final Logger tracer = CalciteTrace.getPlannerTracer();
+  private final boolean isDist;
+
+  private SetOpPrule(RelOptRuleOperand operand, String description, boolean isDist) {
+    super(operand, description);
+    this.isDist = isDist;
+  }
+
+  @Override
+  public void onMatch(RelOptRuleCall call) {
+    final SetOp setOp = call.rel(0);
+    Preconditions.checkArgument(setOp.getInputs().size() == 2, "A set op must have exactly two inputs.");
+
+    try {
+      if (isDist) {
+        createDistBothPlan(call);
+      } else {
+        if (checkBroadcastConditions(setOp.getCluster(), setOp.getInput(0), setOp.getInput(1))) {
+          createBroadcastPlan(call);
+        }
+      }
+    } catch (InvalidRelException e) {
+      tracer.warn(e.toString());
+    }
+  }
+
+  private void createDistBothPlan(RelOptRuleCall call)
+    throws InvalidRelException {
+    int i = 0;
+    int fieldCount = call.rel(0).getInput(0).getRowType().getFieldCount();
+    List<DistributionField> distFields = Lists.newArrayList();
+    while(i < fieldCount) {
+      distFields.add(new DistributionField(i));
+      i++;
+    }
+    DrillDistributionTrait distributionTrait = new DrillDistributionTrait(DrillDistributionTrait.DistributionType.HASH_DISTRIBUTED,
+      distFields);
+    createPlan(call, distributionTrait);
+
+    if (!PrelUtil.getPlannerSettings(call.getPlanner()).isHashSingleKey()) {
+      return;
+    }
+    if (fieldCount > 1) {
+      for (int j = 0; j < fieldCount; j++) {
+        distributionTrait = new DrillDistributionTrait(DrillDistributionTrait.DistributionType.HASH_DISTRIBUTED,
+          ImmutableList.of(new DistributionField(j)));
+        createPlan(call, distributionTrait);
+      }
+    }
+  }
+
+  private boolean checkBroadcastConditions(RelOptCluster cluster, RelNode left, RelNode right) {
+    double estimatedRightRowCount = RelMetadataQuery.instance().getRowCount(right);
+    return estimatedRightRowCount < PrelUtil.getSettings(cluster).getBroadcastThreshold()
+      && !DrillDistributionTrait.SINGLETON.equals(left.getTraitSet().getTrait(DrillDistributionTraitDef.INSTANCE));
+  }
+
+  private void createBroadcastPlan(final RelOptRuleCall call) throws InvalidRelException {
+    DrillDistributionTrait distBroadcastRight = new DrillDistributionTrait(DrillDistributionTrait.DistributionType.BROADCAST_DISTRIBUTED);
+    createPlan(call, distBroadcastRight);
+  }
+
+  private void createPlan(final RelOptRuleCall call, DrillDistributionTrait setOpTrait) throws InvalidRelException {
+    if (needAddAgg(call.rel(0))) {
+      ImmutableBitSet groupSet = ImmutableBitSet.range(0, call.rel(0).getInput(0).getRowType().getFieldList().size());
+
+      // hashAgg: hash distribute on all grouping keys
+      DrillDistributionTrait distOnAllKeys =
+        new DrillDistributionTrait(DrillDistributionTrait.DistributionType.HASH_DISTRIBUTED,
+          ImmutableList.copyOf(getDistributionField(groupSet, true /* get all grouping keys */)));
+      RelTraitSet aggTraits = call.getPlanner().emptyTraitSet().plus(Prel.DRILL_PHYSICAL).plus(distOnAllKeys);
+      createTransformRequest(call, aggTraits, setOpTrait, null);
+
+      // hashAgg: hash distribute on single grouping key
+      DrillDistributionTrait distOnOneKey =
+        new DrillDistributionTrait(DrillDistributionTrait.DistributionType.HASH_DISTRIBUTED,
+          ImmutableList.copyOf(getDistributionField(groupSet, false /* get single grouping key */)));
+      aggTraits = call.getPlanner().emptyTraitSet().plus(Prel.DRILL_PHYSICAL).plus(distOnOneKey);
+      createTransformRequest(call, aggTraits, setOpTrait, null);
+
+      // streamAgg: hash distribute on all grouping keys
+      final RelCollation collation = getCollation(groupSet);
+      aggTraits = call.getPlanner().emptyTraitSet().plus(Prel.DRILL_PHYSICAL).plus(collation).plus(distOnAllKeys);
+      createTransformRequest(call, aggTraits, setOpTrait, collation);
+    } else {
+      call.transformTo(buildSetOpPrel(call, null, setOpTrait));
+    }
+  }
+
+  private boolean needAddAgg(SetOp setOp) {
+    if (setOp.all || !(setOp instanceof DrillExceptRel)) {
+      return false;
+    }
+    Set<ImmutableBitSet> uniqueKeys = setOp.getCluster().getMetadataQuery().getUniqueKeys(((RelSubset)setOp.getInput(0)).getBestOrOriginal());
+    if (uniqueKeys == null) {
+      return true;
+    }
+    return uniqueKeys.size() < setOp.getRowType().getFieldCount();
+  }
+
+  private void createTransformRequest(RelOptRuleCall call, RelTraitSet aggTraits, DrillDistributionTrait setOpTrait, RelCollation collation) throws InvalidRelException {
+    boolean addAggBelow = PrelUtil.getPlannerSettings(call.getPlanner()).getOptions().getOption(EXCEPT_ADD_AGG_BELOW);
+    RelNode outputRel;
+    if (addAggBelow) {
+      AggPrelBase newAgg = buildAggPrel(call, call.rel(0).getInput(0), aggTraits, collation);
+      outputRel = buildSetOpPrel(call, newAgg, setOpTrait);
+    } else {
+      SetOpPrel setOpPrel = buildSetOpPrel(call, null, setOpTrait);
+      outputRel = buildAggPrel(call, setOpPrel, aggTraits, collation);
+    }
+    call.transformTo(outputRel);
+  }
+
+  private AggPrelBase buildAggPrel(RelOptRuleCall call, RelNode input, RelTraitSet aggTraits, RelCollation collation) throws InvalidRelException {
+    final DrillExceptRel drillExceptRel = call.rel(0);
+    ImmutableBitSet groupSet = ImmutableBitSet.range(0, drillExceptRel.getInput(0).getRowType().getFieldList().size());
+    if (collation !=  null) {
+      final RelNode convertedInput = convert(input, aggTraits);
+      return new StreamAggPrel(
+        drillExceptRel.getCluster(),
+        aggTraits,
+        convertedInput,
+        groupSet,
+        ImmutableList.of(),
+        ImmutableList.of(),
+        AggPrelBase.OperatorPhase.PHASE_1of1);
+    } else {
+      RelNode convertedInput = convert(input, PrelUtil.fixTraits(call, aggTraits));
+      return new HashAggPrel(
+        drillExceptRel.getCluster(),
+        aggTraits,
+        convertedInput,
+        groupSet,
+        ImmutableList.of(),
+        ImmutableList.of(),
+        AggPrelBase.OperatorPhase.PHASE_1of1);
+    }
+  }
+
+  private SetOpPrel buildSetOpPrel(RelOptRuleCall call, RelNode convertedLeft, DrillDistributionTrait setOpTrait) throws InvalidRelException {
+    final SetOp setOp = call.rel(0);
+    final RelNode right = setOp.getInput(1);
+    RelTraitSet traitsRight = right.getTraitSet().plus(Prel.DRILL_PHYSICAL).plus(setOpTrait);
+    final RelNode convertedRight = convert(right, traitsRight);
+
+    if (convertedLeft == null) {
+      final RelNode left = setOp.getInput(0);
+      RelTraitSet traitsLeft = left.getTraitSet().plus(Prel.DRILL_PHYSICAL);
+      if (DrillDistributionTrait.DistributionType.HASH_DISTRIBUTED.equals(setOpTrait.getType())) {
+        traitsLeft = traitsLeft.plus(setOpTrait);
+      }
+      convertedLeft = convert(left, traitsLeft);
+    }
+    final RelTraitSet traitSet = PrelUtil.removeCollation(convertedLeft.getTraitSet(), call);
+    return new SetOpPrel(convertedLeft.getCluster(), traitSet, ImmutableList.of(convertedLeft, convertedRight), setOp.kind, setOp.all);
+  }
+
+  private List<DistributionField> getDistributionField(ImmutableBitSet groupSet, boolean allFields) {
+    List<DistributionField> groupByFields = Lists.newArrayList();
+
+    for (int group : remapGroupSet(groupSet)) {
+      DistributionField field = new DistributionField(group);
+      groupByFields.add(field);
+
+      if (!allFields && groupByFields.size() == 1) {
+        // TODO: if we are only interested in 1 grouping field, pick the first one for now..
+        // but once we have num distinct values (NDV) statistics, we should pick the one
+        // with highest NDV.
+        break;
+      }
+    }
+
+    return groupByFields;
+  }
+
+  private RelCollation getCollation(ImmutableBitSet groupSet) {
+
+    List<RelFieldCollation> fields = Lists.newArrayList();
+    for (int group : BitSets.toIter(groupSet)) {
+      fields.add(new RelFieldCollation(group));
+    }
+    return RelCollations.of(fields);
+  }
+}
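
One planning detail worth spelling out: needAddAgg() returns true for a distinct EXCEPT whose left input is not known to be duplicate-free, because the counting probe on its own emits a left-side key once per occurrence whenever that key is absent from the right side. The following standalone sketch (illustrative only, not Drill code) shows the effect and why createTransformRequest wraps the set op with a HashAggPrel/StreamAggPrel, placed below or above it depending on the EXCEPT_ADD_AGG_BELOW option:

import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Illustrative only: shows why SetOpPrule adds an aggregate for a distinct
// EXCEPT when the left input may contain duplicates. The "probe only" variant
// mimics the counting probe with no deduplication.
public class ExceptNeedsAggSketch {
  static List<String> exceptProbeOnly(List<String> left, List<String> right) {
    Set<String> build = new HashSet<>(right);
    List<String> out = new ArrayList<>();
    for (String row : left) {
      if (!build.contains(row)) {
        out.add(row);               // emitted once per occurrence of the key
      }
    }
    return out;
  }

  public static void main(String[] args) {
    List<String> left = List.of("a", "a", "b");
    List<String> right = List.of("b");

    // Without deduplication the duplicated left key leaks through twice.
    System.out.println(exceptProbeOnly(left, right));                                        // [a, a]

    // Deduplicating the left input (what the added aggregate does, below or above
    // the set op depending on EXCEPT_ADD_AGG_BELOW) restores EXCEPT semantics.
    System.out.println(exceptProbeOnly(new ArrayList<>(new LinkedHashSet<>(left)), right));  // [a]
  }
}
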
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/UnionAllPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/UnionAllPrel.java
index 4bdecea4ee..77b594e968 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/UnionAllPrel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/UnionAllPrel.java
@@ -39,9 +39,9 @@ import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
 
 public class UnionAllPrel extends UnionPrel {
 
-  public UnionAllPrel(RelOptCluster cluster, RelTraitSet traits, List<RelNode> inputs,
-      boolean checkCompatibility) throws InvalidRelException {
-    super(cluster, traits, inputs, true /* all */, checkCompatibility);
+  public UnionAllPrel(RelOptCluster cluster, RelTraitSet traits, List<RelNode> inputs)
+    throws InvalidRelException {
+    super(cluster, traits, inputs, true /* all */);
 
   }
 
@@ -49,8 +49,7 @@ public class UnionAllPrel extends UnionPrel {
   @Override
   public Union copy(RelTraitSet traitSet, List<RelNode> inputs, boolean all) {
     try {
-      return new UnionAllPrel(this.getCluster(), traitSet, inputs,
-          false /* don't check compatibility during copy */);
+      return new UnionAllPrel(this.getCluster(), traitSet, inputs);
     }catch (InvalidRelException e) {
       throw new AssertionError(e);
     }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/UnionAllPrule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/UnionAllPrule.java
index 511ac8a434..0a3cdc6649 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/UnionAllPrule.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/UnionAllPrule.java
@@ -101,8 +101,7 @@ public class UnionAllPrule extends Prule {
       Preconditions.checkArgument(convertedInputList.size() >= 2, "Union list must be at least two items.");
       RelNode left = convertedInputList.get(0);
       for (int i = 1; i < convertedInputList.size(); i++) {
-        left = new UnionAllPrel(union.getCluster(), traits, ImmutableList.of(left, convertedInputList.get(i)),
-            false /* compatibility already checked during logical phase */);
+        left = new UnionAllPrel(union.getCluster(), traits, ImmutableList.of(left, convertedInputList.get(i)));
 
       }
       call.transformTo(left);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/UnionDistinctPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/UnionDistinctPrel.java
index d58c9f9f8c..992950164b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/UnionDistinctPrel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/UnionDistinctPrel.java
@@ -38,17 +38,16 @@ import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
 
 public class UnionDistinctPrel extends UnionPrel {
 
-  public UnionDistinctPrel(RelOptCluster cluster, RelTraitSet traits, List<RelNode> inputs,
-      boolean checkCompatibility) throws InvalidRelException {
-    super(cluster, traits, inputs, false /* all = false */, checkCompatibility);
+  public UnionDistinctPrel(RelOptCluster cluster, RelTraitSet traits, List<RelNode> inputs)
+    throws InvalidRelException {
+    super(cluster, traits, inputs, false /* all = false */);
   }
 
 
   @Override
   public Union copy(RelTraitSet traitSet, List<RelNode> inputs, boolean all) {
     try {
-      return new UnionDistinctPrel(this.getCluster(), traitSet, inputs,
-          false /* don't check compatibility during copy */);
+      return new UnionDistinctPrel(this.getCluster(), traitSet, inputs);
     }catch (InvalidRelException e) {
       throw new AssertionError(e);
     }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/UnionDistinctPrule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/UnionDistinctPrule.java
index 5a088f5d5a..91a1021505 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/UnionDistinctPrule.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/UnionDistinctPrule.java
@@ -61,8 +61,7 @@ public class UnionDistinctPrule extends Prule {
 
       traits = call.getPlanner().emptyTraitSet().plus(Prel.DRILL_PHYSICAL).plus(DrillDistributionTrait.SINGLETON);
       UnionDistinctPrel unionDistinct =
-          new UnionDistinctPrel(union.getCluster(), traits, convertedInputList,
-              false /* compatibility already checked during logical phase */);
+          new UnionDistinctPrel(union.getCluster(), traits, convertedInputList);
 
       call.transformTo(unionDistinct);
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/UnionPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/UnionPrel.java
index 79d5611c80..6fbc4d37f2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/UnionPrel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/UnionPrel.java
@@ -17,21 +17,19 @@
  */
 package org.apache.drill.exec.planner.physical;
 
-import java.util.Iterator;
-import java.util.List;
-
-import org.apache.drill.exec.planner.common.DrillUnionRelBase;
-import org.apache.drill.exec.planner.physical.visitor.PrelVisitor;
-import org.apache.calcite.rel.InvalidRelException;
-import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.plan.RelOptCluster;
 import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Union;
+import org.apache.drill.exec.planner.physical.visitor.PrelVisitor;
+
+import java.util.Iterator;
+import java.util.List;
 
-public abstract class UnionPrel extends DrillUnionRelBase implements Prel{
+public abstract class UnionPrel extends Union implements Prel{
 
-  public UnionPrel(RelOptCluster cluster, RelTraitSet traits, List<RelNode> inputs, boolean all,
-      boolean checkCompatibility) throws InvalidRelException {
-    super(cluster, traits, inputs, all, checkCompatibility);
+  public UnionPrel(RelOptCluster cluster, RelTraitSet traits, List<RelNode> inputs, boolean all) {
+    super(cluster, traits, inputs, all);
   }
 
   @Override
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/FinalColumnReorderer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/FinalColumnReorderer.java
index 144b62cf3e..b0debc8ef0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/FinalColumnReorderer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/FinalColumnReorderer.java
@@ -23,6 +23,7 @@ import java.util.List;
 import org.apache.drill.exec.planner.physical.Prel;
 import org.apache.drill.exec.planner.physical.ProjectPrel;
 import org.apache.drill.exec.planner.physical.ScreenPrel;
+import org.apache.drill.exec.planner.physical.SetOpPrel;
 import org.apache.drill.exec.planner.physical.UnionPrel;
 import org.apache.drill.exec.planner.physical.WriterPrel;
 import org.apache.calcite.rel.RelNode;
@@ -86,8 +87,8 @@ public class FinalColumnReorderer extends BasePrelVisitor<Prel, Void, RuntimeExc
 
   @Override
   public Prel visitPrel(Prel prel, Void value) throws RuntimeException {
-    if(prel instanceof UnionPrel) {
-      return addColumnOrderingBelowUnion(prel);
+    if(prel instanceof UnionPrel || prel instanceof SetOpPrel) {
+      return addColumnOrderingBelow(prel);
     }
 
     List<RelNode> children = Lists.newArrayList();
@@ -106,13 +107,13 @@ public class FinalColumnReorderer extends BasePrelVisitor<Prel, Void, RuntimeExc
     }
   }
 
-  private Prel addColumnOrderingBelowUnion(Prel prel) {
+  private Prel addColumnOrderingBelow(Prel prel) {
     List<RelNode> children = Lists.newArrayList();
     for (Prel p : prel) {
       Prel child = p.accept(this, null);
 
-      boolean needProjectBelowUnion = !(p instanceof ProjectPrel);
-      if(needProjectBelowUnion) {
+      boolean needProjectBelow = !(p instanceof ProjectPrel);
+      if(needProjectBelow) {
         child = addTrivialOrderedProjectPrel(child, false);
       }
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/FindLimit0Visitor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/FindLimit0Visitor.java
index 3671bc4fd3..362993eb0f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/FindLimit0Visitor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/FindLimit0Visitor.java
@@ -17,6 +17,7 @@
  */
 package org.apache.drill.exec.planner.sql.handlers;
 
+import org.apache.drill.exec.planner.common.DrillSetOpRel;
 import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
 import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableSet;
 import com.fasterxml.jackson.annotation.JsonCreator;
@@ -65,7 +66,6 @@ import java.util.List;
 import org.apache.calcite.rex.RexBuilder;
 import org.apache.drill.exec.planner.common.DrillAggregateRelBase;
 import org.apache.drill.exec.planner.common.DrillJoinRelBase;
-import org.apache.drill.exec.planner.common.DrillUnionRelBase;
 import org.apache.drill.exec.util.Pointer;
 
 import java.math.BigDecimal;
@@ -184,7 +184,7 @@ public class FindLimit0Visitor extends RelShuttleImpl {
     final RelShuttle unsupportedOperationsVisitor = new RelShuttleImpl() {
       @Override
       public RelNode visit(RelNode other) {
-        if (other instanceof DrillUnionRelBase) {
+        if (other instanceof DrillSetOpRel) {
           isUnsupported.value = true;
           return other;
         } else if (other instanceof DrillProjectRelBase) {
@@ -259,7 +259,7 @@ public class FindLimit0Visitor extends RelShuttleImpl {
   public RelNode visit(RelNode other) {
     if (other instanceof DrillJoinRelBase ||
         other instanceof DrillAggregateRelBase ||
-        other instanceof DrillUnionRelBase) {
+        other instanceof DrillSetOpRel) {
       return other;
     }
     if (other instanceof DrillLimitRel) {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/UnsupportedOperatorsVisitor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/UnsupportedOperatorsVisitor.java
index a8bdd1c7e3..8beefb7362 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/UnsupportedOperatorsVisitor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/UnsupportedOperatorsVisitor.java
@@ -236,14 +236,6 @@ public class UnsupportedOperatorsVisitor extends SqlShuttle {
       }
     }
 
-    // Disable unsupported Intersect, Except
-    if (sqlCall.getKind() == SqlKind.INTERSECT || sqlCall.getKind() == SqlKind.EXCEPT) {
-      unsupportedOperatorCollector.setException(SqlUnsupportedException.ExceptionType.RELATIONAL,
-          sqlCall.getOperator().getName() + " is not supported\n" +
-          "See Apache Drill JIRA: DRILL-1921");
-      throw new UnsupportedOperationException();
-    }
-
     // Disable unsupported JOINs
     if (sqlCall.getKind() == SqlKind.JOIN) {
       SqlJoin join = (SqlJoin) sqlCall;
@@ -555,4 +547,4 @@ public class UnsupportedOperatorsVisitor extends SqlShuttle {
       }
     }
   }
-}
\ No newline at end of file
+}
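
With this guard removed, INTERSECT and EXCEPT statements are no longer rejected up front and instead reach the new set-operator planning path. A minimal sketch of a query that is now expected to plan, reusing the statement from the TestDisabledFunctionality cases deleted further below and the ClusterTest/testBuilder harness used by TestSetOp; the empty-result baseline is an illustrative assumption, on the basis that TPC-H region names do not occur among nation names:

    // Previously failed with UnsupportedRelOperatorException (DRILL-1921); now planned via SetOp.
    testBuilder()
      .sqlQuery("(select n_name as name from cp.`tpch/nation.parquet`) " +
        "INTERSECT (select r_name as name from cp.`tpch/region.parquet`)")
      .unOrdered()
      .baselineColumns("name")
      .expectsEmptyResultSet()   // assumption: the two name sets are disjoint in the sample data
      .go();
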
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/torel/ConversionContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/torel/ConversionContext.java
index 16b3ed8f9a..58e75df6e4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/torel/ConversionContext.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/torel/ConversionContext.java
@@ -25,8 +25,10 @@ import org.apache.calcite.rel.RelRoot;
 import org.apache.calcite.rel.hint.RelHint;
 import org.apache.drill.common.expression.LogicalExpression;
 import org.apache.drill.common.logical.LogicalPlan;
+import org.apache.drill.common.logical.data.Except;
 import org.apache.drill.common.logical.data.Filter;
 import org.apache.drill.common.logical.data.GroupingAggregate;
+import org.apache.drill.common.logical.data.Intersect;
 import org.apache.drill.common.logical.data.Join;
 import org.apache.drill.common.logical.data.Limit;
 import org.apache.drill.common.logical.data.LogicalOperator;
@@ -36,6 +38,8 @@ import org.apache.drill.common.logical.data.Scan;
 import org.apache.drill.common.logical.data.Union;
 import org.apache.drill.common.logical.data.visitors.AbstractLogicalVisitor;
 import org.apache.drill.exec.planner.logical.DrillAggregateRel;
+import org.apache.drill.exec.planner.logical.DrillExceptRel;
+import org.apache.drill.exec.planner.logical.DrillIntersectRel;
 import org.apache.drill.exec.planner.logical.DrillJoinRel;
 import org.apache.drill.exec.planner.logical.DrillLimitRel;
 import org.apache.drill.exec.planner.logical.DrillRel;
@@ -156,6 +160,16 @@ public class ConversionContext implements ToRelContext {
       return DrillUnionRel.convert(union, context);
     }
 
+    @Override
+    public RelNode visitExcept(Except except, ConversionContext context) throws InvalidRelException{
+      return DrillExceptRel.convert(except, context);
+    }
+
+    @Override
+    public RelNode visitIntersect(Intersect intersect, ConversionContext context) throws InvalidRelException{
+      return DrillIntersectRel.convert(intersect, context);
+    }
+
     @Override
     public RelNode visitGroupingAggregate(GroupingAggregate groupBy, ConversionContext context)
         throws InvalidRelException {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
index 64658ae871..c409053fc5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
@@ -197,6 +197,7 @@ public class SystemOptionManager extends BaseOptionManager implements AutoClosea
       new OptionDefinition(ExecConstants.JSON_READER_ESCAPE_ANY_CHAR_VALIDATOR),
       new OptionDefinition(ExecConstants.STORE_TABLE_USE_SCHEMA_FILE_VALIDATOR),
       new OptionDefinition(ExecConstants.ENABLE_UNION_TYPE),
+      new OptionDefinition(ExecConstants.EXCEPT_ADD_AGG_BELOW),
       new OptionDefinition(ExecConstants.TEXT_ESTIMATED_ROW_SIZE),
       new OptionDefinition(ExecConstants.TEXT_WRITER_ADD_HEADER_VALIDATOR),
       new OptionDefinition(ExecConstants.TEXT_WRITER_FORCE_QUOTES_VALIDATOR),
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/PluginUnionRel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/PluginUnionRel.java
index 3d61ea5fba..a606d70f6b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/PluginUnionRel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/PluginUnionRel.java
@@ -24,9 +24,10 @@ import org.apache.calcite.plan.RelTraitSet;
 import org.apache.calcite.rel.InvalidRelException;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.core.SetOp;
+import org.apache.calcite.rel.core.Union;
 import org.apache.calcite.rel.metadata.RelMetadataQuery;
 import org.apache.drill.common.exceptions.DrillRuntimeException;
-import org.apache.drill.exec.planner.common.DrillUnionRelBase;
+import org.apache.drill.exec.planner.common.DrillSetOpRel;
 import org.apache.drill.exec.store.plan.PluginImplementor;
 
 import java.io.IOException;
@@ -35,11 +36,14 @@ import java.util.List;
 /**
  * Union implementation for Drill plugins.
  */
-public class PluginUnionRel extends DrillUnionRelBase implements PluginRel {
+public class PluginUnionRel extends Union implements PluginRel, DrillSetOpRel {
 
   public PluginUnionRel(RelOptCluster cluster, RelTraitSet traits, List<RelNode> inputs,
       boolean all, boolean checkCompatibility) throws InvalidRelException {
-    super(cluster, traits, inputs, all, checkCompatibility);
+    super(cluster, traits, inputs, all);
+    if (checkCompatibility && !this.isCompatible(getRowType(), getInputs())) {
+      throw new InvalidRelException("Input row types of the Union are not compatible.");
+    }
   }
 
   @Override
diff --git a/exec/java-exec/src/main/resources/drill-module.conf b/exec/java-exec/src/main/resources/drill-module.conf
index 32203ca7a7..981e51951b 100644
--- a/exec/java-exec/src/main/resources/drill-module.conf
+++ b/exec/java-exec/src/main/resources/drill-module.conf
@@ -558,6 +558,7 @@ drill.exec.options: {
     exec.bulk_load_table_list.bulk_size: 1000,
     exec.enable_bulk_load_table_list: false,
     exec.enable_union_type: false,
+    exec.except_add_agg_below: false,
     exec.errors.verbose: false,
     drill.exec.http.rest.errors.verbose: false,
     exec.hashjoin.mem_limit: 0,
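
Per the plan patterns asserted in TestSetOp#testExcept further below, the new exec.except_add_agg_below option (default false) decides whether the distinct aggregation needed by EXCEPT is planned above the SetOp (Screen .. Agg .. SetOp) or pushed below its inputs (SetOp .. Agg .. Values). A minimal sketch of toggling it per session, assuming the ClusterTest `client` fixture and the ExecConstants.EXCEPT_ADD_AGG_BELOW_KEY constant used in that test; the table names are hypothetical and only for illustration:

    client.alterSession(ExecConstants.EXCEPT_ADD_AGG_BELOW_KEY, true);
    try {
      // With the option enabled, the aggregation is expected below the set operator in the plan.
      run("select a from dfs.tmp.`t1` except select a from dfs.tmp.`t2`");
    } finally {
      client.resetSession(ExecConstants.EXCEPT_ADD_AGG_BELOW_KEY);
    }
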
diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestDisabledFunctionality.java b/exec/java-exec/src/test/java/org/apache/drill/TestDisabledFunctionality.java
index be2f1f4f7a..b1649fc535 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/TestDisabledFunctionality.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/TestDisabledFunctionality.java
@@ -47,42 +47,6 @@ public class TestDisabledFunctionality extends BaseTestQuery {
     throw ex;
   }
 
-  @Test(expected = UnsupportedRelOperatorException.class) // see DRILL-1921
-  public void testDisabledIntersect() throws Exception {
-    try {
-      test("(select n_name as name from cp.`tpch/nation.parquet`) INTERSECT (select r_name as name from cp.`tpch/region.parquet`)");
-    } catch(UserException ex) {
-      throwAsUnsupportedException(ex);
-    }
-  }
-
-  @Test(expected = UnsupportedRelOperatorException.class) // see DRILL-1921
-  public void testDisabledIntersectALL() throws Exception {
-    try {
-      test("(select n_name as name from cp.`tpch/nation.parquet`) INTERSECT ALL (select r_name as name from cp.`tpch/region.parquet`)");
-    } catch(UserException ex) {
-      throwAsUnsupportedException(ex);
-    }
-  }
-
-  @Test(expected = UnsupportedRelOperatorException.class) // see DRILL-1921
-  public void testDisabledExceptALL() throws Exception {
-    try {
-      test("(select n_name as name from cp.`tpch/nation.parquet`) EXCEPT ALL (select r_name as name from cp.`tpch/region.parquet`)");
-    } catch(UserException ex) {
-      throwAsUnsupportedException(ex);
-    }
-  }
-
-  @Test(expected = UnsupportedRelOperatorException.class) // see DRILL-1921
-  public void testDisabledExcept() throws Exception {
-    try {
-      test("(select n_name as name from cp.`tpch/nation.parquet`) EXCEPT (select r_name as name from cp.`tpch/region.parquet`)");
-    } catch(UserException ex) {
-      throwAsUnsupportedException(ex);
-    }
-  }
-
   @Test(expected = UnsupportedRelOperatorException.class) // see DRILL-1921
   public void testDisabledNaturalJoin() throws Exception {
     try {
diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestSetOp.java b/exec/java-exec/src/test/java/org/apache/drill/TestSetOp.java
new file mode 100644
index 0000000000..98d32605fd
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/TestSetOp.java
@@ -0,0 +1,1182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill;
+
+import org.apache.drill.exec.planner.physical.PlannerSettings;
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.BatchSchemaBuilder;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.apache.drill.exec.util.StoragePluginTestUtils;
+import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.drill.categories.OperatorTest;
+import org.apache.drill.categories.SqlTest;
+import org.apache.drill.categories.UnlikelyTest;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.test.ClusterFixture;
+import org.apache.drill.test.ClusterTest;
+import org.hamcrest.CoreMatchers;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileWriter;
+import java.nio.file.Paths;
+import java.util.List;
+
+@Category({SqlTest.class, OperatorTest.class})
+public class TestSetOp extends ClusterTest {
+  private static final String EMPTY_DIR_NAME = "empty_directory";
+  private static final String SLICE_TARGET_DEFAULT = "alter session reset `planner.slice_target`";
+
+  @BeforeClass
+  public static void setupTestFiles() throws Exception {
+    startCluster(ClusterFixture.builder(dirTestWatcher));
+    dirTestWatcher.copyResourceToRoot(Paths.get("multilevel", "parquet"));
+    dirTestWatcher.makeTestTmpSubDir(Paths.get(EMPTY_DIR_NAME));
+
+    // A tmp workspace with a default format defined for tests that need to
+    // query empty directories without encountering an error.
+    cluster.defineWorkspace(
+        StoragePluginTestUtils.DFS_PLUGIN_NAME,
+        "tmp_default_format",
+        dirTestWatcher.getDfsTestTmpDir().getAbsolutePath(),
+        "csvh"
+    );
+  }
+
+  @Test
+  public void TestExceptionWithSchemaLessDataSource() {
+    String root = "/multilevel/csv/1994/Q1/orders_94_q1.csv";
+    try {
+      testBuilder()
+        .sqlQuery("select * from cp.`%s` intersect select * from cp.`%s`", root, root)
+        .unOrdered()
+        .baselineColumns("a", "b")
+        .baselineValues(1, 1)
+        .go();
+      Assert.fail("Missing expected exception on schema less data source");
+    } catch (Exception ex) {
+      Assert.assertThat(ex.getMessage(), ex.getMessage(),
+        CoreMatchers.containsString("schema-less tables must specify the columns explicitly"));
+    }
+
+    try {
+      testBuilder()
+        .sqlQuery("select * from cp.`%s` except select * from cp.`%s`", root, root)
+        .unOrdered()
+        .baselineColumns("a", "b")
+        .baselineValues(1, 1)
+        .go();
+      Assert.fail("Missing expected exception on schema less data source");
+    } catch (Exception ex) {
+      Assert.assertThat(ex.getMessage(), ex.getMessage(),
+        CoreMatchers.containsString("schema-less tables must specify the columns explicitly"));
+    }
+  }
+
+  @Test
+  public void testIntersect() throws Exception {
+    String query = "select * from (values(4,4), (2,2), (4,4), (1,1), (3,4), (2,2), (1,1)) t(a,b) intersect select * from (values(1,1), (1,1), (2,2), (3,3)) t(a,b)";
+    testBuilder()
+      .sqlQuery(query)
+      .unOrdered()
+      .baselineColumns("a", "b")
+      .baselineValues(2, 2)
+      .baselineValues(1, 1)
+      .build().run();
+
+    query = "select * from (values(4,4), (2,2), (4,4), (1,1), (3,4), (2,2), (1,1)) t(a,b) intersect all select * from (values(1,1), (1,1), (2,2), (3,3)) t(a,b)";
+    testBuilder()
+      .sqlQuery(query)
+      .unOrdered()
+      .baselineColumns("a", "b")
+      .baselineValues(2, 2)
+      .baselineValues(1, 1)
+      .baselineValues(1, 1)
+      .build().run();
+  }
+
+  @Test
+  public void testExcept() throws Exception {
+    String query = "select * from (values(4,4), (2,2), (4,4), (1,1), (3,4), (2,2), (1,1)) t(a,b) except select * from (values(1,1), (1,1), (2,2), (3,3)) t(a,b)";
+    String aggAbovePattern = ".*Screen.*Agg.*SetOp.*";
+    String aggBelowPattern = ".*SetOp.*Agg.*Values.*";
+
+    queryBuilder()
+      .sql(query)
+      .planMatcher()
+      .include(aggAbovePattern)
+      .exclude(aggBelowPattern)
+      .match(true);
+
+    testBuilder()
+      .sqlQuery(query)
+      .unOrdered()
+      .baselineColumns("a", "b")
+      .baselineValues(4, 4)
+      .baselineValues(3, 4)
+      .build().run();
+
+    try {
+      client.alterSession(ExecConstants.EXCEPT_ADD_AGG_BELOW_KEY, true);
+      query = "select * from (values(4,4), (2,2), (4,4), (1,1), (3,4), (2,2), (1,1)) t(a,b) except select a, b from (values(1,1), (1,1), (2,2), (3,3)) t(a,b)";
+      queryBuilder()
+        .sql(query)
+        .planMatcher()
+        .include(aggBelowPattern)
+        .exclude(aggAbovePattern)
+        .match(true);
+
+      testBuilder()
+        .sqlQuery(query)
+        .unOrdered()
+        .baselineColumns("a", "b")
+        .baselineValues(4, 4)
+        .baselineValues(3, 4)
+        .build().run();
+    } finally {
+      client.resetSession(ExecConstants.EXCEPT_ADD_AGG_BELOW_KEY);
+    }
+
+    query = "select * from (values(4,4), (2,2), (4,4), (1,1), (3,4), (2,2), (1,1)) t(a,b) except all select * from (values(1,1), (1,1), (2,2), (3,3)) t(a,b)";
+    testBuilder()
+      .sqlQuery(query)
+      .unOrdered()
+      .baselineColumns("a", "b")
+      .baselineValues(4, 4)
+      .baselineValues(4, 4)
+      .baselineValues(3, 4)
+      .baselineValues(2, 2)
+      .build().run();
+  }
+
+
+  @Test
+  public void testOverJoin() throws Exception {
+    String query =
+      "select n1.n_nationkey from cp.`tpch/nation.parquet` n1 inner join cp.`tpch/region.parquet` r1 on n1.n_regionkey = r1.r_regionkey where n1.n_nationkey in (1, 2, 3, 4) " +
+      "except " +
+      "select n2.n_nationkey from cp.`tpch/nation.parquet` n2 inner join cp.`tpch/region.parquet` r2 on n2.n_regionkey = r2.r_regionkey where n2.n_nationkey in (3, 4)";
+
+    testBuilder()
+      .sqlQuery(query)
+      .unOrdered()
+      .baselineColumns("n_nationkey")
+      .baselineValues(1)
+      .baselineValues(2)
+      .build().run();
+  }
+
+  @Test
+  public void testExceptOverAgg() throws Exception {
+    String query = "select n1.n_regionkey from cp.`tpch/nation.parquet` n1 group by n1.n_regionkey except " +
+      "select r1.r_regionkey from cp.`tpch/region.parquet` r1 where r1.r_regionkey in (0, 1) group by r1.r_regionkey";
+
+    String excludePattern = "Screen.*Agg.*SetOp";
+    queryBuilder()
+      .sql(query)
+      .planMatcher()
+      .exclude(excludePattern)
+      .match(true);
+
+    testBuilder()
+      .sqlQuery(query)
+      .unOrdered()
+      .baselineColumns("n_regionkey")
+      .baselineValues(2)
+      .baselineValues(3)
+      .baselineValues(4)
+      .build().run();
+  }
+
+  @Test
+  public void testChain() throws Exception {
+    String query = "select n_regionkey from cp.`tpch/nation.parquet` intersect " +
+      "select r_regionkey from cp.`tpch/region.parquet` intersect " +
+      "select n_nationkey from cp.`tpch/nation.parquet` where n_nationkey in (1,2) intersect " +
+      "select c_custkey from cp.`tpch/customer.parquet` where c_custkey < 5";
+
+    testBuilder()
+      .sqlQuery(query)
+      .unOrdered()
+      .baselineColumns("n_regionkey")
+      .baselineValues(1)
+      .baselineValues(2)
+      .build().run();
+  }
+
+  @Test
+  public void testSameColumn() throws Exception {
+    String query = "select n_nationkey, n_regionkey from cp.`tpch/nation.parquet` where n_regionkey = 1 intersect all select r_regionkey, r_regionkey from cp.`tpch/region.parquet` where r_regionkey = 1";
+    testBuilder()
+      .sqlQuery(query)
+      .unOrdered()
+      .baselineColumns("n_nationkey", "n_regionkey")
+      .baselineValues(1, 1)
+      .build().run();
+
+    query = "select n_regionkey, n_regionkey from cp.`tpch/nation.parquet` where n_regionkey = 1 except all select r_regionkey, r_regionkey from cp.`tpch/region.parquet` where r_regionkey = 1";
+    testBuilder()
+      .sqlQuery(query)
+      .unOrdered()
+      .baselineColumns("n_regionkey", "n_regionkey0")
+      .baselineValues(1, 1)
+      .baselineValues(1, 1)
+      .baselineValues(1, 1)
+      .baselineValues(1, 1)
+      .build().run();
+  }
+
+  @Test
+  public void testTwoStringColumns() throws Exception {
+    String query = "select r_comment, r_regionkey from cp.`tpch/region.parquet` except select n_name, n_nationkey from cp.`tpch/nation.parquet`";
+
+    testBuilder()
+      .sqlQuery(query)
+      .unOrdered()
+      .baselineColumns("r_comment", "r_regionkey")
+      .baselineValues("lar deposits. blithely final packages cajole. regular waters are final requests. regular accounts are according to ", 0)
+      .baselineValues("hs use ironic, even requests. s", 1)
+      .baselineValues("ges. thinly even pinto beans ca", 2)
+      .baselineValues("ly final courts cajole furiously final excuse", 3)
+      .baselineValues("uickly special accounts cajole carefully blithely close requests. carefully final asymptotes haggle furiousl", 4)
+      .build().run();
+  }
+
+
+  @Test
+  public void testConstantLiterals() throws Exception {
+    String query = "(select 'CONST' as LiteralConstant, 1 as NumberConstant, n_nationkey from cp.`tpch/nation.parquet`) " +
+      "intersect " +
+      "(select 'CONST', 1, r_regionkey from cp.`tpch/region.parquet`)";
+
+    testBuilder()
+      .sqlQuery(query)
+      .unOrdered()
+      .baselineColumns("LiteralConstant", "NumberConstant", "n_nationkey")
+      .baselineValues("CONST", 1, 0)
+      .baselineValues("CONST", 1, 1)
+      .baselineValues("CONST", 1, 2)
+      .baselineValues("CONST", 1, 3)
+      .baselineValues("CONST", 1, 4)
+      .build().run();
+  }
+
+  @Test
+  @Category(UnlikelyTest.class)
+  public void testViewExpandableStar() throws Exception {
+    try {
+      run("use dfs.tmp");
+      run("create view nation_view as select n_nationkey, n_name from (values(4,'4'), (2,'2'), (4,'4'), (1,'1'), (3,'4'), (2,'2'), (1,'1')) t(n_nationkey, n_name)");
+      run("create view region_view as select r_regionkey, r_name from (values(1,'1'), (1,'1'), (2,'2'), (3,'3')) t(r_regionkey, r_name)");
+
+      String query1 = "(select * from dfs.tmp.`nation_view`) " +
+        "except " +
+        "(select * from dfs.tmp.`region_view`) ";
+
+      String query2 =  "(select r_regionkey, r_name from (values(1,'1'), (1,'1'), (2,'2'), (3,'3')) t(r_regionkey, r_name)) " +
+        "intersect " +
+        "(select * from dfs.tmp.`nation_view`)";
+
+      testBuilder()
+        .sqlQuery(query1)
+        .unOrdered()
+        .baselineColumns("n_nationkey", "n_name")
+        .baselineValues(4, "4")
+        .baselineValues(3, "4")
+        .build().run();
+
+      testBuilder()
+        .sqlQuery(query2)
+        .unOrdered()
+        .baselineColumns("r_regionkey", "r_name")
+        .baselineValues(1, "1")
+        .baselineValues(2, "2")
+        .build().run();
+    } finally {
+      run("drop view if exists nation_view");
+      run("drop view if exists region_view");
+    }
+  }
+
+  @Test
+  public void testDiffDataTypesAndModes() throws Exception {
+    try {
+      run("use dfs.tmp");
+      run("create view nation_view as select n_nationkey, n_name from (values(4,'4'), (2,'2'), (4,'4'), (1,'1'), (3,'4'), (2,'2'), (1,'1')) t(n_nationkey, n_name)");
+      run("create view region_view as select r_regionkey, r_name from (values(1,'1'), (1,'1'), (2,'2'), (3,'3')) t(r_regionkey, r_name)");
+
+
+      String t1 = "(select r_regionkey, r_name from (values(1,'1'), (1,'1'), (2,'2'), (3,'3')) t(r_regionkey, r_name))";
+      String t2 = "(select * from nation_view)";
+      String t3 = "(select * from region_view)";
+      String t4 = "(select store_id, full_name from cp.`employee.json` limit 5)";
+
+      String query1 = t1 + " intersect all " + t2 + " intersect all " + t3 + " except all " + t4;
+
+      testBuilder()
+        .sqlQuery(query1)
+        .unOrdered()
+        .baselineColumns("r_regionkey", "r_name")
+        .baselineValues(1, "1")
+        .baselineValues(1, "1")
+        .baselineValues(2, "2")
+        .build().run();
+    } finally {
+      run("drop view if exists nation_view");
+      run("drop view if exists region_view");
+    }
+  }
+
+  @Test
+  @Category(UnlikelyTest.class)
+  public void testDistinctOverIntersectAllWithFullyQualifiedColumnNames() throws Exception {
+    String query = "select distinct sq.x1 " +
+      "from " +
+      "((select n_regionkey as a1 from cp.`tpch/nation.parquet`) " +
+      "intersect all " +
+      "(select r_regionkey as a2 from cp.`tpch/region.parquet`)) as sq(x1)";
+
+    testBuilder()
+      .sqlQuery(query)
+      .unOrdered()
+      .baselineColumns("x1")
+      .baselineValues(0)
+      .baselineValues(1)
+      .baselineValues(2)
+      .baselineValues(3)
+      .baselineValues(4)
+      .build().run();
+  }
+
+  @Test
+  @Category(UnlikelyTest.class)
+  public void testContainsColumnAndNumericConstant() throws Exception {
+    String query = "(select n_nationkey, n_regionkey, n_name from cp.`tpch/nation.parquet`) " +
+      "intersect " +
+      "(select 1, n_regionkey, 'ARGENTINA' from cp.`tpch/nation.parquet`)";
+
+    testBuilder()
+      .sqlQuery(query)
+      .unOrdered()
+      .baselineColumns("n_nationkey", "n_regionkey", "n_name")
+      .baselineValues(1, 1, "ARGENTINA")
+      .build().run();
+  }
+
+  @Test
+  @Category(UnlikelyTest.class)
+  public void testEmptySides() throws Exception {
+    String query1 = "(select n_nationkey, n_regionkey, n_name from cp.`tpch/nation.parquet` limit 0) " +
+      "intersect " +
+      "(select 1, n_regionkey, 'ARGENTINA' from cp.`tpch/nation.parquet`)";
+
+    String query2 = "(select n_nationkey, n_regionkey, n_name from cp.`tpch/nation.parquet` where n_nationkey = 1) " +
+      "except " +
+      "(select 1, n_regionkey, 'ARGENTINA' from cp.`tpch/nation.parquet` limit 0)";
+
+    testBuilder()
+      .sqlQuery(query1)
+      .unOrdered()
+      .baselineColumns("n_nationkey", "n_regionkey", "n_name")
+      .expectsEmptyResultSet()
+      .build().run();
+
+    testBuilder()
+      .sqlQuery(query2)
+      .unOrdered()
+      .baselineColumns("n_nationkey", "n_regionkey", "n_name")
+      .baselineValues(1, 1, "ARGENTINA")
+      .build().run();
+  }
+
+  @Test
+  @Category(UnlikelyTest.class)
+  public void testAggregationOnIntersectOperator() throws Exception {
+    String root = "/store/text/data/t.json";
+
+    testBuilder()
+      .sqlQuery("(select calc1, max(b1) as `max`, min(b1) as `min`, count(c1) as `count` " +
+        "from (select a1 + 10 as calc1, b1, c1 from cp.`%s` " +
+        "intersect all select a1 + 10 as diff1, b1 as diff2, c1 as diff3 from cp.`%s`) " +
+        "group by calc1 order by calc1)", root, root)
+      .ordered()
+      .baselineColumns("calc1", "max", "min", "count")
+      .baselineValues(10L, 2L, 1L, 5L)
+      .baselineValues(20L, 5L, 3L, 5L)
+      .build().run();
+
+    testBuilder()
+      .sqlQuery("(select calc1, min(b1) as `min`, max(b1) as `max`, count(c1) as `count` " +
+        "from (select a1 + 10 as calc1, b1, c1 from cp.`%s` " +
+        "intersect all select a1 + 10 as diff1, b1 as diff2, c1 as diff3 from cp.`%s`) " +
+        "group by calc1 order by calc1)", root, root)
+      .ordered()
+      .baselineColumns("calc1", "min", "max", "count")
+      .baselineValues(10L, 1L, 2L, 5L)
+      .baselineValues(20L, 3L, 5L, 5L)
+      .build().run();
+  }
+
+  @Test(expected = UserException.class)
+  public void testImplicitCastingFailure() throws Exception {
+    String rootInt = "/store/json/intData.json";
+    String rootBoolean = "/store/json/booleanData.json";
+
+    run("(select key from cp.`%s` " +
+      "intersect all " +
+      "select key from cp.`%s` )", rootInt, rootBoolean);
+  }
+
+  @Test
+  @Category(UnlikelyTest.class)
+  public void testDateAndTimestampJson() throws Exception {
+    String rootDate = "/store/json/dateData.json";
+    String rootTimpStmp = "/store/json/timeStmpData.json";
+
+    testBuilder()
+      .sqlQuery("(select max(key) as key from cp.`%s` " +
+        "except all select key from cp.`%s`)", rootDate, rootTimpStmp)
+      .unOrdered()
+      .baselineColumns("key")
+      .baselineValues("2011-07-26")
+      .build().run();
+
+    testBuilder()
+      .sqlQuery("select key from cp.`%s` " +
+        "except select max(key) as key from cp.`%s`", rootTimpStmp, rootDate)
+      .unOrdered()
+      .baselineColumns("key")
+      .baselineValues("2015-03-26 19:04:55.542")
+      .baselineValues("2015-03-26 19:04:55.543")
+      .baselineValues("2015-03-26 19:04:55.544")
+      .build().run();
+  }
+
+  @Test
+  @Category(UnlikelyTest.class)
+  public void testOneInputContainsAggFunction() throws Exception {
+    String root = "/multilevel/csv/1994/Q1/orders_94_q1.csv";
+
+    testBuilder()
+      .sqlQuery("select * from ((select max(c1) as ct from (select columns[0] c1 from cp.`%s`)) \n" +
+        "intersect all (select columns[0] c2 from cp.`%s`)) order by ct limit 3", root, root)
+      .ordered()
+      .baselineColumns("ct")
+      .baselineValues("99")
+      .build().run();
+
+    testBuilder()
+      .sqlQuery("select * from ((select columns[0] ct from cp.`%s`)\n" +
+        "intersect all (select max(c1) as c2 from (select columns[0] c1 from cp.`%s`))) order by ct limit 3", root, root)
+      .ordered()
+      .baselineColumns("ct")
+      .baselineValues("99")
+      .build().run();
+
+    testBuilder()
+      .sqlQuery("select * from ((select max(c1) as ct from (select columns[0] c1 from cp.`%s`))\n" +
+        "intersect all (select max(c1) as c2 from (select columns[0] c1 from cp.`%s`))) order by ct", root, root)
+      .ordered()
+      .baselineColumns("ct")
+      .baselineValues("99")
+      .build().run();
+  }
+
+  @Test
+  @Category(UnlikelyTest.class)
+  public void testInputsGroupByOnCSV() throws Exception {
+    String root = "/multilevel/csv/1994/Q1/orders_94_q1.csv";
+
+    testBuilder()
+      .sqlQuery("select * from \n" +
+          "((select columns[0] as col0 from cp.`%s` t1 \n" +
+          "where t1.columns[0] = 66) \n" +
+          "intersect all \n" +
+          "(select columns[0] c2 from cp.`%s` t2 \n" +
+          "where t2.columns[0] is not null \n" +
+          "group by columns[0])) \n" +
+          "group by col0",
+        root, root)
+      .unOrdered()
+      .baselineColumns("col0")
+      .baselineValues("66")
+      .build().run();
+  }
+
+  @Test
+  @Category(UnlikelyTest.class)
+  public void testDiffTypesAtPlanning() throws Exception {
+    testBuilder()
+      .sqlQuery("select count(c1) as ct from (select cast(r_regionkey as int) c1 from cp.`tpch/region.parquet`) " +
+        "intersect (select cast(r_regionkey as int) + 1 c2 from cp.`tpch/region.parquet`)")
+      .ordered()
+      .baselineColumns("ct")
+      .baselineValues((long) 5)
+      .build().run();
+  }
+
+  @Test
+  @Category(UnlikelyTest.class)
+  public void testRightEmptyJson() throws Exception {
+    String rootEmpty = "/project/pushdown/empty.json";
+    String rootSimple = "/store/json/booleanData.json";
+
+    testBuilder()
+      .sqlQuery("select key from cp.`%s` " +
+          "intersect all " +
+          "select key from cp.`%s`",
+        rootSimple,
+        rootEmpty)
+      .unOrdered()
+      .baselineColumns("key")
+      .expectsEmptyResultSet()
+      .build().run();
+
+    testBuilder()
+      .sqlQuery("select key from cp.`%s` " +
+          "except all " +
+          "select key from cp.`%s`",
+        rootSimple,
+        rootEmpty)
+      .unOrdered()
+      .baselineColumns("key")
+      .baselineValues(true)
+      .baselineValues(false)
+      .build().run();
+  }
+
+  @Test
+  public void testLeftEmptyJson() throws Exception {
+    final String rootEmpty = "/project/pushdown/empty.json";
+    final String rootSimple = "/store/json/booleanData.json";
+
+    testBuilder()
+      .sqlQuery("select key from cp.`%s` " +
+          "intersect all " +
+          "select key from cp.`%s`",
+        rootEmpty,
+        rootSimple)
+      .unOrdered()
+      .baselineColumns("key")
+      .expectsEmptyResultSet()
+      .build().run();
+
+    testBuilder()
+      .sqlQuery("select key from cp.`%s` " +
+          "except all " +
+          "select key from cp.`%s`",
+        rootEmpty,
+        rootSimple)
+      .unOrdered()
+      .baselineColumns("key")
+      .expectsEmptyResultSet()
+      .build().run();
+  }
+
+  @Test
+  public void testBothEmptyJson() throws Exception {
+    final String rootEmpty = "/project/pushdown/empty.json";
+
+    final List<Pair<SchemaPath, TypeProtos.MajorType>> expectedSchema = Lists.newArrayList();
+    final TypeProtos.MajorType majorType = TypeProtos.MajorType.newBuilder()
+      .setMinorType(TypeProtos.MinorType.INT)
+      .setMode(TypeProtos.DataMode.OPTIONAL)
+      .build();
+    expectedSchema.add(Pair.of(SchemaPath.getSimplePath("key"), majorType));
+
+    testBuilder()
+      .sqlQuery("select key from cp.`%s` " +
+          "intersect all " +
+          "select key from cp.`%s`",
+        rootEmpty,
+        rootEmpty)
+      .schemaBaseLine(expectedSchema)
+      .build()
+      .run();
+  }
+
+  @Test
+  public void testRightEmptyDataBatch() throws Exception {
+    String rootSimple = "/store/json/booleanData.json";
+
+    testBuilder()
+      .sqlQuery("select key from cp.`%s` " +
+          "except all " +
+          "select key from cp.`%s` where 1 = 0",
+        rootSimple,
+        rootSimple)
+      .unOrdered()
+      .baselineColumns("key")
+      .baselineValues(true)
+      .baselineValues(false)
+      .build().run();
+  }
+
+  @Test
+  public void testLeftEmptyDataBatch() throws Exception {
+    String rootSimple = "/store/json/booleanData.json";
+
+    testBuilder()
+      .sqlQuery("select key from cp.`%s` where 1 = 0 " +
+          "except all " +
+          "select key from cp.`%s`",
+        rootSimple,
+        rootSimple)
+      .unOrdered()
+      .baselineColumns("key")
+      .expectsEmptyResultSet()
+      .build()
+      .run();
+  }
+
+  @Test
+  public void testBothEmptyDataBatch() throws Exception {
+    String rootSimple = "/store/json/booleanData.json";
+
+    final List<Pair<SchemaPath, TypeProtos.MajorType>> expectedSchema = Lists.newArrayList();
+    final TypeProtos.MajorType majorType = TypeProtos.MajorType.newBuilder()
+      .setMinorType(TypeProtos.MinorType.BIT) // field "key" is boolean type
+      .setMode(TypeProtos.DataMode.OPTIONAL)
+      .build();
+    expectedSchema.add(Pair.of(SchemaPath.getSimplePath("key"), majorType));
+
+    testBuilder()
+      .sqlQuery("select key from cp.`%s` where 1 = 0 " +
+          "intersect all " +
+          "select key from cp.`%s` where 1 = 0",
+        rootSimple,
+        rootSimple)
+      .schemaBaseLine(expectedSchema)
+      .build()
+      .run();
+  }
+
+  @Test
+  @Category(UnlikelyTest.class)
+  public void testInListOnIntersect() throws Exception {
+    String query = "select n_nationkey \n" +
+      "from (select n1.n_nationkey from cp.`tpch/nation.parquet` n1 inner join cp.`tpch/region.parquet` r1 on n1.n_regionkey = r1.r_regionkey \n" +
+      "intersect \n" +
+      "select n2.n_nationkey from cp.`tpch/nation.parquet` n2 inner join cp.`tpch/region.parquet` r2 on n2.n_regionkey = r2.r_regionkey) \n" +
+      "where n_nationkey in (1, 2)";
+
+    // Validate the plan
+    final String[] expectedPlan = {"Project.*\n" +
+      ".*SetOp\\(all=\\[false\\], kind=\\[INTERSECT\\]\\).*\n" +
+      ".*Project.*\n" +
+      ".*HashJoin.*\n" +
+      ".*SelectionVectorRemover.*\n" +
+      ".*Filter.*\n" +
+      ".*Scan.*columns=\\[`n_regionkey`, `n_nationkey`\\].*\n" +
+      ".*Scan.*columns=\\[`r_regionkey`\\].*\n" +
+      ".*Project.*\n" +
+      ".*HashJoin.*\n" +
+      ".*SelectionVectorRemover.*\n" +
+      ".*Filter.*\n" +
+      ".*Scan.*columns=\\[`n_regionkey`, `n_nationkey`\\].*\n" +
+      ".*Scan.*columns=\\[`r_regionkey`\\].*"};
+    queryBuilder()
+      .sql(query)
+      .planMatcher()
+      .include(expectedPlan)
+      .match();
+
+    // Validate the result
+    testBuilder()
+      .sqlQuery(query)
+      .ordered()
+      .baselineColumns("n_nationkey")
+      .baselineValues(1)
+      .baselineValues(2)
+      .build()
+      .run();
+  }
+
+  @Test
+  @Category(UnlikelyTest.class)
+  public void testIntersectWith() throws Exception {
+    final String query1 = "WITH year_total \n" +
+      "     AS (SELECT c.r_regionkey    customer_id,\n" +
+      "                1 year_total\n" +
+      "         FROM   cp.`tpch/region.parquet` c\n" +
+      "         Intersect ALL \n" +
+      "         SELECT c.r_regionkey    customer_id, \n" +
+      "                1 year_total\n" +
+      "         FROM   cp.`tpch/region.parquet` c) \n" +
+      "SELECT count(t_s_secyear.customer_id) as ct \n" +
+      "FROM   year_total t_s_firstyear, \n" +
+      "       year_total t_s_secyear, \n" +
+      "       year_total t_w_firstyear, \n" +
+      "       year_total t_w_secyear \n" +
+      "WHERE  t_s_secyear.customer_id = t_s_firstyear.customer_id \n" +
+      "       AND t_s_firstyear.customer_id = t_w_secyear.customer_id \n" +
+      "       AND t_s_firstyear.customer_id = t_w_firstyear.customer_id \n" +
+      "       AND CASE \n" +
+      "             WHEN t_w_firstyear.year_total > 0 THEN t_w_secyear.year_total \n" +
+      "             ELSE NULL \n" +
+      "           END > -1";
+
+    final String query2 = "WITH year_total \n" +
+      "     AS (SELECT c.r_regionkey    customer_id,\n" +
+      "                1 year_total\n" +
+      "         FROM   cp.`tpch/region.parquet` c\n" +
+      "         Intersect ALL \n" +
+      "         SELECT c.r_regionkey    customer_id, \n" +
+      "                1 year_total\n" +
+      "         FROM   cp.`tpch/region.parquet` c) \n" +
+      "SELECT count(t_w_firstyear.customer_id) as ct \n" +
+      "FROM   year_total t_w_firstyear, \n" +
+      "       year_total t_w_secyear \n" +
+      "WHERE  t_w_firstyear.year_total = t_w_secyear.year_total \n" +
+      " AND t_w_firstyear.year_total > 0 and t_w_secyear.year_total > 0";
+
+    final String query3 = "WITH year_total_1\n" +
+      "             AS (SELECT c.r_regionkey    customer_id,\n" +
+      "                        1 year_total\n" +
+      "                 FROM   cp.`tpch/region.parquet` c\n" +
+      "                 Intersect ALL \n" +
+      "                 SELECT c.r_regionkey    customer_id, \n" +
+      "                        1 year_total\n" +
+      "                 FROM   cp.`tpch/region.parquet` c) \n" +
+      "             , year_total_2\n" +
+      "             AS (SELECT c.n_nationkey    customer_id,\n" +
+      "                        1 year_total\n" +
+      "                 FROM   cp.`tpch/nation.parquet` c\n" +
+      "                 Intersect ALL \n" +
+      "                 SELECT c.n_nationkey    customer_id, \n" +
+      "                        1 year_total\n" +
+      "                 FROM   cp.`tpch/nation.parquet` c) \n" +
+      "        SELECT count(t_w_firstyear.customer_id) as ct\n" +
+      "        FROM   year_total_1 t_w_firstyear,\n" +
+      "               year_total_2 t_w_secyear\n" +
+      "        WHERE  t_w_firstyear.year_total = t_w_secyear.year_total\n" +
+      "           AND t_w_firstyear.year_total > 0 and t_w_secyear.year_total > 0";
+
+    final String query4 = "WITH year_total_1\n" +
+      "             AS (SELECT c.n_regionkey    customer_id,\n" +
+      "                        1 year_total\n" +
+      "                 FROM   cp.`tpch/nation.parquet` c\n" +
+      "                 Intersect ALL \n" +
+      "                 SELECT c.r_regionkey    customer_id, \n" +
+      "                        1 year_total\n" +
+      "                 FROM   cp.`tpch/region.parquet` c), \n" +
+      "             year_total_2\n" +
+      "             AS (SELECT c.n_regionkey    customer_id,\n" +
+      "                        1 year_total\n" +
+      "                 FROM   cp.`tpch/nation.parquet` c\n" +
+      "                 Intersect ALL \n" +
+      "                 SELECT c.r_regionkey    customer_id, \n" +
+      "                        1 year_total\n" +
+      "                 FROM   cp.`tpch/region.parquet` c) \n" +
+      "        SELECT count(t_w_firstyear.customer_id) as ct \n" +
+      "        FROM   year_total_1 t_w_firstyear,\n" +
+      "               year_total_2 t_w_secyear\n" +
+      "        WHERE  t_w_firstyear.year_total = t_w_secyear.year_total\n" +
+      "         AND t_w_firstyear.year_total > 0 and t_w_secyear.year_total > 0";
+
+    testBuilder()
+      .sqlQuery(query1)
+      .ordered()
+      .baselineColumns("ct")
+      .baselineValues((long) 5)
+      .build()
+      .run();
+
+    testBuilder()
+      .sqlQuery(query2)
+      .ordered()
+      .baselineColumns("ct")
+      .baselineValues((long) 25)
+      .build()
+      .run();
+
+    testBuilder()
+      .sqlQuery(query3)
+      .ordered()
+      .baselineColumns("ct")
+      .baselineValues((long) 125)
+      .build()
+      .run();
+
+    testBuilder()
+      .sqlQuery(query4)
+      .ordered()
+      .baselineColumns("ct")
+      .baselineValues((long) 25)
+      .build()
+      .run();
+  }
+
+  @Test
+  @Category(UnlikelyTest.class)
+  public void testFragmentNum() throws Exception {
+    final String l = "/multilevel/parquet/1994";
+    final String r = "/multilevel/parquet/1995";
+
+    final String query = String.format("SELECT o_custkey FROM dfs.`%s` \n" +
+      "Except All SELECT o_custkey FROM dfs.`%s`", l, r);
+
+    // Validate the plan
+    final String[] expectedPlan = {"UnionExchange.*\n",
+      ".*SetOp"};
+
+    try {
+      client.alterSession(ExecConstants.SLICE_TARGET, 1);
+      queryBuilder()
+        .sql(query)
+        .planMatcher()
+        .include(expectedPlan)
+        .match();
+
+      testBuilder()
+        .optionSettingQueriesForBaseline(SLICE_TARGET_DEFAULT)
+        .unOrdered()
+        .sqlQuery(query)
+        .sqlBaselineQuery(query)
+        .build()
+        .run();
+    } finally {
+      client.resetSession(ExecConstants.SLICE_TARGET);
+    }
+  }
+
+  @Test
+  public void testGroupByOnSetOp() throws Exception {
+    final String l = "/multilevel/parquet/1994";
+    final String r = "/multilevel/parquet/1995";
+
+    final String query = String.format("Select o_custkey, count(*) as cnt from \n" +
+      " (SELECT o_custkey FROM dfs.`%s` \n" +
+      "Intersect All SELECT o_custkey FROM dfs.`%s`) \n" +
+      "group by o_custkey", l, r);
+
+    // Validate the plan
+    final String[] expectedPlan = {"(?s)UnionExchange.*StreamAgg.*Sort.*SetOp.*"};
+
+    try {
+      client.alterSession(ExecConstants.SLICE_TARGET, 1);
+      queryBuilder()
+        .sql(query)
+        .planMatcher()
+        .include(expectedPlan)
+        .match();
+
+      testBuilder()
+        .optionSettingQueriesForBaseline(SLICE_TARGET_DEFAULT)
+        .unOrdered()
+        .sqlQuery(query)
+        .sqlBaselineQuery(query)
+        .build()
+        .run();
+    } finally {
+      client.resetSession(ExecConstants.SLICE_TARGET);
+    }
+  }
+
+  @Test
+  public void testSetOpOnHashJoin() throws Exception {
+    final String l = "/multilevel/parquet/1994";
+    final String r = "/multilevel/parquet/1995";
+
+    final String query = String.format("SELECT o_custkey FROM \n" +
+      " (select o1.o_custkey from dfs.`%s` o1 inner join dfs.`%s` o2 on o1.o_orderkey = o2.o_custkey) \n" +
+      " Intersect All SELECT o_custkey FROM dfs.`%s` where o_custkey > 10", l, r, l);
+
+    // Validate the plan
+    final String[] expectedPlan = {"(?s)UnionExchange.*SetOp.*HashJoin.*"};
+
+    try {
+      client.alterSession(ExecConstants.SLICE_TARGET, 1);
+      queryBuilder()
+        .sql(query)
+        .planMatcher()
+        .include(expectedPlan)
+        .match();
+
+      testBuilder()
+        .optionSettingQueriesForBaseline(SLICE_TARGET_DEFAULT)
+        .unOrdered()
+        .sqlQuery(query)
+        .sqlBaselineQuery(query)
+        .build()
+        .run();
+    } finally {
+      client.resetSession(ExecConstants.SLICE_TARGET);
+    }
+  }
+
+  @Test
+  @Category(UnlikelyTest.class)
+  public void testLimitOneOnRightSide() throws Exception {
+    final String l = "/multilevel/parquet/1994";
+    final String r = "/multilevel/parquet/1995";
+
+    final String query = String.format("SELECT o_custkey FROM \n" +
+      " ((select o1.o_custkey from dfs.`%s` o1 inner join dfs.`%s` o2 on o1.o_orderkey = o2.o_custkey) \n" +
+      " Intersect All (SELECT o_custkey FROM dfs.`%s` limit 1))", l, r, l);
+
+    // Validate the plan
+    final String[] expectedPlan = {"(?s)UnionExchange.*SetOp.*HashJoin.*"};
+
+    try {
+      client.alterSession(ExecConstants.SLICE_TARGET, 1);
+      client.alterSession(PlannerSettings.UNIONALL_DISTRIBUTE_KEY, true);
+      queryBuilder()
+        .sql(query)
+        .planMatcher()
+        .include(expectedPlan)
+        .match();
+
+      testBuilder()
+        .optionSettingQueriesForBaseline(SLICE_TARGET_DEFAULT)
+        .unOrdered()
+        .sqlQuery(query)
+        .sqlBaselineQuery(query)
+        .build()
+        .run();
+    } finally {
+      client.resetSession(ExecConstants.SLICE_TARGET);
+      client.resetSession(PlannerSettings.UNIONALL_DISTRIBUTE_KEY);
+    }
+  }
+
+  @Test
+  @Category(UnlikelyTest.class)
+  public void testLimitOneOnLeftSide() throws Exception {
+    final String l = "/multilevel/parquet/1994";
+    final String r = "/multilevel/parquet/1995";
+
+    final String query = String.format("SELECT o_custkey FROM \n" +
+      " ((SELECT o_custkey FROM dfs.`%s` limit 1) \n" +
+      " intersect all \n" +
+      " (select o1.o_custkey from dfs.`%s` o1 inner join dfs.`%s` o2 on o1.o_orderkey = o2.o_custkey))", l, r, l);
+
+    // Validate the plan
+    final String[] expectedPlan = {"(?s)SetOp.*BroadcastExchange.*HashJoin.*"};
+
+    try {
+      client.alterSession(ExecConstants.SLICE_TARGET, 1);
+      client.alterSession(PlannerSettings.UNIONALL_DISTRIBUTE_KEY, true);
+      queryBuilder()
+        .sql(query)
+        .planMatcher()
+        .include(expectedPlan)
+        .match();
+
+      testBuilder()
+        .optionSettingQueriesForBaseline(SLICE_TARGET_DEFAULT)
+        .unOrdered()
+        .sqlQuery(query)
+        .sqlBaselineQuery(query)
+        .build()
+        .run();
+    } finally {
+      client.resetSession(ExecConstants.SLICE_TARGET);
+      client.resetSession(PlannerSettings.UNIONALL_DISTRIBUTE_KEY);
+    }
+  }
+
+  @Test
+  public void testIntersectAllWithValues() throws Exception {
+    testBuilder()
+      .sqlQuery("values('A') intersect all values('A')")
+      .unOrdered()
+      .baselineColumns("EXPR$0")
+      .baselineValues("A")
+      .go();
+  }
+
+  @Test
+  @Category(UnlikelyTest.class)
+  public void testFieldWithDots() throws Exception {
+    String fileName = "table.json";
+    try (BufferedWriter writer = new BufferedWriter(new FileWriter(new File(dirTestWatcher.getRootDir(), fileName)))) {
+      writer.write("{\"rk.q\": \"a\", \"m\": {\"a.b\":\"1\", \"a\":{\"b\":\"2\"}, \"c\":\"3\"}}");
+    }
+
+    testBuilder()
+      .sqlQuery("select * from (" +
+        "(select t.m.`a.b` as a,\n" +
+        "t.m.a.b as b,\n" +
+        "t.m['a.b'] as c,\n" +
+        "t.`rk.q` as e\n" +
+        "from dfs.`%1$s` t)\n" +
+        "intersect all\n" +
+        "(select t.m.`a.b` as a,\n" +
+        "t.m.a.b as b,\n" +
+        "t.m['a.b'] as c,\n" +
+        "t.`rk.q` as e\n" +
+        "from dfs.`%1$s` t))", fileName)
+      .unOrdered()
+      .baselineColumns("a", "b", "c", "e")
+      .baselineValues("1", "2", "1", "a")
+      .go();
+  }
+
+  @Test
+  public void testExceptAllRightEmptyDir() throws Exception {
+    String rootSimple = "/store/json/booleanData.json";
+
+    testBuilder()
+      .sqlQuery("SELECT key FROM cp.`%s` EXCEPT ALL SELECT key FROM dfs.tmp_default_format.`%s`",
+        rootSimple, EMPTY_DIR_NAME)
+      .unOrdered()
+      .baselineColumns("key")
+      .baselineValues(true)
+      .baselineValues(false)
+      .build()
+      .run();
+  }
+
+  @Test
+  public void testExceptAllLeftEmptyDir() throws Exception {
+    final String rootSimple = "/store/json/booleanData.json";
+
+    testBuilder()
+      .sqlQuery("SELECT key FROM dfs.tmp_default_format.`%s` EXCEPT ALL SELECT key FROM cp.`%s`",
+        EMPTY_DIR_NAME, rootSimple)
+      .unOrdered()
+      .baselineColumns("key")
+      .expectsEmptyResultSet()
+      .build()
+      .run();
+  }
+
+  @Test
+  public void testIntersectBothEmptyDirs() throws Exception {
+    SchemaBuilder schemaBuilder = new SchemaBuilder()
+      .addNullable("key", TypeProtos.MinorType.INT);
+    BatchSchema expectedSchema = new BatchSchemaBuilder()
+      .withSchemaBuilder(schemaBuilder)
+      .build();
+
+    testBuilder()
+      .sqlQuery("SELECT key FROM dfs.tmp_default_format.`%1$s` INTERSECT ALL SELECT key FROM dfs.tmp_default_format.`%1$s`", EMPTY_DIR_NAME)
+      .schemaBaseLine(expectedSchema)
+      .build()
+      .run();
+  }
+
+  @Test
+  public void testSetOpMiddleEmptyDir() throws Exception {
+    final String query = "(SELECT n_regionkey FROM cp.`tpch/nation.parquet` EXCEPT ALL " +
+      "SELECT missing_key FROM dfs.tmp_default_format.`%s`) intersect all SELECT r_regionkey FROM cp.`tpch/region.parquet`";
+
+    testBuilder()
+      .sqlQuery(query, EMPTY_DIR_NAME)
+      .unOrdered()
+      .baselineColumns("n_regionkey")
+      .baselineValues(0)
+      .baselineValues(1)
+      .baselineValues(2)
+      .baselineValues(3)
+      .baselineValues(4)
+      .build()
+      .run();
+  }
+
+  @Test
+  public void testComplexQueryWithSetOpAndEmptyDir() throws Exception {
+    final String rootSimple = "/store/json/booleanData.json";
+
+    testBuilder()
+      .sqlQuery("SELECT key FROM cp.`%2$s` INTERSECT ALL SELECT key FROM " +
+          "(SELECT key FROM cp.`%2$s` EXCEPT ALL SELECT key FROM dfs.tmp_default_format.`%1$s`)",
+        EMPTY_DIR_NAME, rootSimple)
+      .unOrdered()
+      .baselineColumns("key")
+      .baselineValues(true)
+      .baselineValues(false)
+      .build()
+      .run();
+  }
+
+  @Test
+  public void testIntersectCancellation() throws Exception {
+    String query = "WITH foo AS\n" +
+      "  (SELECT 1 AS a FROM cp.`/tpch/nation.parquet`\n" +
+      "   Intersect ALL\n" +
+      "   SELECT 1 AS a FROM cp.`/tpch/nation.parquet`\n" +
+      "   WHERE n_nationkey > (SELECT 1) )\n" +
+      "SELECT * FROM foo\n" +
+      "LIMIT 1";
+
+    testBuilder()
+      .sqlQuery(query)
+      .unOrdered()
+      .baselineColumns("a")
+      .baselineValues(1)
+      .build()
+      .run();
+  }
+
+  @Test
+  public void testMultiBatch() throws Exception {
+    String query = "(select * from (values(1,1)) t(a,b) union all select * from (values(3,3)) t(a,b) union all select * from (values(5,5)) t(a,b)) intersect all " +
+      "(select * from (values(1,1)) t(a,b) union all select * from (values(3,3), (2,2)) t(a,b) union all select * from (values(6,6), (4,4), (5,5)) t(a,b)) ";
+    testBuilder()
+      .sqlQuery(query)
+      .unOrdered()
+      .baselineColumns("a", "b")
+      .baselineValues(5, 5)
+      .baselineValues(3, 3)
+      .baselineValues(1, 1)
+      .build().run();
+  }
+
+  @Test
+  public void testFirstEmptyBatch() throws Exception {
+    String query = "(select n_nationkey from cp.`tpch/nation.parquet` where n_nationkey < 0 union all select n_nationkey from cp.`tpch/nation.parquet` where n_nationkey < 5) intersect all " +
+      "(select n_nationkey from cp.`tpch/nation.parquet` where n_nationkey < 0 union all select n_nationkey from cp.`tpch/nation.parquet` where n_nationkey < 3)";
+    testBuilder()
+      .sqlQuery(query)
+      .unOrdered()
+      .baselineColumns("n_nationkey")
+      .baselineValues(0)
+      .baselineValues(2)
+      .baselineValues(1)
+      .build().run();
+  }
+
+  @Test
+  public void testUnsupportedComplexType() {
+    try {
+      String query = "select sia from cp.`complex/json/complex.json` intersect all select sia from cp.`complex/json/complex.json`";
+      testBuilder()
+        .sqlQuery(query)
+        .unOrdered()
+        .baselineColumns("sia")
+        .baselineValues("[1,11,101,1001]")
+        .baselineValues("[2,12,102,1002]")
+        .baselineValues("[3,13,103,1003]")
+        .build().run();
+      Assert.fail("Missing expected exception on complex type");
+    } catch (Exception ex) {
+      Assert.assertThat(ex.getMessage(), ex.getMessage(),
+        CoreMatchers.containsString("Map, Array, Union or repeated scalar type should not be used in group by, order by or in a comparison operator"));
+    }
+  }
+}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/QueryBuilder.java b/exec/java-exec/src/test/java/org/apache/drill/test/QueryBuilder.java
index ec0471e255..996c5c4008 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/QueryBuilder.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/QueryBuilder.java
@@ -829,12 +829,17 @@ public class QueryBuilder {
      *  only if expected pattern was not matched or unexpected pattern was matched.
      */
     public void match() {
-      included.forEach(pattern -> match(pattern, true));
-      excluded.forEach(pattern -> match(pattern, false));
+      included.forEach(pattern -> match(pattern, true, false));
+      excluded.forEach(pattern -> match(pattern, false, false));
     }
 
-    private void match(String patternString, boolean expectedResult) {
-      Pattern pattern = Pattern.compile(patternString);
+    public void match(boolean matchMultiLine) {
+      included.forEach(pattern -> match(pattern, true, matchMultiLine));
+      excluded.forEach(pattern -> match(pattern, false, matchMultiLine));
+    }
+
+    private void match(String patternString, boolean expectedResult, boolean matchMultiLine) {
+      Pattern pattern = Pattern.compile(patternString, (matchMultiLine ? Pattern.DOTALL : 0));
       Matcher matcher = pattern.matcher(plan);
       String message = String.format("%s in plan: %s\n%s",
         expectedResult ? EXPECTED_NOT_FOUND : UNEXPECTED_FOUND, patternString, plan);
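The new overload changes only how the pattern is compiled: passing true adds Pattern.DOTALL, so '.' also matches line terminators and a single expected pattern can span several lines of the printed plan. A self-contained sketch of the difference (the plan text and pattern below are invented for illustration, not taken from an actual Drill plan):

    import java.util.regex.Pattern;

    public class DotallDemo {
      public static void main(String[] args) {
        // A made-up two-line "plan" standing in for real EXPLAIN output.
        String plan = "HashSetOp(kind=[INTERSECT], all=[true])\n  Scan(table=[nation])";
        String expected = "HashSetOp.*Scan";

        // Without DOTALL, '.' stops at the newline, so the cross-line pattern does not match.
        System.out.println(Pattern.compile(expected).matcher(plan).find());                 // false

        // With DOTALL (what match(true) turns on), '.' spans lines and the pattern matches.
        System.out.println(Pattern.compile(expected, Pattern.DOTALL).matcher(plan).find()); // true
      }
    }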
diff --git a/logical/src/main/java/org/apache/drill/common/logical/data/Except.java b/logical/src/main/java/org/apache/drill/common/logical/data/Except.java
new file mode 100644
index 0000000000..b060a16054
--- /dev/null
+++ b/logical/src/main/java/org/apache/drill/common/logical/data/Except.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.common.logical.data;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.drill.common.logical.data.visitors.LogicalVisitor;
+import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
+
+import java.util.Iterator;
+import java.util.List;
+
+@JsonTypeName("Except")
+public class Except extends LogicalOperatorBase {
+  private final List<LogicalOperator> inputs;
+  private final boolean distinct;
+
+  @JsonCreator
+  public Except(@JsonProperty("inputs") List<LogicalOperator> inputs, @JsonProperty("distinct") Boolean distinct) {
+    this.inputs = inputs;
+    for (LogicalOperator o : inputs) {
+      o.registerAsSubscriber(this);
+    }
+    this.distinct = distinct == null ? false : distinct;
+  }
+
+  public List<LogicalOperator> getInputs() {
+    return inputs;
+  }
+
+  public boolean isDistinct() {
+    return distinct;
+  }
+
+  @Override
+  public <T, X, E extends Throwable> T accept(LogicalVisitor<T, X, E> logicalVisitor, X value) throws E {
+    return logicalVisitor.visitExcept(this, value);
+  }
+
+  @Override
+  public Iterator<LogicalOperator> iterator() {
+    return inputs.iterator();
+  }
+
+
+  public static Builder builder() {
+    return new Builder();
+  }
+
+  public static class Builder extends AbstractBuilder<Except> {
+    private List<LogicalOperator> inputs = Lists.newArrayList();
+    private boolean distinct;
+
+    public Builder addInput(LogicalOperator o) {
+      inputs.add(o);
+      return this;
+    }
+
+    public Builder setDistinct(boolean distinct) {
+      this.distinct = distinct;
+      return this;
+    }
+
+    @Override
+    public Except build() {
+      return new Except(inputs, distinct);
+    }
+
+  }
+
+}
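For orientation, a minimal sketch of building this operator through its Builder; the wrapper class, helper method, and the left/right parameters are illustrative placeholders, assumed to be LogicalOperator instances produced by other logical-operator builders (Intersect below exposes the same API):

    import org.apache.drill.common.logical.data.Except;
    import org.apache.drill.common.logical.data.LogicalOperator;

    public class ExceptBuilderSketch {
      // Sketch only: 'left' and 'right' are assumed to come from other builders.
      public static Except exceptAll(LogicalOperator left, LogicalOperator right) {
        return Except.builder()
            .addInput(left)
            .addInput(right)
            .setDistinct(false)   // false -> EXCEPT ALL, true -> EXCEPT (distinct)
            .build();
      }
    }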
diff --git a/logical/src/main/java/org/apache/drill/common/logical/data/Intersect.java b/logical/src/main/java/org/apache/drill/common/logical/data/Intersect.java
new file mode 100644
index 0000000000..ec0a1df70e
--- /dev/null
+++ b/logical/src/main/java/org/apache/drill/common/logical/data/Intersect.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.common.logical.data;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.drill.common.logical.data.visitors.LogicalVisitor;
+import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
+
+import java.util.Iterator;
+import java.util.List;
+
+@JsonTypeName("intersect")
+public class Intersect extends LogicalOperatorBase {
+  private final List<LogicalOperator> inputs;
+  private final boolean distinct;
+
+  @JsonCreator
+  public Intersect(@JsonProperty("inputs") List<LogicalOperator> inputs, @JsonProperty("distinct") Boolean distinct) {
+    this.inputs = inputs;
+    for (LogicalOperator o : inputs) {
+      o.registerAsSubscriber(this);
+    }
+    this.distinct = distinct == null ? false : distinct;
+  }
+
+  public List<LogicalOperator> getInputs() {
+    return inputs;
+  }
+
+  public boolean isDistinct() {
+    return distinct;
+  }
+
+  @Override
+  public <T, X, E extends Throwable> T accept(LogicalVisitor<T, X, E> logicalVisitor, X value) throws E {
+    return logicalVisitor.visitIntersect(this, value);
+  }
+
+  @Override
+  public Iterator<LogicalOperator> iterator() {
+    return inputs.iterator();
+  }
+
+
+  public static Builder builder() {
+    return new Builder();
+  }
+
+  public static class Builder extends AbstractBuilder<Intersect> {
+    private List<LogicalOperator> inputs = Lists.newArrayList();
+    private boolean distinct;
+
+    public Builder addInput(LogicalOperator o) {
+      inputs.add(o);
+      return this;
+    }
+
+    public Builder setDistinct(boolean distinct) {
+      this.distinct = distinct;
+      return this;
+    }
+
+    @Override
+    public Intersect build() {
+      return new Intersect(inputs, distinct);
+    }
+
+  }
+
+}
diff --git a/logical/src/main/java/org/apache/drill/common/logical/data/visitors/AbstractLogicalVisitor.java b/logical/src/main/java/org/apache/drill/common/logical/data/visitors/AbstractLogicalVisitor.java
index c46dc43335..2f63b3f2f4 100644
--- a/logical/src/main/java/org/apache/drill/common/logical/data/visitors/AbstractLogicalVisitor.java
+++ b/logical/src/main/java/org/apache/drill/common/logical/data/visitors/AbstractLogicalVisitor.java
@@ -17,6 +17,8 @@
  */
 package org.apache.drill.common.logical.data.visitors;
 
+import org.apache.drill.common.logical.data.Except;
+import org.apache.drill.common.logical.data.Intersect;
 import org.apache.drill.common.logical.data.LateralJoin;
 import org.apache.drill.common.logical.data.Unnest;
 import org.apache.drill.common.logical.data.Analyze;
@@ -111,6 +113,16 @@ public abstract class AbstractLogicalVisitor<T, X, E extends Throwable> implemen
         return visitOp(union, value);
     }
 
+    @Override
+    public T visitExcept(Except except, X value) throws E {
+      return visitOp(except, value);
+    }
+
+    @Override
+    public T visitIntersect(Intersect intersect, X value) throws E {
+      return visitOp(intersect, value);
+    }
+
     @Override
     public T visitWindow(Window window, X value) throws E {
         return visitOp(window, value);
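Both new defaults simply fall back to visitOp, so existing visitors keep compiling unchanged; a visitor that cares about the new operators overrides just the two methods. A minimal sketch (the class name and String return type are illustrative, and it assumes visitOp is overridable as the delegation above suggests):

    import org.apache.drill.common.logical.data.Except;
    import org.apache.drill.common.logical.data.Intersect;
    import org.apache.drill.common.logical.data.LogicalOperator;
    import org.apache.drill.common.logical.data.visitors.AbstractLogicalVisitor;

    public class SetOpLabeler extends AbstractLogicalVisitor<String, Void, RuntimeException> {

      @Override
      public String visitOp(LogicalOperator op, Void value) {
        // Fallback for every operator this sketch does not handle explicitly.
        return op.getClass().getSimpleName();
      }

      @Override
      public String visitExcept(Except except, Void value) {
        return except.isDistinct() ? "EXCEPT" : "EXCEPT ALL";
      }

      @Override
      public String visitIntersect(Intersect intersect, Void value) {
        return intersect.isDistinct() ? "INTERSECT" : "INTERSECT ALL";
      }
    }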
diff --git a/logical/src/main/java/org/apache/drill/common/logical/data/visitors/LogicalVisitor.java b/logical/src/main/java/org/apache/drill/common/logical/data/visitors/LogicalVisitor.java
index ee9036c6e7..fee1595688 100644
--- a/logical/src/main/java/org/apache/drill/common/logical/data/visitors/LogicalVisitor.java
+++ b/logical/src/main/java/org/apache/drill/common/logical/data/visitors/LogicalVisitor.java
@@ -18,6 +18,8 @@
 package org.apache.drill.common.logical.data.visitors;
 
 
+import org.apache.drill.common.logical.data.Except;
+import org.apache.drill.common.logical.data.Intersect;
 import org.apache.drill.common.logical.data.LateralJoin;
 import org.apache.drill.common.logical.data.Unnest;
 import org.apache.drill.common.logical.data.Analyze;
@@ -62,6 +64,8 @@ public interface LogicalVisitor<RETURN, EXTRA, EXCEP extends Throwable> {
     public RETURN visitRunningAggregate(RunningAggregate runningAggregate, EXTRA value) throws EXCEP;
     public RETURN visitTransform(Transform transform, EXTRA value) throws EXCEP;
     public RETURN visitUnion(Union union, EXTRA value) throws EXCEP;
+    public RETURN visitExcept(Except except, EXTRA value) throws EXCEP;
+    public RETURN visitIntersect(Intersect intersect, EXTRA value) throws EXCEP;
     public RETURN visitWindow(Window window, EXTRA value) throws EXCEP;
     public RETURN visitWriter(Writer writer, EXTRA value) throws EXCEP;