Posted to issues@spark.apache.org by "Lijia Liu (JIRA)" <ji...@apache.org> on 2019/01/23 13:16:00 UTC

[jira] [Updated] (SPARK-26705) UnsafeHashedRelation changed after broadcasted

     [ https://issues.apache.org/jira/browse/SPARK-26705?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Lijia Liu updated SPARK-26705:
------------------------------
    Summary: UnsafeHashedRelation changed after broadcasted  (was: UnsafeHashedRelation changed after serialization)

> UnsafeHashedRelation changed after broadcasted
> ----------------------------------------------
>
>                 Key: SPARK-26705
>                 URL: https://issues.apache.org/jira/browse/SPARK-26705
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.2.1
>         Environment: spark:2.2.1
> jdk:
> java version "1.8.0_112"
> Java(TM) SE Runtime Environment (build 1.8.0_112-b15)
> Java HotSpot(TM) 64-Bit Server VM (build 25.112-b15, mixed mode)
>            Reporter: Lijia Liu
>            Priority: Critical
>
> When Spark SQL executes a broadcast join, it converts the small table into an
> UnsafeHashedRelation. But in our cluster, I found that the UnsafeHashedRelation may change after being broadcast.
> The broadcasting process is divided into the following steps:
> 1. Collect the small table's data into an Array[InternalRow]. (BroadcastExchangeExec.scala)
> 2. Transform the Array[InternalRow] into an UnsafeHashedRelation object; UnsafeHashedRelation uses a BytesToBytesMap to store the data. (BroadcastExchangeExec.scala)
> 3. Use the UnsafeHashedRelation.write function to serialize the object to a byte stream. (HashedRelation.scala)
> 4. Use the UnsafeHashedRelation.read function to deserialize the byte stream back into an UnsafeHashedRelation object. (HashedRelation.scala)
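> Below is a minimal, self-contained sketch of a query that exercises these steps through the public API (the object name, table sizes and the local master setting are made up for illustration; this is not the job that hit the bug). The small side is collected on the driver, hashed into an UnsafeHashedRelation and broadcast; if the relation lost keys in transit, the join result would come back smaller than expected.
>
>     import org.apache.spark.sql.SparkSession
>     import org.apache.spark.sql.functions.broadcast
>
>     object BroadcastJoinSketch {
>       def main(args: Array[String]): Unit = {
>         val spark = SparkSession.builder()
>           .appName("broadcast-join-sketch")
>           .master("local[*]")
>           .getOrCreate()
>         import spark.implicits._
>
>         // "Small" build side: collected on the driver, hashed into an
>         // UnsafeHashedRelation and broadcast to the executors (steps 1-4 above).
>         val small = (1 to 100000).map(i => (i, s"v$i")).toDF("id", "value")
>
>         // Large probe side: every row should find exactly one match.
>         val large = (1 to 1000000).map(i => (i % 100000 + 1, i.toLong)).toDF("id", "payload")
>
>         // The broadcast() hint forces a BroadcastHashJoin.
>         val joined = large.join(broadcast(small), "id")
>
>         // If keys were lost while the relation was (de)serialized, this count
>         // comes back smaller than 1,000,000.
>         println(s"joined rows = ${joined.count()} (expected 1000000)")
>
>         spark.stop()
>       }
>     }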
> I found that sometimes the BytesToBytesMap in the UnsafeHashedRelation after deserialization is different from the one before serialization. For example, the number of keys becomes smaller.
> In our cluster, I now check the BytesToBytesMap's key count and value count in the UnsafeHashedRelation.read function to prevent incorrect results (see the sketch below). But I haven't found the root cause of this bug.
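> The check I added is conceptually like the following sketch (illustrative only; SerializationCheck and roundTripAndVerify are hypothetical names, and in Spark itself the comparison would have to live inside UnsafeHashedRelation.read, against the key and value counts written by write): round-trip an object through serialization and fail fast if a caller-supplied invariant, such as the number of keys, changes.
>
>     import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
>
>     object SerializationCheck {
>       // Serialize obj, deserialize it again, and verify that an invariant
>       // (e.g. the number of keys in the underlying map) survived the round trip.
>       def roundTripAndVerify[T <: java.io.Serializable](obj: T)(invariant: T => Long): T = {
>         val before = invariant(obj)
>
>         val buffer = new ByteArrayOutputStream()
>         val out = new ObjectOutputStream(buffer)
>         out.writeObject(obj)
>         out.close()
>
>         val in = new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray))
>         val copy = in.readObject().asInstanceOf[T]
>         in.close()
>
>         val after = invariant(copy)
>         require(after == before,
>           s"serialization round trip changed the invariant: before=$before, after=$after")
>         copy
>       }
>     }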
> The job in which the error occurred has some peculiarities:
> 1. The broadcast table is very big.
> 2. The failure is not stably reproducible.
> 3. Many tables are broadcast in the job.
> 4. The driver is under high memory pressure.


