Posted to issues@spark.apache.org by "Takeshi Yamamuro (JIRA)" <ji...@apache.org> on 2018/07/02 06:19:00 UTC

[jira] [Commented] (SPARK-24705) Spark.sql.adaptive.enabled=true is enabled and self-join query

    [ https://issues.apache.org/jira/browse/SPARK-24705?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16529421#comment-16529421 ] 

Takeshi Yamamuro commented on SPARK-24705:
------------------------------------------

I confirmed that master has the same issue. It also seems to happen only with jdbc sources.
{code}
// Prepare test data in postgresql
postgres=# create table device_loc(imei int, speed int);
CREATE TABLE
postgres=# insert into device_loc values (1, 1);
INSERT 0 1
postgres=# select * from device_loc;
 imei | speed 
------+-------
    1 |     1
(1 row)


// Register as a jdbc table
scala> val jdbcTable = spark.read.jdbc("jdbc:postgresql:postgres", "device_loc", options)
scala> jdbcTable.registerTempTable("device_loc")
scala> sql("SELECT * FROM device_loc").show
+----+-----+
|imei|speed|
+----+-----+
|   1|    1|
+----+-----+

// Prepare a query
scala> :paste
val df = sql("""
select tv_a.imei
  from ( select a.imei,a.speed from device_loc a) tv_a
  inner join ( select a.imei,a.speed from device_loc a ) tv_b on tv_a.imei = tv_b.imei
  group by tv_a.imei
""")

// Run tests
scala> sql("SET spark.sql.adaptive.enabled=false")
scala> df.show
+----+
|imei|
+----+
|   1|
+----+

scala> sql("SET spark.sql.adaptive.enabled=true")
scala> df.show
org.apache.spark.sql.catalyst.errors.package$TreeNodeException: execute, tree:
Exchange(coordinator id: 1401717308) hashpartitioning(imei#0, 200), coordinator[target post-shuffle partition size: 67108864]
+- *(1) Scan JDBCRelation(device_loc) [numPartitions=1] [imei#0] PushedFilters: [*IsNotNull(imei)], ReadSchema: struct<imei:int>

Caused by: java.lang.AssertionError: assertion failed
  at scala.Predef$.assert(Predef.scala:156)
  at org.apache.spark.sql.execution.exchange.ExchangeCoordinator.doEstimationIfNecessary(ExchangeCoordinator.scala:201)
  at org.apache.spark.sql.execution.exchange.ExchangeCoordinator.postShuffleRDD(ExchangeCoordinator.scala:259)
  at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$$anonfun$doExecute$1.apply(ShuffleExchangeExec.scala:124)
  at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$$anonfun$doExecute$1.apply(ShuffleExchangeExec.scala:119)
  at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:52)
  ... 100 more
{code}

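This issue doesn't happen when the same data is stored as a datasource table (created with saveAsTable) instead of being read through jdbc: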
{code}
scala> sql("SET spark.sql.adaptive.enabled=true")
scala> spark.range(1).selectExpr("id AS imei", "id AS speed").write.saveAsTable("device_loc")
scala> :paste
val df = sql("""
select tv_a.imei
  from ( select a.imei,a.speed from device_loc a) tv_a
  inner join ( select a.imei,a.speed from device_loc a ) tv_b on tv_a.imei = tv_b.imei
  group by tv_a.imei
""")
scala> df.show()
+----+
|imei|
+----+
|   0|
+----+
{code}
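
The issue description quoted below reports that caching the table before running the query avoids the failure. A minimal sketch of that workaround, assuming device_loc is still the jdbc-backed temp view registered in the first example (not the saveAsTable table above); cachedDf is a name introduced here for illustration, and this has not been verified against master:
{code}
// Assumes device_loc is the jdbc-backed temp view from the first example.
scala> sql("SET spark.sql.adaptive.enabled=true")

// CACHE TABLE is eager unless LAZY is given, so the jdbc rows are read into Spark's cache here.
scala> sql("CACHE TABLE device_loc")

// Build the query again after caching so that it is planned with the cache in place.
scala> :paste
val cachedDf = sql("""
select tv_a.imei
  from ( select a.imei,a.speed from device_loc a) tv_a
  inner join ( select a.imei,a.speed from device_loc a ) tv_b on tv_a.imei = tv_b.imei
  group by tv_a.imei
""")
scala> cachedDf.show
{code}
Running cachedDf.explain before show should make it possible to check whether the plan reads from the in-memory relation rather than scanning JDBCRelation directly.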

> Spark.sql.adaptive.enabled=true is enabled and self-join query
> --------------------------------------------------------------
>
>                 Key: SPARK-24705
>                 URL: https://issues.apache.org/jira/browse/SPARK-24705
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.2.1, 2.3.1
>            Reporter: cheng
>            Priority: Minor
>         Attachments: Error stack.txt
>
>
> [~smilegator]
> When a table (for example tableA) is loaded using jdbc with spark.sql.adaptive.enabled=true, a query like the following can produce unexpected results.
> For example, with the device_loc table coming from the jdbc data source:
> select tv_a.imei
> from ( select a.imei,a.speed from device_loc a) tv_a
> inner join ( select a.imei,a.speed from device_loc a ) tv_b on tv_a.imei = tv_b.imei
> group by tv_a.imei
> If CACHE TABLE device_loc is executed before this query, everything is fine. However, if the table is not cached first, unexpected results occur and the query fails to execute.
> Remarks: The attachment records the stack trace from when the error occurred.


