You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Silas Davis (JIRA)" <ji...@apache.org> on 2015/08/17 15:09:45 UTC

[jira] [Commented] (SPARK-3533) Add saveAsTextFileByKey() method to RDDs

    [ https://issues.apache.org/jira/browse/SPARK-3533?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14699504#comment-14699504 ] 

Silas Davis commented on SPARK-3533:
------------------------------------

I'd like to suggest this be re-opened, writing partitions of a dataset to separate files based on keys is a common use case that is provided by Hadoop and Cascading for example.

DataFrameWriter has partitionBy, but is only supported for parquet, and does not support cases where you wish to work with plain RDDs (for example using specific avro classes, or when you want to transform using map, mapPartitions, or combineByKey, which takes you out of DataFrame land).

I have a working implementation based on the Hadoop 2+ MultipleOutputs class. The basic idea is to wrap an underlying OutputFormat within a OutputFormat class that derives from a MultipleOuputsFormat class that maintains an instance of MultipleOuputs for writing out based on key. Here is the gist:

https://gist.github.com/silasdavis/d1d1f1f7ab78249af462

I've included tests and helper functions for completeness, but the meat of the implementation is the first 100 lines. You can also see how it's meant to be used by look at the saveAsMultipleAvroFiles code: https://gist.github.com/silasdavis/d1d1f1f7ab78249af462#file-multipleoutputs-scala-L287

It would be useful to get some comments on the general idea. I've tried to use as much of the Hadoop machinery as possible, similar to how PairRDDFunctions does. This means no existing spark code has needed to be changed, but a similar approach could be taken to incorporate MultipleOuptuts within the saveAsNewAPIHadoopDataset method.

> Add saveAsTextFileByKey() method to RDDs
> ----------------------------------------
>
>                 Key: SPARK-3533
>                 URL: https://issues.apache.org/jira/browse/SPARK-3533
>             Project: Spark
>          Issue Type: Improvement
>          Components: PySpark, Spark Core
>    Affects Versions: 1.1.0
>            Reporter: Nicholas Chammas
>
> Users often have a single RDD of key-value pairs that they want to save to multiple locations based on the keys.
> For example, say I have an RDD like this:
> {code}
> >>> a = sc.parallelize(['Nick', 'Nancy', 'Bob', 'Ben', 'Frankie']).keyBy(lambda x: x[0])
> >>> a.collect()
> [('N', 'Nick'), ('N', 'Nancy'), ('B', 'Bob'), ('B', 'Ben'), ('F', 'Frankie')]
> >>> a.keys().distinct().collect()
> ['B', 'F', 'N']
> {code}
> Now I want to write the RDD out to different paths depending on the keys, so that I have one output directory per distinct key. Each output directory could potentially have multiple {{part-}} files, one per RDD partition.
> So the output would look something like:
> {code}
> /path/prefix/B [/part-1, /part-2, etc]
> /path/prefix/F [/part-1, /part-2, etc]
> /path/prefix/N [/part-1, /part-2, etc]
> {code}
> Though it may be possible to do this with some combination of {{saveAsNewAPIHadoopFile()}}, {{saveAsHadoopFile()}}, and the {{MultipleTextOutputFormat}} output format class, it isn't straightforward. It's not clear if it's even possible at all in PySpark.
> Please add a {{saveAsTextFileByKey()}} method or something similar to RDDs that makes it easy to save RDDs out to multiple locations at once.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org