You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@drill.apache.org by GitBox <gi...@apache.org> on 2018/11/13 20:58:33 UTC

[GitHub] Ben-Zvi closed pull request #1522: Drill 6735: Implement Semi-Join for the Hash-Join operator

Ben-Zvi closed pull request #1522: Drill 6735: Implement Semi-Join for the Hash-Join operator
URL: https://github.com/apache/drill/pull/1522
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractJoinPop.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractJoinPop.java
index a624f5c2bbd..628dde7f086 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractJoinPop.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractJoinPop.java
@@ -33,16 +33,19 @@
 
   protected final JoinRelType joinType;
 
+  protected final boolean semiJoin;
+
   protected final LogicalExpression condition;
 
   protected final List<JoinCondition> conditions;
 
   public AbstractJoinPop(PhysicalOperator leftOp, PhysicalOperator rightOp,
-                         JoinRelType joinType, LogicalExpression joinCondition,
+                         JoinRelType joinType, boolean semiJoin, LogicalExpression joinCondition,
                          List<JoinCondition> joinConditions) {
     left = leftOp;
     right = rightOp;
     this.joinType = joinType;
+    this.semiJoin = semiJoin;
     condition = joinCondition;
     conditions = joinConditions;
   }
@@ -69,6 +72,8 @@ public JoinRelType getJoinType() {
     return joinType;
   }
 
+  public boolean isSemiJoin() { return semiJoin; }
+
   public LogicalExpression getCondition() { return condition; }
 
   public List<JoinCondition> getConditions() {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashJoinPOP.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashJoinPOP.java
index 4df9c38c0a6..35c187c8125 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashJoinPOP.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashJoinPOP.java
@@ -54,10 +54,11 @@
   public HashJoinPOP(@JsonProperty("left") PhysicalOperator left, @JsonProperty("right") PhysicalOperator right,
                      @JsonProperty("conditions") List<JoinCondition> conditions,
                      @JsonProperty("joinType") JoinRelType joinType,
+                     @JsonProperty("semiJoin") boolean semiJoin,
                      @JsonProperty("runtimeFilterDef") RuntimeFilterDef runtimeFilterDef,
                      @JsonProperty("isRowKeyJoin") boolean isRowKeyJoin,
                      @JsonProperty("joinControl") int joinControl) {
-    super(left, right, joinType, null, conditions);
+    super(left, right, joinType, semiJoin,null, conditions);
     Preconditions.checkArgument(joinType != null, "Join type is missing for HashJoin Pop");
     this.runtimeFilterDef = runtimeFilterDef;
     this.isRowKeyJoin = isRowKeyJoin;
@@ -65,6 +66,16 @@ public HashJoinPOP(@JsonProperty("left") PhysicalOperator left, @JsonProperty("r
     this.joinControl = joinControl;
   }
 
+  @VisibleForTesting
+  public HashJoinPOP(PhysicalOperator left, PhysicalOperator right,
+                     List<JoinCondition> conditions,
+                     JoinRelType joinType,
+                     RuntimeFilterDef runtimeFilterDef,
+                     boolean isRowKeyJoin,
+                     int joinControl){
+    this(left, right, conditions, joinType, false, runtimeFilterDef, isRowKeyJoin, joinControl);
+  }
+
   @VisibleForTesting
   public HashJoinPOP(PhysicalOperator left, PhysicalOperator right,
                      List<JoinCondition> conditions,
@@ -84,7 +95,7 @@ public HashJoinPOP(PhysicalOperator left, PhysicalOperator right,
   public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) {
         Preconditions.checkArgument(children.size() == 2);
 
-        HashJoinPOP newHashJoin = new HashJoinPOP(children.get(0), children.get(1), conditions, joinType, runtimeFilterDef,
+        HashJoinPOP newHashJoin = new HashJoinPOP(children.get(0), children.get(1), conditions, joinType, semiJoin, runtimeFilterDef,
               isRowKeyJoin, joinControl);
         newHashJoin.setMaxAllocation(getMaxAllocation());
         newHashJoin.setSubScanForRowKeyJoin(this.getSubScanForRowKeyJoin());
@@ -116,7 +127,7 @@ public HashJoinPOP flipIfRight() {
         for (JoinCondition c : conditions) {
           flippedConditions.add(c.flip());
         }
-        return new HashJoinPOP(right, left, flippedConditions, JoinRelType.LEFT, runtimeFilterDef, isRowKeyJoin, joinControl);
+        return new HashJoinPOP(right, left, flippedConditions, JoinRelType.LEFT, semiJoin, runtimeFilterDef, isRowKeyJoin, joinControl);
       } else {
         return this;
       }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/LateralJoinPOP.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/LateralJoinPOP.java
index ff3e0b2eb74..9e00f7cc522 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/LateralJoinPOP.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/LateralJoinPOP.java
@@ -51,7 +51,7 @@ public LateralJoinPOP(
       @JsonProperty("joinType") JoinRelType joinType,
       @JsonProperty("implicitRIDColumn") String implicitRIDColumn,
       @JsonProperty("excludedColumns") List<SchemaPath> excludedColumns) {
-    super(left, right, joinType, null, null);
+    super(left, right, joinType, false, null, null);
     Preconditions.checkArgument(joinType != JoinRelType.FULL,
       "Full outer join is currently not supported with Lateral Join");
     Preconditions.checkArgument(joinType != JoinRelType.RIGHT,
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MergeJoinPOP.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MergeJoinPOP.java
index 51ebb51a576..eb1a31a113c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MergeJoinPOP.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MergeJoinPOP.java
@@ -41,7 +41,7 @@ public MergeJoinPOP(
       @JsonProperty("conditions") List<JoinCondition> conditions,
       @JsonProperty("joinType") JoinRelType joinType
   ) {
-    super(left, right, joinType, null, conditions);
+    super(left, right, joinType, false, null, conditions);
     Preconditions.checkArgument(joinType != null, "Join type is missing!");
     Preconditions.checkArgument(joinType != JoinRelType.FULL,
       "Full outer join not currently supported with Merge Join");
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/NestedLoopJoinPOP.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/NestedLoopJoinPOP.java
index 79e33e69ccb..5783b4ee181 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/NestedLoopJoinPOP.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/NestedLoopJoinPOP.java
@@ -40,7 +40,7 @@ public NestedLoopJoinPOP(
       @JsonProperty("joinType") JoinRelType joinType,
       @JsonProperty("condition") LogicalExpression condition
   ) {
-    super(left, right, joinType, condition, null);
+    super(left, right, joinType, false, condition, null);
   }
 
   @Override
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashPartition.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashPartition.java
index 86b870d704f..275cf16572d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashPartition.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashPartition.java
@@ -125,9 +125,10 @@
   private long partitionInMemorySize;
   private long numInMemoryRecords;
   private boolean updatedRecordsPerBatch = false;
+  private boolean semiJoin;
 
   public HashPartition(FragmentContext context, BufferAllocator allocator, ChainedHashTable baseHashTable,
-                       RecordBatch buildBatch, RecordBatch probeBatch,
+                       RecordBatch buildBatch, RecordBatch probeBatch, boolean semiJoin,
                        int recordsPerBatch, SpillSet spillSet, int partNum, int cycleNum, int numPartitions) {
     this.allocator = allocator;
     this.buildBatch = buildBatch;
@@ -137,6 +138,7 @@ public HashPartition(FragmentContext context, BufferAllocator allocator, Chained
     this.partitionNum = partNum;
     this.cycleNum = cycleNum;
     this.numPartitions = numPartitions;
+    this.semiJoin = semiJoin;
 
     try {
       this.hashTable = baseHashTable.createAndSetupHashTable(null);
@@ -151,7 +153,7 @@ public HashPartition(FragmentContext context, BufferAllocator allocator, Chained
     } catch (SchemaChangeException sce) {
       throw new IllegalStateException("Unexpected Schema Change while creating a hash table",sce);
     }
-    this.hjHelper = new HashJoinHelper(context, allocator);
+    this.hjHelper = semiJoin ? null : new HashJoinHelper(context, allocator);
     tmpBatchesList = new ArrayList<>();
     if ( numPartitions > 1 ) {
       allocateNewCurrentBatchAndHV();
@@ -391,7 +393,7 @@ public int probeForKey(int recordsProcessed, int hashCode) throws SchemaChangeEx
     return Pair.of(compositeIndex, matchExists);
   }
   public int getNextIndex(int compositeIndex) {
-    // in case of iner rows with duplicate keys, get the next one
+    // in case of inner rows with duplicate keys, get the next one
     return hjHelper.getNextIndex(compositeIndex);
   }
   public boolean setRecordMatched(int compositeIndex) {
@@ -504,7 +506,7 @@ public void buildContainersHashTableAndHelper() throws SchemaChangeException {
       final int currentRecordCount = nextBatch.getRecordCount();
 
       // For every incoming build batch, we create a matching helper batch
-      hjHelper.addNewBatch(currentRecordCount);
+      if ( ! semiJoin ) { hjHelper.addNewBatch(currentRecordCount); }
 
       // Holder contains the global index where the key is hashed into using the hash table
       final IndexPointer htIndex = new IndexPointer();
@@ -527,7 +529,7 @@ public void buildContainersHashTableAndHelper() throws SchemaChangeException {
          * the current record index and batch index. This will be used
          * later when we probe and find a match.
          */
-        hjHelper.setCurrentIndex(htIndex.value, curr /* buildBatchIndex */, recInd);
+        if ( ! semiJoin ) { hjHelper.setCurrentIndex(htIndex.value, curr /* buildBatchIndex */, recInd); }
       }
 
       containers.add(nextBatch);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
index 2f17ff2c1ec..f1c61816584 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
@@ -122,6 +122,7 @@
 
   // Join type, INNER, LEFT, RIGHT or OUTER
   private final JoinRelType joinType;
+  private boolean semiJoin;
   private boolean joinIsLeftOrFull;
   private boolean joinIsRightOrFull;
   private boolean skipHashTableBuild; // when outer side is empty, and the join is inner or left (see DRILL-6755)
@@ -486,7 +487,7 @@ public HashJoinMemoryCalculator getCalculatorImpl() {
       final double hashTableDoublingFactor = context.getOptions().getDouble(ExecConstants.HASHJOIN_HASH_DOUBLE_FACTOR_KEY);
       final String hashTableCalculatorType = context.getOptions().getString(ExecConstants.HASHJOIN_HASHTABLE_CALC_TYPE_KEY);
 
-      return new HashJoinMemoryCalculatorImpl(safetyFactor, fragmentationFactor, hashTableDoublingFactor, hashTableCalculatorType);
+      return new HashJoinMemoryCalculatorImpl(safetyFactor, fragmentationFactor, hashTableDoublingFactor, hashTableCalculatorType, semiJoin);
     } else {
       return new HashJoinMechanicalMemoryCalculator(maxBatchesInMemory);
     }
@@ -566,6 +567,7 @@ public IterOutcome innerNext() {
             hashJoinProbe.setupHashJoinProbe(probeBatch,
               this,
               joinType,
+              semiJoin,
               leftUpstream,
               partitions,
               spilledState.getCycle(),
@@ -777,7 +779,7 @@ private void initializeBuild() {
     baseHashTable.updateIncoming(buildBatch, probeBatch); // in case we process the spilled files
     // Recreate the partitions every time build is initialized
     for (int part = 0; part < numPartitions; part++ ) {
-      partitions[part] = new HashPartition(context, allocator, baseHashTable, buildBatch, probeBatch,
+      partitions[part] = new HashPartition(context, allocator, baseHashTable, buildBatch, probeBatch, semiJoin,
         RECORDS_PER_BATCH, spillSet, part, spilledState.getCycle(), numPartitions);
     }
 
@@ -998,6 +1000,10 @@ public IterOutcome executeBuildPhase() throws SchemaChangeException {
             : read_right_HV_vector.getAccessor().get(ind); // get the hash value from the HV column
           int currPart = hashCode & spilledState.getPartitionMask();
           hashCode >>>= spilledState.getBitsInMask();
+          // TODO: semi-join should skip join-key-duplicate rows here (the block below is still an empty placeholder)
+          if ( semiJoin ) {
+
+          }
           // Append the new inner row to the appropriate partition; spill (that partition) if needed
           partitions[currPart].appendInnerRow(buildBatch.getContainer(), ind, hashCode, buildCalc); // may spill if needed
         }
@@ -1093,7 +1099,7 @@ public IterOutcome executeBuildPhase() throws SchemaChangeException {
 
   private void setupOutputContainerSchema() {
 
-    if (buildSchema != null) {
+    if (buildSchema != null && ! semiJoin ) {
       for (final MaterializedField field : buildSchema) {
         final MajorType inputType = field.getType();
         final MajorType outputType;
@@ -1160,6 +1166,7 @@ public HashJoinBatch(HashJoinPOP popConfig, FragmentContext context,
     this.buildBatch = right;
     this.probeBatch = left;
     joinType = popConfig.getJoinType();
+    semiJoin = popConfig.isSemiJoin();
     joinIsLeftOrFull  = joinType == JoinRelType.LEFT  || joinType == JoinRelType.FULL;
     joinIsRightOrFull = joinType == JoinRelType.RIGHT || joinType == JoinRelType.FULL;
     conditions = popConfig.getConditions();
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinHelperUnusedSizeImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinHelperUnusedSizeImpl.java
new file mode 100644
index 00000000000..25a985bf348
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinHelperUnusedSizeImpl.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.join;
+
+import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
+
+/**
+ * This calculator class is used when the Hash-Join helper is not used (i.e., it always returns a size of zero)
+ */
+public class HashJoinHelperUnusedSizeImpl implements HashJoinHelperSizeCalculator {
+  public static final HashJoinHelperUnusedSizeImpl INSTANCE = new HashJoinHelperUnusedSizeImpl();
+
+  private HashJoinHelperUnusedSizeImpl() {
+    // Do nothing
+  }
+
+  @Override
+  public long calculateSize(HashJoinMemoryCalculator.PartitionStat partitionStat, double fragmentationFactor) {
+    Preconditions.checkArgument(!partitionStat.isSpilled());
+
+    return 0;
+  }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinMemoryCalculatorImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinMemoryCalculatorImpl.java
index e6635956bad..88f3ddc21de 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinMemoryCalculatorImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinMemoryCalculatorImpl.java
@@ -39,6 +39,7 @@
   private final double fragmentationFactor;
   private final double hashTableDoublingFactor;
   private final String hashTableCalculatorType;
+  private final boolean semiJoin;
 
   private boolean initialized = false;
   private boolean doMemoryCalculation;
@@ -46,11 +47,13 @@
   public HashJoinMemoryCalculatorImpl(final double safetyFactor,
                                       final double fragmentationFactor,
                                       final double hashTableDoublingFactor,
-                                      final String hashTableCalculatorType) {
+                                      final String hashTableCalculatorType,
+                                      boolean semiJoin) {
     this.safetyFactor = safetyFactor;
     this.fragmentationFactor = fragmentationFactor;
     this.hashTableDoublingFactor = hashTableDoublingFactor;
     this.hashTableCalculatorType = hashTableCalculatorType;
+    this.semiJoin = semiJoin;
   }
 
   public void initialize(boolean doMemoryCalculation) {
@@ -76,8 +79,8 @@ public BuildSidePartitioning next() {
       return new BuildSidePartitioningImpl(
         BatchSizePredictorImpl.Factory.INSTANCE,
         hashTableSizeCalculator,
-        HashJoinHelperSizeCalculatorImpl.INSTANCE,
-        fragmentationFactor, safetyFactor);
+        semiJoin ? HashJoinHelperUnusedSizeImpl.INSTANCE : HashJoinHelperSizeCalculatorImpl.INSTANCE,
+        fragmentationFactor, safetyFactor, semiJoin);
     } else {
       return new NoopBuildSidePartitioningImpl();
     }
@@ -184,6 +187,7 @@ public HashJoinState getState() {
     private final HashJoinHelperSizeCalculator hashJoinHelperSizeCalculator;
     private final double fragmentationFactor;
     private final double safetyFactor;
+    private final boolean semiJoin;
 
     private int maxBatchNumRecordsBuild;
     private int maxBatchNumRecordsProbe;
@@ -217,12 +221,14 @@ public BuildSidePartitioningImpl(final BatchSizePredictor.Factory batchSizePredi
                                      final HashTableSizeCalculator hashTableSizeCalculator,
                                      final HashJoinHelperSizeCalculator hashJoinHelperSizeCalculator,
                                      final double fragmentationFactor,
-                                     final double safetyFactor) {
+                                     final double safetyFactor,
+                                     boolean semiJoin) {
       this.batchSizePredictorFactory = Preconditions.checkNotNull(batchSizePredictorFactory);
       this.hashTableSizeCalculator = Preconditions.checkNotNull(hashTableSizeCalculator);
       this.hashJoinHelperSizeCalculator = Preconditions.checkNotNull(hashJoinHelperSizeCalculator);
       this.fragmentationFactor = fragmentationFactor;
       this.safetyFactor = safetyFactor;
+      this.semiJoin = semiJoin;
     }
 
     @Override
@@ -470,7 +476,8 @@ public PostBuildCalculations next() {
         fragmentationFactor,
         safetyFactor,
         loadFactor,
-        reserveHash);
+        reserveHash,
+        semiJoin);
     }
 
     @Override
@@ -575,6 +582,7 @@ public String makeDebugString() {
     private final double safetyFactor;
     private final double loadFactor;
     private final boolean reserveHash;
+    private final boolean semiJoin;
 
     private boolean initialized;
     private long consumedMemory;
@@ -596,7 +604,8 @@ public PostBuildCalculationsImpl(final boolean firstCycle,
                                      final double fragmentationFactor,
                                      final double safetyFactor,
                                      final double loadFactor,
-                                     final boolean reserveHash) {
+                                     final boolean reserveHash,
+                                     boolean semiJoin) {
       this.firstCycle = firstCycle;
       this.probeSizePredictor = Preconditions.checkNotNull(probeSizePredictor);
       this.memoryAvailable = memoryAvailable;
@@ -609,6 +618,7 @@ public PostBuildCalculationsImpl(final boolean firstCycle,
       this.safetyFactor = safetyFactor;
       this.loadFactor = loadFactor;
       this.reserveHash = reserveHash;
+      this.semiJoin = semiJoin;
       this.recordsPerPartitionBatchProbe = recordsPerPartitionBatchProbe;
       this.computedProbeRecordsPerBatch = recordsPerPartitionBatchProbe;
     }
@@ -774,7 +784,7 @@ public HashJoinMemoryCalculator next() {
 
       // Some of our probe side batches were spilled so we have to recursively process the partitions.
       return new HashJoinMemoryCalculatorImpl(
-        safetyFactor, fragmentationFactor, hashTableSizeCalculator.getDoublingFactor(), hashTableSizeCalculator.getType());
+        safetyFactor, fragmentationFactor, hashTableSizeCalculator.getDoublingFactor(), hashTableSizeCalculator.getType(), semiJoin);
     }
 
     @Override
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbe.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbe.java
index beddfa68faa..490eba4e33d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbe.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbe.java
@@ -39,7 +39,10 @@
     PROBE_PROJECT, PROJECT_RIGHT, DONE
   }
 
-  void setupHashJoinProbe(RecordBatch probeBatch, HashJoinBatch outgoing, JoinRelType joinRelType, RecordBatch.IterOutcome leftStartState, HashPartition[] partitions, int cycleNum, VectorContainer container, HashJoinBatch.HashJoinSpilledPartition[] spilledInners, boolean buildSideIsEmpty, int numPartitions, int rightHVColPosition);
+  void setupHashJoinProbe(RecordBatch probeBatch, HashJoinBatch outgoing, JoinRelType joinRelType, boolean semiJoin,
+                          RecordBatch.IterOutcome leftStartState, HashPartition[] partitions, int cycleNum,
+                          VectorContainer container, HashJoinBatch.HashJoinSpilledPartition[] spilledInners,
+                          boolean buildSideIsEmpty, int numPartitions, int rightHVColPosition);
   int  probeAndProject() throws SchemaChangeException;
   void changeToFinalProbeState();
   void setTargetOutputCount(int targetOutputCount);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java
index 57b2d5bed31..c549143d5f3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java
@@ -88,8 +88,9 @@
   private int numPartitions = 1; // must be 2 to the power of bitsInMask
   private int partitionMask = 0; // numPartitions - 1
   private int bitsInMask = 0; // number of bits in the MASK
-  private int rightHVColPosition;
+  private int numberOfBuildSideColumns;
   private int targetOutputRecords;
+  private boolean semiJoin;
 
   @Override
   public void setTargetOutputCount(int targetOutputRecords) {
@@ -106,6 +107,7 @@ public int getOutputCount() {
    * @param probeBatch
    * @param outgoing
    * @param joinRelType
+   * @param semiJoin
    * @param leftStartState
    * @param partitions
    * @param cycleNum
@@ -116,7 +118,10 @@ public int getOutputCount() {
    * @param rightHVColPosition
    */
   @Override
-  public void setupHashJoinProbe(RecordBatch probeBatch, HashJoinBatch outgoing, JoinRelType joinRelType, IterOutcome leftStartState, HashPartition[] partitions, int cycleNum, VectorContainer container, HashJoinBatch.HashJoinSpilledPartition[] spilledInners, boolean buildSideIsEmpty, int numPartitions, int rightHVColPosition) {
+  public void setupHashJoinProbe(RecordBatch probeBatch, HashJoinBatch outgoing, JoinRelType joinRelType, boolean semiJoin,
+                                 IterOutcome leftStartState, HashPartition[] partitions, int cycleNum,
+                                 VectorContainer container, HashJoinBatch.HashJoinSpilledPartition[] spilledInners,
+                                 boolean buildSideIsEmpty, int numPartitions, int rightHVColPosition) {
     this.container = container;
     this.spilledInners = spilledInners;
     this.probeBatch = probeBatch;
@@ -127,7 +132,8 @@ public void setupHashJoinProbe(RecordBatch probeBatch, HashJoinBatch outgoing, J
     this.cycleNum = cycleNum;
     this.buildSideIsEmpty = buildSideIsEmpty;
     this.numPartitions = numPartitions;
-    this.rightHVColPosition = rightHVColPosition;
+    this.numberOfBuildSideColumns = semiJoin ? 0 : rightHVColPosition; // position (0 based) of added column == #columns
+    this.semiJoin = semiJoin;
 
     partitionMask = numPartitions - 1; // e.g. 32 --> 0x1F
     bitsInMask = Integer.bitCount(partitionMask); // e.g. 0x1F -> 5
@@ -165,35 +171,31 @@ public void setupHashJoinProbe(RecordBatch probeBatch, HashJoinBatch outgoing, J
    * Append the given build side row into the outgoing container
    * @param buildSrcContainer The container for the right/inner side
    * @param buildSrcIndex build side index
-   * @return The index for the last column (where the probe side would continue copying)
    */
-  private int appendBuild(VectorContainer buildSrcContainer, int buildSrcIndex) {
-    // "- 1" to skip the last "hash values" added column
-    int lastColIndex = buildSrcContainer.getNumberOfColumns() - 1;
-    for (int vectorIndex = 0; vectorIndex < lastColIndex; vectorIndex++) {
+  private void appendBuild(VectorContainer buildSrcContainer, int buildSrcIndex) {
+    for (int vectorIndex = 0; vectorIndex < numberOfBuildSideColumns; vectorIndex++) {
       ValueVector destVector = container.getValueVector(vectorIndex).getValueVector();
       ValueVector srcVector = buildSrcContainer.getValueVector(vectorIndex).getValueVector();
       destVector.copyEntry(container.getRecordCount(), srcVector, buildSrcIndex);
     }
-    return lastColIndex;
   }
   /**
    * Append the given probe side row into the outgoing container, following the build side part
    * @param probeSrcContainer The container for the left/outer side
    * @param probeSrcIndex probe side index
-   * @param baseIndex The column index to start copying into (following the build columns)
    */
-  private void appendProbe(VectorContainer probeSrcContainer, int probeSrcIndex, int baseIndex) {
-    for (int vectorIndex = baseIndex; vectorIndex < container.getNumberOfColumns(); vectorIndex++) {
+  private void appendProbe(VectorContainer probeSrcContainer, int probeSrcIndex) {
+    for (int vectorIndex = numberOfBuildSideColumns; vectorIndex < container.getNumberOfColumns(); vectorIndex++) {
       ValueVector destVector = container.getValueVector(vectorIndex).getValueVector();
-      ValueVector srcVector = probeSrcContainer.getValueVector(vectorIndex - baseIndex).getValueVector();
+      ValueVector srcVector = probeSrcContainer.getValueVector(vectorIndex - numberOfBuildSideColumns).getValueVector();
       destVector.copyEntry(container.getRecordCount(), srcVector, probeSrcIndex);
     }
   }
   /**
    *  A special version of the VectorContainer's appendRow for the HashJoin; (following a probe) it
    *  copies the build and probe sides into the outgoing container. (It uses a composite
-   *  index for the build side)
+   *  index for the build side). If any of the build/probe source containers is null, then that side
+   *  is not appended (effectively outputting nulls for that side's columns).
    * @param buildSrcContainers The containers list for the right/inner side
    * @param compositeBuildSrcIndex Composite build index
    * @param probeSrcContainer The single container for the left/outer side
@@ -202,29 +204,20 @@ private void appendProbe(VectorContainer probeSrcContainer, int probeSrcIndex, i
    */
   private int outputRow(ArrayList<VectorContainer> buildSrcContainers, int compositeBuildSrcIndex,
                         VectorContainer probeSrcContainer, int probeSrcIndex) {
-    int buildBatchIndex = compositeBuildSrcIndex >>> 16;
-    int buildOffset = compositeBuildSrcIndex & 65535;
-    int baseInd = 0;
-    if ( buildSrcContainers != null ) { baseInd = appendBuild(buildSrcContainers.get(buildBatchIndex), buildOffset); }
-    if ( probeSrcContainer != null ) { appendProbe(probeSrcContainer, probeSrcIndex, baseInd); }
+
+    if ( buildSrcContainers != null ) {
+      int buildBatchIndex = compositeBuildSrcIndex >>> 16;
+      int buildOffset = compositeBuildSrcIndex & 65535;
+      appendBuild(buildSrcContainers.get(buildBatchIndex), buildOffset);
+    }
+    if ( probeSrcContainer != null ) { appendProbe(probeSrcContainer, probeSrcIndex); }
     return container.incRecordCount();
   }
 
   /**
-   * A customised version of the VectorContainer's appendRow for HashJoin - used for Left
-   * Outer Join when there is no build side match - hence need a base index in
-   * this container's wrappers from where to start appending
-   * @param probeSrcContainer
-   * @param probeSrcIndex
-   * @param baseInd - index of this container's wrapper to start at
-   * @return
+   * After the "inner" probe phase, finish up a Right (or Full) Join by projecting the unmatched rows of the build side
+   * @param currBuildPart Which partition
    */
-  private int outputOuterRow(VectorContainer probeSrcContainer, int probeSrcIndex, int baseInd) {
-    appendProbe(probeSrcContainer, probeSrcIndex, baseInd);
-    return container.incRecordCount();
-  }
-
-
   private void executeProjectRightPhase(int currBuildPart) {
     while (outputRecords < targetOutputRecords && recordsProcessed < recordsToProcess) {
       outputRecords =
@@ -319,6 +312,16 @@ private void executeProbePhase() throws SchemaChangeException {
 
         }
 
+        if ( semiJoin ) {
+          if ( probeIndex != -1 ) {
+            // output the probe side only
+            outputRecords =
+              outputRow(null, 0, probeBatch.getContainer(), recordsProcessed);
+          }
+          recordsProcessed++;
+          continue; // no build-side duplicates, go on to the next probe-side row
+        }
+
         if (probeIndex != -1) {
 
           /* The current probe record has a key that matches. Get the index
@@ -366,8 +369,8 @@ private void executeProbePhase() throws SchemaChangeException {
           // If we have a left outer join, project the outer side
           if (joinType == JoinRelType.LEFT || joinType == JoinRelType.FULL) {
 
-            outputRecords =
-              outputOuterRow(probeBatch.getContainer(), recordsProcessed, rightHVColPosition);
+            outputRecords = // output only the probe side (the build side would be all nulls)
+              outputRow(null, 0, probeBatch.getContainer(), recordsProcessed);
           }
           recordsProcessed++;
         }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/RuleInstance.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/RuleInstance.java
index b14488c9ca5..86a03b5acd1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/RuleInstance.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/RuleInstance.java
@@ -17,6 +17,7 @@
  */
 package org.apache.drill.exec.planner;
 
+import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
 import org.apache.calcite.plan.RelOptRule;
 import org.apache.calcite.plan.RelOptRuleCall;
 import org.apache.calcite.plan.volcano.AbstractConverter;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashJoinPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashJoinPrel.java
index 6480f3d3581..25ceccd65be 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashJoinPrel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashJoinPrel.java
@@ -165,7 +165,7 @@ private PhysicalOperator getHashJoinPop(PhysicalPlanCreator creator, RelNode lef
     buildJoinConditions(conditions, leftFields, rightFields, leftKeys, rightKeys);
 
     RuntimeFilterDef runtimeFilterDef = this.getRuntimeFilterDef();
-    HashJoinPOP hjoin = new HashJoinPOP(leftPop, rightPop, conditions, jtype, runtimeFilterDef, isRowKeyJoin, htControl);
+    HashJoinPOP hjoin = new HashJoinPOP(leftPop, rightPop, conditions, jtype, isSemiJoin, runtimeFilterDef, isRowKeyJoin, htControl);
     return creator.addMetadata(this, hjoin);
   }
 
diff --git a/exec/java-exec/src/main/resources/drill-module.conf b/exec/java-exec/src/main/resources/drill-module.conf
index f083c6603c8..5981f2d5501 100644
--- a/exec/java-exec/src/main/resources/drill-module.conf
+++ b/exec/java-exec/src/main/resources/drill-module.conf
@@ -514,7 +514,7 @@ drill.exec.options: {
     planner.enable_hash_single_key: true,
     planner.enable_hashagg: true,
     planner.enable_hashjoin: true,
-    planner.enable_semijoin: false,
+    planner.enable_semijoin: true,
     planner.enable_hashjoin_swap: true,
     planner.enable_hep_opt: true,
     planner.enable_hep_partition_pruning: true,
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/common/HashPartitionTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/common/HashPartitionTest.java
index e2f80d88e03..93475acf6d4 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/common/HashPartitionTest.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/common/HashPartitionTest.java
@@ -110,8 +110,7 @@ public void run(SpillSet spillSet,
           context.getAllocator(),
           baseHashTable,
           buildBatch,
-          probeBatch,
-          10,
+          probeBatch, false, 10,
           spillSet,
           0,
           0,
@@ -210,8 +209,7 @@ public void run(SpillSet spillSet,
           context.getAllocator(),
           baseHashTable,
           buildBatch,
-          probeBatch,
-          10,
+          probeBatch, false, 10,
           spillSet,
           0,
           0,
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestBuildSidePartitioningImpl.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestBuildSidePartitioningImpl.java
index 8d0d36b9111..bff28a8ba36 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestBuildSidePartitioningImpl.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestBuildSidePartitioningImpl.java
@@ -45,7 +45,7 @@ private void testSimpleReserveMemoryCalculationNoHashHelper(final boolean firstC
         new HashTableSizeCalculatorConservativeImpl(RecordBatch.MAX_BATCH_ROW_COUNT, HashTableSizeCalculatorConservativeImpl.HASHTABLE_DOUBLING_FACTOR),
         HashJoinHelperSizeCalculatorImpl.INSTANCE,
         fragmentationFactor,
-        safetyFactor);
+        safetyFactor, false);
 
     final CaseInsensitiveMap<Long> keySizes = CaseInsensitiveMap.newHashMap();
     final long accountedProbeBatchSize = firstCycle? 0: 10;
@@ -90,7 +90,7 @@ public void testSimpleReserveMemoryCalculationHash() {
         new HashTableSizeCalculatorConservativeImpl(RecordBatch.MAX_BATCH_ROW_COUNT, HashTableSizeCalculatorConservativeImpl.HASHTABLE_DOUBLING_FACTOR),
         HashJoinHelperSizeCalculatorImpl.INSTANCE,
         fragmentationFactor,
-        safetyFactor);
+        safetyFactor, false);
 
     final CaseInsensitiveMap<Long> keySizes = CaseInsensitiveMap.newHashMap();
 
@@ -134,7 +134,7 @@ public void testAdjustInitialPartitions() {
         new HashTableSizeCalculatorConservativeImpl(RecordBatch.MAX_BATCH_ROW_COUNT, HashTableSizeCalculatorConservativeImpl.HASHTABLE_DOUBLING_FACTOR),
         HashJoinHelperSizeCalculatorImpl.INSTANCE,
         fragmentationFactor,
-        safetyFactor);
+        safetyFactor, false);
 
     final CaseInsensitiveMap<Long> keySizes = CaseInsensitiveMap.newHashMap();
 
@@ -179,7 +179,7 @@ public void testDontAdjustInitialPartitions() {
         new HashTableSizeCalculatorConservativeImpl(RecordBatch.MAX_BATCH_ROW_COUNT, HashTableSizeCalculatorConservativeImpl.HASHTABLE_DOUBLING_FACTOR),
         HashJoinHelperSizeCalculatorImpl.INSTANCE,
         fragmentationFactor,
-        safetyFactor);
+        safetyFactor, false);
 
     final CaseInsensitiveMap<Long> keySizes = CaseInsensitiveMap.newHashMap();
 
@@ -225,7 +225,7 @@ public void testHasDataProbeEmpty() {
         new HashTableSizeCalculatorConservativeImpl(RecordBatch.MAX_BATCH_ROW_COUNT, HashTableSizeCalculatorConservativeImpl.HASHTABLE_DOUBLING_FACTOR),
         HashJoinHelperSizeCalculatorImpl.INSTANCE,
         fragmentationFactor,
-        safetyFactor);
+        safetyFactor, false);
 
     final CaseInsensitiveMap<Long> keySizes = CaseInsensitiveMap.newHashMap();
 
@@ -258,7 +258,7 @@ public void testNoProbeDataForStats() {
         new HashTableSizeCalculatorConservativeImpl(RecordBatch.MAX_BATCH_ROW_COUNT, HashTableSizeCalculatorConservativeImpl.HASHTABLE_DOUBLING_FACTOR),
         HashJoinHelperSizeCalculatorImpl.INSTANCE,
         fragmentationFactor,
-        safetyFactor);
+        safetyFactor, false);
 
     final CaseInsensitiveMap<Long> keySizes = CaseInsensitiveMap.newHashMap();
 
@@ -302,7 +302,7 @@ public void testProbeEmpty() {
         new HashTableSizeCalculatorConservativeImpl(RecordBatch.MAX_BATCH_ROW_COUNT, HashTableSizeCalculatorConservativeImpl.HASHTABLE_DOUBLING_FACTOR),
         HashJoinHelperSizeCalculatorImpl.INSTANCE,
         fragmentationFactor,
-        safetyFactor);
+        safetyFactor, false);
 
     final CaseInsensitiveMap<Long> keySizes = CaseInsensitiveMap.newHashMap();
 
@@ -357,7 +357,7 @@ private void testNoRoomInMemoryForBatch1Helper(final boolean firstCycle) {
         new HashTableSizeCalculatorConservativeImpl(RecordBatch.MAX_BATCH_ROW_COUNT, HashTableSizeCalculatorConservativeImpl.HASHTABLE_DOUBLING_FACTOR),
         HashJoinHelperSizeCalculatorImpl.INSTANCE,
         fragmentationFactor,
-        safetyFactor);
+        safetyFactor, false);
 
     final CaseInsensitiveMap<Long> keySizes = CaseInsensitiveMap.newHashMap();
 
@@ -407,7 +407,7 @@ public void testCompleteLifeCycle() {
         new HashTableSizeCalculatorConservativeImpl(RecordBatch.MAX_BATCH_ROW_COUNT, HashTableSizeCalculatorConservativeImpl.HASHTABLE_DOUBLING_FACTOR),
         HashJoinHelperSizeCalculatorImpl.INSTANCE,
         fragmentationFactor,
-        safetyFactor);
+        safetyFactor, false);
 
     final CaseInsensitiveMap<Long> keySizes = CaseInsensitiveMap.newHashMap();
 
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoinHelperSizeCalculatorImpl.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoinHelperSizeCalculatorImpl.java
index b9ae58d9a95..5f7a36a7e27 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoinHelperSizeCalculatorImpl.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoinHelperSizeCalculatorImpl.java
@@ -40,5 +40,8 @@ public void simpleCalculateSize() {
 
     long actual = HashJoinHelperSizeCalculatorImpl.INSTANCE.calculateSize(partitionStat, 1.0);
     Assert.assertEquals(expected, actual);
+
+    long shouldBeZero = HashJoinHelperUnusedSizeImpl.INSTANCE.calculateSize(partitionStat, 1.0);
+    Assert.assertEquals(0, shouldBeZero);
   }
 }
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestPostBuildCalculationsImpl.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestPostBuildCalculationsImpl.java
index 89c56b11197..0636cb7bffb 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestPostBuildCalculationsImpl.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestPostBuildCalculationsImpl.java
@@ -137,7 +137,7 @@ public void testHasProbeDataButProbeEmpty() {
         fragmentationFactor, // fragmentationFactor
         safetyFactor, // safetyFactor
         .75, // loadFactor
-        false); // reserveHash
+        false, false); // reserveHash
 
     calc.initialize(true);
   }
@@ -181,7 +181,7 @@ public void testProbeEmpty() {
         fragmentationFactor,
         safetyFactor,
         .75,
-        true);
+        true, false);
 
     calc.initialize(true);
 
@@ -241,7 +241,7 @@ private void testHasNoProbeDataButProbeNonEmptyHelper(final boolean firstCycle)
         fragmentationFactor,
         safetyFactor,
         .75,
-        false);
+        false, false);
 
     calc.initialize(false);
 
@@ -307,7 +307,7 @@ private void testProbingAndPartitioningBuildAllInMemoryNoSpillHelper(final boole
         fragmentationFactor,
         safetyFactor,
         .75,
-        false);
+        false, false);
 
     calc.initialize(false);
 
@@ -373,7 +373,7 @@ private void testProbingAndPartitioningBuildAllInMemorySpillHelper(final boolean
         fragmentationFactor,
         safetyFactor,
         .75,
-        false);
+        false, false);
 
     calc.initialize(false);
 
@@ -445,7 +445,7 @@ private void testProbingAndPartitioningBuildAllInMemoryNoSpillWithHashHelper(fin
         fragmentationFactor,
         safetyFactor,
         .75,
-        true);
+        true, false);
 
     calc.initialize(false);
 
@@ -509,7 +509,7 @@ private void testProbingAndPartitioningBuildAllInMemoryWithSpillHelper(final boo
         fragmentationFactor,
         safetyFactor,
         .75,
-        false);
+        false, false);
 
     calc.initialize(false);
 
@@ -583,7 +583,7 @@ private void testProbingAndPartitioningBuildSomeInMemoryHelper(final boolean fir
         fragmentationFactor,
         safetyFactor,
         .75,
-        false);
+        false, false);
 
     calc.initialize(false);
 
@@ -650,7 +650,7 @@ private void testProbingAndPartitioningBuildNoneInMemoryHelper(final boolean fir
         fragmentationFactor,
         safetyFactor,
         .75,
-        false);
+        false, false);
 
     calc.initialize(false);
     Assert.assertFalse(calc.shouldSpill());
@@ -705,7 +705,7 @@ public void testMakeDebugString()
         fragmentationFactor,
         safetyFactor,
         .75,
-        false);
+        false, false);
 
     calc.initialize(false);
   }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services