Posted to dev@calcite.apache.org by Luca Marchi <lu...@teampicnic.com> on 2023/05/17 15:41:50 UTC
Translate join predicates into filters
Morning everyone,
At our company we are running a proof of concept (POC) with Apache Calcite, and we would like your feedback on the scenario described below.
There is a service API that retrieves `Book`s, and we would like to build a table adapter on top of it. The API only accepts a set of IDs; if no IDs are provided, it returns no results.
```java
import java.util.List;
import java.util.Set;

interface BookService {
  /**
   * Returns the books matching the given IDs.
   *
   * <p>If no IDs are provided, no result is returned.
   */
  List<Book> findBooksByIds(Set<String> ids);
}

record Book(String id, String title) {}
```
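To experiment with an adapter without calling the real service, a minimal in-memory `BookService` that honors the documented contract (an empty ID set yields no books, unknown IDs are skipped) might look like the sketch below. `InMemoryBookService` is a hypothetical name used only for illustration; it is not part of the original code.

```java
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;

interface BookService {
  /** Returns the books matching the given IDs; an empty set yields no books. */
  List<Book> findBooksByIds(Set<String> ids);
}

record Book(String id, String title) {}

/** Hypothetical in-memory stand-in for the real service, for tests and experiments. */
final class InMemoryBookService implements BookService {
  private final Map<String, Book> booksById;

  InMemoryBookService(List<Book> books) {
    // Index the books by ID so each lookup is a single hash-map access.
    this.booksById = books.stream()
        .collect(Collectors.toMap(Book::id, Function.identity()));
  }

  @Override
  public List<Book> findBooksByIds(Set<String> ids) {
    // Unknown IDs are skipped, so an empty input set returns an empty list.
    return ids.stream()
        .sorted() // deterministic order, convenient for assertions
        .map(booksById::get)
        .filter(Objects::nonNull)
        .toList();
  }
}
```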
This table must support joins, and we would like joins on the ID column to be efficient.
The goal is to define a rule that forces the query planner to always push join predicates down into the table scan.
Given the following `books` table:
```java
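import static java.util.stream.Collectors.toSet;

import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Range;
import java.util.List;
import java.util.Set;
import org.apache.calcite.DataContext;
import org.apache.calcite.linq4j.Enumerable;
import org.apache.calcite.linq4j.Linq4j;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.rex.RexCall;
import org.apache.calcite.rex.RexInputRef;
import org.apache.calcite.rex.RexLiteral;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.schema.FilterableTable;
import org.apache.calcite.schema.impl.AbstractTable;
import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.calcite.util.NlsString;
import org.apache.calcite.util.Sarg;

/** A table which represents books, queryable only by their ID. */
final class BookTable extends AbstractTable implements FilterableTable {
  // "id" is the first column of the row type below, so its index is 0.
  private static final int ID_COLUMN = 0;

  private final BookService service;

  BookTable(BookService service) {
    this.service = service;
  }

  @Override
  public RelDataType getRowType(RelDataTypeFactory typeFactory) {
    return new RelDataTypeFactory.Builder(typeFactory)
        .add("id", SqlTypeName.VARCHAR)
        .add("title", SqlTypeName.VARCHAR)
        .build();
  }

  @Override
  public Enumerable<Object[]> scan(DataContext root, List<RexNode> filters) {
    Set<String> bookIds = getBooksId(filters);
    List<Object[]> result = service.findBooksByIds(bookIds)
        .stream()
        // Record components are read through their accessor methods.
        .map(b -> new Object[]{b.id(), b.title()})
        .toList();
    return Linq4j.asEnumerable(result);
  }

  /** Extracts the requested IDs from a single {@code id = ...} or {@code SEARCH(id, ...)} filter. */
  private static Set<String> getBooksId(List<RexNode> filters) {
    if (filters.size() != 1) {
      throw new IllegalArgumentException(
          "Expected exactly one filter on the ID, found: %d".formatted(filters.size()));
    }
    RexNode filter = filters.get(0);
    if (!(filter instanceof RexCall call) || call.getOperands().size() != 2) {
      throw new IllegalArgumentException("Unexpected filter: %s".formatted(filter));
    }
    RexNode leftCondition = call.getOperands().get(0);
    RexNode rightCondition = call.getOperands().get(1);
    if (leftCondition instanceof RexInputRef left
        && rightCondition instanceof RexLiteral right
        && left.getIndex() == ID_COLUMN) {
      if (filter.isA(SqlKind.EQUALS)) {
        String bookId = right.getValue2().toString();
        return ImmutableSet.of(bookId);
      }
      if (filter.isA(SqlKind.SEARCH)) {
        // e.g. an IN list, which Calcite may simplify into a Sarg of point ranges.
        @SuppressWarnings("unchecked")
        Sarg<NlsString> searchArguments = right.getValueAs(Sarg.class);
        return searchArguments.rangeSet.asRanges().stream()
            .map(Range::lowerEndpoint)
            .map(NlsString::getValue)
            .collect(toSet());
      }
    }
    throw new IllegalArgumentException(
        "Unexpected operator, found: %s".formatted(filter.getKind()));
  }
}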
```
`BookService` always expects a set of IDs. Consider a query like the following (assuming `books` contains an entry with ID `'b'`, the only `config` row with `val > 4`):
```sql
WITH config (id, val) AS (
VALUES ('a', 3), ('b', 5)
)
SELECT b.* FROM books b
INNER JOIN config ON b.id = config.id
WHERE config.val > 4
```
Calcite produces the following plan:
```
EnumerableCalc(expr#0..2=[{inputs}], id=[$t1], title=[$t2])
  EnumerableMergeJoin(condition=[=($0, $1)], joinType=[inner])
    EnumerableSort(sort0=[$0], dir0=[ASC])
      EnumerableCalc(expr#0..1=[{inputs}], id=[$t0])
        EnumerableTableScan(table=[[books]])
    EnumerableSort(sort0=[$0], dir0=[ASC])
      EnumerableTableScan(table=[[books]])
```
This means Calcite performs a full scan of the `books` table. Since the list of `RexNode` filters passed to `scan` is empty, no book IDs can be extracted, so no result can be returned. (This example uses a `VALUES` list as a query-scoped view, but ideally the solution should work for other tables as well.)
Under certain circumstances, Postgres generates a nested-loop plan for a join: it first selects the rows of table A that match a given condition, then iterates over those rows and, for each one, scans table B for rows that match the join condition. This is the behavior we would like to enforce here.
In summary, we would like to implement a table that, when joined, loads rows by the individual IDs our service API accepts, rather than performing a full table scan.
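As a rough illustration of the nested-loop behavior described above, here is a plain-Java sketch that does not use Calcite at all: for each row of the driving input (the `config` rows left after `val > 4`), it calls `findBooksByIds` with just that row's ID, so the service is never asked for a full scan. `ConfigRow`, `Joined`, and `LookupJoin.nestedLoopLookupJoin` are hypothetical names; in Calcite this loop would have to come from a planner rule and a physical join operator rather than a hand-written helper.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

record Book(String id, String title) {}

interface BookService {
  List<Book> findBooksByIds(Set<String> ids);
}

/** Hypothetical row of the driving (left) input, e.g. the `config` CTE. */
record ConfigRow(String id, int val) {}

/** Hypothetical pairing of a left row with the book it joined to. */
record Joined(ConfigRow config, Book book) {}

final class LookupJoin {
  /**
   * Nested-loop lookup join: one service call per left row, with the join key
   * passed as the ID filter, so the book table is never scanned in full.
   */
  static List<Joined> nestedLoopLookupJoin(List<ConfigRow> left, BookService service) {
    List<Joined> out = new ArrayList<>();
    for (ConfigRow row : left) {
      // Equivalent to pushing the filter `id = row.id()` into the right-hand scan.
      for (Book book : service.findBooksByIds(Set.of(row.id()))) {
        out.add(new Joined(row, book));
      }
    }
    return out;
  }
}
```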
Do you have any advice or feedback for us?
Thanks in advance.
Re: Translate join predicates into filters
Posted by Dan Zou <zo...@163.com>.
Hi Luca,
If I understand you correctly, what you are looking for is a temporal table join. Flink has implemented this on top of Calcite, so it may be a good reference. You can find more details in [1], [2], [3].
- [1] https://issues.apache.org/jira/browse/CALCITE-1912
- [2] https://lists.apache.org/thread/s8rx569p6tqbh8ybomodo5w3h2rbfvkr
- [3] https://issues.apache.org/jira/browse/FLINK-12269
Best,
Dan Zou
Re: Translate join predicates into filters
Posted by Julian Hyde <jh...@gmail.com>.
I expect that your table works if you put the filter in the WHERE clause, e.g.
SELECT *
FROM Books AS b
WHERE b.id IN (1, 10, 27)
and it does so using FilterTableScanRule (which matches a Filter on top of a Scan of a FilterableTable). But you need a new planner rule that can convert a Join whose right input is a Scan of a FilterableTable into a NestedLoopsJoin that dynamically sets the filter for each row from the left. (In this query, config would be on the left, Books on the right.)
A more efficient version could gather all IDs from the left, make a single request to the right, and then do something like a hash join.
Julian
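The batched variant suggested above can be sketched in the same Calcite-free way, with hypothetical names (`BatchedLookupJoin`, `ConfigRow`, `Joined`): gather the distinct join keys from the left input, make a single `findBooksByIds` call, index the returned books in a hash map, and probe it with each left row.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;

record Book(String id, String title) {}

interface BookService {
  List<Book> findBooksByIds(Set<String> ids);
}

/** Hypothetical row of the left input, e.g. the `config` CTE. */
record ConfigRow(String id, int val) {}

/** Hypothetical pairing of a left row with its matching book. */
record Joined(ConfigRow config, Book book) {}

final class BatchedLookupJoin {
  static List<Joined> batchedLookupHashJoin(List<ConfigRow> left, BookService service) {
    // 1. Gather the distinct join keys from the left input.
    Set<String> ids = new LinkedHashSet<>();
    for (ConfigRow row : left) {
      ids.add(row.id());
    }
    if (ids.isEmpty()) {
      return List.of(); // nothing to look up; the service would return nothing anyway
    }
    // 2. One request to the right-hand side for all keys.
    //    Assumes the service returns at most one book per ID (toMap rejects duplicates).
    Map<String, Book> booksById = service.findBooksByIds(ids).stream()
        .collect(Collectors.toMap(Book::id, Function.identity()));
    // 3. Probe the hash table with each left row (inner-join semantics).
    List<Joined> out = new ArrayList<>();
    for (ConfigRow row : left) {
      Book book = booksById.get(row.id());
      if (book != null) {
        out.add(new Joined(row, book));
      }
    }
    return out;
  }
}
```

Compared with a per-row loop, this makes one round trip instead of one per left row, at the cost of holding the fetched books in memory; for a large left input the keys would probably need to be sent in bounded chunks.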