Posted to dev@spark.apache.org by Wenchen Fan <cl...@gmail.com> on 2017/10/01 15:25:36 UTC

Re: [discuss] Data Source V2 write path

The main entry points for data sources inside Spark are the SQL API and
`DataFrameReader/Writer`.

For the SQL API, I think the semantics are well defined: data and metadata
operations are separated. E.g., INSERT INTO means writing data into an
existing table, and CREATE TABLE means only creating the metadata. But the
problem is that data source v1 can't provide metadata access, so we still mix
data and metadata operations at the data source level. Data source v2 can
solve this problem by introducing a `CatalogSupport` trait that allows a data
source to provide metadata access.
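
To make the idea concrete, here is a minimal sketch of what such an optional
trait might look like. The method names (`tableExists`, `createTable`), the
`TableMeta` class, `DataWriteSupport` and the in-memory source are all
assumptions made for illustration; nothing in this thread fixes the actual
interface.

```scala
// Hypothetical sketch only: none of these names are defined by Spark or this thread.
import scala.collection.mutable

// Table metadata that a catalog-capable source would manage.
final case class TableMeta(name: String, columns: Seq[String], partitionBy: Seq[String])

// Data-only contract: a source writes rows into an existing table.
trait DataWriteSupport {
  def write(table: String, rows: Seq[Seq[Any]]): Unit
}

// Optional mix-in that gives Spark metadata access for this source.
trait CatalogSupport {
  def tableExists(name: String): Boolean
  def createTable(meta: TableMeta): Unit
}

// Toy source backed by in-memory maps, implementing both contracts.
class InMemorySource extends DataWriteSupport with CatalogSupport {
  private val metas = mutable.Map.empty[String, TableMeta]
  private val data = mutable.Map.empty[String, Vector[Seq[Any]]]

  def tableExists(name: String): Boolean = metas.contains(name)

  // CREATE TABLE semantics: metadata only, no rows are written.
  def createTable(meta: TableMeta): Unit = {
    require(!tableExists(meta.name), s"table ${meta.name} already exists")
    metas(meta.name) = meta
    data(meta.name) = Vector.empty[Seq[Any]]
  }

  // INSERT INTO semantics: data only, the table must already exist.
  def write(table: String, rows: Seq[Seq[Any]]): Unit = {
    require(tableExists(table), s"table $table does not exist")
    data(table) = data(table) ++ rows
  }

  def rowCount(table: String): Int = data.get(table).map(_.size).getOrElse(0)
}
```

A source without a metastore would implement only the data-only contract,
which is the case that makes the catalog part hard to require.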

For the DataFrameWriter API, e.g. `df.write.format(...).save()`, there is no
separation of data and metadata. According to the documentation, it seems
Spark doesn't care about metadata here and assumes the underlying data sources
take care of it.

I have 3 proposals:
1. `DataFrameWriter.save` doesn't support data source v2; we create new
APIs (or reuse `DataFrameWriter.insertInto/saveAsTable`?) for data source
v2 that separate data and metadata clearly.
2. Update the semantics of `DataFrameWriter.save` and say it's only for
writing data, not metadata. Then data source v2 can separate data and
metadata clearly, because the upper API also separates them.
3. `DataFrameWriter.save` supports data source v2, and concrete data source
implementations define their own behaviors. They can separate data and
metadata operations, and fail `DataFrameWriter.save` if the table
doesn't exist. Or they can mix the data and metadata operations in
the write path, and use `options` to carry metadata information.
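
The two behaviors allowed under proposal 3 can be sketched with a small
model. Everything here is hypothetical: `V2WriteSource`, `StrictSource`,
`OptionsSource` and the `partitionBy` option key are illustrative names, not
Spark APIs, and the conflict check is just one policy a source might choose.

```scala
// Hypothetical model of proposal 3; these types are illustrative, not Spark APIs.
import scala.collection.mutable

trait V2WriteSource {
  // `options` is the only channel through which metadata can reach the source.
  def save(table: String, options: Map[String, String], rows: Seq[String]): Unit
}

// Behavior A: data only. The table must have been created elsewhere.
class StrictSource(existing: Set[String]) extends V2WriteSource {
  val written = mutable.Map.empty[String, Vector[String]]

  def save(table: String, options: Map[String, String], rows: Seq[String]): Unit = {
    if (!existing.contains(table))
      throw new IllegalStateException(s"table $table does not exist; create it first")
    written(table) = written.getOrElse(table, Vector.empty[String]) ++ rows
  }
}

// Behavior B: metadata travels in options; the table is created on first write.
class OptionsSource extends V2WriteSource {
  val partitioning = mutable.Map.empty[String, String]
  val written = mutable.Map.empty[String, Vector[String]]

  def save(table: String, options: Map[String, String], rows: Seq[String]): Unit = {
    val requested = options.getOrElse("partitionBy", "") // assumed option key
    partitioning.get(table) match {
      case Some(current) if current != requested =>
        throw new IllegalArgumentException(
          s"partitioning '$requested' conflicts with existing '$current'")
      case Some(_) => // metadata matches, nothing to do
      case None => partitioning(table) = requested // implicit table creation
    }
    written(table) = written.getOrElse(table, Vector.empty[String]) ++ rows
  }
}
```

Both classes satisfy the same write interface, yet the same `save` call
succeeds on one and fails on the other, which is the flexibility (and the
inconsistency) that proposal 3 accepts.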

Proposals 1 and 2 are similar, and proposal 1 is better because changing the
semantics of an existing API is not good. Both proposals need to wait
for catalog federation before data source v2 is released, so that we can
clearly define what catalog functionality the data source should provide.
Both also force data sources to provide metadata access (a catalog), which is
unfriendly to simple data sources that don't have a metastore.

Personally I prefer proposal 3 because it's not blocked by catalog
federation, so we can develop it incrementally. It also makes catalog
support optional, so that simple data sources without a metastore can
also implement data source v2.

More proposals are welcome!


On Sat, Sep 30, 2017 at 3:26 AM, Ryan Blue <rb...@netflix.com> wrote:

> > Spark doesn't know how to create a table in external systems like
> Cassandra, and that's why it's currently done inside the data source writer.
>
> This isn't a valid argument for doing this task in the writer for v2. If
> we want to fix the problems with v1, we shouldn't continue to mix write
> operations with table metadata changes simply because it is more convenient
> and requires less refactoring.
>
> I'm proposing that in v2 we move creation of file system tables outside of
> the writer, but still in a non-public implementation. Cassandra and other
> external stores would behave as they should today and assume the table
> exists, or would wait to use the v2 API until there is catalog support.
>
> The important thing is that we don't set a standard that writers can
> create tables, which is going to lead to different behavior across
> implementations when we have conflicts between an existing table's config
> and the options passed into the writer.
>
> > For now, Spark just assumes data source writer takes care of it. For the
> internal file format data source, I propose to pass partition/bucket
> information to the writer via options, other data sources can define their
> own behavior, e.g. they can also use the options, or disallow users to
> write data to a non-existing table and ask users to create the table in the
> external systems first.
>
> The point is preventing data sources from defining their own behavior so
> we can introduce consistent behavior across sources for v2.
>
> rb
>
> On Thu, Sep 28, 2017 at 8:49 PM, Wenchen Fan <cl...@gmail.com> wrote:
>
>> > When this CTAS logical node is turned into a physical plan, the
>> relation gets turned into a `DataSourceV2` instance and then Spark gets a
>> writer and configures it with the proposed API. The main point of this is
>> to pass the logical relation (with all of the user's options) through to
>> the data source, not the writer. The data source creates the writer and can
>> tell the writer what to do.
>>
>> Here is the problem: Spark doesn't know how to create a table in external
>> systems like Cassandra, and that's why it's currently done inside the data
>> source writer.
>>
>> In the future, we can add a new trait `CatalogSupport` for
>> `DataSourceV2`, so that we can use your proposal and separate metadata
>> management from data source writer.
>>
>> For now, Spark just assumes data source writer takes care of it. For the
>> internal file format data source, I propose to pass partition/bucket
>> information to the writer via options, other data sources can define their
>> own behavior, e.g. they can also use the options, or disallow users to
>> write data to a non-existing table and ask users to create the table in the
>> external systems first.
>>
>>
>>
>> On Thu, Sep 28, 2017 at 5:45 AM, Russell Spitzer <
>> russell.spitzer@gmail.com> wrote:
>>
>>> On an unrelated note, is there any appetite for making the write path
>>> also include an option to return elements that could not be processed
>>> for some reason?
>>>
>>> Usage might be like
>>>
>>> saveAndIgnoreFailures() : Dataset
>>>
>>> So that if some records cannot be parsed by the datasource for writing,
>>> or violate some contract with the datasource, the records can be returned
>>> for further processing or dealt with by an alternate system.
>>>
>>> On Wed, Sep 27, 2017 at 12:40 PM Ryan Blue <rb...@netflix.com.invalid>
>>> wrote:
>>>
>>>> Comments inline. I've written up what I'm proposing with a bit more
>>>> detail.
>>>>
>>>> On Tue, Sep 26, 2017 at 11:17 AM, Wenchen Fan <cl...@gmail.com>
>>>> wrote:
>>>>
>>>>> I'm trying to give a summary:
>>>>>
>>>>> Ideally the data source API should only deal with data, not metadata. But
>>>>> one key problem is that Spark still needs to support data sources without
>>>>> a metastore, e.g. file format data sources.
>>>>>
>>>>> For this kind of data source, users have to pass the metadata
>>>>> information like partitioning/bucketing to every write action of a
>>>>> "table" (or other identifiers like the path of a file format data source),
>>>>> and it's the user's responsibility to make sure this metadata information
>>>>> is consistent. If it's inconsistent, the behavior is undefined; different
>>>>> data sources may have different behaviors.
>>>>>
>>>>
>>>> Agreed so far. One minor point is that we currently throw an exception
>>>> if you try to configure, for example, partitioning and also use
>>>> `insertInto`.
>>>>
>>>>
>>>>> If we agree on this, then the data source write API should have a way to
>>>>> pass this metadata information, and I think using data source options is
>>>>> a good choice because it's the most implicit way and doesn't require new
>>>>> APIs.
>>>>>
>>>>
>>>> What I don't understand is why we "can't avoid this problem" unless you
>>>> mean the last point, that we have to support this. I don't think that using
>>>> data source options is a good choice, but maybe I don't understand the
>>>> alternatives. Here's a straw-man version of what I'm proposing so you can
>>>> tell me what's wrong with it or why options are a better choice.
>>>>
>>>> I'm assuming we start with a query like this:
>>>> ```
>>>> df.write.partitionBy("utc_date").bucketBy("primary_key").format("parquet").saveAsTable("s3://bucket/path/")
>>>> ```
>>>>
>>>> That creates a logical node, `CreateTableAsSelect`, with some options.
>>>> It would contain a `Relation` (or `CatalogTable` definition?) that
>>>> corresponds to the user's table name and `partitionBy`, `format`, etc.
>>>> calls. It should also have a write mode and the logical plan for `df`.
>>>>
>>>> When this CTAS logical node is turned into a physical plan, the
>>>> relation gets turned into a `DataSourceV2` instance and then Spark gets a
>>>> writer and configures it with the proposed API. The main point of this is
>>>> to pass the logical relation (with all of the user's options) through to
>>>> the data source, not the writer. The data source creates the writer and can
>>>> tell the writer what to do. Another benefit of this approach is that the
>>>> relation gets resolved during analysis, when it is easy to add sorts and
>>>> other requirements to the logical plan.
>>>>
>>>> If we were to implement what I'm suggesting, then we could handle
>>>> metadata conflicts outside of the `DataSourceV2Writer`, in the data source.
>>>> That eliminates problems about defining behavior when there are conflicts
>>>> (the next point) and prepares implementations for a catalog API that would
>>>> standardize how those conflicts are handled. In the short term, this
>>>> doesn't have to be in a public API yet. It can be special handling for
>>>> HadoopFS relations that we can eventually use underneath a public API.
>>>>
>>>> Please let me know if I've misunderstood something. Now that I've
>>>> written out how we could actually implement conflict handling outside of
>>>> the writer, I can see that it isn't as obvious of a change as I thought.
>>>> But, I think in the long term this would be a better way to go.
>>>>
>>>>
>>>>> But then we have another problem: how to define the behavior for data
>>>>> sources with a metastore when the given options contain metadata
>>>>> information? A typical case is `DataFrameWriter.saveAsTable`: when a user
>>>>> calls it with partition columns, he doesn't know what will happen. The
>>>>> table may not exist and he may create the table successfully with the
>>>>> specified partition columns, or the table already exists but has
>>>>> inconsistent partition columns and Spark throws an exception. Besides,
>>>>> save mode doesn't play well in this case, as we may need different save
>>>>> modes for data and metadata.
>>>>>
>>>>> My proposal: data source API should only focus on data, but concrete
>>>>> data sources can implement some dirty features via options. e.g. file
>>>>> format data sources can take partitioning/bucketing from options, data
>>>>> source with metastore can use a special flag in options to indicate a
>>>>> create table command(without writing data).
>>>>>
>>>>
>>>> I can see how this would make changes smaller, but I don't think it is
>>>> a good thing to do. If we do this, then I think we will not really
>>>> accomplish what we want to with this (a clean write API).
>>>>
>>>>
>>>>> In other words, Spark connects users to data sources with a clean
>>>>> protocol that only focuses on data, but this protocol has a backdoor: the
>>>>> data source options. Concrete data sources are free to define how to deal
>>>>> with metadata, e.g. the Cassandra data source can ask users to create the
>>>>> table on the Cassandra side first, then write data on the Spark side, or
>>>>> ask users to provide more details in options and do CTAS on the Spark
>>>>> side. These can be done via options.
>>>>>
>>>>> After catalog federation, hopefully only file format data sources
>>>>> still use this backdoor.
>>>>>
>>>>
>>>> Why would file format sources use a back door after catalog federation??
>>>>
>>>> rb
>>>>
>>>>
>>>> --
>>>> Ryan Blue
>>>> Software Engineer
>>>> Netflix
>>>>
>>>
>>
>
>
> --
> Ryan Blue
> Software Engineer
> Netflix
>

Re: [discuss] Data Source V2 write path

Posted by Ryan Blue <rb...@netflix.com.INVALID>.
As far as changes to the public API go, I’d prefer deprecating the API that
mixes data and metadata operations. But I don’t think that requires that we
go with your proposal #1, where the current write API can’t use data source
v2 writers. I think we can separate the metadata operations for Hadoop FS
tables from writes using the current public API. If this isn’t possible for
Hadoop FS sources, I’d like to understand *why* it isn’t possible (*how*
would it require a catalog API?). This also requires special handling for
Hadoop FS sources, and it could be that this plan would cause some existing v1
sources to lack certain operations before the catalog API is ready. We
should identify what those implementations are and then see whether this
would be worth it.

To sum up what I’m saying, here’s my proposal:

   - *4* — DataFrameWriter.save supports data source v2 writers, which
   assume that tables exist and have some configuration. That configuration is
   passed as a Relation or CatalogTable when the writer is created. For
   Hadoop FS tables, we would move metadata operations out of the v2 writer
   and would implement them in a command or using a non-public API called by a
   command. This doesn’t require waiting for a catalog API because it would
   have the same behavior as v1, just *implemented outside the new write
   API*.
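
The split described in proposal 4 might be modeled roughly as follows. All
names (`TableConfig`, `PathWriter`, `FsTables`, `SaveCommand`) are
hypothetical stand-ins rather than Spark classes, and failing on a
partitioning conflict is an assumed policy chosen only to show where such a
decision would live.

```scala
// Hypothetical model of proposal 4; names are illustrative, not Spark APIs.
import scala.collection.mutable

// Resolved table configuration handed to the writer when it is created.
final case class TableConfig(path: String, partitionBy: Seq[String])

// The writer only writes; it never creates or alters table metadata.
class PathWriter(config: TableConfig, sink: mutable.Buffer[String]) {
  def write(rows: Seq[String]): Unit =
    rows.foreach(r => sink += s"${config.path}:$r")
}

// Stand-in for the non-public metadata handling for Hadoop FS tables.
class FsTables {
  private val tables = mutable.Map.empty[String, TableConfig]
  val files = mutable.Buffer.empty[String]

  def lookup(path: String): Option[TableConfig] = tables.get(path)

  def create(config: TableConfig): Unit = {
    tables(config.path) = config
  }
}

// The command owns every metadata decision, then delegates the data write.
object SaveCommand {
  def run(fs: FsTables, requested: TableConfig, rows: Seq[String]): Unit = {
    val resolved = fs.lookup(requested.path) match {
      case Some(existing) =>
        // Assumed policy: reject a write whose partitioning disagrees.
        require(existing.partitionBy == requested.partitionBy,
          "requested partitioning conflicts with the existing table")
        existing
      case None =>
        fs.create(requested)
        requested
    }
    new PathWriter(resolved, fs.files).write(rows)
  }
}
```

Because the conflict check sits in the command, a later catalog API could
replace `FsTables` without changing what the writer is expected to do.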

This would not require any public API changes, but would prepare for the
catalog API by separating metadata operations from the writers. If writers
are expected to perform some metadata operations, then we won’t be able to
cleanly add the catalog API without changing the expectations of existing
writers.

Eventually, we should deprecate the existing write API so expectations are
clear like they are in SQL. I’ve been thinking it should look like this
(but this is by no means well thought-through):

df.insert.into("db.table") // inserts into db.table by column name
df.insert.byPosition.into("db.table") // insert by position
df.save.as("db.table") // creates db.table as select, fails if exists
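
The intended difference between the three calls can be illustrated with a
toy model. `Frame`, `Catalog` and the method names below are hypothetical and
exist only for this sketch; the real API shape is, as stated above, not yet
thought through.

```scala
// Hypothetical model; `Frame`, `Catalog` and these methods are not Spark APIs.
import scala.collection.mutable

final case class Frame(columns: Seq[String], rows: Seq[Seq[Any]])

class Catalog {
  private val schemas = mutable.Map.empty[String, Seq[String]]
  private val data = mutable.Map.empty[String, Vector[Seq[Any]]]

  def rows(table: String): Vector[Seq[Any]] = data(table)

  // Like df.save.as(...): create the table from the frame, fail if it exists.
  def saveAs(table: String, df: Frame): Unit = {
    require(!schemas.contains(table), s"table $table already exists")
    schemas(table) = df.columns
    data(table) = df.rows.toVector
  }

  // Like df.insert.into(...): reorder each row to match the table's column names.
  def insertByName(table: String, df: Frame): Unit = {
    val target = schemas(table)
    require(target.toSet == df.columns.toSet, "column names do not match")
    val order = target.map(c => df.columns.indexOf(c))
    data(table) = data(table) ++ df.rows.map(r => order.map(i => r(i)))
  }

  // Like df.insert.byPosition.into(...): ignore names, require equal column counts.
  def insertByPosition(table: String, df: Frame): Unit = {
    require(schemas(table).size == df.columns.size, "column counts do not match")
    data(table) = data(table) ++ df.rows
  }
}
```

Neither insert method creates a table, and `saveAs` never appends, so each
call has exactly one meaning, as in SQL.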

I’d also like an API for table creation, which may have more options than
the df.save.as("..."). This would all be after the catalog API is
introduced, of course.

rb

-- 
Ryan Blue
Software Engineer
Netflix