You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Tathagata Das (JIRA)" <ji...@apache.org> on 2018/02/13 09:24:00 UTC

[jira] [Created] (SPARK-23406) Enable stream-stream self joins

Tathagata Das created SPARK-23406:
-------------------------------------

             Summary: Enable stream-stream self joins 
                 Key: SPARK-23406
                 URL: https://issues.apache.org/jira/browse/SPARK-23406
             Project: Spark
          Issue Type: Improvement
          Components: Structured Streaming
    Affects Versions: 2.3.0
            Reporter: Tathagata Das
            Assignee: Tathagata Das


Currently stream-stream self join throws the following error
{code}
val df = spark.readStream.format("rate").option("numRowsPerSecond", "1").option("numPartitions", "1").load()
display(df.withColumn("key", $"value" / 10).join(df.withColumn("key", $"value" / 5), "key"))
{code}
error:
{code}
Failure when resolving conflicting references in Join:
'Join UsingJoin(Inner,List(key))
:- Project [timestamp#850, value#851L, (cast(value#851L as double) / cast(10 as double)) AS key#855]
: +- StreamingRelation DataSource(org.apache.spark.sql.SparkSession@7f1d2a68,rate,List(),None,List(),None,Map(numPartitions -> 1, numRowsPerSecond -> 1),None), rate, [timestamp#850, value#851L]
+- Project [timestamp#850, value#851L, (cast(value#851L as double) / cast(5 as double)) AS key#860]
 +- StreamingRelation DataSource(org.apache.spark.sql.SparkSession@7f1d2a68,rate,List(),None,List(),None,Map(numPartitions -> 1, numRowsPerSecond -> 1),None), rate, [timestamp#850, value#851L]

Conflicting attributes: timestamp#850,value#851L
;;
'Join UsingJoin(Inner,List(key))
:- Project [timestamp#850, value#851L, (cast(value#851L as double) / cast(10 as double)) AS key#855]
: +- StreamingRelation DataSource(org.apache.spark.sql.SparkSession@7f1d2a68,rate,List(),None,List(),None,Map(numPartitions -> 1, numRowsPerSecond -> 1),None), rate, [timestamp#850, value#851L]
+- Project [timestamp#850, value#851L, (cast(value#851L as double) / cast(5 as double)) AS key#860]
 +- StreamingRelation DataSource(org.apache.spark.sql.SparkSession@7f1d2a68,rate,List(),None,List(),None,Map(numPartitions -> 1, numRowsPerSecond -> 1),None), rate, [timestamp#850, value#851L]

at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:39)
 at org.apache.spark.sql.catalyst.analysis.Analyzer.failAnalysis(Analyzer.scala:101)
 at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:378)
 at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:98)
 at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:148)
 at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:98)
 at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:101)
 at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:71)
 at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:73)
 at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$withPlan(Dataset.scala:3063)
 at org.apache.spark.sql.Dataset.join(Dataset.scala:787)
 at org.apache.spark.sql.Dataset.join(Dataset.scala:756)
 at org.apache.spark.sql.Dataset.join(Dataset.scala:731)
{code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org