Posted to dev@flink.apache.org by Jingsong Li <ji...@gmail.com> on 2020/08/04 08:21:10 UTC

Re: [DISCUSS] Introduce SupportsParallelismReport and SupportsStatisticsReport for Hive and Filesystem

Hi Kurt, thanks for your response. I understand your points, and they are
reasonable, but let me explain further:

## Parallelism setting for source and sink

Besides `SupportsParallelismReport`, the options are:

1. Support multiple transformations
If we support multiple transformations, then yes, DataStream is back again.
Personally I am slightly -1 on this: it makes many features non-orthogonal
and makes future optimizer development more difficult. As @Jark Wu
<im...@gmail.com> said, it will be difficult to support state
compatibility, parallelism configuration, and message ordering guarantees
in the future.
If we really want to support multiple transformations, then there is indeed
no need for `SupportsParallelismReport`; we can just use multiple
transformations, and perhaps the role of the other interfaces (Source,
SourceFunction, InputFormat) will become weaker and weaker.

2. `getParallelism` in ScanSource and DynamicSink
Why did we introduce the "SupportsXX" interfaces at all? All of them could
have been placed directly on ScanSource. The "SupportsXX" interfaces keep
ScanSource clean: they are optional to implement, which is friendlier to
users. So I am -1 on `getParallelism` in ScanSource and DynamicSink. I don't
think the parallelism setting is important enough for every user to see.
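To illustrate the optional mix-in style described above, here is a minimal sketch. `ScanSource`, `PlainSource`, `FileSource` and `resolveParallelism` are hypothetical, simplified stand-ins rather than Flink's actual classes: a source that does not care about parallelism never sees the method, and the planner only asks sources that opted in.

```java
/**
 * Sketch of the optional "SupportsXX" mix-in style. ScanSource and
 * SupportsParallelismReport here are simplified, hypothetical stand-ins,
 * not Flink's actual classes.
 */
public final class MixinParallelismSketch {

    /** Hypothetical minimal scan source. */
    interface ScanSource {
        String name();
    }

    /** Optional ability, modeled on the proposed SupportsParallelismReport. */
    interface SupportsParallelismReport {
        /** At least 1, or -1 to use the system default. */
        int reportParallelism();
    }

    /** A source that does not care about parallelism sees no extra method. */
    static final class PlainSource implements ScanSource {
        @Override
        public String name() {
            return "plain";
        }
    }

    /** A file-based source opts in by also implementing the mix-in. */
    static final class FileSource implements ScanSource, SupportsParallelismReport {
        private final int parallelism;

        FileSource(int parallelism) {
            this.parallelism = parallelism;
        }

        @Override
        public String name() {
            return "file";
        }

        @Override
        public int reportParallelism() {
            return parallelism;
        }
    }

    /** Planner side: only sources that opted in are asked for a parallelism. */
    static int resolveParallelism(ScanSource source, int defaultParallelism) {
        if (!(source instanceof SupportsParallelismReport)) {
            return defaultParallelism;
        }
        int reported = ((SupportsParallelismReport) source).reportParallelism();
        if (reported == -1) {
            return defaultParallelism;
        }
        if (reported < 1) {
            throw new IllegalArgumentException(
                    "Parallelism must be at least 1 or -1, but was " + reported);
        }
        return reported;
    }

    public static void main(String[] args) {
        System.out.println(resolveParallelism(new PlainSource(), 8));   // 8
        System.out.println(resolveParallelism(new FileSource(100), 8)); // 100
        System.out.println(resolveParallelism(new FileSource(-1), 8));  // 8
    }
}
```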

Can a `LookupTableSource` have parallelism?
I think yes. Lookup is an action, and this action must occur in tasks, so
the parallelism of those tasks can be configured. Sometimes the parallelism
affects lookup performance and is worth configuring.

## SupportsStatisticsReport and catalog statistics

One of the scenarios I envisioned is:
HiveSource can get the file size cheaply, but it cannot get other
statistics cheaply.
In reality, there are many cases where the statistics from the catalog are
inaccurate. What I want to do is to validate the catalog statistics against
the actual file size:
- If the difference is too big, discard the catalog statistics.
- If the difference is small, continue to use the catalog statistics.

In this way, the statistics actually collected by the source can complement
the catalog statistics and make the final statistics more complete and
accurate.
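A minimal sketch of the check described above, assuming the catalog records a total size in bytes and that "too big" means the two sizes differ by more than a configurable factor. `CatalogStats`, `UNKNOWN`, `maxRatio` and both methods are hypothetical names for illustration only, not part of any proposed API.

```java
/**
 * Sketch of validating catalog statistics against the actual file size.
 * CatalogStats, the UNKNOWN marker and maxRatio are hypothetical.
 */
public final class CatalogStatsCheckSketch {

    static final long UNKNOWN = -1L;

    /** Hypothetical minimal view of catalog statistics. */
    static final class CatalogStats {
        final long rowCount;
        final long totalSizeInBytes;

        CatalogStats(long rowCount, long totalSizeInBytes) {
            this.rowCount = rowCount;
            this.totalSizeInBytes = totalSizeInBytes;
        }
    }

    /**
     * Returns true if the catalog's recorded size and the actual file size
     * differ by at most a factor of maxRatio (in either direction).
     */
    static boolean isConsistent(CatalogStats catalog, long actualSizeInBytes, double maxRatio) {
        long recorded = catalog.totalSizeInBytes;
        if (recorded < 0 || actualSizeInBytes < 0) {
            return false; // unknown on either side: cannot validate
        }
        if (recorded == 0 || actualSizeInBytes == 0) {
            return recorded == actualSizeInBytes; // only "both empty" is consistent
        }
        double ratio = (double) Math.max(recorded, actualSizeInBytes)
                / Math.min(recorded, actualSizeInBytes);
        return ratio <= maxRatio;
    }

    /** Keeps the catalog row count if it passes the check, otherwise discards it. */
    static long chooseRowCount(CatalogStats catalog, long actualSizeInBytes, double maxRatio) {
        return isConsistent(catalog, actualSizeInBytes, maxRatio) ? catalog.rowCount : UNKNOWN;
    }

    public static void main(String[] args) {
        CatalogStats catalog = new CatalogStats(1_000L, 1_000_000L);
        System.out.println(chooseRowCount(catalog, 1_200_000L, 2.0));  // 1000: small difference
        System.out.println(chooseRowCount(catalog, 10_000_000L, 2.0)); // -1: catalog looks stale
    }
}
```

Discarding here just means falling back to "unknown", so the planner can use whatever the source reports instead.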

If you don't think this is a reasonable requirement, I am OK with removing
the catalog statistics argument.

Best,
Jingsong

On Fri, Jul 31, 2020 at 3:32 PM Kurt Young <yk...@gmail.com> wrote:

> 1. Even if there are some "Supports" interfaces that are not orthogonal
> with ScanTableSource and LookupTableSource, it doesn't mean we should
> encourage such usage. Such concept conflicts will accumulate into larger
> issues that will hurt us in the future.
>
> 2. Regarding SupportsStatisticsReport, I think the interface is a bit
> fuzzy. From the interface name, I was expecting that this source will try
> to gather and report its own statistics. But it also receives some catalog
> statistics; what are these for? Why does the table source need to *report*
> statistics when statistics from the catalog already exist? Would these
> catalog statistics always exist?
>
> 3.
> > Regarding the case where there are multiple Transformations in the source
> > op and they require different parallelism: in this case, it should be
> > left to the source to set the parallelism.
> This sounds like a contradiction to the interface you want to introduce.
> I'm even more confused: do you want the framework to take care of the
> parallelism setting for the source operator, or do you want to let the
> source operator set the parallelism?
>
> Best,
> Kurt
>
>
> On Fri, Jul 31, 2020 at 1:43 PM Jingsong Li <ji...@gmail.com> wrote:
>
> > Hi, thanks for your responses.
> >
> > To Benchao:
> >
> > Glad to see your work and requirements; these interfaces should be Public.
> >
> > To Kurt:
> >
> > 1. Regarding "SupportsXXX" for ScanTableSource, LookupTableSource, or
> > DynamicTableSink: I don't think a "SupportsXXX" interface must work with
> > all three types. As Godfrey said, a LookupTableSource, for example, should
> > not extend SupportsWatermarkPushDown or SupportsComputedColumnPushDown. We
> > just try our best to make all combinations work; for example,
> > "SupportsParallelismReport" can work with both ScanTableSource and
> > DynamicTableSink.
> >
> > As for adding the "reportParallelism" method directly to ScanTableSource
> > and DynamicTableSink: I think most sources/sinks do not want to see this
> > method. Providing a "SupportsXXX" interface gives connector developers
> > the option to opt in.
> >
> > 2. Regarding SupportsStatisticsReport not working for unbounded streaming
> > table sources: yes, that's true. Statistics (including catalog statistics)
> > are not relevant to streaming tables, but I think that in the future we
> > can provide more useful statistics for streaming tables.
> >
> > 3. "oldStats" in SupportsStatisticsReport should be renamed to
> > "catalogStats". The source just tries its best to get more useful and
> > accurate statistics, but, as Godfrey said, it is a supplement to catalog
> > statistics: it only fills in missing or inaccurate information from the
> > catalog.
> >
> > 4. Internal or Public: I am glad to see your requirements, and I am OK
> > with Public.
> >
> > To Godfrey:
> >
> > Regarding the case where there are multiple Transformations in the source
> > op and they require different parallelism: in this case, it should be
> > left to the source to set the parallelism. So these two things are
> > orthogonal. Users who do not use multiple Transformations still need to
> > set parallelism.
> >
> > Best,
> > Jingsong
> >
> > On Thu, Jul 30, 2020 at 8:31 PM godfrey he <go...@gmail.com> wrote:
> >
> > > Thanks Jingsong for bringing up this discussion, and thanks Kurt for
> > > the detailed thoughts.
> > >
> > > First of all, I also think it's a very useful feature to expose more
> > > abilities to table sources.
> > >
> > > 1) If we want to support [1], it seems that SupportsParallelismReport
> > > does not meet the requirement when there are multiple Transformations
> > > in the source op and they require different parallelism.
> > >
> > > 2) Regarding "SupportsXXX" for ScanTableSource or LookupTableSource:
> > > currently, we also do not distinguish between them for the existing
> > > "SupportsXXX" interfaces. For example, a LookupTableSource should not
> > > extend SupportsWatermarkPushDown or SupportsComputedColumnPushDown.
> > > A DynamicTableSource subclass extends a "SupportsXXX" interface only if
> > > it has that capability. So an unbounded table source should not extend
> > > SupportsStatisticsReport, or, if a table source can work in both bounded
> > > and unbounded modes, it should just return unknown statistics in the
> > > unbounded case.
> > >
> > > I think SupportsStatisticsReport is a supplement to catalog statistics,
> > > which means SupportsStatisticsReport only takes effect when the catalog
> > > statistics are unknown.
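A minimal sketch of this "supplement" semantics, assuming statistics consist of a row count and a size, with -1 meaning unknown. `Stats`, `UNKNOWN` and `supplement` are hypothetical stand-ins for Flink's TableStats. The source is only asked when some catalog field is unknown, and only the unknown fields are filled in.

```java
import java.util.function.Supplier;

/**
 * Sketch of "reported statistics supplement catalog statistics". Stats and
 * UNKNOWN are hypothetical stand-ins, not Flink's TableStats.
 */
public final class StatsSupplementSketch {

    static final long UNKNOWN = -1L;

    static final class Stats {
        final long rowCount;
        final long sizeInBytes;

        Stats(long rowCount, long sizeInBytes) {
            this.rowCount = rowCount;
            this.sizeInBytes = sizeInBytes;
        }
    }

    /** The Supplier defers the (possibly expensive) source report until needed. */
    static Stats supplement(Stats catalog, Supplier<Stats> sourceReport) {
        if (catalog.rowCount != UNKNOWN && catalog.sizeInBytes != UNKNOWN) {
            return catalog; // nothing missing: skip the report entirely
        }
        Stats reported = sourceReport.get();
        return new Stats(
                catalog.rowCount != UNKNOWN ? catalog.rowCount : reported.rowCount,
                catalog.sizeInBytes != UNKNOWN ? catalog.sizeInBytes : reported.sizeInBytes);
    }

    public static void main(String[] args) {
        Stats merged = supplement(new Stats(UNKNOWN, 4096L), () -> new Stats(100L, 5000L));
        System.out.println(merged.rowCount + " " + merged.sizeInBytes); // 100 4096
    }
}
```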
> > >
> > > 3) +1 to making them public.
> > >
> > > [1] https://issues.apache.org/jira/browse/FLINK-18674
> > >
> > > Best,
> > > Godfrey
> > >
> > >
> > >
> > > On Thu, Jul 30, 2020 at 4:01 PM Kurt Young <yk...@gmail.com> wrote:
> > >
> > > > Hi Jingsong,
> > > >
> > > > Thanks for bringing up this discussion. In general, I'm +1 to enriching
> > > > the source's abilities with parallelism and stats reporting, but I'm not
> > > > sure whether introducing such "SupportsXXXX" interfaces is a good idea.
> > > > I will share my thoughts on each point separately.
> > > >
> > > > 1) Regarding the interface SupportsParallelismReport: first of all, my
> > > > feeling is that this mechanism is not like other abilities such as
> > > > SupportsProjectionPushDown. The parallelism of the source operator is
> > > > decided anyway; the only difference is whether it is decided purely by
> > > > the framework or by the table source itself. So another way to look at
> > > > this issue is that we can always assume a table source has the ability
> > > > to determine the parallelism. The table source can choose to set the
> > > > parallelism by itself, or delegate it to the framework.
> > > >
> > > > This might sound like personal taste, but there is another bad case if
> > > > we introduce the interface. As you may already know, we currently have
> > > > two major table sources, LookupTableSource and ScanTableSource. IIUC it
> > > > won't make much sense if the user provides a LookupTableSource that
> > > > also implements SupportsParallelismReport.
> > > >
> > > > An alternative solution would be to add the method you want directly
> > > > to ScanTableSource, with a default implementation returning -1, which
> > > > means letting the framework decide the parallelism.
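A minimal sketch of this default-method alternative, using a hypothetical `ScanSource` interface in place of Flink's ScanTableSource. Every source inherits `getParallelism()` returning -1, and only sources with their own inference override it, so no `instanceof` check is needed; the trade-off is that every implementation now carries the method.

```java
/**
 * Sketch of the alternative: a default method on the source interface itself.
 * ScanSource here is a hypothetical stand-in, not Flink's ScanTableSource.
 */
public final class DefaultMethodParallelismSketch {

    interface ScanSource {
        /** -1 means "let the framework decide"; otherwise at least 1. */
        default int getParallelism() {
            return -1;
        }
    }

    /** Most sources inherit the default and never think about parallelism. */
    static final class PlainSource implements ScanSource {
    }

    /** A Hive-like source overrides the default with its own inference. */
    static final class InferringSource implements ScanSource {
        private final int inferred;

        InferringSource(int inferred) {
            this.inferred = inferred;
        }

        @Override
        public int getParallelism() {
            return inferred;
        }
    }

    static int resolveParallelism(ScanSource source, int frameworkDefault) {
        int p = source.getParallelism();
        if (p == -1) {
            return frameworkDefault;
        }
        if (p < 1) {
            throw new IllegalArgumentException("Parallelism must be at least 1 or -1, but was " + p);
        }
        return p;
    }

    public static void main(String[] args) {
        System.out.println(resolveParallelism(new PlainSource(), 8));        // 8
        System.out.println(resolveParallelism(new InferringSource(300), 8)); // 300
    }
}
```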
> > > >
> > > > 2) Regarding the interface SupportsStatisticsReport, it seems this
> > > > interface doesn't work for unbounded streaming table sources. What kind
> > > > of implementation do you expect in such a case? And how does this
> > > > interface work with LookupTableSource?
> > > > Another question: what is the oldStats parameter used for?
> > > >
> > > > 3) Internal or Public: I don't think we should mark them as internal.
> > > > The fact that they are currently only used by internal connectors
> > > > doesn't mean these interfaces should be internal. I can imagine there
> > > > will be lots of Filesystem-like connectors outside the project that
> > > > need such a capability.
> > > >
> > > > Best,
> > > > Kurt
> > > >
> > > >
> > > > On Thu, Jul 30, 2020 at 1:02 PM Benchao Li <li...@apache.org> wrote:
> > > >
> > > > > Hi Jingsong,
> > > > >
> > > > > Regarding SupportsParallelismReport,
> > > > > I think the streaming connectors can also benefit from it.
> > > > > I have seen requests on the user mailing list to control the
> > > > > source/sink parallelism instead of setting it to the global
> > > > > parallelism. We have also done this in our company.
> > > > >
> > > > On Thu, Jul 30, 2020 at 11:16 AM Jingsong Li <ji...@gmail.com> wrote:
> > > > >
> > > > > > Hi all,
> > > > > >
> > > > > > ## SupportsParallelismReport
> > > > > >
> > > > > > Now that FLIP-95 [1] is ready, only Hive and Filesystem are still
> > > > > > using the old interfaces.
> > > > > >
> > > > > > We are considering migrating to the new interface.
> > > > > >
> > > > > > However, one problem is that in the old interface implementation,
> > > > > > connectors infer parallelism by themselves instead of using a global
> > > > > > parallelism configuration. Hive and Filesystem determine the
> > > > > > parallelism according to the number and size of the files. In this
> > > > > > way, large tables may use a parallelism in the thousands, while small
> > > > > > tables may use a parallelism of only 10, which minimizes the cost of
> > > > > > task scheduling.
> > > > > >
> > > > > > This situation is very common in batch computing. For example, in a
> > > > > > star schema, a large table needs to be joined with multiple small
> > > > > > tables.
> > > > > >
> > > > > > So we should give this ability to the new table source interfaces.
> > > > > > The interface can be:
> > > > > >
> > > > > > import org.apache.flink.annotation.Internal;
> > > > > >
> > > > > > /**
> > > > > >  * Enables a source to report its parallelism.
> > > > > >  *
> > > > > >  * <p>After filter push-down and partition push-down, the source has more
> > > > > >  * information, which can help it infer a more effective parallelism.
> > > > > >  */
> > > > > > @Internal
> > > > > > public interface SupportsParallelismReport {
> > > > > >
> > > > > >     /**
> > > > > >      * Reports the parallelism of the source or sink. The parallelism of an
> > > > > >      * operator must be at least 1, or -1 (use the system default).
> > > > > >      */
> > > > > >     int reportParallelism();
> > > > > > }
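A minimal sketch of how a Hive/Filesystem-style source might implement such an interface, assuming it sums ceil(fileSize / splitSize) over the files that remain after partition pruning and caps the result. The split size, the cap, and the `FileSource` class are hypothetical; the real connectors' inference rules may differ.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/**
 * Sketch of a file-based source inferring parallelism from the number and size
 * of its files. The split size and the max-parallelism cap are hypothetical.
 */
public final class FileParallelismSketch {

    /** Local copy of the proposed interface so the sketch compiles on its own. */
    interface SupportsParallelismReport {
        int reportParallelism();
    }

    static final class FileSource implements SupportsParallelismReport {
        private final List<Long> fileSizesInBytes; // files left after partition pruning
        private final long splitSizeInBytes;
        private final int maxParallelism;

        FileSource(List<Long> fileSizesInBytes, long splitSizeInBytes, int maxParallelism) {
            this.fileSizesInBytes = fileSizesInBytes;
            this.splitSizeInBytes = splitSizeInBytes;
            this.maxParallelism = maxParallelism;
        }

        @Override
        public int reportParallelism() {
            long splits = 0;
            for (long size : fileSizesInBytes) {
                // ceil(size / splitSize); an empty file contributes no split
                splits += (size + splitSizeInBytes - 1) / splitSizeInBytes;
            }
            // The contract requires at least 1; very large tables are capped.
            return (int) Math.max(1, Math.min(splits, maxParallelism));
        }
    }

    public static void main(String[] args) {
        long mib = 1024L * 1024L;
        FileSource small = new FileSource(Arrays.asList(100 * mib, 300 * mib), 128 * mib, 1000);
        FileSource big = new FileSource(Collections.nCopies(10, 1024 * mib), 128 * mib, 50);
        System.out.println(small.reportParallelism()); // 4 (1 + 3 splits)
        System.out.println(big.reportParallelism());   // 50 (80 splits, capped)
    }
}
```

Because the file list is taken after pruning, the same table reports a lower parallelism when a query touches fewer partitions.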
> > > > > >
> > > > > >
> > > > > > Rejected Alternatives:
> > > > > > - SupportsSplitReport: What is the relationship between this split and
> > > > > >   the split of FLIP-27? Do we have to match them one to one? I think
> > > > > >   they are two independent things. In fact, in the design of FLIP-27,
> > > > > >   splits and parallelism are not bound one to one.
> > > > > > - SupportsPartitionReport: What is a partition? In table/SQL, a
> > > > > >   partition is a special concept of a table. It should not be mixed
> > > > > >   with parallelism.
> > > > > >
> > > > > > ## SupportsStatisticsReport
> > > > > >
> > > > > > As with parallelism, statistics reported by the source will be more
> > > > > > appropriate and accurate. After filter push-down and partition
> > > > > > push-down, the source has more information, which can help it infer
> > > > > > more effective statistics. If we only infer statistics in the planner
> > > > > > itself, there may be a big gap between the statistics and the real
> > > > > > situation.
> > > > > >
> > > > > > The interface:
> > > > > >
> > > > > > import org.apache.flink.annotation.Internal;
> > > > > > import org.apache.flink.table.connector.source.ScanTableSource;
> > > > > > import org.apache.flink.table.plan.stats.TableStats;
> > > > > >
> > > > > > /**
> > > > > >  * Enables a {@link ScanTableSource} to report table statistics.
> > > > > >  *
> > > > > >  * <p>Statistics can be inferred from the real data in real time, which is
> > > > > >  * more accurate than the statistics in the catalog.
> > > > > >  *
> > > > > >  * <p>After filter push-down and partition push-down, the source has more
> > > > > >  * information, which can help it infer more effective table statistics.
> > > > > >  */
> > > > > > @Internal
> > > > > > public interface SupportsStatisticsReport {
> > > > > >
> > > > > >     /**
> > > > > >      * Reports {@link TableStats}, based on the old table stats.
> > > > > >      */
> > > > > >     TableStats reportTableStatistics(TableStats oldStats);
> > > > > > }
> > > > > >
> > > > > >
> > > > > > When should the reported statistics be passed to the planner?
> > > > > > - First of all, this call can be expensive (it needs to read the
> > > > > >   metadata of the files), so it should not be called repeatedly.
> > > > > > - It needs to be called after FilterPushdown, because that is when the
> > > > > >   information is most accurate. It also needs to be called before CBO
> > > > > >   (like JoinReorder and choosing BroadcastJoin or ShuffleJoin), because
> > > > > >   that is where the statistics are used.
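A minimal sketch of the ordering above, under stated assumptions: `PartitionedSource`, `applyPartitionPruning`, `reportRowCount`, `plan` and the broadcast threshold are all hypothetical, not Flink planner APIs. Push-down runs first, the expensive report is wrapped so it executes at most once, and cost-based decisions then read the cached value as often as they need.

```java
import java.util.function.Supplier;

/**
 * Sketch: push-down first, statistics reported at most once, then read by
 * cost-based decisions. All names and the threshold are hypothetical.
 */
public final class StatsInvocationSketch {

    static final class PartitionedSource {
        private final long[] rowsPerPartition;
        private int keptPartitions;
        int reportCalls = 0;

        PartitionedSource(long[] rowsPerPartition) {
            this.rowsPerPartition = rowsPerPartition;
            this.keptPartitions = rowsPerPartition.length;
        }

        /** Stand-in for partition push-down: keep only the first n partitions. */
        void applyPartitionPruning(int n) {
            keptPartitions = Math.min(n, keptPartitions);
        }

        /** Stand-in for an expensive statistics report (reads file metadata). */
        long reportRowCount() {
            reportCalls++;
            long total = 0;
            for (int i = 0; i < keptPartitions; i++) {
                total += rowsPerPartition[i];
            }
            return total;
        }
    }

    /** Wraps a supplier so the underlying call runs at most once. */
    static <T> Supplier<T> once(Supplier<T> delegate) {
        return new Supplier<T>() {
            private T value;
            private boolean done;

            @Override
            public T get() {
                if (!done) {
                    value = delegate.get();
                    done = true;
                }
                return value;
            }
        };
    }

    static String plan(PartitionedSource source, int keptPartitions, long broadcastThresholdRows) {
        // 1) Push-down first, so the report sees only the pruned partitions.
        source.applyPartitionPruning(keptPartitions);
        // 2) Report lazily and at most once.
        Supplier<Long> rowCount = once(source::reportRowCount);
        // 3) Cost-based decisions may read the statistics several times.
        String join = rowCount.get() <= broadcastThresholdRows ? "BroadcastJoin" : "ShuffleJoin";
        long rowsForJoinReorder = rowCount.get();
        return join + " (rows=" + rowsForJoinReorder + ")";
    }

    public static void main(String[] args) {
        PartitionedSource source = new PartitionedSource(new long[] {1000, 2000, 3000});
        System.out.println(plan(source, 1, 1500)); // BroadcastJoin (rows=1000)
        System.out.println(source.reportCalls);    // 1
    }
}
```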
> > > > > >
> > > > > > Rejected Alternatives:
> > > > > > - Using CatalogTableStatistics: CatalogTableStatistics or TableStats? I
> > > > > >   lean towards TableStats, because TableStats is the class used by the
> > > > > >   planner, while CatalogTableStatistics may contain some catalog
> > > > > >   information that is not related to the planner optimizer.
> > > > > >
> > > > > > ## Internal or Public
> > > > > >
> > > > > > I personally lean towards Internal, since these interfaces are only
> > > > > > used by Hive and Filesystem. Another option is to make
> > > > > > SupportsParallelismReport Internal (I haven't seen this requirement
> > > > > > from outside) and SupportsStatisticsReport Public (maybe the Apache
> > > > > > Iceberg Flink connector can use it).
> > > > > >
> > > > > > What do you think?
> > > > > >
> > > > > > [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-95%3A+New+TableSource+and+TableSink+interfaces
> > > > > >
> > > > > > Best,
> > > > > > Jingsong Lee
> > > > > >
> > > > >
> > > > >
> > > > > --
> > > > >
> > > > > Best,
> > > > > Benchao Li
> > > > >
> > > >
> > >
> >
> >
> > --
> > Best, Jingsong Lee
> >
>


-- 
Best, Jingsong Lee