You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@flink.apache.org by "Josh Bradt (JIRA)" <ji...@apache.org> on 2019/05/03 19:44:00 UTC

[jira] [Created] (FLINK-12399) FilterableTableSource does not use filters on job run

Josh Bradt created FLINK-12399:
----------------------------------

             Summary: FilterableTableSource does not use filters on job run
                 Key: FLINK-12399
                 URL: https://issues.apache.org/jira/browse/FLINK-12399
             Project: Flink
          Issue Type: Bug
          Components: Table SQL / API
    Affects Versions: 1.8.0
            Reporter: Josh Bradt
         Attachments: flink-filter-bug.tar.gz

As discussed [on the mailing list|http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Filter-push-down-not-working-for-a-custom-BatchTableSource-tp27654.html], there appears to be a bug where a job that uses a custom FilterableTableSource does not keep the filters that were pushed down into the table source. More specifically, the table source does receive filters via applyPredicates, and a new table source with those filters is returned, but the final job graph appears to use the original table source, which does not contain any filters.

I attached a minimal example program to this ticket. The custom table source is as follows: 
{code:java}
public class CustomTableSource implements BatchTableSource<Model>, FilterableTableSource<Model> {

    private static final Logger LOG = LoggerFactory.getLogger(CustomTableSource.class);

    private final Filter[] filters;

    private final FilterConverter converter = new FilterConverter();

    public CustomTableSource() {
        this(null);
    }

    private CustomTableSource(Filter[] filters) {
        this.filters = filters;
    }

    @Override
    public DataSet<Model> getDataSet(ExecutionEnvironment execEnv) {
        if (filters == null) {
           LOG.info("==== No filters defined ====");
        } else {
            LOG.info("==== Found filters ====");
            for (Filter filter : filters) {
                LOG.info("FILTER: {}", filter);
            }
        }

        return execEnv.fromCollection(allModels());
    }

    @Override
    public TableSource<Model> applyPredicate(List<Expression> predicates) {
        LOG.info("Applying predicates");

        List<Filter> acceptedFilters = new ArrayList<>();
        for (final Expression predicate : predicates) {
            converter.convert(predicate).ifPresent(acceptedFilters::add);
        }

        return new CustomTableSource(acceptedFilters.toArray(new Filter[0]));
    }

    @Override
    public boolean isFilterPushedDown() {
        return filters != null;
    }

    @Override
    public TypeInformation<Model> getReturnType() {
        return TypeInformation.of(Model.class);
    }

    @Override
    public TableSchema getTableSchema() {
        return TableSchema.fromTypeInfo(getReturnType());
    }

    private List<Model> allModels() {
        List<Model> models = new ArrayList<>();

        models.add(new Model(1, 2, 3, 4));
        models.add(new Model(10, 11, 12, 13));
        models.add(new Model(20, 21, 22, 23));

        return models;
    }
}
{code}
 

When run, it logs
{noformat}
15:24:54,888 INFO  com.klaviyo.filterbug.CustomTableSource                       - Applying predicates
15:24:54,901 INFO  com.klaviyo.filterbug.CustomTableSource                       - Applying predicates
15:24:54,910 INFO  com.klaviyo.filterbug.CustomTableSource                       - Applying predicates
15:24:54,977 INFO  com.klaviyo.filterbug.CustomTableSource                       - ==== No filters defined ===={noformat}
which appears to indicate that although filters are getting pushed down, the final job does not use them.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)