Posted to issues@spark.apache.org by "Ivan Gozali (JIRA)" <ji...@apache.org> on 2017/01/24 21:23:26 UTC

[jira] [Comment Edited] (SPARK-19352) Sorting issues on relatively big datasets

    [ https://issues.apache.org/jira/browse/SPARK-19352?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15836653#comment-15836653 ] 

Ivan Gozali edited comment on SPARK-19352 at 1/24/17 9:22 PM:
--------------------------------------------------------------

Does this mean that {{Dataset.write.partitionBy()}} performs a repartition/shuffle of the dataset even if it's already partitioned by the same column beforehand? If that is the case, then with the current API I haven't found a way to do what I'd like, which is to emit one CSV sorted by timestamp per user ID (or at least multiple sorted CSVs that can easily be merged by concatenating them). This seems like it should be a very simple thing to do in Spark.

{{DataFrameWriter.sortBy()}} looked promising, but when I tried it, it failed with the following error:
{code}
org.apache.spark.sql.AnalysisException: 'save' does not support bucketing right now;
  at org.apache.spark.sql.DataFrameWriter.assertNotBucketed(DataFrameWriter.scala:314)
  at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:207)
  at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:198)
  ...
{code}
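
For reference, here's the direction the error message seems to point at: a sketch (untested) of routing the write through {{saveAsTable()}}, since {{bucketBy()}}/{{sortBy()}} appear to be honored only there and not by {{save()}}/{{csv()}} in this version. The bucket count and table name below are made up for illustration:
{code}
// Sketch only, not verified: bucketBy()/sortBy() seem to be supported only via
// saveAsTable(), not save()/csv() (which is what the AnalysisException says).
// The bucket count (50) and table name are placeholder values.
val df = spark.read
  .option("inferSchema", "true")
  .option("header", "true")
  .csv("/tmp/gen_data_3cols_small")

df.write
  .bucketBy(50, "userId")
  .sortBy("timestamp")
  .format("parquet")
  .saveAsTable("events_by_user_bucketed")
{code}
That produces sorted buckets in a managed table rather than one CSV per user ID, though, so it doesn't quite achieve what I'm after.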

Is it not possible to do this?
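
The other variant I'm tempted to try (again, just a sketch, not verified) is to sort within partitions by both the partition column and the timestamp, on the assumption that the writer does its own, possibly unstable, sort by the partition column before writing:
{code}
// Sketch only, not verified. Same as the snippet in the issue description, except
// that sortWithinPartitions() includes the partition column as well, in case the
// writer re-sorts rows by userId and thereby scrambles the timestamp order.
import spark.implicits._  // for the $"colName" syntax

val inpth = "/tmp/gen_data_3cols_small"
spark.read
  .option("inferSchema", "true")
  .option("header", "true")
  .csv(inpth)
  .repartition($"userId")
  .sortWithinPartitions("userId", "timestamp")
  .write
  .partitionBy("userId")
  .option("header", "true")
  .csv(inpth + "_sorted")
{code}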


> Sorting issues on relatively big datasets
> -----------------------------------------
>
>                 Key: SPARK-19352
>                 URL: https://issues.apache.org/jira/browse/SPARK-19352
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 2.1.0
>         Environment: Spark version 2.1.0
> Using Scala version 2.11.8, Java HotSpot(TM) 64-Bit Server VM, 1.8.0_102
> macOS 10.12.3
>            Reporter: Ivan Gozali
>
> _More details, including the script to generate the synthetic dataset (requires pandas and numpy), are in this GitHub gist._
> https://gist.github.com/igozali/d327a85646abe7ab10c2ae479bed431f
> Given a relatively large synthetic time series dataset of various users (4.1GB), when attempting to:
> * partition this dataset by user ID
> * sort the time series data for each user by timestamp
> * write each partition to a single CSV file
> then some of the output files end up unsorted in a very specific manner. In one of the supposedly sorted files, the rows look as follows:
> {code}
> 2014-01-01T00:00:00.000-08:00,-0.07,0.39,-0.39
> 2014-12-31T02:07:30.000-08:00,0.34,-0.62,-0.22
> 2014-01-01T00:00:05.000-08:00,-0.07,-0.52,0.47
> 2014-12-31T02:07:35.000-08:00,-0.15,-0.13,-0.14
> 2014-01-01T00:00:10.000-08:00,-1.31,-1.17,2.24
> 2014-12-31T02:07:40.000-08:00,-1.28,0.88,-0.43
> {code}
> The above is attempted using the following Scala/Spark code:
> {code}
> val inpth = "/tmp/gen_data_3cols_small"
> spark
>     .read
>     .option("inferSchema", "true")
>     .option("header", "true")
>     .csv(inpth)
>     .repartition($"userId")
>     .sortWithinPartitions("timestamp")
>     .write
>     .partitionBy("userId")
>     .option("header", "true")
>     .csv(inpth + "_sorted")
> {code}
> This issue is not seen with a smaller dataset (354MB, same number of columns) generated by shrinking the time span.


