Posted to issues@spark.apache.org by "Wenchen Fan (JIRA)" <ji...@apache.org> on 2017/02/03 11:49:51 UTC

[jira] [Commented] (SPARK-19352) Sorting issues on relatively big datasets

    [ https://issues.apache.org/jira/browse/SPARK-19352?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15851387#comment-15851387 ] 

Wenchen Fan commented on SPARK-19352:
-------------------------------------

DataFrameWriter doesn't let users write data out in sorted order, so unfortunately you can't do this directly. This makes sense because sorted data files are of limited use: when Spark reads them back, it cannot recover the ordering information, and so cannot optimize for it.

However, in your example the data files should come out sorted, given how Spark writes partitioned data: if the input is already partitioned by the partition columns, the writer does not need to re-sort the data by those columns.
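
If the timestamp order does need to survive in the output files, a commonly suggested workaround is to make the partition column the leading sort key, so the ordering the writer requires is already satisfied and no re-sort occurs. A minimal sketch, not a confirmed fix (it assumes a DataFrame {{df}} with {{userId}} and {{timestamp}} columns as in the reporter's example; the output path is hypothetical):

{code}
// Sort within partitions by the partition column first, then the secondary
// key. Because each partition is already ordered by userId, the
// partitionBy("userId") writer should not need to re-sort the rows, so the
// timestamp order within each output file is preserved.
df.repartition($"userId")
  .sortWithinPartitions($"userId", $"timestamp")
  .write
  .partitionBy("userId")
  .option("header", "true")
  .csv("/tmp/gen_data_3cols_small_sorted")
{code}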

> Sorting issues on relatively big datasets
> -----------------------------------------
>
>                 Key: SPARK-19352
>                 URL: https://issues.apache.org/jira/browse/SPARK-19352
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 2.1.0
>         Environment: Spark version 2.1.0
> Using Scala version 2.11.8, Java HotSpot(TM) 64-Bit Server VM, 1.8.0_102
> macOS 10.12.3
>            Reporter: Ivan Gozali
>
> _More details, including the script to generate the synthetic dataset (requires pandas and numpy), are in this GitHub gist:_
> https://gist.github.com/igozali/d327a85646abe7ab10c2ae479bed431f
> Given a relatively large synthetic time series dataset of various users (4.1GB), when attempting to:
> * partition this dataset by user ID
> * sort the time series data for each user by timestamp
> * write each partition to a single CSV file
> then some files are unsorted in a very specific manner: two independently sorted runs of rows appear interleaved. In one of the supposedly sorted files, the rows looked as follows:
> {code}
> 2014-01-01T00:00:00.000-08:00,-0.07,0.39,-0.39
> 2014-12-31T02:07:30.000-08:00,0.34,-0.62,-0.22
> 2014-01-01T00:00:05.000-08:00,-0.07,-0.52,0.47
> 2014-12-31T02:07:35.000-08:00,-0.15,-0.13,-0.14
> 2014-01-01T00:00:10.000-08:00,-1.31,-1.17,2.24
> 2014-12-31T02:07:40.000-08:00,-1.28,0.88,-0.43
> {code}
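> A quick way to confirm this interleaving is to count adjacent row pairs whose timestamps are out of order. This is a hypothetical check, not part of the original report; it assumes a single output CSV whose first column is the timestamp, and it relies on ISO-8601 timestamps with a fixed UTC offset comparing correctly as strings:
> {code}
> // Hypothetical verification sketch: the file path is a placeholder.
> import scala.io.Source
>
> val rows = Source.fromFile("/tmp/part-00000.csv")
>   .getLines().drop(1).toVector  // skip the header line
> val outOfOrder = rows.sliding(2).count {
>   case Seq(a, b) => a.split(",")(0) > b.split(",")(0)
> }
> println(s"$outOfOrder adjacent pairs out of order")
> {code}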
> The above is attempted using the following Scala/Spark code:
> {code}
> val inpth = "/tmp/gen_data_3cols_small"
> spark
>     .read
>     .option("inferSchema", "true")
>     .option("header", "true")
>     .csv(inpth)
>     .repartition($"userId")
>     .sortWithinPartitions("timestamp")
>     .write
>     .partitionBy("userId")
>     .option("header", "true")
>     .csv(inpth + "_sorted")
> {code}
> This issue is not seen with a smaller dataset (354MB, same number of columns) produced by shrinking the time span.


