You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@drill.apache.org by GitBox <gi...@apache.org> on 2018/11/01 23:13:53 UTC

[GitHub] Ben-Zvi closed pull request #1514: DRILL-6798: Planner changes to support semi-join.

Ben-Zvi closed pull request #1514: DRILL-6798: Planner changes to support semi-join.
URL: https://github.com/apache/drill/pull/1514
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below, since GitHub would not otherwise display it:

diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PlannerPhase.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PlannerPhase.java
index ae55c9f21b5..17f8da52321 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PlannerPhase.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PlannerPhase.java
@@ -358,15 +358,18 @@ static RuleSet getDrillBasicRules(OptimizerRulesContext optimizerRulesContext) {
      * We have to create another copy of the ruleset with the context dependent elements;
      * this cannot be reused across queries.
      */
-    final ImmutableSet<RelOptRule> basicRules = ImmutableSet.<RelOptRule>builder()
+    ImmutableSet.Builder<RelOptRule> basicRules = ImmutableSet.<RelOptRule>builder()
         .addAll(staticRuleSet)
         .add(
             DrillMergeProjectRule.getInstance(true, RelFactories.DEFAULT_PROJECT_FACTORY,
                 optimizerRulesContext.getFunctionRegistry())
-            )
-        .build();
+            );
+    if (optimizerRulesContext.getPlannerSettings().isHashJoinEnabled() &&
+        optimizerRulesContext.getPlannerSettings().isSemiJoinEnabled()) {
+      basicRules.add(RuleInstance.SEMI_JOIN_PROJECT_RULE);
+    }
 
-    return RuleSets.ofList(basicRules);
+    return RuleSets.ofList(basicRules.build());
   }
 
   /**
@@ -474,7 +477,6 @@ static RuleSet getJoinPermRules(OptimizerRulesContext optimizerRulesContext) {
   static RuleSet getPhysicalRules(OptimizerRulesContext optimizerRulesContext) {
     final List<RelOptRule> ruleList = new ArrayList<>();
     final PlannerSettings ps = optimizerRulesContext.getPlannerSettings();
-
     ruleList.add(ConvertCountToDirectScan.AGG_ON_PROJ_ON_SCAN);
     ruleList.add(ConvertCountToDirectScan.AGG_ON_SCAN);
     ruleList.add(SortConvertPrule.INSTANCE);
@@ -509,9 +511,14 @@ static RuleSet getPhysicalRules(OptimizerRulesContext optimizerRulesContext) {
 
     if (ps.isHashJoinEnabled()) {
       ruleList.add(HashJoinPrule.DIST_INSTANCE);
-
+      if (ps.isSemiJoinEnabled()) {
+        ruleList.add(HashJoinPrule.SEMI_DIST_INSTANCE);
+      }
       if(ps.isBroadcastJoinEnabled()){
         ruleList.add(HashJoinPrule.BROADCAST_INSTANCE);
+        if (ps.isSemiJoinEnabled()) {
+          ruleList.add(HashJoinPrule.SEMI_BROADCAST_INSTANCE);
+        }
       }
     }
 
@@ -521,7 +528,6 @@ static RuleSet getPhysicalRules(OptimizerRulesContext optimizerRulesContext) {
       if(ps.isBroadcastJoinEnabled()){
         ruleList.add(MergeJoinPrule.BROADCAST_INSTANCE);
       }
-
     }
 
     // NLJ plans consist of broadcasting the right child, hence we need
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/RuleInstance.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/RuleInstance.java
index 8aec96c947f..b14488c9ca5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/RuleInstance.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/RuleInstance.java
@@ -18,8 +18,11 @@
 package org.apache.drill.exec.planner;
 
 import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
 import org.apache.calcite.plan.volcano.AbstractConverter;
+import org.apache.calcite.rel.core.Aggregate;
 import org.apache.calcite.rel.core.Join;
+import org.apache.calcite.rel.core.Project;
 import org.apache.calcite.rel.logical.LogicalAggregate;
 import org.apache.calcite.rel.logical.LogicalCalc;
 import org.apache.calcite.rel.logical.LogicalJoin;
@@ -39,12 +42,13 @@
 import org.apache.calcite.rel.rules.ProjectToWindowRule;
 import org.apache.calcite.rel.rules.ProjectWindowTransposeRule;
 import org.apache.calcite.rel.rules.ReduceExpressionsRule;
+import org.apache.calcite.rel.rules.SemiJoinRule;
 import org.apache.calcite.rel.rules.SortRemoveRule;
 import org.apache.calcite.rel.rules.SubQueryRemoveRule;
 import org.apache.calcite.rel.rules.UnionToDistinctRule;
 import org.apache.drill.exec.planner.logical.DrillConditions;
 import org.apache.drill.exec.planner.logical.DrillRelFactories;
-
+import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
 /**
  * Contains rule instances which use custom RelBuilder.
  */
@@ -58,6 +62,15 @@
       new UnionToDistinctRule(LogicalUnion.class,
           DrillRelFactories.LOGICAL_BUILDER);
 
+  SemiJoinRule SEMI_JOIN_PROJECT_RULE = new SemiJoinRule.ProjectToSemiJoinRule(Project.class, Join.class, Aggregate.class,
+          DrillRelFactories.LOGICAL_BUILDER, "DrillSemiJoinRule:project") {
+    public boolean matches(RelOptRuleCall call) {
+      Preconditions.checkArgument(call.rel(1) instanceof Join);
+      Join join = call.rel(1);
+      return !(join.getCondition().isAlwaysTrue() || join.getCondition().isAlwaysFalse());
+    }
+  };
+
   JoinPushExpressionsRule JOIN_PUSH_EXPRESSIONS_RULE =
       new JoinPushExpressionsRule(Join.class,
           DrillRelFactories.LOGICAL_BUILDER);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillJoinRelBase.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillJoinRelBase.java
index 434016ff830..cde49e4b96e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillJoinRelBase.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillJoinRelBase.java
@@ -29,6 +29,7 @@
 import org.apache.drill.exec.physical.impl.join.JoinUtils;
 import org.apache.drill.exec.physical.impl.join.JoinUtils.JoinCategory;
 import org.apache.drill.exec.planner.cost.DrillCostBase.DrillCostFactory;
+import org.apache.drill.exec.planner.logical.DrillJoin;
 import org.apache.drill.exec.planner.physical.PrelUtil;
 import org.apache.calcite.rel.core.Join;
 import org.apache.calcite.rel.core.JoinRelType;
@@ -45,7 +46,7 @@
 /**
  * Base class for logical and physical Joins implemented in Drill.
  */
-public abstract class DrillJoinRelBase extends Join implements DrillRelNode {
+public abstract class DrillJoinRelBase extends Join implements DrillJoin {
   protected List<Integer> leftKeys = Lists.newArrayList();
   protected List<Integer> rightKeys = Lists.newArrayList();
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillJoin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillJoin.java
new file mode 100644
index 00000000000..30067dab9c6
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillJoin.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.logical;
+
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.calcite.rex.RexNode;
+import org.apache.drill.exec.planner.common.DrillRelNode;
+import java.util.List;
+
+/**
+ * Interface which needs to be implemented by all the join relation expressions.
+ */
+public interface DrillJoin extends DrillRelNode {
+
+  /* Columns of left table that are part of join condition */
+  List<Integer> getLeftKeys();
+
+  /* Columns of right table that are part of join condition */
+  List<Integer> getRightKeys();
+
+  /* JoinType of the join operation*/
+  JoinRelType getJoinType();
+
+  /* Join condition of the join relation */
+  RexNode getCondition();
+
+  /* Left RelNode of the Join Relation */
+  RelNode getLeft();
+
+  /* Right RelNode of the Join Relation */
+  RelNode getRight();
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillJoinRel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillJoinRel.java
index 42f7e72bc51..0126e745cd9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillJoinRel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillJoinRel.java
@@ -104,7 +104,7 @@ public LogicalOperator implement(DrillImplementor implementor) {
    * @return
    */
   private LogicalOperator implementInput(DrillImplementor implementor, int i, int offset, RelNode input) {
-    return implementInput(implementor, i, offset, input, this);
+    return implementInput(implementor, i, offset, input, this, this.getRowType().getFieldNames());
   }
 
   /**
@@ -118,12 +118,12 @@ private LogicalOperator implementInput(DrillImplementor implementor, int i, int
    * @return
    */
   public static LogicalOperator implementInput(DrillImplementor implementor, int i, int offset,
-                                                RelNode input, DrillRel currentNode) {
+                                               RelNode input, DrillRel currentNode,
+                                               List<String> parentFields) {
     final LogicalOperator inputOp = implementor.visitChild(currentNode, i, input);
     assert uniqueFieldNames(input.getRowType());
-    final List<String> fields = currentNode.getRowType().getFieldNames();
     final List<String> inputFields = input.getRowType().getFieldNames();
-    final List<String> outputFields = fields.subList(offset, offset + inputFields.size());
+    final List<String> outputFields = parentFields.subList(offset, offset + inputFields.size());
     if (!outputFields.equals(inputFields)) {
       // Ensure that input field names are the same as output field names.
       // If there are duplicate field names on left and right, fields will get
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillLateralJoinRel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillLateralJoinRel.java
index 4356d491046..ca03de14fd6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillLateralJoinRel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillLateralJoinRel.java
@@ -28,6 +28,7 @@
 import org.apache.drill.common.logical.data.LogicalOperator;
 import org.apache.drill.exec.planner.common.DrillLateralJoinRelBase;
 
+import java.util.ArrayList;
 import java.util.List;
 
 
@@ -48,12 +49,14 @@ public Correlate copy(RelTraitSet traitSet,
 
   @Override
   public LogicalOperator implement(DrillImplementor implementor) {
-    final List<String> fields = getRowType().getFieldNames();
+    List<String> fields = new ArrayList<>();
+    fields.addAll(getInput(0).getRowType().getFieldNames());
+    fields.addAll(getInput(1).getRowType().getFieldNames());
     assert DrillJoinRel.isUnique(fields);
     final int leftCount = getInputSize(0);
 
-    final LogicalOperator leftOp = DrillJoinRel.implementInput(implementor, 0, 0, left, this);
-    final LogicalOperator rightOp = DrillJoinRel.implementInput(implementor, 1, leftCount, right, this);
+    final LogicalOperator leftOp = DrillJoinRel.implementInput(implementor, 0, 0, left, this, fields);
+    final LogicalOperator rightOp = DrillJoinRel.implementInput(implementor, 1, leftCount, right, this, fields);
 
     return new LateralJoin(leftOp, rightOp);
   }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRelFactories.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRelFactories.java
index d5ff56bc1b5..a0b727d3f29 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRelFactories.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRelFactories.java
@@ -22,6 +22,7 @@
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.core.AggregateCall;
 import org.apache.calcite.rel.core.CorrelationId;
+import org.apache.calcite.rel.core.JoinInfo;
 import org.apache.calcite.rel.core.JoinRelType;
 import org.apache.calcite.rel.core.RelFactories;
 import org.apache.calcite.rel.type.RelDataType;
@@ -39,7 +40,6 @@
 import static org.apache.calcite.rel.core.RelFactories.DEFAULT_JOIN_FACTORY;
 import static org.apache.calcite.rel.core.RelFactories.DEFAULT_MATCH_FACTORY;
 import static org.apache.calcite.rel.core.RelFactories.DEFAULT_PROJECT_FACTORY;
-import static org.apache.calcite.rel.core.RelFactories.DEFAULT_SEMI_JOIN_FACTORY;
 import static org.apache.calcite.rel.core.RelFactories.DEFAULT_SET_OP_FACTORY;
 import static org.apache.calcite.rel.core.RelFactories.DEFAULT_SORT_FACTORY;
 import static org.apache.calcite.rel.core.RelFactories.DEFAULT_TABLE_SCAN_FACTORY;
@@ -60,6 +60,17 @@
   public static final RelFactories.JoinFactory DRILL_LOGICAL_JOIN_FACTORY = new DrillJoinFactoryImpl();
 
   public static final RelFactories.AggregateFactory DRILL_LOGICAL_AGGREGATE_FACTORY = new DrillAggregateFactoryImpl();
+
+  public static final RelFactories.SemiJoinFactory DRILL_SEMI_JOIN_FACTORY = new SemiJoinFactoryImpl();
+
+  private static class SemiJoinFactoryImpl implements RelFactories.SemiJoinFactory {
+    public RelNode createSemiJoin(RelNode left, RelNode right,
+                                  RexNode condition) {
+      final JoinInfo joinInfo = JoinInfo.of(left, right, condition);
+      return DrillSemiJoinRel.create(left, right,
+              condition, joinInfo.leftKeys, joinInfo.rightKeys);
+    }
+  }
   /**
    * A {@link RelBuilderFactory} that creates a {@link DrillRelBuilder} that will
    * create logical relational expressions for everything.
@@ -69,7 +80,7 @@
           Contexts.of(DEFAULT_PROJECT_FACTORY,
               DEFAULT_FILTER_FACTORY,
               DEFAULT_JOIN_FACTORY,
-              DEFAULT_SEMI_JOIN_FACTORY,
+              DRILL_SEMI_JOIN_FACTORY,
               DEFAULT_SORT_FACTORY,
               DEFAULT_AGGREGATE_FACTORY,
               DEFAULT_MATCH_FACTORY,
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillSemiJoinRel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillSemiJoinRel.java
new file mode 100644
index 00000000000..09e4be9de25
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillSemiJoinRel.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.logical;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.JoinInfo;
+import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.calcite.rel.core.SemiJoin;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.util.ImmutableIntList;
+import org.apache.calcite.util.Pair;
+import org.apache.drill.common.expression.FieldReference;
+import org.apache.drill.common.logical.data.Join;
+import org.apache.drill.common.logical.data.JoinCondition;
+import org.apache.drill.common.logical.data.LogicalOperator;
+import org.apache.drill.common.logical.data.LogicalSemiJoin;
+import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
+import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class DrillSemiJoinRel extends SemiJoin implements DrillJoin, DrillRel {
+
+  public DrillSemiJoinRel(
+          RelOptCluster cluster,
+          RelTraitSet traitSet,
+          RelNode left,
+          RelNode right,
+          RexNode condition,
+          ImmutableIntList leftKeys,
+          ImmutableIntList rightKeys) {
+    super(cluster,
+          traitSet,
+          left,
+          right,
+          condition,
+          leftKeys,
+          rightKeys);
+  }
+
+  public static SemiJoin create(RelNode left, RelNode right, RexNode condition,
+                                ImmutableIntList leftKeys, ImmutableIntList rightKeys) {
+    final RelOptCluster cluster = left.getCluster();
+    return new DrillSemiJoinRel(cluster, cluster.traitSetOf(DrillRel.DRILL_LOGICAL), left,
+            right, condition, leftKeys, rightKeys);
+  }
+
+  @Override
+  public SemiJoin copy(RelTraitSet traitSet, RexNode condition,
+                                 RelNode left, RelNode right, JoinRelType joinType, boolean semiJoinDone) {
+    Preconditions.checkArgument(joinType == JoinRelType.INNER);
+    final JoinInfo joinInfo = JoinInfo.of(left, right, condition);
+    Preconditions.checkArgument(joinInfo.isEqui());
+    return new DrillSemiJoinRel(getCluster(), traitSet, left, right, condition,
+            joinInfo.leftKeys, joinInfo.rightKeys);
+  }
+
+  @Override
+  public LogicalOperator implement(DrillImplementor implementor) {
+    List<String> fields = new ArrayList<>();
+    fields.addAll(getInput(0).getRowType().getFieldNames());
+    fields.addAll(getInput(1).getRowType().getFieldNames());
+    Preconditions.checkArgument(DrillJoinRel.isUnique(fields));
+    final int leftCount = left.getRowType().getFieldCount();
+    final List<String> leftFields = fields.subList(0, leftCount);
+    final List<String> rightFields = fields.subList(leftCount, leftCount + right.getRowType().getFieldCount());
+
+    final LogicalOperator leftOp = DrillJoinRel.implementInput(implementor, 0, 0, left, this, fields);
+    final LogicalOperator rightOp = DrillJoinRel.implementInput(implementor, 1, leftCount, right, this, fields);
+
+    Join.Builder builder = Join.builder();
+    builder.type(joinType);
+    builder.left(leftOp);
+    builder.right(rightOp);
+    List<JoinCondition> conditions = Lists.newArrayList();
+    for (Pair<Integer, Integer> pair : Pair.zip(leftKeys, rightKeys)) {
+      conditions.add(new JoinCondition(DrillJoinRel.EQUALITY_CONDITION,
+              new FieldReference(leftFields.get(pair.left)), new FieldReference(rightFields.get(pair.right))));
+    }
+
+    return new LogicalSemiJoin(leftOp, rightOp, conditions, joinType);
+  }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashJoinPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashJoinPrel.java
index 0e1fc4e85ce..6480f3d3581 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashJoinPrel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashJoinPrel.java
@@ -20,8 +20,13 @@
 import java.io.IOException;
 import java.util.List;
 
+import org.apache.calcite.plan.RelTrait;
+import org.apache.calcite.rel.RelCollation;
+import org.apache.calcite.rel.RelFieldCollation;
+import org.apache.calcite.rel.RelWriter;
 import org.apache.calcite.rel.core.Join;
 import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.util.ImmutableBitSet;
 import org.apache.drill.common.logical.data.JoinCondition;
 
 import org.apache.drill.exec.physical.base.PhysicalOperator;
@@ -38,7 +43,7 @@
 import org.apache.calcite.plan.RelOptPlanner;
 import org.apache.calcite.plan.RelTraitSet;
 import org.apache.calcite.rex.RexNode;
-
+import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
 import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
 import org.apache.drill.exec.work.filter.RuntimeFilterDef;
 
@@ -50,14 +55,25 @@
   private int joinControl;
 
   public HashJoinPrel(RelOptCluster cluster, RelTraitSet traits, RelNode left, RelNode right, RexNode condition,
-                      JoinRelType joinType) throws InvalidRelException {
-    this(cluster, traits, left, right, condition, joinType, false, null, false, JoinControl.DEFAULT);
+                      JoinRelType joinType, boolean semiJoin) throws InvalidRelException {
+    this(cluster, traits, left, right, condition, joinType, false, null, false, JoinControl.DEFAULT, semiJoin);
+  }
+
+  public HashJoinPrel(RelOptCluster cluster, RelTraitSet traits, RelNode left, RelNode right, RexNode condition,
+                      JoinRelType joinType, boolean swapped, RuntimeFilterDef runtimeFilterDef,
+                      boolean isRowKeyJoin, int joinControl) throws InvalidRelException {
+    this(cluster, traits, left, right, condition, joinType, swapped, runtimeFilterDef, isRowKeyJoin, joinControl, false);
   }
 
   public HashJoinPrel(RelOptCluster cluster, RelTraitSet traits, RelNode left, RelNode right, RexNode condition,
       JoinRelType joinType, boolean swapped, RuntimeFilterDef runtimeFilterDef,
-      boolean isRowKeyJoin, int joinControl) throws InvalidRelException {
-    super(cluster, traits, left, right, condition, joinType);
+      boolean isRowKeyJoin, int joinControl, boolean semiJoin) throws InvalidRelException {
+    super(cluster, traits, left, right, condition, joinType, semiJoin);
+    Preconditions.checkArgument(isSemiJoin && !swapped || swapped && !isSemiJoin || (!swapped && !isSemiJoin));
+    if (isSemiJoin) {
+      Preconditions.checkArgument(!swapped, "swapping of inputs is not allowed for semi-joins");
+      Preconditions.checkArgument(validateTraits(traitSet, left, right));
+    }
     this.swapped = swapped;
     this.isRowKeyJoin = isRowKeyJoin;
     joincategory = JoinUtils.getJoinCategory(left, right, condition, leftKeys, rightKeys, filterNulls);
@@ -65,11 +81,34 @@ public HashJoinPrel(RelOptCluster cluster, RelTraitSet traits, RelNode left, Rel
     this.joinControl = joinControl;
   }
 
+  private static boolean validateTraits(RelTraitSet traitSet, RelNode left, RelNode right) {
+    ImmutableBitSet bitSet = ImmutableBitSet.range(left.getRowType().getFieldCount(),
+            left.getRowType().getFieldCount() + right.getRowType().getFieldCount());
+    for (RelTrait trait: traitSet) {
+      if (trait.getTraitDef().getTraitClass().equals(RelCollation.class)) {
+        RelCollation collationTrait = (RelCollation)trait;
+        for (RelFieldCollation field : collationTrait.getFieldCollations()) {
+          if (bitSet.indexOf(field.getFieldIndex()) > 0) {
+            return false;
+          }
+        }
+      } else if (trait.getTraitDef().getTraitClass().equals(DrillDistributionTrait.class)) {
+        DrillDistributionTrait distributionTrait = (DrillDistributionTrait) trait;
+        for (DrillDistributionTrait.DistributionField field : distributionTrait.getFields()) {
+          if (bitSet.indexOf(field.getFieldId()) > 0) {
+            return false;
+          }
+        }
+      }
+    }
+    return true;
+  }
+
   @Override
   public Join copy(RelTraitSet traitSet, RexNode conditionExpr, RelNode left, RelNode right, JoinRelType joinType, boolean semiJoinDone) {
     try {
       return new HashJoinPrel(this.getCluster(), traitSet, left, right, conditionExpr, joinType, this.swapped, this.runtimeFilterDef,
-          this.isRowKeyJoin, this.joinControl);
+          this.isRowKeyJoin, this.joinControl, this.isSemiJoin);
     }catch (InvalidRelException e) {
       throw new AssertionError(e);
     }
@@ -87,7 +126,7 @@ public RelOptCost computeSelfCost(RelOptPlanner planner, RelMetadataQuery mq) {
   }
 
   @Override
-  public org.apache.drill.exec.physical.base.PhysicalOperator getPhysicalOperator(PhysicalPlanCreator creator) throws IOException {
+  public PhysicalOperator getPhysicalOperator(PhysicalPlanCreator creator) throws IOException {
     // Depending on whether the left/right is swapped for hash inner join, pass in different
     // combinations of parameters.
     if (! swapped) {
@@ -150,4 +189,8 @@ public boolean isRowKeyJoin() {
     return this.isRowKeyJoin;
   }
 
+  @Override
+  public RelWriter explainTerms(RelWriter pw) {
+    return super.explainTerms(pw).item("semi-join: ", isSemiJoin);
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashJoinPrule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashJoinPrule.java
index d07cf51d3c1..0d7f5caa483 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashJoinPrule.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashJoinPrule.java
@@ -17,7 +17,9 @@
  */
 package org.apache.drill.exec.planner.physical;
 
+import org.apache.drill.exec.planner.logical.DrillJoin;
 import org.apache.drill.exec.planner.logical.DrillJoinRel;
+import org.apache.drill.exec.planner.logical.DrillSemiJoinRel;
 import org.apache.drill.exec.planner.logical.RelOptHelper;
 import org.apache.calcite.rel.InvalidRelException;
 import org.apache.calcite.rel.RelNode;
@@ -30,10 +32,14 @@
 public class HashJoinPrule extends JoinPruleBase {
   public static final RelOptRule DIST_INSTANCE = new HashJoinPrule("Prel.HashJoinDistPrule", RelOptHelper.any(DrillJoinRel.class), true);
   public static final RelOptRule BROADCAST_INSTANCE = new HashJoinPrule("Prel.HashJoinBroadcastPrule", RelOptHelper.any(DrillJoinRel.class), false);
+  public static final RelOptRule SEMI_DIST_INSTANCE = new HashJoinPrule("Prel.HashSemiJoinDistPrule", RelOptHelper.any(DrillSemiJoinRel.class), true);
+  public static final RelOptRule SEMI_BROADCAST_INSTANCE = new HashJoinPrule("Prel.HashSemiJoinBroadcastPrule", RelOptHelper.any(DrillSemiJoinRel.class), false);
+
 
   protected static final Logger tracer = CalciteTrace.getPlannerTracer();
 
   private final boolean isDist;
+  private boolean isSemi = false;
   private HashJoinPrule(String name, RelOptRuleOperand operand, boolean isDist) {
     super(operand, name);
     this.isDist = isDist;
@@ -42,17 +48,18 @@ private HashJoinPrule(String name, RelOptRuleOperand operand, boolean isDist) {
   @Override
   public boolean matches(RelOptRuleCall call) {
     PlannerSettings settings = PrelUtil.getPlannerSettings(call.getPlanner());
+    isSemi = call.rel(0) instanceof DrillSemiJoinRel;
     return settings.isMemoryEstimationEnabled() || settings.isHashJoinEnabled();
   }
 
   @Override
   public void onMatch(RelOptRuleCall call) {
     PlannerSettings settings = PrelUtil.getPlannerSettings(call.getPlanner());
-    if (!settings.isHashJoinEnabled()) {
+    if (!settings.isHashJoinEnabled() || isSemi && !settings.isSemiJoinEnabled()) {
       return;
     }
 
-    final DrillJoinRel join = call.rel(0);
+    final DrillJoin join = call.rel(0);
     final RelNode left = join.getLeft();
     final RelNode right = join.getRight();
 
@@ -66,11 +73,11 @@ public void onMatch(RelOptRuleCall call) {
 
       if(isDist){
         createDistBothPlan(call, join, PhysicalJoinType.HASH_JOIN,
-            left, right, null /* left collation */, null /* right collation */, hashSingleKey);
+            left, right, null /* left collation */, null /* right collation */, hashSingleKey, isSemi);
       }else{
         if (checkBroadcastConditions(call.getPlanner(), join, left, right)) {
           createBroadcastPlan(call, join, join.getCondition(), PhysicalJoinType.HASH_JOIN,
-              left, right, null /* left collation */, null /* right collation */);
+              left, right, null /* left collation */, null /* right collation */, isSemi);
         }
       }
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/JoinPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/JoinPrel.java
index c40eeaa6a09..2581fa66738 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/JoinPrel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/JoinPrel.java
@@ -17,9 +17,14 @@
  */
 package org.apache.drill.exec.planner.physical;
 
+import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 
+import org.apache.calcite.rex.RexChecker;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.calcite.sql.validate.SqlValidatorUtil;
+import org.apache.calcite.util.Litmus;
 import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.common.expression.FieldReference;
 import org.apache.drill.common.logical.data.JoinCondition;
@@ -37,7 +42,6 @@
 import org.apache.calcite.rex.RexUtil;
 import org.apache.calcite.sql.SqlKind;
 import org.apache.calcite.util.Pair;
-
 import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
 
 /**
@@ -48,11 +52,18 @@
 public abstract class JoinPrel extends DrillJoinRelBase implements Prel {
   private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(JoinPrel.class);
 
+  protected final boolean isSemiJoin;
   protected JoinUtils.JoinCategory joincategory;
 
   public JoinPrel(RelOptCluster cluster, RelTraitSet traits, RelNode left, RelNode right, RexNode condition,
-      JoinRelType joinType) {
+                  JoinRelType joinType) {
+    this(cluster, traits, left, right, condition, joinType, false);
+  }
+
+  public JoinPrel(RelOptCluster cluster, RelTraitSet traits, RelNode left, RelNode right, RexNode condition,
+      JoinRelType joinType, boolean isSemiJoin) {
     super(cluster, traits, left, right, condition, joinType);
+    this.isSemiJoin = isSemiJoin;
   }
 
   @Override
@@ -73,7 +84,12 @@ public RelNode getJoinInput(int offset, RelNode input) {
     assert uniqueFieldNames(input.getRowType());
     final List<String> fields = getRowType().getFieldNames();
     final List<String> inputFields = input.getRowType().getFieldNames();
-    final List<String> outputFields = fields.subList(offset, offset + inputFields.size());
+    final List<String> outputFields;
+    if (fields.size() > offset) {
+      outputFields = fields.subList(offset, offset + inputFields.size());
+    } else {
+      outputFields = new ArrayList<>();
+    }
     if (!outputFields.equals(inputFields)) {
       // Ensure that input field names are the same as output field names.
       // If there are duplicate field names on left and right, fields will get
@@ -86,6 +102,9 @@ public RelNode getJoinInput(int offset, RelNode input) {
   }
 
   private RelNode rename(RelNode input, List<RelDataTypeField> inputFields, List<String> outputFieldNames) {
+    if (outputFieldNames.size() == 0) {
+      return input;
+    }
     List<RexNode> exprs = Lists.newArrayList();
 
     for (RelDataTypeField field : inputFields) {
@@ -139,4 +158,62 @@ protected void buildJoinConditions(List<JoinCondition> conditions,
     }
   }
 
+  public boolean isSemiJoin() { // set by the constructor; false when built via the 6-argument constructor
+    return isSemiJoin;
+  }
+
+  /* A Drill physical rel which is a semi join has an output row type with fields from only the
+     left side of the join, whereas Calcite's Join expects the output row type to contain fields
+     from both the left and right sides. This method is overridden so that validating a Drill
+     semi join physical rel does not fail (or throw, depending on the Litmus) on that difference.
+   */
+  @Override public boolean isValid(Litmus litmus, Context context) {
+    if (!this.isSemiJoin && !super.isValid(litmus, context)) { // semi joins skip Calcite's check, which counts right fields
+      return false;
+    }
+    if (getRowType().getFieldCount() // NOTE(review): for non-semi joins this likely repeats checks super.isValid already ran — confirm
+            != getSystemFieldList().size()
+            + left.getRowType().getFieldCount()
+            + (this.isSemiJoin ? 0 : right.getRowType().getFieldCount())) { // semi join output omits right-side fields
+      return litmus.fail("field count mismatch");
+    }
+    if (condition != null) {
+      if (condition.getType().getSqlTypeName() != SqlTypeName.BOOLEAN) {
+        return litmus.fail("condition must be boolean: {}",
+                condition.getType());
+      }
+      // The input to the condition is a row type consisting of system
+      // fields, left fields, and right fields (even for a semi join). Very
+      // similar to the output row type, except that fields have not yet been
+      // made nullable due to outer joins.
+      RexChecker checker =
+              new RexChecker(
+                      getCluster().getTypeFactory().builder()
+                              .addAll(getSystemFieldList())
+                              .addAll(getLeft().getRowType().getFieldList())
+                              .addAll(getRight().getRowType().getFieldList())
+                              .build(),
+                      context, litmus);
+      condition.accept(checker);
+      if (checker.getFailureCount() > 0) {
+        return litmus.fail(checker.getFailureCount()
+                + " failures in condition " + condition);
+      }
+    }
+    return litmus.succeed();
+  }
+
+  @Override public RelDataType deriveRowType() {
+    if (isSemiJoin) { // the output row type is derived from the left input only
+      return SqlValidatorUtil.deriveJoinRowType(
+              left.getRowType(),
+              null, // no right row type: right-side fields are not part of a semi join's output
+              this.joinType, // NOTE(review): Calcite's own SemiJoin passes INNER here; confirm outer join types never reach this branch with a null right type
+              getCluster().getTypeFactory(),
+              null,
+              new ArrayList<>());
+    } else {
+      return super.deriveRowType();
+    }
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/JoinPruleBase.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/JoinPruleBase.java
index 7588e2c1393..36654016044 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/JoinPruleBase.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/JoinPruleBase.java
@@ -22,7 +22,7 @@
 import org.apache.drill.exec.physical.impl.join.JoinUtils;
 import org.apache.drill.exec.physical.impl.join.JoinUtils.JoinCategory;
 import org.apache.drill.exec.planner.common.DrillJoinRelBase;
-import org.apache.drill.exec.planner.logical.DrillJoinRel;
+import org.apache.drill.exec.planner.logical.DrillJoin;
 import org.apache.drill.exec.planner.physical.DrillDistributionTrait.DistributionField;
 import org.apache.calcite.rel.InvalidRelException;
 import org.apache.calcite.rel.core.JoinRelType;
@@ -47,8 +47,8 @@ protected JoinPruleBase(RelOptRuleOperand operand, String description) {
     super(operand, description);
   }
 
-  protected boolean checkPreconditions(DrillJoinRel join, RelNode left, RelNode right,
-      PlannerSettings settings) {
+  protected boolean checkPreconditions(DrillJoin join, RelNode left, RelNode right,
+                                       PlannerSettings settings) {
     List<Integer> leftKeys = Lists.newArrayList();
     List<Integer> rightKeys = Lists.newArrayList();
     List<Boolean> filterNulls = Lists.newArrayList();
@@ -66,7 +66,7 @@ protected boolean checkPreconditions(DrillJoinRel join, RelNode left, RelNode ri
     return distFields;
   }
 
-  protected boolean checkBroadcastConditions(RelOptPlanner planner, DrillJoinRel join, RelNode left, RelNode right) {
+  protected boolean checkBroadcastConditions(RelOptPlanner planner, DrillJoin join, RelNode left, RelNode right) {
 
     double estimatedRightRowCount = RelMetadataQuery.instance().getRowCount(right);
     if (estimatedRightRowCount < PrelUtil.getSettings(join.getCluster()).getBroadcastThreshold()
@@ -78,10 +78,11 @@ protected boolean checkBroadcastConditions(RelOptPlanner planner, DrillJoinRel j
     return false;
   }
 
-  protected void createDistBothPlan(RelOptRuleCall call, DrillJoinRel join,
+  protected void createDistBothPlan(RelOptRuleCall call, DrillJoin join,
       PhysicalJoinType physicalJoinType,
       RelNode left, RelNode right,
-      RelCollation collationLeft, RelCollation collationRight, boolean hashSingleKey)throws InvalidRelException {
+      RelCollation collationLeft, RelCollation collationRight,
+      boolean hashSingleKey, boolean semiJoin)throws InvalidRelException {
 
     /* If join keys are  l1 = r1 and l2 = r2 and ... l_k = r_k, then consider the following options of plan:
      *   1) Plan1: distributed by (l1, l2, ..., l_k) for left side and by (r1, r2, ..., r_k) for right side.
@@ -93,10 +94,12 @@ protected void createDistBothPlan(RelOptRuleCall call, DrillJoinRel join,
      *   Whether enumerate plan 2, .., Plan_(k+1) depends on option : hashSingleKey.
      */
 
-    DrillDistributionTrait hashLeftPartition = new DrillDistributionTrait(DrillDistributionTrait.DistributionType.HASH_DISTRIBUTED, ImmutableList.copyOf(getDistributionField(join.getLeftKeys())));
-    DrillDistributionTrait hashRightPartition = new DrillDistributionTrait(DrillDistributionTrait.DistributionType.HASH_DISTRIBUTED, ImmutableList.copyOf(getDistributionField(join.getRightKeys())));
+    DrillDistributionTrait hashLeftPartition = new DrillDistributionTrait(DrillDistributionTrait.DistributionType.HASH_DISTRIBUTED,
+            ImmutableList.copyOf(getDistributionField(join.getLeftKeys())));
+    DrillDistributionTrait hashRightPartition = new DrillDistributionTrait(DrillDistributionTrait.DistributionType.HASH_DISTRIBUTED,
+            ImmutableList.copyOf(getDistributionField(join.getRightKeys())));
 
-    createDistBothPlan(call, join, physicalJoinType, left, right, collationLeft, collationRight, hashLeftPartition, hashRightPartition);
+    createDistBothPlan(call, join, physicalJoinType, left, right, collationLeft, collationRight, hashLeftPartition, hashRightPartition, semiJoin);
 
     assert (join.getLeftKeys().size() == join.getRightKeys().size());
 
@@ -110,7 +113,7 @@ protected void createDistBothPlan(RelOptRuleCall call, DrillJoinRel join,
         hashLeftPartition = new DrillDistributionTrait(DrillDistributionTrait.DistributionType.HASH_DISTRIBUTED, ImmutableList.copyOf(getDistributionField(join.getLeftKeys().subList(i, i+1))));
         hashRightPartition = new DrillDistributionTrait(DrillDistributionTrait.DistributionType.HASH_DISTRIBUTED, ImmutableList.copyOf(getDistributionField(join.getRightKeys().subList(i, i+1))));
 
-        createDistBothPlan(call, join, physicalJoinType, left, right, collationLeft, collationRight, hashLeftPartition, hashRightPartition);
+        createDistBothPlan(call, join, physicalJoinType, left, right, collationLeft, collationRight, hashLeftPartition, hashRightPartition, semiJoin);
       }
     }
   }
@@ -118,11 +121,11 @@ protected void createDistBothPlan(RelOptRuleCall call, DrillJoinRel join,
   // Create join plan with both left and right children hash distributed. If the physical join type
   // is MergeJoin, a collation must be provided for both left and right child and the plan will contain
   // sort converter if necessary to provide the collation.
-  private void createDistBothPlan(RelOptRuleCall call, DrillJoinRel join,
+  private void createDistBothPlan(RelOptRuleCall call, DrillJoin join,
       PhysicalJoinType physicalJoinType,
       RelNode left, RelNode right,
       RelCollation collationLeft, RelCollation collationRight,
-      DrillDistributionTrait hashLeftPartition, DrillDistributionTrait hashRightPartition) throws InvalidRelException {
+      DrillDistributionTrait hashLeftPartition, DrillDistributionTrait hashRightPartition, boolean isSemiJoin) throws InvalidRelException {
 
     RelTraitSet traitsLeft = null;
     RelTraitSet traitsRight = null;
@@ -145,7 +148,7 @@ private void createDistBothPlan(RelOptRuleCall call, DrillJoinRel join,
       final RelTraitSet traitSet = PrelUtil.removeCollation(traitsLeft, call);
       newJoin = new HashJoinPrel(join.getCluster(), traitSet,
                                  convertedLeft, convertedRight, join.getCondition(),
-                                 join.getJoinType());
+                                 join.getJoinType(), isSemiJoin);
 
     } else if (physicalJoinType == PhysicalJoinType.MERGE_JOIN) {
       newJoin = new MergeJoinPrel(join.getCluster(), traitsLeft,
@@ -158,11 +161,11 @@ private void createDistBothPlan(RelOptRuleCall call, DrillJoinRel join,
   // Create join plan with left child ANY distributed and right child BROADCAST distributed. If the physical join type
   // is MergeJoin, a collation must be provided for both left and right child and the plan will contain sort converter
   // if necessary to provide the collation.
-  protected void createBroadcastPlan(final RelOptRuleCall call, final DrillJoinRel join,
+  protected void createBroadcastPlan(final RelOptRuleCall call, final DrillJoin join,
       final RexNode joinCondition,
       final PhysicalJoinType physicalJoinType,
       final RelNode left, final RelNode right,
-      final RelCollation collationLeft, final RelCollation collationRight) throws InvalidRelException {
+      final RelCollation collationLeft, final RelCollation collationRight, boolean semiJoin) throws InvalidRelException {
 
     DrillDistributionTrait distBroadcastRight = new DrillDistributionTrait(DrillDistributionTrait.DistributionType.BROADCAST_DISTRIBUTED);
     RelTraitSet traitsRight = null;
@@ -184,10 +187,10 @@ protected void createBroadcastPlan(final RelOptRuleCall call, final DrillJoinRel
 
     if(traitProp){
       if (physicalJoinType == PhysicalJoinType.MERGE_JOIN) {
-        new SubsetTransformer<DrillJoinRel, InvalidRelException>(call) {
+        new SubsetTransformer<DrillJoin, InvalidRelException>(call) {
 
           @Override
-          public RelNode convertChild(final DrillJoinRel join, final RelNode rel) throws InvalidRelException {
+          public RelNode convertChild(final DrillJoin join, final RelNode rel) throws InvalidRelException {
             DrillDistributionTrait toDist = rel.getTraitSet().getTrait(DrillDistributionTraitDef.INSTANCE);
             RelTraitSet newTraitsLeft = newTraitSet(Prel.DRILL_PHYSICAL, collationLeft, toDist);
 
@@ -200,24 +203,24 @@ public RelNode convertChild(final DrillJoinRel join, final RelNode rel) throws I
 
 
       } else if (physicalJoinType == PhysicalJoinType.HASH_JOIN) {
-        new SubsetTransformer<DrillJoinRel, InvalidRelException>(call) {
+        new SubsetTransformer<DrillJoin, InvalidRelException>(call) {
 
           @Override
-          public RelNode convertChild(final DrillJoinRel join,  final RelNode rel) throws InvalidRelException {
+          public RelNode convertChild(final DrillJoin join,  final RelNode rel) throws InvalidRelException {
             DrillDistributionTrait toDist = rel.getTraitSet().getTrait(DrillDistributionTraitDef.INSTANCE);
             RelTraitSet newTraitsLeft = newTraitSet(Prel.DRILL_PHYSICAL, toDist);
             RelNode newLeft = convert(left, newTraitsLeft);
             return new HashJoinPrel(join.getCluster(), newTraitsLeft, newLeft, convertedRight, joinCondition,
-                                         join.getJoinType());
+                                         join.getJoinType(), semiJoin);
 
           }
 
         }.go(join, convertedLeft);
       } else if (physicalJoinType == PhysicalJoinType.NESTEDLOOP_JOIN) {
-        new SubsetTransformer<DrillJoinRel, InvalidRelException>(call) {
+        new SubsetTransformer<DrillJoin, InvalidRelException>(call) {
 
           @Override
-          public RelNode convertChild(final DrillJoinRel join,  final RelNode rel) throws InvalidRelException {
+          public RelNode convertChild(final DrillJoin join,  final RelNode rel) throws InvalidRelException {
             DrillDistributionTrait toDist = rel.getTraitSet().getTrait(DrillDistributionTraitDef.INSTANCE);
             RelTraitSet newTraitsLeft = newTraitSet(Prel.DRILL_PHYSICAL, toDist);
             RelNode newLeft = convert(left, newTraitsLeft);
@@ -235,7 +238,7 @@ public RelNode convertChild(final DrillJoinRel join,  final RelNode rel) throws
       } else if (physicalJoinType == PhysicalJoinType.HASH_JOIN) {
         final RelTraitSet traitSet = PrelUtil.removeCollation(convertedLeft.getTraitSet(), call);
         call.transformTo(new HashJoinPrel(join.getCluster(), traitSet, convertedLeft,
-            convertedRight, joinCondition, join.getJoinType()));
+            convertedRight, joinCondition, join.getJoinType(), semiJoin));
       } else if (physicalJoinType == PhysicalJoinType.NESTEDLOOP_JOIN) {
         call.transformTo(new NestedLoopJoinPrel(join.getCluster(), convertedLeft.getTraitSet(), convertedLeft,
             convertedRight, joinCondition, join.getJoinType()));
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/MergeJoinPrule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/MergeJoinPrule.java
index f06b66d2bc5..0bd25685dd1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/MergeJoinPrule.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/MergeJoinPrule.java
@@ -70,11 +70,11 @@ public void onMatch(RelOptRuleCall call) {
       RelCollation collationRight = getCollation(join.getRightKeys());
 
       if(isDist){
-        createDistBothPlan(call, join, PhysicalJoinType.MERGE_JOIN, left, right, collationLeft, collationRight, hashSingleKey);
+        createDistBothPlan(call, join, PhysicalJoinType.MERGE_JOIN, left, right, collationLeft, collationRight, hashSingleKey, false);
       }else{
         if (checkBroadcastConditions(call.getPlanner(), join, left, right)) {
           createBroadcastPlan(call, join, join.getCondition(), PhysicalJoinType.MERGE_JOIN,
-              left, right, collationLeft, collationRight);
+              left, right, collationLeft, collationRight, false);
         }
       }
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/NestedLoopJoinPrule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/NestedLoopJoinPrule.java
index 848c8a16f50..e7fc032af33 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/NestedLoopJoinPrule.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/NestedLoopJoinPrule.java
@@ -21,6 +21,7 @@
 
 import org.apache.drill.exec.physical.impl.join.JoinUtils;
 import org.apache.drill.exec.physical.impl.join.JoinUtils.JoinCategory;
+import org.apache.drill.exec.planner.logical.DrillJoin;
 import org.apache.drill.exec.planner.logical.DrillJoinRel;
 import org.apache.drill.exec.planner.logical.RelOptHelper;
 import org.apache.calcite.rel.InvalidRelException;
@@ -45,8 +46,8 @@ private NestedLoopJoinPrule(String name, RelOptRuleOperand operand) {
   }
 
   @Override
-  protected boolean checkPreconditions(DrillJoinRel join, RelNode left, RelNode right,
-      PlannerSettings settings) {
+  protected boolean checkPreconditions(DrillJoin join, RelNode left, RelNode right,
+                                       PlannerSettings settings) {
     JoinRelType type = join.getJoinType();
 
     if (!(type == JoinRelType.INNER || type == JoinRelType.LEFT)) {
@@ -93,7 +94,7 @@ public void onMatch(RelOptRuleCall call) {
 
       if (checkBroadcastConditions(call.getPlanner(), join, left, right)) {
         createBroadcastPlan(call, join, join.getCondition(), PhysicalJoinType.NESTEDLOOP_JOIN,
-            left, right, null /* left collation */, null /* right collation */);
+            left, right, null /* left collation */, null /* right collation */, false);
       }
 
     } catch (InvalidRelException e) {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java
index 63f884cfd65..7577cf91829 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java
@@ -64,6 +64,8 @@
       new OptionDescription("Generates the topN plan for queries with the ORDER BY and LIMIT clauses."));
   public static final OptionValidator HASHJOIN = new BooleanValidator("planner.enable_hashjoin",
       new OptionDescription("Enable the memory hungry hash join. Drill assumes that a query will have adequate memory to complete and tries to use the fastest operations possible to complete the planned inner, left, right, or full outer joins using a hash table. Does not write to disk. Disabling hash join allows Drill to manage arbitrarily large data in a small memory footprint."));
+  public static final OptionValidator SEMIJOIN = new BooleanValidator("planner.enable_semijoin", // defaults to false in drill-module.conf
+          new OptionDescription("Enable the semi join optimization. Planner removes the distinct processing below the hash join and sets the semi join flag in hash join."));
   public static final OptionValidator MERGEJOIN = new BooleanValidator("planner.enable_mergejoin",
       new OptionDescription("Sort-based operation. A merge join is used for inner join, left and right outer joins. Inputs to the merge join must be sorted. It reads the sorted input streams from both sides and finds matching rows. Writes to disk."));
   public static final OptionValidator NESTEDLOOPJOIN = new BooleanValidator("planner.enable_nestedloopjoin",
@@ -273,6 +275,10 @@ public boolean isHashJoinEnabled() {
     return options.getOption(HASHJOIN.getOptionName()).bool_val;
   }
 
+  public boolean isSemiJoinEnabled() { // PlannerPhase adds the semi-join rule only if isHashJoinEnabled() is also true
+    return options.getOption(SEMIJOIN.getOptionName()).bool_val;
+  }
+
   public boolean isMergeJoinEnabled() {
     return options.getOption(MERGEJOIN.getOptionName()).bool_val;
   }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/SwapHashJoinVisitor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/SwapHashJoinVisitor.java
index b7bc4bb77bc..0fe0f92b018 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/SwapHashJoinVisitor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/SwapHashJoinVisitor.java
@@ -68,7 +68,7 @@ public Prel visitJoin(JoinPrel prel, Double value) throws RuntimeException {
       // Mark left/right is swapped, when INNER hash join's left row count < ( 1+ margin factor) right row count.
       RelMetadataQuery mq = newJoin.getCluster().getMetadataQuery();
       if (newJoin.getLeft().estimateRowCount(mq) < (1 + value) * newJoin.getRight().estimateRowCount(mq) &&
-          newJoin.getJoinType() == JoinRelType.INNER) {
+          newJoin.getJoinType() == JoinRelType.INNER && !newJoin.isSemiJoin()) {
         ((HashJoinPrel) newJoin).setSwapped(true);
       }
     }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
index 23f35b5e17b..a33d8326489 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
@@ -81,6 +81,7 @@
       new OptionDefinition(PlannerSettings.STREAMAGG),
       new OptionDefinition(PlannerSettings.TOPN, new OptionMetaData(OptionValue.AccessibleScopes.ALL, false, true)),
       new OptionDefinition(PlannerSettings.HASHJOIN),
+      new OptionDefinition(PlannerSettings.SEMIJOIN),
       new OptionDefinition(PlannerSettings.MERGEJOIN),
       new OptionDefinition(PlannerSettings.NESTEDLOOPJOIN),
       new OptionDefinition(PlannerSettings.MULTIPHASE),
diff --git a/exec/java-exec/src/main/resources/drill-module.conf b/exec/java-exec/src/main/resources/drill-module.conf
index 76be050ba01..f083c6603c8 100644
--- a/exec/java-exec/src/main/resources/drill-module.conf
+++ b/exec/java-exec/src/main/resources/drill-module.conf
@@ -514,6 +514,7 @@ drill.exec.options: {
     planner.enable_hash_single_key: true,
     planner.enable_hashagg: true,
     planner.enable_hashjoin: true,
+    planner.enable_semijoin: false,
     planner.enable_hashjoin_swap: true,
     planner.enable_hep_opt: true,
     planner.enable_hep_partition_pruning: true,
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestSemiJoin.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestSemiJoin.java
new file mode 100644
index 00000000000..a660fffee0f
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestSemiJoin.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.join;
+
+import org.apache.drill.categories.OperatorTest;
+import org.apache.drill.categories.SlowTest;
+import org.apache.drill.exec.planner.physical.PlannerSettings;
+import org.apache.drill.test.BaseTestQuery;
+import org.apache.drill.test.ClientFixture;
+import org.apache.drill.test.ClusterFixture;
+import org.apache.drill.test.ClusterFixtureBuilder;
+import org.junit.Test;
+import static org.junit.Assert.assertTrue;
+import org.junit.experimental.categories.Category;
+
+@Category({SlowTest.class, OperatorTest.class})
+public class TestSemiJoin extends BaseTestQuery { // plan-only tests: each asserts on EXPLAIN text, no query results are checked
+  @Test
+  public void testInClauseToSemiJoin() throws Exception {
+    String sql = "select employee_id, full_name from cp.`employee.json` where employee_id in (select employee_id from cp.`employee.json` )";
+
+    ClusterFixtureBuilder builder = ClusterFixture.builder(dirTestWatcher)
+            .setOptionDefault(PlannerSettings.SEMIJOIN.getOptionName(), true); // planner.enable_semijoin defaults to false, so enable it here
+
+    try (ClusterFixture cluster = builder.build();
+         ClientFixture client = cluster.clientFixture()) {
+      String queryPlan = client.queryBuilder().sql(sql).explainText();
+      assertTrue(queryPlan.contains("semi-join: =[true]"));
+    }
+  }
+
+  @Test
+  public void testInClauseWithSemiJoinDisabled() throws Exception {
+    String sql = "select employee_id, full_name from cp.`employee.json` where employee_id in (select employee_id from cp.`employee.json` )";
+
+    ClusterFixtureBuilder builder = ClusterFixture.builder(dirTestWatcher)
+            .setOptionDefault(PlannerSettings.SEMIJOIN.getOptionName(), false);
+
+    try (ClusterFixture cluster = builder.build();
+         ClientFixture client = cluster.clientFixture()) {
+      String queryPlan = client.queryBuilder().sql(sql).explainText();
+      assertTrue(!queryPlan.contains("semi-join: =[true]"));
+    }
+  }
+
+  @Test
+  public void testSmallInClauseToSemiJoin() throws Exception {
+    String sql = "select employee_id, full_name from cp.`employee.json` " +
+            "where employee_id in (351, 352, 353, 451, 452, 453)"; // 6-value IN list: test expects no semi join
+
+    ClusterFixtureBuilder builder = ClusterFixture.builder(dirTestWatcher)
+            .setOptionDefault(PlannerSettings.SEMIJOIN.getOptionName(), true);
+
+    try (ClusterFixture cluster = builder.build();
+         ClientFixture client = cluster.clientFixture()) {
+      String queryPlan = client.queryBuilder().sql(sql).explainText();
+      assertTrue(!queryPlan.contains("semi-join: =[true]"));
+    }
+  }
+
+  @Test
+  public void testLargeInClauseToSemiJoin() throws Exception {
+    String sql = "select employee_id, full_name from cp.`employee.json` " +
+            "where employee_id in (351, 352, 353, 451, 452, 453, 551, 552, 553, 651, 652, 653, 751, 752, 753, 851, 852, 853, 951, 952, 953)"; // 21-value IN list: test expects a semi join (presumably above planner.in_subquery_threshold — TODO confirm)
+
+    ClusterFixtureBuilder builder = ClusterFixture.builder(dirTestWatcher)
+            .setOptionDefault(PlannerSettings.SEMIJOIN.getOptionName(), true);
+
+    try (ClusterFixture cluster = builder.build();
+         ClientFixture client = cluster.clientFixture()) {
+      String queryPlan = client.queryBuilder().sql(sql).explainText();
+      assertTrue(queryPlan.contains("semi-join: =[true]"));
+    }
+  }
+
+  @Test
+  public void testStarWithInClauseToSemiJoin() throws Exception {
+    String sql = "select * from cp.`employee.json` where employee_id in (select employee_id from cp.`employee.json` )";
+
+    ClusterFixtureBuilder builder = ClusterFixture.builder(dirTestWatcher)
+            .setOptionDefault(PlannerSettings.SEMIJOIN.getOptionName(), true);
+
+    try (ClusterFixture cluster = builder.build();
+         ClientFixture client = cluster.clientFixture()) {
+      String queryPlan = client.queryBuilder().sql(sql).explainText();
+      assertTrue(queryPlan.contains("semi-join: =[true]"));
+    }
+  }
+
+  @Test
+  public void testMultiColumnInClauseWithSemiJoin() throws Exception {
+    String sql = "select * from cp.`employee.json` where (employee_id, full_name) in (select employee_id, full_name from cp.`employee.json` )";
+
+    ClusterFixtureBuilder builder = ClusterFixture.builder(dirTestWatcher)
+            .setOptionDefault(PlannerSettings.SEMIJOIN.getOptionName(), true);
+
+    try (ClusterFixture cluster = builder.build();
+         ClientFixture client = cluster.clientFixture()) {
+      String queryPlan = client.queryBuilder().sql(sql).explainText();
+      assertTrue(queryPlan.contains("semi-join: =[true]"));
+    }
+  }
+}
diff --git a/logical/src/main/java/org/apache/drill/common/logical/data/LogicalSemiJoin.java b/logical/src/main/java/org/apache/drill/common/logical/data/LogicalSemiJoin.java
new file mode 100644
index 00000000000..a44ec9fc92f
--- /dev/null
+++ b/logical/src/main/java/org/apache/drill/common/logical/data/LogicalSemiJoin.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.common.logical.data;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.drill.shaded.guava.com.google.common.collect.Iterators;
+import org.apache.drill.common.logical.data.visitors.LogicalVisitor;
+
+import java.util.Iterator;
+import java.util.List;
+
+@JsonTypeName("semi-join")
+public class LogicalSemiJoin extends Join { // serialized as "semi-join"; all state is held by Join
+
+  @JsonCreator
+  public LogicalSemiJoin(@JsonProperty("left") LogicalOperator left,
+                         @JsonProperty("right") LogicalOperator right,
+                         @JsonProperty("conditions") List<JoinCondition> conditions,
+                         @JsonProperty("type") JoinRelType type) {
+    super(left, right, conditions, type);
+  }
+
+
+  @Override
+  public Iterator<LogicalOperator> iterator() {
+    return Iterators.forArray(getLeft(), getRight());
+  }
+
+  @Override
+  public <T, X, E extends Throwable> T accept(LogicalVisitor<T, X, E> logicalVisitor, X value) throws E {
+    return logicalVisitor.visitJoin(this, value); // dispatched through the visitor's regular join hook
+  }
+}


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services