You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by GitBox <gi...@apache.org> on 2019/01/15 03:06:04 UTC

[drill] Diff for: [GitHub] Ben-Zvi closed pull request #1606: Drill 6845: Semi-Hash-Join to skip incoming build duplicates, automatically stop skipping if too few

diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
index 811c4791e82..729b5e4021f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
@@ -157,8 +157,17 @@ private ExecConstants() {
   public static final BooleanValidator HASHJOIN_ENABLE_RUNTIME_FILTER_WAITING = new BooleanValidator(HASHJOIN_RUNTIME_FILTER_WAITING_ENABLE_KEY, null);
   public static final String HASHJOIN_RUNTIME_FILTER_MAX_WAITING_TIME_KEY = "exec.hashjoin.runtime_filter.max.waiting.time";
   public static final PositiveLongValidator HASHJOIN_RUNTIME_FILTER_MAX_WAITING_TIME = new PositiveLongValidator(HASHJOIN_RUNTIME_FILTER_MAX_WAITING_TIME_KEY, Character.MAX_VALUE, null);
-
-
+  public static final String HASHJOIN_SEMI_SKIP_DUPLICATES_KEY = "exec.hashjoin.semi_skip_duplicates";
+  public static final BooleanValidator HASHJOIN_SEMI_SKIP_DUPLICATES_VALIDATOR = new BooleanValidator(HASHJOIN_SEMI_SKIP_DUPLICATES_KEY,
+    new OptionDescription("When TRUE, make Semi Hash Join check for incoming duplicated and skip those (use more cpu, less memory)"));
+  public static final String HASHJOIN_SEMI_PERCENT_DUPLICATES_TO_SKIP_KEY = "exec.hashjoin.semi_percent_duplicates_to_skip";
+  public static final IntegerValidator HASHJOIN_SEMI_PERCENT_DUPLICATES_TO_SKIP_VALIDATOR = new IntegerValidator(HASHJOIN_SEMI_PERCENT_DUPLICATES_TO_SKIP_KEY,
+    0, 100,
+    new OptionDescription("Semi join to skip duplicates only if initial check finds duplicates in incoming as no less than this percentage"));
+  public static final String HASHJOIN_MIN_BATCHES_IN_AVAILABLE_MEMORY_KEY = "exec.hashjoin.min_batches_in_available_memory";
+  public static final IntegerValidator HASHJOIN_MIN_BATCHES_IN_AVAILABLE_MEMORY_VALIDATOR = new IntegerValidator(HASHJOIN_MIN_BATCHES_IN_AVAILABLE_MEMORY_KEY,
+    1, Integer.MAX_VALUE,
+    new OptionDescription("Threshold: Start spilling if available memory is less than this number of batches (only for semi join skipping duplicates"));
 
   // Hash Aggregate Options
   public static final String HASHAGG_NUM_PARTITIONS_KEY = "exec.hashagg.num_partitions";
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashPartition.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashPartition.java
index 275cf16572d..69ea3f34a66 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashPartition.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashPartition.java
@@ -66,7 +66,7 @@
  *    After all the build/inner data is read for this partition - if all its data is in memory, then
  *  a hash table and a helper are created, and later this data would be probed.
  *    If all this partition's build/inner data was spilled, then it begins to work as an outer
- *  partition (see the flag "processingOuter") -- reusing some of the fields (e.g., currentBatch,
+ *  partition (see the flag "processingOuter") -- reusing some of the fields (e.g., currentVectorContainer,
  *  currHVVector, writer, spillFile, partitionBatchesCount) for the outer.
  *  </p>
  */
@@ -94,8 +94,8 @@
   // incoming batches, per each partition (these may be spilled at some point)
   private List<VectorContainer> tmpBatchesList;
   // A batch and HV vector to hold incoming rows - per each partition
-  private VectorContainer currentBatch; // The current (newest) batch
-  private IntVector currHVVector; // The HV vectors for the currentBatches
+  private VectorContainer currentVectorContainer; // The current (newest) container
+  private IntVector currHVVector; // The HV vectors for the currentVectorContainers
 
   /* Helper class
    * Maintains linked list of build side records with the same key
@@ -126,9 +126,10 @@
   private long numInMemoryRecords;
   private boolean updatedRecordsPerBatch = false;
   private boolean semiJoin;
+  private boolean skipDuplicates; // only for semi
 
   public HashPartition(FragmentContext context, BufferAllocator allocator, ChainedHashTable baseHashTable,
-                       RecordBatch buildBatch, RecordBatch probeBatch, boolean semiJoin,
+                       RecordBatch buildBatch, RecordBatch probeBatch, boolean semiJoin, boolean skipDuplicates,
                        int recordsPerBatch, SpillSet spillSet, int partNum, int cycleNum, int numPartitions) {
     this.allocator = allocator;
     this.buildBatch = buildBatch;
@@ -139,6 +140,7 @@ public HashPartition(FragmentContext context, BufferAllocator allocator, Chained
     this.cycleNum = cycleNum;
     this.numPartitions = numPartitions;
     this.semiJoin = semiJoin;
+    this.skipDuplicates = semiJoin && skipDuplicates;
 
     try {
       this.hashTable = baseHashTable.createAndSetupHashTable(null);
@@ -156,7 +158,7 @@ public HashPartition(FragmentContext context, BufferAllocator allocator, Chained
     this.hjHelper = semiJoin ? null : new HashJoinHelper(context, allocator);
     tmpBatchesList = new ArrayList<>();
     if ( numPartitions > 1 ) {
-      allocateNewCurrentBatchAndHV();
+      allocateNewCurrentVectorContainerAndHV();
     }
   }
 
@@ -218,19 +220,47 @@ private VectorContainer allocateNewVectorContainer(RecordBatch rb) {
   /**
    *  Allocate a new current Vector Container and current HV vector
    */
-  public void allocateNewCurrentBatchAndHV() {
+  public void allocateNewCurrentVectorContainerAndHV() {
     if (outerBatchAllocNotNeeded) { return; } // skip when the inner is whole in memory
-    currentBatch = allocateNewVectorContainer(processingOuter ? probeBatch : buildBatch);
+    currentVectorContainer = allocateNewVectorContainer(processingOuter ? probeBatch : buildBatch);
     currHVVector = new IntVector(MaterializedField.create(HASH_VALUE_COLUMN_NAME, HVtype), allocator);
     currHVVector.allocateNew(recordsPerBatch);
   }
 
+  /**
+   *  This method is only used for semi-join, when trying to skip incoming key duplicate rows
+   *  It adds the given row's key to the hash table, is needed, and returns true only if that
+   *  key already existed in the hash table
+   *
+   * @param buildContainer The container with the current row
+   * @param ind The index of the current row in the container
+   * @param hashCode The hash code for the key of this row
+   * @return True iff that key already exists in the hash table
+   */
+  public boolean insertKeyIntoHashTable(VectorContainer buildContainer, int ind, int hashCode) throws SchemaChangeException {
+    hashTable.updateIncoming(buildContainer, probeBatch );
+    final IndexPointer htIndex = new IndexPointer();
+    HashTable.PutStatus status;
+
+    try {
+      status = hashTable.put(ind, htIndex, hashCode, BATCH_SIZE);
+    } catch (RetryAfterSpillException RE) {
+      if ( numPartitions == 1 ) { // if cannot spill
+        throw new OutOfMemoryException(RE);
+      }
+      spillThisPartition(); // free some memory
+      return false;
+    }
+
+    return status == HashTable.PutStatus.KEY_PRESENT;
+  }
+
   /**
    *  Spills if needed
    */
-  public void appendInnerRow(VectorContainer buildContainer, int ind, int hashCode, HashJoinMemoryCalculator.BuildSidePartitioning calc) {
+  public void appendInnerRow(VectorContainer buildContainer, int ind, int hashCode, HashJoinMemoryCalculator.HashJoinSpillControl calc) {
 
-    int pos = currentBatch.appendRow(buildContainer,ind);
+    int pos = currentVectorContainer.appendRow(buildContainer,ind);
     currHVVector.getMutator().set(pos - 1, hashCode);   // store the hash value in the new column
     if ( pos == recordsPerBatch ) {
       boolean needsSpill = isSpilled || calc.shouldSpill();
@@ -243,7 +273,7 @@ public void appendInnerRow(VectorContainer buildContainer, int ind, int hashCode
    *
    */
   public void appendOuterRow(int hashCode, int recordsProcessed) {
-    int pos = currentBatch.appendRow(probeBatch.getContainer(),recordsProcessed);
+    int pos = currentVectorContainer.appendRow(probeBatch.getContainer(),recordsProcessed);
     currHVVector.getMutator().set(pos - 1, hashCode);   // store the hash value in the new column
     if ( pos == recordsPerBatch ) {
       completeAnOuterBatch(true);
@@ -262,27 +292,27 @@ public void completeAnInnerBatch(boolean toInitialize, boolean needsSpill) {
    * (that is, more rows are coming) - initialize with a new current batch for that partition
    * */
   private void completeABatch(boolean toInitialize, boolean needsSpill) {
-    if ( currentBatch.hasRecordCount() && currentBatch.getRecordCount() > 0) {
-      currentBatch.add(currHVVector);
-      currentBatch.buildSchema(BatchSchema.SelectionVectorMode.NONE);
-      tmpBatchesList.add(currentBatch);
+    if ( currentVectorContainer.hasRecordCount() && currentVectorContainer.getRecordCount() > 0) {
+      currentVectorContainer.add(currHVVector);
+      currentVectorContainer.buildSchema(BatchSchema.SelectionVectorMode.NONE);
+      tmpBatchesList.add(currentVectorContainer);
       partitionBatchesCount++;
 
-      long batchSize = new RecordBatchSizer(currentBatch).getActualSize();
-      inMemoryBatchStats.add(new HashJoinMemoryCalculator.BatchStat(currentBatch.getRecordCount(), batchSize));
+      long batchSize = new RecordBatchSizer(currentVectorContainer).getActualSize();
+      inMemoryBatchStats.add(new HashJoinMemoryCalculator.BatchStat(currentVectorContainer.getRecordCount(), batchSize));
 
       partitionInMemorySize += batchSize;
-      numInMemoryRecords += currentBatch.getRecordCount();
+      numInMemoryRecords += currentVectorContainer.getRecordCount();
     } else {
-      freeCurrentBatchAndHVVector();
+      freeCurrentVectorContainerAndHVVector();
     }
     if ( needsSpill ) { // spill this batch/partition and free its memory
       spillThisPartition();
     }
     if ( toInitialize ) { // allocate a new batch and HV vector
-      allocateNewCurrentBatchAndHV();
+      allocateNewCurrentVectorContainerAndHV();
     } else {
-      currentBatch = null;
+      currentVectorContainer = null;
       currHVVector = null;
     }
   }
@@ -327,6 +357,10 @@ public void spillThisPartition() {
     if ( tmpBatchesList.size() == 0 ) { return; } // in case empty - nothing to spill
     logger.debug("HashJoin: Spilling partition {}, current cycle {}, part size {} batches", partitionNum, cycleNum, tmpBatchesList.size());
 
+    if ( skipDuplicates ) {
+      hashTable.reset();
+    } // deallocate and reinit the hash table in case of a semi skipping dupl
+
     // If this is the first spill for this partition, create an output stream
     if ( writer == null ) {
       final String side = processingOuter ? "outer" : "inner";
@@ -492,6 +526,31 @@ private void closeWriterInternal(boolean doDeleteFile) {
     partitionBatchesCount = 0;
   }
 
+  /**
+   * Stop skipping duplicates (when there are too few of them)
+   * thus not maintaining the hash table as new rows arrive
+   */
+  public void stopSkippingDuplicates() {
+    assert skipDuplicates;
+    hashTable.reset();
+    skipDuplicates = false;
+  }
+
+  /**
+   * Builds the containers only, not the hash table nor the helper
+   * To be used in case of skipping duplicates (when the hash table already exists)
+   */
+  public void buildContainers() {
+    assert skipDuplicates;
+    if ( isSpilled ) { return; } // no building for spilled partitions
+    containers = new ArrayList<>();
+    for (int curr = 0; curr < partitionBatchesCount; curr++) {
+      VectorContainer nextBatch = tmpBatchesList.get(curr);
+      containers.add(nextBatch);
+    }
+    outerBatchAllocNotNeeded = true; // the inner is whole in memory, no need for an outer batch
+  }
+
   /**
    * Creates the hash table and join helper for this partition.
    * This method should only be called after all the build side records
@@ -514,7 +573,7 @@ public void buildContainersHashTableAndHelper() throws SchemaChangeException {
       assert nextBatch != null;
       assert probeBatch != null;
 
-      hashTable.updateIncoming(nextBatch, probeBatch );
+      hashTable.updateIncoming(nextBatch, probeBatch);
 
       IntVector HV_vector = (IntVector) nextBatch.getLast();
 
@@ -531,7 +590,6 @@ public void buildContainersHashTableAndHelper() throws SchemaChangeException {
          */
         if ( ! semiJoin ) { hjHelper.setCurrentIndex(htIndex.value, curr /* buildBatchIndex */, recInd); }
       }
-
       containers.add(nextBatch);
     }
     outerBatchAllocNotNeeded = true; // the inner is whole in memory, no need for an outer batch
@@ -555,10 +613,10 @@ private void clearHashTableAndHelper() {
     }
   }
 
-  private void freeCurrentBatchAndHVVector() {
-    if ( currentBatch != null ) {
-      currentBatch.clear();
-      currentBatch = null;
+  private void freeCurrentVectorContainerAndHVVector() {
+    if ( currentVectorContainer != null ) {
+      currentVectorContainer.clear();
+      currentVectorContainer = null;
     }
     if ( currHVVector != null ) {
       currHVVector.clear();
@@ -571,7 +629,7 @@ private void freeCurrentBatchAndHVVector() {
    * @param deleteFile - whether to delete the spill file or not
    */
   public void cleanup(boolean deleteFile) {
-    freeCurrentBatchAndHVVector();
+    freeCurrentVectorContainerAndHVVector();
     if (containers != null && !containers.isEmpty()) {
       for (VectorContainer vc : containers) {
         vc.clear();
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableConfig.java
index fe329cc5e51..5ecf0447cc5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableConfig.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableConfig.java
@@ -70,7 +70,7 @@ public HashTableConfig(
 
   @JsonCreator
   public HashTableConfig(@JsonProperty("initialCapacity") int initialCapacity,
-                         @JsonProperty("initialCapacity") boolean initialSizeIsFinal,
+                         @JsonProperty("initialSizeIsFinal") boolean initialSizeIsFinal,
                          @JsonProperty("loadFactor") float loadFactor,
                          @JsonProperty("keyExprsBuild") List<NamedExpression> keyExprsBuild,
                          @JsonProperty("keyExprsProbe") List<NamedExpression> keyExprsProbe,
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
index 0ac0809d8f7..2761fff4f1f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
@@ -214,6 +214,12 @@
   private Map<BloomFilterDef, Integer> bloomFilterDef2buildId = new HashMap<>();
   private List<BloomFilter> bloomFilters = new ArrayList<>();
 
+  private long semiCountTotal;
+  private long semiCountDuplicates;
+  private long semiDupDecisionPoint; // The number of incoming at which to stop and make the "continue skip" decision
+  private boolean semiSkipDuplicates; // optional, for semi join
+  private int semiSkipDuplicatesMinPercentage;
+
   /**
    * This holds information about the spilled partitions for the build and probe side.
    */
@@ -320,7 +326,7 @@ public boolean hasPartitionLimit() {
     AVG_OUTPUT_ROW_BYTES,
     OUTPUT_RECORD_COUNT;
 
-    // duplicate for hash ag
+    // duplicate for hash agg
 
     @Override
     public int metricId() { return ordinal(); }
@@ -719,7 +725,7 @@ private void setupHashTable() throws SchemaChangeException {
     }
 
     final HashTableConfig htConfig = new HashTableConfig((int) context.getOptions().getOption(ExecConstants.MIN_HASH_TABLE_SIZE),
-      true, HashTable.DEFAULT_LOAD_FACTOR, rightExpr, leftExpr, comparators, joinControl.asInt());
+      !semiSkipDuplicates, HashTable.DEFAULT_LOAD_FACTOR, rightExpr, leftExpr, comparators, joinControl.asInt());
 
     // Create the chained hash table
     baseHashTable =
@@ -789,18 +795,16 @@ private void setupHash64(HashTableConfig htConfig) throws SchemaChangeException
   }
 
   /**
-   *  Call only after num partitions is known
+   *  Call only after the final 'num partitions' is known (See partitionNumTuning())
    */
   private void delayedSetup() {
-    //
-    //  Find out the estimated max batch size, etc
-    //  and compute the max numPartitions possible
-    //  See partitionNumTuning()
-    //
-
     spilledState.initialize(numPartitions);
     // Create array for the partitions
     partitions = new HashPartition[numPartitions];
+    // Runtime stats for semi-join: Help decide early if seen too many duplicates (based on initial data, about 32K per partition)
+    semiCountTotal = semiCountDuplicates = 0;
+    semiDupDecisionPoint = // average each partition's hash table half full ( + 1 to avoid zero in case numPartitions == 1 )
+      ((numPartitions + 1) / 2) * context.getOptions().getLong(ExecConstants.MIN_HASH_TABLE_SIZE_KEY);
   }
 
   /**
@@ -810,8 +814,8 @@ private void initializeBuild() {
     baseHashTable.updateIncoming(buildBatch, probeBatch); // in case we process the spilled files
     // Recreate the partitions every time build is initialized
     for (int part = 0; part < numPartitions; part++ ) {
-      partitions[part] = new HashPartition(context, allocator, baseHashTable, buildBatch, probeBatch, semiJoin,
-        RECORDS_PER_BATCH, spillSet, part, spilledState.getCycle(), numPartitions);
+      partitions[part] = new HashPartition(context, allocator, baseHashTable, buildBatch, probeBatch, semiJoin, semiSkipDuplicates, RECORDS_PER_BATCH, spillSet, part, spilledState.getCycle(),
+        numPartitions);
     }
 
     spilledInners = new HashJoinSpilledPartition[numPartitions];
@@ -939,16 +943,17 @@ public IterOutcome executeBuildPhase() throws SchemaChangeException {
     }
 
     HashJoinMemoryCalculator.BuildSidePartitioning buildCalc;
+    HashJoinMemoryCalculator.BuildSidePartitioning currentCalc; // may be either a spill control calc, or buildCalc
 
     {
       // Initializing build calculator
       // Limit scope of these variables to this block
-      int maxBatchSize = spilledState.isFirstCycle()? RecordBatch.MAX_BATCH_ROW_COUNT: RECORDS_PER_BATCH;
+      int maxBatchRowCount = spilledState.isFirstCycle()? RecordBatch.MAX_BATCH_ROW_COUNT: RECORDS_PER_BATCH;
       boolean doMemoryCalculation = canSpill && !probeSideIsEmpty.booleanValue();
       HashJoinMemoryCalculator calc = getCalculatorImpl();
 
       calc.initialize(doMemoryCalculation);
-      buildCalc = calc.next();
+      currentCalc = buildCalc = calc.next();
 
       buildCalc.initialize(spilledState.isFirstCycle(), true, // TODO Fix after growing hash values bug fixed
         buildBatch,
@@ -959,14 +964,40 @@ public IterOutcome executeBuildPhase() throws SchemaChangeException {
         numPartitions,
         RECORDS_PER_BATCH,
         RECORDS_PER_BATCH,
-        maxBatchSize,
-        maxBatchSize,
+        maxBatchRowCount,
+        maxBatchRowCount,
         batchMemoryManager.getOutputBatchSize(),
         HashTable.DEFAULT_LOAD_FACTOR);
 
       if (spilledState.isFirstCycle() && doMemoryCalculation) {
         // Do auto tuning
-        buildCalc = partitionNumTuning(maxBatchSize, buildCalc);
+        buildCalc = partitionNumTuning(maxBatchRowCount, buildCalc);
+      }
+      if ( semiSkipDuplicates ) {
+        // in case of a Semi Join skippinging duplicates, use a "spill control" calc
+        // (may revert back to the buildCalc if the code decides to stop skipping)
+        currentCalc = new HashJoinSpillControlImpl(allocator, RECORDS_PER_BATCH,
+          (int) context.getOptions().getOption(ExecConstants.HASHJOIN_MIN_BATCHES_IN_AVAILABLE_MEMORY_VALIDATOR),
+          batchMemoryManager, context);
+
+        // calculates the max number of partitions possible
+        if ( spilledState.isFirstCycle() && doMemoryCalculation ) {
+          currentCalc.initialize(spilledState.isFirstCycle(), true, // TODO Fix after growing hash values bug fixed
+          buildBatch,
+          probeBatch,
+          buildJoinColumns,
+          probeSideIsEmpty.booleanValue(),
+          allocator.getLimit(),
+          numPartitions,
+          RECORDS_PER_BATCH,
+          RECORDS_PER_BATCH,
+          maxBatchRowCount,
+          maxBatchRowCount,
+          batchMemoryManager.getOutputBatchSize(),
+          HashTable.DEFAULT_LOAD_FACTOR);
+
+          numPartitions = currentCalc.getNumPartitions();
+        }
       }
     }
 
@@ -981,7 +1012,7 @@ public IterOutcome executeBuildPhase() throws SchemaChangeException {
 
     // Make the calculator aware of our partitions
     final HashJoinMemoryCalculator.PartitionStatSet partitionStatSet = new HashJoinMemoryCalculator.PartitionStatSet(partitions);
-    buildCalc.setPartitionStatSet(partitionStatSet);
+    currentCalc.setPartitionStatSet(partitionStatSet);
 
     boolean moreData = true;
     while (moreData) {
@@ -1029,12 +1060,31 @@ public IterOutcome executeBuildPhase() throws SchemaChangeException {
             : read_right_HV_vector.getAccessor().get(ind); // get the hash value from the HV column
           int currPart = hashCode & spilledState.getPartitionMask();
           hashCode >>>= spilledState.getBitsInMask();
-          // semi-join skips join-key-duplicate rows
-          if ( semiJoin ) {
-
+          // semi-join builds the hash-table, and skips join-key-duplicate rows
+          if (semiSkipDuplicates) {
+            semiCountTotal++;
+            boolean aDuplicate = partitions[currPart].insertKeyIntoHashTable(buildBatch.getContainer(), ind, hashCode);
+            // A heuristic: Make a decision once the threshold was met - either continue skipping duplicates, or stop
+            // (skipping duplicates carries a cost, so better avoid if duplicates are too few)
+            if ( semiCountTotal == semiDupDecisionPoint) {  // got enough incoming rows to decide ?
+              long threshold = semiCountTotal * semiSkipDuplicatesMinPercentage / 100;
+              if ( semiCountDuplicates < threshold ) { // when duplicates found were less than the percentage threshold, stop skipping
+                for (HashPartition partn : partitions) {
+                  partn.stopSkippingDuplicates();
+                }
+                semiSkipDuplicates = false;
+                currentCalc = buildCalc; // back to using the regular calc
+              }
+              logger.debug("Semi {} skipping duplicates after receiving {} rows with {} percent duplicates",
+                semiSkipDuplicates ? "to continue" : "stopped", semiCountTotal, (100 * semiCountDuplicates) / semiCountTotal);
+            }
+            if ( aDuplicate ) {
+              semiCountDuplicates++;
+              continue;
+            }
           }
           // Append the new inner row to the appropriate partition; spill (that partition) if needed
-          partitions[currPart].appendInnerRow(buildBatch.getContainer(), ind, hashCode, buildCalc); // may spill if needed
+          partitions[currPart].appendInnerRow(buildBatch.getContainer(), ind, hashCode, currentCalc); // may spill if needed
         }
 
         if ( read_right_HV_vector != null ) {
@@ -1054,12 +1104,15 @@ public IterOutcome executeBuildPhase() throws SchemaChangeException {
       }
     }
 
+    int numPartitionsSpilled = 0;
+
     // Move the remaining current batches into their temp lists, or spill
     // them if the partition is spilled. Add the spilled partitions into
     // the spilled partitions list
     if ( numPartitions > 1 ) { // a single partition needs no completion
       for (HashPartition partn : partitions) {
         partn.completeAnInnerBatch(false, partn.isSpilled());
+        if ( partn.isSpilled() ) { numPartitionsSpilled++; }
       }
     }
 
@@ -1071,8 +1124,8 @@ public IterOutcome executeBuildPhase() throws SchemaChangeException {
       return leftUpstream;
     }
 
-    HashJoinMemoryCalculator.PostBuildCalculations postBuildCalc = buildCalc.next();
-    postBuildCalc.initialize(probeSideIsEmpty.booleanValue()); // probeEmpty
+    HashJoinMemoryCalculator.PostBuildCalculations postBuildCalc = currentCalc.next();
+    postBuildCalc.initialize(probeSideIsEmpty.booleanValue());
 
     //
     //  Traverse all the in-memory partitions' incoming batches, and build their hash tables
@@ -1090,6 +1143,10 @@ public IterOutcome executeBuildPhase() throws SchemaChangeException {
         if (postBuildCalc.shouldSpill()) {
           // Spill this partition if we need to make room
           partn.spillThisPartition();
+        } else if (semiSkipDuplicates) {
+          // All in memory, and already got the Hash Table - just build the containers
+          // (No additional memory is needed, hence no need for any new spill)
+          partn.buildContainers();
         } else {
           // Only build hash tables for partitions that are not spilled
           partn.buildContainersHashTableAndHelper();
@@ -1196,6 +1253,10 @@ public HashJoinBatch(HashJoinPOP popConfig, FragmentContext context,
     this.probeBatch = left;
     joinType = popConfig.getJoinType();
     semiJoin = popConfig.isSemiJoin();
+    semiSkipDuplicatesMinPercentage = (int) context.getOptions().getOption(ExecConstants.HASHJOIN_SEMI_PERCENT_DUPLICATES_TO_SKIP_VALIDATOR);
+    semiSkipDuplicates = semiJoin &&
+      semiSkipDuplicatesMinPercentage < 100 && // can't have 100 percent, at least the first key is not a duplicate
+      context.getOptions().getBoolean(ExecConstants.HASHJOIN_SEMI_SKIP_DUPLICATES_KEY);
     joinIsLeftOrFull  = joinType == JoinRelType.LEFT  || joinType == JoinRelType.FULL;
     joinIsRightOrFull = joinType == JoinRelType.RIGHT || joinType == JoinRelType.FULL;
     conditions = popConfig.getConditions();
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinMemoryCalculator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinMemoryCalculator.java
index c262e3c8c27..d6108c6ce3d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinMemoryCalculator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinMemoryCalculator.java
@@ -95,7 +95,7 @@
    *   </li>
    * </ul>
    */
-  interface BuildSidePartitioning extends HashJoinStateCalculator<PostBuildCalculations> {
+  interface BuildSidePartitioning extends HashJoinStateCalculator<PostBuildCalculations>, HashJoinSpillControl {
     void initialize(boolean firstCycle,
                     boolean reserveHash,
                     RecordBatch buildSideBatch,
@@ -119,8 +119,6 @@ void initialize(boolean firstCycle,
 
     long getMaxReservedMemory();
 
-    boolean shouldSpill();
-
     String makeDebugString();
   }
 
@@ -128,17 +126,16 @@ void initialize(boolean firstCycle,
    * The interface representing the {@link HashJoinStateCalculator} corresponding to the
    * {@link HashJoinState#POST_BUILD_CALCULATIONS} state.
    */
-  interface PostBuildCalculations extends HashJoinStateCalculator<HashJoinMemoryCalculator> {
+  interface PostBuildCalculations extends HashJoinStateCalculator<HashJoinMemoryCalculator>, HashJoinSpillControl {
     /**
      * Initializes the calculator with additional information needed.
      * @param probeEmty True if the probe is empty. False otherwise.
+     *
      */
     void initialize(boolean probeEmty);
 
     int getProbeRecordsPerBatch();
 
-    boolean shouldSpill();
-
     String makeDebugString();
   }
 
@@ -154,6 +151,9 @@ void initialize(boolean firstCycle,
     long getInMemorySize();
   }
 
+  interface HashJoinSpillControl {
+    boolean shouldSpill();
+  }
   /**
    * This class represents the memory size statistics for an entire set of partitions.
    */
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinMemoryCalculatorImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinMemoryCalculatorImpl.java
index 88f3ddc21de..dd1ede2e15a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinMemoryCalculatorImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinMemoryCalculatorImpl.java
@@ -174,7 +174,7 @@ public HashJoinState getState() {
    *     <li><b>Step 0:</b> Call {@link #initialize(boolean, boolean, RecordBatch, RecordBatch, Set, boolean, long, int, int, int, int, int, int, double)}.
    *     This will initialize the StateCalculate with the additional information it needs.</li>
    *     <li><b>Step 1:</b> Call {@link #getNumPartitions()} to see the number of partitions that fit in memory.</li>
-   *     <li><b>Step 2:</b> Call {@link #shouldSpill()} To determine if spilling needs to occurr.</li>
+   *     <li><b>Step 2:</b> Call {@link HashJoinSpillControl#shouldSpill()} To determine if spilling needs to occurr.</li>
    *     <li><b>Step 3:</b> Call {@link #next()} and get the next memory calculator associated with your next state.</li>
    *   </ul>
    * </p>
@@ -555,9 +555,9 @@ public String makeDebugString() {
    * <h1>Lifecycle</h1>
    * <p>
    *   <ul>
-   *     <li><b>Step 1:</b> Call {@link #initialize(boolean)}. This
+   *     <li><b>Step 1:</b> Call {@link PostBuildCalculations#initialize(boolean)}. This
    *     gives the {@link HashJoinStateCalculator} additional information it needs to compute memory requirements.</li>
-   *     <li><b>Step 2:</b> Call {@link #shouldSpill()}. This tells
+   *     <li><b>Step 2:</b> Call {@link HashJoinSpillControl#shouldSpill()}. This tells
    *     you which build side partitions need to be spilled in order to make room for probing.</li>
    *     <li><b>Step 3:</b> Call {@link #next()}. After you are done probing
    *     and partitioning the probe side, get the next calculator.</li>
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java
index c549143d5f3..44e191d96d8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java
@@ -153,7 +153,7 @@ public void setupHashJoinProbe(RecordBatch probeBatch, HashJoinBatch outgoing, J
     // for those outer partitions that need spilling (cause their matching inners spilled)
     // initialize those partitions' current batches and hash-value vectors
     for ( HashPartition partn : this.partitions) {
-      partn.allocateNewCurrentBatchAndHV();
+      partn.allocateNewCurrentVectorContainerAndHV();
     }
 
     currRightPartition = 0; // In case it's a Right/Full outer join
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinSpillControlImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinSpillControlImpl.java
new file mode 100644
index 00000000000..d4f22ac8a8e
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinSpillControlImpl.java
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.physical.impl.join;
+
+  import org.apache.drill.exec.ExecConstants;
+  import org.apache.drill.exec.memory.BufferAllocator;
+  import org.apache.drill.exec.ops.FragmentContext;
+  import org.apache.drill.exec.record.RecordBatch;
+  import org.apache.drill.exec.record.RecordBatchMemoryManager;
+  import org.slf4j.Logger;
+  import org.slf4j.LoggerFactory;
+
+  import javax.annotation.Nullable;
+  import java.util.Set;
+
+  import static org.apache.drill.exec.record.JoinBatchMemoryManager.LEFT_INDEX;
+  import static org.apache.drill.exec.record.JoinBatchMemoryManager.RIGHT_INDEX;
+
+/**
+ * This class is currently used only for Semi-Hash-Join that avoids duplicates by the use of a hash table
+ * The method {@link HashJoinMemoryCalculator.HashJoinSpillControl#shouldSpill()} returns true if the memory available now to the allocator if not enough
+ * to hold (a multiple of, for safety) a new allocated batch
+ */
+public class HashJoinSpillControlImpl implements HashJoinMemoryCalculator.BuildSidePartitioning {
+  private static final Logger logger = LoggerFactory.getLogger(HashJoinSpillControlImpl.class);
+
+  private BufferAllocator allocator;
+  private int recordsPerBatch;
+  private int minBatchesInAvailableMemory;
+  private RecordBatchMemoryManager batchMemoryManager;
+  private int initialPartitions;
+  private int numPartitions;
+  private int recordsPerPartitionBatchProbe;
+  private FragmentContext context;
+  HashJoinMemoryCalculator.PartitionStatSet partitionStatSet;
+
+  HashJoinSpillControlImpl(BufferAllocator allocator, int recordsPerBatch, int minBatchesInAvailableMemory, RecordBatchMemoryManager batchMemoryManager, FragmentContext context) {
+    this.allocator = allocator;
+    this.recordsPerBatch = recordsPerBatch;
+    this.minBatchesInAvailableMemory = minBatchesInAvailableMemory;
+    this.batchMemoryManager = batchMemoryManager;
+    this.context = context;
+  }
+
+  @Override
+  public boolean shouldSpill() {
+    // Expected new batch size like the current, plus the Hash Values vector (4 bytes per HV)
+    long batchSize = ( batchMemoryManager.getRecordBatchSizer(RIGHT_INDEX).getRowAllocWidth() + 4 ) * recordsPerBatch;
+    long reserveForOutgoing = batchMemoryManager.getOutputBatchSize();
+    long memoryAvailableNow = allocator.getLimit() - allocator.getAllocatedMemory() - reserveForOutgoing;
+    boolean needsSpill = minBatchesInAvailableMemory * batchSize > memoryAvailableNow;
+    if ( needsSpill ) {
+      logger.debug("should spill now - batch size {}, mem avail {}, reserved for outgoing {}", batchSize, memoryAvailableNow, reserveForOutgoing);
+    }
+    return needsSpill;   // go spill if too little memory is available
+  }
+
+  @Override
+  public void initialize(boolean firstCycle,
+                         boolean reserveHash,
+                         RecordBatch buildSideBatch,
+                         RecordBatch probeSideBatch,
+                         Set<String> joinColumns,
+                         boolean probeEmpty,
+                         long memoryAvailable,
+                         int initialPartitions,
+                         int recordsPerPartitionBatchBuild,
+                         int recordsPerPartitionBatchProbe,
+                         int maxBatchNumRecordsBuild,
+                         int maxBatchNumRecordsProbe,
+                         int outputBatchSize,
+                         double loadFactor) {
+    this.initialPartitions = initialPartitions;
+    this.recordsPerPartitionBatchProbe = recordsPerPartitionBatchProbe;
+
+    calculateMemoryUsage();
+  }
+
+  @Override
+  public void setPartitionStatSet(HashJoinMemoryCalculator.PartitionStatSet partitionStatSet) {
+    this.partitionStatSet = partitionStatSet;
+  }
+
+  @Override
+  public int getNumPartitions() {
+    return numPartitions;
+  }
+
+  /**
+   * Calculate the number of partitions possible for the given available memory
+   * start at initialPartitions and adjust down (in powers of 2) as needed
+   */
+  private void calculateMemoryUsage() {
+    long reserveForOutgoing = batchMemoryManager.getOutputBatchSize();
+    long memoryAvailableNow = allocator.getLimit() - allocator.getAllocatedMemory() - reserveForOutgoing;
+
+    // Expected new batch size like the current, plus the Hash Values vector (4 bytes per HV)
+    int buildBatchSize = ( batchMemoryManager.getRecordBatchSizer(RIGHT_INDEX).getRowAllocWidth() + 4 ) * recordsPerBatch;
+    int hashTableSize = buildBatchSize /* the keys in the HT */ +
+      4 * (int)context.getOptions().getLong(ExecConstants.MIN_HASH_TABLE_SIZE_KEY) /* the initial hash table buckets */ +
+      (2 + 2) * recordsPerBatch; /* the hash-values and the links */
+    int probeBatchSize = ( batchMemoryManager.getRecordBatchSizer(LEFT_INDEX).getRowAllocWidth() + 4 ) * recordsPerBatch;
+
+    long memoryNeededPerPartition = Integer.max(buildBatchSize + hashTableSize, probeBatchSize);
+
+    for ( numPartitions = initialPartitions; numPartitions > 2; numPartitions /= 2 ) { // need at least 2
+      // each partition needs at least one internal build batch, and a minimum hash-table
+      // ( or an internal probe batch, for a spilled partition during probe phase )
+      // adding "min batches" to create some safety slack
+      if ( memoryAvailableNow >
+             ( numPartitions + minBatchesInAvailableMemory ) * memoryNeededPerPartition ) {
+        break; // got enough memory
+      }
+    }
+    logger.debug("Spill control chosen to use {} partitions", numPartitions);
+  }
+
+  @Override
+  public long getBuildReservedMemory() {
+    return 0;
+  }
+
+  @Override
+  public long getMaxReservedMemory() {
+    return 0;
+  }
+
+  @Override
+  public String makeDebugString() {
+    return "Spill Control " + HashJoinMemoryCalculatorImpl.HashJoinSpillControl.class.getCanonicalName();
+  }
+
+  @Nullable
+  @Override
+  public HashJoinMemoryCalculator.PostBuildCalculations next() {
+    return new SpillControlPostBuildCalculationsImpl(recordsPerPartitionBatchProbe,
+      allocator, recordsPerBatch, minBatchesInAvailableMemory, batchMemoryManager, partitionStatSet);
+  }
+
+  @Override
+  public HashJoinState getState() {
+    return HashJoinState.BUILD_SIDE_PARTITIONING;
+  }
+
+
+  /**
+   *   The purpose of this class is to provide the method {@link #shouldSpill} that ensures that enough memory is available to
+   *   hold all the probe incoming batches for those partitions that spilled (else need to spill more of them, for more memory).
+   */
+  public static class SpillControlPostBuildCalculationsImpl implements HashJoinMemoryCalculator.PostBuildCalculations {
+    private static final Logger logger = LoggerFactory.getLogger(SpillControlPostBuildCalculationsImpl.class);
+
+    private final int recordsPerPartitionBatchProbe;
+    private BufferAllocator allocator;
+    private int recordsPerBatch;
+    private int minBatchesInAvailableMemory;
+    private RecordBatchMemoryManager batchMemoryManager;
+    private boolean probeEmpty;
+    private final HashJoinMemoryCalculator.PartitionStatSet buildPartitionStatSet;
+
+
+    public SpillControlPostBuildCalculationsImpl(final int recordsPerPartitionBatchProbe,
+                                                 BufferAllocator allocator, int recordsPerBatch, int minBatchesInAvailableMemory,
+                                                 RecordBatchMemoryManager batchMemoryManager,
+                                                 final HashJoinMemoryCalculator.PartitionStatSet buildPartitionStatSet) {
+      this.allocator = allocator;
+      this.recordsPerBatch = recordsPerBatch;
+      this.minBatchesInAvailableMemory = minBatchesInAvailableMemory;
+      this.batchMemoryManager = batchMemoryManager;
+      this.recordsPerPartitionBatchProbe = recordsPerPartitionBatchProbe;
+      this.buildPartitionStatSet = buildPartitionStatSet;
+    }
+
+    @Override
+    public void initialize(boolean hasProbeData) {
+      this.probeEmpty = hasProbeData;
+    }
+
+
+    @Override
+    public int getProbeRecordsPerBatch() {
+      return recordsPerPartitionBatchProbe;
+    }
+
+    @Override
+    public boolean shouldSpill() {
+      int numPartitionsSpilled = buildPartitionStatSet.getNumSpilledPartitions();
+      if ( numPartitionsSpilled == 0 ) { return false; } // no extra memory is needed if all the build side is in memory
+      if ( probeEmpty ) { return false; } // no probe side data
+      // Expected new batch size like the current, plus the Hash Values vector (4 bytes per HV)
+      long batchSize = ( batchMemoryManager.getRecordBatchSizer(LEFT_INDEX).getRowAllocWidth() + 4 ) * recordsPerBatch;
+      long reserveForOutgoing = batchMemoryManager.getOutputBatchSize();
+      long memoryAvailableNow = allocator.getLimit() - allocator.getAllocatedMemory() - reserveForOutgoing;
+      boolean needsSpill = (numPartitionsSpilled + minBatchesInAvailableMemory ) * batchSize > memoryAvailableNow;
+      if ( needsSpill ) {
+        logger.debug("Post build should spill now - batch size {}, mem avail {}, reserved for outgoing {}, num partn spilled {}", batchSize,
+          memoryAvailableNow, reserveForOutgoing, numPartitionsSpilled);
+      }
+      return needsSpill;   // go spill if too little memory is available
+    }
+
+    @Nullable
+    @Override
+    public HashJoinMemoryCalculator next() {
+      return null;
+    }
+
+    @Override
+    public HashJoinState getState() {
+      return HashJoinState.POST_BUILD_CALCULATIONS;
+    }
+
+    @Override
+    public String makeDebugString() {
+      return "Spill Control " + HashJoinMemoryCalculatorImpl.HashJoinSpillControl.class.getCanonicalName() + " calculator.";
+    }
+  }
+
+
+
+}
+
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
index c97220cef98..aeba391c992 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
@@ -124,13 +124,16 @@
       new OptionDefinition(ExecConstants.HASHJOIN_NUM_PARTITIONS_VALIDATOR),
       new OptionDefinition(ExecConstants.HASHJOIN_MAX_MEMORY_VALIDATOR, new OptionMetaData(OptionValue.AccessibleScopes.SYSTEM, true, true)),
       new OptionDefinition(ExecConstants.HASHJOIN_NUM_ROWS_IN_BATCH_VALIDATOR),
-      new OptionDefinition(ExecConstants.HASHJOIN_MAX_BATCHES_IN_MEMORY_VALIDATOR, new OptionMetaData(OptionValue.AccessibleScopes.SYSTEM, false, true)),
+      new OptionDefinition(ExecConstants.HASHJOIN_MAX_BATCHES_IN_MEMORY_VALIDATOR, new OptionMetaData(OptionValue.AccessibleScopes.ALL, false, true)),
       new OptionDefinition(ExecConstants.HASHJOIN_FALLBACK_ENABLED_VALIDATOR), // for enable/disable unbounded HashJoin
       new OptionDefinition(ExecConstants.HASHJOIN_ENABLE_RUNTIME_FILTER),
       new OptionDefinition(ExecConstants.HASHJOIN_BLOOM_FILTER_MAX_SIZE),
       new OptionDefinition(ExecConstants.HASHJOIN_BLOOM_FILTER_FPP_VALIDATOR),
       new OptionDefinition(ExecConstants.HASHJOIN_RUNTIME_FILTER_MAX_WAITING_TIME),
       new OptionDefinition(ExecConstants.HASHJOIN_ENABLE_RUNTIME_FILTER_WAITING),
+      new OptionDefinition(ExecConstants.HASHJOIN_SEMI_SKIP_DUPLICATES_VALIDATOR, new OptionMetaData(OptionValue.AccessibleScopes.ALL, false, true)),
+      new OptionDefinition(ExecConstants.HASHJOIN_SEMI_PERCENT_DUPLICATES_TO_SKIP_VALIDATOR, new OptionMetaData(OptionValue.AccessibleScopes.ALL, false, true)),
+      new OptionDefinition(ExecConstants.HASHJOIN_MIN_BATCHES_IN_AVAILABLE_MEMORY_VALIDATOR, new OptionMetaData(OptionValue.AccessibleScopes.ALL, false, true)),
       // ------------------------------------------- Index planning related options BEGIN --------------------------------------------------------------
       new OptionDefinition(PlannerSettings.USE_SIMPLE_OPTIMIZER),
       new OptionDefinition(PlannerSettings.INDEX_PLANNING),
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/TypeValidators.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/TypeValidators.java
index 3f8f8a4b4c9..fa8a0702ec0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/TypeValidators.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/TypeValidators.java
@@ -158,14 +158,21 @@ public DoubleValidator(String name, OptionDescription description) {
   }
 
   public static class IntegerValidator extends LongValidator {
+    private int iMin = Integer.MIN_VALUE;
+    private int iMax = Integer.MAX_VALUE;
     public IntegerValidator(String name, OptionDescription description) {
       super(name, description);
     }
+    public IntegerValidator(String name, int min, int max, OptionDescription description) {
+      super(name, description);
+      iMin = min;
+      iMax = max;
+    }
 
     @Override
     public void validate(final OptionValue v, final OptionMetaData metaData, final OptionSet manager) {
       super.validate(v, metaData, manager);
-      if (v.num_val > Integer.MAX_VALUE || v.num_val < Integer.MIN_VALUE) {
+      if (v.num_val > iMax || v.num_val < iMin) {
         throw UserException.validationError()
           .message(String.format("Option %s does not have a valid integer value", getOptionName()))
           .build(logger);
diff --git a/exec/java-exec/src/main/resources/drill-module.conf b/exec/java-exec/src/main/resources/drill-module.conf
index 4a5f07549ea..3d25b80d326 100644
--- a/exec/java-exec/src/main/resources/drill-module.conf
+++ b/exec/java-exec/src/main/resources/drill-module.conf
@@ -490,6 +490,9 @@ drill.exec.options: {
     exec.hashjoin.bloom_filter.max.size: 33554432, #32 MB
     exec.hashjoin.runtime_filter.waiting.enable: true,
     exec.hashjoin.runtime_filter.max.waiting.time: 300, #400 ms
+    exec.hashjoin.semi_skip_duplicates: true,
+    exec.hashjoin.semi_percent_duplicates_to_skip: 20,
+    exec.hashjoin.min_batches_in_available_memory: 3,
     exec.hashagg.mem_limit: 0,
     exec.hashagg.min_batches_per_partition: 2,
     exec.hashagg.num_partitions: 32,
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/common/HashPartitionTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/common/HashPartitionTest.java
index 93475acf6d4..a4377cb43ca 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/common/HashPartitionTest.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/common/HashPartitionTest.java
@@ -110,7 +110,7 @@ public void run(SpillSet spillSet,
           context.getAllocator(),
           baseHashTable,
           buildBatch,
-          probeBatch, false, 10,
+          probeBatch, false, false, 10,
           spillSet,
           0,
           0,
@@ -209,7 +209,7 @@ public void run(SpillSet spillSet,
           context.getAllocator(),
           baseHashTable,
           buildBatch,
-          probeBatch, false, 10,
+          probeBatch, false, false, 10,
           spillSet,
           0,
           0,


With regards,
Apache Git Services