Posted to dev@flink.apache.org by Feng Jin <ji...@gmail.com> on 2023/02/06 10:21:29 UTC

[Discuss] :Introduce Catalog dynamic registration in flink catalog manager.

Hi everyone,

The original discussion address is
https://issues.apache.org/jira/browse/FLINK-30126

Currently, Flink can access many systems, including Kafka, Hive,
Iceberg, Hudi, Elasticsearch, MySQL...  The corresponding catalog names
might be:
kafka_cluster1, kafka_cluster2, hive_cluster1, hive_cluster2,
iceberg_cluster2, elasticsearch_cluster1,  mysql_database1_xxx,
mysql_database2_xxxx

As the platform that runs Flink SQL jobs, we need to maintain the
metadata of every system in the company. When a Flink job starts, we
need to register the catalogs with the Flink table environment, so
that users can use any table through the env.executeSql interface.

When we only have a small number of catalogs, registering them all up
front like this works. When there are thousands of catalogs, however,
I think we need a dynamic loading mechanism that registers a catalog
only when it is needed. This speeds up the initialization of the table
environment and avoids useless catalog registration.

Preliminary thoughts:

A new CatalogProvider interface can be added.
It contains two methods:
* listCatalogs(), which lists the names of all the catalogs that the
provider can provide
* getCatalog(), which gets a catalog instance by catalog name.

```java
public interface CatalogProvider {

    default void initialize(ClassLoader classLoader, ReadableConfig config) {}

    Optional<Catalog> getCatalog(String catalogName);

    Set<String> listCatalogs();
}
```
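
A minimal sketch of how a platform might implement this interface, assuming the catalog properties live in an external metadata service. `Catalog` and `CatalogProvider` below are simplified stand-ins for the Flink types (the `initialize` hook is omitted), and `MetadataServiceCatalogProvider`, its property map, and the factory function are hypothetical names used only for illustration. The point it shows is that listing never instantiates a catalog, and a catalog is built only on first lookup:

```java
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiFunction;

final class LazyCatalogProviderSketch {

    /** Simplified stand-in for org.apache.flink.table.catalog.Catalog. */
    interface Catalog {
        String name();
    }

    /** Stand-in for the proposed CatalogProvider (initialize() omitted). */
    interface CatalogProvider {
        Optional<Catalog> getCatalog(String catalogName);

        Set<String> listCatalogs();
    }

    /** Hypothetical provider: knows only properties up front, builds catalogs on demand. */
    static final class MetadataServiceCatalogProvider implements CatalogProvider {
        private final Map<String, Map<String, String>> propertiesByName;
        private final BiFunction<String, Map<String, String>, Catalog> factory;
        private final Map<String, Catalog> instantiated = new ConcurrentHashMap<>();

        MetadataServiceCatalogProvider(
                Map<String, Map<String, String>> propertiesByName,
                BiFunction<String, Map<String, String>, Catalog> factory) {
            this.propertiesByName = propertiesByName;
            this.factory = factory;
        }

        @Override
        public Optional<Catalog> getCatalog(String catalogName) {
            Map<String, String> props = propertiesByName.get(catalogName);
            if (props == null) {
                return Optional.empty(); // unknown to the metadata service
            }
            // Instantiate at most once per name, and only when first requested.
            return Optional.of(
                    instantiated.computeIfAbsent(catalogName, n -> factory.apply(n, props)));
        }

        @Override
        public Set<String> listCatalogs() {
            return propertiesByName.keySet(); // listing never instantiates anything
        }

        int instantiatedCount() {
            return instantiated.size();
        }
    }

    public static void main(String[] args) {
        Map<String, Map<String, String>> props = Map.of(
                "hive_cluster1", Map.of("type", "hive"),
                "kafka_cluster1", Map.of("type", "kafka"));
        MetadataServiceCatalogProvider provider =
                new MetadataServiceCatalogProvider(props, (name, p) -> () -> name);

        System.out.println(provider.listCatalogs().size());
        System.out.println(provider.instantiatedCount());
        System.out.println(provider.getCatalog("hive_cluster1").isPresent());
        System.out.println(provider.instantiatedCount());
    }
}
```

Caching the instance is a design choice of this sketch, not part of the proposal; a provider could equally build a fresh catalog on every call.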


The corresponding implementation in CatalogManager is as follows:

```java
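import java.util.Map;
import java.util.Optional;
import javax.annotation.Nullable;

// Sketch only: Catalog and CatalogProvider are assumed to be in the same package.
public class CatalogManager {
    private @Nullable CatalogProvider catalogProvider;

    private Map<String, Catalog> catalogs;

    public void setCatalogProvider(CatalogProvider catalogProvider) {
        this.catalogProvider = catalogProvider;
    }

    public Optional<Catalog> getCatalog(String catalogName) {
        // Catalogs registered directly in the manager are looked up first.
        Catalog catalog = catalogs.get(catalogName);
        if (catalog != null) {
            return Optional.of(catalog);
        }
        // If there is no corresponding catalog in catalogs,
        // get catalog by catalogProvider
        if (catalogProvider != null) {
            return catalogProvider.getCatalog(catalogName);
        }
        return Optional.empty();
    }

}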
```



Possible problems:

1. Catalog name conflicts: which one should win when a registered
catalog and a catalog provided by the catalog-provider share a name?
I prefer tableEnv-registered ones over catalogs provided by the
catalog-provider. If the user wishes to reference the catalog provided
by the catalog-provider, they can unregister the catalog in tableEnv
through the `unregisterCatalog` interface.
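
The precedence rule in point 1 can be sketched as follows. This is only an illustration under assumptions: a catalog is reduced to a string label, the provider is modeled as a plain lookup function, and `PrecedenceSketch` with its `registerCatalog`, `unregisterCatalog`, and `getCatalog` methods is a hypothetical stand-in rather than the real CatalogManager.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;

final class PrecedenceSketch {
    // Catalogs registered explicitly on the table environment
    // (a String label stands in for a Catalog instance).
    private final Map<String, String> registered = new HashMap<>();
    // Stand-in for CatalogProvider#getCatalog.
    private final Function<String, Optional<String>> provider;

    PrecedenceSketch(Function<String, Optional<String>> provider) {
        this.provider = provider;
    }

    void registerCatalog(String name, String catalog) {
        registered.put(name, catalog);
    }

    void unregisterCatalog(String name) {
        registered.remove(name);
    }

    Optional<String> getCatalog(String name) {
        String local = registered.get(name);
        if (local != null) {
            return Optional.of(local); // explicitly registered catalogs win
        }
        return provider.apply(name); // otherwise fall back to the provider
    }

    public static void main(String[] args) {
        PrecedenceSketch manager = new PrecedenceSketch(
                name -> name.equals("hive_cluster1")
                        ? Optional.of("from-provider")
                        : Optional.empty());

        manager.registerCatalog("hive_cluster1", "from-tableEnv");
        System.out.println(manager.getCatalog("hive_cluster1").get());

        // After unregistering, the same name resolves through the provider.
        manager.unregisterCatalog("hive_cluster1");
        System.out.println(manager.getCatalog("hive_cluster1").get());
    }
}
```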

2. Number of CatalogProviders: is it possible to have multiple
catalogProvider implementations?
I don't have a good answer to this at the moment. Supporting multiple
catalogProviders would be much more convenient, but there may be
catalog name conflicts between different catalogProviders.



Looking forward to your reply, any feedback is appreciated!


Best.

Feng Jin

Re: [Discuss] :Introduce Catalog dynamic registration in flink catalog manager.

Posted by Jark Wu <im...@gmail.com>.
I have assigned permission to you. 

Best,
Jark

> On Feb 10, 2023, at 17:26, Feng Jin <ji...@gmail.com> wrote:
> 
> I am very happy to do it. Please help me get editing permission; my
> Jira ID is hackergin.
> 
> Thanks
> 
> Best,
> Feng
> 
> On Fri, Feb 10, 2023 at 4:02 PM Jark Wu <im...@gmail.com> wrote:
>> 
>> Thank you Feng,
>> 
>> Feel free to start a FLIP proposal if you are interested. Looking forward to it!
>> 
>> Best,
>> Jark
>> 
>>> On Feb 10, 2023, at 15:44, Feng Jin <ji...@gmail.com> wrote:
>>> 
>>> @Shengkai
>>>> About the catalog jar hot updates
>>> 
>>> Currently we do not have a similar requirement, but if the catalog
>>> management interface is opened up, it can indeed enable hot
>>> loading of the catalog jar.
>>> 
>>> 
>>>> do we need to instantiate the Catalog immediately or defer to the usage
>>> 
>>> I think this can stay the same as before.
>>> 
>>> 
>>> 
>>> @Jark
>>>> There can only be a single catalog manager in TableEnvironment.
>>> 
>>> Big +1 for this. This can avoid conflicts and also meet the catalog
>>> persistence requirements.
>>> 
>>> 
>>> Best,
>>> Feng
>>> 
>>> On Fri, Feb 10, 2023 at 3:09 PM Jark Wu <im...@gmail.com> wrote:
>>>> 
>>>> Hi Feng,
>>>> 
>>>> Conflicts and inconsistencies are still easy to hit even if we have only one
>>>> CatalogProvider, because CatalogProvider only provides read-only interfaces
>>>> (listCatalogs, getCatalog). For example, you may register a catalog X, but
>>>> can't list it because it's not in the external metadata service.
>>>> 
>>>> To avoid catalog conflicts and stay consistent, we can extract the catalog
>>>> management logic as a pluggable interface, including listCatalog,
>>>> getCatalog, registerCatalog, unregisterCatalog, etc. The
>>>> current CatalogManager is a default in-memory implementation; you can
>>>> replace it with user-defined managers, such as
>>>> - file-based: manages catalog information in local files, just like
>>>> how Presto/Trino manages catalogs
>>>> - metaservice-based: manages catalog information in an external
>>>> metadata service.
>>>> 
>>>> There can only be a single catalog manager in TableEnvironment. This
>>>> guarantees data consistency and avoids conflicts. This approach can address
>>>> another pain point of Flink SQL: the catalog information is not persisted.
>>>> 
>>>> Can this approach satisfy your requirements?
>>>> 
>>>> Best,
>>>> Jark
>>>> 
>>>> 
>>>> 
>>>> 
>>>> 
>>>> On Fri, 10 Feb 2023 at 11:21, Shengkai Fang <fs...@gmail.com> wrote:
>>>> 
>>>>> Hi Feng.
>>>>> 
>>>>> I think your idea is very interesting!
>>>>> 
>>>>> 1. I just wonder: after initializing the Catalog, will the Session reuse the
>>>>> same Catalog instance or build a new one for later usage? If we reuse the
>>>>> same Catalog, I think it's more like lazy initialization. I lean toward
>>>>> building a new one because that makes catalog jar hot updates easier
>>>>> for us.
>>>>> 
>>>>> 2. Users use the `CREATE CATALOG` statement in the CatalogManager. In this
>>>>> case, do we need to instantiate the Catalog immediately or defer it until
>>>>> it is used?
>>>>> 
>>>>> Best,
>>>>> Shengkai
>>>>> 
>>>>> On Thu, Feb 9, 2023 at 20:13, Feng Jin <ji...@gmail.com> wrote:
>>>>> 
>>>>>> Thanks for your reply.
>>>>>> 
>>>>>> @Timo
>>>>>> 
>>>>>>> 2) avoid the default in-memory catalog and offer their catalog before
>>>>>>> a TableEnvironment session starts
>>>>>>> 3) whether this can be disabled and SHOW CATALOGS can be used for
>>>>>>> listing first without having a default catalog.
>>>>>> 
>>>>>> 
>>>>>> Regarding 2 and 3, I think this problem can be solved by introducing
>>>>>> catalog providers, and users can control some default catalog
>>>>>> behavior.
>>>>>> 
>>>>>> 
>>>>>>> We could also use the org.apache.flink.table.factories.Factory infra
>>>>>>> and allow catalog providers via pure string properties
>>>>>> 
>>>>>> I think this is also very useful. In our usage scenarios, it is
>>>>>> usually multi-cluster management, and it is also necessary to pass
>>>>>> different configurations through parameters.
>>>>>> 
>>>>>> 
>>>>>> @Jark @Hang
>>>>>> 
>>>>>>> About the lazy catalog initialization
>>>>>> 
>>>>>> Our needs may be different. If these properties already exist in an
>>>>>> external system, especially when there may be thousands of these
>>>>>> catalog properties, I don't think it is necessary to register all
>>>>>> these properties in the Flink env at startup. What we need is to
>>>>>> register a catalog only when it is needed, getting its properties
>>>>>> from the external meta system at that point.
>>>>>> 
>>>>>> 
>>>>>>> It may be hard to avoid conflicts and duplicates between
>>>>>>> CatalogProvider and CatalogManager
>>>>>> 
>>>>>> Conflicts are indeed easy to run into. My idea is to make the catalog
>>>>>> management of the current CatalogManager the default CatalogProvider
>>>>>> behavior and, at the same time, allow only one CatalogProvider
>>>>>> in a Flink env. This may avoid catalog conflicts.
>>>>>> 
>>>>>> 
>>>>>> Best,
>>>>>> Feng
>>>>>> 
>>>>>> On Tue, Feb 7, 2023 at 1:01 PM Hang Ruan <ru...@gmail.com> wrote:
>>>>>>> 
>>>>>>> Hi Feng,
>>>>>>> I agree with what Jark said. I think what you are looking for is lazy
>>>>>>> initialization.
>>>>>>> 
>>>>>>> I don't think we should introduce the new interface CatalogProvider for
>>>>>>> lazy initialization. What we should do is to store the catalog properties
>>>>>>> and initialize the catalog when we need it. Could you please introduce some
>>>>>>> other scenarios that we need the CatalogProvider besides the lazy
>>>>>>> initialization?
>>>>>>> 
>>>>>>> If we really need the CatalogProvider, I think it is better to be a single
>>>>>>> instance. Multiple instances are difficult to manage and there are name
>>>>>>> conflicts among providers.
>>>>>>> 
>>>>>>> Best,
>>>>>>> Hang
>>>>>>> 
>>>>>>> On Tue, Feb 7, 2023 at 10:48, Jark Wu <im...@gmail.com> wrote:
>>>>>>> 
>>>>>>>> Hi Feng,
>>>>>>>> 
>>>>>>>> I think this feature makes a lot of sense. If I understand correctly, what
>>>>>>>> you are looking for is lazy catalog initialization.
>>>>>>>> 
>>>>>>>> However, I have some concerns about introducing CatalogProvider, which
>>>>>>>> delegates catalog management to users. It may be hard to avoid conflicts
>>>>>>>> and duplicates between CatalogProvider and CatalogManager. Is it possible
>>>>>>>> to have a built-in CatalogProvider to instantiate catalogs lazily?
>>>>>>>> 
>>>>>>>> An idea in my mind is to introduce another catalog registration API
>>>>>>>> without instantiating the catalog, e.g., registerCatalog(String
>>>>>>>> catalogName, Map<String, String> catalogProperties). The catalog
>>>>>>>> information is stored in CatalogManager as pure strings. The catalog is
>>>>>>>> instantiated and initialized when used.
>>>>>>>> 
>>>>>>>> This new API is very similar to other pure-string metadata registration,
>>>>>>>> such as "createTable(String path, TableDescriptor descriptor)" and
>>>>>>>> "createFunction(String path, String className, List<ResourceUri>
>>>>>>>> resourceUris)".
>>>>>>>> 
>>>>>>>> Can this approach satisfy your requirement?
>>>>>>>> 
>>>>>>>> Best,
>>>>>>>> Jark
>>>>>>>> 
>>>>>>>> On Mon, 6 Feb 2023 at 22:53, Timo Walther <tw...@apache.org> wrote:
>>>>>>>> 
>>>>>>>>> Hi Feng,
>>>>>>>>> 
>>>>>>>>> this is indeed a good proposal.
>>>>>>>>> 
>>>>>>>>> 1) It makes sense to improve the catalog listing for platform providers.
>>>>>>>>> 
>>>>>>>>> 2) Other feedback from the past has shown that users would like to avoid
>>>>>>>>> the default in-memory catalog and offer their catalog before a
>>>>>>>>> TableEnvironment session starts.
>>>>>>>>> 
>>>>>>>>> 3) Also we might reconsider whether a default catalog and default
>>>>>>>>> database make sense. Or whether this can be disabled and SHOW CATALOGS
>>>>>>>>> can be used for listing first without having a default catalog.
>>>>>>>>> 
>>>>>>>>> What do you think about option 2 and 3?
>>>>>>>>> 
>>>>>>>>> In any case, I would propose we pass a CatalogProvider to
>>>>>>>>> EnvironmentSettings and only allow a single instance. Catalogs should
>>>>>>>>> never shadow other catalogs.
>>>>>>>>> 
>>>>>>>>> We could also use the org.apache.flink.table.factories.Factory infra and
>>>>>>>>> allow catalog providers via pure string properties. Not sure if we need
>>>>>>>>> this in the first version though.
>>>>>>>>> 
>>>>>>>>> Cheers,
>>>>>>>>> Timo
>>>>>>>>> 
>>>>>>>>> 
>> 
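
For reference, the registration-by-properties idea quoted above (`registerCatalog(String catalogName, Map<String, String> catalogProperties)`, with the catalog instantiated and initialized when used) could be sketched roughly as follows. This is a sketch under assumptions, not Flink's API: `LazyRegistrationSketch`, its `Catalog` stand-in, and the factory function are hypothetical, and only the `registerCatalog` signature comes from the thread. It also assumes the instance is reused after first use, which is one of the two options raised in the discussion.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;

final class LazyRegistrationSketch {

    /** Simplified stand-in for a Flink Catalog; it only remembers its properties. */
    static final class Catalog {
        final Map<String, String> properties;

        Catalog(Map<String, String> properties) {
            this.properties = properties;
        }
    }

    // Registered catalogs, kept as pure string properties.
    private final Map<String, Map<String, String>> registeredProperties = new HashMap<>();
    // Catalogs that have been instantiated because something asked for them.
    private final Map<String, Catalog> instantiated = new HashMap<>();
    // Hypothetical factory that turns properties into a catalog instance.
    private final Function<Map<String, String>, Catalog> factory;

    LazyRegistrationSketch(Function<Map<String, String>, Catalog> factory) {
        this.factory = factory;
    }

    /** Stores the properties only; nothing is instantiated here. */
    void registerCatalog(String catalogName, Map<String, String> catalogProperties) {
        if (registeredProperties.containsKey(catalogName)) {
            throw new IllegalArgumentException("Catalog already registered: " + catalogName);
        }
        registeredProperties.put(catalogName, new HashMap<>(catalogProperties));
    }

    /** Instantiates the catalog on first use and reuses the instance afterwards. */
    Optional<Catalog> getCatalog(String catalogName) {
        Map<String, String> props = registeredProperties.get(catalogName);
        if (props == null) {
            return Optional.empty();
        }
        return Optional.of(
                instantiated.computeIfAbsent(catalogName, n -> factory.apply(props)));
    }

    int instantiatedCount() {
        return instantiated.size();
    }

    public static void main(String[] args) {
        LazyRegistrationSketch manager = new LazyRegistrationSketch(Catalog::new);
        manager.registerCatalog("hive_cluster1", Map.of("type", "hive"));
        System.out.println(manager.instantiatedCount());
        System.out.println(manager.getCatalog("hive_cluster1").isPresent());
        System.out.println(manager.instantiatedCount());
    }
}
```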


Re: [Discuss] :Introduce Catalog dynamic registration in flink catalog manager.

Posted by Feng Jin <ji...@gmail.com>.
I am very happy to do it. Please help me get editing permission; my
Jira ID is hackergin.

Thanks

Best,
Feng


Re: [Discuss] :Introduce Catalog dynamic registration in flink catalog manager.

Posted by Jark Wu <im...@gmail.com>.
Thank you Feng, 

Feel free to start a FLIP proposal if you are interested. Looking forward to it!

Best,
Jark

> 2023年2月10日 15:44,Feng Jin <ji...@gmail.com> 写道:
> 
> @Shengkai
>> About the catalog jar hot updates
> 
> Currently we do not have a similar requirement, but if the catalog
> management interface is opened, this can indeed realize the hot
> loading of the catalog jar
> 
> 
>> do we need to instantiate the Catalog immediately or defer to the usage
> 
> I think this can be the same as before .
> 
> 
> 
> @Jark
>> There only can be a single catalog manager in TableEnvironment.
> 
> big +1 for this.  This can avoid conflicts and also meet the catalog
> persistence requirements.
> 
> 
> Best,
> Feng
> 
> On Fri, Feb 10, 2023 at 3:09 PM Jark Wu <im...@gmail.com> wrote:
>> 
>> Hi Feng,
>> 
>> It's still easy to conflict and be inconsistent even if we have only one
>> CatalogProvider, because CatalogProvider only provides readable interfaces
>> (listCatalogs, getCatalog). For example, you may register a catalog X, but
>> can't list it because it's not in the external metadata service.
>> 
>> To avoid catalog conflicts and keep consistent, we can extract the catalog
>> management logic as a pluggable interface, including listCatalog,
>> getCatalog, registerCatalog, unregisterCatalog, etc. The
>> current CatalogManager is a default in-memory implementation, you can
>> replace it with user-defined managers, such as
>> - file-based: which manages catalog information on local files, just like
>> how Presto/Trino manages catalogs
>> - metaservice-based: which manages catalog information on external
>> metadata service.
>> 
>> There only can be a single catalog manager in TableEnvironment. This
>> guarantees data consistency and avoids conflicts. This approach can address
>> another pain point of Flink SQL: the catalog information is not persisted.
>> 
>> Can this approach satisfy your requirements?
>> 
>> Best,
>> Jark


Re: [Discuss] :Introduce Catalog dynamic registration in flink catalog manager.

Posted by Feng Jin <ji...@gmail.com>.
@Shengkai
> About the catalog jar hot updates

We do not have this requirement at the moment, but if the catalog
management interface is opened up, it would indeed make hot loading
of the catalog jar possible.


>  do we need to instantiate the Catalog immediately or defer to the usage

I think this can stay the same as before.



@Jark
> There only can be a single catalog manager in TableEnvironment.

Big +1 for this. It avoids conflicts and also meets the catalog
persistence requirement.


Best,
Feng


Re: [Discuss] :Introduce Catalog dynamic registration in flink catalog manager.

Posted by Jark Wu <im...@gmail.com>.
Hi Feng,

Conflicts and inconsistencies are still likely even if we have only one
CatalogProvider, because CatalogProvider only exposes read-only interfaces
(listCatalogs, getCatalog). For example, you may register a catalog X but
then be unable to list it, because it is not in the external metadata service.

To avoid catalog conflicts and keep things consistent, we can extract the
catalog management logic into a pluggable interface, including listCatalog,
getCatalog, registerCatalog, unregisterCatalog, etc. The current
CatalogManager would be the default in-memory implementation, and you could
replace it with a user-defined manager, such as:
 - file-based: manages catalog information in local files, just like
how Presto/Trino manages catalogs
 - metaservice-based: manages catalog information in an external
metadata service.
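
As a rough sketch of what such a pluggable interface might look like: the
names CatalogRegistry and InMemoryCatalogRegistry below are hypothetical and
are not existing Flink APIs, and a catalog is represented only by its string
properties so that the example stays self-contained.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

// Hypothetical pluggable interface covering both reads and writes,
// so that listing and registration always go through the same place.
interface CatalogRegistry {
    void registerCatalog(String name, Map<String, String> properties);

    void unregisterCatalog(String name);

    Optional<Map<String, String>> getCatalog(String name);

    Set<String> listCatalogs();
}

// Default behaviour sketched as a plain in-memory map (nothing is persisted).
class InMemoryCatalogRegistry implements CatalogRegistry {
    private final Map<String, Map<String, String>> catalogs = new LinkedHashMap<>();

    @Override
    public void registerCatalog(String name, Map<String, String> properties) {
        // Reject duplicates instead of silently shadowing an existing catalog.
        if (catalogs.containsKey(name)) {
            throw new IllegalStateException("Catalog " + name + " already exists.");
        }
        // Keep a defensive, read-only copy of the properties.
        catalogs.put(name, Collections.unmodifiableMap(new LinkedHashMap<>(properties)));
    }

    @Override
    public void unregisterCatalog(String name) {
        catalogs.remove(name);
    }

    @Override
    public Optional<Map<String, String>> getCatalog(String name) {
        return Optional.ofNullable(catalogs.get(name));
    }

    @Override
    public Set<String> listCatalogs() {
        // Read-only view of the registered catalog names.
        return Collections.unmodifiableSet(catalogs.keySet());
    }
}
```

A file-based or metaservice-based manager would implement the same interface
and persist the properties instead of keeping them in a map.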

There can only be a single catalog manager in a TableEnvironment. This
guarantees data consistency and avoids conflicts. This approach also addresses
another pain point of Flink SQL: catalog information is not persisted.

Can this approach satisfy your requirements?

Best,
Jark






Re: [Discuss] :Introduce Catalog dynamic registration in flink catalog manager.

Posted by Shengkai Fang <fs...@gmail.com>.
Hi Feng.

I think your idea is very interesting!

1. I wonder whether, after the Catalog is initialized, the Session will reuse
the same Catalog instance or build a new one for later use. If we reuse the
same Catalog, I think it is more like lazy initialization. I lean slightly
towards building a new one, because that makes hot updates of the catalog jar
easier.

2. Users use the `CREATE CATALOG` statement in the CatalogManager. In this
case, do we need to instantiate the Catalog immediately or defer to the
usage?
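
The two options in point 1 can be contrasted with a minimal sketch. It
is only an illustration: `SessionCatalog`, `SessionCatalogs`, and the
`reuseInstances` flag are hypothetical names, not Flink APIs.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical stand-in for Flink's Catalog interface.
interface SessionCatalog {}

// Hands out catalogs either from a cache (reuse) or freshly built (rebuild).
class SessionCatalogs {
    private final Map<String, Supplier<SessionCatalog>> suppliers = new HashMap<>();
    private final Map<String, SessionCatalog> cache = new HashMap<>();
    private final boolean reuseInstances;

    SessionCatalogs(boolean reuseInstances) {
        this.reuseInstances = reuseInstances;
    }

    // Records how to build the catalog; nothing is instantiated here.
    void define(String name, Supplier<SessionCatalog> supplier) {
        suppliers.put(name, supplier);
    }

    SessionCatalog get(String name) {
        Supplier<SessionCatalog> supplier = suppliers.get(name);
        if (supplier == null) {
            throw new IllegalArgumentException("Unknown catalog: " + name);
        }
        if (reuseInstances) {
            // Lazy initialization: build once, then return the same instance.
            return cache.computeIfAbsent(name, n -> supplier.get());
        }
        // Rebuild on every lookup, so a replaced jar would be picked up next time.
        return supplier.get();
    }
}
```

With `reuseInstances` set to true, repeated lookups return the same
object; with false, every lookup pays the construction cost again.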

Best,
Shengkai

Feng Jin <ji...@gmail.com> wrote on Thu, Feb 9, 2023 at 20:13:

> Thanks for your reply.
>
> @Timo
>
> >  2) avoid  the default in-memory catalog and offer their catalog before
> a  TableEnvironment session starts
> >  3) whether this can be disabled and SHOW CATALOGS  can be used for
> listing first without having a default catalog.
>
>
> Regarding 2 and 3, I think both can be addressed by introducing
> catalog providers, which would let users control some of the default
> catalog behavior.
>
>
> > We could also use the org.apache.flink.table.factories.Factory infra
> and  allow catalog providers via pure string properties
>
> I think this is also very useful. Our usage scenarios usually involve
> multi-cluster management, where we also need to pass different
> configurations through parameters.
>
>
> @Jark @Hang
>
> >  About the lazy catalog initialization
>
> Our needs may be different. If these properties already exist in an
> external system, especially when there may be thousands of these
> catalog properties, I don't think it is necessary to register all of
> them in the Flink env at startup. What we need is to register a
> catalog only when it is needed and to fetch its properties from the
> external metadata system at that point.
>
>
> >  It may be hard to avoid conflicts  and duplicates between
> CatalogProvider and CatalogManager
>
> Conflicts are indeed easy to run into. My idea is to move the catalog
> management that the current CatalogManager does into a default
> CatalogProvider, and to allow only one CatalogProvider per Flink Env.
> This may avoid catalog conflicts.
>
>
> Best,
> Feng
>

Re: [Discuss] :Introduce Catalog dynamic registration in flink catalog manager.

Posted by Feng Jin <ji...@gmail.com>.
Thanks for your reply.

@Timo

>  2) avoid  the default in-memory catalog and offer their catalog before a  TableEnvironment session starts
>  3) whether this can be disabled and SHOW CATALOGS  can be used for listing first without having a default catalog.


Regarding 2 and 3, I think both can be addressed by introducing
catalog providers, which would let users control some of the default
catalog behavior.


> We could also use the org.apache.flink.table.factories.Factory infra and  allow catalog providers via pure string properties

I think this is also very useful. Our usage scenarios usually involve
multi-cluster management, where we also need to pass different
configurations through parameters.


@Jark @Hang

>  About the lazy catalog initialization

Our needs may be different. If these properties already exist in an
external system, especially when there may be thousands of these
catalog properties, I don't think it is necessary to register all of
them in the Flink env at startup. What we need is to register a
catalog only when it is needed and to fetch its properties from the
external metadata system at that point.
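
A minimal sketch of this on-demand lookup, following the shape of the
CatalogProvider proposed in this thread. `DemoCatalog`,
`MetadataService`, and `ExternalCatalogProvider` are hypothetical names
used only for illustration; they are not Flink APIs.

```java
import java.util.Map;
import java.util.Optional;
import java.util.Set;

// Hypothetical stand-in for Flink's Catalog interface.
interface DemoCatalog {
    Map<String, String> options();
}

// Hypothetical client for the external metadata system.
interface MetadataService {
    Set<String> listCatalogNames();

    Optional<Map<String, String>> fetchCatalogProperties(String catalogName);
}

// Resolves catalogs by name at request time; nothing is registered up front.
class ExternalCatalogProvider {
    private final MetadataService metadata;

    ExternalCatalogProvider(MetadataService metadata) {
        this.metadata = metadata;
    }

    // Names come straight from the external system; no catalog is created.
    Set<String> listCatalogs() {
        return metadata.listCatalogNames();
    }

    // Properties are fetched and the catalog is built only when requested.
    Optional<DemoCatalog> getCatalog(String catalogName) {
        return metadata.fetchCatalogProperties(catalogName)
                .map(props -> (DemoCatalog) () -> props);
    }
}
```

Startup cost then does not depend on how many catalogs exist in the
external system, since only the catalogs a job actually references are
fetched.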


>  It may be hard to avoid conflicts  and duplicates between CatalogProvider and CatalogManager

Conflicts are indeed easy to run into. My idea is to move the catalog
management that the current CatalogManager does into a default
CatalogProvider, and to allow only one CatalogProvider per Flink Env.
This may avoid catalog conflicts.


Best,
Feng

On Tue, Feb 7, 2023 at 1:01 PM Hang Ruan <ru...@gmail.com> wrote:
>
> Hi Feng,
> I agree with what Jark said. I think what you are looking for is lazy
> initialization.
>
> I don't think we should introduce the new interface CatalogProvider for
> lazy initialization. What we should do is store the catalog properties
> and initialize the catalog when we need it. Could you please describe
> some other scenarios, besides lazy initialization, in which we need the
> CatalogProvider?
>
> If we really need the CatalogProvider, I think it is better to allow
> only a single instance. Multiple instances are difficult to manage, and
> there may be name conflicts among providers.
>
> Best,
> Hang
>

Re: [Discuss] :Introduce Catalog dynamic registration in flink catalog manager.

Posted by Hang Ruan <ru...@gmail.com>.
Hi Feng,
I agree with what Jark said. I think what you are looking for is lazy
initialization.

I don't think we should introduce the new interface CatalogProvider for
lazy initialization. What we should do is store the catalog properties
and initialize the catalog when we need it. Could you please describe
some other scenarios, besides lazy initialization, in which we need the
CatalogProvider?

If we really need the CatalogProvider, I think it is better to allow
only a single instance. Multiple instances are difficult to manage, and
there may be name conflicts among providers.

Best,
Hang

Jark Wu <im...@gmail.com> wrote on Tue, Feb 7, 2023 at 10:48:

> Hi Feng,
>
> I think this feature makes a lot of sense. If I understand correctly, what
> you are looking for is lazy catalog initialization.
>
> However, I have some concerns about introducing CatalogProvider, which
> delegates catalog management to users. It may be hard to avoid conflicts
> and duplicates between CatalogProvider and CatalogManager. Is it possible
> to have a built-in CatalogProvider to instantiate catalogs lazily?
>
> An idea in my mind is to introduce another catalog registration API
> without instantiating the catalog, e.g., registerCatalog(String
> catalogName, Map<String, String> catalogProperties). The catalog
> information is stored in CatalogManager as pure strings. The catalog is
> instantiated and initialized when used.
>
> This new API is very similar to other pure-string metadata registration,
> such as "createTable(String path, TableDescriptor descriptor)" and
> "createFunction(String path, String className, List<ResourceUri>
> resourceUris)".
>
> Can this approach satisfy your requirement?
>
> Best,
> Jark
>

Re: [Discuss] :Introduce Catalog dynamic registration in flink catalog manager.

Posted by Jark Wu <im...@gmail.com>.
Hi Feng,

I think this feature makes a lot of sense. If I understand correctly, what
you are looking for is lazy catalog initialization.

However, I have some concerns about introducing CatalogProvider, which
delegates catalog management to users. It may be hard to avoid conflicts
and duplicates between CatalogProvider and CatalogManager. Is it possible
to have a built-in CatalogProvider to instantiate catalogs lazily?

An idea in my mind is to introduce another catalog registration API
without instantiating the catalog, e.g., registerCatalog(String
catalogName, Map<String, String> catalogProperties). The catalog
information is stored in CatalogManager as pure strings. The catalog is
instantiated and initialized when used.
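
A minimal sketch of such a registration API, under stated assumptions:
`SimpleCatalog`, `LazyCatalogRegistry`, and the factory function are
hypothetical stand-ins, not Flink classes, and a real implementation
would presumably discover the catalog factory from the properties.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;

// Hypothetical stand-in for Flink's Catalog interface.
interface SimpleCatalog {
    String name();
}

// Stores catalog properties as plain strings and instantiates on first use.
class LazyCatalogRegistry {
    private final Map<String, Map<String, String>> properties = new HashMap<>();
    private final Map<String, SimpleCatalog> instances = new HashMap<>();
    // Assumed factory that turns string properties into a catalog instance.
    private final Function<Map<String, String>, SimpleCatalog> factory;

    LazyCatalogRegistry(Function<Map<String, String>, SimpleCatalog> factory) {
        this.factory = factory;
    }

    // Registration only records the properties; nothing is instantiated here.
    void registerCatalog(String catalogName, Map<String, String> catalogProperties) {
        if (properties.containsKey(catalogName)) {
            throw new IllegalArgumentException("Catalog already registered: " + catalogName);
        }
        properties.put(catalogName, new HashMap<>(catalogProperties));
    }

    // The catalog is created on first use and cached afterwards.
    Optional<SimpleCatalog> getCatalog(String catalogName) {
        Map<String, String> props = properties.get(catalogName);
        if (props == null) {
            return Optional.empty();
        }
        return Optional.ofNullable(
                instances.computeIfAbsent(catalogName, n -> factory.apply(props)));
    }

    // Exposed only so the laziness can be observed in this sketch.
    boolean isInstantiated(String catalogName) {
        return instances.containsKey(catalogName);
    }
}
```

Registering thousands of catalogs this way only fills a map of strings;
the cost of creating a catalog is paid the first time a job refers to it.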

This new API is very similar to other pure-string metadata registration,
such as "createTable(String path, TableDescriptor descriptor)" and
"createFunction(String path, String className, List<ResourceUri>
resourceUris)".

Can this approach satisfy your requirement?

Best,
Jark

On Mon, 6 Feb 2023 at 22:53, Timo Walther <tw...@apache.org> wrote:

> Hi Feng,
>
> this is indeed a good proposal.
>
> 1) It makes sense to improve the catalog listing for platform providers.
>
> 2) Other feedback from the past has shown that users would like to avoid
> the default in-memory catalog and offer their catalog before a
> TableEnvironment session starts.
>
> 3) Also we might reconsider whether a default catalog and default
> database make sense. Or whether this can be disabled and SHOW CATALOGS
> can be used for listing first without having a default catalog.
>
> What do you think about options 2 and 3?
>
> In any case, I would propose we pass a CatalogProvider to
> EnvironmentSettings and only allow a single instance. Catalogs should
> never shadow other catalogs.
>
> We could also use the org.apache.flink.table.factories.Factory infra and
> allow catalog providers via pure string properties. Not sure if we need
> this in the first version though.
>
> Cheers,
> Timo
>
>

Re: [Discuss] :Introduce Catalog dynamic registration in flink catalog manager.

Posted by Timo Walther <tw...@apache.org>.
Hi Feng,

this is indeed a good proposal.

1) It makes sense to improve the catalog listing for platform providers.

2) Other feedback from the past has shown that users would like to avoid 
the default in-memory catalog and offer their catalog before a 
TableEnvironment session starts.

3) Also we might reconsider whether a default catalog and default 
database make sense. Or whether this can be disabled and SHOW CATALOGS 
can be used for listing first without having a default catalog.

What do you think about options 2 and 3?

In any case, I would propose we pass a CatalogProvider to 
EnvironmentSettings and only allow a single instance. Catalogs should 
never shadow other catalogs.
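
One way the "single instance, no shadowing" rule could look is sketched
below. This is only an illustration with hypothetical names
(`NamedCatalog`, `NamedCatalogProvider`, `NonShadowingCatalogManager`);
it does not show how EnvironmentSettings would carry the provider.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

// Hypothetical stand-in for Flink's Catalog interface.
interface NamedCatalog {
    String name();
}

// Same shape as the CatalogProvider proposed in this thread.
interface NamedCatalogProvider {
    Optional<NamedCatalog> getCatalog(String catalogName);

    Set<String> listCatalogs();
}

// Holds exactly one provider and refuses registrations that would shadow it.
class NonShadowingCatalogManager {
    private final NamedCatalogProvider provider;
    private final Map<String, NamedCatalog> registered = new HashMap<>();

    NonShadowingCatalogManager(NamedCatalogProvider provider) {
        this.provider = provider;
    }

    void registerCatalog(String catalogName, NamedCatalog catalog) {
        // Reject any name already taken locally or served by the provider.
        if (registered.containsKey(catalogName)
                || provider.listCatalogs().contains(catalogName)) {
            throw new IllegalStateException("Catalog name already in use: " + catalogName);
        }
        registered.put(catalogName, catalog);
    }

    Optional<NamedCatalog> getCatalog(String catalogName) {
        NamedCatalog local = registered.get(catalogName);
        return local != null ? Optional.of(local) : provider.getCatalog(catalogName);
    }

    Set<String> listCatalogs() {
        Set<String> all = new HashSet<>(registered.keySet());
        all.addAll(provider.listCatalogs());
        return all;
    }
}
```

The check runs only at registration time, so a provider that starts
serving a new name later could still collide with a registered catalog;
a real design would need to decide how to handle that case.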

We could also use the org.apache.flink.table.factories.Factory infra and 
allow catalog providers via pure string properties. Not sure if we need 
this in the first version though.

Cheers,
Timo


On 06.02.23 11:21, Feng Jin wrote:
> Hi everyone,
> 
> The original discussion address is
> https://issues.apache.org/jira/browse/FLINK-30126
> 
> Currently, Flink has access to many systems, including kafka, hive,
> iceberg, hudi, elasticsearch, mysql...  The corresponding catalog name
> might be:
> kafka_cluster1, kafka_cluster2, hive_cluster1, hive_cluster2,
> iceberg_cluster2, elasticsearch_cluster1,  mysql_database1_xxx,
> mysql_database2_xxxx
> 
> As the platform of the Flink SQL job, we need to maintain the meta
> information of each system of the company, and when the Flink job
> starts, we need to register the catalog with the Flink table
> environment, so that users can use any table through the
> env.executeSql interface.
> 
> When we only have a small number of catalogs, we can register them
> like this, but when there are thousands of catalogs, I think there
> needs to be a dynamic loading mechanism that registers a catalog only
> when it is needed, which speeds up the initialization of the table
> environment and avoids useless catalog registration.
> 
> Preliminary thoughts:
> 
> A new CatalogProvider interface can be added.
> It has two main methods:
> * listCatalogs(), which lists all the catalogs that the provider can
> supply
> * getCatalog(), which gets a catalog instance by catalog name.
> 
> ```java
> public interface CatalogProvider {
> 
>      default void initialize(ClassLoader classLoader, ReadableConfig config) {}
> 
>      Optional<Catalog> getCatalog(String catalogName);
> 
>      Set<String> listCatalogs();
> }
> ```
> 
> 
> The corresponding implementation in CatalogManager is as follows:
> 
> ```java
> public class CatalogManager {
>      private @Nullable CatalogProvider catalogProvider;
> 
>      private Map<String, Catalog> catalogs;
> 
>      public void setCatalogProvider(CatalogProvider catalogProvider) {
>          this.catalogProvider = catalogProvider;
>      }
> 
>      public Optional<Catalog> getCatalog(String catalogName) {
>          // Catalogs registered directly in the manager take precedence.
>          Catalog registered = catalogs.get(catalogName);
>          if (registered != null) {
>              return Optional.of(registered);
>          }
>          // If there is no corresponding catalog in catalogs,
>          // get catalog by catalogProvider
>          if (catalogProvider != null) {
>              return catalogProvider.getCatalog(catalogName);
>          }
>          return Optional.empty();
>      }
> 
> }
> ```
> 
> 
> 
> Possible problems:
> 
> 1. Catalog name conflicts: which one should win when a registered
> catalog and a catalog provided by the catalog-provider have the same
> name?
> I prefer tableEnv-registered catalogs over those provided by the
> catalog-provider. If the user wishes to reference the catalog provided
> by the catalog-provider, they can unregister the catalog in tableEnv
> through the `unregisterCatalog` interface.
> 
> 2. Number of CatalogProviders: is it possible to have multiple
> catalogProvider implementations?
> I don't have a good answer for this at the moment. Supporting multiple
> catalogProviders would be much more convenient, but there may be
> catalog name conflicts between different catalogProviders.
> 
> 
> 
> Looking forward to your reply, any feedback is appreciated!
> 
> 
> Best.
> 
> Feng Jin
>