Posted to issues@spark.apache.org by "sandeep katta (JIRA)" <ji...@apache.org> on 2018/10/25 14:15:00 UTC
[jira] [Commented] (SPARK-25834) stream stream Outer join with update mode is not throwing exception
[ https://issues.apache.org/jira/browse/SPARK-25834?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16663797#comment-16663797 ]
sandeep katta commented on SPARK-25834:
---------------------------------------
[~sachin1729] Thanks for reporting this; I will raise a PR for it soon.
> stream stream Outer join with update mode is not throwing exception
> -------------------------------------------------------------------
>
> Key: SPARK-25834
> URL: https://issues.apache.org/jira/browse/SPARK-25834
> Project: Spark
> Issue Type: Bug
> Components: Structured Streaming
> Affects Versions: 2.3.1, 2.3.2
> Reporter: Sachin Ramachandra Setty
> Priority: Minor
>
> Run the program below and observe that no AnalysisException is thrown, even though it combines a stream-stream left outer join with Update output mode.
> // Run in spark-shell, which provides `spark` and imports spark.implicits._
> import java.sql.Timestamp
> import org.apache.spark.sql.functions.{col, expr}
> import org.apache.spark.sql.streaming.Trigger
>
> val lines_stream1 = spark.readStream.
> format("kafka").
> option("kafka.bootstrap.servers", "10.18.99.58:21005,10.18.99.55:21005").
> option("subscribe", "test11").
> option("includeTimestamp", true).
> load().
> selectExpr("CAST (value AS String)","CAST(timestamp AS TIMESTAMP)").as[(String,Timestamp)].
> select(col("value") as("data"),col("timestamp") as("recordTime")).
> select("data","recordTime").
> withWatermark("recordTime", "20 seconds")
> val lines_stream2 = spark.readStream.
> format("kafka").
> option("kafka.bootstrap.servers", "10.18.99.58:21005,10.18.99.55:21005").
> option("subscribe", "test22").
> option("includeTimestamp", value = true).
> load().
> selectExpr("CAST (value AS String)","CAST(timestamp AS TIMESTAMP)").as[(String,Timestamp)].
> select(col("value") as("data1"),col("timestamp") as("recordTime1")).
> select("data1","recordTime1").
> withWatermark("recordTime1", "20 seconds")
> val query = lines_stream1.join(lines_stream2, expr (
> """
> | data == data1 and
> | recordTime1 >= recordTime and
> | recordTime1 <= recordTime + interval 20 seconds
> """.stripMargin), "left"). // "left" = left outer join
> writeStream.
> option("truncate","false").
> outputMode("update").
> format("console").
> trigger(Trigger.ProcessingTime("2 seconds")).
> start()
> query.awaitTermination()
> According to the documentation (https://spark.apache.org/docs/2.3.2/structured-streaming-programming-guide.html#stream-stream-joins), stream-stream joins are supported only in Append output mode:
> *As of Spark 2.3, you can use joins only when the query is in Append output mode. Other output modes are not yet supported.*
> Inner joins behave as documented (the query fails with an AnalysisException in Update mode), but outer joins are not rejected.
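The documented rule can be stated in a few lines of code. The following is a minimal, hypothetical Scala model of the check the reporter expected; it is not Spark's own UnsupportedOperationChecker, and the object name StreamStreamJoinRule, its OutputMode type, and the check method are illustrative assumptions. The key point is that the output-mode test applies to every stream-stream join type, outer joins included, not just inner joins.

```scala
import java.util.Locale

// Hypothetical model of the documented Spark 2.3 rule for stream-stream joins.
// NOT Spark's UnsupportedOperationChecker; all names here are illustrative.
object StreamStreamJoinRule {
  sealed trait OutputMode
  case object Append extends OutputMode
  case object Update extends OutputMode
  case object Complete extends OutputMode

  // Assumption: join-type strings are compared case-insensitively with
  // underscores ignored, so "left_outer" and "LEFT_OUTER" both become "leftouter".
  private def normalize(joinType: String): String =
    joinType.toLowerCase(Locale.ROOT).replace("_", "")

  // Stream-stream join types this sketch models (Spark 2.3 supports inner,
  // left outer and right outer joins between two streams).
  private val streamJoinTypes =
    Set("inner", "left", "leftouter", "right", "rightouter")

  /** Left(reason) if the query should be rejected, otherwise Right(()). */
  def check(joinType: String, mode: OutputMode): Either[String, Unit] = {
    val jt = normalize(joinType)
    if (!streamJoinTypes.contains(jt))
      Left(s"join type '$joinType' is not modelled by this sketch")
    else if (mode != Append)
      // The documented restriction applies to every join type, outer joins included.
      Left(s"stream-stream $jt join is not supported in $mode output mode, only in Append output mode")
    else
      Right(())
  }
}
```

With this model, check("left", Update) returns a Left, which is the rejection the reporter expected for the query above, while check("left", Append) returns Right(()).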
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)