Posted to issues@spark.apache.org by "Xingcan Cui (Jira)" <ji...@apache.org> on 2020/05/15 18:02:00 UTC

[jira] [Commented] (SPARK-31724) Improve the output mode for structured streaming

    [ https://issues.apache.org/jira/browse/SPARK-31724?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17108513#comment-17108513 ] 

Xingcan Cui commented on SPARK-31724:
-------------------------------------

Copying some comments from [https://github.com/apache/spark/pull/28523]
----
xccui commented 23 hours ago

 

Hi all, I am not very familiar with the version history of the streaming sink, but I would like to share some of my thoughts here. Please correct me if I have misunderstood anything.

I think SupportsStreamingUpdate should NOT be a sink-specific feature. We concentrate on the sink now because the current SS implementation doesn't allow chaining operators (other than the sink) that produce updating results.
 The only reason we provide update mode should be to produce the "correct" result table, i.e., to make the result table identical to the one produced by applying the same query to the materialized input rows.
 The update mode semantics we provide lack support for deletes, which sometimes makes the mode unreliable. I suppose we all agree on offering a better design in the future. But for now, it's better to keep the behavior unchanged from previous versions, which also carries the lowest risk and effort.
 I'll try to make some improvements to the related issues.
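
To make the "correct result table" criterion and the missing-delete problem concrete, here is a minimal toy model in plain Python. It is not Spark code: the query (count per key, keep only counts below a threshold), the function names, and the upsert-only sink are all assumptions made for illustration.

```python
from collections import Counter

def batch_result(rows, max_count):
    """Reference answer: run the query once over all materialized input rows."""
    counts = Counter(rows)
    return {key: c for key, c in counts.items() if c < max_count}

def run_upsert_only(batches, max_count):
    """Toy update mode: emit changed rows that pass the filter; the sink can only upsert."""
    state = Counter()  # aggregation state kept by the engine
    sink = {}          # external result table, keyed by group key
    for batch in batches:
        state.update(batch)
        for key in set(batch):
            if state[key] < max_count:
                sink[key] = state[key]  # upsert the new count
            # Otherwise the row should be deleted from the sink,
            # but this toy update mode has no way to signal a delete.
    return sink

batches = [["a", "b"], ["a"], ["a"]]
all_rows = [row for batch in batches for row in batch]

expected = batch_result(all_rows, max_count=3)   # key "a" reached 3 and is filtered out
actual = run_upsert_only(batches, max_count=3)   # key "a" is still in the sink with a stale count
print(expected, actual)
```

In this sketch the sink keeps a stale row for a key that the batch query would no longer return, so the result table is not identical to the batch result.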
----
HeartSaVioR commented 17 hours ago

 

Thanks for the great input, @xccui.

Basically I agree with your input; it matches my understanding, as I commented before (#28523 (comment)).

To summarize my previous comment: I also don't know how the streaming output mode was designed, but from my understanding it takes effect only on the result table of stateful aggregation operators. It isn't even applied to all stateful operators; for example, the mode doesn't affect stream-stream joins. It therefore doesn't guarantee that the final output respects the semantics, so there is little point in applying the same mode on the sink side.

Another concern that comes to mind is complete mode. Complete mode also takes effect on the result table. It may sound sensible to support complete mode in the sink as truncate-and-insert, but that leads to data loss when the result table is unioned with another stream that does not create a "result table". (I haven't had such a query, but it's technically possible.) Complete mode does not care about the other stream, so in every batch the previous output from the other stream is lost. I think complete mode is an odd fit for streaming and it would be better to discontinue support; I wouldn't expect any production query to use this mode, but please let me know if there is one.
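
The data loss scenario can be sketched with a small toy model in plain Python. This is not Spark code: the function name, the row tuples, and the way the two union branches are modeled are assumptions made for illustration only.

```python
from collections import Counter

def run_truncate_and_insert(batches):
    """Each batch is (agg_input, passthrough_input); the sink truncates, then inserts."""
    state = Counter()
    sink = []
    for agg_input, passthrough_input in batches:
        state.update(agg_input)
        # Complete mode: the aggregation branch re-emits its whole result table.
        agg_output = [("count", key, c) for key, c in sorted(state.items())]
        # The other union branch has no result table; it only emits this batch's rows.
        other_output = [("raw", row) for row in passthrough_input]
        sink = []                               # truncate
        sink.extend(agg_output + other_output)  # insert
    return sink

batches = [(["a"], ["x1"]), (["a", "b"], ["x2"])]
final = run_truncate_and_insert(batches)
print(final)
```

After the second batch the aggregation rows are intact, but the row `x1` from the other branch has been truncated away and is never written again.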

Anyway, I think the streaming update mode is technically not coupled with what the sink supports. It should be left as it is, though we'll probably have to fix the guide doc, which says the mode applies to the result table "as well as" to the sink. The description of the streaming output mode for sinks should be corrected as well: sinks do not depend on the streaming output mode, and as of now only append is possible.

P.S. We may need to revisit the operators and streaming output modes to look for flaws, as I did in the discussion thread and #24890. One case would be flatMapGroupsWithState with append mode.
----
 
xccui commented 12 hours ago

 

@HeartSaVioR Yes. It seems the output mode option was mainly designed for stateful aggregations, which means it actually works in a restricted way.

Ideally, to support complete mode, all the operators must be capable of outputting the "complete" result seen so far for each epoch. Personally, I'm in favor of removing this mode in a future version. But for now, I propose to add more restrictions while doing the plan check (e.g., disallowing the union situation you mentioned) and also a note to the documentation.

IMO, the mode of the result table should be decided only by the operators in the plan, and it could be either "append" or "update" (including the current "complete" mode). Basically, the designated sink should match the mode of the result table. Supporting "update" usually needs more effort, which means only some sinks could be chosen for a plan containing an aggregation or certain kinds of joins.
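
A minimal sketch of that idea in plain Python, assuming a plan is just a list of operator names and a sink declares the modes it can accept. The operator names, the set of "updating" operators, and both functions are hypothetical and do not correspond to any Spark API.

```python
APPEND, UPDATE = "append", "update"

# Hypothetical: operators assumed to produce updating results in this sketch.
UPDATING_OPERATORS = {"aggregate", "updating_join"}

def result_table_mode(plan_operators):
    """The mode is derived from the plan alone, not chosen by the user or the sink."""
    if any(op in UPDATING_OPERATORS for op in plan_operators):
        return UPDATE
    return APPEND

def check_sink(plan_operators, sink_modes):
    """Plan check: reject a sink that cannot accept the result table's mode."""
    mode = result_table_mode(plan_operators)
    if mode not in sink_modes:
        raise ValueError(f"sink supports {sorted(sink_modes)} but the plan requires {mode!r}")
    return mode

print(check_sink(["source", "filter"], {APPEND}))             # append-only sink is enough
print(check_sink(["source", "aggregate"], {APPEND, UPDATE}))  # needs an update-capable sink
```

With this shape, an append-only sink paired with a plan containing `aggregate` fails the check with a `ValueError` instead of silently receiving updating rows.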
----
HeartSaVioR commented 5 hours ago


I'm curious what the SS committers think about the comments above. If they agree with them, then the issue is really about the design of the streaming output mode, and I think the right fix is to decouple the streaming output mode from the sink.

Would it break backward compatibility? If you look back at branch-2.4, you may be surprised to find that most built-in sink implementations "ignore" the output mode. The output mode passed to StreamWriteSupport.createStreamWriter is not used at all, with only one exception.

The only exception among the built-in sinks is the memory sink, and even it doesn't distinguish between append mode and update mode. It uses the output mode only to truncate the memory for complete mode. Given that this sink most likely exists for testing, that only makes tests easier to implement, nothing else. I'm also strongly in favor of dropping complete mode, given the data loss issue I described, and I don't think it is suitable for production.

I don't even think custom sinks have been respecting the output mode, as the API lacks information about update mode, and complete mode is not suitable for production.

The major problem is timing. I totally understand that such a change may feel too big for a release that has already reached RC1, though...

I hope we address it in the major release (that's a good rationalization), but if we really want to minimize the changes for now, what about adding SupportsStreamingTruncate as an internal trait as well, so that we avoid coupling SupportsTruncate with complete mode and let streaming writes take a different path from batch writes?
 (Even better if we simply don't support complete mode, or restrict it to private, right now, but...)

As both SupportsStreamingUpdate and SupportsStreamingTruncate would be internal, we would have time to revisit the streaming path and change it without affecting the public API.

That would leave custom sinks able only to append, since we won't expose these abilities, but that's what I expect so far. Even with the new DSv2, implementing truncate in a streaming sink looks very limited: truncation should take place at commit time, which means write tasks cannot write to the destination directly, and that is not scalable.
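
The commit-time constraint can be illustrated with a toy sink in plain Python. This is not the DSv2 API: the class, its methods, and the in-memory staging area are all hypothetical, and serve only to show why tasks cannot write to the destination directly.

```python
class StagedTruncatingSink:
    """Toy sink: tasks write to a staging area; truncation happens only at commit."""

    def __init__(self):
        self.destination = []  # the visible output
        self._staged = {}      # epoch -> {task_id -> rows}

    def write_task(self, epoch, task_id, rows):
        # A task must not touch the destination: if the epoch fails,
        # the previously committed data has to stay intact.
        self._staged.setdefault(epoch, {})[task_id] = list(rows)

    def commit(self, epoch):
        staged = self._staged.pop(epoch, {})
        new_rows = [row for task_id in sorted(staged) for row in staged[task_id]]
        self.destination = new_rows  # truncate and insert in a single step

    def abort(self, epoch):
        self._staged.pop(epoch, None)  # drop staged rows, keep the destination

sink = StagedTruncatingSink()
sink.write_task(0, 0, ["a=1"])
sink.write_task(0, 1, ["b=1"])
sink.commit(0)
sink.write_task(1, 0, ["a=2"])
sink.abort(1)  # the failed epoch leaves the committed output untouched
print(sink.destination)
```

Every row passes through the staging area and is moved in one step at commit, which is the bottleneck the comment refers to.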

Does my proposal make sense?

cc @tdas @zsxwing @jose-torres @brkyvz @jerryshao @gaborgsomogyi to hear their voices as well. Please cc more people if you know anyone who could help take a look.

 

> Improve the output mode for structured streaming
> ------------------------------------------------
>
>                 Key: SPARK-31724
>                 URL: https://issues.apache.org/jira/browse/SPARK-31724
>             Project: Spark
>          Issue Type: Umbrella
>          Components: Structured Streaming
>    Affects Versions: 3.0.0
>            Reporter: Xingcan Cui
>            Priority: Major
>
> The current design of output mode in structured streaming is restricted and needs some improvements. This umbrella issue is used to track all the updates we are going to make.


