Posted to dev@flink.apache.org by Simon Su <ba...@163.com> on 2019/08/12 07:37:46 UTC

Flink cannot recognize catalog set by registerCatalog.

Hi All,
    I want to use a custom catalog named “ca1” and create a database under it. When I submit the
SQL, it raises the following error:


    Exception in thread "main" org.apache.flink.table.api.ValidationException: SQL validation failed. From line 1, column 98 to line 1, column 116: Object 'orderstream' not found within 'ca1.db1'
at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:125)
at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:82)
at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:154)
at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:89)
at org.apache.flink.table.planner.delegation.PlannerBase.parse(PlannerBase.scala:130)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:335)
at sqlrunner.RowTimeTest.memoryCatalog(RowTimeTest.java:126)
at sqlrunner.RowTimeTest.main(RowTimeTest.java:137)
Caused by: org.apache.calcite.runtime.CalciteContextException: From line 1, column 98 to line 1, column 116: Object 'orderstream' not found within 'ca1.db1'
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:463)
at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:824)
at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:809)
at org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:4805)
at org.apache.calcite.sql.validate.IdentifierNamespace.resolveImpl(IdentifierNamespace.java:166)
at org.apache.calcite.sql.validate.IdentifierNamespace.validateImpl(IdentifierNamespace.java:177)
at org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
at org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:995)
at org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:955)
at org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3109)
at org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3091)
at org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:3363)
at org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:60)
at org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
at org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:995)
at org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:955)
at org.apache.calcite.sql.SqlSelect.validate(SqlSelect.java:216)
at org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:930)
at org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:637)
at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:122)
... 7 more
Caused by: org.apache.calcite.sql.validate.SqlValidatorException: Object 'orderstream' not found within 'ca1.db1'
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:463)
at org.apache.calcite.runtime.Resources$ExInst.ex(Resources.java:572)
... 26 more
   
It seems that Calcite cannot find the source object as expected. After debugging the code, I found that tableEnv.registerTableSource and registerTableSink always use a built-in catalog with a hard-coded catalog name (default_catalog) and database name (default_database), and that tableEnv.registerCatalog cannot change this behavior. Is this behavior reasonable? If I don't want to use the default built-in catalog and database, is there any other way to do this?


GenericInMemoryCatalog catalog = new GenericInMemoryCatalog("ca1");
tableEnv.registerCatalog(catalog.getName(), catalog); // Does not change the built-in catalog!
tableEnv.useCatalog(catalog.getName());
catalog.createDatabase("db1", new CatalogDatabaseImpl(new HashMap<>(), "comment"), true);
tableEnv.useDatabase("db1");

tableEnv.connect(sourceKafka)
    .withFormat(csv)
    .withSchema(schema2)
    .inAppendMode()
    .registerTableSource("orderstream");

tableEnv.connect(sinkKafka)
    .withFormat(csv)
    .withSchema(schema2)
    .inAppendMode()
    .registerTableSink("sinkstream");

String sql = "insert into ca1.db1.sinkstream " +
    "select tumble_start(ts, INTERVAL '5' SECOND) as t, max(data) from ca1.db1.orderstream " +
    "group by tumble(ts, INTERVAL '5' SECOND), data";

tableEnv.sqlUpdate(sql);
tableEnv.execute("test");


Thanks,
Simon


Re: Flink cannot recognize catalog set by registerCatalog.

Posted by Dawid Wysakowicz <dw...@apache.org>.
Hi Simon,

First of all, for a more thorough discussion you might want to have a look
at this thread:
https://lists.apache.org/thread.html/b450df1a7bf10187301820e529cbc223ce63f233c1af0f0c7415e62b@%3Cdev.flink.apache.org%3E

TL;DR: All objects registered with registerTable/registerTableSource are
temporary objects that do not have a serializable form and therefore can
only be stored in an in-memory catalog. useCatalog/useDatabase are
experimental APIs in the upcoming 1.9 release.

If you want to be sure that tables are stored in a given catalog, you can
either register them directly via tEnv.getCatalog().createTable() or
try using SQL DDL.

Best,

Dawid
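
To make the behavior described above concrete, here is a minimal sketch in plain Java with no Flink dependency. ToyCatalogManager and all of its methods are hypothetical names invented for illustration; this is not Flink's CatalogManager. It only assumes what the thread states: in 1.9, registerTableSource/registerTableSink put tables into the built-in catalog regardless of useCatalog/useDatabase, while creating a table directly in a catalog (or via DDL) targets that catalog.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Toy stand-in for a catalog manager. Hypothetical; NOT Flink's CatalogManager. */
class ToyCatalogManager {
    // catalog name -> database name -> table names
    private final Map<String, Map<String, Set<String>>> catalogs = new HashMap<>();
    private final String builtInCatalog;
    private final String builtInDatabase;
    private String currentCatalog;
    private String currentDatabase;

    ToyCatalogManager(String builtInCatalog, String builtInDatabase) {
        this.builtInCatalog = builtInCatalog;
        this.builtInDatabase = builtInDatabase;
        registerCatalog(builtInCatalog);
        createDatabase(builtInCatalog, builtInDatabase);
        this.currentCatalog = builtInCatalog;
        this.currentDatabase = builtInDatabase;
    }

    /** Adds an empty catalog. The current and built-in catalogs stay unchanged. */
    void registerCatalog(String catalog) {
        catalogs.putIfAbsent(catalog, new HashMap<>());
    }

    void createDatabase(String catalog, String database) {
        databasesOf(catalog).putIfAbsent(database, new HashSet<>());
    }

    /** Switches the current catalog and database (models useCatalog + useDatabase). */
    void use(String catalog, String database) {
        tablesOf(catalog, database); // throws if either one does not exist
        currentCatalog = catalog;
        currentDatabase = database;
    }

    /** Models the 1.9 behavior reported in the thread: the current catalog is ignored. */
    void registerTemporaryTable(String table) {
        tablesOf(builtInCatalog, builtInDatabase).add(table);
    }

    /** Models catalog.createTable(...) or DDL: the table lands where it is told to. */
    void createTable(String catalog, String database, String table) {
        tablesOf(catalog, database).add(table);
    }

    boolean exists(String catalog, String database, String table) {
        Map<String, Set<String>> databases = catalogs.get(catalog);
        Set<String> tables = databases == null ? null : databases.get(database);
        return tables != null && tables.contains(table);
    }

    /** Returns the current "catalog.database" pair. */
    String current() {
        return currentCatalog + "." + currentDatabase;
    }

    private Map<String, Set<String>> databasesOf(String catalog) {
        Map<String, Set<String>> databases = catalogs.get(catalog);
        if (databases == null) {
            throw new IllegalArgumentException("Unknown catalog: " + catalog);
        }
        return databases;
    }

    private Set<String> tablesOf(String catalog, String database) {
        Set<String> tables = databasesOf(catalog).get(database);
        if (tables == null) {
            throw new IllegalArgumentException("Unknown database: " + catalog + "." + database);
        }
        return tables;
    }
}
```

In this model, calling use("ca1", "db1") followed by registerTemporaryTable("orderstream") leaves exists("ca1", "db1", "orderstream") false, which mirrors the "not found within 'ca1.db1'" error. Calling createTable("ca1", "db1", "orderstream") instead makes the lookup succeed.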


Re: Flink cannot recognize catalog set by registerCatalog.

Posted by Simon Su <ba...@163.com>.
OK, Thanks Jark


Thanks,
SImon


On 08/13/2019 14:05,Jark Wu<im...@gmail.com> wrote:
Hi Simon,


This is a temporary workaround for 1.9 release. We will fix the behavior in 1.10, see FLINK-13461.


Regards,
Jark


On Tue, 13 Aug 2019 at 13:57, Simon Su <ba...@163.com> wrote:

Hi Jark


Thanks for your reply. 


It's weird that tableEnv provides an API called “registerCatalog”, but it does not work in some cases (like mine).
Do you think it's feasible to unify this behavior? I think the documentation is necessary, but a unified way to use tableEnv is also important.


Thanks,
Simon


On 08/13/2019 12:27,Jark Wu<im...@gmail.com> wrote:
I think we might need to improve the javadoc of tableEnv.registerTableSource/registerTableSink. 
Currently, the comment says 


"Registers an external TableSink with already configured field names and field types in this TableEnvironment's catalog."


But, what catalog? The current one or default in-memory one? 
I think, it would be better to improve the description and add a NOTE on it. 


Regards,
Jark


On Tue, 13 Aug 2019 at 10:52, Xuefu Z <us...@gmail.com> wrote:

Yes, tableEnv.registerTable(_) etc. always register in the default catalog.
To create a table in your custom catalog, you could use
tableEnv.sqlUpdate("create table ....").

Thanks,
Xuefu
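
A rough sketch of how a partially qualified table name could be expanded against the current catalog and database may help here. ToyIdentifier.qualify is a hypothetical helper in plain Java, not a Flink API, and it ignores quoting and escaping. It illustrates why a table registered in the default catalog is not found under ca1.db1 after switching, and why it would have to be addressed through the default catalog's path instead (assuming the default names default_catalog and default_database).

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical helper: expands a 1-, 2- or 3-part name to [catalog, database, table]. */
final class ToyIdentifier {
    private ToyIdentifier() {}

    static List<String> qualify(String name, String currentCatalog, String currentDatabase) {
        // Simplification: split on dots only; backtick-quoted names are not handled.
        String[] parts = name.split("\\.");
        switch (parts.length) {
            case 1: // table
                return Arrays.asList(currentCatalog, currentDatabase, parts[0]);
            case 2: // database.table
                return Arrays.asList(currentCatalog, parts[0], parts[1]);
            case 3: // catalog.database.table
                return Arrays.asList(parts[0], parts[1], parts[2]);
            default:
                throw new IllegalArgumentException("Expected 1 to 3 name parts: " + name);
        }
    }
}
```

Under these assumptions, qualify("orderstream", "ca1", "db1") expands to the path ca1, db1, orderstream, so a table that was actually stored under default_catalog.default_database is missed unless the full three-part name of its real location is used.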

On Mon, Aug 12, 2019 at 6:17 PM Simon Su <ba...@163.com> wrote:

> Hi Xuefu
>
> Thanks for your reply.
>
> Actually, I have already tried what you advised. I called
> tableEnv.useCatalog and useDatabase, and I also tried using
> “catalogname.databasename.tableName” in SQL. I think the root cause is
> that tableEnv.registerTableSource always uses the built-in
> catalog and database rather than the custom one. So if I want to use a
> custom one, I have to write code like this:
>
> StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env,
>     EnvironmentSettings.newInstance()
>         .useBlinkPlanner()
>         .inStreamingMode()
>         .withBuiltInCatalogName("ca1")
>         .withBuiltInDatabaseName("db1")
>         .build());
>
>
> As Dawid said, if I want to store tables in my custom catalog, I can call
> catalog.createTable or use DDL.
>
> Thanks,
> Simon
>
> On 08/13/2019 02:55,Xuefu Z<us...@gmail.com> <us...@gmail.com> wrote:
>
> Hi Simon,
>
> Thanks for reporting the problem. There are some rough edges around the catalog
> API and table environments, and we will improve them after the 1.9 release.
>
> Nevertheless, tableEnv.registerCatalog() just puts a new catalog in
> Flink's CatalogManager; it doesn't change the default catalog/database as
> you expected. To switch to your newly registered catalog, you could call
> tableEnv.useCatalog() and .useDatabase().
>
> As an alternative, you could fully qualify your table name with a
> "catalog.db.table" syntax without switching current catalog/database.
>
> Please try those and let me know if you find new problems.
>
> Thanks,
> Xuefu
>
>
>
>
> --
> Xuefu Zhang
>
> "In Honey We Trust!"
>
>

--
Xuefu Zhang

"In Honey We Trust!"

Re: Flink cannot recognize catalog set by registerCatalog.

Posted by Jark Wu <im...@gmail.com>.
Hi Simon,

This is a temporary workaround for 1.9 release. We will fix the behavior in
1.10, see FLINK-13461.

Regards,
Jark

On Tue, 13 Aug 2019 at 13:57, Simon Su <ba...@163.com> wrote:

> Hi Jark
>
> Thanks for your reply.
>
> It’s weird that In this case the tableEnv provide the api called
> “registerCatalog”, but it does not work in some cases ( like my cases ).
> Do you think it’s feasible to unify this behaviors ? I think the document
> is necessary, but a unify way to use tableEnv is also important.
>
> Thanks,
> SImon

Re: Flink cannot recognized catalog set by registerCatalog.

Posted by Jark Wu <im...@gmail.com>.
Hi Simon,

This is a temporary workaround for the 1.9 release. We will fix the behavior
in 1.10; see FLINK-13461.

Regards,
Jark

On Tue, 13 Aug 2019 at 13:57, Simon Su <ba...@163.com> wrote:

> Hi Jark
>
> Thanks for your reply.
>
> It’s odd that tableEnv provides an API called “registerCatalog” that
> does not work in some cases (like mine). Do you think it’s feasible to
> unify this behavior? I agree the documentation is necessary, but a
> unified way to use tableEnv is also important.
>
> Thanks,
> Simon

Re: Flink cannot recognized catalog set by registerCatalog.

Posted by Simon Su <ba...@163.com>.
Hi Jark


Thanks for your reply. 


It’s odd that tableEnv provides an API called “registerCatalog” that does not work in some cases (like mine).
Do you think it’s feasible to unify this behavior? I agree the documentation is necessary, but a unified way to use tableEnv is also important.


Thanks,
Simon


On 08/13/2019 12:27, Jark Wu <im...@gmail.com> wrote:
I think we might need to improve the Javadoc of tableEnv.registerTableSource/registerTableSink.
Currently, the comment says:


"Registers an external TableSink with already configured field names and field types in this TableEnvironment's catalog."


But which catalog? The current one or the default in-memory one?
I think it would be better to improve the description and add a NOTE to it.


Regards,
Jark


On Tue, 13 Aug 2019 at 10:52, Xuefu Z <us...@gmail.com> wrote:

Yes, tableEnv.registerTable(_) etc. always register in the default catalog.
To create a table in your custom catalog, you could use
tableEnv.sqlUpdate("create table ....").

Thanks,
Xuefu
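
The behavior described above can be illustrated with a small toy model. The Java sketch below is not Flink code: CatalogLookupSketch, ToyCatalogManager and all of their methods are hypothetical names made up for this illustration. It only assumes what is reported in this thread, namely that registerTableSource writes into the built-in catalog, while DDL or catalog.createTable writes into an explicitly named catalog, and that useCatalog/useDatabase only affect how names are resolved.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/** Toy model only; these classes are hypothetical and are not part of Flink. */
final class CatalogLookupSketch {

    static String path(String catalog, String database, String table) {
        return catalog + "." + database + "." + table;
    }

    static final class ToyCatalogManager {
        // Keys are fully qualified "catalog.database.table" paths.
        private final Map<String, String> tables = new HashMap<>();
        private final String builtInCatalog;
        private final String builtInDatabase;
        private String currentCatalog;
        private String currentDatabase;

        ToyCatalogManager(String builtInCatalog, String builtInDatabase) {
            this.builtInCatalog = builtInCatalog;
            this.builtInDatabase = builtInDatabase;
            this.currentCatalog = builtInCatalog;
            this.currentDatabase = builtInDatabase;
        }

        /** Models useCatalog/useDatabase: changes name resolution only. */
        void use(String catalog, String database) {
            this.currentCatalog = catalog;
            this.currentDatabase = database;
        }

        /** Models the reported 1.9 behavior: always stores in the built-in catalog. */
        void registerTableSource(String name, String description) {
            tables.put(path(builtInCatalog, builtInDatabase, name), description);
        }

        /** Models DDL or catalog.createTable: stores in the named catalog and database. */
        void createTable(String catalog, String database, String name, String description) {
            tables.put(path(catalog, database, name), description);
        }

        /** Resolves one-, two- or three-part names against the current catalog/database. */
        Optional<String> lookup(String identifier) {
            String[] parts = identifier.split("\\.");
            String fullPath;
            if (parts.length == 3) {
                fullPath = identifier;
            } else if (parts.length == 2) {
                fullPath = currentCatalog + "." + identifier;
            } else {
                fullPath = path(currentCatalog, currentDatabase, identifier);
            }
            return Optional.ofNullable(tables.get(fullPath));
        }
    }

    public static void main(String[] args) {
        ToyCatalogManager manager = new ToyCatalogManager("default_catalog", "default_database");
        manager.use("ca1", "db1");
        manager.registerTableSource("orderstream", "kafka source");

        // The table landed in the built-in catalog, so the lookup in ca1.db1 is empty.
        System.out.println(manager.lookup("ca1.db1.orderstream").isPresent());
        System.out.println(manager.lookup("default_catalog.default_database.orderstream").isPresent());

        // Creating the table explicitly in ca1.db1 makes both lookups succeed.
        manager.createTable("ca1", "db1", "orderstream", "kafka source");
        System.out.println(manager.lookup("orderstream").isPresent());
    }
}
```

In this model the failing query from the original report corresponds to the first lookup: the name is resolved in ca1.db1, but the table was stored under the built-in catalog.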

On Mon, Aug 12, 2019 at 6:17 PM Simon Su <ba...@163.com> wrote:

> Hi Xuefu
>
> Thanks for your reply.
>
> Actually, I have tried what you advised. I called tableEnv.useCatalog
> and useDatabase, and I also tried using
> “catalogname.databasename.tableName” in SQL. I think the root cause is
> that tableEnv.registerTableSource always uses the “built-in” catalog
> and database rather than the custom one. So if I want to use a custom
> one, I have to write code like this:
>
> StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env,
>     EnvironmentSettings.newInstance()
>         .useBlinkPlanner()
>         .inStreamingMode()
>         .withBuiltInCatalogName("ca1")
>         .withBuiltInDatabaseName("db1")
>         .build());
>
>
> As Dawid said, if I want to store tables in my custom catalog, I can call
> catalog.createTable or use DDL.
>
> Thanks,
> Simon
>
> On 08/13/2019 02:55, Xuefu Z <us...@gmail.com> wrote:
>
> Hi Simon,
>
> Thanks for reporting the problem. There are some rough edges around the
> catalog API and table environments, and we are improving them after the
> 1.9 release.
>
> Nevertheless, tableEnv.registerCatalog() only puts a new catalog in
> Flink's CatalogManager. It doesn't change the default catalog/database as
> you expected. To switch to your newly registered catalog, you could call
> tableEnv.useCatalog() and .useDatabase().
>
> As an alternative, you could fully qualify your table name with a
> "catalog.db.table" syntax without switching the current catalog/database.
>
> Please try those and let me know if you find new problems.
>
> Thanks,
> Xuefu
>
>
>
> On Mon, Aug 12, 2019 at 12:38 AM Simon Su <ba...@163.com> wrote:
>
>> Hi All
>>     I want to use a custom catalog named “ca1” and create a
>> database under this catalog. When I submit the SQL, it raises an
>> error like this:
>>
>>
>>     Exception in thread "main" org.apache.flink.table.api.ValidationException: SQL validation failed. From line 1, column 98 to line 1, column 116: Object 'orderstream' not found within 'ca1.db1'
>> at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:125)
>> at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:82)
>> at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:154)
>> at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:89)
>> at org.apache.flink.table.planner.delegation.PlannerBase.parse(PlannerBase.scala:130)
>> at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:335)
>> at sqlrunner.RowTimeTest.memoryCatalog(RowTimeTest.java:126)
>> at sqlrunner.RowTimeTest.main(RowTimeTest.java:137)
>> Caused by: org.apache.calcite.runtime.CalciteContextException: From line 1, column 98 to line 1, column 116: Object 'orderstream' not found within 'ca1.db1'
>> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
>> at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
>> at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>> at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
>> at org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:463)
>> at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:824)
>> at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:809)
>> at org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:4805)
>> at org.apache.calcite.sql.validate.IdentifierNamespace.resolveImpl(IdentifierNamespace.java:166)
>> at org.apache.calcite.sql.validate.IdentifierNamespace.validateImpl(IdentifierNamespace.java:177)
>> at org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
>> at org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:995)
>> at org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:955)
>> at org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3109)
>> at org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3091)
>> at org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:3363)
>> at org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:60)
>> at org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
>> at org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:995)
>> at org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:955)
>> at org.apache.calcite.sql.SqlSelect.validate(SqlSelect.java:216)
>> at org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:930)
>> at org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:637)
>> at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:122)
>> ... 7 more
>> Caused by: org.apache.calcite.sql.validate.SqlValidatorException: Object 'orderstream' not found within 'ca1.db1'
>> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
>> at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
>> at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>> at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
>> at org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:463)
>> at org.apache.calcite.runtime.Resources$ExInst.ex(Resources.java:572)
>> ... 26 more
>>
>> It seems that Calcite cannot find the source object. After debugging
>> the code, I found that tableEnv.registerTableSource and
>> registerTableSink use a built-in catalog with a hard-coded catalog
>> name (default_catalog) and database name (default_database), and
>> tableEnv.registerCatalog cannot change this behavior. Is this
>> reasonable? If I don’t want to use the default built-in catalog and
>> database, is there any other way to do this?
>>
>>
>> GenericInMemoryCatalog catalog = new GenericInMemoryCatalog("ca1");
>> // Registering the catalog does not change the built-in catalog!
>> tableEnv.registerCatalog(catalog.getName(), catalog);
>> tableEnv.useCatalog(catalog.getName());
>> catalog.createDatabase("db1", new CatalogDatabaseImpl(new HashMap<>(), "comment"), true);
>> tableEnv.useDatabase("db1");
>>
>> // Source table, registered through the connector descriptor API.
>> tableEnv.connect(sourceKafka)
>>     .withFormat(csv)
>>     .withSchema(schema2)
>>     .inAppendMode()
>>     .registerTableSource("orderstream");
>>
>> // Sink table, registered the same way.
>> tableEnv.connect(sinkKafka)
>>     .withFormat(csv)
>>     .withSchema(schema2)
>>     .inAppendMode()
>>     .registerTableSink("sinkstream");
>>
>> // The query refers to both tables by their fully qualified ca1.db1 names.
>> String sql = "insert into ca1.db1.sinkstream " +
>>     "select tumble_start(ts, INTERVAL '5' SECOND) as t, max(data) from ca1.db1.orderstream " +
>>     "group by tumble(ts, INTERVAL '5' SECOND), data";
>>
>> tableEnv.sqlUpdate(sql);
>> tableEnv.execute("test");
>>
>>
>> Thanks,
>> SImon
>>
>>
>
> --
> Xuefu Zhang
>
> "In Honey We Trust!"
>
>

--
Xuefu Zhang

"In Honey We Trust!"

Re: Flink cannot recognized catalog set by registerCatalog.

Posted by Simon Su <ba...@163.com>.
Hi Jark


Thanks for your reply. 


It’s odd that tableEnv provides an API called “registerCatalog” that does not work in some cases (like mine).
Do you think it’s feasible to unify this behavior? I agree the documentation is necessary, but a unified way to use tableEnv is also important.


Thanks,
Simon


On 08/13/2019 12:27, Jark Wu <im...@gmail.com> wrote:
I think we might need to improve the Javadoc of tableEnv.registerTableSource/registerTableSink.
Currently, the comment says:


"Registers an external TableSink with already configured field names and field types in this TableEnvironment's catalog."


But which catalog? The current one or the default in-memory one?
I think it would be better to improve the description and add a NOTE to it.


Regards,
Jark


On Tue, 13 Aug 2019 at 10:52, Xuefu Z <us...@gmail.com> wrote:

Yes, tableEnv.registerTable(_) etc. always register in the default catalog.
To create a table in your custom catalog, you could use
tableEnv.sqlUpdate("create table ....").

Thanks,
Xuefu

On Mon, Aug 12, 2019 at 6:17 PM Simon Su <ba...@163.com> wrote:

> Hi Xuefu
>
> Thanks for your reply.
>
> Actually, I have tried what you advised. I called tableEnv.useCatalog
> and useDatabase, and I also tried using
> “catalogname.databasename.tableName” in SQL. I think the root cause is
> that tableEnv.registerTableSource always uses the “built-in” catalog
> and database rather than the custom one. So if I want to use a custom
> one, I have to write code like this:
>
> StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env,
>     EnvironmentSettings.newInstance()
>         .useBlinkPlanner()
>         .inStreamingMode()
>         .withBuiltInCatalogName("ca1")
>         .withBuiltInDatabaseName("db1")
>         .build());
>
>
> As Dawid said, if I want to store tables in my custom catalog, I can call
> catalog.createTable or use DDL.
>
> Thanks,
> Simon
>
> On 08/13/2019 02:55, Xuefu Z <us...@gmail.com> wrote:
>
> Hi Simon,
>
> Thanks for reporting the problem. There are some rough edges around the
> catalog API and table environments, and we are improving them after the
> 1.9 release.
>
> Nevertheless, tableEnv.registerCatalog() only puts a new catalog in
> Flink's CatalogManager. It doesn't change the default catalog/database as
> you expected. To switch to your newly registered catalog, you could call
> tableEnv.useCatalog() and .useDatabase().
>
> As an alternative, you could fully qualify your table name with a
> "catalog.db.table" syntax without switching the current catalog/database.
>
> Please try those and let me know if you find new problems.
>
> Thanks,
> Xuefu
>
>
>
> On Mon, Aug 12, 2019 at 12:38 AM Simon Su <ba...@163.com> wrote:
>
>> Hi All
>>     I want to use a custom catalog named “ca1” and create a
>> database under this catalog. When I submit the SQL, it raises an
>> error like this:
>>
>>
>>     Exception in thread "main" org.apache.flink.table.api.ValidationException: SQL validation failed. From line 1, column 98 to line 1, column 116: Object 'orderstream' not found within 'ca1.db1'
>> at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:125)
>> at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:82)
>> at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:154)
>> at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:89)
>> at org.apache.flink.table.planner.delegation.PlannerBase.parse(PlannerBase.scala:130)
>> at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:335)
>> at sqlrunner.RowTimeTest.memoryCatalog(RowTimeTest.java:126)
>> at sqlrunner.RowTimeTest.main(RowTimeTest.java:137)
>> Caused by: org.apache.calcite.runtime.CalciteContextException: From line 1, column 98 to line 1, column 116: Object 'orderstream' not found within 'ca1.db1'
>> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
>> at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
>> at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>> at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
>> at org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:463)
>> at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:824)
>> at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:809)
>> at org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:4805)
>> at org.apache.calcite.sql.validate.IdentifierNamespace.resolveImpl(IdentifierNamespace.java:166)
>> at org.apache.calcite.sql.validate.IdentifierNamespace.validateImpl(IdentifierNamespace.java:177)
>> at org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
>> at org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:995)
>> at org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:955)
>> at org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3109)
>> at org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3091)
>> at org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:3363)
>> at org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:60)
>> at org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
>> at org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:995)
>> at org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:955)
>> at org.apache.calcite.sql.SqlSelect.validate(SqlSelect.java:216)
>> at org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:930)
>> at org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:637)
>> at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:122)
>> ... 7 more
>> Caused by: org.apache.calcite.sql.validate.SqlValidatorException: Object 'orderstream' not found within 'ca1.db1'
>> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
>> at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
>> at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>> at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
>> at org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:463)
>> at org.apache.calcite.runtime.Resources$ExInst.ex(Resources.java:572)
>> ... 26 more
>>
>> It seems that Calcite cannot find the source object as expected. After
>> debugging the code, I found that tableEnv.registerTableSource and
>> registerTableSink always use a built-in catalog with a hard-coded catalog
>> name ( default-catalog ) and database name ( default_database ), and
>> tableEnv.registerCatalog does not change this behavior. Is this the
>> intended behavior? If I don't want to use the default built-in catalog and
>> database, is there another way to do this?
>>
>>
>> GenericInMemoryCatalog catalog = new GenericInMemoryCatalog("ca1");
>> // Registering the catalog does not change the built-in catalog!
>> tableEnv.registerCatalog(catalog.getName(), catalog);
>> tableEnv.useCatalog(catalog.getName());
>> catalog.createDatabase("db1",
>>     new CatalogDatabaseImpl(new HashMap<>(), "comment"), true);
>> tableEnv.useDatabase("db1");
>>
>> tableEnv.connect(sourceKafka)
>>     .withFormat(csv)
>>     .withSchema(schema2)
>>     .inAppendMode()
>>     .registerTableSource("orderstream");
>>
>> tableEnv.connect(sinkKafka)
>>     .withFormat(csv)
>>     .withSchema(schema2)
>>     .inAppendMode()
>>     .registerTableSink("sinkstream");
>>
>> String sql = "insert into ca1.db1.sinkstream " +
>>     "select tumble_start(ts, INTERVAL '5' SECOND) as t, max(data) " +
>>     "from ca1.db1.orderstream " +
>>     "group by tumble(ts, INTERVAL '5' SECOND), data";
>>
>> tableEnv.sqlUpdate(sql);
>> tableEnv.execute("test");
>>
>>
>> Thanks,
>> SImon
>>
>>
>
> --
> Xuefu Zhang
>
> "In Honey We Trust!"
>
>

--
Xuefu Zhang

"In Honey We Trust!"

Re: Flink cannot recognized catalog set by registerCatalog.

Posted by Jark Wu <im...@gmail.com>.
I think we might need to improve the javadoc of
tableEnv.registerTableSource/registerTableSink.
Currently, the comment says

"Registers an external TableSink with already configured field names and
field types in this TableEnvironment's catalog."

But which catalog? The current one or the default in-memory one?
I think it would be better to improve the description and add a NOTE to
it.

Regards,
Jark

On Tue, 13 Aug 2019 at 10:52, Xuefu Z <us...@gmail.com> wrote:

> Yes, tableEnv.registerTable(_) etc. always register in the default catalog.
> To create a table in your custom catalog, you could use
> tableEnv.sqlUpdate("create table ....").
>
> Thanks,
> Xuefu
>
> On Mon, Aug 12, 2019 at 6:17 PM Simon Su <ba...@163.com> wrote:
>
> > Hi Xuefu
> >
> > Thanks for your reply.
> >
> > Actually, I have tried what you advised. I have tried calling
> > tableEnv.useCatalog and useDatabase. I have also tried using
> > “catalogname.databasename.tableName” in SQL. I think the root cause is
> > that tableEnv.registerTableSource always uses the “built-in”
> > catalog and database rather than the custom one. So if I want to use a
> > custom one, I have to write code like this:
> >
> > StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env,
> >     EnvironmentSettings.newInstance()
> >         .useBlinkPlanner()
> >         .inStreamingMode()
> >         .withBuiltInCatalogName("ca1")
> >         .withBuiltInDatabaseName("db1")
> >         .build());
> >
> >
> > As Dawid said, if I want to store tables in my custom catalog, I can call
> > catalog.createTable or use DDL.
> >
> > Thanks,
> > SImon
> >
> > On 08/13/2019 02:55,Xuefu Z<us...@gmail.com> <us...@gmail.com> wrote:
> >
> > Hi Simon,
> >
> > Thanks for reporting the problem. There are some rough edges around the
> > catalog API and table environments, and we are improving them after the
> > 1.9 release.
> >
> > Nevertheless, tableEnv.registerCatalog() just puts a new catalog in
> > Flink's CatalogManager. It doesn't change the default catalog/database as
> > you expected. To switch to your newly registered catalog, you could call
> > tableEnv.useCatalog() and .useDatabase().
> >
> > As an alternative, you could fully qualify your table name with a
> > "catalog.db.table" syntax without switching the current catalog/database.
> >
> > Please try those and let me know if you find new problems.
> >
> > Thanks,
> > Xuefu
> >
> >
> >
> > On Mon, Aug 12, 2019 at 12:38 AM Simon Su <ba...@163.com> wrote:
> >
> >> Hi All,
> >>     I want to use a custom catalog named “ca1” and create a
> >> database under this catalog. When I submit the
> >> SQL, it raises an error like this:
> >>
> >>
> >>     Exception in thread "main" org.apache.flink.table.api.ValidationException: SQL validation failed. From line 1, column 98 to line 1, column 116: Object 'orderstream' not found within 'ca1.db1'
> >> at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:125)
> >> at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:82)
> >> at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:154)
> >> at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:89)
> >> at org.apache.flink.table.planner.delegation.PlannerBase.parse(PlannerBase.scala:130)
> >> at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:335)
> >> at sqlrunner.RowTimeTest.memoryCatalog(RowTimeTest.java:126)
> >> at sqlrunner.RowTimeTest.main(RowTimeTest.java:137)
> >> Caused by: org.apache.calcite.runtime.CalciteContextException: From line 1, column 98 to line 1, column 116: Object 'orderstream' not found within 'ca1.db1'
> >> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
> >> at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
> >> at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> >> at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
> >> at org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:463)
> >> at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:824)
> >> at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:809)
> >> at org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:4805)
> >> at org.apache.calcite.sql.validate.IdentifierNamespace.resolveImpl(IdentifierNamespace.java:166)
> >> at org.apache.calcite.sql.validate.IdentifierNamespace.validateImpl(IdentifierNamespace.java:177)
> >> at org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
> >> at org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:995)
> >> at org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:955)
> >> at org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3109)
> >> at org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3091)
> >> at org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:3363)
> >> at org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:60)
> >> at org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
> >> at org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:995)
> >> at org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:955)
> >> at org.apache.calcite.sql.SqlSelect.validate(SqlSelect.java:216)
> >> at org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:930)
> >> at org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:637)
> >> at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:122)
> >> ... 7 more
> >> Caused by: org.apache.calcite.sql.validate.SqlValidatorException: Object 'orderstream' not found within 'ca1.db1'
> >> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
> >> at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
> >> at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> >> at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
> >> at org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:463)
> >> at org.apache.calcite.runtime.Resources$ExInst.ex(Resources.java:572)
> >> ... 26 more
> >>
> >> It seems that Calcite cannot find the source object as expected. After
> >> debugging the code, I found that tableEnv.registerTableSource and
> >> registerTableSink always use a built-in catalog with a hard-coded catalog
> >> name ( default-catalog ) and database name ( default_database ), and
> >> tableEnv.registerCatalog does not change this behavior. Is this the
> >> intended behavior? If I don't want to use the default built-in catalog and
> >> database, is there another way to do this?
> >>
> >>
> >> GenericInMemoryCatalog catalog = new GenericInMemoryCatalog("ca1");
> >> // Registering the catalog does not change the built-in catalog!
> >> tableEnv.registerCatalog(catalog.getName(), catalog);
> >> tableEnv.useCatalog(catalog.getName());
> >> catalog.createDatabase("db1",
> >>     new CatalogDatabaseImpl(new HashMap<>(), "comment"), true);
> >> tableEnv.useDatabase("db1");
> >>
> >> tableEnv.connect(sourceKafka)
> >>     .withFormat(csv)
> >>     .withSchema(schema2)
> >>     .inAppendMode()
> >>     .registerTableSource("orderstream");
> >>
> >> tableEnv.connect(sinkKafka)
> >>     .withFormat(csv)
> >>     .withSchema(schema2)
> >>     .inAppendMode()
> >>     .registerTableSink("sinkstream");
> >>
> >> String sql = "insert into ca1.db1.sinkstream " +
> >>     "select tumble_start(ts, INTERVAL '5' SECOND) as t, max(data) " +
> >>     "from ca1.db1.orderstream " +
> >>     "group by tumble(ts, INTERVAL '5' SECOND), data";
> >>
> >> tableEnv.sqlUpdate(sql);
> >> tableEnv.execute("test");
> >>
> >>
> >> Thanks,
> >> SImon
> >>
> >>
> >
> > --
> > Xuefu Zhang
> >
> > "In Honey We Trust!"
> >
> >
>
> --
> Xuefu Zhang
>
> "In Honey We Trust!"
>

Re: Flink cannot recognized catalog set by registerCatalog.

Posted by Xuefu Z <us...@gmail.com>.
Yes, tableEnv.registerTable(_) etc. always register in the default catalog.
To create a table in your custom catalog, you could use
tableEnv.sqlUpdate("create table ....").
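
To make the behavior discussed here concrete, below is a minimal, self-contained
toy model in plain Java. It is NOT Flink's CatalogManager and uses no Flink
classes; ToyCatalogManager and all of its methods are hypothetical names made up
for illustration. It only mimics what this thread describes: registering or
switching a catalog does not change where registerTableSource/registerTableSink
put tables, while creating a table directly in the custom catalog (as
catalog.createTable or DDL would) makes it resolvable as ca1.db1.orderstream.
The built-in names are the ones quoted in the original report.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Toy model only: not Flink code, just the behavior described in this thread. */
class ToyCatalogManager {
    // catalog name -> database name -> table names
    private final Map<String, Map<String, Set<String>>> catalogs = new HashMap<>();
    private final String builtInCatalog;
    private final String builtInDatabase;
    private String currentCatalog;

    ToyCatalogManager(String builtInCatalog, String builtInDatabase) {
        this.builtInCatalog = builtInCatalog;
        this.builtInDatabase = builtInDatabase;
        this.currentCatalog = builtInCatalog;
        createDatabase(builtInCatalog, builtInDatabase);
    }

    /** Adds a catalog to the manager; it does not become the current catalog. */
    void registerCatalog(String name) {
        catalogs.computeIfAbsent(name, k -> new HashMap<>());
    }

    void createDatabase(String catalog, String database) {
        catalogs.computeIfAbsent(catalog, k -> new HashMap<>())
                .computeIfAbsent(database, k -> new HashSet<>());
    }

    /** Switches the current catalog; in this model it only affects getCurrentCatalog(). */
    void useCatalog(String name) {
        if (!catalogs.containsKey(name)) {
            throw new IllegalArgumentException("Unknown catalog: " + name);
        }
        currentCatalog = name;
    }

    String getCurrentCatalog() {
        return currentCatalog;
    }

    /** Mimics registerTableSource/registerTableSink: always the built-in catalog. */
    void registerTable(String table) {
        catalogs.get(builtInCatalog).get(builtInDatabase).add(table);
    }

    /** Mimics catalog.createTable or DDL against an explicit catalog and database. */
    void createTable(String catalog, String database, String table) {
        createDatabase(catalog, database);
        catalogs.get(catalog).get(database).add(table);
    }

    /** Resolves a fully qualified "catalog.db.table" name. */
    boolean tableExists(String catalog, String database, String table) {
        Map<String, Set<String>> databases = catalogs.get(catalog);
        if (databases == null) {
            return false;
        }
        Set<String> tables = databases.get(database);
        return tables != null && tables.contains(table);
    }

    public static void main(String[] args) {
        ToyCatalogManager env = new ToyCatalogManager("default-catalog", "default_database");
        env.registerCatalog("ca1");
        env.createDatabase("ca1", "db1");
        env.useCatalog("ca1");

        // Registered through the "register" path: lands in the built-in catalog.
        env.registerTable("orderstream");
        System.out.println(env.tableExists("ca1", "db1", "orderstream")); // false
        System.out.println(
                env.tableExists("default-catalog", "default_database", "orderstream")); // true

        // Created directly in the custom catalog: now the qualified name resolves.
        env.createTable("ca1", "db1", "orderstream");
        System.out.println(env.tableExists("ca1", "db1", "orderstream")); // true
    }
}
```

In this model the first lookup fails for the same reason as the "Object
'orderstream' not found within 'ca1.db1'" error in the report: the table exists,
but only under the built-in catalog and database.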

Thanks,
Xuefu

On Mon, Aug 12, 2019 at 6:17 PM Simon Su <ba...@163.com> wrote:

> Hi Xuefu
>
> Thanks for your reply.
>
> Actually, I have tried what you advised. I have tried calling
> tableEnv.useCatalog and useDatabase. I have also tried using
> “catalogname.databasename.tableName” in SQL. I think the root cause is
> that tableEnv.registerTableSource always uses the “built-in”
> catalog and database rather than the custom one. So if I want to use a
> custom one, I have to write code like this:
>
> StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env,
>     EnvironmentSettings.newInstance()
>         .useBlinkPlanner()
>         .inStreamingMode()
>         .withBuiltInCatalogName("ca1")
>         .withBuiltInDatabaseName("db1")
>         .build());
>
>
> As Dawid said, if I want to store tables in my custom catalog, I can call
> catalog.createTable or use DDL.
>
> Thanks,
> SImon
>
> On 08/13/2019 02:55,Xuefu Z<us...@gmail.com> <us...@gmail.com> wrote:
>
> Hi Simon,
>
> Thanks for reporting the problem. There are some rough edges around the
> catalog API and table environments, and we are improving them after the
> 1.9 release.
>
> Nevertheless, tableEnv.registerCatalog() just puts a new catalog in
> Flink's CatalogManager. It doesn't change the default catalog/database as
> you expected. To switch to your newly registered catalog, you could call
> tableEnv.useCatalog() and .useDatabase().
>
> As an alternative, you could fully qualify your table name with a
> "catalog.db.table" syntax without switching the current catalog/database.
>
> Please try those and let me know if you find new problems.
>
> Thanks,
> Xuefu
>
>
>
> On Mon, Aug 12, 2019 at 12:38 AM Simon Su <ba...@163.com> wrote:
>
>> Hi All,
>>     I want to use a custom catalog named “ca1” and create a
>> database under this catalog. When I submit the
>> SQL, it raises an error like this:
>>
>>
>>     Exception in thread "main" org.apache.flink.table.api.ValidationException: SQL validation failed. From line 1, column 98 to line 1, column 116: Object 'orderstream' not found within 'ca1.db1'
>> at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:125)
>> at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:82)
>> at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:154)
>> at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:89)
>> at org.apache.flink.table.planner.delegation.PlannerBase.parse(PlannerBase.scala:130)
>> at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:335)
>> at sqlrunner.RowTimeTest.memoryCatalog(RowTimeTest.java:126)
>> at sqlrunner.RowTimeTest.main(RowTimeTest.java:137)
>> Caused by: org.apache.calcite.runtime.CalciteContextException: From line 1, column 98 to line 1, column 116: Object 'orderstream' not found within 'ca1.db1'
>> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
>> at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
>> at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>> at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
>> at org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:463)
>> at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:824)
>> at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:809)
>> at org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:4805)
>> at org.apache.calcite.sql.validate.IdentifierNamespace.resolveImpl(IdentifierNamespace.java:166)
>> at org.apache.calcite.sql.validate.IdentifierNamespace.validateImpl(IdentifierNamespace.java:177)
>> at org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
>> at org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:995)
>> at org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:955)
>> at org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3109)
>> at org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3091)
>> at org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:3363)
>> at org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:60)
>> at org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
>> at org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:995)
>> at org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:955)
>> at org.apache.calcite.sql.SqlSelect.validate(SqlSelect.java:216)
>> at org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:930)
>> at org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:637)
>> at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:122)
>> ... 7 more
>> Caused by: org.apache.calcite.sql.validate.SqlValidatorException: Object 'orderstream' not found within 'ca1.db1'
>> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
>> at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
>> at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>> at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
>> at org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:463)
>> at org.apache.calcite.runtime.Resources$ExInst.ex(Resources.java:572)
>> ... 26 more
>>
>> It seems that Calcite cannot find the source object as expected. After
>> debugging the code, I found that tableEnv.registerTableSource and
>> registerTableSink always use a built-in catalog with a hard-coded catalog
>> name ( default-catalog ) and database name ( default_database ), and
>> tableEnv.registerCatalog does not change this behavior. Is this the
>> intended behavior? If I don't want to use the default built-in catalog and
>> database, is there another way to do this?
>>
>>
>> GenericInMemoryCatalog catalog = new GenericInMemoryCatalog("ca1");
>> // Registering the catalog does not change the built-in catalog!
>> tableEnv.registerCatalog(catalog.getName(), catalog);
>> tableEnv.useCatalog(catalog.getName());
>> catalog.createDatabase("db1",
>>     new CatalogDatabaseImpl(new HashMap<>(), "comment"), true);
>> tableEnv.useDatabase("db1");
>>
>> tableEnv.connect(sourceKafka)
>>     .withFormat(csv)
>>     .withSchema(schema2)
>>     .inAppendMode()
>>     .registerTableSource("orderstream");
>>
>> tableEnv.connect(sinkKafka)
>>     .withFormat(csv)
>>     .withSchema(schema2)
>>     .inAppendMode()
>>     .registerTableSink("sinkstream");
>>
>> String sql = "insert into ca1.db1.sinkstream " +
>>     "select tumble_start(ts, INTERVAL '5' SECOND) as t, max(data) " +
>>     "from ca1.db1.orderstream " +
>>     "group by tumble(ts, INTERVAL '5' SECOND), data";
>>
>> tableEnv.sqlUpdate(sql);
>> tableEnv.execute("test");
>>
>>
>> Thanks,
>> SImon
>>
>>
>
> --
> Xuefu Zhang
>
> "In Honey We Trust!"
>
>

-- 
Xuefu Zhang

"In Honey We Trust!"

Re: Flink cannot recognized catalog set by registerCatalog.

Posted by Xuefu Z <us...@gmail.com>.
Yes, tableEnv.registerTable(_) etc always registers in the default catalog.
To create table in your custom catalog, you could use
tableEnv.sqlUpdate("create table ....").

Thanks,
Xuefu

On Mon, Aug 12, 2019 at 6:17 PM Simon Su <ba...@163.com> wrote:

> Hi Xuefu
>
> Thanks for you reply.
>
> Actually I have tried it as your advises. I have tried to call
> tableEnv.useCatalog and useDatabase. Also I have tried to use
> “catalogname.databasename.tableName”  in SQL. I think the root cause is
> that when I call tableEnv.registerTableSource, it’s always use a “build-in”
> Catalog and Database rather than the custom one. So if I want to use a
> custom one, I have to write code like this:
>
> StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env,
>     EnvironmentSettings.newInstance()
>         .useBlinkPlanner()
>         .inStreamingMode()
>         .withBuiltInCatalogName("ca1")
>         .withBuiltInDatabaseName("db1")
>         .build());
>
>
> As Dawid said, if I want to store in my custom catalog, I can call
> catalog.createTable or using DDL.
>
> Thanks,
> SImon
>
> On 08/13/2019 02:55,Xuefu Z<us...@gmail.com> <us...@gmail.com> wrote:
>
> Hi Simon,
>
> Thanks for reporting the problem. There is some rough edges around catalog
> API and table environments, and we are improving post 1.9 release.
>
> Nevertheless, tableEnv.registerCatalog() is just to put a new catalog in
> Flink's CatalogManager, It doens't change the default catalog/database as
> you expected. To switch to your newly registered catalog, you could call
> tableEnv.useCatalog() and .useDatabase().
>
> As an alternative, you could fully qualify your table name with a
> "catalog.db.table" syntax without switching current catalog/database.
>
> Please try those and let me know if you find new problems.
>
> Thanks,
> Xuefu
>
>
>
> On Mon, Aug 12, 2019 at 12:38 AM Simon Su <ba...@163.com> wrote:
>
>> Hi All
>>     I want to use a custom catalog by setting the name “ca1” and create a
>> database under this catalog. When I submit the
>> SQL, and it raises the error like :
>>
>>
>>     Exception in thread "main"
>> org.apache.flink.table.api.ValidationException: SQL validation failed. From
>> line 1, column 98 to line 1, column 116: Object 'orderstream' not found
>> within 'ca1.db1'
>> at
>> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:125)
>> at
>> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:82)
>> at
>> org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:154)
>> at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:89)
>> at org.apache.flink.table.planner.delegation.PlannerBase.parse(PlannerBase.scala:130)
>> at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:335)
>> at sqlrunner.RowTimeTest.memoryCatalog(RowTimeTest.java:126)
>> at sqlrunner.RowTimeTest.main(RowTimeTest.java:137)
>> Caused by: org.apache.calcite.runtime.CalciteContextException: From line 1, column 98 to line 1, column 116: Object 'orderstream' not found within 'ca1.db1'
>> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
>> at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
>> at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>> at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
>> at org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:463)
>> at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:824)
>> at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:809)
>> at org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:4805)
>> at org.apache.calcite.sql.validate.IdentifierNamespace.resolveImpl(IdentifierNamespace.java:166)
>> at org.apache.calcite.sql.validate.IdentifierNamespace.validateImpl(IdentifierNamespace.java:177)
>> at org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
>> at org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:995)
>> at org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:955)
>> at org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3109)
>> at org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3091)
>> at org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:3363)
>> at org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:60)
>> at org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
>> at org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:995)
>> at org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:955)
>> at org.apache.calcite.sql.SqlSelect.validate(SqlSelect.java:216)
>> at org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:930)
>> at org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:637)
>> at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:122)
>> ... 7 more
>> Caused by: org.apache.calcite.sql.validate.SqlValidatorException: Object 'orderstream' not found within 'ca1.db1'
>> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
>> at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
>> at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>> at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
>> at org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:463)
>> at org.apache.calcite.runtime.Resources$ExInst.ex(Resources.java:572)
>> ... 26 more
>>
>> It seems that Calcite cannot find the source object as expected. After
>> debugging the code, I found that tableEnv.registerTableSource and
>> registerTableSink use a built-in catalog with a hard-coded catalog
>> name ( default-catalog ) and database name ( default_database ), and
>> tableEnv.registerCatalog cannot change this behavior. Is this the
>> intended behavior? If I don’t want to use the default built-in catalog and
>> database, is there any other way to do this?
>>
>>
>> GenericInMemoryCatalog catalog = new GenericInMemoryCatalog("ca1");
>> tableEnv.registerCatalog(catalog.getName(), catalog); // Does not change the built-in catalog!
>> tableEnv.useCatalog(catalog.getName());
>> catalog.createDatabase("db1", new CatalogDatabaseImpl(new HashMap<>(), "comment"), true);
>> tableEnv.useDatabase("db1");
>>
>> tableEnv.connect(sourceKafka)
>>     .withFormat(csv)
>>     .withSchema(schema2)
>>     .inAppendMode()
>>     .registerTableSource("orderstream");
>>
>> tableEnv.connect(sinkKafka)
>>     .withFormat(csv)
>>     .withSchema(schema2)
>>     .inAppendMode()
>>     .registerTableSink("sinkstream");
>>
>> String sql = "insert into ca1.db1.sinkstream " +
>>     "select tumble_start(ts, INTERVAL '5' SECOND) as t, max(data) from ca1.db1.orderstream " +
>>     "group by tumble(ts, INTERVAL '5' SECOND), data";
>>
>> tableEnv.sqlUpdate(sql);
>> tableEnv.execute("test");
>>
>>
>> Thanks,
>> SImon
>>
>>
>
> --
> Xuefu Zhang
>
> "In Honey We Trust!"
>
>

-- 
Xuefu Zhang

"In Honey We Trust!"

Re: Flink cannot recognized catalog set by registerCatalog.

Posted by Simon Su <ba...@163.com>.
Hi Xuefu


Thanks for your reply.


Actually, I have already tried what you advised: I called tableEnv.useCatalog and useDatabase, and I also tried using “catalogname.databasename.tableName” in the SQL. I think the root cause is that tableEnv.registerTableSource always uses the built-in catalog and database rather than the custom one. So if I want to use custom names, I have to write code like this:


StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env,
    EnvironmentSettings.newInstance()
        .useBlinkPlanner()
        .inStreamingMode()
        .withBuiltInCatalogName("ca1")
        .withBuiltInDatabaseName("db1")
        .build());


As Dawid said, if I want to store tables in my custom catalog, I can call catalog.createTable or use DDL.
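
The behaviour discussed in this reply can be illustrated with a toy model in plain Java. This is only a sketch under stated assumptions: ToyCatalogManager and all of its methods are hypothetical names made up for illustration and are not Flink classes. The model assumes, as reported in this thread for 1.9, that registerTableSource/registerTableSink always write into the built-in catalog and database, regardless of which other catalogs have been registered.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/** Toy model only (hypothetical, NOT Flink's CatalogManager). */
public class ToyCatalogManager {
    // catalog name -> (database name -> (table name -> table description))
    private final Map<String, Map<String, Map<String, String>>> catalogs = new HashMap<>();
    private final String builtInCatalog;
    private final String builtInDatabase;

    public ToyCatalogManager(String builtInCatalog, String builtInDatabase) {
        this.builtInCatalog = builtInCatalog;
        this.builtInDatabase = builtInDatabase;
        createDatabase(builtInCatalog, builtInDatabase);
    }

    /** Adds a catalog/database pair; it does not change where registerTable writes. */
    public void createDatabase(String catalog, String database) {
        catalogs.computeIfAbsent(catalog, c -> new HashMap<>())
                .computeIfAbsent(database, d -> new HashMap<>());
    }

    /** Assumption from this thread: registration always targets the built-in catalog. */
    public void registerTable(String table, String description) {
        catalogs.get(builtInCatalog).get(builtInDatabase).put(table, description);
    }

    /** Looks up a fully qualified table; empty if any path component is missing. */
    public Optional<String> lookup(String catalog, String database, String table) {
        return Optional.ofNullable(catalogs.get(catalog))
                .map(dbs -> dbs.get(database))
                .map(tables -> tables.get(table));
    }

    public static void main(String[] args) {
        // Default built-in names plus a separately registered "ca1.db1".
        ToyCatalogManager defaults = new ToyCatalogManager("default_catalog", "default_database");
        defaults.createDatabase("ca1", "db1");
        defaults.registerTable("orderstream", "kafka source");
        System.out.println(defaults.lookup("ca1", "db1", "orderstream").isPresent()); // false

        // Built-in catalog and database themselves named "ca1" and "db1".
        ToyCatalogManager renamed = new ToyCatalogManager("ca1", "db1");
        renamed.registerTable("orderstream", "kafka source");
        System.out.println(renamed.lookup("ca1", "db1", "orderstream").isPresent()); // true
    }
}
```

In the first scenario the table lands in default_catalog.default_database, so a lookup under ca1.db1 finds nothing, which matches the "not found within 'ca1.db1'" error. In the second, renaming the built-in catalog and database makes the same lookup succeed.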


Thanks,
Simon


On 08/13/2019 02:55,Xuefu Z<us...@gmail.com> wrote:
Hi Simon,


Thanks for reporting the problem. There are some rough edges around the catalog API and table environments, and we will be improving them after the 1.9 release.


Nevertheless, tableEnv.registerCatalog() just puts a new catalog in Flink's CatalogManager. It doesn't change the default catalog/database as you expected. To switch to your newly registered catalog, you can call tableEnv.useCatalog() and .useDatabase().


As an alternative, you could fully qualify your table name with a "catalog.db.table" syntax without switching current catalog/database.


Please try those and let me know if you find new problems.


Thanks,
Xuefu







On Mon, Aug 12, 2019 at 12:38 AM Simon Su <ba...@163.com> wrote:

Hi All
    I want to use a custom catalog named “ca1” and create a database under this catalog. When I submit the SQL, it raises an error like this:


    Exception in thread "main" org.apache.flink.table.api.ValidationException: SQL validation failed. From line 1, column 98 to line 1, column 116: Object 'orderstream' not found within 'ca1.db1'
at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:125)
at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:82)
at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:154)
at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:89)
at org.apache.flink.table.planner.delegation.PlannerBase.parse(PlannerBase.scala:130)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:335)
at sqlrunner.RowTimeTest.memoryCatalog(RowTimeTest.java:126)
at sqlrunner.RowTimeTest.main(RowTimeTest.java:137)
Caused by: org.apache.calcite.runtime.CalciteContextException: From line 1, column 98 to line 1, column 116: Object 'orderstream' not found within 'ca1.db1'
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:463)
at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:824)
at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:809)
at org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:4805)
at org.apache.calcite.sql.validate.IdentifierNamespace.resolveImpl(IdentifierNamespace.java:166)
at org.apache.calcite.sql.validate.IdentifierNamespace.validateImpl(IdentifierNamespace.java:177)
at org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
at org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:995)
at org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:955)
at org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3109)
at org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3091)
at org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:3363)
at org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:60)
at org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
at org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:995)
at org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:955)
at org.apache.calcite.sql.SqlSelect.validate(SqlSelect.java:216)
at org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:930)
at org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:637)
at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:122)
... 7 more
Caused by: org.apache.calcite.sql.validate.SqlValidatorException: Object 'orderstream' not found within 'ca1.db1'
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:463)
at org.apache.calcite.runtime.Resources$ExInst.ex(Resources.java:572)
... 26 more

It seems that Calcite cannot find the source object as expected. After debugging the code, I found that tableEnv.registerTableSource and registerTableSink use a built-in catalog with a hard-coded catalog name ( default-catalog ) and database name ( default_database ), and tableEnv.registerCatalog cannot change this behavior. Is this the intended behavior? If I don’t want to use the default built-in catalog and database, is there any other way to do this?


GenericInMemoryCatalog catalog = new GenericInMemoryCatalog("ca1");
tableEnv.registerCatalog(catalog.getName(), catalog); // Does not change the built-in catalog!
tableEnv.useCatalog(catalog.getName());
catalog.createDatabase("db1", new CatalogDatabaseImpl(new HashMap<>(), "comment"), true);
tableEnv.useDatabase("db1");

tableEnv.connect(sourceKafka)
    .withFormat(csv)
    .withSchema(schema2)
    .inAppendMode()
    .registerTableSource("orderstream");

tableEnv.connect(sinkKafka)
    .withFormat(csv)
    .withSchema(schema2)
    .inAppendMode()
    .registerTableSink("sinkstream");

String sql = "insert into ca1.db1.sinkstream " +
    "select tumble_start(ts, INTERVAL '5' SECOND) as t, max(data) from ca1.db1.orderstream " +
    "group by tumble(ts, INTERVAL '5' SECOND), data";

tableEnv.sqlUpdate(sql);
tableEnv.execute("test");


Thanks,
SImon




--

Xuefu Zhang

"In Honey We Trust!"

Re: Flink cannot recognized catalog set by registerCatalog.

Posted by Xuefu Z <us...@gmail.com>.
Hi Simon,

Thanks for reporting the problem. There are some rough edges around the catalog
API and table environments, and we will be improving them after the 1.9 release.

Nevertheless, tableEnv.registerCatalog() just puts a new catalog in
Flink's CatalogManager. It doesn't change the default catalog/database as
you expected. To switch to your newly registered catalog, you can call
tableEnv.useCatalog() and .useDatabase().

As an alternative, you could fully qualify your table name with a
"catalog.db.table" syntax without switching current catalog/database.
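
The difference between switching the current catalog/database and fully qualifying a name can be sketched in plain Java. This is a simplified illustration, not Flink's actual resolver: ToyIdentifierResolver and qualify are hypothetical names, and the sketch assumes identifiers are split on plain dots with no quoting or escaping.

```java
import java.util.Arrays;
import java.util.List;

/** Toy name resolution (hypothetical helper, not part of Flink). */
public class ToyIdentifierResolver {

    /**
     * Expands a table identifier to [catalog, database, table].
     * Missing leading parts are filled in from the current catalog/database.
     */
    public static List<String> qualify(String identifier,
                                       String currentCatalog,
                                       String currentDatabase) {
        String[] parts = identifier.split("\\.");
        switch (parts.length) {
            case 1: // "table": use the current catalog and database
                return Arrays.asList(currentCatalog, currentDatabase, parts[0]);
            case 2: // "db.table": use the current catalog only
                return Arrays.asList(currentCatalog, parts[0], parts[1]);
            case 3: // "catalog.db.table": current settings are ignored
                return Arrays.asList(parts);
            default:
                throw new IllegalArgumentException("Unexpected identifier: " + identifier);
        }
    }

    public static void main(String[] args) {
        // After useCatalog("ca1") / useDatabase("db1"), a short name resolves there.
        System.out.println(qualify("orderstream", "ca1", "db1"));
        // A fully qualified name resolves the same way without switching anything.
        System.out.println(qualify("ca1.db1.orderstream", "default_catalog", "default_database"));
    }
}
```

Both calls in main yield the same three-part path, which is the sense in which the two approaches are alternatives. Neither changes where a table was actually registered.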

Please try those and let me know if you find new problems.

Thanks,
Xuefu



On Mon, Aug 12, 2019 at 12:38 AM Simon Su <ba...@163.com> wrote:

> Hi All
>     I want to use a custom catalog named “ca1” and create a
> database under this catalog. When I submit the
> SQL, it raises an error like this:
>
>
>     Exception in thread "main" org.apache.flink.table.api.ValidationException: SQL validation failed. From line 1, column 98 to line 1, column 116: Object 'orderstream' not found within 'ca1.db1'
> at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:125)
> at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:82)
> at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:154)
> at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:89)
> at org.apache.flink.table.planner.delegation.PlannerBase.parse(PlannerBase.scala:130)
> at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:335)
> at sqlrunner.RowTimeTest.memoryCatalog(RowTimeTest.java:126)
> at sqlrunner.RowTimeTest.main(RowTimeTest.java:137)
> Caused by: org.apache.calcite.runtime.CalciteContextException: From line 1, column 98 to line 1, column 116: Object 'orderstream' not found within 'ca1.db1'
> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
> at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
> at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
> at org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:463)
> at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:824)
> at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:809)
> at org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:4805)
> at org.apache.calcite.sql.validate.IdentifierNamespace.resolveImpl(IdentifierNamespace.java:166)
> at org.apache.calcite.sql.validate.IdentifierNamespace.validateImpl(IdentifierNamespace.java:177)
> at org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
> at org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:995)
> at org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:955)
> at org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3109)
> at org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3091)
> at org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:3363)
> at org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:60)
> at org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
> at org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:995)
> at org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:955)
> at org.apache.calcite.sql.SqlSelect.validate(SqlSelect.java:216)
> at org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:930)
> at org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:637)
> at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:122)
> ... 7 more
> Caused by: org.apache.calcite.sql.validate.SqlValidatorException: Object 'orderstream' not found within 'ca1.db1'
> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
> at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
> at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
> at org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:463)
> at org.apache.calcite.runtime.Resources$ExInst.ex(Resources.java:572)
> ... 26 more
>
> It seems that Calcite cannot find the source object as expected. After
> debugging the code, I found that tableEnv.registerTableSource and
> registerTableSink use a built-in catalog with a hard-coded catalog
> name ( default-catalog ) and database name ( default_database ), and
> tableEnv.registerCatalog cannot change this behavior. Is this the
> intended behavior? If I don’t want to use the default built-in catalog and
> database, is there any other way to do this?
>
>
> GenericInMemoryCatalog catalog = new GenericInMemoryCatalog("ca1");
> tableEnv.registerCatalog(catalog.getName(), catalog); // Does not change the built-in catalog!
> tableEnv.useCatalog(catalog.getName());
> catalog.createDatabase("db1", new CatalogDatabaseImpl(new HashMap<>(), "comment"), true);
> tableEnv.useDatabase("db1");
>
> tableEnv.connect(sourceKafka)
>     .withFormat(csv)
>     .withSchema(schema2)
>     .inAppendMode()
>     .registerTableSource("orderstream");
>
> tableEnv.connect(sinkKafka)
>     .withFormat(csv)
>     .withSchema(schema2)
>     .inAppendMode()
>     .registerTableSink("sinkstream");
>
> String sql = "insert into ca1.db1.sinkstream " +
>     "select tumble_start(ts, INTERVAL '5' SECOND) as t, max(data) from ca1.db1.orderstream " +
>     "group by tumble(ts, INTERVAL '5' SECOND), data";
>
> tableEnv.sqlUpdate(sql);
> tableEnv.execute("test");
>
>
> Thanks,
> SImon
>
>

-- 
Xuefu Zhang

"In Honey We Trust!"
