You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@drill.apache.org by Ben-Zvi <gi...@git.apache.org> on 2018/05/02 02:24:21 UTC

[GitHub] drill pull request #1248: DRIL-6027: Implement Spilling for the Hash-Join

GitHub user Ben-Zvi opened a pull request:

    https://github.com/apache/drill/pull/1248

    DRIL-6027: Implement Spilling for the Hash-Join

    This PR covers the work to enable the Hash-Join operator (*HJ*) to spill - when its limited memory becomes too small to hold the incoming data. 
     @ilooner is a co-contributor of this work.
    
    Below is a high level description of the main changes, to help the reviewers. More design detail is available in the design document (https://docs.google.com/document/d/1-c_oGQY4E5d58qJYv_zc7ka834hSaB3wDQwqKcMoSAI/)
    Some of this work follows a prior similar work done for the Hash-Aggregate (*HAG*) operator; some similarity to the HAG is mentioned to help reviewrs familiar with those changes.
    
    h2. Partitions:
    Just like the HAG spilling, the main idea to enable spilling is to split the incoming rows into separate *Partitions*, such that the HJ can gradually adopt to a memory pressure situation by picking an in-memory partition and spilling it as the need arises, thus freeing some memory.
    Unlike the HAG, the HJ has two incomings - the build/inner/right and the probe/outer/left. The HJ partitions its Build side first, and if needed, may spill some of these partitions as data is read. Later the Probe side is read and partitioned the same way, where outer partitions matching spilled inner partitions are spilled as well - unconditionally.
    
    h6. {{HashPartition}} class:
    A new class {{HashPartition}} was created to encapsulate the work of each partition; this class handles the pair - the build-side partition and its matching probe-side partition. Most of its code was extracted from prior code in {{HashJoinBatch}}.
    
    h4. Hash Values:
    The hash-values are computed at first time, then saved into a special column (named "Hash_Values"), which may be spilled, etc. This avoids recomputation (unlike the HAG, which recomputes). After reading a batch from a spill file, this Hash-values vector is separated (into {{read_HV_vector}}) and used instead of computing the hash values.
    
    h4. Build Hash Table:
    Unlike the HAG - the hash-table (and "helper") are built (per each inner partition) only *after* that whole partition was read into memory. (This avoids wasted work, in case the partition needs to spill). Another improvement: As the number of entries is known at that final time (ignoring duplicates), then the hash table can be initially sized right, avoiding the need for later costly resizings (see {{hashTable.updateInitialCapacity()}}). 
    
    h4. Same as the HAG:
    * Same metrics (NUM_PARTITIONS,	SPILLED_PARTITIONS, SPILL_MB, SPILL_CYCLE) 
    * Using the {{SpillSet}} class.
    * Recursive spilling. (Nearly the same code - see {{innerNext()}} in {{HashJoinBatch.java}}). Except that the HJ may have duplicate entries - so when the spill cycle has consumed more than 20 bits of the hash value, then err.
    * Option controlling the number of partitions (and when that number is 1 --> spilling is disabled).
    
    h6. Avoid copying:
    Copying the incoming build data into the partitions' batches is a new extra step, adding some overhead. To match performance with prior Drill, in case of a single partition (no spilling, no memory checks) -- the incoming vectors are used as is, without copying. Future work may extend this for the general case (involving memory checks, etc.)
    
    h2. Memory Calculations:
    h4. Initial memory allocation:
    The HJ was made a "buffered" operator (see {{isBufferedOperator()}}, just like the HAG and the External Sort), hence gets assigned an equal memory share (out of the "memory per query per node"; see {{setupBufferedOpsMemoryAllocations()}}). Except when the number of partitions is forced to be 1, when it "falls back" to the "old uncontrolled" behavior (similar to what was done for the HAG).
    
    h4. Memory Calculator:
    The memory calculator is knowlegable of the current and future memory needs (including current memory usage of all the partitions, an outgoing batch, an incoming outer batch, and the hash tables and "helpers"). The calculator is used first to find an optimal number of partitions (starting from a number controll by {{hashjoin_num_partitions}}, default 32, and lowering if that number requires too much memory). The second use of the calculator is to determine if a spill is needed, prior to allocating more memory (see {{shouldSpill()}}). This chack is performed at two places: When reading the build side and about to allocate a new batch (see {{appendInnerRow()}}). And when hash tables (and helpers) are allocated for the in-memory partitions (in {{executeBuildPhase()}}).
    
    h6. Implementation:
    The {{HashJoinMemoryCalculator}} is an interface, implemented by {{HashJoinMemoryCalculatorImpl}} for regular work. For testing, we can limit the number of batches {{max_batches_in_memory}} - and then anther implementation takes over - {{HashJoinMechanicalMemoryCalculator}}, which uses the number of batches as the spilling trigger. When memory checks are disabled (e.g., when using a single partition), then a special no-op calculator is used instead - {{NoopBuildSidePartitioningImpl}}.
    
    The memory estimates rely on statistics derived from the actual data. Thus the HJ now, during schema discovery, also tries to "sniff forward" and look at batches with real-data (see {{sniffNonEmptyBatch}}). 
    
    h2. Generated Code:
    The prior HJ code that generated code (used to move data from Probe and Build incomings into the Outgoing batch) was eliminated, replaced by various {{appendRow}} methods (see {{VectorContainer.java}}) that are based on the new {{copyEntry()}} method supported by all vector types.
    
    h2. HashTable using VectorContainer:
    The HashTable was modified to use {{VectorContainer}} instead of {{RecordBatch}}. Thus a {{getContainer()}} method was added to the {{AbstractRecordBatch}} and all its subclasses.
    
    h2: {{HashJoinProbe}}:
    The {{HashJoinProbe[template]}} were eliminated (partly because of the removal of the generated code) - the remaining code was mostly merged into the end of {{HashJoinBatch.java}}.
    
    h2. Other:
    * The batches used by the partitions are internal to the HJ. Their size is controlled by an option {{num_rows_in_batch}}, defaule 1024.
    * Not using the {{HyperContainer}} anymore, just a list of containers.
    


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/Ben-Zvi/drill DRILL-6027

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/drill/pull/1248.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1248
    
----
commit ab63a01f46779296ab6cefc4b8c93aae37cc9d19
Author: Ben-Zvi <bb...@...>
Date:   2018-02-14T20:36:39Z

    DRILL-6027: Initial implementation of HashJoin spill, without memory limits checks yet

commit 42fb19ba6fcc38b105601a167c32ea7ca137c407
Author: Timothy Farkas <ti...@...>
Date:   2018-02-13T00:49:29Z

    DRILL-6027:
     - Added memory claculator
     - Added unit tests and docs.
     - Fixed IOB caused by output vector allocation.
     - Don't double count records that were spilled in HashJoin

commit 35f6b9cca7ee23de50ef4863ee9aec7f3998985f
Author: Ben-Zvi <bb...@...>
Date:   2018-04-28T04:59:25Z

    DRILL-6027:
      - Added fallback option for HashJoin.
      - No copy of incoming for single partition, and avoid HT resize.
      - Fix memory leak when cancelling while spill file is read
      - get correct schema when probe side is empty

----


---