Posted to user@flink.apache.org by Balázs Varga <bv...@cloudera.com> on 2022/01/26 11:18:50 UTC

Resolving a CatalogTable

Hi everyone,

I'm trying to migrate from the old set of CatalogTable related APIs
(CatalogTableImpl, TableSchema, DescriptorProperties) to the new ones
(CatalogBaseTable, Schema and ResolvedSchema, CatalogPropertiesUtil), in a
custom catalog.

The catalog stores table definitions, and the current logic involves
persisting the schema from a CatalogBaseTable to a database. When we get a
table, its definition is read from the database and the CatalogTable is
built up and returned.

For this, we currently serialize the schema like this:
descriptorProperties.putTableSchema(Schema.SCHEMA,
catalogBaseTable.getSchema());

The new API seems to intentionally only allow the serialization of the
Resolved version of objects (e.g. ResolvedCatalogTable, ResolvedSchema).

1. Could you please clarify why this limitation was put into place? It
seems to me that it would be sufficient to resolve the CatalogTables once
we are actually trying to pass the table to the DynamicTableFactory.

2. What additional information is gained during the resolution of a
CatalogTable, and where does that information come from? Are there some
references to things in other catalogs?

3. Is it possible to "manually" resolve a CatalogTable? (invoke something
like what the internal DefaultSchemaResolver does). What context is
required?

Thanks,
Balazs

Re: Resolving a CatalogTable

Posted by Balázs Varga <bv...@cloudera.com>.
Hi Timo,

Thanks for the reply. I've thought a bit more about the problem,
considering your points.
This is not critical as of now, but for the sake of discussion, I think it
could be interesting.

The problem stems from the fact that we don't create the table via DDL, but
the following custom method:
- The user uploads an Avro schema definition (avsc file) accompanied by
some metadata. The metadata contains info about event time and such.
- Types are extracted from the Avro schema using the AvroSchemaConverter
class (currently into TypeInformation).
- There are no computed columns, UDFs, etc. The created tables are always
backed by Kafka.
- The TypeInformation and metadata are used to create a TableSchema that is
to be persisted.

I realize that Flink's new model works well when the table is created
through DDL, because it then goes through the CatalogManager, where the
table is resolved.
It seems a viable solution to do exactly that: instead of directly creating
a CatalogTable from the Avro schema, create a TableDescriptor, obtain a
TableEnvironment, and call createTable with an object path pointing
back to my catalog. What do you think about this approach?

Also, if my understanding is correct, Flink doesn't expose the resolution
functionality because in the general case, when there might be external
references, all catalogs are needed to resolve them. So Flink actually
covers the intended use cases properly.

Thanks,
Balazs

On Fri, Jan 28, 2022 at 3:37 PM Timo Walther <tw...@apache.org> wrote:

> Hi Balazs,
>
> you are right, the new APIs only allow the serialization of resolved
> instances. This ensures that only validated, correct instances are put
> into the persistent storage such as a database. The framework will
> always provide resolved instances and call the corresponding methods
> with those. They should be easily serializable.
>
> However, when reading from persistent storage such as a database, the
> framework needs to validate the input and resolve expressions and data
> types (e.g. from a string representation).
>
> The new design reflects the reality better. A catalog implementation
> does not need to be symmetric. It follows the principle:
>
> - "Resolved" into the catalog (with all information if implementers need
> it)
> - "Unresolved" out of the catalog (let the framework deal with the
> resolution, also with cross references to other catalogs)
>
>
> Use ResolvedCatalogTable#toProperties for putting basic info into your
> database.
>
> Use CatalogTable#fromProperties to restore the table.
>
> This is especially important for expression resolution of computed columns
> and watermark strategies. Functions could come from other catalogs as well.
>
> So for implementers it is usually not important to resolve the
> `CatalogTable` manually.
>
> If it is important for you, maybe you can elaborate a bit on your use case?
>
> Regards,
> Timo

Re: Resolving a CatalogTable

Posted by Timo Walther <tw...@apache.org>.
Hi Balazs,

you are right, the new APIs only allow the serialization of resolved 
instances. This ensures that only validated, correct instances are put 
into the persistent storage such as a database. The framework will 
always provide resolved instances and call the corresponding methods 
with those. They should be easily serializable.

However, when reading from persistent storage such as a database, the
framework needs to validate the input and resolve expressions and data
types (e.g. from a string representation).

The new design reflects the reality better. A catalog implementation 
does not need to be symmetric. It follows the principle:

- "Resolved" into the catalog (with all information if implementers need it)
- "Unresolved" out of the catalog (let the framework deal with the 
resolution, also with cross references to other catalogs)


Use ResolvedCatalogTable#toProperties for putting basic info into your 
database.

Use CatalogTable#fromProperties to restore the table.

This is especially important for expression resolution of computed columns
and watermark strategies. Functions could come from other catalogs as well.

So for implementers it is usually not important to resolve the
`CatalogTable` manually.
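
To make the asymmetry concrete, here is a minimal JDK-only sketch of the
round trip. It is a toy model, not Flink's API: the class
CatalogRoundTripSketch, its nested ResolvedColumn and UnresolvedColumn
types, and the "schema.N.name" / "schema.N.data-type" key layout are all
assumptions made for illustration. In a real catalog,
ResolvedCatalogTable#toProperties and CatalogTable#fromProperties would
play the roles of the two static methods.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical model of "resolved in, unresolved out"; these are not Flink classes. */
class CatalogRoundTripSketch {

    /** Stand-in for a resolved column: its type is assumed to be validated already. */
    static final class ResolvedColumn {
        final String name;
        final String validatedType;

        ResolvedColumn(String name, String validatedType) {
            this.name = name;
            this.validatedType = validatedType;
        }
    }

    /** Stand-in for an unresolved column: its type is still only a string. */
    static final class UnresolvedColumn {
        final String name;
        final String typeString;

        UnresolvedColumn(String name, String typeString) {
            this.name = name;
            this.typeString = typeString;
        }
    }

    /** Write path: flatten resolved columns into string properties for the database. */
    static Map<String, String> toProperties(List<ResolvedColumn> columns) {
        Map<String, String> props = new LinkedHashMap<>();
        for (int i = 0; i < columns.size(); i++) {
            props.put("schema." + i + ".name", columns.get(i).name);
            props.put("schema." + i + ".data-type", columns.get(i).validatedType);
        }
        return props;
    }

    /** Read path: rebuild unresolved columns from properties; nothing is validated here. */
    static List<UnresolvedColumn> fromProperties(Map<String, String> props) {
        List<UnresolvedColumn> columns = new ArrayList<>();
        for (int i = 0; props.containsKey("schema." + i + ".name"); i++) {
            columns.add(new UnresolvedColumn(
                    props.get("schema." + i + ".name"),
                    props.get("schema." + i + ".data-type")));
        }
        return columns;
    }

    public static void main(String[] args) {
        List<ResolvedColumn> resolved = new ArrayList<>();
        resolved.add(new ResolvedColumn("user_id", "BIGINT"));
        resolved.add(new ResolvedColumn("event_time", "TIMESTAMP(3)"));

        // What the catalog would persist when a table is created.
        Map<String, String> stored = toProperties(resolved);

        // What the catalog would hand back when the table is read.
        for (UnresolvedColumn column : fromProperties(stored)) {
            System.out.println(column.name + ": " + column.typeString);
        }
    }
}
```

The read path deliberately performs no validation: it hands back plain
strings, and turning them into validated types is left to the framework.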

If it is important for you, maybe you can elaborate a bit on your use case?

Regards,
Timo

