You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Ruslan Dautkhanov (JIRA)" <ji...@apache.org> on 2018/01/15 17:41:00 UTC

[jira] [Commented] (SPARK-23074) Dataframe-ified zipwithindex

    [ https://issues.apache.org/jira/browse/SPARK-23074?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16326455#comment-16326455 ] 

Ruslan Dautkhanov commented on SPARK-23074:
-------------------------------------------

{quote}You can create a DataFrame from the result of .zipWithIndex on an RDD, as you see here.{quote}

That's very kludgy as you can see the code snippet above. A direct functionality for this RDD-level api would be super awesome to have.

{quote}There's already a rowNumber function in Spark SQL, however, which sounds like the native equivalent?{quote}

That's not the same. rowNumber requires an expression to `order by` on. What if there is no such column? For example, we often 
have files that we ingest into Spark and where we physical position of a record is meaningful how that record has to be processed. 
I can give exact example if you're necessary. zipWithIndex() actually the only one API call that preserves such information from 
original source (even though it can be distributed into multiple partitions etc.).
Also as folks in that stackoverflow question said, rowNumber approach is way slower (and it's not surprizing as it requires data sorting).



> Dataframe-ified zipwithindex
> ----------------------------
>
>                 Key: SPARK-23074
>                 URL: https://issues.apache.org/jira/browse/SPARK-23074
>             Project: Spark
>          Issue Type: New Feature
>          Components: Spark Core
>    Affects Versions: 2.3.0
>            Reporter: Ruslan Dautkhanov
>            Priority: Minor
>              Labels: dataframe, rdd
>
> Would be great to have a daraframe-friendly equivalent of rdd.zipWithIndex():
> {code:java}
> import org.apache.spark.sql.DataFrame
> import org.apache.spark.sql.types.{LongType, StructField, StructType}
> import org.apache.spark.sql.Row
> def dfZipWithIndex(
>   df: DataFrame,
>   offset: Int = 1,
>   colName: String = "id",
>   inFront: Boolean = true
> ) : DataFrame = {
>   df.sqlContext.createDataFrame(
>     df.rdd.zipWithIndex.map(ln =>
>       Row.fromSeq(
>         (if (inFront) Seq(ln._2 + offset) else Seq())
>           ++ ln._1.toSeq ++
>         (if (inFront) Seq() else Seq(ln._2 + offset))
>       )
>     ),
>     StructType(
>       (if (inFront) Array(StructField(colName,LongType,false)) else Array[StructField]()) 
>         ++ df.schema.fields ++ 
>       (if (inFront) Array[StructField]() else Array(StructField(colName,LongType,false)))
>     )
>   ) 
> }
> {code}
> credits: [https://stackoverflow.com/questions/30304810/dataframe-ified-zipwithindex]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org