Posted to user@spark.apache.org by Xu Yan <xy...@thoughtworks.com> on 2021/02/27 06:54:37 UTC

Spark 2.3 Stream-Stream Join with left outer join lost left stream value

I'm trying to implement a stream-stream join toy example with Spark 2.3.0.

The stream join works fine when the condition matches, but the left stream value is lost when the condition does not match, even though I'm using a left outer join.

Thanks in advance

Here are my source code and data. Basically, I'm creating two socket sources: port 9999 is the right stream source and port 9998 is the left stream source.
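
(The two sockets can be fed with netcat, as in the Structured Streaming quick start: run nc -lk 9999 and nc -lk 9998 in two terminals, then paste the sample data lines below into each.)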

import java.sql.Timestamp

import org.apache.spark.sql.functions.expr
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}

object StreamStream {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder
      .appName("StreamStream")
      .master("local")
      .getOrCreate()

    import spark.implicits._

    spark.sparkContext.setLogLevel("ERROR")

    // Right stream source: raw lines from a socket on port 9999.
    val s9999: DataFrame = spark
      .readStream
      .format("socket")
      .option("host", "localhost")
      .option("port", 9999)
      .load()

    // Parse "id,timestamp" lines from the right stream into S9999 records.
    val s9999Dataset: Dataset[S9999] = s9999
      .map(line => {
        val strings = line.get(0).toString.split(",")
        val id = strings(0).toInt
        val time = Timestamp.valueOf(strings(1))
        S9999(id, time)
      })
      // Watermark on the right side: needed so the left outer join can
      // eventually emit unmatched left rows as null-joined results.
      .withWatermark("timestamp99", "30 seconds")

    // Left stream source: socket on port 9998, parsed the same way
    // (note: no watermark on this side).
    val s9998Dataset: Dataset[S9998] = spark
      .readStream
      .format("socket")
      .option("host", "localhost")
      .option("port", 9998)
      .load()
      .map(line => {
        val strings = line.get(0).toString.split(",")
        val id = strings(0).toInt
        val time = Timestamp.valueOf(strings(1))
        S9998(id, time)
      })

    // Left outer join on the key plus an event-time range constraint,
    // as required for stream-stream outer joins.
    val resultDataset = s9998Dataset
      .join(s9999Dataset,
        joinExprs = expr(
          """
                id99 = id98 AND
                timestamp98 >= timestamp99 AND
                timestamp98 <= timestamp99 + interval 6 seconds
        """),
        joinType = "leftOuter")

    // Write matched and (eventually) unmatched rows to the console.
    val streamingQuery = resultDataset
      .writeStream
      .outputMode("append")
      .format("console")
      .start()

    streamingQuery.awaitTermination()
  }

  case class S9999(id99: Int, timestamp99: Timestamp)

  case class S9998(id98: Int, timestamp98: Timestamp)
}


Sample Data:
1,2011-10-02 18:50:20.123
2,2011-10-02 18:50:25.123
3,2011-10-02 18:50:30.123
4,2011-10-02 18:50:35.123
5,2011-10-02 18:50:40.123
6,2011-10-02 18:50:45.123
7,2011-10-02 18:50:50.123
8,2011-10-02 18:50:55.123
9,2011-10-02 18:51:00.123
10,2011-10-02 18:51:05.123
11,2011-10-02 18:51:10.123
12,2011-10-02 18:51:15.123
13,2011-10-02 18:51:20.123
14,2011-10-02 18:51:25.123
15,2011-10-02 18:51:30.123
16,2011-10-02 18:52:30.123



Re: Spark 2.3 Stream-Stream Join with left outer join lost left stream value

Posted by Jungtaek Lim <ka...@gmail.com>.
We found an edge case in stream-stream left/right outer joins in Spark 2.x
and fixed it in Spark 3.0.0; see SPARK-26154
<https://issues.apache.org/jira/browse/SPARK-26154> for more details.
The fix introduced another regression, which was fixed in 3.0.1, so you may
want to move to Spark 3.0.1+ to get the complete fix.

The state format was changed, so the fix applies only when you start from
scratch (no restore from an existing checkpoint). Unfortunately, there is
no way to migrate the old state format to the new one.
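
In practice that means pointing the upgraded query at a fresh checkpoint
directory. A minimal sketch, reusing the writeStream code from your post
(the checkpoint path here is just an illustrative example):

    // After upgrading to Spark 3.0.1+, start the query with a NEW
    // checkpoint location so state is built in the fixed format.
    val streamingQuery = resultDataset
      .writeStream
      .outputMode("append")
      .format("console")
      .option("checkpointLocation", "/tmp/stream-stream-join-ckpt-v2") // fresh dir
      .start()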

Hope this helps.

On Sat, Feb 27, 2021 at 10:24 PM Xu Yan <xy...@thoughtworks.com> wrote:

> I'm trying to implement a stream-stream join toy example with Spark 2.3.0.
>
> The stream join works fine when the condition matches, but the left stream
> value is lost when the condition does not match, even though I'm using a
> left outer join.