Posted to issues@spark.apache.org by "Theo Diefenthal (JIRA)" <ji...@apache.org> on 2019/01/22 17:06:00 UTC

[jira] [Created] (SPARK-26692) Structured Streaming: Aggregation + JOIN not working

Theo Diefenthal created SPARK-26692:
---------------------------------------

             Summary: Structured Streaming: Aggregation + JOIN not working
                 Key: SPARK-26692
                 URL: https://issues.apache.org/jira/browse/SPARK-26692
             Project: Spark
          Issue Type: Bug
          Components: Structured Streaming
    Affects Versions: 2.4.0
            Reporter: Theo Diefenthal


I tried to set up a simple streaming pipeline with two streams on two data sources (CSV files), where one stream is first windowed (aggregated) and then the streams are joined. As output, I chose the console sink in Append mode for development.

After multiple hours of setup and testing, I still wasn't able to get a working example running. I also found a StackOverflow topic here [https://stackoverflow.com/questions/52300247/spark-scala-structured-streaming-aggregation-and-self-join] where others report the same findings I had: "In general, {{append}} output mode with aggregations is not a recommended way. As far as I understand". And "I had the same empty output in my case with different aggregation. Once I changed it to {{update}} mode I got output. I have strong doubts that you will be able to do it in {{append}} mode. My understanding is that {{append}} mode only for {{map-like}} operations, e.g. filter, transform entry etc. I believe that multiphase processing".
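For context, my reading of the Structured Streaming programming guide is that in append mode a windowed aggregate is only emitted once the watermark has passed the end of its window, and that the watermark used by a micro-batch is derived from the event times seen in earlier batches. The following is a minimal plain-Java sketch of that rule only; it does not use or reproduce any Spark API. The class {{AppendModeSketch}} and its methods are hypothetical names I made up, and the exact boundary comparison (window end <= watermark) is an assumption.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.TreeMap;

// Hypothetical helper: mimics the append-mode emission rule, not Spark itself.
class AppendModeSketch {
    static final Duration WINDOW = Duration.ofHours(24);

    private final TreeMap<Instant, Long> openWindows = new TreeMap<>(); // window start -> count
    private final Duration delay;
    private Instant maxEventTime = Instant.EPOCH;

    AppendModeSketch(Duration delay) {
        this.delay = delay;
    }

    // Start of the epoch-aligned tumbling window that contains ts.
    static Instant windowStart(Instant ts) {
        long size = WINDOW.getSeconds();
        return Instant.ofEpochSecond(Math.floorDiv(ts.getEpochSecond(), size) * size);
    }

    // Processes one micro-batch and returns the window starts finalized by it.
    List<Instant> processBatch(List<Instant> eventTimes) {
        // The watermark for this batch only reflects data from earlier batches.
        Instant watermark = maxEventTime.minus(delay);
        for (Instant ts : eventTimes) {
            Instant start = windowStart(ts);
            // Rows whose window already ended at or before the watermark are dropped as late.
            if (start.plus(WINDOW).isAfter(watermark)) {
                openWindows.merge(start, 1L, Long::sum);
            }
            if (ts.isAfter(maxEventTime)) {
                maxEventTime = ts;
            }
        }
        // Emit and evict every window whose end is not after the watermark (assumed boundary).
        List<Instant> emitted = new ArrayList<>();
        Iterator<Instant> it = openWindows.keySet().iterator();
        while (it.hasNext()) {
            Instant start = it.next();
            if (!start.plus(WINDOW).isAfter(watermark)) {
                emitted.add(start);
                it.remove();
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        AppendModeSketch sketch = new AppendModeSketch(Duration.ZERO);
        // First batch: the watermark is still at its initial value, so nothing is emitted.
        System.out.println(sketch.processBatch(Arrays.asList(
                Instant.parse("2018-07-20T00:00:00Z"),
                Instant.parse("2018-07-26T10:13:23Z")))); // prints []
        // A later (here: empty) batch sees the advanced watermark and finalizes the older window.
        System.out.println(sketch.processBatch(Collections.<Instant>emptyList())); // prints [2018-07-20T00:00:00Z]
    }
}
```

In this sketch nothing appears in the batch that first sees the data; a window only shows up in a later batch, after the watermark has moved past its end. Whether this is what causes the empty output in my pipeline is something I cannot confirm.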

I observed the same and only got empty output in my example:
{code:java}
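// Imports needed by the snippet below (the methods live inside an arbitrary class).
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.OutputMode;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.expr;
import static org.apache.spark.sql.functions.window;

public static SparkSession buildSession() throws Exception {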
    return SparkSession.builder()
            .appName("StreamGroupJoin")
            .config("spark.sql.shuffle.partitions", 4)
            .master("local[2]")
            .getOrCreate();
}

public static Dataset<Row> loadData(SparkSession session, String filepath) {
    return session
            .readStream()
            .format("csv")
            .option("header", true)
            .option("path", filepath)
            .schema(new StructType().add("ts", DataTypes.TimestampType).add("color", DataTypes.StringType).add("data", DataTypes.StringType))
            .load();
}

public static void main(String[] args) throws Exception {

    SparkSession session = buildSession();

    Dataset<Row> shieldStream = loadData(session, "streamingpoc/src/main/resources/simpleSHIELD");
    Dataset<Row> argusStream = loadData(session, "streamingpoc/src/main/resources/simpleARGUS");

    shieldStream = shieldStream.withWatermark("ts", "0 hours");

    argusStream = argusStream.withWatermark("ts", "0 hours");
    argusStream = argusStream.groupBy(window(col("ts"), "24 hours"), col("color")).count();
    argusStream = argusStream.select(col("window.start").as("argusStart"), col("window.end").as("argusEnd"), col("color").as("argusColor"), col("count").as("argusCount"));
    //argusStream = argusStream.withWatermark("argusStart", "0 hours");

    Dataset<Row> joinedStream = argusStream.join(shieldStream, expr("color = argusColor AND ts >= argusStart AND ts <= argusEnd"));
    joinedStream = joinedStream.withWatermark("ts", "0 hours");

    StreamingQuery joinedQuery = joinedStream.writeStream()
            .outputMode(OutputMode.Append())
            .format("console")
            .start();

    joinedQuery.awaitTermination();

    System.out.println("DONE");
}{code}
I'd like to point out that, at least in my test version of Spark 2.4.0, it is not even possible to switch to OutputMode.Update, due to "_Inner join between two streaming DataFrames/Datasets is not supported in Complete output mode, only in Append output mode_".

In my example, I used two simple CSV datasets having the same format and one matching row which should be output after the JOIN. 

If I work without the JOIN, both streams (aggregated and not) work fine. If I work without the aggregation, the JOIN works fine. But if I use both (at least in append mode), I get no output. If I use standard Spark DataFrames instead of Spark Structured Streaming, I get the result I expected.

*Possible Solutions*
 # Either there is a bug/misuse in my code. In that case, the ticket can be closed and I'd be happy if someone could tell me what I did wrong. I tried quite a lot of different watermark settings but wasn't able to find a working setup.
 # Fix OutputMode Append if technically possible (from my theoretical understanding of big-data streaming in general, this should be possible, but I'm not deep into the topic and I'm not familiar with Spark after all).
 # Make this option unavailable in Spark (i.e. raise a pipeline error stating that an aggregated stream can't be joined in append mode, as is already done if I try to join two aggregated streams). In that case, the documentation should also be updated to state that anyone who wants to combine aggregations and JOINs is advised to write the aggregation output to a sink like Kafka and re-read it from there for the join.

 

The following is the result I'd like to obtain (and which I get if I use Spark Datasets instead of Spark Structured Streaming):
{code:java}
+-------------------+-------------------+----------+----------+-------------------+-----+--------+
|argusStart         |argusEnd           |argusColor|argusCount|ts                 |color|data    |
+-------------------+-------------------+----------+----------+-------------------+-----+--------+
|2018-07-20 02:00:00|2018-07-21 02:00:00|red       |1         |2018-07-20 12:01:15|red  |Iron Man|
+-------------------+-------------------+----------+----------+-------------------+-----+--------+{code}
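To make the expected row easier to verify by hand, here is a minimal plain-Java sketch of the batch computation: count rows per 24-hour tumbling window and color, then join on color and on ts lying between window start and end. It is not Spark code. The class {{BatchJoinSketch}} and its members are hypothetical names, the input is only a subset of the rows from the two CSV files in this ticket, and I assume windows are aligned to the Unix epoch in UTC. It prints instants in UTC, whereas the table above shows them two hours later, presumably in my local time zone.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper: batch version of "window + count, then join" in plain Java.
class BatchJoinSketch {
    static final long WINDOW_SECONDS = Duration.ofHours(24).getSeconds();

    // Subset of the ARGUS rows from this ticket: {ts, color, data}.
    static final String[][] ARGUS = {
        {"2018-07-19T08:33:07Z", "green", "Green Lantern"},
        {"2018-07-20T00:00:00Z", "red", "Aquaman"},
        {"2018-07-20T07:00:00Z", "green", "Batman"},
        {"2018-07-26T10:13:23Z", "red", "Superman"},
    };

    // Subset of the SHIELD rows from this ticket: {ts, color, data}.
    static final String[][] SHIELD = {
        {"2018-07-19T10:01:13Z", "blue", "Captain America"},
        {"2018-07-20T10:01:15Z", "red", "Iron Man"},
        {"2018-07-22T01:52:18Z", "red", "Drax"},
        {"2018-07-26T01:52:18Z", "blue", "Groot"},
    };

    // Start of the epoch-aligned 24-hour window that contains ts.
    static Instant windowStart(Instant ts) {
        return Instant.ofEpochSecond(
                Math.floorDiv(ts.getEpochSecond(), WINDOW_SECONDS) * WINDOW_SECONDS);
    }

    static List<String> aggregateAndJoin(String[][] argus, String[][] shield) {
        // Step 1: count ARGUS rows per (window start, color).
        Map<String, Long> counts = new LinkedHashMap<>();
        for (String[] row : argus) {
            String key = windowStart(Instant.parse(row[0])) + "|" + row[1];
            counts.merge(key, 1L, Long::sum);
        }
        // Step 2: join on color = argusColor AND argusStart <= ts <= argusEnd.
        List<String> joined = new ArrayList<>();
        for (Map.Entry<String, Long> entry : counts.entrySet()) {
            String[] key = entry.getKey().split("\\|");
            Instant start = Instant.parse(key[0]);
            Instant end = start.plusSeconds(WINDOW_SECONDS);
            for (String[] row : shield) {
                Instant ts = Instant.parse(row[0]);
                if (row[1].equals(key[1]) && !ts.isBefore(start) && !ts.isAfter(end)) {
                    joined.add(start + "," + end + "," + key[1] + "," + entry.getValue()
                            + "," + row[0] + "," + row[1] + "," + row[2]);
                }
            }
        }
        return joined;
    }

    public static void main(String[] args) {
        // Prints one line:
        // 2018-07-20T00:00:00Z,2018-07-21T00:00:00Z,red,1,2018-07-20T10:01:15Z,red,Iron Man
        aggregateAndJoin(ARGUS, SHIELD).forEach(System.out::println);
    }
}
```

Only the red "Iron Man" row matches: "Drax" is red as well, but its timestamp falls into neither of the two red ARGUS windows.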
And here are the dummy CSV files I created:

{{example-argus.csv}}
{code:java}
ts,color,data
2018-07-19T08:33:07Z,green,Green Lantern
2018-07-20T00:00:00Z,red,Aquaman
2018-07-20T07:00:00Z,green,Batman
2018-07-20T10:00:00Z,green,Flash
2018-07-21T10:01:13Z,green,Green Arrow
2018-07-22T10:01:15Z,green,Robin
2018-07-23T10:03:15Z,green,Starfire
2018-07-26T10:07:23Z,green,Supergirl
2018-07-26T10:13:23Z,red,Superman
2018-07-26T11:01:01Z,green,Wonder Woman
2018-07-26T14:02:11Z,green,Cyborg
2018-07-28T14:05:53Z,green,Harley Quinn
2018-07-30T05:00:13Z,green,Deadshot
2018-08-10T09:23:32Z,green,El Diablo
2018-08-10T12:12:12Z,green,Killer Croc{code}
{{example-shield.csv}}
{code:java}
ts,color,data
2018-07-19T10:01:13Z,blue,Captain America
2018-07-20T10:01:15Z,red,Iron Man
2018-07-20T10:03:15Z,blue,Thor
2018-07-20T10:07:23Z,blue,Hulk
2018-07-20T10:13:23Z,blue,Black Widow
2018-07-20T11:01:01Z,blue,Hawkeye
2018-07-20T14:02:11Z,blue,Loki
2018-07-20T14:05:53Z,blue,Spider-Man
2018-07-21T05:00:13Z,blue,Vision
2018-07-21T09:23:32Z,blue,Scarlet Witch
2018-07-21T12:12:12Z,blue,Dr. Strange
2018-07-21T13:13:13Z,blue,Star-Lord
2018-07-22T01:52:18Z,red,Drax
2018-07-26T01:52:18Z,blue,Groot
2018-08-10T12:12:12Z,blue,Rocket{code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
