You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Lin (JIRA)" <ji...@apache.org> on 2017/07/22 04:36:00 UTC

[jira] [Created] (SPARK-21505) A dynamic join operator to improve the join reliability

Lin created SPARK-21505:
---------------------------

             Summary: A dynamic join operator to improve the join reliability
                 Key: SPARK-21505
                 URL: https://issues.apache.org/jira/browse/SPARK-21505
             Project: Spark
          Issue Type: New Feature
          Components: SQL
    Affects Versions: 2.2.0, 2.3.0, 3.0.0
            Reporter: Lin


As we know, hash join is more efficient than sort merge join. But today hash join is not so widely used because it may fail with OutOfMemory (OOM) error due to limited memory resource, data skew, statistics mis-estimation and so on. For example, if we apply shuffle hash join on an uneven distributed dataset, some partitions might be so large  that we cannot make a Hash table for this particular partition causing OOM error. When OOM happens, current Spark technology will throw an Exception, resulting in job failure. On the other hand, if sort-merge join is used, there will be shuffle, sorting and extra spill, causing the degradation of the join. Considering the efficiency of hash join, we want to propose a fallback mechanism to dynamically use hash join or sort-merge join at runtime at task level to provide a more reliable join operation.

This new dynamic join operator internally implements the logic of HashJoin, Iterator Reconstruct, Sort, and MergeJoin.  We show the process of this dynamic join method as following:

HashJoin: We start from building  Hash table on one side of join partitions. If Hash table is built successfully, it would be the same as the current ShuffledHashJoin operator. 

Sort: If we fail to build Hash table due to the large partition size, we do SortMergeJoin only on this partition. But we need to rebuild the   When OOM happens, a Hash table corresponding to partial part of this partition has been built successfully (e.g. first 4000 rows of RDD), and the iterator of this partition is now pointing to the 4001st row of partition. We reuse this hash table to reconstruct the iterator for the first 4000 rows and concatenate  with the rest rows of this partition so that we can rebuild this partition completely. On this re-built partition, we apply sorting based on key values.

MergeJoin: After getting two sorted Iterators, we perform regular merge join against them and emits the records to downstream operators.

Iterator Reconstruct:  BytesToBytesMap has to be spilled to disk to release the memory for other operators, such as Sort, Join, etc. In addition, it has to be converted to Iterator, so that it can be concatenated with remaining items in the original iterator that is used to build the hash table.

Meta Data Population: Necessary metadata, such as sorting keys, jointype, etc,  has to be populated, so that they are used for potential Sort and MergeJoin operator.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org