Posted to issues@spark.apache.org by "Sean Owen (JIRA)" <ji...@apache.org> on 2018/01/17 14:05:00 UTC
[jira] [Resolved] (SPARK-23076) When we call cache() on RDD which
depends on ShuffleRowRDD, we will get an error result
[ https://issues.apache.org/jira/browse/SPARK-23076?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Sean Owen resolved SPARK-23076.
-------------------------------
Resolution: Not A Problem
> When we call cache() on RDD which depends on ShuffleRowRDD, we will get an error result
> ---------------------------------------------------------------------------------------
>
> Key: SPARK-23076
> URL: https://issues.apache.org/jira/browse/SPARK-23076
> Project: Spark
> Issue Type: Improvement
> Components: SQL
> Affects Versions: 2.1.0
> Reporter: zhoukang
> Priority: Major
> Attachments: shufflerowrdd-cache.png
>
>
> For query below:
> {code:java}
> select * from csv_demo limit 3;
> {code}
> The correct result should be:
> 0: jdbc:hive2://10.108.230.228:10000/> select * from csv_demo limit 3;
> +---------+-----+
> | _c0     | _c1 |
> +---------+-----+
> | Joe     | 20  |
> | Tom     | 30  |
> | Hyukjin | 25  |
> +---------+-----+
> However, when we call cache() on the MapPartitionsRDD shown below:
> !shufflerowrdd-cache.png!
> the result is wrong:
> 0: jdbc:hive2://xxx/> select * from csv_demo limit 3;
> +---------+-----+
> | _c0     | _c1 |
> +---------+-----+
> | Hyukjin | 25  |
> | Hyukjin | 25  |
> | Hyukjin | 25  |
> +---------+-----+
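> This happens because the UnsafeRow instances produced by ShuffledRowRDD#compute all share the same underlying byte buffer.
> To show this, I modified UnsafeRow.toString() as below so that it also prints baseObject and baseOffset. In the log that follows, every row that is read reports the same baseObject and baseOffset even though the row contents differ: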
> {code:java}
> // This is for debugging
> @Override
> public String toString() {
>   StringBuilder build = new StringBuilder("[");
>   for (int i = 0; i < sizeInBytes; i += 8) {
>     if (i != 0) build.append(',');
>     build.append(java.lang.Long.toHexString(Platform.getLong(baseObject, baseOffset + i)));
>   }
>   build.append("," + baseObject + "," + baseOffset + ']'); // Print baseObject and baseOffset here
>   return build.toString();
> }
> {code}
> {code:java}
> 2018-01-12,22:08:47,441 INFO org.apache.spark.sql.execution.ShuffledRowRDD: Read value: [0,1800000003,2000000002,656f4a,3032,[B@6225ec90,16]
> 2018-01-12,22:08:47,445 INFO org.apache.spark.sql.execution.ShuffledRowRDD: Read value: [0,1800000003,2000000002,6d6f54,3033,[B@6225ec90,16]
> 2018-01-12,22:08:47,448 INFO org.apache.spark.sql.execution.ShuffledRowRDD: Read value: [0,1800000007,2000000002,6e696a6b757948,3532,[B@6225ec90,16]
> {code}
> We can fix this by adding a config and, when it is true, copying each UnsafeRow read by the ShuffledRowRDD iterator, like below:
> {code:java}
> reader.read().asInstanceOf[Iterator[Product2[Int, InternalRow]]].map(x => {
>   if (SparkEnv.get.conf.get(StaticSQLConf.UNSAFEROWRDD_CACHE_ENABLE)
>     && x._2.isInstanceOf[UnsafeRow]) {
>     // Copy so that the row owns its bytes instead of aliasing the reused buffer
>     x._2.asInstanceOf[UnsafeRow].copy()
>   } else {
>     x._2
>   }
> })
> {code}
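
The effect described in the report can be reproduced without Spark. The following is a minimal sketch, not Spark code: RowView is a hypothetical stand-in for a row that is only a view over a byte array, and cacheRows is a hypothetical helper that imitates a reader reusing one buffer for every record. It assumes ASCII records shorter than 32 bytes. Keeping references to the reused view yields the last record three times, while copying each view before keeping it preserves all three rows.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class SharedBufferDemo {

    /** Hypothetical stand-in for a row that is a view over a byte array. */
    static final class RowView {
        private byte[] base;
        private int length;

        /** Re-point this view at a buffer, the way a reader reuses one row object. */
        void pointTo(byte[] base, int length) {
            this.base = base;
            this.length = length;
        }

        /** Return a new view that owns a private copy of the bytes. */
        RowView copy() {
            RowView copied = new RowView();
            copied.pointTo(Arrays.copyOf(base, length), length);
            return copied;
        }

        String asString() {
            return new String(base, 0, length, StandardCharsets.US_ASCII);
        }
    }

    /**
     * Imitates reading records through one shared buffer and keeping the rows,
     * either by reference (aliasing the buffer) or as copies.
     */
    static List<String> cacheRows(String[] records, boolean copyBeforeCaching) {
        byte[] shared = new byte[32];      // assumption: every record fits in 32 bytes
        RowView reused = new RowView();    // one row object reused for every record
        List<RowView> cached = new ArrayList<>();

        for (String record : records) {
            byte[] bytes = record.getBytes(StandardCharsets.US_ASCII);
            System.arraycopy(bytes, 0, shared, 0, bytes.length); // overwrite the shared buffer
            reused.pointTo(shared, bytes.length);
            cached.add(copyBeforeCaching ? reused.copy() : reused);
        }

        // Rows are only materialized after all records have been read
        List<String> result = new ArrayList<>();
        for (RowView row : cached) {
            result.add(row.asString());
        }
        return result;
    }

    public static void main(String[] args) {
        String[] records = {"Joe|20", "Tom|30", "Hyukjin|25"};
        System.out.println(cacheRows(records, false)); // [Hyukjin|25, Hyukjin|25, Hyukjin|25]
        System.out.println(cacheRows(records, true));  // [Joe|20, Tom|30, Hyukjin|25]
    }
}
```

The copy costs one allocation per row, which is presumably why the proposal above guards it behind a config rather than copying unconditionally.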