Posted to dev@calcite.apache.org by Justin Huang <yo...@nvidia.com.INVALID> on 2021/10/01 01:41:23 UTC

RE: Need help: exception when using Elasticsearch adapter

Hi Alessandro,

I am new to Calcite and only started using it recently, and I want to understand how the logical-to-physical conversion works.

I wrote a SQL statement, but the Schema/Table classes I specified in the model JSON/YAML are not ElasticsearchSchema/ElasticsearchTable; you can assume they are MyOwnSchema/MyOwnTable. I first got the LogicalProject/Filter/TableScan node tree by parsing the SQL, then converted those nodes to EnumerableProject/Filter/TableScan nodes.

It's unclear to me whether I can convert the Enumerable RelNode I mentioned above to an Elasticsearch RelNode, or whether I have to use ElasticsearchSchema/ElasticsearchTable in the model JSON/YAML to parse the SQL statement. I assumed Calcite could convert any supported SQL to Elasticsearch DSL regardless of the Schema/Table the SQL refers to.

Thanks,
Justin

From: Alessandro Solimando alessandro.solima...@gmail.com
Subject: Re: Need help: exception when using Elasticsearch adapter
Date: Thu, 30 Sep 2021 06:14:05 GMT

Each adapter "speaks" the language of the supported data source, but only
to the extent of querying it, and for the tables stored in it.

If your input tables are not from ES, I have a hard time understanding
what you are trying to achieve this way.

Consider also that a plan generally consists of operations that are not
always supported by every data source. That is why, when querying data, the
adapter "knows" whether a filter/join/etc. can be pushed down (expressed in
the data source's query language); if not, it is executed in the Enumerable
convention.
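
To make the push-down idea concrete, here is a toy sketch in plain Python. It is not Calcite code: the `Op` class, the `assign_conventions` function, the "window" operator, and the set of operators the source supports are all made up for illustration, and it ignores the cost-based choice between alternative plans. It only shows that operators are handed to a data source while the source owns the table and supports them, and that the rest falls back to Enumerable.

```python
# Toy model only: none of these names are Calcite APIs.
from dataclasses import dataclass


@dataclass(frozen=True)
class Op:
    kind: str    # e.g. "scan", "filter", "project", "window"
    source: str  # data source that owns the underlying table


def assign_conventions(plan, supported):
    """Label each operator of a linear plan, listed scan-first.

    An operator is pushed down only if every operator below it was pushed
    down and the source supports its kind; everything from the first
    unsupported operator upwards is labelled "ENUMERABLE".
    """
    labels = []
    pushing = True
    for op in plan:
        pushing = pushing and op.kind in supported.get(op.source, set())
        labels.append((op.kind, op.source.upper() if pushing else "ENUMERABLE"))
    return labels


# Hypothetical capabilities of a source called "es".
supported = {"es": {"scan", "filter", "project"}}

es_plan = [Op("scan", "es"), Op("filter", "es"), Op("window", "es"), Op("project", "es")]
labels = assign_conventions(es_plan, supported)

# A table that lives in some other system ("dask" here) gets nothing pushed to ES.
foreign_plan = [Op("scan", "dask"), Op("project", "dask")]
foreign_labels = assign_conventions(foreign_plan, supported)
```

In the second plan nothing is labelled ES, because no operator sits on top of a table that ES owns.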

Can you elaborate a bit more on what you are trying to achieve?


From: Justin Huang
Sent: Thursday, September 30, 2021 8:02 AM
To: 'dev@calcite.apache.org' <de...@calcite.apache.org>
Subject: RE: Need help: exception when using Elasticsearch adapter

I can convert the Logical RelNode to Enumerable RelNode, and then I added another step to convert Enumerable RelNode to Elasticsearch RelNode:

def toEsPhysicalPlan(root):
    print(f"toEsPhysicalPlan root={root}")
    # Reuse the planner attached to the plan's cluster, replacing its rules
    # with the Elasticsearch adapter's rule set.
    planner = root.getCluster().getPlanner()
    planner.clear()
    for rule in ElasticsearchRules.RULES:
        planner.addRule(rule)

    program = Programs.of(RuleSets.ofList(planner.getRules()))
    # Request the same traits as the input, but in the Elasticsearch convention.
    traits = root.getTraitSet().replace(ElasticsearchRel.CONVENTION)
    return program.run(planner, root, traits, ImmutableList.of(), ImmutableList.of())

Now program.run fails with the exception below. What could be the reason? Is there an example that shows how to convert SQL to Elasticsearch JSON? BTW, my input schema/table is not of type ElasticsearchSchema/ElasticsearchTable.

org.apache.calcite.plan.RelOptPlanner.CannotPlanException: org.apache.calcite.plan.RelOptPlanner$CannotPlanException: There are not enough rules to produce a node with desired properties: convention=ELASTICSEARCH, sort=[].
Missing conversion is EnumerableProject[convention: ENUMERABLE -> ELASTICSEARCH]
There is 1 empty subset: rel#67:RelSubset#3.ELASTICSEARCH.[], the relevant part of the original plan is as follows
65:EnumerableProject(BugId=[$0], ARB=[$1])
  60:EnumerableTableScan(subset=[rel#64:RelSubset#2.ENUMERABLE.[]], table=[[root, table_bugx]])

Root: rel#67:RelSubset#3.ELASTICSEARCH.[]
Original rel:
EnumerableProject(subset=[rel#58:RelSubset#1.ENUMERABLE.[]], BugId=[$0], ARB=[$1]): rowcount = 100.0, cumulative cost = {100.0 rows, 200.0 cpu, 0.0 io}, id = 62
  EnumerableTableScan(subset=[rel#61:RelSubset#0.ENUMERABLE.[]], table=[[root, table_bugx]]): rowcount = 100.0, cumulative cost = {100.0 rows, 101.0 cpu, 0.0 io}, id = 60

Sets:
Set#2, type: RecordType(BIGINT BugId, VARCHAR ARB)
        rel#64:RelSubset#2.ENUMERABLE.[], best=rel#60
                rel#60:EnumerableTableScan.ENUMERABLE.[](table=[root, table_bugx]), rowcount=100.0, cumulative cost={100.0 rows, 101.0 cpu, 0.0 io}
Set#3, type: RecordType(BIGINT BugId, VARCHAR ARB)
        rel#66:RelSubset#3.ENUMERABLE.[], best=rel#65
                rel#65:EnumerableProject.ENUMERABLE.[](input=RelSubset#64,inputs=0..1), rowcount=100.0, cumulative cost={200.0 rows, 301.0 cpu, 0.0 io}
        rel#67:RelSubset#3.ELASTICSEARCH.[], best=null
                rel#68:AbstractConverter.ELASTICSEARCH.[](input=RelSubset#66,convention=ELASTICSEARCH,sort=[]), rowcount=100.0, cumulative cost={inf}

Graphviz:
digraph G {
        root [style=filled,label="Root"];
        subgraph cluster2{
                label="Set 2 RecordType(BIGINT BugId, VARCHAR ARB)";
                rel60 [label="rel#60:EnumerableTableScan\ntable=[root, table_bugx]\nrows=100.0, cost={100.0 rows, 101.0 cpu, 0.0 io}",color=blue,shape=box]
                subset64 [label="rel#64:RelSubset#2.ENUMERABLE.[]"]
        }
        subgraph cluster3{
                label="Set 3 RecordType(BIGINT BugId, VARCHAR ARB)";
                rel65 [label="rel#65:EnumerableProject\ninput=RelSubset#64,inputs=0..1\nrows=100.0, cost={200.0 rows, 301.0 cpu, 0.0 io}",color=blue,shape=box]
                rel68 [label="rel#68:AbstractConverter\ninput=RelSubset#66,convention=ELASTICSEARCH,sort=[]\nrows=100.0, cost={inf}",shape=box]
                subset66 [label="rel#66:RelSubset#3.ENUMERABLE.[]"]
                subset67 [label="rel#67:RelSubset#3.ELASTICSEARCH.[]",color=red]
        }
        root -> subset67;
        subset64 -> rel60[color=blue];
        subset66 -> rel65[color=blue]; rel65 -> subset64[color=blue];
        subset67 -> rel68; rel68 -> subset66;
}
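
One way to read the "Missing conversion is EnumerableProject[convention: ENUMERABLE -> ELASTICSEARCH]" line is as a reachability problem between conventions. The sketch below is a toy model in plain Python, not Calcite's planner: conventions are strings, each rule is reduced to a (from, to) pair, and the two rule directions listed are an assumption about what the Elasticsearch adapter provides, not something confirmed in this thread. Under that assumption there is simply no path from ENUMERABLE to ELASTICSEARCH.

```python
# Toy model only: this is not how Calcite stores or applies rules.
from collections import deque


def can_convert(rules, start, goal):
    """Breadth-first search over (from_convention, to_convention) edges."""
    seen, queue = {start}, deque([start])
    while queue:
        current = queue.popleft()
        if current == goal:
            return True
        for src, dst in rules:
            if src == current and dst not in seen:
                seen.add(dst)
                queue.append(dst)
    return False


# Assumed rule directions, for illustration only.
registered = [
    ("NONE", "ELASTICSEARCH"),        # logical operator -> Elasticsearch operator
    ("ELASTICSEARCH", "ENUMERABLE"),  # Elasticsearch result -> in-memory rows
]

enumerable_to_es = can_convert(registered, "ENUMERABLE", "ELASTICSEARCH")
logical_to_es = can_convert(registered, "NONE", "ELASTICSEARCH")
```

In this model the logical plan can reach ELASTICSEARCH but a plan already in ENUMERABLE cannot, which matches the shape of the exception above.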

Thanks,
Justin

From: Justin Huang
Sent: Monday, September 27, 2021 1:56 PM
To: dev@calcite.apache.org<ma...@calcite.apache.org>
Subject: Need help: exception when using Elasticsearch adapter

Hi Calcite developers,

I tried to convert a simple 'SELECT * FROM xxx' statement to Elasticsearch DSL using the Elasticsearch adapter.

The toPhysicalPlan(relNode) call failed when running program.run. Can anyone shed some light on this? Are there any requirements on the Table/Schema? The table I used is a DaskTable <https://github.com/dask-contrib/dask-sql/blob/main/planner/src/main/java/com/dask/sql/schema/DaskTable.java> (from the dask-sql project), which implements the ProjectableFilterableTable interface.

Thanks,
Justin

I am using Calcite from Python via the JPype module, and the function sql2dsl converts a RelNode to Elasticsearch DSL:

def toPhysicalPlan(root):
    rules = (
                EnumerableRules.ENUMERABLE_PROJECT_RULE,
                EnumerableRules.ENUMERABLE_FILTER_RULE,
                EnumerableRules.ENUMERABLE_AGGREGATE_RULE,
                EnumerableRules.ENUMERABLE_SORT_RULE,
                EnumerableRules.ENUMERABLE_TABLE_SCAN_RULE
            )

    planner = root.getCluster().getPlanner()
    planner.clear()
    for rule in rules:
        planner.addRule(rule)

    program = Programs.of(RuleSets.ofList(planner.getRules()))
    traits = root.getTraitSet().replace(EnumerableConvention.INSTANCE)
    return program.run(planner, root, traits, ImmutableList.of(), ImmutableList.of())

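def toElasticsearchQuery(root):
    # Walk the plan with the Elasticsearch implementor and collect the
    # query fragments it accumulates in its `list` field.
    elasticsearchImplementor = ElasticsearchRel.Implementor()
    elasticsearchImplementor.visitChild(0, root)
    fragments = [str(x) for x in elasticsearchImplementor.list]
    for x in fragments:
        print(f"x: {x}")
    # Return the fragments so that sql2dsl has something to print.
    return fragments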

def sql2dsl(relNode):
    esPhysicalPlan = toPhysicalPlan(relNode)
    esJson = toElasticsearchQuery(esPhysicalPlan)
    print(f"esJson={str(esJson)}")

Here are the error messages:

org.apache.calcite.plan.RelOptPlanner.CannotPlanException: org.apache.calcite.plan.RelOptPlanner$CannotPlanException: There are not enough rules to produce a node with desired properties: convention=ENUMERABLE, sort=[].
Missing conversion is LogicalTableScan[convention: NONE -> ENUMERABLE]
There is 1 empty subset: rel#51:RelSubset#0.ENUMERABLE.[], the relevant part of the original plan is as follows
47:LogicalTableScan(table=[[xxx]])

Root: rel#51:RelSubset#0.ENUMERABLE.[]
Original rel:
LogicalTableScan(subset=[rel#51:RelSubset#0.ENUMERABLE.[]], table=[[xxx]]): rowcount = 100.0, cumulative cost = {100.0 rows, 101.0 cpu, 0.0 io}, id = 47

Sets:
Set#0, type: RecordType(VARCHAR name, BIGINT age, VARCHAR position, BIGINT BugId)
        rel#50:RelSubset#0.NONE.[], best=null
                rel#47:LogicalTableScan.NONE.[](table=[xxx]), rowcount=100.0, cumulative cost={inf}
        rel#51:RelSubset#0.ENUMERABLE.[], best=null
                rel#52:AbstractConverter.ENUMERABLE.[](input=RelSubset#50,convention=ENUMERABLE,sort=[]), rowcount=100.0, cumulative cost={inf}

Graphviz:
digraph G {
        root [style=filled,label="Root"];
        subgraph cluster0{
                label="Set 0 RecordType(VARCHAR name, BIGINT age, VARCHAR position, BIGINT BugId)";
                rel47 [label="rel#47:LogicalTableScan\ntable=[xxx]\nrows=100.0, cost={inf}",shape=box]
                rel52 [label="rel#52:AbstractConverter\ninput=RelSubset#50,convention=ENUMERABLE,sort=[]\nrows=100.0, cost={inf}",shape=box]
                subset50 [label="rel#50:RelSubset#0.NONE.[]"]
                subset51 [label="rel#51:RelSubset#0.ENUMERABLE.[]",color=red]
        }
        root -> subset51;
        subset50 -> rel47;
        subset51 -> rel52; rel52 -> subset50;
}

Re: Need help: exception when using Elasticsearch adapter

Posted by Alessandro Solimando <al...@gmail.com>.
Hi Justin,
I think that "Calcite can convert any supported SQL to Elasticsearch DSL"
is an overstatement.

One of Calcite's functionalities is to provide SQL access to heterogeneous
data sources.

In such a setting, subparts of the user query can be delegated to different
data sources (accessed via JDBC or via an ad-hoc adapter for those not
supporting it).

When I say "delegated", I mean that part of the query is translated to the
language of the underlying data store (e.g., CQL for Cassandra). The
support for a given data store can be pretty minimal (only table scans, for
instance for the CSV adapter), but it can also support filters or other
relational operators (unions, joins, etc.).

Even if the data source supports all the relational operators in a given
subtree, the choice to use it or not is cost-based, among alternative plans.

However, the basic requirement is that at least one of the base tables you
are trying to query is stored in that particular data source.

If you say that none of the base tables are stored in ES, how can it be
used to process data that is somewhere else?

And again, even if an SQL to ES translation existed, how could ES's DSL
answer queries over data that ES is not storing?
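
As a rough illustration of the requirement that a base table actually live in ES, a model that declares an ES-backed schema could look like the sketch below, written as a Python dict and serialised to JSON. The field names follow my reading of the Elasticsearch adapter documentation, so treat them as assumptions and check them against your Calcite version; the schema name "es" and the host and port are placeholders.

```python
import json

# Field names are assumptions based on the Elasticsearch adapter docs;
# the schema name and the coordinates are placeholders.
model = {
    "version": "1.0",
    "defaultSchema": "es",
    "schemas": [
        {
            "type": "custom",
            "name": "es",
            "factory": "org.apache.calcite.adapter.elasticsearch.ElasticsearchSchemaFactory",
            "operand": {
                "coordinates": "{'127.0.0.1': 9200}",
            },
        }
    ],
}

# The resulting string is what would be saved as the model file.
model_json = json.dumps(model, indent=2)
```

With a schema like this, the tables a query refers to are the ones stored in ES, which is the case the adapter is meant for.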

I think I am missing something here, maybe you can elaborate a bit more on
your use case?

Best regards,
Alessandro
