You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@calcite.apache.org by Luca Marchi <lu...@teampicnic.com> on 2023/05/17 15:41:50 UTC

Translate join predicates into filters

Morning everyone, 
in our company we are running a POC using Apache Calcite, and we would like to collect some feedbacks from you for the scenario mentioned below.

There is a service API that allows retrieving some `Book`s, and we would like to build a table adapter on top of this service; this API 
only accepts a set of IDs, and if no IDs are provided, no result is returned.

```
  interface BookService {
    /** Returns the books matching the given IDs.
     *
     * <p>If not IDs is provided, no result is returned.
     */
    List<Book> findBooksByIds(Set<String> ids);
  }

  record Book(String id, String title) {};
```

A requirement of this table is that it has to support join, and we would like to support joining by ID in an efficient way.

The goal is to define a rule that forces the query planner to always push down join predicates into a table scan.

Given the following `book` table:

```java
/** A table which represents books, queryable only by their ID. */
final class BookTable extends AbstractTable implements FilterableTable {
  private final BookService service;

  BookTable(BookService service) {
    this.service = service;
  }

  @Override
  public RelDataType getRowType(RelDataTypeFactory typeFactory) {
    return new RelDataTypeFactory.Builder(typeFactory)
        .add("id", SqlTypeName.VARCHAR)
        .add("title", SqlTypeName.VARCHAR)
        .build();
  }

  @Override
  public Enumerable<Object[]> scan(DataContext root, List<RexNode> filters) {
    Set<String> bookIds = getBooksId(filters);
    List<Object[]> result = service.findBooksByIds(bookIds)
        .stream()
        .map(b -> new Object[]{b.id, b.title})
        .toList();

    return Linq4j.asEnumerable(result);
  }

  private static Set<String> getBooksId(List<RexNode> filters) {
    if (filters.size() != 1) {
      throw new IllegalArgumentException("Expected one filter to the ID, found: %d".formatted(filters.size()));
    }

    RexNode filter = filters.get(0);
    RexNode leftCondition = ((RexCall) filter).getOperands().get(0);
    RexNode rightCondition = ((RexCall) filter).getOperands().get(1);

    if (leftCondition instanceof RexInputRef left
        && rightCondition instanceof RexLiteral right
        // The index of the ID column is 1.
        && left.getIndex() == 1) {
      if (filter.isA(SqlKind.EQUALS)) {
        String bookId = right.getValue2().toString();
        return ImmutableSet.of(bookId);
      }
      if (filter.isA(SqlKind.SEARCH)) {
        @SuppressWarnings("unchecked")
        Sarg<NlsString> searchArguments = right.getValueAs(Sarg.class);
        return searchArguments.rangeSet.asRanges().stream()
            .map(Range::lowerEndpoint)
            .map(NlsString::getValue)
            .collect(toSet());
      }
    }
    throw new IllegalArgumentException("Unexpected operator, found: %s".formatted(filter.getKind()));
  }
}
```

The API of the `BookService` always expects a set of IDs, and in case of query like (assuming an entry in `book` matching the ID `'a'`):
```sql
WITH config (id, val) AS (
    VALUES ('a', 3), ('b', 5)
)
SELECT b.* FROM books b
INNER JOIN config ON b.id = config.id
WHERE config.val > 4
```

Calcite produces the following plan:
```
EnumerableCalc(expr#0..2=[{inputs}], id=[$t1], title=[$t2])
  EnumerableMergeJoin(condition=[=($0, $1)], joinType=[inner])
    EnumerableSort(sort0=[$0], dir0=[ASC])
      EnumerableCalc(expr#0..1=[{inputs}], id=[$t0])
        EnumerableTableScan(table=[[books]])
    EnumerableSort(sort0=[$0], dir0=[ASC])
      EnumerableTableScan(table=[[books]])
```

This means Calcite performs a full table scan of the `book` table, and since the `RexNode` filters in the `scan` method are empty, no result is returned (in this example we are using a value statement scoped views, but ideally the solution we are looking for should be valid for other table).
Under some defined circustances, Postgres generates Nested-Loop query plan for join: it first selects the row of the table A matching a given condition, then iterates over the retrieved rows and performs a scan of table B looking for rows that match the join condition; and this seems something we would like to enforce it here.

In summary, we would like to implement a table which in case of JOIN is capable of loading the individual IDs matching the API of our service, rather than performing a full table scan.

Do you have any advises/feedback for us?

Thanks in advance.

Re: Translate join predicates into filters

Posted by Dan Zou <zo...@163.com>.
Hi Luca,
If I understand you correctly, what you are looking for is temporal table join. Flink has implemented this based on Calcite, maybe is a good reference. You could find more details in [1], [2], [3].
- [1] https://issues.apache.org/jira/browse/CALCITE-1912
- [2] https://lists.apache.org/thread/s8rx569p6tqbh8ybomodo5w3h2rbfvkr
- [3] https://issues.apache.org/jira/browse/FLINK-12269

Best,
Dan Zou   





> 2023年5月17日 23:41,Luca Marchi <lu...@teampicnic.com> 写道:
> 
> Morning everyone, 
> in our company we are running a POC using Apache Calcite, and we would like to collect some feedbacks from you for the scenario mentioned below.
> 
> There is a service API that allows retrieving some `Book`s, and we would like to build a table adapter on top of this service; this API 
> only accepts a set of IDs, and if no IDs are provided, no result is returned.
> 
> ```
>  interface BookService {
>    /** Returns the books matching the given IDs.
>     *
>     * <p>If not IDs is provided, no result is returned.
>     */
>    List<Book> findBooksByIds(Set<String> ids);
>  }
> 
>  record Book(String id, String title) {};
> ```
> 
> A requirement of this table is that it has to support join, and we would like to support joining by ID in an efficient way.
> 
> The goal is to define a rule that forces the query planner to always push down join predicates into a table scan.
> 
> Given the following `book` table:
> 
> ```java
> /** A table which represents books, queryable only by their ID. */
> final class BookTable extends AbstractTable implements FilterableTable {
>  private final BookService service;
> 
>  BookTable(BookService service) {
>    this.service = service;
>  }
> 
>  @Override
>  public RelDataType getRowType(RelDataTypeFactory typeFactory) {
>    return new RelDataTypeFactory.Builder(typeFactory)
>        .add("id", SqlTypeName.VARCHAR)
>        .add("title", SqlTypeName.VARCHAR)
>        .build();
>  }
> 
>  @Override
>  public Enumerable<Object[]> scan(DataContext root, List<RexNode> filters) {
>    Set<String> bookIds = getBooksId(filters);
>    List<Object[]> result = service.findBooksByIds(bookIds)
>        .stream()
>        .map(b -> new Object[]{b.id, b.title})
>        .toList();
> 
>    return Linq4j.asEnumerable(result);
>  }
> 
>  private static Set<String> getBooksId(List<RexNode> filters) {
>    if (filters.size() != 1) {
>      throw new IllegalArgumentException("Expected one filter to the ID, found: %d".formatted(filters.size()));
>    }
> 
>    RexNode filter = filters.get(0);
>    RexNode leftCondition = ((RexCall) filter).getOperands().get(0);
>    RexNode rightCondition = ((RexCall) filter).getOperands().get(1);
> 
>    if (leftCondition instanceof RexInputRef left
>        && rightCondition instanceof RexLiteral right
>        // The index of the ID column is 1.
>        && left.getIndex() == 1) {
>      if (filter.isA(SqlKind.EQUALS)) {
>        String bookId = right.getValue2().toString();
>        return ImmutableSet.of(bookId);
>      }
>      if (filter.isA(SqlKind.SEARCH)) {
>        @SuppressWarnings("unchecked")
>        Sarg<NlsString> searchArguments = right.getValueAs(Sarg.class);
>        return searchArguments.rangeSet.asRanges().stream()
>            .map(Range::lowerEndpoint)
>            .map(NlsString::getValue)
>            .collect(toSet());
>      }
>    }
>    throw new IllegalArgumentException("Unexpected operator, found: %s".formatted(filter.getKind()));
>  }
> }
> ```
> 
> The API of the `BookService` always expects a set of IDs, and in case of query like (assuming an entry in `book` matching the ID `'a'`):
> ```sql
> WITH config (id, val) AS (
>    VALUES ('a', 3), ('b', 5)
> )
> SELECT b.* FROM books b
> INNER JOIN config ON b.id = config.id
> WHERE config.val > 4
> ```
> 
> Calcite produces the following plan:
> ```
> EnumerableCalc(expr#0..2=[{inputs}], id=[$t1], title=[$t2])
>  EnumerableMergeJoin(condition=[=($0, $1)], joinType=[inner])
>    EnumerableSort(sort0=[$0], dir0=[ASC])
>      EnumerableCalc(expr#0..1=[{inputs}], id=[$t0])
>        EnumerableTableScan(table=[[books]])
>    EnumerableSort(sort0=[$0], dir0=[ASC])
>      EnumerableTableScan(table=[[books]])
> ```
> 
> This means Calcite performs a full table scan of the `book` table, and since the `RexNode` filters in the `scan` method are empty, no result is returned (in this example we are using a value statement scoped views, but ideally the solution we are looking for should be valid for other table).
> Under some defined circustances, Postgres generates Nested-Loop query plan for join: it first selects the row of the table A matching a given condition, then iterates over the retrieved rows and performs a scan of table B looking for rows that match the join condition; and this seems something we would like to enforce it here.
> 
> In summary, we would like to implement a table which in case of JOIN is capable of loading the individual IDs matching the API of our service, rather than performing a full table scan.
> 
> Do you have any advises/feedback for us?
> 
> Thanks in advance.


Re: Translate join predicates into filters

Posted by Julian Hyde <jh...@gmail.com>.
I expect that your table works if you put the filter in the WHERE clause, e.g.

  SELECT *
  FROM Books AS b
  WHERE b.id <http://b.id/> IN (1, 10, 27)

and it does so using FilterTableScanRule (which matches a Filter on top of a Scan of a FilterableTable). But you need a new planner rule that can convert a Join whose right input is a Scan of a FilterableTable into a NestedLoopsJoin that dynamically sets the filter for each row from the left. (In this query, config would be on the left, Books on the right.)

There could be a more efficient version that gathers all IDs from the left, then does one request to the right, and them something like a hash join.

Julian




> On May 17, 2023, at 8:41 AM, Luca Marchi <lu...@teampicnic.com> wrote:
> 
> Morning everyone, 
> in our company we are running a POC using Apache Calcite, and we would like to collect some feedbacks from you for the scenario mentioned below.
> 
> There is a service API that allows retrieving some `Book`s, and we would like to build a table adapter on top of this service; this API 
> only accepts a set of IDs, and if no IDs are provided, no result is returned.
> 
> ```
>  interface BookService {
>    /** Returns the books matching the given IDs.
>     *
>     * <p>If not IDs is provided, no result is returned.
>     */
>    List<Book> findBooksByIds(Set<String> ids);
>  }
> 
>  record Book(String id, String title) {};
> ```
> 
> A requirement of this table is that it has to support join, and we would like to support joining by ID in an efficient way.
> 
> The goal is to define a rule that forces the query planner to always push down join predicates into a table scan.
> 
> Given the following `book` table:
> 
> ```java
> /** A table which represents books, queryable only by their ID. */
> final class BookTable extends AbstractTable implements FilterableTable {
>  private final BookService service;
> 
>  BookTable(BookService service) {
>    this.service = service;
>  }
> 
>  @Override
>  public RelDataType getRowType(RelDataTypeFactory typeFactory) {
>    return new RelDataTypeFactory.Builder(typeFactory)
>        .add("id", SqlTypeName.VARCHAR)
>        .add("title", SqlTypeName.VARCHAR)
>        .build();
>  }
> 
>  @Override
>  public Enumerable<Object[]> scan(DataContext root, List<RexNode> filters) {
>    Set<String> bookIds = getBooksId(filters);
>    List<Object[]> result = service.findBooksByIds(bookIds)
>        .stream()
>        .map(b -> new Object[]{b.id, b.title})
>        .toList();
> 
>    return Linq4j.asEnumerable(result);
>  }
> 
>  private static Set<String> getBooksId(List<RexNode> filters) {
>    if (filters.size() != 1) {
>      throw new IllegalArgumentException("Expected one filter to the ID, found: %d".formatted(filters.size()));
>    }
> 
>    RexNode filter = filters.get(0);
>    RexNode leftCondition = ((RexCall) filter).getOperands().get(0);
>    RexNode rightCondition = ((RexCall) filter).getOperands().get(1);
> 
>    if (leftCondition instanceof RexInputRef left
>        && rightCondition instanceof RexLiteral right
>        // The index of the ID column is 1.
>        && left.getIndex() == 1) {
>      if (filter.isA(SqlKind.EQUALS)) {
>        String bookId = right.getValue2().toString();
>        return ImmutableSet.of(bookId);
>      }
>      if (filter.isA(SqlKind.SEARCH)) {
>        @SuppressWarnings("unchecked")
>        Sarg<NlsString> searchArguments = right.getValueAs(Sarg.class);
>        return searchArguments.rangeSet.asRanges().stream()
>            .map(Range::lowerEndpoint)
>            .map(NlsString::getValue)
>            .collect(toSet());
>      }
>    }
>    throw new IllegalArgumentException("Unexpected operator, found: %s".formatted(filter.getKind()));
>  }
> }
> ```
> 
> The API of the `BookService` always expects a set of IDs, and in case of query like (assuming an entry in `book` matching the ID `'a'`):
> ```sql
> WITH config (id, val) AS (
>    VALUES ('a', 3), ('b', 5)
> )
> SELECT b.* FROM books b
> INNER JOIN config ON b.id = config.id
> WHERE config.val > 4
> ```
> 
> Calcite produces the following plan:
> ```
> EnumerableCalc(expr#0..2=[{inputs}], id=[$t1], title=[$t2])
>  EnumerableMergeJoin(condition=[=($0, $1)], joinType=[inner])
>    EnumerableSort(sort0=[$0], dir0=[ASC])
>      EnumerableCalc(expr#0..1=[{inputs}], id=[$t0])
>        EnumerableTableScan(table=[[books]])
>    EnumerableSort(sort0=[$0], dir0=[ASC])
>      EnumerableTableScan(table=[[books]])
> ```
> 
> This means Calcite performs a full table scan of the `book` table, and since the `RexNode` filters in the `scan` method are empty, no result is returned (in this example we are using a value statement scoped views, but ideally the solution we are looking for should be valid for other table).
> Under some defined circustances, Postgres generates Nested-Loop query plan for join: it first selects the row of the table A matching a given condition, then iterates over the retrieved rows and performs a scan of table B looking for rows that match the join condition; and this seems something we would like to enforce it here.
> 
> In summary, we would like to implement a table which in case of JOIN is capable of loading the individual IDs matching the API of our service, rather than performing a full table scan.
> 
> Do you have any advises/feedback for us?
> 
> Thanks in advance.