You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@spark.apache.org by Wenchen Fan <cl...@gmail.com> on 2018/08/01 14:07:36 UTC

Re: [DISCUSS] Multiple catalog support

For the first question. This is what we already supported. A data source
can implement `ReadSupportProvider` (based on my API improvement
<https://docs.google.com/document/d/1DDXCTCrup4bKWByTalkXWgavcPdvur8a4eEu8x1BzPM/edit?usp=sharing>)
so that it can create `ReadSupport` by reflection. I agree with you that
most of the data sources would implement `TableCatalog` instead in the
future.

For the second question. Overall looks good. One issue is if we should
generalize the hive STORE AS syntax as well.

For the third question, I agree we should figure out the expected behavior
first.

On Wed, Aug 1, 2018 at 4:47 AM Ryan Blue <rb...@netflix.com> wrote:

> Wenchen, I think the misunderstanding is around how the v2 API should work
> with multiple catalogs.
>
> Data sources are read/write implementations that resolve to a single JVM
> class. When we consider how these implementations should work with multiple
> table catalogs, I think it is clear that the catalog needs to be able to
> choose the implementation and should be able to share implementations
> across catalogs. Those requirements are incompatible with the idea that
> Spark should get a catalog from the data source.
>
> An easy way to think about this is the Parquet example from my earlier
> email. *Why would using format("parquet") determine the catalog where a
> table is created?*
>
> The conclusion I came to is that to support CTAS and other operations that
> require a catalog, Spark should determine that catalog first, not the
> storage implementation (data source) first. The catalog should return a
> Table that implements ReadSupport and WriteSupport. The actual
> implementation class doesn’t need to be chosen by users.
>
> That leaves a few open questions.
>
> First open question: *How can we support reading tables without metadata?*
> This is your load example: df.read.format("xyz").option(...).load.
>
> I think we should continue to use the DataSource v1 loader to load a
> DataSourceV2, then define a way for that to return a Table with ReadSupport
> and WriteSupport, like this:
>
> interface DataSourceV2 {
>   public Table anonymousTable(Map<String, String> tableOptions);
> }
>
> While I agree that these tables without metadata should be supported, many
> of the current uses are actually working around missing multi-catalog
> support. JDBC is a good example. You have to point directly to a JDBC table
> using the source and options because we don’t have a way to connect to JDBC
> as a catalog. If we make catalog definition easy, then we can support CTAS
> to JDBC, make it simpler to load several tables in the same remote
> database, etc. This would also improve working with persistent JDBC tables
> because it would connect to the source of truth for table metadata instead
> of copying it into the ExternalCatalog from the Spark session.
>
> In other words, the case we should be primarily targeting is catalog-based
> tables, not tables without metadata.
>
> Second open question: *How should the format method and USING clause
> work?*
>
> I think these should be passed to the catalog and the catalog can decide
> what to do. Formats like “parquet” and “json” are currently replaced with a
> concrete Java class, so there’s precedent for these as information for the
> catalog and not concrete implementations. These should be optional and
> should get passed to any catalog.
>
> The implementation of TableCatalog backed by the current ExternalCatalog
> can continue to use format / USING to choose the data source directly,
> but there’s no requirement for other catalogs to do that because there are
> no other catalogs right now. Passing this to an Iceberg catalog could
> determine whether Iceberg’s underlying storage is “avro” or “parquet”, even
> though Iceberg uses a different data source implementation.
>
> Third open question: *How should path-based tables work?*
>
> First, path-based tables need clearly defined behavior. That’s missing
> today. I’ve heard people cite the “feature” that you can write a different
> schema to a path-based JSON table without needing to run an “alter table”
> on it to update the schema. If this is behavior we want to preserve (and I
> think it is) then we need to clearly state what that behavior is.
>
> Second, I think that we can build a TableCatalog-like interface to handle
> path tables.
>
> rb
> ​
> On Tue, Jul 31, 2018 at 7:58 AM Wenchen Fan <cl...@gmail.com> wrote:
>
>> Here is my interpretation of your proposal, please correct me if
>> something is wrong.
>>
>> End users can read/write a data source with its name and some options.
>> e.g. `df.read.format("xyz").option(...).load`. This is currently the only
>> end-user API for data source v2, and is widely used by Spark users to
>> read/write data source v1 and file sources, we should still support it. We
>> will add more end-user APIs in the future, once we standardize the DDL
>> logical plans.
>>
>> If a data source wants to be used with tables, then it must implement
>> some catalog functionalities. At least it needs to support
>> create/lookup/alter/drop table, and optionally more features like managing
>> functions/views and supporting the USING syntax. This means, to use file
>> source with tables, we need another data source that has full catalog
>> functionalities. We can implement a Hive data source with all catalog
>> functionalities backed by HMS, or a Glue data source backed by AWS Glue.
>> They should both support USING syntax and thus support file sources. If
>> USING is not specified, the default storage(hive tables) should be used.
>>
>> For path-based tables, we can create a special API for it and define the
>> rule to resolve ambiguity when looking up tables.
>>
>> If we go with this direction, one problem is that, data source may not be
>> a good name anymore, since a data source can provide catalog
>> functionalities.
>>
>> Under the hood, I feel this proposal is very similar to my second
>> proposal, except that a catalog implementation must provide a default data
>> source/storage, and different rule for looking up tables.
>>
>>
>> On Sun, Jul 29, 2018 at 11:43 PM Ryan Blue <rb...@netflix.com> wrote:
>>
>>> Wenchen, what I'm suggesting is a bit of both of your proposals.
>>>
>>> I think that USING should be optional like your first option. USING (or
>>> format(...) in the DF side) should configure the source or implementation,
>>> while the catalog should be part of the table identifier. They serve two
>>> different purposes: configuring the storage within the catalog, and
>>> choosing which catalog to pass create or other calls to. I think that's
>>> pretty much what you suggest in #1. The USING syntax would continue to be
>>> used to configure storage within a catalog.
>>>
>>> (Side note: I don't think this needs to be tied to a particular
>>> implementation. We currently use 'parquet' to tell the Spark catalog to use
>>> the Parquet source, but another catalog could also use 'parquet' to store
>>> data in Parquet format without using the Spark built-in source.)
>>>
>>> The second option suggests separating the catalog API from data source.
>>> In #21306 <https://github.com/apache/spark/pull/21306>, I add the
>>> proposed catalog API and a reflection-based loader like the v1 sources use
>>> (and v2 sources have used so far). I think that it makes much more
>>> sense to start with a catalog and then get the data source for operations
>>> like CTAS. This is compatible with the behavior from your point #1: the
>>> catalog chooses the source implementation and USING is optional.
>>>
>>> The reason why we considered an API to get a catalog from the source is
>>> because we defined the source API first, but it doesn't make sense to get a
>>> catalog from the data source. Catalogs can share data sources (e.g. prod
>>> and test environments). Plus, it makes more sense to determine the catalog
>>> and then have it return the source implementation because it may require a
>>> specific one, like JDBC or Iceberg would. With standard logical plans we
>>> always know the catalog when creating the plan: either the table identifier
>>> includes an explicit one, or the default catalog is used.
>>>
>>> In the PR I mentioned above, the catalog implementation's class is
>>> determined by Spark config properties, so there's no need to use
>>> ServiceLoader and we can use the same implementation class for multiple
>>> catalogs with different configs (e.g. prod and test environments).
>>>
>>> Your last point about path-based tables deserves some attention. But, we
>>> also need to define the behavior of path-based tables. Part of what we want
>>> to preserve is flexibility, like how you don't need to alter the schema in
>>> JSON tables, you just write different data. For the path-based syntax, I
>>> suggest looking up source first and using the source if there is one. If
>>> not, then look up the catalog. That way existing tables work, but we can
>>> migrate to catalogs with names that don't conflict.
>>>
>>> rb
>>>
>>
>
> --
> Ryan Blue
> Software Engineer
> Netflix
>