You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Jarno Seppanen (JIRA)" <ji...@apache.org> on 2015/10/29 17:46:28 UTC

[jira] [Updated] (SPARK-11405) ROW_NUMBER function does not adhere to window ORDER BY, when joining

     [ https://issues.apache.org/jira/browse/SPARK-11405?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jarno Seppanen updated SPARK-11405:
-----------------------------------
       Priority: Critical  (was: Major)
    Description: 
The following query produces incorrect results:
{code:sql}
sqlContext.sql("""
  SELECT a.i, a.x,
    ROW_NUMBER() OVER (
      PARTITION BY a.i ORDER BY a.x) AS row_num
  FROM a
  JOIN b ON b.i = a.i
""").show()
+---+--------------------+-------+
|  i|                   x|row_num|
+---+--------------------+-------+
|  1|  0.8717439935587555|      1|
|  1|  0.6684483939068196|      2|
|  1|  0.3378351523586306|      3|
|  1|  0.2483285619632939|      4|
|  1|  0.4796752841655936|      5|
|  2|  0.2971739640384895|      1|
|  2|  0.2199359901600595|      2|
|  2|  0.4646004597998037|      3|
|  2| 0.24823688829578183|      4|
|  2|  0.5914212915574378|      5|
|  3|0.010912835935112164|      1|
|  3|  0.6520139509583123|      2|
|  3|  0.8571994559240592|      3|
|  3|  0.1122635843020473|      4|
|  3| 0.45913022936460457|      5|
+---+--------------------+-------+
{code}

The row number doesn't follow the correct order. The join seems to break the order, ROW_NUMBER() works correctly if the join results are saved to a temporary table, and a second query is made.

Here's a small PySpark test case to reproduce the error:

{code}

from pyspark.sql import Row
import random

a = sc.parallelize([Row(i=i, x=random.random())
                    for i in range(5)
                    for j in range(5)])

b = sc.parallelize([Row(i=i) for i in [1, 2, 3]])

af = sqlContext.createDataFrame(a)
bf = sqlContext.createDataFrame(b)
af.registerTempTable('a')
bf.registerTempTable('b')

af.show()
# +---+--------------------+
# |  i|                   x|
# +---+--------------------+
# |  0| 0.12978974167478896|
# |  0|  0.7105927498584452|
# |  0| 0.21225679077448045|
# |  0| 0.03849717391728036|
# |  0|  0.4976622146442401|
# |  1|  0.4796752841655936|
# |  1|  0.8717439935587555|
# |  1|  0.6684483939068196|
# |  1|  0.3378351523586306|
# |  1|  0.2483285619632939|
# |  2|  0.2971739640384895|
# |  2|  0.2199359901600595|
# |  2|  0.5914212915574378|
# |  2| 0.24823688829578183|
# |  2|  0.4646004597998037|
# |  3|  0.1122635843020473|
# |  3|  0.6520139509583123|
# |  3| 0.45913022936460457|
# |  3|0.010912835935112164|
# |  3|  0.8571994559240592|
# +---+--------------------+
# only showing top 20 rows

bf.show()
# +---+
# |  i|
# +---+
# |  1|
# |  2|
# |  3|
# +---+

### WRONG

sqlContext.sql("""
  SELECT a.i, a.x,
    ROW_NUMBER() OVER (
      PARTITION BY a.i ORDER BY a.x) AS row_num
  FROM a
  JOIN b ON b.i = a.i
""").show()
# +---+--------------------+-------+
# |  i|                   x|row_num|
# +---+--------------------+-------+
# |  1|  0.8717439935587555|      1|
# |  1|  0.6684483939068196|      2|
# |  1|  0.3378351523586306|      3|
# |  1|  0.2483285619632939|      4|
# |  1|  0.4796752841655936|      5|
# |  2|  0.2971739640384895|      1|
# |  2|  0.2199359901600595|      2|
# |  2|  0.4646004597998037|      3|
# |  2| 0.24823688829578183|      4|
# |  2|  0.5914212915574378|      5|
# |  3|0.010912835935112164|      1|
# |  3|  0.6520139509583123|      2|
# |  3|  0.8571994559240592|      3|
# |  3|  0.1122635843020473|      4|
# |  3| 0.45913022936460457|      5|
# +---+--------------------+-------+

### WORKAROUND BY USING TEMP TABLE

t = sqlContext.sql("""
  SELECT a.i, a.x
  FROM a
  JOIN b ON b.i = a.i
""").cache()
# trigger computation
t.head()
t.registerTempTable('t')

sqlContext.sql("""
  SELECT i, x,
    ROW_NUMBER() OVER (
      PARTITION BY i ORDER BY x) AS row_num
  FROM t
""").show()
# +---+--------------------+-------+
# |  i|                   x|row_num|
# +---+--------------------+-------+
# |  1|  0.2483285619632939|      1|
# |  1|  0.3378351523586306|      2|
# |  1|  0.4796752841655936|      3|
# |  1|  0.6684483939068196|      4|
# |  1|  0.8717439935587555|      5|
# |  2|  0.2199359901600595|      1|
# |  2| 0.24823688829578183|      2|
# |  2|  0.2971739640384895|      3|
# |  2|  0.4646004597998037|      4|
# |  2|  0.5914212915574378|      5|
# |  3|0.010912835935112164|      1|
# |  3|  0.1122635843020473|      2|
# |  3| 0.45913022936460457|      3|
# |  3|  0.6520139509583123|      4|
# |  3|  0.8571994559240592|      5|
# +---+--------------------+-------+
{code}

  was:
The following query produces incorrect results:
{code:sql}
  SELECT a.i, a.x,
    ROW_NUMBER() OVER (
      PARTITION BY a.i ORDER BY a.x) AS row_num
  FROM a
  JOIN b ON b.i = a.i;
+---+--------------------+-------+
|  i|                   x|row_num|
+---+--------------------+-------+
|  1|  0.8717439935587555|      1|
|  1|  0.6684483939068196|      2|
|  1|  0.3378351523586306|      3|
|  1|  0.2483285619632939|      4|
|  1|  0.4796752841655936|      5|
|  2|  0.2971739640384895|      1|
|  2|  0.2199359901600595|      2|
|  2|  0.4646004597998037|      3|
|  2| 0.24823688829578183|      4|
|  2|  0.5914212915574378|      5|
|  3|0.010912835935112164|      1|
|  3|  0.6520139509583123|      2|
|  3|  0.8571994559240592|      3|
|  3|  0.1122635843020473|      4|
|  3| 0.45913022936460457|      5|
+---+--------------------+-------+
{code}

The row number doesn't follow the correct order. The join seems to break the order, ROW_NUMBER() works correctly if the join results are saved to a temporary table, and a second query is made.

Here's a small PySpark test case to reproduce the error:

{code}

from pyspark.sql import Row
import random

a = sc.parallelize([Row(i=i, x=random.random())
                    for i in range(5)
                    for j in range(5)])

b = sc.parallelize([Row(i=i) for i in [1, 2, 3]])

af = sqlContext.createDataFrame(a)
bf = sqlContext.createDataFrame(b)
af.registerTempTable('a')
bf.registerTempTable('b')

af.show()
# +---+--------------------+
# |  i|                   x|
# +---+--------------------+
# |  0| 0.12978974167478896|
# |  0|  0.7105927498584452|
# |  0| 0.21225679077448045|
# |  0| 0.03849717391728036|
# |  0|  0.4976622146442401|
# |  1|  0.4796752841655936|
# |  1|  0.8717439935587555|
# |  1|  0.6684483939068196|
# |  1|  0.3378351523586306|
# |  1|  0.2483285619632939|
# |  2|  0.2971739640384895|
# |  2|  0.2199359901600595|
# |  2|  0.5914212915574378|
# |  2| 0.24823688829578183|
# |  2|  0.4646004597998037|
# |  3|  0.1122635843020473|
# |  3|  0.6520139509583123|
# |  3| 0.45913022936460457|
# |  3|0.010912835935112164|
# |  3|  0.8571994559240592|
# +---+--------------------+
# only showing top 20 rows

bf.show()
# +---+
# |  i|
# +---+
# |  1|
# |  2|
# |  3|
# +---+

### WRONG

sqlContext.sql("""
  SELECT a.i, a.x,
    ROW_NUMBER() OVER (
      PARTITION BY a.i ORDER BY a.x) AS row_num
  FROM a
  JOIN b ON b.i = a.i
""").show()
# +---+--------------------+-------+
# |  i|                   x|row_num|
# +---+--------------------+-------+
# |  1|  0.8717439935587555|      1|
# |  1|  0.6684483939068196|      2|
# |  1|  0.3378351523586306|      3|
# |  1|  0.2483285619632939|      4|
# |  1|  0.4796752841655936|      5|
# |  2|  0.2971739640384895|      1|
# |  2|  0.2199359901600595|      2|
# |  2|  0.4646004597998037|      3|
# |  2| 0.24823688829578183|      4|
# |  2|  0.5914212915574378|      5|
# |  3|0.010912835935112164|      1|
# |  3|  0.6520139509583123|      2|
# |  3|  0.8571994559240592|      3|
# |  3|  0.1122635843020473|      4|
# |  3| 0.45913022936460457|      5|
# +---+--------------------+-------+

### WORKAROUND BY USING TEMP TABLE

t = sqlContext.sql("""
  SELECT a.i, a.x
  FROM a
  JOIN b ON b.i = a.i
""").cache()
# trigger computation
t.head()
t.registerTempTable('t')

sqlContext.sql("""
  SELECT i, x,
    ROW_NUMBER() OVER (
      PARTITION BY i ORDER BY x) AS row_num
  FROM t
""").show()
# +---+--------------------+-------+
# |  i|                   x|row_num|
# +---+--------------------+-------+
# |  1|  0.2483285619632939|      1|
# |  1|  0.3378351523586306|      2|
# |  1|  0.4796752841655936|      3|
# |  1|  0.6684483939068196|      4|
# |  1|  0.8717439935587555|      5|
# |  2|  0.2199359901600595|      1|
# |  2| 0.24823688829578183|      2|
# |  2|  0.2971739640384895|      3|
# |  2|  0.4646004597998037|      4|
# |  2|  0.5914212915574378|      5|
# |  3|0.010912835935112164|      1|
# |  3|  0.1122635843020473|      2|
# |  3| 0.45913022936460457|      3|
# |  3|  0.6520139509583123|      4|
# |  3|  0.8571994559240592|      5|
# +---+--------------------+-------+
{code}


bump to critical for data corruption; edit example query for clarity

> ROW_NUMBER function does not adhere to window ORDER BY, when joining
> --------------------------------------------------------------------
>
>                 Key: SPARK-11405
>                 URL: https://issues.apache.org/jira/browse/SPARK-11405
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 1.5.0
>         Environment: YARN
>            Reporter: Jarno Seppanen
>            Priority: Critical
>
> The following query produces incorrect results:
> {code:sql}
> sqlContext.sql("""
>   SELECT a.i, a.x,
>     ROW_NUMBER() OVER (
>       PARTITION BY a.i ORDER BY a.x) AS row_num
>   FROM a
>   JOIN b ON b.i = a.i
> """).show()
> +---+--------------------+-------+
> |  i|                   x|row_num|
> +---+--------------------+-------+
> |  1|  0.8717439935587555|      1|
> |  1|  0.6684483939068196|      2|
> |  1|  0.3378351523586306|      3|
> |  1|  0.2483285619632939|      4|
> |  1|  0.4796752841655936|      5|
> |  2|  0.2971739640384895|      1|
> |  2|  0.2199359901600595|      2|
> |  2|  0.4646004597998037|      3|
> |  2| 0.24823688829578183|      4|
> |  2|  0.5914212915574378|      5|
> |  3|0.010912835935112164|      1|
> |  3|  0.6520139509583123|      2|
> |  3|  0.8571994559240592|      3|
> |  3|  0.1122635843020473|      4|
> |  3| 0.45913022936460457|      5|
> +---+--------------------+-------+
> {code}
> The row number doesn't follow the correct order. The join seems to break the order, ROW_NUMBER() works correctly if the join results are saved to a temporary table, and a second query is made.
> Here's a small PySpark test case to reproduce the error:
> {code}
> from pyspark.sql import Row
> import random
> a = sc.parallelize([Row(i=i, x=random.random())
>                     for i in range(5)
>                     for j in range(5)])
> b = sc.parallelize([Row(i=i) for i in [1, 2, 3]])
> af = sqlContext.createDataFrame(a)
> bf = sqlContext.createDataFrame(b)
> af.registerTempTable('a')
> bf.registerTempTable('b')
> af.show()
> # +---+--------------------+
> # |  i|                   x|
> # +---+--------------------+
> # |  0| 0.12978974167478896|
> # |  0|  0.7105927498584452|
> # |  0| 0.21225679077448045|
> # |  0| 0.03849717391728036|
> # |  0|  0.4976622146442401|
> # |  1|  0.4796752841655936|
> # |  1|  0.8717439935587555|
> # |  1|  0.6684483939068196|
> # |  1|  0.3378351523586306|
> # |  1|  0.2483285619632939|
> # |  2|  0.2971739640384895|
> # |  2|  0.2199359901600595|
> # |  2|  0.5914212915574378|
> # |  2| 0.24823688829578183|
> # |  2|  0.4646004597998037|
> # |  3|  0.1122635843020473|
> # |  3|  0.6520139509583123|
> # |  3| 0.45913022936460457|
> # |  3|0.010912835935112164|
> # |  3|  0.8571994559240592|
> # +---+--------------------+
> # only showing top 20 rows
> bf.show()
> # +---+
> # |  i|
> # +---+
> # |  1|
> # |  2|
> # |  3|
> # +---+
> ### WRONG
> sqlContext.sql("""
>   SELECT a.i, a.x,
>     ROW_NUMBER() OVER (
>       PARTITION BY a.i ORDER BY a.x) AS row_num
>   FROM a
>   JOIN b ON b.i = a.i
> """).show()
> # +---+--------------------+-------+
> # |  i|                   x|row_num|
> # +---+--------------------+-------+
> # |  1|  0.8717439935587555|      1|
> # |  1|  0.6684483939068196|      2|
> # |  1|  0.3378351523586306|      3|
> # |  1|  0.2483285619632939|      4|
> # |  1|  0.4796752841655936|      5|
> # |  2|  0.2971739640384895|      1|
> # |  2|  0.2199359901600595|      2|
> # |  2|  0.4646004597998037|      3|
> # |  2| 0.24823688829578183|      4|
> # |  2|  0.5914212915574378|      5|
> # |  3|0.010912835935112164|      1|
> # |  3|  0.6520139509583123|      2|
> # |  3|  0.8571994559240592|      3|
> # |  3|  0.1122635843020473|      4|
> # |  3| 0.45913022936460457|      5|
> # +---+--------------------+-------+
> ### WORKAROUND BY USING TEMP TABLE
> t = sqlContext.sql("""
>   SELECT a.i, a.x
>   FROM a
>   JOIN b ON b.i = a.i
> """).cache()
> # trigger computation
> t.head()
> t.registerTempTable('t')
> sqlContext.sql("""
>   SELECT i, x,
>     ROW_NUMBER() OVER (
>       PARTITION BY i ORDER BY x) AS row_num
>   FROM t
> """).show()
> # +---+--------------------+-------+
> # |  i|                   x|row_num|
> # +---+--------------------+-------+
> # |  1|  0.2483285619632939|      1|
> # |  1|  0.3378351523586306|      2|
> # |  1|  0.4796752841655936|      3|
> # |  1|  0.6684483939068196|      4|
> # |  1|  0.8717439935587555|      5|
> # |  2|  0.2199359901600595|      1|
> # |  2| 0.24823688829578183|      2|
> # |  2|  0.2971739640384895|      3|
> # |  2|  0.4646004597998037|      4|
> # |  2|  0.5914212915574378|      5|
> # |  3|0.010912835935112164|      1|
> # |  3|  0.1122635843020473|      2|
> # |  3| 0.45913022936460457|      3|
> # |  3|  0.6520139509583123|      4|
> # |  3|  0.8571994559240592|      5|
> # +---+--------------------+-------+
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org