Posted to issues@flink.apache.org by "Rong Rong (Jira)" <ji...@apache.org> on 2019/09/03 16:36:00 UTC

[jira] [Commented] (FLINK-12399) FilterableTableSource does not use filters on job run

    [ https://issues.apache.org/jira/browse/FLINK-12399?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16921546#comment-16921546 ] 

Rong Rong commented on FLINK-12399:
-----------------------------------

Hi [~fhueske], would you please take a look at the proposed approach for addressing this issue? This problem has caused trouble for us and has also come up in multiple threads on the mailing list.
It would be nice to address this before the next release. Much appreciated. 

> FilterableTableSource does not use filters on job run
> -----------------------------------------------------
>
>                 Key: FLINK-12399
>                 URL: https://issues.apache.org/jira/browse/FLINK-12399
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / API
>    Affects Versions: 1.8.0
>            Reporter: Josh Bradt
>            Assignee: Rong Rong
>            Priority: Major
>              Labels: pull-request-available
>         Attachments: flink-filter-bug.tar.gz
>
>          Time Spent: 0.5h
>  Remaining Estimate: 0h
>
> As discussed [on the mailing list|http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Filter-push-down-not-working-for-a-custom-BatchTableSource-tp27654.html], there appears to be a bug where a job that uses a custom FilterableTableSource does not keep the filters that were pushed down into the table source. More specifically, the table source does receive filters via applyPredicate, and a new table source with those filters is returned, but the final job graph appears to use the original table source, which does not contain any filters.
> I attached a minimal example program to this ticket. The custom table source is as follows: 
> {code:java}
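> package com.klaviyo.filterbug;
>
> import java.util.ArrayList;
> import java.util.List;
>
> import org.apache.flink.api.common.typeinfo.TypeInformation;
> import org.apache.flink.api.java.DataSet;
> import org.apache.flink.api.java.ExecutionEnvironment;
> import org.apache.flink.table.api.TableSchema;
> import org.apache.flink.table.expressions.Expression;
> import org.apache.flink.table.sources.BatchTableSource;
> import org.apache.flink.table.sources.FilterableTableSource;
> import org.apache.flink.table.sources.TableSource;
> import org.slf4j.Logger;
> import org.slf4j.LoggerFactory;
>
> // Model, Filter and FilterConverter are not shown here; they come from the attached example program.
> public class CustomTableSource implements BatchTableSource<Model>, FilterableTableSource<Model> {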
>     private static final Logger LOG = LoggerFactory.getLogger(CustomTableSource.class);
>     private final Filter[] filters;
>     private final FilterConverter converter = new FilterConverter();
>     public CustomTableSource() {
>         this(null);
>     }
>     private CustomTableSource(Filter[] filters) {
>         this.filters = filters;
>     }
>     // Builds the DataSet for the job and logs which filters this instance holds.
>     @Override
>     public DataSet<Model> getDataSet(ExecutionEnvironment execEnv) {
>         if (filters == null) {
>             LOG.info("==== No filters defined ====");
>         } else {
>             LOG.info("==== Found filters ====");
>             for (Filter filter : filters) {
>                 LOG.info("FILTER: {}", filter);
>             }
>         }
>         // The filters are only logged, not applied: every model is returned.
>         return execEnv.fromCollection(allModels());
>     }
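>     // Returns a new instance holding the converted filters; this instance is not modified.
>     // Accepted predicates are left in the list, so Flink still evaluates them after the scan.
>     @Override
>     public TableSource<Model> applyPredicate(List<Expression> predicates) {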
>         LOG.info("Applying predicates");
>         List<Filter> acceptedFilters = new ArrayList<>();
>         for (final Expression predicate : predicates) {
>             converter.convert(predicate).ifPresent(acceptedFilters::add);
>         }
>         return new CustomTableSource(acceptedFilters.toArray(new Filter[0]));
>     }
>     @Override
>     public boolean isFilterPushedDown() {
>         return filters != null;
>     }
>     @Override
>     public TypeInformation<Model> getReturnType() {
>         return TypeInformation.of(Model.class);
>     }
>     @Override
>     public TableSchema getTableSchema() {
>         return TableSchema.fromTypeInfo(getReturnType());
>     }
>     private List<Model> allModels() {
>         List<Model> models = new ArrayList<>();
>         models.add(new Model(1, 2, 3, 4));
>         models.add(new Model(10, 11, 12, 13));
>         models.add(new Model(20, 21, 22, 23));
>         return models;
>     }
> }
> {code}
>  
> When run, it logs
> {noformat}
> 15:24:54,888 INFO  com.klaviyo.filterbug.CustomTableSource                       - Applying predicates
> 15:24:54,901 INFO  com.klaviyo.filterbug.CustomTableSource                       - Applying predicates
> 15:24:54,910 INFO  com.klaviyo.filterbug.CustomTableSource                       - Applying predicates
> 15:24:54,977 INFO  com.klaviyo.filterbug.CustomTableSource                       - ==== No filters defined ====
> {noformat}
> which indicates that although applyPredicate is called (three times), getDataSet runs on an instance with no filters, so the final job does not use the pushed-down filters.
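The symptom described in the issue can be reproduced in miniature without Flink. The sketch below is a hypothetical stand-in, not Flink's API: `SimpleSource` and `PushDownDemo` are invented names, and predicates are plain strings instead of `Expression`s. It mirrors the pattern used by `CustomTableSource`: `applyPredicate` never mutates the receiver and returns a new instance, so if the original instance is the one that ends up in the job, that job sees no filters, which matches the `==== No filters defined ====` log line.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class PushDownDemo {
    public static void main(String[] args) {
        SimpleSource original = new SimpleSource();
        SimpleSource pushed = original.applyPredicate(List.of("a > 5"));

        // Only the returned copy carries the filter; the original is untouched.
        System.out.println("pushed:   " + pushed.isFilterPushedDown() + " " + pushed.filters());
        System.out.println("original: " + original.isFilterPushedDown() + " " + original.filters());
    }
}

// Hypothetical, Flink-free stand-in for a filterable table source.
// Predicates are plain strings here instead of Flink Expressions.
class SimpleSource {
    // null means "push-down has not been applied to this instance"
    private final List<String> filters;

    SimpleSource() {
        this(null);
    }

    private SimpleSource(List<String> filters) {
        this.filters = filters;
    }

    // Like CustomTableSource.applyPredicate: the receiver is never mutated;
    // a new instance holding the accepted predicates is returned.
    SimpleSource applyPredicate(List<String> predicates) {
        return new SimpleSource(new ArrayList<>(predicates));
    }

    boolean isFilterPushedDown() {
        return filters != null;
    }

    List<String> filters() {
        return filters == null ? Collections.<String>emptyList() : filters;
    }
}
```

Running it should print `true [a > 5]` for the copy and `false []` for the original. Because this copy-on-push-down design keeps each source instance immutable, whatever builds the final plan has to carry the returned instance through; any step that falls back to the original instance loses the filters.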


