Posted to issues@spark.apache.org by "Dilip Biswal (JIRA)" <ji...@apache.org> on 2019/02/14 22:01:02 UTC
[jira] [Comment Edited] (SPARK-26880) dataDF.queryExecution.toRdd corrupt rows
[ https://issues.apache.org/jira/browse/SPARK-26880?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16768754#comment-16768754 ]
Dilip Biswal edited comment on SPARK-26880 at 2/14/19 10:00 PM:
----------------------------------------------------------------
Hello,
Shouldn't we use `dataDF.rdd` rather than `dataDF.queryExecution.toRdd` to get an RDD[Row]? In my understanding, the former
returns the RDD after converting the rows from the internal format to the external one. The latter is
the internal version of the RDD, i.e. RDD[InternalRow].
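For illustration, here is a minimal sketch of the two access paths mentioned above. The object and method names (`RowAccess`, `externalKeys`, `internalKeysCopied`) are hypothetical and not part of Spark or of the reported test. The sketch also assumes, which this thread does not confirm, that rows coming out of `queryExecution.toRdd` may share a reused mutable buffer, so each one is copied before it is collected.

```scala
import org.apache.spark.sql.DataFrame

// Hypothetical helper, not part of Spark or of the reported test.
object RowAccess {
  // Public API: Dataset.rdd yields external Row objects (RDD[Row]).
  def externalKeys(df: DataFrame): Set[Int] =
    df.rdd.map(_.getInt(0)).collect().toSet

  // Internal API: queryExecution.toRdd yields RDD[InternalRow].
  // Assumption: the rows may be backed by a reused buffer, so each one is
  // copied before it is retained by collect().
  def internalKeysCopied(df: DataFrame): Set[Int] =
    df.queryExecution.toRdd.map(_.copy()).collect().map(_.getInt(0)).toSet
}
```

In the test from the issue, either helper could be called as `RowAccess.externalKeys(dataDF)` and compared against `dataKeys`.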
cc [~srowen@scient.com] [~cloud_fan]
Regards,
– Dilip
was (Author: dkbiswal):
Hello,
Shouldn't we use `dataDF.rdd` rather than `dataDF.queryExecution.toRdd` to get an RDD[Row]? The former
returns the RDD after converting the rows from the internal format to the external one. The latter is
the internal version of the RDD, i.e. RDD[InternalRow].
cc [~srowen@scient.com] [~cloud_fan]
Regards,
-- Dilip
> dataDF.queryExecution.toRdd corrupt rows
> ----------------------------------------
>
> Key: SPARK-26880
> URL: https://issues.apache.org/jira/browse/SPARK-26880
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 2.4.0
> Reporter: Grant Henke
> Priority: Major
>
> I have seen a simple case where InternalRows returned by `queryExecution.toRdd` are corrupt. Some rows are duplicated while others are missing.
> This simple test illustrates the issue:
> {code}
> import org.apache.spark.SparkConf
> import org.apache.spark.sql.Row
> import org.apache.spark.sql.SparkSession
> import org.apache.spark.sql.catalyst.encoders.RowEncoder
> import org.apache.spark.sql.types.IntegerType
> import org.apache.spark.sql.types.StringType
> import org.apache.spark.sql.types.StructField
> import org.apache.spark.sql.types.StructType
> import org.junit.Assert._
> import org.junit.Test
> import org.scalatest.Matchers
> import org.scalatest.junit.JUnitSuite
> import org.slf4j.Logger
> import org.slf4j.LoggerFactory
>
> class SparkTest extends JUnitSuite with Matchers {
>   val Log: Logger = LoggerFactory.getLogger(getClass)
>
>   @Test
>   def testSparkRowCorruption(): Unit = {
>     val conf = new SparkConf()
>       .setMaster("local[*]")
>       .setAppName("test")
>       .set("spark.ui.enabled", "false")
>     val ss = SparkSession.builder().config(conf).getOrCreate()
>
>     // Setup a DataFrame for testing.
>     val data = Seq(
>       Row.fromSeq(Seq(0, "0")),
>       Row.fromSeq(Seq(25, "25")),
>       Row.fromSeq(Seq(50, "50")),
>       Row.fromSeq(Seq(75, "75")),
>       Row.fromSeq(Seq(99, "99")),
>       Row.fromSeq(Seq(100, "100")),
>       Row.fromSeq(Seq(101, "101")),
>       Row.fromSeq(Seq(125, "125")),
>       Row.fromSeq(Seq(150, "150")),
>       Row.fromSeq(Seq(175, "175")),
>       Row.fromSeq(Seq(199, "199"))
>     )
>     val dataRDD = ss.sparkContext.parallelize(data)
>     val schema = StructType(
>       Seq(
>         StructField("key", IntegerType),
>         StructField("value", StringType)
>       ))
>     val dataDF = ss.sqlContext.createDataFrame(dataRDD, schema)
>
>     // Convert to an RDD.
>     val rdd = dataDF.queryExecution.toRdd
>
>     // Collect the data to compare.
>     val resultData = rdd.collect
>     resultData.foreach { row =>
>       // Log for visualizing the corruption.
>       Log.error(s"${row.getInt(0)}")
>     }
>
>     // Ensure the keys in the original data and resulting data match.
>     val dataKeys = data.map(_.getInt(0)).toSet
>     val resultKeys = resultData.map(_.getInt(0)).toSet
>     assertEquals(dataKeys, resultKeys)
>   }
> }
> {code}
> That test fails with the following:
> {noformat}
> 10:38:26.967 [ERROR - Test worker] (PartitionByInternalRowTest.scala:57) 0
> 10:38:26.967 [ERROR - Test worker] (PartitionByInternalRowTest.scala:57) 25
> 10:38:26.967 [ERROR - Test worker] (PartitionByInternalRowTest.scala:57) 75
> 10:38:26.968 [ERROR - Test worker] (PartitionByInternalRowTest.scala:57) 75
> 10:38:26.968 [ERROR - Test worker] (PartitionByInternalRowTest.scala:57) 99
> 10:38:26.968 [ERROR - Test worker] (PartitionByInternalRowTest.scala:57) 100
> 10:38:26.969 [ERROR - Test worker] (PartitionByInternalRowTest.scala:57) 125
> 10:38:26.969 [ERROR - Test worker] (PartitionByInternalRowTest.scala:57) 125
> 10:38:26.969 [ERROR - Test worker] (PartitionByInternalRowTest.scala:57) 150
> 10:38:26.970 [ERROR - Test worker] (PartitionByInternalRowTest.scala:57) 199
> 10:38:26.970 [ERROR - Test worker] (PartitionByInternalRowTest.scala:57) 199
> expected:<Set(101, 0, 25, 125, 150, 50, 199, 175, 99, 75, 100)> but was:<Set(0, 25, 125, 150, 199, 99, 75, 100)>
> Expected :Set(101, 0, 25, 125, 150, 50, 199, 175, 99, 75, 100)
> Actual :Set(0, 25, 125, 150, 199, 99, 75, 100)
> {noformat}
> If I map from an InternalRow to a Row, the issue goes away:
> {code}
> val rdd = dataDF.queryExecution.toRdd.mapPartitions { internalRows =>
>   val encoder = RowEncoder.apply(schema).resolveAndBind()
>   internalRows.map(encoder.fromRow)
> }
> {code}
> Converting with CatalystTypeConverters also appears to resolve the issue:
> {code}
> import org.apache.spark.sql.catalyst.CatalystTypeConverters
>
> val rdd = dataDF.queryExecution.toRdd.mapPartitions { internalRows =>
>   val typeConverter = CatalystTypeConverters.createToScalaConverter(schema)
>   internalRows.map(ir => typeConverter(ir).asInstanceOf[Row])
> }
> {code}
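Both workarounds above build a new object for every row before it is collected. One possible explanation, stated here as an assumption rather than a confirmed diagnosis, is that the iterator behind `queryExecution.toRdd` reuses a mutable row object, so collecting the rows without copying retains several references to the same buffer. The plain-Scala sketch below models only that aliasing effect. `MutableRow` and `RowReuseDemo` are hypothetical stand-ins, not Spark classes, and this simplified model is not meant to reproduce the exact duplicate pattern in the log above.

```scala
// Hypothetical stand-in for a mutable internal row (not a Spark class).
final class MutableRow(var key: Int) {
  def copy(): MutableRow = new MutableRow(key)
}

object RowReuseDemo {
  // Yields the SAME MutableRow instance for every element, overwriting
  // its field each time (models an iterator that reuses one buffer).
  def reusingIterator(keys: Seq[Int]): Iterator[MutableRow] = {
    val buffer = new MutableRow(0)
    keys.iterator.map { k => buffer.key = k; buffer }
  }

  // Retains references without copying: every element aliases the buffer,
  // so all of them show the last value written.
  def aliasedKeys(keys: Seq[Int]): List[Int] =
    reusingIterator(keys).toList.map(_.key)

  // Copies each element as it is produced, before the buffer is overwritten.
  def copiedKeys(keys: Seq[Int]): List[Int] =
    reusingIterator(keys).map(_.copy()).toList.map(_.key)

  def main(args: Array[String]): Unit = {
    val keys = Seq(0, 25, 50)
    println(aliasedKeys(keys)) // prints List(50, 50, 50)
    println(copiedKeys(keys))  // prints List(0, 25, 50)
  }
}
```

If that assumption holds for the reported test, copying each row before collecting (for example `dataDF.queryExecution.toRdd.map(_.copy())`) would be another way to avoid the mismatch, alongside the two conversions shown above.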