Posted to issues@spark.apache.org by "Cheng Hao (JIRA)" <ji...@apache.org> on 2015/10/28 03:25:27 UTC

[jira] [Commented] (SPARK-11330) Filter operation on StringType after groupBy PERSISTED brings no results

    [ https://issues.apache.org/jira/browse/SPARK-11330?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14977600#comment-14977600 ] 

Cheng Hao commented on SPARK-11330:
-----------------------------------

Hi [~saif.a.ellafi], I tried the following code:
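{code}
// Assumes the enclosing Spark SQL test suite provides sqlContext, withTempPath,
// and the implicits needed for toDF() and $"...".
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.functions.{count, sum}

case class Spark11330(account_id: Int, product: String, vint: String,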
                      band: String, age: Int, mb: String, yyyymm: String,
                      balance: Float, balancec: Float)

test("SPARK-11330: Filter operation on StringType after groupBy PERSISTED brings no results") {
    withTempPath { f =>
      val d = Seq(
        Spark11330(1, "product1", "2007-01-01", "band1", 27, "mb1", "200808", 1000.0f, 2000.0f),
        Spark11330(1, "product1", "2007-01-01", "band1", 27, "mb1", "200808", 2000.0f, 2000.0f),
        Spark11330(1, "product1", "2007-01-01", "band1", 27, "mb1", "200809", 2000.0f, 2000.0f),
        Spark11330(2, "product2", "2007-01-01", "band3", 29, "mb1", "200809", 2010.0f, 3000.0f))

      val data = List.tabulate[Seq[Spark11330]](10) { i => d }.flatten

      sqlContext.sparkContext.parallelize(data, 4)
        .toDF().write.format("parquet").mode(SaveMode.Overwrite).save(f.getAbsolutePath)

      val df = sqlContext.read.parquet(f.getAbsolutePath)

      val f1 = df.groupBy("vint").count().persist().filter("vint = '2007-01-01'").first
      val f2 = df.groupBy("vint").count().filter("vint = '2007-01-01'").first

      assert(f1 == f2)

      val res = df
        .groupBy("product", "band", "age", "vint", "mb", "yyyymm")
        .agg(
          count($"account_id").as("N"),
          sum($"balance").as("balance_eom"),
          sum($"balancec").as("balance_eoc")).persist()

      val c1 = res.select("vint", "yyyymm").filter("vint='2007-01-01'").select("yyyymm").distinct.collect
      res.unpersist()
      val c2 = res.select("vint", "yyyymm").filter("vint='2007-01-01'").select("yyyymm").distinct.collect
      // Compare as sets: the row order returned by distinct.collect is not guaranteed.
      assert(c1.toSet == c2.toSet)
    }
  }
{code}

Everything seems to work fine. I am not sure whether I missed something; can you try to reproduce the issue based on my code?
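
For comparison, a standalone sketch that follows the query shape from the report more closely (persist, then select, then filter) might look like the code below. It assumes Spark 1.5.x on the classpath. The Rec case class, the sample rows, and the object name are made up for illustration, and it uses in-memory data rather than the reporter's Parquet files, so it may well not trigger the problem.
{code}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

// Hypothetical row type: only the string column from the report matters here.
case class Rec(vintages: String, yyyymm: Int)

object PersistFilterCheck {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("SPARK-11330-check").setMaster("local[4]")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._

    // Illustrative sample data, not the reporter's dataset.
    val df = sc.parallelize(Seq(
      Rec("2007-01-01", 200808),
      Rec("2007-01-01", 200809),
      Rec("2007-02-01", 200809)), 4).toDF()

    val grouped = df.groupBy("vintages").count()

    // Same query without and with persist(), as in the report.
    val plain = grouped.select("vintages")
      .filter("vintages = '2007-01-01'").collect()
    val cached = grouped.persist().select("vintages")
      .filter("vintages = '2007-01-01'").collect()

    // Both variants should return the same rows.
    assert(plain.toSet == cached.toSet, "persisted and non-persisted results differ")

    sc.stop()
  }
}
{code}
If the two results differ on real data, the assertion fails, which would make the mismatch visible in a single run.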

> Filter operation on StringType after groupBy PERSISTED brings no results
> ------------------------------------------------------------------------
>
>                 Key: SPARK-11330
>                 URL: https://issues.apache.org/jira/browse/SPARK-11330
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 1.5.1
>         Environment: Standalone cluster of five servers (also happens in local mode). sqlContext is an instance of HiveContext (also happens with SQLContext).
> No special options other than driver memory and executor memory.
> The Parquet data has 512 partitions and the cluster has 160 cores. Also happens with other partitioning.
> The data is nearly 2 billion rows.
>            Reporter: Saif Addin Ellafi
>            Priority: Blocker
>
> ONLY HAPPENS WHEN PERSIST() IS CALLED
> val data = sqlContext.read.parquet("/var/data/Saif/ppnr_pqt")
> data.groupBy("vintages").count.select("vintages").filter("vintages = '2007-01-01'").first
> >>> res9: org.apache.spark.sql.Row = [2007-01-01]
> data.groupBy("vintages").count.persist.select("vintages").filter("vintages = '2007-01-01'").first
> >>> Exception on empty iterator stuff
> This does not happen when filtering on a field of another type, e.g. an integer field:
> data.groupBy("yyyymm").count.persist.select("yyyymm").filter("yyyymm = 200805").first
> >>> res13: org.apache.spark.sql.Row = [200805]
> NOTE: I do not know whether the Kryo serializer was in use when this Parquet data was stored.
> NOTE2: Calling persist after the filter works fine, but this is not a good enough workaround, since any later filter operation will still break the results.
> NOTE3: I have reproduced the issue with several different datasets.
> NOTE4: The only real workaround is to store the groupBy DataFrame in a database and reload it as a new DataFrame.
> A query on the raw data works fine:
> data.select("vintages").filter("vintages = '2007-01-01'").first
> >>> res4: org.apache.spark.sql.Row = [2007-01-01]
> Originally, the issue appeared with a larger aggregation operation: the data was inconsistent, with different results on every call.
> Reducing the operation step by step led me to this issue.
> The original operation was:
> val data = sqlContext.read.parquet("/var/Saif/data_pqt")
> val res = data.groupBy("product", "band", "age", "vint", "mb", "yyyymm").agg(count($"account_id").as("N"), sum($"balance").as("balance_eom"), sum($"balancec").as("balance_eoc"), sum($"spend").as("spend"), sum($"payment").as("payment"), sum($"feoc").as("feoc"), sum($"cfintbal").as("cfintbal"), count($"newacct" === 1).as("newacct")).persist()
> val z = res.select("vint", "yyyymm").filter("vint = '2007-01-01'").select("yyyymm").distinct.collect
> z.length
> >>> res0: Int = 102
> res.unpersist()
> val z = res.select("vint", "yyyymm").filter("vint = '2007-01-01'").select("yyyymm").distinct.collect
> z.length
> >>> res1: Int = 103


