Posted to issues@spark.apache.org by "Sean Owen (JIRA)" <ji...@apache.org> on 2018/01/17 14:05:00 UTC

[jira] [Resolved] (SPARK-23076) When we call cache() on RDD which depends on ShuffleRowRDD, we will get an error result

     [ https://issues.apache.org/jira/browse/SPARK-23076?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Sean Owen resolved SPARK-23076.
-------------------------------
    Resolution: Not A Problem

> When we call cache() on RDD which depends on ShuffleRowRDD, we will get an error result
> ---------------------------------------------------------------------------------------
>
>                 Key: SPARK-23076
>                 URL: https://issues.apache.org/jira/browse/SPARK-23076
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>    Affects Versions: 2.1.0
>            Reporter: zhoukang
>            Priority: Major
>         Attachments: shufflerowrdd-cache.png
>
>
> For the query below:
> {code:java}
> select * from csv_demo limit 3;
> {code}
> The correct result should be:
>  0: jdbc:hive2://10.108.230.228:10000/> select * from csv_demo limit 3;
>  +----------+------+
>  | _c0      | _c1  |
>  +----------+------+
>  | Joe      | 20   |
>  | Tom      | 30   |
>  | Hyukjin  | 25   |
>  +----------+------+
> However, when we call cache() on the MapPartitionsRDD shown below:
>   !shufflerowrdd-cache.png!
> the result is wrong:
>  0: jdbc:hive2://xxx/> select * from csv_demo limit 3;
>  +----------+------+
>  | _c0      | _c1  |
>  +----------+------+
>  | Hyukjin  | 25   |
>  | Hyukjin  | 25   |
>  | Hyukjin  | 25   |
>  +----------+------+
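> This happens because every UnsafeRow produced by ShuffledRowRDD#compute is backed by the same underlying byte buffer, so caching the rows without copying them keeps three references to one buffer that holds only the last row read.
> To demonstrate this, I modified UnsafeRow.toString() to also print baseObject and baseOffset: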
> {code:java}
> // This is for debugging
> @Override
> public String toString() {
>   StringBuilder build = new StringBuilder("[");
>   for (int i = 0; i < sizeInBytes; i += 8) {
>     if (i != 0) build.append(',');
>     build.append(java.lang.Long.toHexString(Platform.getLong(baseObject, baseOffset + i)));
>   }
>   build.append(","+baseObject+","+baseOffset+']'); // Print baseObject and baseOffset here
>   return build.toString();
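> }
> {code}
> The resulting log shows three different rows read at the same baseObject ([B@6225ec90) and the same baseOffset (16):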
> {code:java}
> 2018-01-12,22:08:47,441 INFO org.apache.spark.sql.execution.ShuffledRowRDD: Read value: [0,1800000003,2000000002,656f4a,3032,[B@6225ec90,16]
> 2018-01-12,22:08:47,445 INFO org.apache.spark.sql.execution.ShuffledRowRDD: Read value: [0,1800000003,2000000002,6d6f54,3033,[B@6225ec90,16]
> 2018-01-12,22:08:47,448 INFO org.apache.spark.sql.execution.ShuffledRowRDD: Read value: [0,1800000007,2000000002,6e696a6b757948,3532,[B@6225ec90,16]
> {code}
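
The hex words in the log above can be decoded to confirm that three different rows passed through one buffer. The sketch below is only an illustration and is not part of Spark: the class name `DecodeUnsafeRowLog` and its helpers are hypothetical, and it assumes the log words are little-endian 8-byte values, with each variable-length column stored as an offset in the upper 32 bits and a size in the lower 32 bits.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.charset.StandardCharsets;

public class DecodeUnsafeRowLog {
    // Turn one logged 8-byte word (hex) back into the ASCII bytes it holds.
    // Assumption: little-endian byte order and zero padding after the data.
    public static String wordToAscii(String hex) {
        long word = Long.parseUnsignedLong(hex, 16);
        byte[] bytes = ByteBuffer.allocate(8)
                .order(ByteOrder.LITTLE_ENDIAN)
                .putLong(word)
                .array();
        int len = 0;
        while (len < bytes.length && bytes[len] != 0) {
            len++;
        }
        return new String(bytes, 0, len, StandardCharsets.US_ASCII);
    }

    // Split a logged offset-and-size word into {offset, size}.
    public static int[] offsetAndSize(String hex) {
        long word = Long.parseUnsignedLong(hex, 16);
        return new int[] {(int) (word >>> 32), (int) word};
    }

    public static void main(String[] args) {
        // Words copied from the three log lines (null-bit word and buffer info omitted).
        String[][] rows = {
            {"1800000003", "2000000002", "656f4a", "3032"},
            {"1800000003", "2000000002", "6d6f54", "3033"},
            {"1800000007", "2000000002", "6e696a6b757948", "3532"},
        };
        for (String[] r : rows) {
            int[] c0 = offsetAndSize(r[0]);
            int[] c1 = offsetAndSize(r[1]);
            System.out.println(
                "_c0=" + wordToAscii(r[2]) + " (offset " + c0[0] + ", size " + c0[1] + "), "
                + "_c1=" + wordToAscii(r[3]) + " (offset " + c1[0] + ", size " + c1[1] + ")");
        }
    }
}
```

Under those assumptions the three lines decode to Joe/20, Tom/30 and Hyukjin/25, all read at `[B@6225ec90` with offset 16, which matches the claim that the row contents change while the backing array stays the same.
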
> We can fix this by adding a config and, when it is true, copying each UnsafeRow read by the ShuffledRowRDD iterator, as below:
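> {code:scala}
> // Fragment of ShuffledRowRDD#compute; UNSAFEROWRDD_CACHE_ENABLE is the config proposed in this issue
> reader.read().asInstanceOf[Iterator[Product2[Int, InternalRow]]].map(x => {
>     if (SparkEnv.get.conf.get(StaticSQLConf.UNSAFEROWRDD_CACHE_ENABLE)
>       && x._2.isInstanceOf[UnsafeRow]) {
>       // Copy so that a cached row no longer shares the reused buffer
>       (x._2).asInstanceOf[UnsafeRow].copy()
>     } else {
>       x._2
>     }
>   })
> } // closes the enclosing method
> {code}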
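
The effect described in the report, and the effect of copying, can be reproduced without Spark. The following is a minimal sketch under stated assumptions: `Row`, `reusingIterator` and `cache` are hypothetical stand-ins for UnsafeRow, the shuffle reader's iterator and the cache, and they only model the idea of one mutable object being overwritten on every `next()` call.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

public class ReusedRowCacheDemo {
    // Hypothetical mutable row; stands in for UnsafeRow, not Spark's class.
    static final class Row {
        String name;

        Row copy() {
            Row r = new Row();
            r.name = name;
            return r;
        }
    }

    // Iterator that returns the SAME Row instance every time and
    // overwrites its contents, like a reader reusing one buffer.
    public static Iterator<Row> reusingIterator(String... names) {
        final Row shared = new Row();
        return new Iterator<Row>() {
            private int i = 0;

            @Override
            public boolean hasNext() {
                return i < names.length;
            }

            @Override
            public Row next() {
                if (!hasNext()) {
                    throw new NoSuchElementException();
                }
                shared.name = names[i++];
                return shared;
            }
        };
    }

    // "Cache" the rows by storing each element, optionally copying it first,
    // then read the stored rows back.
    public static List<String> cache(Iterator<Row> it, boolean copy) {
        List<Row> cached = new ArrayList<>();
        while (it.hasNext()) {
            Row r = it.next();
            cached.add(copy ? r.copy() : r);
        }
        List<String> out = new ArrayList<>();
        for (Row r : cached) {
            out.add(r.name);
        }
        return out;
    }

    public static void main(String[] args) {
        // Without copying: every cached entry points at the shared row.
        System.out.println(cache(reusingIterator("Joe", "Tom", "Hyukjin"), false));
        // With copying: each cached entry keeps its own contents.
        System.out.println(cache(reusingIterator("Joe", "Tom", "Hyukjin"), true));
    }
}
```

The first call prints `[Hyukjin, Hyukjin, Hyukjin]` and the second prints `[Joe, Tom, Hyukjin]`, mirroring the wrong and correct query results quoted above. The sketch says nothing about the cost of copying every row, which is one reason such a copy might be left to the caller rather than done unconditionally.
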


