Posted to issues@spark.apache.org by "Grant Henke (JIRA)" <ji...@apache.org> on 2019/02/14 16:43:00 UTC

[jira] [Updated] (SPARK-26880) dataDF.queryExecution.toRdd corrupt rows

     [ https://issues.apache.org/jira/browse/SPARK-26880?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Grant Henke updated SPARK-26880:
--------------------------------
    Description: 
I have seen a simple case where InternalRows returned by `queryExecution.toRdd` are corrupt. Some rows are duplicated while others are missing.

This simple test illustrates the issue:
{code}
import org.apache.spark.SparkConf
import org.apache.spark.sql.Row
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.types.IntegerType
import org.apache.spark.sql.types.StringType
import org.apache.spark.sql.types.StructField
import org.apache.spark.sql.types.StructType
import org.junit.Assert._
import org.junit.Test
import org.scalatest.Matchers
import org.scalatest.junit.JUnitSuite
import org.slf4j.Logger
import org.slf4j.LoggerFactory

class SparkTest extends JUnitSuite with Matchers {
  val Log: Logger = LoggerFactory.getLogger(getClass)

  @Test
  def testSparkRowCorruption(): Unit = {
    val conf = new SparkConf()
      .setMaster("local[*]")
      .setAppName("test")
      .set("spark.ui.enabled", "false")
    val ss = SparkSession.builder().config(conf).getOrCreate()

    // Set up a DataFrame for testing.
    val data = Seq(
      Row.fromSeq(Seq(0, "0")),
      Row.fromSeq(Seq(25, "25")),
      Row.fromSeq(Seq(50, "50")),
      Row.fromSeq(Seq(75, "75")),
      Row.fromSeq(Seq(99, "99")),
      Row.fromSeq(Seq(100, "100")),
      Row.fromSeq(Seq(101, "101")),
      Row.fromSeq(Seq(125, "125")),
      Row.fromSeq(Seq(150, "150")),
      Row.fromSeq(Seq(175, "175")),
      Row.fromSeq(Seq(199, "199"))
    )
    val dataRDD = ss.sparkContext.parallelize(data)
    val schema = StructType(
      Seq(
        StructField("key", IntegerType),
        StructField("value", StringType)
      ))
    val dataDF = ss.sqlContext.createDataFrame(dataRDD, schema)

    // Convert to an RDD.
    val rdd = dataDF.queryExecution.toRdd
    
    // Collect the data to compare.
    val resultData = rdd.collect
    resultData.foreach { row =>
      // Log for visualizing the corruption.
      Log.error(s"${row.getInt(0)}")
    }

    // Ensure the keys in the original data and resulting data match.
    val dataKeys = data.map(_.getInt(0)).toSet
    val resultKeys = resultData.map(_.getInt(0)).toSet
    assertEquals(dataKeys, resultKeys)
  }

}
{code}
That test fails with the following:
{noformat}
10:38:26.967 [ERROR - Test worker] (PartitionByInternalRowTest.scala:57) 0
10:38:26.967 [ERROR - Test worker] (PartitionByInternalRowTest.scala:57) 25
10:38:26.967 [ERROR - Test worker] (PartitionByInternalRowTest.scala:57) 75
10:38:26.968 [ERROR - Test worker] (PartitionByInternalRowTest.scala:57) 75
10:38:26.968 [ERROR - Test worker] (PartitionByInternalRowTest.scala:57) 99
10:38:26.968 [ERROR - Test worker] (PartitionByInternalRowTest.scala:57) 100
10:38:26.969 [ERROR - Test worker] (PartitionByInternalRowTest.scala:57) 125
10:38:26.969 [ERROR - Test worker] (PartitionByInternalRowTest.scala:57) 125
10:38:26.969 [ERROR - Test worker] (PartitionByInternalRowTest.scala:57) 150
10:38:26.970 [ERROR - Test worker] (PartitionByInternalRowTest.scala:57) 199
10:38:26.970 [ERROR - Test worker] (PartitionByInternalRowTest.scala:57) 199

expected:<Set(101, 0, 25, 125, 150, 50, 199, 175, 99, 75, 100)> but was:<Set(0, 25, 125, 150, 199, 99, 75, 100)>
Expected :Set(101, 0, 25, 125, 150, 50, 199, 175, 99, 75, 100)
Actual   :Set(0, 25, 125, 150, 199, 99, 75, 100)
{noformat}
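The repeated values in the output look like the same row object being observed more than once. If `toRdd` reuses a mutable `UnsafeRow` buffer per partition (an assumption on my part, not something I have verified in the Spark source), then copying each row before it escapes the iterator should avoid the problem. A minimal, untested sketch:
{code}
// Hypothetical workaround, assuming toRdd reuses mutable row buffers:
// InternalRow.copy() materializes an independent copy of each row, so the
// collected rows no longer alias the (possibly reused) underlying buffer.
val copiedRdd = dataDF.queryExecution.toRdd.map(_.copy())
val copiedResult = copiedRdd.collect
copiedResult.foreach(row => Log.error(s"${row.getInt(0)}"))
{code}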
Mapping each InternalRow to a Row also makes the issue go away:
{code}
val rdd = dataDF.queryExecution.toRdd.mapPartitions { internalRows =>
  val encoder = RowEncoder.apply(schema).resolveAndBind()
  internalRows.map(encoder.fromRow)
}
{code}
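Presumably this works for the same reason: `fromRow` deserializes each InternalRow into a fresh external Row, so the collected results cannot alias a reused buffer.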

> dataDF.queryExecution.toRdd corrupt rows
> ----------------------------------------
>
>                 Key: SPARK-26880
>                 URL: https://issues.apache.org/jira/browse/SPARK-26880
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.4.0
>            Reporter: Grant Henke
>            Priority: Major


