You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@spark.apache.org by Jungtaek Lim <ka...@gmail.com> on 2019/07/16 05:11:24 UTC

My curation of pending structured streaming PRs to review

Hi devs,

As we make progress on some minor PRs on structured streaming, I'd like to
remind about major PRs on SS area to get more chances to be reviewed.

Please note that I only include existing PRs, so something still not
discussed like queryable state is not included in the curation list. Also,
I've excluded PRs on continuous processing, as I'm not fully sure about
current direction and vision on this feature. Minor PRs are mostly excluded
unless they are proposed for a long ago. Last, I could be biased on
curating list.

Let's get started!

----
A. File Source/Sink

1. [SPARK-20568][SS] Provide option to clean up completed files in
streaming query

ISSUE: https://issues.apache.org/jira/browse/SPARK-20568
PR: https://github.com/apache/spark/pull/22952

From the nature of "stream", the input data will grow infinitely and end
users want to have a clear way to clean up completed files. Unlike batch
query, structured streaming doesn't require all input files to be presented
- once they've been committed (say, completed processing), they wouldn't be
read from such query.

This patch automatically cleans up input files when they're committed, with
three options: 1) keep it as it is, 2) archive (move) to other directory 3)
delete.

2. [SPARK-27188][SS] FileStreamSink: provide a new option to have retention
on output files

ISSUE: https://issues.apache.org/jira/browse/SPARK-27188
PR: https://github.com/apache/spark/pull/24128

File sink writes metadata which records list of output files to ensure file
source to only read correct files, which helps to achieve end-to-end
exactly once. But file sink has no idea when output files will not be
accessed from downstream query, so metadata just grows infinitely and
output files cannot be removed safely.

This patch opens the chance for end users to provide TTL on output files so
that metadata will eventually exclude expired output files as well as end
users could remove the output files safely.


B. Kafka Source/Sink

1. [SPARK-21869][SS] A cached Kafka producer should not be closed if any
task is using it - adds inuse tracking.

ISSUE: https://issues.apache.org/jira/browse/SPARK-21869
PR: https://github.com/apache/spark/pull/19096

This is a long-lasting bug (around 2 years after filing the JIRA issue): if
some task uses cached Kafka producer longer than 10 minutes, pool will
recognize it as "timed-out" and just close it. After closing undefined
behavior from task side will occur.

This patch adds "in-use" tracking on producer to address this. Please note
that Kafka producer is thread-safe (whereas Kafka consumer is not) and we
allow using it concurrently, so we can't adopt commons pool to pool
producer. (Though we can still leverage commons pool if we are OK to not
share between threads.)

2. [SPARK-23539][SS] Add support for Kafka headers in Structured Streaming

ISSUE: https://issues.apache.org/jira/browse/SPARK-23539
PR: https://github.com/apache/spark/pull/22282

As there's great doc to rationalize the needs on supporting Kafka headers,
I'll just let the doc explaining it.
https://cwiki.apache.org/confluence/display/KAFKA/KIP-82+-+Add+Record+Headers

Please note that the issue has been commented from end users regarding
availability, which also represents the needs on end users' side.

3. [SPARK-25151][SS] Apply Apache Commons Pool to KafkaDataConsumer

ISSUE: https://issues.apache.org/jira/browse/SPARK-25151
PR: https://github.com/apache/spark/pull/22138

Kafka source has its pooling logic for consumers, but as I saw some JIRA
issues regarding pooling we seem to agree we would like to replace with
known pool implementation which provides advanced configuration, detailed
metrics, etc.

This patch adopts Apache Commons Pool (which above advantages are brought)
to be used as a connection pool for consumers, with respecting to current
behavior whenever possible. It also separates pooling for consumer and
fetched data which enables to maximize efficiency on pooling consumers, and
also address the bug on unnecessary re-fetch on self-join. (The result of
experiment is in PR's content.)

4. [SPARK-26848][SQL] Introduce new option to Kafka source: offset by
timestamp (starting/ending) SQL

ISSUE: https://issues.apache.org/jira/browse/SPARK-26848
PR: https://github.com/apache/spark/pull/23747

When end users would want to replay their records in Kafka topic, they
wouldn't memorize exact offsets per each partition but Spark requires to do
that, otherwise just start from earliest. We as human being are much
familiar with time, once we want to replay some records we know the
timestamp of records we should start from.

This patch opens the chance for end users to provide offset by timestamp
(either starting or ending, or both) which will be transparently passed on
Kafka when requesting.


C. State

1. [SPARK-27237][SS] Introduce State schema validation among query restart

ISSUE: https://issues.apache.org/jira/browse/SPARK-27237
PR: https://github.com/apache/spark/pull/24173

Spark doesn't have explicit mechanism to avoid end users to change their
query as "non-compatible". We documented the rules where the query will not
be compatible between changes, but it's not easier to self-determine the
rules, and non-friendly error message will be thrown if end users violate
the rule. In fact, undefined behavior will occur.

This patch introduces state schema validation, which verifies schema
compatibility regarding states between changes of query, and provides
informative error message on end users so that they indicate previous
schema and current schema of state.

This is also a baseline of new data source - state, as we can leverage
state schema information and not requiring end users to input the schema.

2. [SPARK-28191][SS] New data source - state - reader part

ISSUE: https://issues.apache.org/jira/browse/SPARK-28191
PR: https://github.com/apache/spark/pull/24990

Please read below JIRA issue to see rationalization of state data source,
as the issue description contains the cases where state data source can be
used. (e.g. schema evolution on state, offline rescale on state, etc.)
https://issues.apache.org/jira/browse/SPARK-28190

This patch deals with source part - enables reading states on structured
streaming query to the batch query.

3. [SPARK-28120][SS] Rocksdb state storage implementation

ISSUE: https://issues.apache.org/jira/browse/SPARK-28120
PR: https://github.com/apache/spark/pull/24922

The memory has been huge limitation of state size. As structured streaming
loads two versions of state in executor by default, memory pressure becomes
the real problem on dealing with large state. Scaling up executors may
work, but it requires unnecessary waste of resource, and it can't help when
executor is beyond number of partitions. (State data source will eventually
help on repartitioning but it requires offline batch query.)

State store which resides outside of memory is mandatory to structured
streaming for dealing with large state, and this patch is trying to address
it by introducing RocksDB state store provider.


D. Structured Streaming

1. [SPARK-24634][SS] Add a new metric regarding number of rows later than
watermark

ISSUE: https://issues.apache.org/jira/browse/SPARK-24634
PR: https://github.com/apache/spark/pull/24936

Spark does't provide any information on late rows which could be dropped on
stateful processor.

This patch adds metrics on counting late rows so that end users can be
noticed about it. Please note that the issue was originally meant to
provide the number of dropped rows due the late, but Spark does
pre-aggregation on streaming aggregation, so it doesn't provide correct
number. Current approach is less informative than origin intention but
still bring the value, for example, determining whether the query is
affected by SPARK-28074.

2. [SPARK-26154][SS] Streaming left/right outer join should not return
outer nulls for already matched rows

ISSUE: https://issues.apache.org/jira/browse/SPARK-26154
PR: https://github.com/apache/spark/pull/23634

This is long-standing correctness issue, and multiple end users (including
me) reported about the behavior. This is occurred on edge-case, but the
edge-case is not hard to reproduce, even closer to example query we provide
as streaming outer join.

This patch addresses the correctness issue via changing the state on join -
introduced "matched" flag.

3. [SPARK-26655][SS] Support multiple aggregates in append mode

ISSUE: https://issues.apache.org/jira/browse/SPARK-26655
PR: https://github.com/apache/spark/pull/23576

Multiple streaming aggregates has been concerned by end users - in
perspective of end users, it sounds like just an essential thing to
support, but Spark doesn't support this. There're many SO questions as well
as mail threads asking this feature, but we still didn't deal with it.

If we only think about append mode, technically the feature is bound to
proper definition of watermark. We haven't considered watermark calculation
(and/or propagation) for multiple stages of stateful operations, but as
there's widely used concept on multiple stages of watermark, we can
leverage it and focus how to apply it to Spark.
(For update mode, retraction is needed which would require huge efforts on
adopting, so let's ignore for now.)

Please keep in mind, lack of definition of watermark on multiple stateful
stages is not only the problem of multiple streaming aggregations, but also
multiple stateful operations (including streaming join,
flatMapGroupsWithState, deduplicate, etc) which is not technically
restricted by Spark. SPARK-28074 points out this problem.

This patch tries to address multiple aggregates - the patch itself may not
be valid, but there's a design doc we can move forward and update the
implementation.

4. [SPARK-27330][SS] support task abort in foreach writer

ISSUE: https://issues.apache.org/jira/browse/SPARK-27330
PR: https://github.com/apache/spark/pull/24382

Foreach writer could leak resource when task is aborted, as Spark does't
call writer.close() when task is aborted. If the task throws exception in
process in foreach writer or succeeds to commit, it would properly call
close(), but in other case calling close() is missing due to missing proper
handle about abort.

This patch fixes the bug.

5. [SPARK-28074][DOC][SS] Document caveats on using multiple stateful
operations in single query

ISSUE: https://issues.apache.org/jira/browse/SPARK-28074
PR: https://github.com/apache/spark/pull/24890

As I mentioned in SPARK-26655, Spark doesn't restrict using multiple
stateful operations in single query (except streaming aggregations), where
the concept of watermark is not covered properly on multiple stateful
stages.

I've explained this issue with example on dev mailing list earlier, so you
can refer the link to see rationalization of issue.
https://lists.apache.org/thread.html/cc6489a19316e7382661d305fabd8c21915e5faf6a928b4869ac2b4a@%3Cdev.spark.apache.org%3E

We've not decided how to let end users avoid the issue (dealing with
SPARK-26655 is the best but in the meanwhile...) and this patch is trying
to establish (or discuss) how to guide end users.

SPARK-24634 would be help to end users to determine whether their query is
affected by this issue, as in append mode intermediate output should not be
later than watermark.
----

Please chime in and share your curation if I'm missing something.

Thanks,
Jungtaek Lim (HeartSaVioR)

Re: My curation of pending structured streaming PRs to review

Posted by Jungtaek Lim <ka...@gmail.com>.
As a reminder, the list contains two correctness bugs: stream-stream outer
join, and multiple stateful operations with watermark.

Regarding common theme, yes that's somewhat I'd rather avoid to say, but
honestly I feel there's shortage on active committers on 'structured
streaming'.

Many of them I know as relevant to SS area didn't show up themselves in
Spark community for around half a year (maybe even more), and unfortunately
even active committers seem to have struggled with shortage of time doing
their own works (that's natural) and haven't found time to focus reviewing
other PRs (provide valuable comments but not leading PRs as shepherd to be
merged). I hoped that's temporary issue for some important events like
Spark+AI summit, and turned out it's not.

Spark has no replacement of SS, DStream is now even cared less than SS.
Does Spark community not feeling important from streaming area? I might not
agree, as there're reports from end users and patches proposed so far from
contributors. I wouldn't the right one to say how can solve the issue, but
I hope we would handle the main issue nicely and less painful way.


On Tue, Aug 13, 2019 at 10:42 PM Sean Owen <sr...@gmail.com> wrote:

> General tips:
>
> - dev@ is not usually the right place to discuss _specific_ changes
> except once in a while to call attention
> - Ping the authors of the code being changed directly
> - Tighten the change if possible
> - Tests, reproductions, docs, etc help prove the change
> - Bugs are more important than new marginal features
>
> If there has been some feedback that's just skeptical about the
> approach or value, that may be the answer, it won't be merged.
> If there is no feedback and it seems important (correctness bugs) it's
> OK to raise that here once in a while.
>
> One common theme here is 'structured streaming' -- who amongst the
> committers feels they are able to review these changes? I sense we
> have a shortage there.
>


-- 
Name : Jungtaek Lim
Blog : http://medium.com/@heartsavior
Twitter : http://twitter.com/heartsavior
LinkedIn : http://www.linkedin.com/in/heartsavior

Re: My curation of pending structured streaming PRs to review

Posted by Sean Owen <sr...@gmail.com>.
General tips:

- dev@ is not usually the right place to discuss _specific_ changes
except once in a while to call attention
- Ping the authors of the code being changed directly
- Tighten the change if possible
- Tests, reproductions, docs, etc help prove the change
- Bugs are more important than new marginal features

If there has been some feedback that's just skeptical about the
approach or value, that may be the answer, it won't be merged.
If there is no feedback and it seems important (correctness bugs) it's
OK to raise that here once in a while.

One common theme here is 'structured streaming' -- who amongst the
committers feels they are able to review these changes? I sense we
have a shortage there.

---------------------------------------------------------------------
To unsubscribe e-mail: dev-unsubscribe@spark.apache.org


Re: My curation of pending structured streaming PRs to review

Posted by vikram agrawal <vi...@gmail.com>.
Thanks, Jungtaek for curating this list. It covers a lot of important fixes
and performance improvements in structured streaming.

Hi Devs

What is missing from process perspective from getting these PRs merged?
Apart from this list, is there any other forum where we can request
attention to such important PRs. Is the lack of reviews limited to
Structured streaming or are there other areas of spark which are suffering
from similar neglect? Does the community feel that we need a better
turnaround for PRs to make sure that we don't miss out on important
contributions and encourage newbies like me?

Thanks
Vikram

On Tue, Jul 16, 2019 at 10:41 AM Jungtaek Lim <ka...@gmail.com> wrote:

> Hi devs,
>
> As we make progress on some minor PRs on structured streaming, I'd like to
> remind about major PRs on SS area to get more chances to be reviewed.
>
> Please note that I only include existing PRs, so something still not
> discussed like queryable state is not included in the curation list. Also,
> I've excluded PRs on continuous processing, as I'm not fully sure about
> current direction and vision on this feature. Minor PRs are mostly excluded
> unless they are proposed for a long ago. Last, I could be biased on
> curating list.
>
> Let's get started!
>
> ----
> A. File Source/Sink
>
> 1. [SPARK-20568][SS] Provide option to clean up completed files in
> streaming query
>
> ISSUE: https://issues.apache.org/jira/browse/SPARK-20568
> PR: https://github.com/apache/spark/pull/22952
>
> From the nature of "stream", the input data will grow infinitely and end
> users want to have a clear way to clean up completed files. Unlike batch
> query, structured streaming doesn't require all input files to be presented
> - once they've been committed (say, completed processing), they wouldn't be
> read from such query.
>
> This patch automatically cleans up input files when they're committed,
> with three options: 1) keep it as it is, 2) archive (move) to other
> directory 3) delete.
>
> 2. [SPARK-27188][SS] FileStreamSink: provide a new option to have
> retention on output files
>
> ISSUE: https://issues.apache.org/jira/browse/SPARK-27188
> PR: https://github.com/apache/spark/pull/24128
>
> File sink writes metadata which records list of output files to ensure
> file source to only read correct files, which helps to achieve end-to-end
> exactly once. But file sink has no idea when output files will not be
> accessed from downstream query, so metadata just grows infinitely and
> output files cannot be removed safely.
>
> This patch opens the chance for end users to provide TTL on output files
> so that metadata will eventually exclude expired output files as well as
> end users could remove the output files safely.
>
>
> B. Kafka Source/Sink
>
> 1. [SPARK-21869][SS] A cached Kafka producer should not be closed if any
> task is using it - adds inuse tracking.
>
> ISSUE: https://issues.apache.org/jira/browse/SPARK-21869
> PR: https://github.com/apache/spark/pull/19096
>
> This is a long-lasting bug (around 2 years after filing the JIRA issue):
> if some task uses cached Kafka producer longer than 10 minutes, pool will
> recognize it as "timed-out" and just close it. After closing undefined
> behavior from task side will occur.
>
> This patch adds "in-use" tracking on producer to address this. Please note
> that Kafka producer is thread-safe (whereas Kafka consumer is not) and we
> allow using it concurrently, so we can't adopt commons pool to pool
> producer. (Though we can still leverage commons pool if we are OK to not
> share between threads.)
>
> 2. [SPARK-23539][SS] Add support for Kafka headers in Structured Streaming
>
> ISSUE: https://issues.apache.org/jira/browse/SPARK-23539
> PR: https://github.com/apache/spark/pull/22282
>
> As there's great doc to rationalize the needs on supporting Kafka headers,
> I'll just let the doc explaining it.
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-82+-+Add+Record+Headers
>
> Please note that the issue has been commented from end users regarding
> availability, which also represents the needs on end users' side.
>
> 3. [SPARK-25151][SS] Apply Apache Commons Pool to KafkaDataConsumer
>
> ISSUE: https://issues.apache.org/jira/browse/SPARK-25151
> PR: https://github.com/apache/spark/pull/22138
>
> Kafka source has its pooling logic for consumers, but as I saw some JIRA
> issues regarding pooling we seem to agree we would like to replace with
> known pool implementation which provides advanced configuration, detailed
> metrics, etc.
>
> This patch adopts Apache Commons Pool (which above advantages are brought)
> to be used as a connection pool for consumers, with respecting to current
> behavior whenever possible. It also separates pooling for consumer and
> fetched data which enables to maximize efficiency on pooling consumers, and
> also address the bug on unnecessary re-fetch on self-join. (The result of
> experiment is in PR's content.)
>
> 4. [SPARK-26848][SQL] Introduce new option to Kafka source: offset by
> timestamp (starting/ending) SQL
>
> ISSUE: https://issues.apache.org/jira/browse/SPARK-26848
> PR: https://github.com/apache/spark/pull/23747
>
> When end users would want to replay their records in Kafka topic, they
> wouldn't memorize exact offsets per each partition but Spark requires to do
> that, otherwise just start from earliest. We as human being are much
> familiar with time, once we want to replay some records we know the
> timestamp of records we should start from.
>
> This patch opens the chance for end users to provide offset by timestamp
> (either starting or ending, or both) which will be transparently passed on
> Kafka when requesting.
>
>
> C. State
>
> 1. [SPARK-27237][SS] Introduce State schema validation among query restart
>
> ISSUE: https://issues.apache.org/jira/browse/SPARK-27237
> PR: https://github.com/apache/spark/pull/24173
>
> Spark doesn't have explicit mechanism to avoid end users to change their
> query as "non-compatible". We documented the rules where the query will not
> be compatible between changes, but it's not easier to self-determine the
> rules, and non-friendly error message will be thrown if end users violate
> the rule. In fact, undefined behavior will occur.
>
> This patch introduces state schema validation, which verifies schema
> compatibility regarding states between changes of query, and provides
> informative error message on end users so that they indicate previous
> schema and current schema of state.
>
> This is also a baseline of new data source - state, as we can leverage
> state schema information and not requiring end users to input the schema.
>
> 2. [SPARK-28191][SS] New data source - state - reader part
>
> ISSUE: https://issues.apache.org/jira/browse/SPARK-28191
> PR: https://github.com/apache/spark/pull/24990
>
> Please read below JIRA issue to see rationalization of state data source,
> as the issue description contains the cases where state data source can be
> used. (e.g. schema evolution on state, offline rescale on state, etc.)
> https://issues.apache.org/jira/browse/SPARK-28190
>
> This patch deals with source part - enables reading states on structured
> streaming query to the batch query.
>
> 3. [SPARK-28120][SS] Rocksdb state storage implementation
>
> ISSUE: https://issues.apache.org/jira/browse/SPARK-28120
> PR: https://github.com/apache/spark/pull/24922
>
> The memory has been huge limitation of state size. As structured streaming
> loads two versions of state in executor by default, memory pressure becomes
> the real problem on dealing with large state. Scaling up executors may
> work, but it requires unnecessary waste of resource, and it can't help when
> executor is beyond number of partitions. (State data source will eventually
> help on repartitioning but it requires offline batch query.)
>
> State store which resides outside of memory is mandatory to structured
> streaming for dealing with large state, and this patch is trying to address
> it by introducing RocksDB state store provider.
>
>
> D. Structured Streaming
>
> 1. [SPARK-24634][SS] Add a new metric regarding number of rows later than
> watermark
>
> ISSUE: https://issues.apache.org/jira/browse/SPARK-24634
> PR: https://github.com/apache/spark/pull/24936
>
> Spark does't provide any information on late rows which could be dropped
> on stateful processor.
>
> This patch adds metrics on counting late rows so that end users can be
> noticed about it. Please note that the issue was originally meant to
> provide the number of dropped rows due the late, but Spark does
> pre-aggregation on streaming aggregation, so it doesn't provide correct
> number. Current approach is less informative than origin intention but
> still bring the value, for example, determining whether the query is
> affected by SPARK-28074.
>
> 2. [SPARK-26154][SS] Streaming left/right outer join should not return
> outer nulls for already matched rows
>
> ISSUE: https://issues.apache.org/jira/browse/SPARK-26154
> PR: https://github.com/apache/spark/pull/23634
>
> This is long-standing correctness issue, and multiple end users (including
> me) reported about the behavior. This is occurred on edge-case, but the
> edge-case is not hard to reproduce, even closer to example query we provide
> as streaming outer join.
>
> This patch addresses the correctness issue via changing the state on join
> - introduced "matched" flag.
>
> 3. [SPARK-26655][SS] Support multiple aggregates in append mode
>
> ISSUE: https://issues.apache.org/jira/browse/SPARK-26655
> PR: https://github.com/apache/spark/pull/23576
>
> Multiple streaming aggregates has been concerned by end users - in
> perspective of end users, it sounds like just an essential thing to
> support, but Spark doesn't support this. There're many SO questions as well
> as mail threads asking this feature, but we still didn't deal with it.
>
> If we only think about append mode, technically the feature is bound to
> proper definition of watermark. We haven't considered watermark calculation
> (and/or propagation) for multiple stages of stateful operations, but as
> there's widely used concept on multiple stages of watermark, we can
> leverage it and focus how to apply it to Spark.
> (For update mode, retraction is needed which would require huge efforts on
> adopting, so let's ignore for now.)
>
> Please keep in mind, lack of definition of watermark on multiple stateful
> stages is not only the problem of multiple streaming aggregations, but also
> multiple stateful operations (including streaming join,
> flatMapGroupsWithState, deduplicate, etc) which is not technically
> restricted by Spark. SPARK-28074 points out this problem.
>
> This patch tries to address multiple aggregates - the patch itself may not
> be valid, but there's a design doc we can move forward and update the
> implementation.
>
> 4. [SPARK-27330][SS] support task abort in foreach writer
>
> ISSUE: https://issues.apache.org/jira/browse/SPARK-27330
> PR: https://github.com/apache/spark/pull/24382
>
> Foreach writer could leak resource when task is aborted, as Spark does't
> call writer.close() when task is aborted. If the task throws exception in
> process in foreach writer or succeeds to commit, it would properly call
> close(), but in other case calling close() is missing due to missing proper
> handle about abort.
>
> This patch fixes the bug.
>
> 5. [SPARK-28074][DOC][SS] Document caveats on using multiple stateful
> operations in single query
>
> ISSUE: https://issues.apache.org/jira/browse/SPARK-28074
> PR: https://github.com/apache/spark/pull/24890
>
> As I mentioned in SPARK-26655, Spark doesn't restrict using multiple
> stateful operations in single query (except streaming aggregations), where
> the concept of watermark is not covered properly on multiple stateful
> stages.
>
> I've explained this issue with example on dev mailing list earlier, so you
> can refer the link to see rationalization of issue.
>
> https://lists.apache.org/thread.html/cc6489a19316e7382661d305fabd8c21915e5faf6a928b4869ac2b4a@%3Cdev.spark.apache.org%3E
>
> We've not decided how to let end users avoid the issue (dealing with
> SPARK-26655 is the best but in the meanwhile...) and this patch is trying
> to establish (or discuss) how to guide end users.
>
> SPARK-24634 would be help to end users to determine whether their query is
> affected by this issue, as in append mode intermediate output should not be
> later than watermark.
> ----
>
> Please chime in and share your curation if I'm missing something.
>
> Thanks,
> Jungtaek Lim (HeartSaVioR)
>