Posted to issues@spark.apache.org by "Josh Rosen (JIRA)" <ji...@apache.org> on 2016/10/06 18:58:20 UTC
[jira] [Commented] (SPARK-17806) Incorrect result when work with data from parquet
[ https://issues.apache.org/jira/browse/SPARK-17806?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15552830#comment-15552830 ]
Josh Rosen commented on SPARK-17806:
------------------------------------
I was able to confirm that this is still a problem as of 2.0.1. To rule out the possibility of this being a self-join bug, I rewrote the reproduction to read the parquet file twice and then join the two resulting tables together. Here's the query plan:
{code}
== Parsed Logical Plan ==
GlobalLimit 21
+- LocalLimit 21
   +- Join LeftOuter, (((a#1899 = a#1906) && (b#1900 = b#1907)) && (c#1901L = c#1908L))
      :- Relation[a#1899,b#1900,c#1901L] parquet
      +- Relation[a#1906,b#1907,c#1908L] parquet
== Analyzed Logical Plan ==
a: int, b: int, c: bigint, a: int, b: int, c: bigint
GlobalLimit 21
+- LocalLimit 21
   +- Join LeftOuter, (((a#1899 = a#1906) && (b#1900 = b#1907)) && (c#1901L = c#1908L))
      :- Relation[a#1899,b#1900,c#1901L] parquet
      +- Relation[a#1906,b#1907,c#1908L] parquet
== Optimized Logical Plan ==
GlobalLimit 21
+- LocalLimit 21
   +- Join LeftOuter, (((a#1899 = a#1906) && (b#1900 = b#1907)) && (c#1901L = c#1908L))
      :- LocalLimit 21
      :  +- Relation[a#1899,b#1900,c#1901L] parquet
      +- Relation[a#1906,b#1907,c#1908L] parquet
== Physical Plan ==
CollectLimit 21
+- *BroadcastHashJoin [a#1899, b#1900, c#1901L], [a#1906, b#1907, c#1908L], LeftOuter, BuildRight
   :- *LocalLimit 21
   :  +- *BatchedScan parquet [a#1899,b#1900,c#1901L] Format: ParquetFormat, InputPaths: dbfs:/tmp/test, PartitionFilters: [], PushedFilters: [], ReadSchema: struct<a:int,b:int,c:bigint>
   +- BroadcastExchange HashedRelationBroadcastMode(List((shiftleft((shiftleft(cast(input[0, int, true] as bigint), 32) | (cast(input[1, int, true] as bigint) & 4294967295)), 64) | (cast(input[2, bigint, true] as bigint) & 0))))
      +- *BatchedScan parquet [a#1906,b#1907,c#1908L] Format: ParquetFormat, InputPaths: dbfs:/tmp/test, PartitionFilters: [], PushedFilters: [], ReadSchema: struct<a:int,b:int,c:bigint>
{code}
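A note on the BroadcastExchange line in the physical plan above: the build-side key is printed as a single bigint packed from a, b and c, with c combined as `shiftleft(..., 64) | (c & 0)`. The sketch below reproduces that arithmetic in plain Scala, without Spark. It assumes that `shiftleft` behaves like the JVM `<<` operator; `PackedKeyDemo` and `packedKey` are hypothetical names used only for illustration and are not Spark code.

```scala
// Hypothetical helper, not Spark code: mirrors the key expression printed in the
// BroadcastExchange line, ASSUMING shiftleft behaves like the JVM `<<` operator.
object PackedKeyDemo {
  def packedKey(a: Int, b: Int, c: Long): Long = {
    // shiftleft(cast(a as bigint), 32) | (cast(b as bigint) & 4294967295)
    val ab = (a.toLong << 32) | (b.toLong & 4294967295L)
    // shiftleft(ab, 64) | (cast(c as bigint) & 0)
    // On the JVM a Long shift uses only the low 6 bits of the distance,
    // so `<< 64` shifts by 0, and `c & 0` is always 0.
    (ab << 64) | (c & 0L)
  }

  def main(args: Array[String]): Unit = {
    val k1 = packedKey(1, 1, 1L)
    val k2 = packedKey(1, 1, 2L)
    // prints: key(1,1,1) = 4294967297, key(1,1,2) = 4294967297, equal = true
    println(s"key(1,1,1) = $k1, key(1,1,2) = $k2, equal = ${k1 == k2}")
  }
}
```

Under that assumption, the rows (1,1,1) and (1,1,2) get the same packed key because c does not contribute to it, which would be consistent with the duplicated rows in the report. This is only a reading of the printed expression, not a confirmed root cause.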
> Incorrect result when work with data from parquet
> -------------------------------------------------
>
> Key: SPARK-17806
> URL: https://issues.apache.org/jira/browse/SPARK-17806
> Project: Spark
> Issue Type: Bug
> Affects Versions: 2.0.0, 2.0.1
> Reporter: Vitaly Gerasimov
> Priority: Critical
> Labels: correctness
>
> {code}
> import org.apache.spark.SparkConf
> import org.apache.spark.sql.SparkSession
> import org.apache.spark.sql.types.{StructField, StructType}
> import org.apache.spark.sql.types.DataTypes._
> val sc = SparkSession.builder().config(new SparkConf().setMaster("local")).getOrCreate()
> val jsonRDD = sc.sparkContext.parallelize(Seq(
>   """{"a":1,"b":1,"c":1}""",
>   """{"a":1,"b":1,"c":2}"""
> ))
> sc.read.schema(StructType(Seq(
>   StructField("a", IntegerType),
>   StructField("b", IntegerType),
>   StructField("c", LongType)
> ))).json(jsonRDD).write.parquet("/tmp/test")
> val df = sc.read.load("/tmp/test")
> df.join(df, Seq("a", "b", "c"), "left_outer").show()
> {code}
> returns:
> {code}
> +---+---+---+
> |  a|  b|  c|
> +---+---+---+
> |  1|  1|  1|
> |  1|  1|  1|
> |  1|  1|  2|
> |  1|  1|  2|
> +---+---+---+
> {code}
> Expected result:
> {code}
> +---+---+---+
> |  a|  b|  c|
> +---+---+---+
> |  1|  1|  1|
> |  1|  1|  2|
> +---+---+---+
> {code}
> If I use this code without saving to parquet, it works fine. If I change the type of the `c` column to `IntegerType`, it also works fine.