Posted to dev@flink.apache.org by Martijn Visser <ma...@apache.org> on 2022/07/01 08:20:27 UTC

Re: [DISCUSS] Support partition pruning for streaming reading

Hi zoucao,

I think this topic deserves a proper FLIP and a vote. This approach is
focussed only on Hive, but I would also like to understand the implications
for FileSource. Can you create one?

Best regards,

Martijn

On Wed, 22 Jun 2022 at 18:50, cao zou <zo...@gmail.com> wrote:

> Hi devs, I want to start a discussion to find a way to support partition
> pruning for streaming reading.
>
>
> Flink already supports partition pruning. The implementation consists
> of *Source Ability*, *Logical Rule*, and the interface
> *SupportsPartitionPushDown*, but all of them only take effect in batch
> reading. When reading a table in streaming mode, the existing mechanism
> causes the problems reported in FLINK-27898
> <https://issues.apache.org/jira/browse/FLINK-27898>[1]: records
> that should be filtered out are sent downstream.
>
> This discussion aims to close that gap; Hive and other big-data systems
> that store data in partitions would benefit most from it.
>
> Currently, the existing partitions to consume are computed in
> *PushPartitionIntoTableSourceScanRule* and then pushed into the
> TableSource. This works well in batch mode, but it is not enough when
> reading records from Hive in streaming mode, where partitions committed
> in the future must also be considered.
>
> To support pruning partitions committed in the future, the pruning
> function should be pushed into the TableSource and then delivered to
> *ContinuousPartitionFetcher*, so that partitions that are not yet
> committed at planning time can be pruned there.
>
> Before proposing the changes, I think it is necessary to clarify the
> existing pruning logic. The main logic of the pruning in
> *PushPartitionIntoTableSourceScanRule* is as follows.
>
> First, a pruning function called partitionPruner is generated; it
> extends RichMapFunction<GenericRowData, Boolean>.
>
>
> if tableSource.listPartitions() is not empty:
>   partitions = tableSource.listPartitions()
>   for p in partitions:
>     predicate = partitionPruner.map(convertPartitionToRow(p))
>     if predicate is true: add p to partitionsAfterPruning
>
> else:  // tableSource.listPartitions() is empty
>   if the filter can be converted to ResolvedExpression
>       and the catalog supports the filter:
>     // all partitions returned by the catalog are needed
>     partitionsAfterPruning = catalog.listPartitionsByFilter(filter)
>   else:
>     partitions = catalog.listPartitions()
>     for p in partitions:
>       predicate = partitionPruner.map(convertPartitionToRow(p))
>       if predicate is true: add p to partitionsAfterPruning
>
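The three branches of the pseudocode above can be sketched in plain Java as follows. This is an illustration under stated assumptions, not Flink's rule code: `PartitionCatalog`, `ExistingPruningSketch`, and `prune` are hypothetical names, partitions are modelled as `Map<String, String>`, the source's partition listing is modelled as an `Optional`, a `Predicate` stands in for the generated `partitionPruner`, and "the catalog supports the filter" is simplified to `listPartitionsByFilter` returning a non-empty `Optional`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Predicate;

/** Hypothetical catalog abstraction for this sketch; not Flink's Catalog interface. */
interface PartitionCatalog {
    List<Map<String, String>> listPartitions();

    /** Returns Optional.empty() when the filter cannot be evaluated by the catalog. */
    Optional<List<Map<String, String>>> listPartitionsByFilter(String filter);
}

final class ExistingPruningSketch {
    static List<Map<String, String>> prune(
            Optional<List<Map<String, String>>> sourcePartitions,
            PartitionCatalog catalog,
            String filter,
            Predicate<Map<String, String>> partitionPruner) {
        // Branch 1: the source lists its own partitions; prune them locally.
        if (sourcePartitions.isPresent()) {
            return applyPruner(sourcePartitions.get(), partitionPruner);
        }
        // Branch 2: the catalog evaluates the filter; every returned partition is needed.
        Optional<List<Map<String, String>>> byFilter = catalog.listPartitionsByFilter(filter);
        if (byFilter.isPresent()) {
            return byFilter.get();
        }
        // Branch 3: list all catalog partitions and prune locally.
        return applyPruner(catalog.listPartitions(), partitionPruner);
    }

    private static List<Map<String, String>> applyPruner(
            List<Map<String, String>> partitions, Predicate<Map<String, String>> pruner) {
        List<Map<String, String>> result = new ArrayList<>();
        for (Map<String, String> p : partitions) {
            if (pruner.test(p)) {
                result.add(p);
            }
        }
        return result;
    }
}
```

Note that every branch produces a fixed list at planning time, which is why it cannot account for partitions committed later.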
> The main logic therefore splits into two parts: one in the logical rule
> and one on the connector side. Catalog information should be used only on
> the rule side, not on the connector side, while the pruning function could
> be used on both sides or unified on the connector side.
>
>
> Proposed changes
>
>
>    - add a new method to SupportsPartitionPushDown;
>    - let HiveTableSource, HiveSourceBuilder, and
>    HiveContinuousPartitionFetcher hold the pruning function;
>    - prune partitions after fetchPartitions is invoked.
>
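The third bullet, pruning after `fetchPartitions` is invoked, can be sketched as a fetcher that polls for partitions and applies the pushed-down predicate to every partition it has not seen before. `PruningPartitionFetcher` and `fetchNewPartitions` are hypothetical names; the class only mimics the role of `HiveContinuousPartitionFetcher` and is not the real one, and a `Supplier` stands in for the metastore poll.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Predicate;
import java.util.function.Supplier;

/**
 * Hypothetical fetcher illustrating "prune after fetchPartitions"; it only mimics the
 * role of HiveContinuousPartitionFetcher and is not the real class.
 */
final class PruningPartitionFetcher {
    private final Supplier<List<Map<String, String>>> fetchPartitions; // e.g. a metastore poll
    private final Predicate<Map<String, String>> partitionPruner; // pushed down by the planner
    private final Set<Map<String, String>> seen = new HashSet<>();

    PruningPartitionFetcher(
            Supplier<List<Map<String, String>>> fetchPartitions,
            Predicate<Map<String, String>> partitionPruner) {
        this.fetchPartitions = fetchPartitions;
        this.partitionPruner = partitionPruner;
    }

    /** Returns partitions that appeared since the previous call and pass the pruner. */
    List<Map<String, String>> fetchNewPartitions() {
        List<Map<String, String>> accepted = new ArrayList<>();
        for (Map<String, String> partition : fetchPartitions.get()) {
            // seen.add(...) is true only on first sight; pruned partitions are
            // remembered too, so they are never re-evaluated or read.
            if (seen.add(partition) && partitionPruner.test(partition)) {
                accepted.add(partition);
            }
        }
        return accepted;
    }
}
```

Because the predicate is applied on every poll, a partition committed after the job started is pruned exactly like one that existed at planning time.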
> For version compatibility, and to keep the catalog optimization of listing
> partitions by filter, I think we can add a new method to
> *SupportsPartitionPushDown* next to the existing one:
>
> /**
>  * Provides a list of remaining partitions. After those partitions are applied, a source must
>  * not read the data of other partitions during runtime.
>  *
>  * <p>See the documentation of {@link SupportsPartitionPushDown} for more information.
>  */
> void applyPartitions(List<Map<String, String>> remainingPartitions);
>
> /**
>  * Provides a pruning function for uncommitted partitions.
>  */
> default void applyPartitionPuringFunction(MapFunction<RowData, Boolean> partitionPruningFunction) { }
>
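The compatibility argument rests on the new method having an empty default body. A minimal sketch, assuming hypothetical names (`PartitionPushDownSketch`, `LegacySource`, `StreamingSource`) and a `java.util.function.Predicate` in place of the `MapFunction<RowData, Boolean>` / `FilterFunction` discussed in the thread; the method is spelled `applyPartitionPruningFunction` here, whereas the thread spells it `applyPartitionPuringFunction`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

/** Hypothetical shape of the proposal; not Flink's SupportsPartitionPushDown. */
interface PartitionPushDownSketch {
    void applyPartitions(List<Map<String, String>> remainingPartitions);

    /** Default no-op: connectors written before this method existed still compile. */
    default void applyPartitionPruningFunction(Predicate<Map<String, String>> pruner) {}
}

/** A connector that predates the new method and only overrides applyPartitions. */
final class LegacySource implements PartitionPushDownSketch {
    List<Map<String, String>> partitions = new ArrayList<>();

    @Override
    public void applyPartitions(List<Map<String, String>> remainingPartitions) {
        partitions = remainingPartitions;
    }
}

/** A streaming-capable connector that keeps the pruner for its partition fetcher. */
final class StreamingSource implements PartitionPushDownSketch {
    List<Map<String, String>> partitions = new ArrayList<>();
    Predicate<Map<String, String>> pruner = p -> true; // accept everything until told otherwise

    @Override
    public void applyPartitions(List<Map<String, String>> remainingPartitions) {
        partitions = remainingPartitions;
    }

    @Override
    public void applyPartitionPruningFunction(Predicate<Map<String, String>> pruner) {
        this.pruner = pruner;
    }
}
```

The planner can call the new method on any source; sources that ignore it simply keep today's behaviour.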
> The generated function can then be pushed into the TableSource, so that
> the ContinuousPartitionFetcher can access it.
>
> For batch reading, 'remainingPartitions' is treated as the set of
> partitions to consume; for streaming reading, 'partitionPruningFunction'
> is used to skip unneeded partitions.
>
> Rejected Alternatives
>
> Keeping the partition-key conditions in the Filter node (instead of
> removing them) when the source performs streaming reading.
>
>
> Looking forward to your opinions.
>
>
> [1] https://issues.apache.org/jira/browse/FLINK-27898
>
> best
>
> zoucao
>

Re: [DISCUSS] Support partition pruning for streaming reading

Posted by cao zou <zo...@gmail.com>.
Hi godfrey and Jark, thanks for joining the discussion.

The implications for FileSource


Sorry about that, I missed an important feature: the FileSource supports
continuous reading. I think we can do the same thing for the FileSource,
i.e. use the partition pruning function to filter out unneeded partitions.

The function will be held by `FileSystemTableSource`, `FileSource`,
`AbstractFileSource`, and `ContinuousFileSplitEnumerator`. Finally, it will
be used in `ContinuousFileSplitEnumerator#processDiscoveredSplits`, where
splits belonging to unneeded partitions are dropped; you can refer to [1].

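The split-dropping step can be sketched as: derive a partition spec from each discovered file's Hive-style `key=value` directories, then keep only files whose spec passes the pruner. `DiscoveredSplitPruner`, `partitionSpec`, and `prune` are hypothetical names; plain path strings stand in for file splits, and path escaping of partition values is ignored for brevity. This is not Flink's enumerator code.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

/**
 * Hypothetical sketch: drop discovered files whose Hive-style partition directories
 * (key=value) fail the pruner. Not Flink's ContinuousFileSplitEnumerator code.
 */
final class DiscoveredSplitPruner {
    /** Parses "dt=2022-07-01/hr=10" style directory segments; the file name is ignored. */
    static Map<String, String> partitionSpec(String path) {
        Map<String, String> spec = new LinkedHashMap<>();
        String[] segments = path.split("/");
        for (int i = 0; i < segments.length - 1; i++) { // last segment is the file name
            int eq = segments[i].indexOf('=');
            if (eq > 0) {
                spec.put(segments[i].substring(0, eq), segments[i].substring(eq + 1));
            }
        }
        return spec;
    }

    /** Keeps only the paths whose partition spec passes the pruner. */
    static List<String> prune(List<String> discoveredPaths, Predicate<Map<String, String>> pruner) {
        List<String> kept = new ArrayList<>();
        for (String path : discoveredPaths) {
            if (pruner.test(partitionSpec(path))) {
                kept.add(path);
            }
        }
        return kept;
    }
}
```

For example, with a pruner accepting only `dt=2022-07-02`, a newly discovered file under `.../dt=2022-07-01/hr=10/` is dropped before any split is assigned to a reader.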
I think the pruning function will only affect streaming reading; batch
reading will stay the same.


About `FilterFunction<CatalogPartitionSpec> partitionFilter`


I agree that FilterFunction is enough, and `CatalogPartitionSpec` is a more
meaningful and suitable input type than `RowData`. I propose that we
combine `PartitionSpecToRowData` and `FilterLogic` in the same function.

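Combining the spec-to-row conversion with the filter logic can be sketched as a single function over the raw partition spec. `PartitionSpecFilter` and `DtHourFilter` are hypothetical; the interface merely stands in for `FilterFunction<CatalogPartitionSpec>`, the spec is modelled as its `Map<String, String>` of partition values, and the predicate `dt = '2022-07-01' AND hr >= 10` is an invented example.

```java
import java.io.Serializable;
import java.util.Map;

/** Hypothetical stand-in for FilterFunction<CatalogPartitionSpec>, keyed by the raw spec map. */
@FunctionalInterface
interface PartitionSpecFilter extends Serializable {
    boolean filter(Map<String, String> partitionSpec);
}

/** One function that converts the string spec to typed values and then filters. */
final class DtHourFilter implements PartitionSpecFilter {
    private static final long serialVersionUID = 1L;

    @Override
    public boolean filter(Map<String, String> spec) {
        String dt = spec.get("dt");
        String hrText = spec.get("hr");
        if (dt == null || hrText == null) {
            return false; // assumption: partitions lacking a key never match
        }
        int hr;
        try {
            hr = Integer.parseInt(hrText); // conversion step: STRING -> INT
        } catch (NumberFormatException e) {
            return false; // assumption: unparsable values (e.g. a default partition name) never match
        }
        return dt.equals("2022-07-01") && hr >= 10; // filter step
    }
}
```

Keeping both steps in one object means the connector never needs to know the partition column types; it only hands over the spec.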
I think it is better to pass the `FilterFunction` to the connector side by
invoking `applyPartitionPuringFunction`, and I have tried to complete a
simple POC [1]. Unfortunately, I hit a blocker with class loading: the
code-generated function cannot be passed from the client to the JobMaster,
and the exception is shown in the UT [2]. Am I missing something important?


As far as I can tell, we cannot pass an initialized code-generated function
from the client to the JobManager or TaskManager, and
`GeneratedFunction<FilterFunction>` is the right choice. However, the
classes related to generated code are defined in flink-table-runtime, while
`SupportsPartitionPushDown` is defined in flink-table-common, so it is not
obvious how `GeneratedFunction` could serve as the input type of
`applyPartitionPuringFunction`.

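The idea of shipping a description of the function instead of an initialized instance can be sketched with plain Java serialization: only a class name crosses the wire, and the instance is created on the receiving side with that side's `ClassLoader`. `LazyFunctionHolder`, `KeepHourTen`, `newInstance`, and `roundTrip` are hypothetical names; Flink's generated-code wrappers additionally carry the generated source and compile it remotely, which this sketch does not attempt.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Map;
import java.util.function.Predicate;

/** Hypothetical holder: ships a class name, instantiates lazily with a given ClassLoader. */
final class LazyFunctionHolder implements Serializable {
    private static final long serialVersionUID = 1L;
    private final String className;
    private transient Predicate<Map<String, String>> instance; // never serialized

    LazyFunctionHolder(String className) {
        this.className = className;
    }

    @SuppressWarnings("unchecked")
    Predicate<Map<String, String>> newInstance(ClassLoader classLoader) {
        if (instance == null) {
            try {
                instance = (Predicate<Map<String, String>>)
                        Class.forName(className, true, classLoader)
                                .getDeclaredConstructor()
                                .newInstance();
            } catch (ReflectiveOperationException e) {
                throw new IllegalStateException("Cannot instantiate " + className, e);
            }
        }
        return instance;
    }

    boolean isInstantiated() {
        return instance != null;
    }

    /** Simulates shipping the holder from the client to a remote process. */
    static LazyFunctionHolder roundTrip(LazyFunctionHolder holder)
            throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(holder);
        }
        try (ObjectInputStream in =
                new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (LazyFunctionHolder) in.readObject();
        }
    }
}

/** Stands in for a code-generated pruner; needs a no-arg constructor. */
final class KeepHourTen implements Predicate<Map<String, String>> {
    @Override
    public boolean test(Map<String, String> spec) {
        return "10".equals(spec.get("hr"));
    }
}
```

The design point is that the wrapper type can live in a low-level module as long as it only carries data, while the code that turns it into a function runs on the side that has the runtime classes.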


About how the `applyPartitionPuringFunction` method affects batch/bounded table sources


As the code shows, the partition pruning function will only be used in
streaming mode and won't be called in batch mode. Although we could unify
batch and streaming mode, I think that would bring some drawbacks affecting
statistics and parallelism inference.



[1]
https://github.com/apache/flink/compare/master...zoucao:flink:dynamic-partition-pruning

[2]
https://github.com/zoucao/flink/blob/c37d984169c4d099d3ca0458414175168c6e98af/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSourceITCase.java#L178

Best regards,

zoucao


On Mon, 4 Jul 2022 at 20:07, Jark Wu <im...@gmail.com> wrote:

> Hi zoucao,
>
> Regarding the pruning function, maybe a simple filter function is enough,
>  e.g. `FilterFunction<CatalogPartitionSpec> partitionFilter`.
>
> Besides, it would be better to state clearly how the new
> `applyPartitionPuringFunction`
> method affects batch/bounded table sources. From my understanding,
> this method won't be called in batch mode?
>
> Best,
> Jark
>
> On Mon, 4 Jul 2022 at 19:40, Martijn Visser <ma...@apache.org>
> wrote:
>
> > Hi zoucao,
> >
> > The FileSource does support streaming reading [1].
> >
> > Best regards,
> >
> > Martijn
> >
> > [1]
> >
> >
> https://nightlies.apache.org/flink/flink-docs-release-1.15/api/java/org/apache/flink/connector/file/src/FileSource.html
> >
> > On Mon, 4 Jul 2022 at 05:58, godfrey he <go...@gmail.com> wrote:
> >
> > > Hi zoucao,
> > >
> > > Looking forward to your FLIP.
> > >
> > > >For Batch reading, the 'remainingPartitions' will be seen as the
> > > partitions
> > > >needed to consume, for streaming reading, we use the
> > > >'partitionPruningFunction' to ignore the unneeded partitions.
> > > For bounded sources (which may run in batch or streaming mode),
> > > `applyPartitions` should be used, while `applyPartitionPuringFunction`
> > > should only be used for unbounded sources.
> > >
> > > Best,
> > > Godfrey
> > >
> > > On Mon, 4 Jul 2022 at 11:04, cao zou <zo...@gmail.com> wrote:
> > > >
> > > > Hi Martijn, thanks for your attention. I'm glad to create a FLIP;
> > > > could you grant me the permission?
> > > > My ID is zoucao, and my mail is zoucao310@gmail.com.
> > > >
> > > > The implications for FileSource
> > > >
> > > > So far, only HiveSource has been involved in the discussion, because
> > > > it holds a continuous partition fetcher, while FileSource does not.
> > > > If we do the streaming pruning only in the partition fetcher, it will
> > > > not affect the FileSource. If the FileSource supports streaming
> > > > reading in the future, the same changes can be applied to it.
> > > >
> > > > Best regards,
> > > > zoucao
> > > >

Re: [DISCUSS] Support partition pruning for streaming reading

Posted by Jark Wu <im...@gmail.com>.
Hi zoucao,

Regarding the pruning function, maybe a simple filter function is enough,
 e.g. `FilterFunction<CatalogPartitionSpec> partitionFilter`.

Besides, it would be better to state clearly how the new
`applyPartitionPuringFunction`
method affects batch/bounded table sources. From my understanding,
this method won't be called in batch mode?

Best,
Jark


Re: [DISCUSS] Support partition pruning for streaming reading

Posted by Martijn Visser <ma...@apache.org>.
Hi zoucao,

The FileSource does support streaming reading [1].

Best regards,

Martijn

[1]
https://nightlies.apache.org/flink/flink-docs-release-1.15/api/java/org/apache/flink/connector/file/src/FileSource.html


Re: [DISCUSS] Support partition pruning for streaming reading

Posted by godfrey he <go...@gmail.com>.
Hi zoucao,

Looking forward to your FLIP.

>For Batch reading, the 'remainingPartitions' will be seen as the partitions
>needed to consume, for streaming reading, we use the
>'partitionPruningFunction' to ignore the unneeded partitions.
For bounded sources (which may run in batch or streaming mode),
`applyPartitions` should be used, while `applyPartitionPuringFunction`
should only be used for unbounded sources.

Best,
Godfrey
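godfrey's rule (a fixed partition list for bounded scans, the pruning function only for unbounded scans) can be sketched as a small planner-side dispatch. `PushDownDispatch`, its nested `Source` interface, and `pushDown` are hypothetical names, and whether an unbounded source should additionally receive the static list was not settled in this thread; the sketch follows godfrey's wording only.

```java
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

/**
 * Hypothetical planner-side dispatch following the suggestion above: bounded scans
 * get a fixed partition list, unbounded scans get the pruning function.
 */
final class PushDownDispatch {
    /** Minimal source view for this sketch; names are illustrative. */
    interface Source {
        void applyPartitions(List<Map<String, String>> remainingPartitions);

        void applyPartitionPruningFunction(Predicate<Map<String, String>> pruner);
    }

    static void pushDown(
            Source source,
            boolean bounded,
            List<Map<String, String>> remainingPartitions,
            Predicate<Map<String, String>> pruner) {
        if (bounded) {
            // Bounded (batch or bounded streaming): the partition set is known at planning time.
            source.applyPartitions(remainingPartitions);
        } else {
            // Unbounded: partitions committed later can only be pruned by a function.
            source.applyPartitionPruningFunction(pruner);
        }
    }
}
```

Keying the decision on boundedness rather than on runtime mode covers the case of a bounded scan executed in streaming mode.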

cao zou <zo...@gmail.com> 于2022年7月4日周一 11:04写道:
>
> Hi Martijn, thanks for your attention, I'm glad to create a FLIP, and could
> you help give me the permission?
> My Id is zoucao, and my mail is zoucao310@gmail.com.
>
> The implications for FileSource
>
> In the above discussion, only HiveSource has been involved, because it
> holds a continuous partition fetcher, but FileSource not. If we do the
> streaming pruning only in the partition fetcher, it will not affect the
> FileSource. If the FileSource supports streaming reading in the future, the
> same changes can be applied to it.
>
> Best regards,
> zoucao
>
> Martijn Visser <ma...@apache.org> 于2022年7月1日周五 16:20写道:
>
> > Hi zoucao,
> >
> > I think this topic deserves a proper FLIP and a vote. This approach is
> > focussed only on Hive, but I would also like to understand the implications
> > for FileSource. Can you create one?
> >
> > Best regards,
> >
> > Martijn
> >
> > Op wo 22 jun. 2022 om 18:50 schreef cao zou <zo...@gmail.com>:
> >
> > > Hi devs, I want to start a discussion to find a way to support partition
> > > pruning for streaming reading.
> > >
> > >
> > > Flink already supports partition pruning. The implementation consists
> > > of the *Source Ability*, the *Logical Rule*, and the interface
> > > *SupportsPartitionPushDown*, but all of them only take effect in batch
> > > reading. When a table is read in streaming mode, the existing mechanism
> > > causes the problems reported in FLINK-27898
> > > <https://issues.apache.org/jira/browse/FLINK-27898>[1]: records that
> > > should have been filtered out are sent downstream.
> > >
> > > This discussion aims to address that drawback; Hive and other big-data
> > > systems that store data in partitions would benefit from it.
> > >
> > > Currently, the partitions to be consumed are computed in
> > > *PushPartitionIntoTableSourceScanRule* and then pushed into the
> > > TableSource. This works well in batch mode, but it is not enough when
> > > reading from Hive in streaming mode, where partitions committed in the
> > > future must also be considered.
> > >
> > > To support pruning partitions committed in the future, the pruning
> > > function should be pushed into the TableSource and then delivered to
> > > *ContinuousPartitionFetcher*, so that pruning can be applied there to
> > > partitions that are not yet committed.
> > >
> > > Before proposing the changes, I think it is necessary to clarify the
> > > existing pruning logic. The main logic of the pruning in
> > > *PushPartitionIntoTableSourceScanRule* is as follows.
> > >
> > > First, a pruning function called partitionPruner is generated; it
> > > extends RichMapFunction<GenericRowData, Boolean>.
> > >
> > >
> > > if tableSource.listPartitions() is not empty:
> > >   partitions = tableSource.listPartitions()
> > >   for p in partitions:
> > >     predicate = partitionPruner.map(convertPartitionToRow(p))
> > >     if predicate is true: add p to partitionsAfterPruning
> > >
> > > else:  // tableSource.listPartitions() is empty
> > >   if the filter can be converted to ResolvedExpression &&
> > >      the catalog supports the filter:
> > >     // every partition returned by the catalog is needed
> > >     partitionsAfterPruning = catalog.listPartitionsByFilter()
> > >   else:
> > >     partitions = catalog.listPartitions()
> > >     for p in partitions:
> > >       predicate = partitionPruner.map(convertPartitionToRow(p))
> > >       if predicate is true: add p to partitionsAfterPruning
> > >
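
The rule-side flow in the pseudocode above can be sketched in Java. This is a minimal sketch, not the actual `PushPartitionIntoTableSourceScanRule`: `CatalogSketch` and `PartitionPruningSketch` are hypothetical names, `listPartitionsByFilter` returning `Optional.empty()` models "the filter cannot be converted or the catalog does not support it", and the pruner is a `Predicate` over a partition spec instead of a `RichMapFunction<GenericRowData, Boolean>` over a converted row. As with `SupportsPartitionPushDown#listPartitions`, an empty `Optional` from the source means it cannot list its partitions.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Predicate;

class PartitionPruningSketch {
    // Mirrors the rule-side flow: prefer the source's own listing, then the catalog's
    // filtered listing, and finally the catalog's full listing plus the pruner.
    static List<Map<String, String>> prune(
            Optional<List<Map<String, String>>> sourcePartitions,
            CatalogSketch catalog,
            Predicate<Map<String, String>> partitionPruner) {
        if (sourcePartitions.isPresent()) {
            // The source can list its partitions: evaluate the pruner on each one.
            return keepMatching(sourcePartitions.get(), partitionPruner);
        }
        Optional<List<Map<String, String>>> byFilter = catalog.listPartitionsByFilter();
        if (byFilter.isPresent()) {
            // The catalog already applied the filter, so every returned partition is needed.
            return byFilter.get();
        }
        // Fall back to listing everything from the catalog and pruning locally.
        return keepMatching(catalog.listPartitions(), partitionPruner);
    }

    private static List<Map<String, String>> keepMatching(
            List<Map<String, String>> partitions, Predicate<Map<String, String>> pruner) {
        List<Map<String, String>> kept = new ArrayList<>();
        for (Map<String, String> partition : partitions) {
            if (pruner.test(partition)) {
                kept.add(partition);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        List<Map<String, String>> all =
                List.of(Map.of("dt", "2022-06-30"), Map.of("dt", "2022-07-01"));
        CatalogSketch catalog = new CatalogSketch() {
            @Override
            public List<Map<String, String>> listPartitions() {
                return all;
            }

            @Override
            public Optional<List<Map<String, String>>> listPartitionsByFilter() {
                return Optional.empty(); // e.g. the filter cannot be converted
            }
        };
        Predicate<Map<String, String>> pruner = p -> p.get("dt").compareTo("2022-07-01") >= 0;
        // Source cannot list partitions and the catalog cannot filter: prune locally.
        System.out.println(prune(Optional.empty(), catalog, pruner));
    }
}

// Hypothetical stand-in for the catalog calls used by the rule.
interface CatalogSketch {
    List<Map<String, String>> listPartitions();

    // Empty when the filter cannot be converted or the catalog does not support it.
    Optional<List<Map<String, String>>> listPartitionsByFilter();
}
```

Note that the catalog-filter branch never touches the pruner, which is why the catalog information belongs on the rule side while the pruner itself can travel to the connector.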
> > > I think the main logic can be split into two sides: one lives in the
> > > logical rule, the other on the connector side. The catalog info should
> > > be used on the rule side, not on the connector side, while the pruning
> > > function could be used on both sides or unified on the connector side.
> > >
> > >
> > > Proposed changes
> > >
> > >
> > >    - add a new method to SupportsPartitionPushDown;
> > >    - let HiveTableSource, HiveSourceBuilder, and
> > >      HiveContinuousPartitionFetcher hold the pruning function;
> > >    - prune partitions after fetchPartitions is invoked.
> > >
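
The last two bullets can be illustrated with a minimal sketch of a fetcher that holds the pushed-down function and prunes right after fetching. `PruningPartitionFetcherSketch` and `fetchAndPrune` are hypothetical names; the `Supplier` stands in for however `HiveContinuousPartitionFetcher` discovers newly committed partitions, and partitions are modeled as `Map<String, String>`.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;
import java.util.function.Supplier;

class PruningPartitionFetcherSketch {
    // Hypothetical: returns the partitions committed since the previous call.
    private final Supplier<List<Map<String, String>>> fetchPartitions;
    // Pushed down from the planner; a partition is kept when this returns true.
    private final Predicate<Map<String, String>> pruningFunction;

    PruningPartitionFetcherSketch(
            Supplier<List<Map<String, String>>> fetchPartitions,
            Predicate<Map<String, String>> pruningFunction) {
        this.fetchPartitions = fetchPartitions;
        this.pruningFunction = pruningFunction;
    }

    // Fetch newly committed partitions first, then drop the ones the function rejects.
    List<Map<String, String>> fetchAndPrune() {
        List<Map<String, String>> kept = new ArrayList<>();
        for (Map<String, String> partition : fetchPartitions.get()) {
            if (pruningFunction.test(partition)) {
                kept.add(partition);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        // Simulated commits: each element is what one discovery cycle would return.
        Deque<List<Map<String, String>>> commits = new ArrayDeque<>();
        commits.add(List.of(Map.of("dt", "2022-06-30")));
        commits.add(List.of(Map.of("dt", "2022-07-01"), Map.of("dt", "2022-07-02")));

        PruningPartitionFetcherSketch fetcher = new PruningPartitionFetcherSketch(
                () -> commits.isEmpty() ? List.of() : commits.poll(),
                p -> p.get("dt").compareTo("2022-07-01") >= 0);
        System.out.println(fetcher.fetchAndPrune()); // first cycle: nothing kept
        System.out.println(fetcher.fetchAndPrune()); // second cycle: both July partitions
    }
}
```

Because pruning happens per discovery cycle, partitions that do not exist yet at planning time are still filtered before any of their records are read.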
> > > Considering version compatibility and the catalog's optimized listing of
> > > partitions by filter, I think we can add a new method to
> > > *SupportsPartitionPushDown*:
> > >
> > > /**
> > >  * Provides a list of remaining partitions. After those partitions are applied, a source must
> > >  * not read the data of other partitions during runtime.
> > >  *
> > >  * <p>See the documentation of {@link SupportsPartitionPushDown} for more information.
> > >  */
> > > void applyPartitions(List<Map<String, String>> remainingPartitions);
> > >
> > > /**
> > >  * Provides a pruning function for uncommitted partitions.
> > >  */
> > > default void applyPartitionPuringFunction(
> > >         MapFunction<RowData, Boolean> partitionPruningFunction) {}
> > >
> > > We can push the generated function into TableSource, such that the
> > > ContinuousPartitionFetcher can get it.
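
The compatibility argument rests on the new method having a default no-op body. A minimal sketch, again with hypothetical simplified types (`Map<String, String>` partitions, `Predicate` instead of `MapFunction<RowData, Boolean>`) and illustrative class names: an existing source that only implements `applyPartitions` keeps compiling, while a streaming-capable source stores the function so it can later hand it to its continuous partition fetcher.

```java
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

class DefaultMethodCompatSketch {
    public static void main(String[] args) {
        LegacyBatchSource legacy = new LegacyBatchSource();
        // Calling the new hook on an old implementation is a harmless no-op.
        legacy.applyPartitionPuringFunction(p -> true);

        StreamingCapableSource streaming = new StreamingCapableSource();
        streaming.applyPartitionPuringFunction(p -> "2022-07-01".equals(p.get("dt")));
        System.out.println(streaming.pruningFunction().test(Map.of("dt", "2022-07-01")));
    }
}

// Hypothetical stand-in for SupportsPartitionPushDown with the proposed default method.
interface PartitionPushDown {
    void applyPartitions(List<Map<String, String>> remainingPartitions);

    default void applyPartitionPuringFunction(Predicate<Map<String, String>> pruningFunction) {}
}

// Hypothetical source written before the new method existed; compiles thanks to the default.
class LegacyBatchSource implements PartitionPushDown {
    private List<Map<String, String>> partitions = List.of();

    @Override
    public void applyPartitions(List<Map<String, String>> remainingPartitions) {
        this.partitions = remainingPartitions;
    }

    List<Map<String, String>> partitions() {
        return partitions;
    }
}

// Hypothetical source that can read continuously; keeps the function for its fetcher.
class StreamingCapableSource implements PartitionPushDown {
    private List<Map<String, String>> partitions = List.of();
    // Keeps every partition until a pruning function is pushed down.
    private Predicate<Map<String, String>> pruningFunction = p -> true;

    @Override
    public void applyPartitions(List<Map<String, String>> remainingPartitions) {
        this.partitions = remainingPartitions;
    }

    @Override
    public void applyPartitionPuringFunction(Predicate<Map<String, String>> pruningFunction) {
        this.pruningFunction = pruningFunction;
    }

    List<Map<String, String>> partitions() {
        return partitions;
    }

    Predicate<Map<String, String>> pruningFunction() {
        return pruningFunction;
    }
}
```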
> > >
> > > For batch reading, 'remainingPartitions' is treated as the set of
> > > partitions to consume; for streaming reading, we use
> > > 'partitionPruningFunction' to ignore unneeded partitions.
> > >
> > > Rejected Alternatives
> > >
> > > Do not remove the partition-key filter logic from the Filter node if
> > > the source reads in streaming mode.
> > >
> > >
> > > Looking forward to your opinions.
> > >
> > >
> > > [1] https://issues.apache.org/jira/browse/FLINK-27898
> > >
> > > best
> > >
> > > zoucao
> > >
> >

Re: [DISCUSS] Support partition pruning for streaming reading

Posted by cao zou <zo...@gmail.com>.
Hi Martijn, thanks for your attention. I'm glad to create a FLIP; could
you grant me the necessary permission?
My ID is zoucao, and my email is zoucao310@gmail.com.

The implications for FileSource

In the above discussion, only HiveSource is involved, because it holds a
continuous partition fetcher, while FileSource does not. If the streaming
pruning is done only in the partition fetcher, FileSource is not affected.
If FileSource supports streaming reading in the future, the same changes
can be applied to it.

Best regards,
zoucao
