Posted to issues@spark.apache.org by "Theo Diefenthal (JIRA)" <ji...@apache.org> on 2019/01/22 17:55:00 UTC

[jira] [Resolved] (SPARK-26692) Structured Streaming: Aggregation + JOIN not working

     [ https://issues.apache.org/jira/browse/SPARK-26692?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Theo Diefenthal resolved SPARK-26692.
-------------------------------------
    Resolution: Invalid

I just re-read the crucial part of the documentation:
{code:java}
https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#support-matrix-for-joins-in-streaming-querie{code}
{code:java}
As of Spark 2.3, you cannot use other non-map-like operations before joins. Here are a few examples of what cannot be used.

- Cannot use streaming aggregations before joins.

- Cannot use mapGroupsWithState and flatMapGroupsWithState in Update mode before joins.{code}
It would still be nice if Spark raised an exception instead of silently delivering empty results.

> Structured Streaming: Aggregation + JOIN not working
> ----------------------------------------------------
>
>                 Key: SPARK-26692
>                 URL: https://issues.apache.org/jira/browse/SPARK-26692
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 2.4.0
>            Reporter: Theo Diefenthal
>            Priority: Major
>
> I tried to set up a simple streaming pipeline with two streams on two data sources (CSV files), where one stream is first windowed (aggregated) and then the two streams are joined. As output, I chose the console sink in Append mode for development.
> After multiple hours of setup and testing, I still couldn't get a working example running. I also found a Stack Overflow question [https://stackoverflow.com/questions/52300247/spark-scala-structured-streaming-aggregation-and-self-join] where others report the same findings: "In general, {{append}} output mode with aggregations is not a recommended way. As far as I understand" and "I had the same empty output in my case with different aggregation. Once I changed it to {{update}} mode I got output. I have strong doubts that you will be able to do it in {{append}} mode. My understanding is that {{append}} mode only for {{map-like}} operations, e.g. filter, transform entry etc. I believe that multiphase processing".
> I observed the same and only got empty output in my example:
> {code:java}
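> import org.apache.spark.sql.Dataset;
> import org.apache.spark.sql.Row;
> import org.apache.spark.sql.SparkSession;
> import org.apache.spark.sql.streaming.OutputMode;
> import org.apache.spark.sql.streaming.StreamingQuery;
> import org.apache.spark.sql.types.DataTypes;
> import org.apache.spark.sql.types.StructType;
> import static org.apache.spark.sql.functions.col;
> import static org.apache.spark.sql.functions.expr;
> import static org.apache.spark.sql.functions.window;
>
> public static SparkSession buildSession() throws Exception {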
>     return SparkSession.builder()
>             .appName("StreamGroupJoin")
>             .config("spark.sql.shuffle.partitions", 4)
>             .master("local[2]")
>             .getOrCreate();
> }
> public static Dataset<Row> loadData(SparkSession session, String filepath) {
>     return session
>             .readStream()
>             .format("csv")
>             .option("header", true)
>             .option("path", filepath)
>             .schema(new StructType().add("ts", DataTypes.TimestampType).add("color", DataTypes.StringType).add("data", DataTypes.StringType))
>             .load();
> }
> public static void main(String[] args) throws Exception {
>     SparkSession session = buildSession();
>     Dataset<Row> shieldStream = loadData(session, "streamingpoc/src/main/resources/simpleSHIELD");
>     Dataset<Row> argusStream = loadData(session, "streamingpoc/src/main/resources/simpleARGUS");
>     shieldStream = shieldStream.withWatermark("ts", "0 hours");
>     argusStream = argusStream.withWatermark("ts", "0 hours");
>     // Streaming aggregation placed before the join: count rows per 24-hour window and color.
>     argusStream = argusStream.groupBy(window(col("ts"), "24 hours"), col("color")).count();
>     argusStream = argusStream.select(col("window.start").as("argusStart"), col("window.end").as("argusEnd"), col("color").as("argusColor"), col("count").as("argusCount"));
>     //argusStream = argusStream.withWatermark("argusStart", "0 hours");
>     Dataset<Row> joinedStream = argusStream.join(shieldStream, expr("color = argusColor AND ts >= argusStart AND ts <= argusEnd"));
>     joinedStream = joinedStream.withWatermark("ts", "0 hours");
>     StreamingQuery joinedQuery = joinedStream.writeStream()
>             .outputMode(OutputMode.Append())
>             .format("console")
>             .start();
>     joinedQuery.awaitTermination();
>     System.out.println("DONE");
> }{code}
> I'd like to point out that, at least in my test version (Spark 2.4.0), it is not even possible to switch to OutputMode.Update, due to "_Inner join between two streaming DataFrames/Datasets is not supported in Complete output mode, only in Append output mode_".
> In my example, I used two simple CSV datasets with the same format and one matching row, which should be output after the JOIN.
> Without the JOIN, both streams (aggregated and not) work fine. Without the aggregation, the JOIN works fine. But if I use both (at least in Append mode), I get no output. If I use standard Spark DataFrames instead of Spark Structured Streaming, I get the result I expected.
> *Possible Solutions*
>  # Either there is a bug or misuse in my code. In that case, the ticket can be closed and I'd be happy if someone could tell me what I did wrong. I tried quite a lot of different watermark settings but wasn't able to find a working setup.
>  # Fix OutputMode Append if technically possible (from my theoretical understanding of big-data streaming in general, this should be possible, but I'm not deep into the topic and I'm not familiar with Spark after all).
>  # Make this option unavailable in Spark (i.e. report a pipeline error stating that an aggregated stream can't be joined in Append mode, as is already done when I try to join two aggregated streams). In that case, the documentation should also be updated to state that anyone who wants to perform aggregations and JOINs is advised to write the aggregation output to a sink such as Kafka and re-read it from there for the join.
>  
> The following is the result I'd like to obtain (and which I get if I use Spark Datasets instead of Spark Structured Streaming):
> {code:java}
> +-------------------+-------------------+----------+----------+-------------------+-----+--------+
> |argusStart         |argusEnd           |argusColor|argusCount|ts                 |color|data    |
> +-------------------+-------------------+----------+----------+-------------------+-----+--------+
> |2018-07-20 02:00:00|2018-07-21 02:00:00|red       |1         |2018-07-20 12:01:15|red  |Iron Man|
> +-------------------+-------------------+----------+----------+-------------------+-----+--------+{code}
> And here are the dummy CSV files I created:
> {{example-argus.csv}}
> {code:java}
> ts,color,data
> 2018-07-19T08:33:07Z,green,Green Lantern
> 2018-07-20T00:00:00Z,red,Aquaman
> 2018-07-20T07:00:00Z,green,Batman
> 2018-07-20T10:00:00Z,green,Flash
> 2018-07-21T10:01:13Z,green,Green Arrow
> 2018-07-22T10:01:15Z,green,Robin
> 2018-07-23T10:03:15Z,green,Starfire
> 2018-07-26T10:07:23Z,green,Supergirl
> 2018-07-26T10:13:23Z,red,Superman
> 2018-07-26T11:01:01Z,green,Wonder Woman
> 2018-07-26T14:02:11Z,green,Cyborg
> 2018-07-28T14:05:53Z,green,Harley Quinn
> 2018-07-30T05:00:13Z,green,Deadshot
> 2018-08-10T09:23:32Z,green,El Diablo
> 2018-08-10T12:12:12Z,green,Killer Croc{code}
> {{example-shield.csv}}
> {code:java}
> ts,color,data
> 2018-07-19T10:01:13Z,blue,Captain America
> 2018-07-20T10:01:15Z,red,Iron Man
> 2018-07-20T10:03:15Z,blue,Thor
> 2018-07-20T10:07:23Z,blue,Hulk
> 2018-07-20T10:13:23Z,blue,Black Widow
> 2018-07-20T11:01:01Z,blue,Hawkeye
> 2018-07-20T14:02:11Z,blue,Loki
> 2018-07-20T14:05:53Z,blue,Spider-Man
> 2018-07-21T05:00:13Z,blue,Vision
> 2018-07-21T09:23:32Z,blue,Scarlet Witch
> 2018-07-21T12:12:12Z,blue,Dr. Strange
> 2018-07-21T13:13:13Z,blue,Star-Lord
> 2018-07-22T01:52:18Z,red,Drax
> 2018-07-26T01:52:18Z,blue,Groot
> 2018-08-10T12:12:12Z,blue,Rocket{code}
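
For reference, the batch result the description expects can be sketched in plain Java, without Spark. This is only an illustration of the intended semantics, not the Spark implementation: the class name WindowedJoinSketch and its helper methods are hypothetical, the 24-hour windows are assumed to be aligned to the Unix epoch in UTC, and the join condition mirrors the one in the issue (color = argusColor AND ts >= argusStart AND ts <= argusEnd). The rows are copied from the two CSV files above.

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class WindowedJoinSketch {
    // Tumbling window length: 24 hours, assumed aligned to the Unix epoch (UTC).
    public static final long WINDOW_SECONDS = 24 * 60 * 60;

    // Rows copied from example-argus.csv (ts,color,data), header omitted.
    public static final String[] ARGUS = {
        "2018-07-19T08:33:07Z,green,Green Lantern", "2018-07-20T00:00:00Z,red,Aquaman",
        "2018-07-20T07:00:00Z,green,Batman", "2018-07-20T10:00:00Z,green,Flash",
        "2018-07-21T10:01:13Z,green,Green Arrow", "2018-07-22T10:01:15Z,green,Robin",
        "2018-07-23T10:03:15Z,green,Starfire", "2018-07-26T10:07:23Z,green,Supergirl",
        "2018-07-26T10:13:23Z,red,Superman", "2018-07-26T11:01:01Z,green,Wonder Woman",
        "2018-07-26T14:02:11Z,green,Cyborg", "2018-07-28T14:05:53Z,green,Harley Quinn",
        "2018-07-30T05:00:13Z,green,Deadshot", "2018-08-10T09:23:32Z,green,El Diablo",
        "2018-08-10T12:12:12Z,green,Killer Croc"
    };

    // Rows copied from example-shield.csv (ts,color,data), header omitted.
    public static final String[] SHIELD = {
        "2018-07-19T10:01:13Z,blue,Captain America", "2018-07-20T10:01:15Z,red,Iron Man",
        "2018-07-20T10:03:15Z,blue,Thor", "2018-07-20T10:07:23Z,blue,Hulk",
        "2018-07-20T10:13:23Z,blue,Black Widow", "2018-07-20T11:01:01Z,blue,Hawkeye",
        "2018-07-20T14:02:11Z,blue,Loki", "2018-07-20T14:05:53Z,blue,Spider-Man",
        "2018-07-21T05:00:13Z,blue,Vision", "2018-07-21T09:23:32Z,blue,Scarlet Witch",
        "2018-07-21T12:12:12Z,blue,Dr. Strange", "2018-07-21T13:13:13Z,blue,Star-Lord",
        "2018-07-22T01:52:18Z,red,Drax", "2018-07-26T01:52:18Z,blue,Groot",
        "2018-08-10T12:12:12Z,blue,Rocket"
    };

    // Start (epoch seconds) of the 24-hour window that contains ts.
    public static long windowStart(Instant ts) {
        return Math.floorDiv(ts.getEpochSecond(), WINDOW_SECONDS) * WINDOW_SECONDS;
    }

    // Aggregation step: count rows per (window start, color); key is "start|color".
    public static Map<String, Long> countPerWindowAndColor(String[] rows) {
        Map<String, Long> counts = new LinkedHashMap<>();
        for (String row : rows) {
            String[] f = row.split(",", 3);
            counts.merge(windowStart(Instant.parse(f[0])) + "|" + f[1], 1L, Long::sum);
        }
        return counts;
    }

    // Join step: match each aggregated window against the raw rows of the other source.
    public static List<String> join(Map<String, Long> argusCounts, String[] shieldRows) {
        List<String> out = new ArrayList<>();
        for (Map.Entry<String, Long> e : argusCounts.entrySet()) {
            String[] key = e.getKey().split("\\|");
            long start = Long.parseLong(key[0]);
            long end = start + WINDOW_SECONDS;
            for (String row : shieldRows) {
                String[] f = row.split(",", 3);
                long ts = Instant.parse(f[0]).getEpochSecond();
                if (f[1].equals(key[1]) && ts >= start && ts <= end) {
                    out.add(Instant.ofEpochSecond(start) + "," + Instant.ofEpochSecond(end)
                            + "," + key[1] + "," + e.getValue() + "," + row);
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Prints: 2018-07-20T00:00:00Z,2018-07-21T00:00:00Z,red,1,2018-07-20T10:01:15Z,red,Iron Man
        join(countPerWindowAndColor(ARGUS), SHIELD).forEach(System.out::println);
    }
}
```

The single joined row corresponds to the one in the expected-output table of the description. The timestamps here are printed in UTC, whereas the table appears to have been rendered in a UTC+2 session time zone (an inference from the two-hour offset).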



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
