Posted to issues@spark.apache.org by "Josh Rosen (JIRA)" <ji...@apache.org> on 2016/10/06 18:58:20 UTC

[jira] [Commented] (SPARK-17806) Incorrect result when working with data from Parquet

    [ https://issues.apache.org/jira/browse/SPARK-17806?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15552830#comment-15552830 ] 

Josh Rosen commented on SPARK-17806:
------------------------------------

I was able to confirm that this is still a problem as of 2.0.1. To rule out the possibility of this being a self-join bug, I rewrote the reproduction to read the Parquet file twice and then joined the two resulting DataFrames. A minimal sketch of that rewrite (the variable names are mine, not from the original report; `spark` is the SparkSession):
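{code}
// Read the same Parquet output twice so the join is between two
// independently loaded relations rather than a literal self-join.
// The path matches the one used in the original report.
val left = spark.read.parquet("/tmp/test")
val right = spark.read.parquet("/tmp/test")
left.join(right, Seq("a", "b", "c"), "left_outer").show()
{code}

Here's the query plan: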

{code}
== Parsed Logical Plan ==
GlobalLimit 21
+- LocalLimit 21
   +- Join LeftOuter, (((a#1899 = a#1906) && (b#1900 = b#1907)) && (c#1901L = c#1908L))
      :- Relation[a#1899,b#1900,c#1901L] parquet
      +- Relation[a#1906,b#1907,c#1908L] parquet

== Analyzed Logical Plan ==
a: int, b: int, c: bigint, a: int, b: int, c: bigint
GlobalLimit 21
+- LocalLimit 21
   +- Join LeftOuter, (((a#1899 = a#1906) && (b#1900 = b#1907)) && (c#1901L = c#1908L))
      :- Relation[a#1899,b#1900,c#1901L] parquet
      +- Relation[a#1906,b#1907,c#1908L] parquet

== Optimized Logical Plan ==
GlobalLimit 21
+- LocalLimit 21
   +- Join LeftOuter, (((a#1899 = a#1906) && (b#1900 = b#1907)) && (c#1901L = c#1908L))
      :- LocalLimit 21
      :  +- Relation[a#1899,b#1900,c#1901L] parquet
      +- Relation[a#1906,b#1907,c#1908L] parquet

== Physical Plan ==
CollectLimit 21
+- *BroadcastHashJoin [a#1899, b#1900, c#1901L], [a#1906, b#1907, c#1908L], LeftOuter, BuildRight
   :- *LocalLimit 21
   :  +- *BatchedScan parquet [a#1899,b#1900,c#1901L] Format: ParquetFormat, InputPaths: dbfs:/tmp/test, PartitionFilters: [], PushedFilters: [], ReadSchema: struct<a:int,b:int,c:bigint>
   +- BroadcastExchange HashedRelationBroadcastMode(List((shiftleft((shiftleft(cast(input[0, int, true] as bigint), 32) | (cast(input[1, int, true] as bigint) & 4294967295)), 64) | (cast(input[2, bigint, true] as bigint) & 0))))
      +- *BatchedScan parquet [a#1906,b#1907,c#1908L] Format: ParquetFormat, InputPaths: dbfs:/tmp/test, PartitionFilters: [], PushedFilters: [], ReadSchema: struct<a:int,b:int,c:bigint>
{code}
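
Note the key expression in the BroadcastExchange above: the three join columns are packed into a single 64-bit value. Assuming JVM shift semantics (the shift distance for a Long is taken modulo 64), `shiftleft(..., 64)` is a no-op and `cast(input[2], ...) & 0` is always zero, so the packed key appears to drop column `c` entirely. A small sketch of that arithmetic (my illustration, not Spark source code):

{code}
// Mimics the packed-key expression from the plan above.
// On the JVM, shifting a Long left by 64 shifts by 64 % 64 == 0 bits,
// i.e. it returns the value unchanged, and (c & 0L) is always 0L.
def packedKey(a: Int, b: Int, c: Long): Long =
  (((a.toLong << 32) | (b.toLong & 4294967295L)) << 64) | (c & 0L)

// Rows (1, 1, 1) and (1, 1, 2) therefore collide on the same key,
// which would explain the duplicated rows in the join output.
assert(packedKey(1, 1, 1L) == packedKey(1, 1, 2L))
{code}

If that reading is right, the problem would be in how the join packs multi-column keys whose total width exceeds 64 bits, not in the Parquet scan itself.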


> Incorrect result when working with data from Parquet
> ----------------------------------------------------
>
>                 Key: SPARK-17806
>                 URL: https://issues.apache.org/jira/browse/SPARK-17806
>             Project: Spark
>          Issue Type: Bug
>    Affects Versions: 2.0.0, 2.0.1
>            Reporter: Vitaly Gerasimov
>            Priority: Critical
>              Labels: correctness
>
> {code}
>   import org.apache.spark.SparkConf
>   import org.apache.spark.sql.SparkSession
>   import org.apache.spark.sql.types.{StructField, StructType}
>   import org.apache.spark.sql.types.DataTypes._
>   val sc = SparkSession.builder().config(new SparkConf().setMaster("local")).getOrCreate()
>   val jsonRDD = sc.sparkContext.parallelize(Seq(
>     """{"a":1,"b":1,"c":1}""",
>     """{"a":1,"b":1,"c":2}"""
>   ))
>   sc.read.schema(StructType(Seq(
>     StructField("a", IntegerType),
>     StructField("b", IntegerType),
>     StructField("c", LongType)
>   ))).json(jsonRDD).write.parquet("/tmp/test")
>   val df = sc.read.load("/tmp/test")
>   df.join(df, Seq("a", "b", "c"), "left_outer").show()
> {code}
> returns:
> {code}
> +---+---+---+
> |  a|  b|  c|
> +---+---+---+
> |  1|  1|  1|
> |  1|  1|  1|
> |  1|  1|  2|
> |  1|  1|  2|
> +---+---+---+
> {code}
> Expected result:
> {code}
> +---+---+---+
> |  a|  b|  c|
> +---+---+---+
> |  1|  1|  1|
> |  1|  1|  2|
> +---+---+---+
> {code}
> If I run this code without saving to Parquet, it works fine. If the type of the `c` column is changed to `IntegerType`, it also works fine (see the sketch below).
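> A minimal sketch of that `IntegerType` variant (my illustration, not part of the original report; `/tmp/test2` is an assumed output path):
> {code}
>   // Same reproduction as above, but declaring column c as IntegerType
>   // instead of LongType; reuses sc and jsonRDD from the snippet above.
>   sc.read.schema(StructType(Seq(
>     StructField("a", IntegerType),
>     StructField("b", IntegerType),
>     StructField("c", IntegerType)  // was LongType above
>   ))).json(jsonRDD).write.parquet("/tmp/test2")
>   val df2 = sc.read.load("/tmp/test2")
>   df2.join(df2, Seq("a", "b", "c"), "left_outer").show()  // reported to produce the expected two rows
> {code}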


