Posted to issues@flink.apache.org by "Rong Rong (JIRA)" <ji...@apache.org> on 2019/05/04 16:55:00 UTC
[jira] [Commented] (FLINK-12399) FilterableTableSource does not use filters on job run
[ https://issues.apache.org/jira/browse/FLINK-12399?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16833107#comment-16833107 ]
Rong Rong commented on FLINK-12399:
-----------------------------------
Hi [~josh.bradt]. I think I found the root cause of this issue.
Apparently you have to override the {{explainSource}} method so that Calcite can tell that the newly created TableSource with the filters pushed down is different from the originally created CustomTableSource (on which applyPredicate has not been called).
I think this might be related to changelog point #4 in https://github.com/apache/flink/pull/8324: when I tried upgrading to Calcite 1.19.0, I also ran into some weird issues where Calcite tries to find the correct table source from the digest strings.
I will assign this to myself and start looking into it. Please let me know whether adding the override resolves your issue for now.
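To illustrate the idea, here is a minimal, self-contained sketch rather than code from the attached example. The {{Source}} interface is a hypothetical stand-in for the relevant part of Flink's TableSource, and filters are modelled as plain strings instead of Expression or Filter objects. The only point is that the description returned by {{explainSource}} should differ between the original source and the copy returned by applyPredicate.

```java
import java.util.Arrays;

public class ExplainSourceSketch {

    /** Hypothetical stand-in for the part of Flink's TableSource that matters here. */
    interface Source {
        String explainSource();
    }

    /** Simplified CustomTableSource; filters are plain strings (an assumption of this sketch). */
    static final class CustomSource implements Source {
        // null means applyPredicate has not been called on this instance
        private final String[] filters;

        CustomSource(String[] filters) {
            this.filters = filters;
        }

        /** Returns a new source that carries the accepted predicates. */
        CustomSource applyPredicate(String... predicates) {
            return new CustomSource(predicates.clone());
        }

        @Override
        public String explainSource() {
            // Include the pushed-down filters so that the original and the
            // filtered source produce different description strings.
            String described = (filters == null) ? "none" : Arrays.toString(filters);
            return "CustomSource(filters=" + described + ")";
        }
    }

    public static void main(String[] args) {
        CustomSource original = new CustomSource(null);
        CustomSource filtered = original.applyPredicate("a > 5");
        System.out.println(original.explainSource()); // CustomSource(filters=none)
        System.out.println(filtered.explainSource()); // CustomSource(filters=[a > 5])
    }
}
```

In the real CustomTableSource, the equivalent change would be an {{explainSource}} override that appends the contents of the filters array to the returned string.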
> FilterableTableSource does not use filters on job run
> -----------------------------------------------------
>
> Key: FLINK-12399
> URL: https://issues.apache.org/jira/browse/FLINK-12399
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / API
> Affects Versions: 1.8.0
> Reporter: Josh Bradt
> Priority: Major
> Attachments: flink-filter-bug.tar.gz
>
>
> As discussed [on the mailing list|http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Filter-push-down-not-working-for-a-custom-BatchTableSource-tp27654.html], there appears to be a bug where a job that uses a custom FilterableTableSource does not keep the filters that were pushed down into the table source. More specifically, the table source does receive filters via applyPredicate, and a new table source with those filters is returned, but the final job graph appears to use the original table source, which does not contain any filters.
> I attached a minimal example program to this ticket. The custom table source is as follows:
> {code:java}
> public class CustomTableSource implements BatchTableSource<Model>, FilterableTableSource<Model> {
>     private static final Logger LOG = LoggerFactory.getLogger(CustomTableSource.class);
>     private final Filter[] filters;
>     private final FilterConverter converter = new FilterConverter();
>     public CustomTableSource() {
>         this(null);
>     }
>     private CustomTableSource(Filter[] filters) {
>         this.filters = filters;
>     }
>     @Override
>     public DataSet<Model> getDataSet(ExecutionEnvironment execEnv) {
>         if (filters == null) {
>             LOG.info("==== No filters defined ====");
>         } else {
>             LOG.info("==== Found filters ====");
>             for (Filter filter : filters) {
>                 LOG.info("FILTER: {}", filter);
>             }
>         }
>         return execEnv.fromCollection(allModels());
>     }
>     @Override
>     public TableSource<Model> applyPredicate(List<Expression> predicates) {
>         LOG.info("Applying predicates");
>         List<Filter> acceptedFilters = new ArrayList<>();
>         for (final Expression predicate : predicates) {
>             converter.convert(predicate).ifPresent(acceptedFilters::add);
>         }
>         return new CustomTableSource(acceptedFilters.toArray(new Filter[0]));
>     }
>     @Override
>     public boolean isFilterPushedDown() {
>         return filters != null;
>     }
>     @Override
>     public TypeInformation<Model> getReturnType() {
>         return TypeInformation.of(Model.class);
>     }
>     @Override
>     public TableSchema getTableSchema() {
>         return TableSchema.fromTypeInfo(getReturnType());
>     }
>     private List<Model> allModels() {
>         List<Model> models = new ArrayList<>();
>         models.add(new Model(1, 2, 3, 4));
>         models.add(new Model(10, 11, 12, 13));
>         models.add(new Model(20, 21, 22, 23));
>         return models;
>     }
> }
> {code}
>
> When run, it logs
> {noformat}
> 15:24:54,888 INFO com.klaviyo.filterbug.CustomTableSource - Applying predicates
> 15:24:54,901 INFO com.klaviyo.filterbug.CustomTableSource - Applying predicates
> 15:24:54,910 INFO com.klaviyo.filterbug.CustomTableSource - Applying predicates
> 15:24:54,977 INFO com.klaviyo.filterbug.CustomTableSource - ==== No filters defined ====
> {noformat}
> which appears to indicate that although filters are getting pushed down, the final job does not use them.