You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Bozhidar Karaargirov (Jira)" <ji...@apache.org> on 2020/02/23 18:21:00 UTC
[jira] [Updated] (SPARK-30926) Same SQL on CSV and on Parquet gives
different result
[ https://issues.apache.org/jira/browse/SPARK-30926?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Bozhidar Karaargirov updated SPARK-30926:
-----------------------------------------
Description:
So I played around with a data set from here: [https://www.kaggle.com/hmavrodiev/sofia-air-quality-dataset]
I ran the same query against the base CSVs and against a Parquet version of them:
{color:#008000}SELECT * FROM airQualityP WHERE P1 > 20{color}
Here is the csv code:
{color:#000080}import {color}{color:#660e7a}session{color}.{color:#660e7a}sqlContext{color}.implicits._
{color:#000080}val {color}df = {color:#660e7a}session{color}.read.option({color:#008000}"header"{color}, {color:#008000}"true"{color}).csv({color:#660e7a}originalDataset{color})
df.createTempView({color:#008000}"airQuality"{color})
{color:#000080}val {color}result = {color:#660e7a}session{color}.sql({color:#008000}"SELECT * FROM airQuality WHERE P1 > 20"{color})
.map(ParticleAirQuality.{color:#660e7a}mappingFunction{color})
println(result.count())
Here is the parquet code:
{color:#000080}import {color}{color:#660e7a}session{color}.{color:#660e7a}sqlContext{color}.implicits._
{color:#000080}val {color}df = {color:#660e7a}session{color}.read.option({color:#008000}"header"{color}, {color:#008000}"true"{color}).parquet({color:#660e7a}bigParquetDataset{color})
df.createTempView({color:#008000}"airQualityP"{color})
{color:#000080}val {color}result = {color:#660e7a}session{color} .sql({color:#008000}"SELECT * FROM airQualityP WHERE P1 > 20"{color})
.map(ParticleAirQuality.{color:#660e7a}namedMappingFunction{color})
println(result.count())
And this is how I transform the CSVs into Parquet:
{color:#000080}import {color}{color:#660e7a}session{color}.{color:#660e7a}sqlContext{color}.implicits._
{color:#000080}val {color}df = {color:#660e7a}session{color}.read.option({color:#008000}"header"{color}, {color:#008000}"true"{color})
.csv({color:#660e7a}originalDataset{color})
.map(ParticleAirQuality.{color:#660e7a}mappingFunction{color})
df.write.parquet({color:#660e7a}bigParquetDataset{color})
These are the two mapping functions:
{color:#000080}val {color}{color:#660e7a}mappingFunction {color}= {
r: Row => ParticleAirQuality(
r.getString({color:#0000ff}1{color}),
r.getString({color:#0000ff}2{color}),
r.getString({color:#0000ff}3{color}),
r.getString({color:#0000ff}4{color}),
r.getString({color:#0000ff}5{color}),
{
{color:#000080}val {color}p1 = r.getString({color:#0000ff}6{color})
{color:#000080}if{color}(p1 == {color:#000080}null{color}) Double.{color:#660e7a}NaN{color} {color:#000080}else {color}p1.toDouble
},
{
{color:#000080}val {color}p2 = r.getString({color:#0000ff}7{color})
{color:#000080}if{color}(p2 == {color:#000080}null{color}) Double.{color:#660e7a}NaN{color} {color:#000080}else {color}p2.toDouble
}
) }
{color:#000080}val {color}{color:#660e7a}namedMappingFunction {color}= {
r: Row => ParticleAirQuality(
r.getAs[{color:#20999d}String{color}]({color:#008000}"sensor_id"{color}),
r.getAs[{color:#20999d}String{color}]({color:#008000}"location"{color}),
r.getAs[{color:#20999d}String{color}]({color:#008000}"lat"{color}),
r.getAs[{color:#20999d}String{color}]({color:#008000}"lon"{color}),
r.getAs[{color:#20999d}String{color}]({color:#008000}"timestamp"{color}),
r.getAs[Double]({color:#008000}"P1"{color}),
r.getAs[Double]({color:#008000}"P2"{color})
)
}
If it matters, these are the paths:
{color:#000080}val {color}{color:#660e7a}originalDataset {color}= {color:#008000}"D:\{color}{color:#008000}source\{color}{color:#008000}datasets\{color}{color:#008000}sofia-air-quality-dataset\*{color}{color:#008000}*sds**.csv"{color}
{color:#000080}val {color}{color:#660e7a}bigParquetDataset {color}= {color:#008000}"D:\{color}{color:#008000}source\{color}{color:#008000}datasets\{color}{color:#008000}air-tests\{color}{color:#008000}all-parquet"{color}
The count I get from the CSVs is: 33934609
While the count from the parquets is: 35739394
was:
So I played around with a data set from here: [https://www.kaggle.com/hmavrodiev/sofia-air-quality-dataset]
I ran the same query against the base CSVs and against a Parquet version of them:
{color:#008000}SELECT * FROM airQualityP WHERE P1 > 20{color}
Here is the csv code:
{color:#000080}import {color}{color:#660e7a}session{color}.{color:#660e7a}sqlContext{color}.implicits._
{color:#000080}val {color}df = {color:#660e7a}session{color}.read.option({color:#008000}"header"{color}, {color:#008000}"true"{color}).csv({color:#660e7a}originalDataset{color})
df.createTempView({color:#008000}"airQuality"{color})
{color:#000080}val {color}result = {color:#660e7a}session{color}.sql({color:#008000}"SELECT * FROM airQuality WHERE P1 > 20"{color})
.map(ParticleAirQuality.{color:#660e7a}mappingFunction{color})
println(result.count())
Here is the parquet code:
{color:#000080}import {color}{color:#660e7a}session{color}.{color:#660e7a}sqlContext{color}.implicits._
{color:#000080}val {color}df = {color:#660e7a}session{color}.read.option({color:#008000}"header"{color}, {color:#008000}"true"{color}).parquet({color:#660e7a}bigParquetDataset{color})
df.createTempView({color:#008000}"airQualityP"{color})
{color:#000080}val {color}result = {color:#660e7a}session
{color} .sql({color:#008000}"SELECT * FROM airQualityP WHERE P1 > 20"{color})
.map(ParticleAirQuality.{color:#660e7a}namedMappingFunction{color})
println(result.count())
And this is how I transform the CSVs into Parquet:
{color:#000080}import {color}{color:#660e7a}session{color}.{color:#660e7a}sqlContext{color}.implicits._
{color:#000080}val {color}df = {color:#660e7a}session{color}.read.option({color:#008000}"header"{color}, {color:#008000}"true"{color})
.csv({color:#660e7a}originalDataset{color})
.map(ParticleAirQuality.{color:#660e7a}mappingFunction{color})
df.write.parquet({color:#660e7a}bigParquetDataset{color})
These are the two mapping functions:
{color:#000080}val {color}{color:#660e7a}mappingFunction {color}= {
r: Row => ParticleAirQuality(
r.getString({color:#0000ff}1{color}),
r.getString({color:#0000ff}2{color}),
r.getString({color:#0000ff}3{color}),
r.getString({color:#0000ff}4{color}),
r.getString({color:#0000ff}5{color}),
{
{color:#000080}val {color}p1 = r.getString({color:#0000ff}6{color})
{color:#000080}if{color}(p1 == {color:#000080}null{color}) Double.{color:#660e7a}NaN
{color} {color:#000080}else {color}p1.toDouble
},
{
{color:#000080}val {color}p2 = r.getString({color:#0000ff}7{color})
{color:#000080}if{color}(p2 == {color:#000080}null{color}) Double.{color:#660e7a}NaN
{color} {color:#000080}else {color}p2.toDouble
}
) }
{color:#000080}val {color}{color:#660e7a}namedMappingFunction {color}= {
r: Row => ParticleAirQuality(
r.getAs[{color:#20999d}String{color}]({color:#008000}"sensor_id"{color}),
r.getAs[{color:#20999d}String{color}]({color:#008000}"location"{color}),
r.getAs[{color:#20999d}String{color}]({color:#008000}"lat"{color}),
r.getAs[{color:#20999d}String{color}]({color:#008000}"lon"{color}),
r.getAs[{color:#20999d}String{color}]({color:#008000}"timestamp"{color}),
r.getAs[Double]({color:#008000}"P1"{color}),
r.getAs[Double]({color:#008000}"P2"{color})
)
}
If it matters, these are the paths:
{color:#000080}val {color}{color:#660e7a}originalDataset {color}= {color:#008000}"D:{color}{color:#000080}\\{color}{color:#008000}source{color}{color:#000080}\\{color}{color:#008000}datasets{color}{color:#000080}\\{color}{color:#008000}sofia-air-quality-dataset{color}{color:#000080}\\{color}{color:#008000}*sds*.csv"
{color}{color:#000080}val {color}{color:#660e7a}bigParquetDataset {color}= {color:#008000}"D:{color}{color:#000080}\\{color}{color:#008000}source{color}{color:#000080}\\{color}{color:#008000}datasets{color}{color:#000080}\\{color}{color:#008000}air-tests{color}{color:#000080}\\{color}{color:#008000}all-parquet"{color}
The count I get from the CSVs is: 33934609
While the count from the parquets is: 35739394
> Same SQL on CSV and on Parquet gives different result
> -----------------------------------------------------
>
> Key: SPARK-30926
> URL: https://issues.apache.org/jira/browse/SPARK-30926
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 2.4.4
> Environment: I run this locally on a windows 10 machine.
> The java runtime is:
> {color:#cccccc}openjdk 11.0.5 2019-10-15
> OpenJDK Runtime Environment AdoptOpenJDK (build 11.0.5+10)
> OpenJDK 64-Bit Server VM AdoptOpenJDK (build 11.0.5+10, mixed mode){color}
> Reporter: Bozhidar Karaargirov
> Priority: Major
>
> So I played around with a data set from here: [https://www.kaggle.com/hmavrodiev/sofia-air-quality-dataset]
> I ran the same query against the base CSVs and against a Parquet version of them:
> {color:#008000}SELECT * FROM airQualityP WHERE P1 > 20{color}
> Here is the csv code:
> {color:#000080}import {color}{color:#660e7a}session{color}.{color:#660e7a}sqlContext{color}.implicits._
> {color:#000080}val {color}df = {color:#660e7a}session{color}.read.option({color:#008000}"header"{color}, {color:#008000}"true"{color}).csv({color:#660e7a}originalDataset{color})
> df.createTempView({color:#008000}"airQuality"{color})
> {color:#000080}val {color}result = {color:#660e7a}session{color}.sql({color:#008000}"SELECT * FROM airQuality WHERE P1 > 20"{color})
> .map(ParticleAirQuality.{color:#660e7a}mappingFunction{color})
> println(result.count())
>
> Here is the parquet code:
>
> {color:#000080}import {color}{color:#660e7a}session{color}.{color:#660e7a}sqlContext{color}.implicits._
> {color:#000080}val {color}df = {color:#660e7a}session{color}.read.option({color:#008000}"header"{color}, {color:#008000}"true"{color}).parquet({color:#660e7a}bigParquetDataset{color})
> df.createTempView({color:#008000}"airQualityP"{color})
> {color:#000080}val {color}result = {color:#660e7a}session{color} .sql({color:#008000}"SELECT * FROM airQualityP WHERE P1 > 20"{color})
> .map(ParticleAirQuality.{color:#660e7a}namedMappingFunction{color})
> println(result.count())
>
> And this is how I transform the CSVs into Parquet:
> {color:#000080}import {color}{color:#660e7a}session{color}.{color:#660e7a}sqlContext{color}.implicits._
> {color:#000080}val {color}df = {color:#660e7a}session{color}.read.option({color:#008000}"header"{color}, {color:#008000}"true"{color})
> .csv({color:#660e7a}originalDataset{color})
> .map(ParticleAirQuality.{color:#660e7a}mappingFunction{color})
> df.write.parquet({color:#660e7a}bigParquetDataset{color})
>
> These are the two mapping functions:
> {color:#000080}val {color}{color:#660e7a}mappingFunction {color}= {
> r: Row => ParticleAirQuality(
> r.getString({color:#0000ff}1{color}),
> r.getString({color:#0000ff}2{color}),
> r.getString({color:#0000ff}3{color}),
> r.getString({color:#0000ff}4{color}),
> r.getString({color:#0000ff}5{color}),
> {
> {color:#000080}val {color}p1 = r.getString({color:#0000ff}6{color})
> {color:#000080}if{color}(p1 == {color:#000080}null{color}) Double.{color:#660e7a}NaN{color} {color:#000080}else {color}p1.toDouble
> },
> {
> {color:#000080}val {color}p2 = r.getString({color:#0000ff}7{color})
> {color:#000080}if{color}(p2 == {color:#000080}null{color}) Double.{color:#660e7a}NaN{color} {color:#000080}else {color}p2.toDouble
> }
> ) }
> {color:#000080}val {color}{color:#660e7a}namedMappingFunction {color}= {
> r: Row => ParticleAirQuality(
> r.getAs[{color:#20999d}String{color}]({color:#008000}"sensor_id"{color}),
> r.getAs[{color:#20999d}String{color}]({color:#008000}"location"{color}),
> r.getAs[{color:#20999d}String{color}]({color:#008000}"lat"{color}),
> r.getAs[{color:#20999d}String{color}]({color:#008000}"lon"{color}),
> r.getAs[{color:#20999d}String{color}]({color:#008000}"timestamp"{color}),
> r.getAs[Double]({color:#008000}"P1"{color}),
> r.getAs[Double]({color:#008000}"P2"{color})
> )
> }
>
> If it matters, these are the paths:
> {color:#000080}val {color}{color:#660e7a}originalDataset {color}= {color:#008000}"D:\{color}{color:#008000}source\{color}{color:#008000}datasets\{color}{color:#008000}sofia-air-quality-dataset\*{color}{color:#008000}*sds**.csv"{color}
> {color:#000080}val {color}{color:#660e7a}bigParquetDataset {color}= {color:#008000}"D:\{color}{color:#008000}source\{color}{color:#008000}datasets\{color}{color:#008000}air-tests\{color}{color:#008000}all-parquet"{color}
>
> The count I get from the CSVs is: 33934609
> While the count from the parquets is: 35739394
>
>
--
This message was sent by Atlassian Jira
(v8.3.4#803005)
---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org