Posted to dev@drill.apache.org by GitBox <gi...@apache.org> on 2018/10/19 19:08:16 UTC

[GitHub] ilooner closed pull request #1465: DRILL-6719: Separate spilling queue logic from HashJoin and HashAgg.

URL: https://github.com/apache/drill/pull/1465

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
index f6dd3da425e..6709cf6ddef 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
@@ -48,12 +48,14 @@
 import org.apache.drill.exec.ops.OperatorStats;
 import org.apache.drill.exec.physical.base.AbstractBase;
 import org.apache.drill.exec.physical.config.HashAggregate;
+import org.apache.drill.exec.physical.impl.common.AbstractSpilledPartitionMetadata;
 import org.apache.drill.exec.physical.impl.common.ChainedHashTable;
 import org.apache.drill.exec.physical.impl.common.HashTable;
 import org.apache.drill.exec.physical.impl.common.HashTableConfig;
 import org.apache.drill.exec.physical.impl.common.HashTableStats;
 import org.apache.drill.exec.physical.impl.common.IndexPointer;
 
+import org.apache.drill.exec.physical.impl.common.SpilledState;
 import org.apache.drill.exec.record.RecordBatchSizer;
 
 import org.apache.drill.exec.physical.impl.spill.SpillSet;
@@ -80,6 +82,7 @@
 import org.apache.drill.exec.vector.ValueVector;
 
 import org.apache.drill.exec.vector.VariableWidthVector;
+import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
 
 import static org.apache.drill.exec.physical.impl.common.HashTable.BATCH_MASK;
 import static org.apache.drill.exec.record.RecordBatch.MAX_BATCH_ROW_COUNT;
@@ -95,9 +98,6 @@
   private static final boolean EXTRA_DEBUG_SPILL = false;
 
   // Fields needed for partitioning (the groups into partitions)
-  private int numPartitions = 0; // must be 2 to the power of bitsInMask (set in setup())
-  private int partitionMask; // numPartitions - 1
-  private int bitsInMask; // number of bits in the MASK
   private int nextPartitionToReturn = 0; // which partition to return the next batch from
   // The following members are used for logging, metrics, etc.
   private int rowsInPartition = 0; // counts #rows in each partition
@@ -148,25 +148,15 @@
   private int outBatchIndex[];
 
   // For handling spilling
+  private HashAggUpdater updater = new HashAggUpdater();
+  private SpilledState<HashAggSpilledPartition> spilledState = new SpilledState<>();
   private SpillSet spillSet;
   SpilledRecordbatch newIncoming; // when reading a spilled file - work like an "incoming"
   private Writer writers[]; // a vector writer for each spilled partition
   private int spilledBatchesCount[]; // count number of batches spilled, in each partition
   private String spillFiles[];
-  private int cycleNum = 0; // primary, secondary, tertiary, etc.
   private int originalPartition = -1; // the partition a secondary reads from
 
-  private static class SpilledPartition {
-    public int spilledBatches;
-    public String spillFile;
-    int cycleNum;
-    int origPartn;
-    int prevOrigPartn;
-  }
-
-  private ArrayList<SpilledPartition> spilledPartitionsList;
-  private int operatorId; // for the spill file name
-
   private IndexPointer htIdxHolder; // holder for the Hashtable's internal index returned by put()
   private int numGroupByOutFields = 0; // Note: this should be <= number of group-by fields
   private TypedFieldId[] groupByOutFieldIds;
@@ -180,6 +170,59 @@
   private OperatorStats stats = null;
   private HashTableStats htStats = new HashTableStats();
 
+  public static class HashAggSpilledPartition extends AbstractSpilledPartitionMetadata {
+    private final int spilledBatches;
+    private final String spillFile;
+
+    public HashAggSpilledPartition(final int cycle,
+                                   final int originPartition,
+                                   final int prevOriginPartition,
+                                   final int spilledBatches,
+                                   final String spillFile) {
+      super(cycle, originPartition, prevOriginPartition);
+
+      this.spilledBatches = spilledBatches;
+      this.spillFile = Preconditions.checkNotNull(spillFile);
+    }
+
+    public int getSpilledBatches() {
+      return spilledBatches;
+    }
+
+    public String getSpillFile() {
+      return spillFile;
+    }
+
+    @Override
+    public String makeDebugString() {
+      return String.format("Start reading spilled partition %d (prev %d) from cycle %d.",
+        this.getOriginPartition(), this.getPrevOriginPartition(), this.getCycle());
+    }
+  }
+
+  public class HashAggUpdater implements SpilledState.Updater {
+
+    @Override
+    public void cleanup() {
+      HashAggTemplate.this.cleanup(); // delegate to the enclosing operator; a bare this.cleanup() here would recurse forever
+    }
+
+    @Override
+    public String getFailureMessage() {
+      return null; // unused: hasPartitionLimit() returns false, so updateCycle() never requests this message
+    }
+
+    @Override
+    public long getMemLimit() {
+      return allocator.getLimit();
+    }
+
+    @Override
+    public boolean hasPartitionLimit() {
+      return false;
+    }
+  }
+
   public enum Metric implements MetricDef {
 
     NUM_BUCKETS,
@@ -335,7 +378,6 @@ public void setup(HashAggregate hashAggrConfig, HashTableConfig htConfig, Fragme
     this.incoming = incoming;
     this.outgoing = outgoing;
     this.outContainer = outContainer;
-    this.operatorId = hashAggrConfig.getOperatorId();
     this.useMemoryPrediction = context.getOptions().getOption(ExecConstants.HASHAGG_USE_MEMORY_PREDICTION_VALIDATOR);
 
     is2ndPhase = hashAggrConfig.getAggPhase() == AggPrelBase.OperatorPhase.PHASE_2of2;
@@ -404,7 +446,7 @@ private void delayedSetup() {
     final boolean fallbackEnabled = context.getOptions().getOption(ExecConstants.HASHAGG_FALLBACK_ENABLED_KEY).bool_val;
 
     // Set the number of partitions from the configuration (raise to a power of two, if needed)
-    numPartitions = (int)context.getOptions().getOption(ExecConstants.HASHAGG_NUM_PARTITIONS_VALIDATOR);
+    int numPartitions = (int)context.getOptions().getOption(ExecConstants.HASHAGG_NUM_PARTITIONS_VALIDATOR);
     if ( numPartitions == 1 && is2ndPhase  ) { // 1st phase can still do early return with 1 partition
       canSpill = false;
       logger.warn("Spilling is disabled due to configuration setting of num_partitions to 1");
@@ -460,9 +502,8 @@ private void delayedSetup() {
       // (but 1st phase can still spill, so it will maintain the original memory limit)
       allocator.setLimit(AbstractBase.MAX_ALLOCATION);  // 10_000_000_000L
     }
-    // Based on the number of partitions: Set the mask and bit count
-    partitionMask = numPartitions - 1; // e.g. 32 --> 0x1F
-    bitsInMask = Integer.bitCount(partitionMask); // e.g. 0x1F -> 5
+
+    spilledState.initialize(numPartitions);
 
     // Create arrays (one entry per partition)
     htables = new HashTable[numPartitions];
@@ -471,7 +512,6 @@ private void delayedSetup() {
     writers = new Writer[numPartitions];
     spilledBatchesCount = new int[numPartitions];
     spillFiles = new String[numPartitions];
-    spilledPartitionsList = new ArrayList<SpilledPartition>();
 
     plannedBatches = numPartitions; // each partition should allocate its first batch
 
@@ -511,7 +551,7 @@ private void initializeSetup(RecordBatch newIncoming) throws SchemaChangeExcepti
     this.incoming = newIncoming;
     currentBatchRecordCount = newIncoming.getRecordCount(); // first batch in this spill file
     nextPartitionToReturn = 0;
-    for (int i = 0; i < numPartitions; i++ ) {
+    for (int i = 0; i < spilledState.getNumPartitions(); i++ ) {
       htables[i].updateIncoming(newIncoming.getContainer(), null);
       htables[i].reset();
       if ( batchHolders[i] != null) {
@@ -829,7 +869,7 @@ public int getOutputCount() {
 
   @Override
   public void adjustOutputCount(int outputBatchSize, int oldRowWidth, int newRowWidth) {
-    for (int i = 0; i < numPartitions; i++ ) {
+    for (int i = 0; i < spilledState.getNumPartitions(); i++ ) {
       if (batchHolders[i] == null || batchHolders[i].size() == 0) {
         continue;
       }
@@ -851,7 +891,7 @@ public void cleanup() {
           (int) Math.round(spillSet.getWriteBytes() / 1024.0D / 1024.0));
     }
     // clean (and deallocate) each partition
-    for ( int i = 0; i < numPartitions; i++) {
+    for ( int i = 0; i < spilledState.getNumPartitions(); i++) {
           if (htables[i] != null) {
               htables[i].clear();
               htables[i] = null;
@@ -877,12 +917,12 @@ public void cleanup() {
           }
     }
     // delete any spill file left in unread spilled partitions
-    while ( ! spilledPartitionsList.isEmpty() ) {
-        SpilledPartition sp = spilledPartitionsList.remove(0);
+    while (!spilledState.isEmpty()) {
+        HashAggSpilledPartition sp = spilledState.getNextSpilledPartition();
         try {
-          spillSet.delete(sp.spillFile);
+          spillSet.delete(sp.getSpillFile());
         } catch(IOException e) {
-          logger.warn("Cleanup: Failed to delete spill file {}",sp.spillFile);
+          logger.warn("Cleanup: Failed to delete spill file {}",sp.getSpillFile());
         }
     }
     // Delete the currently handled (if any) spilled file
@@ -948,7 +988,7 @@ private int chooseAPartitionToFlush(int currPart, boolean tryAvoidCurr) {
     // first find the largest spilled partition
     int maxSizeSpilled = -1;
     int indexMaxSpilled = -1;
-    for (int isp = 0; isp < numPartitions; isp++ ) {
+    for (int isp = 0; isp < spilledState.getNumPartitions(); isp++ ) {
       if ( isSpilled(isp) && maxSizeSpilled < batchHolders[isp].size() ) {
         maxSizeSpilled = batchHolders[isp].size();
         indexMaxSpilled = isp;
@@ -967,7 +1007,7 @@ private int chooseAPartitionToFlush(int currPart, boolean tryAvoidCurr) {
       indexMax = indexMaxSpilled;
       maxSize = 4 * maxSizeSpilled;
     }
-    for ( int insp = 0; insp < numPartitions; insp++) {
+    for ( int insp = 0; insp < spilledState.getNumPartitions(); insp++) {
       if ( ! isSpilled(insp) && maxSize < batchHolders[insp].size() ) {
         indexMax = insp;
         maxSize = batchHolders[insp].size();
@@ -993,7 +1033,7 @@ private void spillAPartition(int part) {
     ArrayList<BatchHolder> currPartition = batchHolders[part];
     rowsInPartition = 0;
     if ( EXTRA_DEBUG_SPILL ) {
-      logger.debug("HashAggregate: Spilling partition {} current cycle {} part size {}", part, cycleNum, currPartition.size());
+      logger.debug("HashAggregate: Spilling partition {} current cycle {} part size {}", part, spilledState.getCycle(), currPartition.size());
     }
 
     if ( currPartition.size() == 0 ) { return; } // in case empty - nothing to spill
@@ -1001,7 +1041,7 @@ private void spillAPartition(int part) {
     // If this is the first spill for this partition, create an output stream
     if ( ! isSpilled(part) ) {
 
-      spillFiles[part] = spillSet.getNextSpillFile(cycleNum > 0 ? Integer.toString(cycleNum) : null);
+      spillFiles[part] = spillSet.getNextSpillFile(spilledState.getCycle() > 0 ? Integer.toString(spilledState.getCycle()) : null);
 
       try {
         writers[part] = spillSet.writer(spillFiles[part]);
@@ -1025,21 +1065,8 @@ private void spillAPartition(int part) {
       this.htables[part].outputKeys(currOutBatchIndex, this.outContainer, numOutputRecords);
 
       // set the value count for outgoing batch value vectors
-      /* int i = 0; */
       for (VectorWrapper<?> v : outgoing) {
         v.getValueVector().getMutator().setValueCount(numOutputRecords);
-        /*
-        // print out the first row to be spilled ( varchar, varchar, bigint )
-        try {
-          if (i++ < 2) {
-            NullableVarCharVector vv = ((NullableVarCharVector) v.getValueVector());
-            logger.info("FIRST ROW = {}", vv.getAccessor().get(0));
-          } else {
-            NullableBigIntVector vv = ((NullableBigIntVector) v.getValueVector());
-            logger.info("FIRST ROW = {}", vv.getAccessor().get(0));
-          }
-        } catch (Exception e) { logger.info("While printing the first row - Got an exception = {}",e); }
-        */
       }
 
       outContainer.setRecordCount(numOutputRecords);
@@ -1119,19 +1146,20 @@ public AggIterOutcome outputCurrentBatch() {
     if ( ! earlyOutput ) {
       // Update the next partition to return (if needed)
       // skip fully returned (or spilled) partitions
-      while (nextPartitionToReturn < numPartitions) {
+      while (nextPartitionToReturn < spilledState.getNumPartitions()) {
         //
         // If this partition was spilled - spill the rest of it and skip it
         //
         if ( isSpilled(nextPartitionToReturn) ) {
           spillAPartition(nextPartitionToReturn); // spill the rest
-          SpilledPartition sp = new SpilledPartition();
-          sp.spillFile = spillFiles[nextPartitionToReturn];
-          sp.spilledBatches = spilledBatchesCount[nextPartitionToReturn];
-          sp.cycleNum = cycleNum; // remember the current cycle
-          sp.origPartn = nextPartitionToReturn; // for debugging / filename
-          sp.prevOrigPartn = originalPartition; // for debugging / filename
-          spilledPartitionsList.add(sp);
+          HashAggSpilledPartition sp = new HashAggSpilledPartition(
+            spilledState.getCycle(),
+            nextPartitionToReturn,
+            originalPartition,
+            spilledBatchesCount[nextPartitionToReturn],
+            spillFiles[nextPartitionToReturn]);
+
+          spilledState.addPartition(sp);
 
           reinitPartition(nextPartitionToReturn); // free the memory
           try {
@@ -1155,9 +1183,9 @@ public AggIterOutcome outputCurrentBatch() {
       }
 
       // if passed the last partition - either done or need to restart and read spilled partitions
-      if (nextPartitionToReturn >= numPartitions) {
+      if (nextPartitionToReturn >= spilledState.getNumPartitions()) {
         // The following "if" is probably never used; due to a similar check at the end of this method
-        if ( spilledPartitionsList.isEmpty() ) { // and no spilled partitions
+        if (spilledState.isEmpty()) { // and no spilled partitions
           allFlushed = true;
           this.outcome = IterOutcome.NONE;
           if ( is2ndPhase && spillSet.getWriteBytes() > 0 ) {
@@ -1170,10 +1198,10 @@ public AggIterOutcome outputCurrentBatch() {
         buildComplete = false; // go back and call doWork() again
         handlingSpills = true; // beginning to work on the spill files
         // pick a spilled partition; set a new incoming ...
-        SpilledPartition sp = spilledPartitionsList.remove(0);
+        HashAggSpilledPartition sp = spilledState.getNextSpilledPartition();
         // Create a new "incoming" out of the spilled partition spill file
-        newIncoming = new SpilledRecordbatch(sp.spillFile, sp.spilledBatches, context, schema, oContext, spillSet);
-        originalPartition = sp.origPartn; // used for the filename
+        newIncoming = new SpilledRecordbatch(sp.getSpillFile(), sp.getSpilledBatches(), context, schema, oContext, spillSet);
+        originalPartition = sp.getOriginPartition(); // used for the filename
         logger.trace("Reading back spilled original partition {} as an incoming",originalPartition);
         // Initialize .... new incoming, new set of partitions
         try {
@@ -1181,22 +1209,8 @@ public AggIterOutcome outputCurrentBatch() {
         } catch (Exception e) {
           throw new RuntimeException(e);
         }
-        // update the cycle num if needed
-        // The current cycle num should always be one larger than in the spilled partition
-        if ( cycleNum == sp.cycleNum ) {
-          cycleNum = 1 + sp.cycleNum;
-          stats.setLongStat(Metric.SPILL_CYCLE, cycleNum); // update stats
-          // report first spill or memory stressful situations
-          if ( cycleNum == 1 ) { logger.info("Started reading spilled records "); }
-          if ( cycleNum == 2 ) { logger.info("SECONDARY SPILLING "); }
-          if ( cycleNum == 3 ) { logger.warn("TERTIARY SPILLING "); }
-          if ( cycleNum == 4 ) { logger.warn("QUATERNARY SPILLING "); }
-          if ( cycleNum == 5 ) { logger.warn("QUINARY SPILLING "); }
-        }
-        if ( EXTRA_DEBUG_SPILL ) {
-          logger.debug("Start reading spilled partition {} (prev {}) from cycle {} (with {} batches). More {} spilled partitions left.",
-              sp.origPartn, sp.prevOrigPartn, sp.cycleNum, sp.spilledBatches, spilledPartitionsList.size());
-        }
+
+        spilledState.updateCycle(stats, sp, updater);
         return AggIterOutcome.AGG_RESTART;
       }
 
@@ -1265,7 +1279,7 @@ else if ( handleEmit ) {
         outBatchIndex[partitionToReturn] = 0; // reset, for the next EMIT
         return AggIterOutcome.AGG_EMIT;
       }
-      else if ( (partitionToReturn + 1 == numPartitions) && spilledPartitionsList.isEmpty() ) { // last partition ?
+      else if ((partitionToReturn + 1 == spilledState.getNumPartitions()) && spilledState.isEmpty()) { // last partition ?
 
         allFlushed = true; // next next() call will return NONE
 
@@ -1313,7 +1327,7 @@ private String getOOMErrorMsg(String prefix) {
     } else if (!canSpill) {  // 2nd phase, with only 1 partition
       errmsg = "Too little memory available to operator to facilitate spilling.";
     } else { // a bug ?
-      errmsg = prefix + " OOM at " + (is2ndPhase ? "Second Phase" : "First Phase") + ". Partitions: " + numPartitions +
+      errmsg = prefix + " OOM at " + (is2ndPhase ? "Second Phase" : "First Phase") + ". Partitions: " + spilledState.getNumPartitions() +
       ". Estimated batch size: " + estMaxBatchSize + ". values size: " + estValuesBatchSize + ". Output alloc size: " + estOutgoingAllocSize;
       if ( plannedBatches > 0 ) { errmsg += ". Planned batches: " + plannedBatches; }
       if ( rowsSpilled > 0 ) { errmsg += ". Rows spilled so far: " + rowsSpilled; }
@@ -1334,32 +1348,6 @@ private void checkGroupAndAggrValues(int incomingRowIdx) {
     assert incomingRowIdx >= 0;
     assert ! earlyOutput;
 
-    /** for debugging
-     Object tmp = (incoming).getValueAccessorById(0, BigIntVector.class).getValueVector();
-     BigIntVector vv0 = null;
-     BigIntHolder holder = null;
-
-     if (tmp != null) {
-     vv0 = ((BigIntVector) tmp);
-     holder = new BigIntHolder();
-     holder.value = vv0.getAccessor().get(incomingRowIdx) ;
-     }
-     */
-    /*
-    if ( handlingSpills && ( incomingRowIdx == 0 ) ) {
-      // for debugging -- show the first row from a spilled batch
-      Object tmp0 = (incoming).getValueAccessorById(NullableVarCharVector.class, 0).getValueVector();
-      Object tmp1 = (incoming).getValueAccessorById(NullableVarCharVector.class, 1).getValueVector();
-      Object tmp2 = (incoming).getValueAccessorById(NullableBigIntVector.class, 2).getValueVector();
-
-      if (tmp0 != null && tmp1 != null && tmp2 != null) {
-        NullableVarCharVector vv0 = ((NullableVarCharVector) tmp0);
-        NullableVarCharVector vv1 = ((NullableVarCharVector) tmp1);
-        NullableBigIntVector  vv2 = ((NullableBigIntVector) tmp2);
-        logger.debug("The first row = {} , {} , {}", vv0.getAccessor().get(incomingRowIdx), vv1.getAccessor().get(incomingRowIdx), vv2.getAccessor().get(incomingRowIdx));
-      }
-    }
-    */
     // The hash code is computed once, then its lower bits are used to determine the
     // partition to use, and the higher bits determine the location in the hash table.
     int hashCode;
@@ -1371,12 +1359,12 @@ private void checkGroupAndAggrValues(int incomingRowIdx) {
     }
 
     // right shift hash code for secondary (or tertiary...) spilling
-    for (int i = 0; i < cycleNum; i++) {
-      hashCode >>>= bitsInMask;
+    for (int i = 0; i < spilledState.getCycle(); i++) {
+      hashCode >>>= spilledState.getBitsInMask();
     }
 
-    int currentPartition = hashCode & partitionMask;
-    hashCode >>>= bitsInMask;
+    int currentPartition = hashCode & spilledState.getPartitionMask();
+    hashCode >>>= spilledState.getBitsInMask();
     HashTable.PutStatus putStatus = null;
     long allocatedBeforeHTput = allocator.getAllocatedMemory();
 
@@ -1498,7 +1486,7 @@ private void spillIfNeeded(int currentPartition, boolean forceSpill) {
       maxMemoryNeeded = minBatchesPerPartition * Math.max(1, plannedBatches) * (estMaxBatchSize + MAX_BATCH_ROW_COUNT * (4 + 4 /* links + hash-values */));
       // Add the (max) size of the current hash table, in case it will double
       int maxSize = 1;
-      for (int insp = 0; insp < numPartitions; insp++) {
+      for (int insp = 0; insp < spilledState.getNumPartitions(); insp++) {
         maxSize = Math.max(maxSize, batchHolders[insp].size());
       }
       maxMemoryNeeded += MAX_BATCH_ROW_COUNT * 2 * 2 * 4 * maxSize; // 2 - double, 2 - max when %50 full, 4 - Uint4
@@ -1577,12 +1565,12 @@ private void spillIfNeeded(int currentPartition, boolean forceSpill) {
    * @param htables
    */
   private void updateStats(HashTable[] htables) {
-    if ( cycleNum > 0 ||  // These stats are only for before processing spilled files
+    if (!spilledState.isFirstCycle() ||  // These stats are only for before processing spilled files
       handleEmit ) { return; } // and no stats collecting when handling an EMIT
     long numSpilled = 0;
     HashTableStats newStats = new HashTableStats();
     // sum the stats from all the partitions
-    for (int ind = 0; ind < numPartitions; ind++) {
+    for (int ind = 0; ind < spilledState.getNumPartitions(); ind++) {
       htables[ind].getStats(newStats);
       htStats.addStats(newStats);
       if (isSpilled(ind)) {
@@ -1593,8 +1581,8 @@ private void updateStats(HashTable[] htables) {
     this.stats.setLongStat(Metric.NUM_ENTRIES, htStats.numEntries);
     this.stats.setLongStat(Metric.NUM_RESIZING, htStats.numResizing);
     this.stats.setLongStat(Metric.RESIZING_TIME_MS, htStats.resizingTime);
-    this.stats.setLongStat(Metric.NUM_PARTITIONS, numPartitions);
-    this.stats.setLongStat(Metric.SPILL_CYCLE, cycleNum); // Put 0 in case no spill
+    this.stats.setLongStat(Metric.NUM_PARTITIONS, spilledState.getNumPartitions());
+    this.stats.setLongStat(Metric.SPILL_CYCLE, spilledState.getCycle()); // Put 0 in case no spill
     if ( is2ndPhase ) {
       this.stats.setLongStat(Metric.SPILLED_PARTITIONS, numSpilled);
     }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/AbstractSpilledPartitionMetadata.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/AbstractSpilledPartitionMetadata.java
new file mode 100644
index 00000000000..951fa668e46
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/AbstractSpilledPartitionMetadata.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.common;
+
+public abstract class AbstractSpilledPartitionMetadata implements SpilledPartitionMetadata {
+  private final int cycle;
+  private final int originPartition;
+  private final int prevOriginPartition;
+
+  public AbstractSpilledPartitionMetadata(final int cycle,
+                                          final int originPartition,
+                                          final int prevOriginPartition) {
+    this.cycle = cycle;
+    this.originPartition = originPartition;
+    this.prevOriginPartition = prevOriginPartition;
+  }
+
+  @Override
+  public int getCycle() {
+    return cycle;
+  }
+
+  @Override
+  public int getOriginPartition() {
+    return originPartition;
+  }
+
+  @Override
+  public int getPrevOriginPartition() {
+    return prevOriginPartition;
+  }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/SpilledPartitionMetadata.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/SpilledPartitionMetadata.java
new file mode 100644
index 00000000000..4d53be349bc
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/SpilledPartitionMetadata.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.common;
+
+/**
+ * This interface represents the metadata for a spilled partition.
+ */
+public interface SpilledPartitionMetadata {
+  /**
+   * The spill cycle for a partition.
+   * @return The spill cycle for a partition.
+   */
+  int getCycle();
+
+  /**
+   * The parent partition.
+   * @return The parent partition.
+   */
+  int getOriginPartition();
+
+  /**
+   * The parent of the parent partition.
+   * @return The parent of the parent partition.
+   */
+  int getPrevOriginPartition();
+
+  /** @return A human-readable description of this spilled partition, for debug logging. */
+  String makeDebugString();
+}
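
To make the cycle / origin-partition semantics above concrete, here is a
short illustrative sketch (not part of the patch) that uses the
HashAggSpilledPartition subclass defined earlier in this diff; the batch
counts and spill-file names are made up:

    // Cycle 0: partition 7 spills; there is no previous origin yet (-1).
    SpilledPartitionMetadata first = new HashAggTemplate.HashAggSpilledPartition(
        0 /* cycle */, 7 /* originPartition */, -1 /* prevOriginPartition */,
        12 /* spilledBatches */, "/tmp/drill/spill/p7" /* spillFile */);

    // Cycle 1: while partition 7's spill file is re-read as an "incoming",
    // partition 3 of the new pass spills again, recording the partition it
    // was read from (7) as its previous origin.
    SpilledPartitionMetadata second = new HashAggTemplate.HashAggSpilledPartition(
        1, 3, 7, 4, "/tmp/drill/spill/p7_p3");
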
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/SpilledState.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/SpilledState.java
new file mode 100644
index 00000000000..a1762877349
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/SpilledState.java
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.common;
+
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.exec.ops.OperatorStats;
+import org.apache.drill.exec.physical.impl.join.HashJoinBatch;
+import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
+
+import java.util.LinkedList;
+import java.util.Queue;
+
+/**
+ * Manages the spilling information for an operator.
+ *
+ * <h4>Lifecycle</h4>
+ * <ol>
+ *   <li>Create a {@link SpilledState} instance.</li>
+ *   <li>Call {@link SpilledState#initialize(int)}.</li>
+ *   <li>Call {@link SpilledState#addPartition(SpilledPartitionMetadata)}, {@link SpilledState#getNextSpilledPartition()}, or
+ *       {@link SpilledState#updateCycle(OperatorStats, SpilledPartitionMetadata, Updater)}.</li>
+ * </ol>
+ *
+ * <p>{@link SpilledState#getCycle()} may be called at any time.</p>
+ *
+ * @param <T> The class holding spilled partition metadata.
+ */
+public class SpilledState<T extends SpilledPartitionMetadata> {
+  public static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SpilledState.class);
+
+  private int numPartitions;
+  private int partitionMask;
+  private int bitsInMask;
+
+  private int cycle = 0;
+  private Queue<T> queue = new LinkedList<>();
+  private boolean initialized = false;
+
+  public SpilledState() {
+  }
+
+  /**
+   * Initializes the number of partitions to use for the spilled state.
+   * @param numPartitions The number of partitions to use for the spilled state.
+   */
+  public void initialize(int numPartitions) {
+    Preconditions.checkState(!initialized);
+    Preconditions.checkArgument(numPartitions >= 1); // numPartitions must be positive
+    Preconditions.checkArgument((numPartitions & (numPartitions - 1)) == 0); // Make sure it's a power of two
+
+    this.numPartitions = numPartitions;
+    initialized = true;
+    partitionMask = numPartitions - 1;
+    bitsInMask = Integer.bitCount(partitionMask);
+  }
+
+  /**
+   * Gets the number of partitions.
+   * @return The number of partitions.
+   */
+  public int getNumPartitions() {
+    return numPartitions;
+  }
+
+  /**
+   * True if this is the first cycle (0).
+   * @return True if this is the first cycle (0).
+   */
+  public boolean isFirstCycle() {
+    return cycle == 0;
+  }
+
+  public int getPartitionMask() {
+    return partitionMask;
+  }
+
+  public int getBitsInMask() {
+    return bitsInMask;
+  }
+
+  /**
+   * Add the partition metadata to the end of the queue to be processed.
+   * @param spilledPartition The partition metadata to process.
+   * @return True if the partition metadata was successfully enqueued.
+   */
+  public boolean addPartition(T spilledPartition) {
+    Preconditions.checkState(initialized);
+    return queue.offer(spilledPartition);
+  }
+
+  /**
+   * Get the next spilled partition to process.
+   * @return The metadata for the next spilled partition to process.
+   */
+  public T getNextSpilledPartition() {
+    Preconditions.checkState(initialized);
+    return queue.poll();
+  }
+
+  /**
+   * True if there are no spilled partitions.
+   * @return True if there are no spilled partitions.
+   */
+  public boolean isEmpty() {
+    return queue.isEmpty();
+  }
+
+  /**
+   * Update the current spill cycle.
+   * @param operatorStats Current operator stats.
+   * @param spilledPartition The next spilled partition metadata to process.
+   * @param updater The updater implementation that executes custom logic if a spill cycle is incremented.
+   */
+  public void updateCycle(final OperatorStats operatorStats,
+                          final T spilledPartition,
+                          final Updater updater) {
+    Preconditions.checkState(initialized);
+    Preconditions.checkNotNull(operatorStats);
+    Preconditions.checkNotNull(spilledPartition);
+    Preconditions.checkNotNull(updater);
+
+    if (logger.isDebugEnabled()) {
+      logger.debug(spilledPartition.makeDebugString());
+    }
+
+    if (cycle == spilledPartition.getCycle()) {
+      // Update the cycle num if needed.
+      // The current cycle num should always be one larger than in the spilled partition.
+
+      cycle = 1 + spilledPartition.getCycle();
+      operatorStats.setLongStat(HashJoinBatch.Metric.SPILL_CYCLE, cycle); // update stats
+
+      if (logger.isDebugEnabled()) {
+        // report first spill or memory stressful situations
+        if (cycle == 1) {
+          logger.debug("Started reading spilled records ");
+        } else if (cycle == 2) {
+          logger.debug("SECONDARY SPILLING ");
+        } else if (cycle == 3) {
+          logger.debug("TERTIARY SPILLING ");
+        } else if (cycle == 4) {
+          logger.debug("QUATERNARY SPILLING ");
+        } else if (cycle == 5) {
+          logger.debug("QUINARY SPILLING ");
+        }
+      }
+
+      if (updater.hasPartitionLimit() && cycle * bitsInMask > 20) {
+        queue.offer(spilledPartition); // so cleanup() would delete the curr spill files
+        updater.cleanup();
+        throw UserException
+          .unsupportedError()
+          .message("%s.\n On cycle num %d mem available %d num partitions %d.", updater.getFailureMessage(), cycle, updater.getMemLimit(), numPartitions)
+          .build(logger);
+      }
+    }
+
+    if (logger.isDebugEnabled()) {
+      logger.debug(spilledPartition.makeDebugString());
+    }
+  }
+
+  /**
+   * Gets the current spill cycle.
+   * @return The current spill cycle.
+   */
+  public int getCycle() {
+    return cycle;
+  }
+
+  /**
+   * A callback used by {@link SpilledState#updateCycle(OperatorStats, SpilledPartitionMetadata, Updater)} to run operator-specific logic when a spill cycle is incremented.
+   */
+  public interface Updater {
+    /**
+     * Does any necessary cleanup when we have spilled too much and need to abort the query.
+     */
+    void cleanup();
+
+    /**
+     * Gets the failure message in the event that we spilled too far.
+     * @return The failure message in the event that we spilled too far.
+     */
+    String getFailureMessage();
+
+    /**
+     * Gets the current memory limit.
+     * @return The current memory limit.
+     */
+    long getMemLimit();
+
+    /**
+     * True if there is a limit to the number of times we can partition.
+     * @return True if there is a limit to the number of times we can partition.
+     */
+    boolean hasPartitionLimit();
+  }
+}
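
For orientation, here is a minimal usage sketch (not part of the patch) of
the lifecycle described in the SpilledState javadoc. DemoPartition is a
hypothetical stand-in for HashAggSpilledPartition / HashJoinSpilledPartition,
and updateCycle() appears only in a comment because it needs a live
OperatorStats and Updater:

    import org.apache.drill.exec.physical.impl.common.AbstractSpilledPartitionMetadata;
    import org.apache.drill.exec.physical.impl.common.SpilledState;

    public class SpilledStateSketch {
      // Hypothetical metadata class, standing in for HashAggSpilledPartition.
      static class DemoPartition extends AbstractSpilledPartitionMetadata {
        DemoPartition(int cycle, int origin, int prevOrigin) {
          super(cycle, origin, prevOrigin);
        }

        @Override
        public String makeDebugString() {
          return String.format("partition %d (prev %d), cycle %d",
            getOriginPartition(), getPrevOriginPartition(), getCycle());
        }
      }

      public static void main(String[] args) {
        SpilledState<DemoPartition> state = new SpilledState<>();
        state.initialize(32); // must be a power of two: mask = 0x1F, 5 bits

        // Route a row by its hash code, exactly as HashAgg/HashJoin do.
        int hashCode = 0x1234ABCD;
        int partition = hashCode & state.getPartitionMask(); // low 5 bits -> 13
        hashCode >>>= state.getBitsInMask();                 // remaining bits feed the hash table

        // A spilled partition is queued, then drained FIFO when it is re-read.
        state.addPartition(new DemoPartition(state.getCycle(), partition, -1));
        while (!state.isEmpty()) {
          DemoPartition sp = state.getNextSpilledPartition();
          System.out.println(sp.makeDebugString());
          // A real operator would also call state.updateCycle(stats, sp, updater)
          // here to advance the cycle and enforce the repartitioning limit.
        }
      }
    }
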
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
index 3d456967f08..929811ce508 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
@@ -23,9 +23,8 @@
 import java.util.List;
 import java.util.Set;
 
+import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
 import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
-import org.apache.drill.shaded.guava.com.google.common.collect.Sets;
-
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang3.mutable.MutableBoolean;
 import org.apache.drill.common.exceptions.UserException;
@@ -56,12 +55,14 @@
 import org.apache.drill.exec.physical.base.AbstractBase;
 import org.apache.drill.exec.physical.config.HashJoinPOP;
 import org.apache.drill.exec.physical.impl.aggregate.SpilledRecordbatch;
+import org.apache.drill.exec.physical.impl.common.AbstractSpilledPartitionMetadata;
 import org.apache.drill.exec.physical.impl.common.ChainedHashTable;
 import org.apache.drill.exec.physical.impl.common.HashTable;
 import org.apache.drill.exec.physical.impl.common.HashTableConfig;
 import org.apache.drill.exec.physical.impl.common.HashTableStats;
 import org.apache.drill.exec.physical.impl.common.Comparator;
 import org.apache.drill.exec.physical.impl.common.HashPartition;
+import org.apache.drill.exec.physical.impl.common.SpilledState;
 import org.apache.drill.exec.physical.impl.spill.SpillSet;
 import org.apache.drill.exec.record.AbstractBinaryRecordBatch;
 import org.apache.drill.exec.record.BatchSchema;
@@ -80,6 +81,7 @@
 import org.apache.drill.exec.work.filter.BloomFilterDef;
 import org.apache.drill.exec.work.filter.RuntimeFilterDef;
 import org.apache.drill.exec.work.filter.RuntimeFilterReporter;
+import org.apache.drill.shaded.guava.com.google.common.collect.Sets;
 
 import static org.apache.drill.exec.record.RecordBatch.IterOutcome.EMIT;
 import static org.apache.drill.exec.record.RecordBatch.IterOutcome.NONE;
@@ -137,8 +139,6 @@
    * The number of {@link HashPartition}s. This is configured via a system option and set in {@link #partitionNumTuning(int, HashJoinMemoryCalculator.BuildSidePartitioning)}.
    */
   private int numPartitions = 1; // must be 2 to the power of bitsInMask
-  private int partitionMask = 0; // numPartitions - 1
-  private int bitsInMask = 0; // number of bits in the MASK
 
   /**
    * The master class used to generate {@link HashTable}s.
@@ -182,7 +182,6 @@
   private SpillSet spillSet;
   HashJoinPOP popConfig;
 
-  private int cycleNum = 0; // 1-primary, 2-secondary, 3-tertiary, etc.
   private int originalPartition = -1; // the partition a secondary reads from
   IntVector read_right_HV_vector; // HV vector that was read from the spilled batch
   private int maxBatchesInMemory;
@@ -195,21 +194,85 @@
   /**
    * This holds information about the spilled partitions for the build and probe side.
    */
-  public static class HJSpilledPartition {
-    public int innerSpilledBatches;
-    public String innerSpillFile;
-    public int outerSpilledBatches;
-    public String outerSpillFile;
-    int cycleNum;
-    int origPartn;
-    int prevOrigPartn;
+  public static class HashJoinSpilledPartition extends AbstractSpilledPartitionMetadata {
+    private final int innerSpilledBatches;
+    private final String innerSpillFile;
+    private int outerSpilledBatches;
+    private String outerSpillFile;
+    private boolean updatedOuter = false;
+
+    public HashJoinSpilledPartition(final int cycle,
+                                    final int originPartition,
+                                    final int prevOriginPartition,
+                                    final int innerSpilledBatches,
+                                    final String innerSpillFile) {
+      super(cycle, originPartition, prevOriginPartition);
+
+      this.innerSpilledBatches = innerSpilledBatches;
+      this.innerSpillFile = innerSpillFile;
+    }
+
+    public int getInnerSpilledBatches() {
+      return innerSpilledBatches;
+    }
+
+    public String getInnerSpillFile() {
+      return innerSpillFile;
+    }
+
+    public int getOuterSpilledBatches() {
+      Preconditions.checkState(updatedOuter);
+      return outerSpilledBatches;
+    }
+
+    public String getOuterSpillFile() {
+      Preconditions.checkState(updatedOuter);
+      return outerSpillFile;
+    }
+
+    public void updateOuter(final int outerSpilledBatches, final String outerSpillFile) {
+      Preconditions.checkState(!updatedOuter);
+      updatedOuter = true;
+
+      this.outerSpilledBatches = outerSpilledBatches;
+      this.outerSpillFile = outerSpillFile;
+    }
+
+    @Override
+    public String makeDebugString() {
+      return String.format("Start reading spilled partition %d (prev %d) from cycle %d (with %d-%d batches).",
+        this.getOriginPartition(), this.getPrevOriginPartition(), this.getCycle(), outerSpilledBatches, innerSpilledBatches);
+    }
+  }
+
+  public class HashJoinUpdater implements SpilledState.Updater {
+    @Override
+    public void cleanup() {
+      HashJoinBatch.this.cleanup();
+    }
+
+    @Override
+    public String getFailureMessage() {
+      return "Hash-Join can not partition the inner data any further (probably due to too many join-key duplicates).";
+    }
+
+    @Override
+    public long getMemLimit() {
+      return HashJoinBatch.this.allocator.getLimit();
+    }
+
+    @Override
+    public boolean hasPartitionLimit() {
+      return true;
+    }
   }
 
   /**
    * Queue of spilled partitions to process.
    */
-  private ArrayList<HJSpilledPartition> spilledPartitionsList = new ArrayList<>();
-  private HJSpilledPartition spilledInners[]; // for the outer to find the partition
+  private SpilledState<HashJoinSpilledPartition> spilledState = new SpilledState<>();
+  private HashJoinUpdater spilledStateUpdater = new HashJoinUpdater();
+  private HashJoinSpilledPartition spilledInners[]; // for the outer to find the partition
 
   public enum Metric implements MetricDef {
     NUM_BUCKETS,
@@ -352,7 +415,7 @@ private IterOutcome prefetchFirstBatch(IterOutcome outcome,
       state = BatchState.STOP;
     } else {
       // Got our first batch(es)
-      if (cycleNum == 0) {
+      if (spilledState.isFirstCycle()) {
         // Only collect stats for the first cycle
         memoryManagerUpdate.run();
       }
@@ -470,7 +533,7 @@ public IterOutcome innerNext() {
               joinType,
               leftUpstream,
               partitions,
-              cycleNum,
+              spilledState.getCycle(),
               container,
               spilledInners,
               buildSideIsEmpty.booleanValue(),
@@ -514,9 +577,9 @@ public IterOutcome innerNext() {
         //
         //  (recursively) Handle the spilled partitions, if any
         //
-        if (!buildSideIsEmpty.booleanValue() && !spilledPartitionsList.isEmpty()) {
+        if (!buildSideIsEmpty.booleanValue() && !spilledState.isEmpty()) {
           // Get the next (previously) spilled partition to handle as incoming
-          HJSpilledPartition currSp = spilledPartitionsList.remove(0);
+          HashJoinSpilledPartition currSp = spilledState.getNextSpilledPartition();
 
           // Create a BUILD-side "incoming" out of the inner spill file of that partition
           buildBatch = new SpilledRecordbatch(currSp.innerSpillFile, currSp.innerSpilledBatches, context, buildSchema, oContext, spillSet);
@@ -534,32 +597,7 @@ public IterOutcome innerNext() {
             hashJoinProbe.changeToFinalProbeState();
           }
 
-          // update the cycle num if needed
-          // The current cycle num should always be one larger than in the spilled partition
-          if (cycleNum == currSp.cycleNum) {
-            cycleNum = 1 + currSp.cycleNum;
-            stats.setLongStat(Metric.SPILL_CYCLE, cycleNum); // update stats
-            // report first spill or memory stressful situations
-            if (cycleNum == 1) { logger.info("Started reading spilled records "); }
-            if (cycleNum == 2) { logger.info("SECONDARY SPILLING "); }
-            if (cycleNum == 3) { logger.warn("TERTIARY SPILLING ");  }
-            if (cycleNum == 4) { logger.warn("QUATERNARY SPILLING "); }
-            if (cycleNum == 5) { logger.warn("QUINARY SPILLING "); }
-            if ( cycleNum * bitsInMask > 20 ) {
-              spilledPartitionsList.add(currSp); // so cleanup() would delete the curr spill files
-              this.cleanup();
-              throw UserException
-                .unsupportedError()
-                .message("Hash-Join can not partition the inner data any further (probably due to too many join-key duplicates)\n"
-                + "On cycle num %d mem available %d num partitions %d", cycleNum, allocator.getLimit(), numPartitions)
-                .build(logger);
-            }
-          }
-          logger.debug("Start reading spilled partition {} (prev {}) from cycle {} (with {}-{} batches)." +
-              " More {} spilled partitions left.",
-            currSp.origPartn, currSp.prevOrigPartn, currSp.cycleNum, currSp.outerSpilledBatches,
-            currSp.innerSpilledBatches, spilledPartitionsList.size());
-
+          spilledState.updateCycle(stats, currSp, spilledStateUpdater);
           state = BatchState.FIRST;  // TODO need to determine if this is still necessary since prefetchFirstBatchFromBothSides sets this
 
           prefetchedBuild.setValue(false);
@@ -692,9 +730,7 @@ private void delayedSetup() {
     //  See partitionNumTuning()
     //
 
-    partitionMask = numPartitions - 1; // e.g. 32 --> 0x1F
-    bitsInMask = Integer.bitCount(partitionMask); // e.g. 0x1F -> 5
-
+    spilledState.initialize(numPartitions);
     // Create array for the partitions
     partitions = new HashPartition[numPartitions];
   }
@@ -707,10 +743,10 @@ private void initializeBuild() {
     // Recreate the partitions every time build is initialized
     for (int part = 0; part < numPartitions; part++ ) {
       partitions[part] = new HashPartition(context, allocator, baseHashTable, buildBatch, probeBatch,
-        RECORDS_PER_BATCH, spillSet, part, cycleNum, numPartitions);
+        RECORDS_PER_BATCH, spillSet, part, spilledState.getCycle(), numPartitions);
     }
 
-    spilledInners = new HJSpilledPartition[numPartitions];
+    spilledInners = new HashJoinSpilledPartition[numPartitions];
 
   }
 
@@ -836,19 +872,18 @@ public IterOutcome executeBuildPhase() throws SchemaChangeException {
     }
 
     HashJoinMemoryCalculator.BuildSidePartitioning buildCalc;
-    boolean firstCycle = cycleNum == 0;
 
     {
       // Initializing build calculator
       // Limit scope of these variables to this block
-      int maxBatchSize = firstCycle? RecordBatch.MAX_BATCH_ROW_COUNT: RECORDS_PER_BATCH;
+      int maxBatchSize = spilledState.isFirstCycle()? RecordBatch.MAX_BATCH_ROW_COUNT: RECORDS_PER_BATCH;
       boolean doMemoryCalculation = canSpill && !probeSideIsEmpty.booleanValue();
       HashJoinMemoryCalculator calc = getCalculatorImpl();
 
       calc.initialize(doMemoryCalculation);
       buildCalc = calc.next();
 
-      buildCalc.initialize(firstCycle, true, // TODO Fix after growing hash values bug fixed
+      buildCalc.initialize(spilledState.isFirstCycle(), true, // TODO Fix after growing hash values bug fixed
         buildBatch,
         probeBatch,
         buildJoinColumns,
@@ -862,13 +897,13 @@ public IterOutcome executeBuildPhase() throws SchemaChangeException {
         batchMemoryManager.getOutputBatchSize(),
         HashTable.DEFAULT_LOAD_FACTOR);
 
-      if (firstCycle && doMemoryCalculation) {
+      if (spilledState.isFirstCycle() && doMemoryCalculation) {
         // Do auto tuning
         buildCalc = partitionNumTuning(maxBatchSize, buildCalc);
       }
     }
 
-    if (firstCycle) {
+    if (spilledState.isFirstCycle()) {
       // Do initial setup only on the first cycle
       delayedSetup();
     }
@@ -906,11 +941,11 @@ public IterOutcome executeBuildPhase() throws SchemaChangeException {
         }
         final int currentRecordCount = buildBatch.getRecordCount();
 
-        if ( cycleNum > 0 ) {
+        if (!spilledState.isFirstCycle()) {
           read_right_HV_vector = (IntVector) buildBatch.getContainer().getLast();
         }
         //create runtime filter
-        if (cycleNum == 0 && enableRuntimeFilter) {
+        if (spilledState.isFirstCycle() && enableRuntimeFilter) {
           //create runtime filter and send out async
           int condFieldIndex = 0;
           for (BloomFilter bloomFilter : bloomFilters) {
@@ -924,10 +959,10 @@ public IterOutcome executeBuildPhase() throws SchemaChangeException {
 
         // For every record in the build batch, hash the key columns and keep the result
         for (int ind = 0; ind < currentRecordCount; ind++) {
-          int hashCode = ( cycleNum == 0 ) ? partitions[0].getBuildHashCode(ind)
+          int hashCode = spilledState.isFirstCycle() ? partitions[0].getBuildHashCode(ind)
             : read_right_HV_vector.getAccessor().get(ind); // get the hash value from the HV column
-          int currPart = hashCode & partitionMask;
-          hashCode >>>= bitsInMask;
+          int currPart = hashCode & spilledState.getPartitionMask();
+          hashCode >>>= spilledState.getBitsInMask();
           // Append the new inner row to the appropriate partition; spill (that partition) if needed
           partitions[currPart].appendInnerRow(buildBatch.getContainer(), ind, hashCode, buildCalc); // may spill if needed
         }
@@ -942,7 +977,7 @@ public IterOutcome executeBuildPhase() throws SchemaChangeException {
       rightUpstream = next(HashJoinHelper.RIGHT_INPUT, buildBatch);
     }
 
-    if (cycleNum == 0 && enableRuntimeFilter) {
+    if (spilledState.isFirstCycle() && enableRuntimeFilter) {
       if (bloomFilters.size() > 0) {
         int hashJoinOpId = this.popConfig.getOperatorId();
         runtimeFilterReporter.sendOut(bloomFilters, probeFields, this.popConfig.getRuntimeFilterDef().isSendToForeman(), hashJoinOpId);
@@ -1004,14 +1039,13 @@ public IterOutcome executeBuildPhase() throws SchemaChangeException {
 
     for (HashPartition partn : partitions) {
       if ( partn.isSpilled() ) {
-        HJSpilledPartition sp = new HJSpilledPartition();
-        sp.innerSpillFile = partn.getSpillFile();
-        sp.innerSpilledBatches = partn.getPartitionBatchesCount();
-        sp.cycleNum = cycleNum; // remember the current cycle
-        sp.origPartn = partn.getPartitionNum(); // for debugging / filename
-        sp.prevOrigPartn = originalPartition; // for debugging / filename
-        spilledPartitionsList.add(sp);
+        HashJoinSpilledPartition sp = new HashJoinSpilledPartition(spilledState.getCycle(),
+          partn.getPartitionNum(),
+          originalPartition,
+          partn.getPartitionBatchesCount(),
+          partn.getSpillFile());
 
+        spilledState.addPartition(sp);
         spilledInners[partn.getPartitionNum()] = sp; // for the outer to find the SP later
         partn.closeWriter();
 
@@ -1169,8 +1203,8 @@ private void cleanup() {
     }
 
     // delete any spill file left in unread spilled partitions
-    while ( ! spilledPartitionsList.isEmpty() ) {
-      HJSpilledPartition sp = spilledPartitionsList.remove(0);
+    while (!spilledState.isEmpty()) {
+      HashJoinSpilledPartition sp = spilledState.getNextSpilledPartition();
       try {
         spillSet.delete(sp.innerSpillFile);
       } catch(IOException e) {
@@ -1210,7 +1244,7 @@ public String makeDebugString() {
    */
   private void updateStats() {
     if ( buildSideIsEmpty.booleanValue() ) { return; } // no stats when the right side is empty
-    if ( cycleNum > 0 ) { return; } // These stats are only for before processing spilled files
+    if (!spilledState.isFirstCycle()) { return; } // These stats are only for before processing spilled files
 
     final HashTableStats htStats = new HashTableStats();
     long numSpilled = 0;
@@ -1227,7 +1261,7 @@ private void updateStats() {
     this.stats.setLongStat(Metric.NUM_RESIZING, htStats.numResizing);
     this.stats.setLongStat(Metric.RESIZING_TIME_MS, htStats.resizingTime);
     this.stats.setLongStat(Metric.NUM_PARTITIONS, numPartitions);
-    this.stats.setLongStat(Metric.SPILL_CYCLE, cycleNum); // Put 0 in case no spill
+    this.stats.setLongStat(Metric.SPILL_CYCLE, spilledState.getCycle()); // Put 0 in case no spill
     this.stats.setLongStat(Metric.SPILLED_PARTITIONS, numSpilled);
   }
 
@@ -1257,7 +1291,7 @@ public void updateMetrics() {
 
   @Override
   public void close() {
-    if ( cycleNum > 0 ) { // spilling happened
+    if (!spilledState.isFirstCycle()) { // spilling happened
       // In case closing due to cancellation, BaseRootExec.close() does not close the open
       // SpilledRecordBatch "scanners" as it only knows about the original left/right ops.
       killIncoming(false);
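
HashJoinUpdater above is the only Updater in this patch that enforces the
partition limit. For reference, a compact illustrative sketch (not from the
patch) of what an operator must supply to SpilledState.updateCycle(); the
class name and memory value are made up:

    // Hypothetical operator-side Updater; mirrors HashJoinUpdater above.
    class DemoUpdater implements SpilledState.Updater {
      @Override
      public void cleanup() {
        // Close writers and delete remaining spill files before the query aborts.
      }

      @Override
      public String getFailureMessage() {
        return "Demo operator cannot partition its data any further.";
      }

      @Override
      public long getMemLimit() {
        return 10_000_000L; // illustrative; real operators return allocator.getLimit()
      }

      @Override
      public boolean hasPartitionLimit() {
        return true; // like HashJoin; HashAgg returns false and never trips the limit
      }
    }
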
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbe.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbe.java
index 5059b18d216..beddfa68faa 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbe.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbe.java
@@ -39,7 +39,7 @@
     PROBE_PROJECT, PROJECT_RIGHT, DONE
   }
 
-  void setupHashJoinProbe(RecordBatch probeBatch, HashJoinBatch outgoing, JoinRelType joinRelType, RecordBatch.IterOutcome leftStartState, HashPartition[] partitions, int cycleNum, VectorContainer container, HashJoinBatch.HJSpilledPartition[] spilledInners, boolean buildSideIsEmpty, int numPartitions, int rightHVColPosition);
+  void setupHashJoinProbe(RecordBatch probeBatch, HashJoinBatch outgoing, JoinRelType joinRelType, RecordBatch.IterOutcome leftStartState, HashPartition[] partitions, int cycleNum, VectorContainer container, HashJoinBatch.HashJoinSpilledPartition[] spilledInners, boolean buildSideIsEmpty, int numPartitions, int rightHVColPosition);
   int  probeAndProject() throws SchemaChangeException;
   void changeToFinalProbeState();
   void setTargetOutputCount(int targetOutputCount);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java
index 71abeda1139..a63f63d9a4c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java
@@ -77,7 +77,7 @@
   private int currRightPartition = 0; // for returning RIGHT/FULL
   IntVector read_left_HV_vector; // HV vector that was read from the spilled batch
   private int cycleNum = 0; // 1-primary, 2-secondary, 3-tertiary, etc.
-  private HashJoinBatch.HJSpilledPartition spilledInners[]; // for the outer to find the partition
+  private HashJoinBatch.HashJoinSpilledPartition spilledInners[]; // for the outer to find the partition
   private boolean buildSideIsEmpty = true;
   private int numPartitions = 1; // must be 2 to the power of bitsInMask
   private int partitionMask = 0; // numPartitions - 1
@@ -110,7 +110,7 @@ public int getOutputCount() {
    * @param rightHVColPosition
    */
   @Override
-  public void setupHashJoinProbe(RecordBatch probeBatch, HashJoinBatch outgoing, JoinRelType joinRelType, IterOutcome leftStartState, HashPartition[] partitions, int cycleNum, VectorContainer container, HashJoinBatch.HJSpilledPartition[] spilledInners, boolean buildSideIsEmpty, int numPartitions, int rightHVColPosition) {
+  public void setupHashJoinProbe(RecordBatch probeBatch, HashJoinBatch outgoing, JoinRelType joinRelType, IterOutcome leftStartState, HashPartition[] partitions, int cycleNum, VectorContainer container, HashJoinBatch.HashJoinSpilledPartition[] spilledInners, boolean buildSideIsEmpty, int numPartitions, int rightHVColPosition) {
     this.container = container;
     this.spilledInners = spilledInners;
     this.probeBatch = probeBatch;
@@ -253,9 +253,8 @@ private void executeProbePhase() throws SchemaChangeException {
               if ( ! partn.isSpilled() ) { continue; } // skip non-spilled
               partn.completeAnOuterBatch(false);
               // update the partition's spill record with the outer side
-              HashJoinBatch.HJSpilledPartition sp = spilledInners[partn.getPartitionNum()];
-              sp.outerSpillFile = partn.getSpillFile();
-              sp.outerSpilledBatches = partn.getPartitionBatchesCount();
+              HashJoinBatch.HashJoinSpilledPartition sp = spilledInners[partn.getPartitionNum()];
+              sp.updateOuter(partn.getPartitionBatchesCount(), partn.getSpillFile());
 
               partn.closeWriter();
             }
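
As a closing note on the arithmetic this patch centralizes in SpilledState:
each spill cycle consumes getBitsInMask() fresh bits of the 32-bit hash
code, which is what the "cycle * bitsInMask > 20" guard in updateCycle()
protects against. A standalone sketch with illustrative values:

    // With 32 partitions the mask is 0x1F and bitsInMask is 5, so every
    // spill cycle consumes 5 previously unused bits of the hash code.
    int numPartitions = 32;
    int partitionMask = numPartitions - 1;            // 0x1F
    int bitsInMask = Integer.bitCount(partitionMask); // 5

    int hashCode = 0xCAFEBABE; // arbitrary example value
    int cycle = 2;             // a tertiary pass over twice-spilled data

    // Skip the bits consumed by earlier cycles, as checkGroupAndAggrValues does:
    for (int i = 0; i < cycle; i++) {
      hashCode >>>= bitsInMask;
    }
    int currentPartition = hashCode & partitionMask; // bits 10..14 of the original hash

    // Once cycle * bitsInMask exceeds 20, a 32-bit hash is close to exhausted,
    // so updateCycle() gives up repartitioning (HashJoin aborts with a
    // UserException; HashAgg opts out via hasPartitionLimit() == false).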


 
