Posted to issues@spark.apache.org by "Satish Kolli (JIRA)" <ji...@apache.org> on 2016/07/21 13:22:20 UTC

[jira] [Created] (SPARK-16664) Spark 1.6.2 - Persist call on Data frames with more than 200 columns is wiping out the data.

Satish Kolli created SPARK-16664:
------------------------------------

             Summary: Spark 1.6.2 - Persist call on Data frames with more than 200 columns is wiping out the data.
                 Key: SPARK-16664
                 URL: https://issues.apache.org/jira/browse/SPARK-16664
             Project: Spark
          Issue Type: Bug
    Affects Versions: 1.6.2
            Reporter: Satish Kolli
            Priority: Blocker


Calling persist on a data frame with more than 200 columns removes the data from the data frame. This is a regression in Spark 1.6.2; the same code works without any issues in Spark 1.6.1.

The following test case demonstrates the problem. Please let me know if you need any additional information. Thanks.

{code}
import org.apache.spark._
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types._
import org.apache.spark.sql.{Row, SQLContext}
import org.scalatest.FunSuite

class TestSpark162_1 extends FunSuite {

  test("test data frame with 200 columns") {
    val sparkConfig = new SparkConf()
    val parallelism = 5
    sparkConfig.set("spark.default.parallelism", s"$parallelism")
    sparkConfig.set("spark.sql.shuffle.partitions", s"$parallelism")

    val sc = new SparkContext("local[3]", "TestNestedJson", sparkConfig)
    val sqlContext = new SQLContext(sc)

    // create a data frame with 200 columns and 200 fake values
    val size = 200
    val rdd: RDD[Seq[Long]] = sc.parallelize(Seq(Seq.range(0, size)))
    val rowRdd: RDD[Row] = rdd.map(d => Row.fromSeq(d))

    val schemas = List.range(0, size).map(a => StructField("name" + a, LongType, nullable = true))
    val testSchema = StructType(schemas)
    val testDf = sqlContext.createDataFrame(rowRdd, testSchema)

    // after persist, column "name100" should still contain the value 100
    assert(testDf.persist.take(1).apply(0).toSeq(100).asInstanceOf[Long] == 100)
    sc.stop()
  }

  test("test data frame with 201 columns") {
    val sparkConfig = new SparkConf()
    val parallelism = 5
    sparkConfig.set("spark.default.parallelism", s"$parallelism")
    sparkConfig.set("spark.sql.shuffle.partitions", s"$parallelism")

    val sc = new SparkContext("local[3]", "TestNestedJson", sparkConfig)
    val sqlContext = new SQLContext(sc)

    // create a data frame with 201 columns and 201 fake values
    val size = 201
    val rdd: RDD[Seq[Long]] = sc.parallelize(Seq(Seq.range(0, size)))
    val rowRdd: RDD[Row] = rdd.map(d => Row.fromSeq(d))

    val schemas = List.range(0, size).map(a => StructField("name" + a, LongType, nullable = true))
    val testSchema = StructType(schemas)
    val testDf = sqlContext.createDataFrame(rowRdd, testSchema)

    // after persist, column "name100" should still contain the value 100; with 201 columns this fails on 1.6.2
    assert(testDf.persist.take(1).apply(0).toSeq(100).asInstanceOf[Long] == 100)
    sc.stop()
  }
}
{code}
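
For quicker triage, here is a condensed, spark-shell style sketch of the same repro. It assumes a Spark 1.6.x shell where sc and sqlContext are already available; the 201-column width and the "c<i>" column names are arbitrary choices for illustration, not part of the original test suite.

{code}
// Condensed repro sketch (assumes a Spark 1.6.x shell providing sc and sqlContext).
// The 201-column width and "c<i>" column names are arbitrary, for illustration only.
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{LongType, StructField, StructType}

val width = 201
val rowRdd: RDD[Row] = sc.parallelize(Seq(Row.fromSeq(Seq.range(0L, width.toLong))))
val schema = StructType(List.range(0, width).map(i => StructField("c" + i, LongType, nullable = true)))
val df = sqlContext.createDataFrame(rowRdd, schema)

df.take(1).foreach(println)           // values 0 through 200, as expected
df.persist().take(1).foreach(println) // on 1.6.2, the row comes back with its data wiped
{code}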


