You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Marc Arndt (JIRA)" <ji...@apache.org> on 2019/05/27 08:43:00 UTC

[jira] [Updated] (SPARK-27851) Allow for custom BroadcastMode return values

     [ https://issues.apache.org/jira/browse/SPARK-27851?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Marc Arndt updated SPARK-27851:
-------------------------------
    Description: 
According to the BroadcastMode API the BroadcastMode#transform methods are allows to return a result object of an arbitrary type:

{code:scala}
/**
 * Marker trait to identify the shape in which tuples are broadcasted. Typical examples of this are
 * identity (tuples remain unchanged) or hashed (tuples are converted into some hash index).
 */
trait BroadcastMode {
  def transform(rows: Array[InternalRow]): Any

  def transform(rows: Iterator[InternalRow], sizeHint: Option[Long]): Any

  def canonicalized: BroadcastMode
}
{code}

When looking at the code which later uses the instantiated BroadcastMode objects in BroadcastExchangeExec it becomes that this is not really the base. 

The following lines in BroadcastExchangeExec suggest that only objects of type HashedRelation and Array[InternalRow] are allowed as a result for the BroadcastMode#transform methods:

{code:scala}
// Construct the relation.
val relation = mode.transform(input, Some(numRows))

val dataSize = relation match {
    case map: HashedRelation =>
        map.estimatedSize
    case arr: Array[InternalRow] =>
        arr.map(_.asInstanceOf[UnsafeRow].getSizeInBytes.toLong).sum
    case _ =>
        throw new SparkException("[BUG] BroadcastMode.transform returned unexpected type: " +
            relation.getClass.getName)
}
{code}

I believe that this is the only occurrence in the code where the result of the BroadcastMode#transform method must be either of type HashedRelation or Array[InternalRow]. Therefore to allow for broader custom implementations of the BroadcastMode I believe it would be a good idea to solve the calculation of the data size of the broadcast value in an independent way of the used BroadcastMode implemented.

One way this could be done is by providing an additional BroadcastMode#calculateDataSize method, which needs to be implemented by the BroadcastMode implementations. This methods could then return the required number of bytes for a given broadcast value.

  was:
According to the BroadcastMode API the BroadcastMode#transform methods are allows to return a result object of an arbitrary type:

{code:scala}
/**
 * Marker trait to identify the shape in which tuples are broadcasted. Typical examples of this are
 * identity (tuples remain unchanged) or hashed (tuples are converted into some hash index).
 */
trait BroadcastMode {
  def transform(rows: Array[InternalRow]): Any

  def transform(rows: Iterator[InternalRow], sizeHint: Option[Long]): Any

  def canonicalized: BroadcastMode
}
{code}

When looking at the code which later uses the instantiated BroadcastMode objects in BroadcastExchangeExec it becomes that this is not really the base. 

The following lines in BroadcastExchangeExec suggest that only objects of type HashRElation and Array[InternalRow] are allowed as a result for the BroadcastMode#transform methods:

{code:scala}
// Construct the relation.
val relation = mode.transform(input, Some(numRows))

val dataSize = relation match {
    case map: HashedRelation =>
        map.estimatedSize
    case arr: Array[InternalRow] =>
        arr.map(_.asInstanceOf[UnsafeRow].getSizeInBytes.toLong).sum
    case _ =>
        throw new SparkException("[BUG] BroadcastMode.transform returned unexpected type: " +
            relation.getClass.getName)
}
{code}

I believe that this is the only occurrence in the code where the result of the BroadcastMode#transform method must be either of type HashedRelation or Array[InternalRow]. Therefore to allow for broader custom implementations of the BroadcastMode I believe it would be a good idea to solve the calculation of the data size of the broadcast value in an independent way of the used BroadcastMode implemented.

One way this could be done is by providing an additional BroadcastMode#calculateDataSize method, which needs to be implemented by the BroadcastMode implementations. This methods could then return the required number of bytes for a given broadcast value.


> Allow for custom BroadcastMode return values
> --------------------------------------------
>
>                 Key: SPARK-27851
>                 URL: https://issues.apache.org/jira/browse/SPARK-27851
>             Project: Spark
>          Issue Type: Improvement
>          Components: Optimizer, SQL
>    Affects Versions: 2.4.3
>            Reporter: Marc Arndt
>            Priority: Major
>
> According to the BroadcastMode API the BroadcastMode#transform methods are allows to return a result object of an arbitrary type:
> {code:scala}
> /**
>  * Marker trait to identify the shape in which tuples are broadcasted. Typical examples of this are
>  * identity (tuples remain unchanged) or hashed (tuples are converted into some hash index).
>  */
> trait BroadcastMode {
>   def transform(rows: Array[InternalRow]): Any
>   def transform(rows: Iterator[InternalRow], sizeHint: Option[Long]): Any
>   def canonicalized: BroadcastMode
> }
> {code}
> When looking at the code which later uses the instantiated BroadcastMode objects in BroadcastExchangeExec it becomes that this is not really the base. 
> The following lines in BroadcastExchangeExec suggest that only objects of type HashedRelation and Array[InternalRow] are allowed as a result for the BroadcastMode#transform methods:
> {code:scala}
> // Construct the relation.
> val relation = mode.transform(input, Some(numRows))
> val dataSize = relation match {
>     case map: HashedRelation =>
>         map.estimatedSize
>     case arr: Array[InternalRow] =>
>         arr.map(_.asInstanceOf[UnsafeRow].getSizeInBytes.toLong).sum
>     case _ =>
>         throw new SparkException("[BUG] BroadcastMode.transform returned unexpected type: " +
>             relation.getClass.getName)
> }
> {code}
> I believe that this is the only occurrence in the code where the result of the BroadcastMode#transform method must be either of type HashedRelation or Array[InternalRow]. Therefore to allow for broader custom implementations of the BroadcastMode I believe it would be a good idea to solve the calculation of the data size of the broadcast value in an independent way of the used BroadcastMode implemented.
> One way this could be done is by providing an additional BroadcastMode#calculateDataSize method, which needs to be implemented by the BroadcastMode implementations. This methods could then return the required number of bytes for a given broadcast value.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org