Posted to dev@spark.apache.org by Wenchen Fan <cl...@gmail.com> on 2018/04/16 02:45:01 UTC

[discuss][data source v2] remove type parameter in DataReader/WriterFactory

Hi all,

I'd like to propose an API change to the data source v2.

One design goal of data source v2 is API type safety. The FileFormat API is
a bad example: it asks the implementation to return InternalRow even when
the data is actually a ColumnarBatch. In data source v2 we added a type
parameter to DataReader/WriterFactory and DataReader/Writer, so that a data
source supporting columnar scan returns ColumnarBatch at the API level.

However, we have hit some problems while migrating the streaming and
file-based data sources to data source v2.

For the streaming side, we need a variant of DataReader/WriterFactory that
adds streaming-specific concepts like epoch id and offset. For details
please see ContinuousDataReaderFactory and
https://docs.google.com/document/d/1PJYfb68s2AG7joRWbhrgpEWhrsPqbhyRwUVl9V1wPOE/edit#

But this conflicts with the special format mixin traits like
SupportsScanColumnarBatch. We have to make the streaming variant of
DataReader/WriterFactory extend the original DataReader/WriterFactory and
do type casts at runtime, which is unnecessary and violates type safety.

For file-based data sources, we have a problem with code duplication.
Let's take the ORC data source as an example. To support both unsafe row
and columnar batch scans, we need something like

// A lot of parameters to carry to the executor side
class OrcUnsafeRowFactory(...) extends DataReaderFactory[UnsafeRow] {
  def createDataReader ...
}

class OrcColumnarBatchFactory(...) extends DataReaderFactory[ColumnarBatch] {
  def createDataReader ...
}

class OrcDataSourceReader extends DataSourceReader {
  def createUnsafeRowFactories = ... // logic to prepare the parameters and create factories

  def createColumnarBatchFactories = ... // logic to prepare the parameters and create factories
}

You can see that we have duplicated logic for preparing parameters and
defining the factory.

Here I propose to remove all the special format mixin traits and change the
factory interface to

public enum DataFormat {
  ROW,
  INTERNAL_ROW,
  UNSAFE_ROW,
  COLUMNAR_BATCH
}

interface DataReaderFactory {
  DataFormat dataFormat();

  default DataReader<Row> createRowDataReader() {
    throw new IllegalStateException();
  }

  default DataReader<UnsafeRow> createUnsafeRowDataReader() {
    throw new IllegalStateException();
  }

  default DataReader<ColumnarBatch> createColumnarBatchDataReader() {
    throw new IllegalStateException();
  }
}

Spark will look at dataFormat and decide which create-data-reader method to
call.
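
As a minimal sketch (the method name and error handling below are
illustrative only, not part of the proposal), the dispatch on Spark's side
could look like this in Scala:

// Hypothetical dispatch on the factory's declared format.
def createReaderFor(factory: DataReaderFactory): DataReader[_] =
  factory.dataFormat match {
    case DataFormat.ROW            => factory.createRowDataReader()
    case DataFormat.UNSAFE_ROW     => factory.createUnsafeRowDataReader()
    case DataFormat.COLUMNAR_BATCH => factory.createColumnarBatchDataReader()
    // INTERNAL_ROW would need a corresponding create method in the factory.
    case other => throw new IllegalStateException(s"Unsupported format: $other")
  }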

Now the streaming side no longer has this problem, since the special format
mixin traits go away. The ORC data source can also be simplified to

class OrcReaderFactory(...) extends DataReaderFactory {
  def createUnsafeRowReader ...

  def createColumnarBatchReader ...
}

class OrcDataSourceReader extends DataSourceReader {
  def createReadFactories = ... // logic to prepare the parameters and create factories
}

There is also a potential benefit for hybrid storage data sources, which
may keep real-time data in row format and historical data in columnar
format. Such a source can make some DataReaderFactory instances output
InternalRow and others output ColumnarBatch.
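
As a rough sketch of that hybrid case (the split and factory class names
here are hypothetical, not part of the proposal):

class HybridDataSourceReader(realTimeSplits: Seq[Split],
                             historicalSplits: Seq[Split]) extends DataSourceReader {
  // Row-based factories for the fresh data, columnar factories for history.
  def createReadFactories: Seq[DataReaderFactory] =
    realTimeSplits.map(s => new RowReaderFactory(s)) ++        // dataFormat = INTERNAL_ROW
      historicalSplits.map(s => new ColumnarReaderFactory(s))  // dataFormat = COLUMNAR_BATCH
}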

Thoughts?

Re: [discuss][data source v2] remove type parameter in DataReader/WriterFactory

Posted by Wenchen Fan <cl...@gmail.com>.
I agree that we should hold off on the API changes until we finalize the
streaming-side design.

To add some more color to this discussion: I think the fundamental design
problem is that `SupportsScanColumnarBatch` and friends are not real mixin
traits. They do not mix in new functionality; they replace existing
functionality (`createDataReaderFactories`).

This brings several problems:
1. The interface definition is hacky. We first have to override the
existing `createDataReaderFactories` to throw an exception, and then add a
new create-factory method for the other data format (see the sketch below).
2. When a data source implements multiple special scan mixin traits, we
have to document which trait takes effect. A data source can't output
different formats at the same time, but we don't prevent that at the API
level.
3. If the stuff to replace is different, we need different traits. This is
basically the problem with the streaming variant of DataReader/WriterFactory
that I mentioned before.
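
To illustrate problem 1, this is roughly what the columnar mixin looks like
today, written here as a Scala trait for brevity (the real interface is
Java, and the exact signatures may differ slightly):

trait SupportsScanColumnarBatch extends DataSourceReader {
  // The "mixin" replaces the row-based scan: the inherited method is
  // overridden to throw...
  override def createDataReaderFactories(): java.util.List[DataReaderFactory[Row]] =
    throw new IllegalStateException("columnar scan only")

  // ...and a parallel create method is added for the columnar format.
  def createBatchDataReaderFactories(): java.util.List[DataReaderFactory[ColumnarBatch]]
}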

Anyway, let's wait and see what the actual API requirements from the
streaming side are, and decide later.

Re: [discuss][data source v2] remove type parameter in DataReader/WriterFactory

Posted by Joseph Torres <jo...@databricks.com>.
ContinuousReader and friends haven't yet gotten a design doc because
they're significantly constrained by the (incomplete) implementation
of SPARK-20928. Since continuous processing queries never pause and pass
control back to the driver, implementation details of the continuous
processing engine will determine what methods and lifecycles are possible
to support at the data source API level.

Within a few weeks, I'm confident we can bring SPARK-20928 to the point
where a useful design doc can be proposed. Until we have such a doc, I
agree that we should not consider changes to the general V2 surface based
on the preliminary API which currently exists.

Re: [discuss][data source v2] remove type parameter in DataReader/WriterFactory

Posted by Ryan Blue <rb...@netflix.com.INVALID>.
Wenchen, thanks for clarifying.

I think it is valuable to consider the API as a whole because it’s
difficult to think about the impact of these changes otherwise. With that
in mind, here’s a snapshot of the relevant portion of the batch API, which
I think is pretty reasonable:

v2.ReadSupport // source supports reading
  DataSourceReader createReader(options)

v2.DataSourceReader // created to configure and perform a read
  List<DataReaderFactory<InternalRow>> createDataReaderFactories()

v2.SupportsScanColumnarBatch // Reader mix-in to create vector readers
  List<DataReaderFactory<ColumnarBatch>> createBatchDataReaderFactories()

v2.DataReaderFactory<D> // each one is a unique read task; equivalent to Iterable<D>
  DataReader<D> createDataReader()

v2.DataReader<D> // equivalent to an Iterator<D>

And here’s the current streaming side, I think (not including Microbatch
classes):

v2.ContinuousReadSupport // source supports continuous reading
  ContinuousReader createContinuousReader(schema, checkpointLocation, options)

v2.ContinuousReader extends DataSourceReader // configure/perform a continuous read
  // inherits createDataReaderFactories() from DataSourceReader
  // may have createBatchDataReaderFactories() from SupportsScanColumnarBatch

v2.ContinuousDataReaderFactory<D> // a continuous read task
  DataReader<D> createDataReaderWithOffset(PartitionOffset)

Looks like the reason casting is required is that
ContinuousReader#createDataReaderFactories is inherited and doesn't return
ContinuousDataReaderFactory even though it actually needs to. In that case,
why reuse DataSourceReader when ContinuousReader could expose its own method
to create a list of continuous factories/tasks? Then we would have just one
mix-in trait for batch and one for streaming. This looks like a consequence
of partially reusing classes. I don't think there is enough reason to
refactor the API here.
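
For the sake of discussion, that alternative could be sketched like this
(the method name and the InternalRow specialization are my assumptions, not
an agreed design):

trait ContinuousReader extends DataSourceReader {
  // Returns continuous factories directly, so no runtime cast is needed.
  def createContinuousDataReaderFactories(): java.util.List[ContinuousDataReaderFactory[InternalRow]]
}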

Not refactoring has a few benefits:

   - Keeping the mix-in structure maintains consistency with the rest of
   the API, which uses mix-ins for optional traits.
   - It also keeps the API small for simple or basic implementations:
   mix-ins bring in more options, but they are entirely optional. Adding 4
   methods to each factory/task is more complicated.
   - This maintains the intent of the task and data-reader classes, which
   is to provide an API like Iterable/Iterator.

I think the second problem can be solved by inheritance where necessary,
but I don’t know how big of a problem this is. How many implementations are
going to provide both row and vector reads? Why would an implementation
provide both? If streaming and batch need to be separate, then the
constructors will probably be different as well. I don’t think changing the
API is going to be useful for this.

In addition, *I think this discussion is very likely a consequence of not
proposing and discussing the v2 streaming API publicly*. There’s no
published design that gives a high-level overview of the streaming API, and
I’m really concerned because problems with it are resulting in proposed
refactors to the batch API that *was* discussed and is already available.

The write-side design doc that Joseph put together is a good start,
especially the diagram because it gives a great visual to help reason about
it. Could you please put together a doc for the read side as well?

rb
-- 
Ryan Blue
Software Engineer
Netflix

Re: [discuss][data source v2] remove type parameter in DataReader/WriterFactory

Posted by Wenchen Fan <cl...@gmail.com>.
First of all, I think we all agree that the data source v2 API should at
least support InternalRow and ColumnarBatch. With this assumption, the
current API has two problems:

*First problem*: We use mixin traits to add support for different data
formats.

The mixin traits define APIs that return DataReader/WriterFactory for
different formats. This brings a lot of trouble to streaming, as streaming
has its own factory interface, which we don't want to extend the batch
factory. This means we need to duplicate the mixin traits for batch and
streaming. Keep in mind that duplicating the traits is also a possible
solution, if there is no better way.

Another possible solution is to remove the mixin traits and put all the
"createFactory" methods in DataSourceReader/Writer, with a new method to
indicate which "createFactory" method Spark should call. Then the API looks
like

interface DataSourceReader {
  DataFormat dataFormat();

  default List<DataReaderFactory<Row>> createDataReaderFactories() {
    throw new IllegalStateException();
  }

  default List<DataReaderFactory<ColumnarBatch>> createColumnarBatchDataReaderFactories() {
    throw new IllegalStateException();
  }
}

or to be more friendly to people who don't care about columnar format

interface DataSourceReader {
  default DataFormat dataFormat() { return DataFormat.INTERNAL_ROW; }

  List<DataReaderFactory<Row>> createDataReaderFactories();

  default List<DataReaderFactory<ColumnarBatch>> createColumnarBatchDataReaderFactories() {
    throw new IllegalStateException();
  }
}

This solution still brings some trouble to streaming, as the
streaming-specific DataSourceReader needs to re-define all these
"createFactory" methods, but it's much better than duplicating the mixin
traits.

*Second problem*: The DataReader/WriterFactory may have a lot of
constructor parameters, and it's painful to define different factories with
the same very long parameter list.
After a closer look, I think this accounts for most of the duplicated code.
This is not a strong reason on its own, so it's OK if people don't think
it's a problem. In the meantime, I think it might be better to shift the
data format information to the factory so that we can support hybrid
storage data sources in the future, as I mentioned before.


Finally, we can also consider Joseph's proposal, to remove the type
parameter entirely and get rid of this problem.
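
As a rough Scala-flavored sketch of that direction (the shape below is just
my reading of Joseph's suggestion, not a settled design), the reader would
lose its type parameter and get() would return an untyped value whose
runtime type must match the declared DataFormat:

trait DataReader extends java.io.Closeable {
  def next(): Boolean
  // i.e. java.lang.Object; must be an InternalRow or a ColumnarBatch,
  // matching the factory's dataFormat.
  def get(): AnyRef
}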



On Thu, Apr 19, 2018 at 8:54 AM, Joseph Torres <joseph.torres@databricks.com
> wrote:

> The fundamental difficulty seems to be that there's a spurious
> "round-trip" in the API. Spark inspects the source to determine what type
> it's going to provide, picks an appropriate method according to that type,
> and then calls that method on the source to finally get what it wants.
> Pushing this out of the DataSourceReader doesn't eliminate this problem; it
> just shifts it. We still need an InternalRow method and a ColumnarBatch
> method and possibly Row and UnsafeRow methods too.
>
> I'd propose it would be better to just accept a bit less type safety here,
> and push the problem all the way down to the DataReader. Make
> DataReader.get() return Object, and document that the runtime type had
> better match the type declared in the reader's DataFormat. Then we can get
> rid of the special Row/UnsafeRow/ColumnarBatch methods cluttering up the
> API, and figure out whether to support Row and UnsafeRow independently of
> all our other API decisions. (I didn't think about this until now, but the
> fact that some orthogonal API decisions have to be conditioned on which set
> of row formats we support seems like a code smell.)
>
> On Wed, Apr 18, 2018 at 3:53 PM, Ryan Blue <rb...@netflix.com.invalid>
> wrote:
>
>> Wenchen, can you explain a bit more clearly why this is necessary? The
>> pseudo-code you used doesn’t clearly demonstrate why. Why couldn’t this be
>> handled with inheritance from an abstract Factory class? Why define
>> all of the createXDataReader methods, but make the DataFormat a field in
>> the factory?
>>
>> A related issue is that I think there’s a strong case that the v2 sources
>> should produce only InternalRow and that Row and UnsafeRow shouldn’t be
>> exposed; see SPARK-23325
>> <https://issues.apache.org/jira/browse/SPARK-23325>. The basic arguments
>> are:
>>
>>    - UnsafeRow is really difficult to produce without using Spark’s
>>    projection methods. If implementations can produce UnsafeRow, then
>>    they can still pass them as InternalRow and the projection Spark adds
>>    would be a no-op. When implementations can’t produce UnsafeRow, then
>>    it is better for Spark to insert the projection to unsafe. An example of a
>>    data format that doesn’t produce unsafe is the built-in Parquet source,
>>    which produces InternalRow and projects before returning the row.
>>    - For Row, I see no good reason to support it in a new interface when
>>    it will just introduce an extra transformation. The argument that Row
>>    is the “public” API doesn’t apply because UnsafeRow is already
>>    exposed through the v2 API.
>>    - Standardizing on InternalRow would remove the need for these
>>    interfaces entirely and simplify what implementers must provide and would
>>    reduce confusion over what to do.
>>
>> Using InternalRow doesn’t cover the case where we want to produce
>> ColumnarBatch instead, so what you’re proposing might still be a good
>> idea. I just think that we can simplify either path.
>> ​
>>
>> On Mon, Apr 16, 2018 at 11:17 PM, Wenchen Fan <cl...@gmail.com>
>> wrote:
>>
>>> Yea definitely not. The only requirement is, the
>>> DataReader/WriterFactory must support at least one DataFormat.
>>>
>>> >  how are we going to express capability of the given reader of its
>>> supported format(s), or specific support for each of “real-time data in row
>>> format, and history data in columnar format”?
>>>
>>> When DataSourceReader/Writer create factories, the factory must contain
>>> enough information to decide the data format. Let's take ORC as an example.
>>> In OrcReaderFactory, it knows which files to read, and which columns to
>>> output. Since now Spark only support columnar scan for simple types,
>>> OrcReaderFactory will only output ColumnarBatch if the columns to scan
>>> are all simple types.
>>>
>>> On Tue, Apr 17, 2018 at 11:38 AM, Felix Cheung <
>>> felixcheung_m@hotmail.com> wrote:
>>>
>>>> Is it required for DataReader to support all known DataFormat?
>>>>
>>>> Hopefully, not, as assumed by the ‘throw’ in the interface. Then
>>>> specifically how are we going to express capability of the given reader of
>>>> its supported format(s), or specific support for each of “real-time data in
>>>> row format, and history data in columnar format”?
>>>>
>>>>
>>>> ------------------------------
>>>> *From:* Wenchen Fan <cl...@gmail.com>
>>>> *Sent:* Sunday, April 15, 2018 7:45:01 PM
>>>> *To:* Spark dev list
>>>> *Subject:* [discuss][data source v2] remove type parameter in
>>>> DataReader/WriterFactory
>>>>
>>>> Hi all,
>>>>
>>>> I'd like to propose an API change to the data source v2.
>>>>
>>>> One design goal of data source v2 is API type safety. The FileFormat
>>>> API is a bad example, it asks the implementation to return InternalRow
>>>> even it's actually ColumnarBatch. In data source v2 we add a type
>>>> parameter to DataReader/WriterFactoty and DataReader/Writer, so that
>>>> data source supporting columnar scan returns ColumnarBatch at API
>>>> level.
>>>>
>>>> However, we met some problems when migrating streaming and file-based
>>>> data source to data source v2.
>>>>
>>>> For the streaming side, we need a variant of DataReader/WriterFactory
>>>> to add streaming specific concept like epoch id and offset. For details
>>>> please see ContinuousDataReaderFactory and https://docs.google.com/do
>>>> cument/d/1PJYfb68s2AG7joRWbhrgpEWhrsPqbhyRwUVl9V1wPOE/edit#
>>>>
>>>> But this conflicts with the special format mixin traits like
>>>> SupportsScanColumnarBatch. We have to make the streaming variant of
>>>> DataReader/WriterFactory to extend the original
>>>> DataReader/WriterFactory, and do type cast at runtime, which is
>>>> unnecessary and violate the type safety.
>>>>
>>>> For the file-based data source side, we have a problem with code
>>>> duplication. Let's take ORC data source as an example. To support both
>>>> unsafe row and columnar batch scan, we need something like
>>>>
>>>> // A lot of parameters to carry to the executor side
>>>> class OrcUnsafeRowFactory(...) extends DataReaderFactory[UnsafeRow] {
>>>>   def createDataReader ...
>>>> }
>>>>
>>>> class OrcColumnarBatchFactory(...) extends
>>>> DataReaderFactory[ColumnarBatch] {
>>>>   def createDataReader ...
>>>> }
>>>>
>>>> class OrcDataSourceReader extends DataSourceReader {
>>>>   def createUnsafeRowFactories = ... // logic to prepare the parameters
>>>> and create factories
>>>>
>>>>   def createColumnarBatchFactories = ... // logic to prepare the
>>>> parameters and create factories
>>>> }
>>>>
>>>> You can see that we have duplicated logic for preparing parameters and
>>>> defining the factory.
>>>>
>>>> Here I propose to remove all the special format mixin traits and change
>>>> the factory interface to
>>>>
>>>> public enum DataFormat {
>>>>   ROW,
>>>>   INTERNAL_ROW,
>>>>   UNSAFE_ROW,
>>>>   COLUMNAR_BATCH
>>>> }
>>>>
>>>> interface DataReaderFactory {
>>>>   DataFormat dataFormat;
>>>>
>>>>   default DataReader<Row> createRowDataReader() {
>>>>     throw new IllegalStateException();
>>>>   }
>>>>
>>>>   default DataReader<UnsafeRow> createUnsafeRowDataReader() {
>>>>     throw new IllegalStateException();
>>>>   }
>>>>
>>>>   default DataReader<ColumnarBatch> createColumnarBatchDataReader() {
>>>>     throw new IllegalStateException();
>>>>   }
>>>> }
>>>>
>>>> Spark will look at the dataFormat and decide which create data reader
>>>> method to call.
>>>>
>>>> Now we don't have the problem for the streaming side as these special
>>>> format mixin traits go away. And the ORC data source can also be simplified
>>>> to
>>>>
>>>> class OrcReaderFactory(...) extends DataReaderFactory {
>>>>   def createUnsafeRowReader ...
>>>>
>>>>   def createColumnarBatchReader ...
>>>> }
>>>>
>>>> class OrcDataSourceReader extends DataSourceReader {
>>>>   def createReadFactories = ... // logic to prepare the parameters and
>>>> create factories
>>>> }
>>>>
>>>> We also have a potential benefit of supporting hybrid storage data
>>>> source, which may keep real-time data in row format, and history data in
>>>> columnar format. Then they can make some DataReaderFactory output
>>>> InternalRow and some output ColumnarBatch.
>>>>
>>>> Thoughts?
>>>>
>>>
>>>
>>
>>
>> --
>> Ryan Blue
>> Software Engineer
>> Netflix
>>
>
>

Re: [discuss][data source v2] remove type parameter in DataReader/WriterFactory

Posted by Joseph Torres <jo...@databricks.com>.
The fundamental difficulty seems to be that there's a spurious "round-trip"
in the API. Spark inspects the source to determine what type it's going to
provide, picks an appropriate method according to that type, and then calls
that method on the source to finally get what it wants. Pushing this out of
the DataSourceReader doesn't eliminate this problem; it just shifts it. We
still need an InternalRow method and a ColumnarBatch method and possibly
Row and UnsafeRow methods too.

I'd propose it would be better to just accept a bit less type safety here,
and push the problem all the way down to the DataReader. Make
DataReader.get() return Object, and document that the runtime type had
better match the type declared in the reader's DataFormat. Then we can get
rid of the special Row/UnsafeRow/ColumnarBatch methods cluttering up the
API, and figure out whether to support Row and UnsafeRow independently of
all our other API decisions. (I didn't think about this until now, but the
fact that some orthogonal API decisions have to be conditioned on which set
of row formats we support seems like a code smell.)
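
A rough Scala sketch of what I mean; the trait shapes and the consume
helper are illustrative stand-ins, not the actual v2 interfaces:

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.vectorized.ColumnarBatch

sealed trait DataFormat
case object INTERNAL_ROW extends DataFormat
case object COLUMNAR_BATCH extends DataFormat

// One untyped reader; the runtime type of get() must match the declared format.
trait DataReader {
  def next(): Boolean
  def get(): AnyRef // i.e. java.lang.Object
  def close(): Unit
}

trait DataReaderFactory {
  def dataFormat: DataFormat
  def createDataReader(): DataReader
}

object UntypedReaderSketch {
  // Spark side: a single code path that downcasts based on the declared format.
  def consume(factory: DataReaderFactory): Unit = {
    val reader = factory.createDataReader()
    try {
      factory.dataFormat match {
        case INTERNAL_ROW =>
          while (reader.next()) {
            val row = reader.get().asInstanceOf[InternalRow] // trusted per contract
            // process the row ...
          }
        case COLUMNAR_BATCH =>
          while (reader.next()) {
            val batch = reader.get().asInstanceOf[ColumnarBatch]
            // process the batch ...
          }
      }
    } finally {
      reader.close()
    }
  }
}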

On Wed, Apr 18, 2018 at 3:53 PM, Ryan Blue <rb...@netflix.com.invalid>
wrote:

> Wenchen, can you explain a bit more clearly why this is necessary? The
> pseudo-code you used doesn’t clearly demonstrate why. Why couldn’t this be
> handled this with inheritance from an abstract Factory class? Why define
> all of the createXDataReader methods, but make the DataFormat a field in
> the factory?
>
> A related issue is that I think there’s a strong case that the v2 sources
> should produce only InternalRow and that Row and UnsafeRow shouldn’t be
> exposed; see SPARK-23325
> <https://issues.apache.org/jira/browse/SPARK-23325>. The basic arguments
> are:
>
>    - UnsafeRow is really difficult to produce without using Spark’s
>    projection methods. If implementations can produce UnsafeRow, then
>    they can still pass them as InternalRow and the projection Spark adds
>    would be a no-op. When implementations can’t produce UnsafeRow, then
>    it is better for Spark to insert the projection to unsafe. An example of a
>    data format that doesn’t produce unsafe is the built-in Parquet source,
>    which produces InternalRow and projects before returning the row.
>    - For Row, I see no good reason to support it in a new interface when
>    it will just introduce an extra transformation. The argument that Row
>    is the “public” API doesn’t apply because UnsafeRow is already exposed
>    through the v2 API.
>    - Standardizing on InternalRow would remove the need for these
>    interfaces entirely and simplify what implementers must provide and would
>    reduce confusion over what to do.
>
> Using InternalRow doesn’t cover the case where we want to produce
> ColumnarBatch instead, so what you’re proposing might still be a good
> idea. I just think that we can simplify either path.
> ​
>
> On Mon, Apr 16, 2018 at 11:17 PM, Wenchen Fan <cl...@gmail.com> wrote:
>
>> Yea definitely not. The only requirement is, the DataReader/WriterFactory
>> must support at least one DataFormat.
>>
>> >  how are we going to express capability of the given reader of its
>> supported format(s), or specific support for each of “real-time data in row
>> format, and history data in columnar format”?
>>
>> When DataSourceReader/Writer create factories, the factory must contain
>> enough information to decide the data format. Let's take ORC as an example.
>> In OrcReaderFactory, it knows which files to read, and which columns to
>> output. Since now Spark only support columnar scan for simple types,
>> OrcReaderFactory will only output ColumnarBatch if the columns to scan
>> are all simple types.
>>
>> On Tue, Apr 17, 2018 at 11:38 AM, Felix Cheung <felixcheung_m@hotmail.com
>> > wrote:
>>
>>> Is it required for DataReader to support all known DataFormat?
>>>
>>> Hopefully, not, as assumed by the ‘throw’ in the interface. Then
>>> specifically how are we going to express capability of the given reader of
>>> its supported format(s), or specific support for each of “real-time data in
>>> row format, and history data in columnar format”?
>>>
>>>
>>> ------------------------------
>>> *From:* Wenchen Fan <cl...@gmail.com>
>>> *Sent:* Sunday, April 15, 2018 7:45:01 PM
>>> *To:* Spark dev list
>>> *Subject:* [discuss][data source v2] remove type parameter in
>>> DataReader/WriterFactory
>>>
>>> Hi all,
>>>
>>> I'd like to propose an API change to the data source v2.
>>>
>>> One design goal of data source v2 is API type safety. The FileFormat API
>>> is a bad example, it asks the implementation to return InternalRow even
>>> it's actually ColumnarBatch. In data source v2 we add a type parameter
>>> to DataReader/WriterFactoty and DataReader/Writer, so that data source
>>> supporting columnar scan returns ColumnarBatch at API level.
>>>
>>> However, we met some problems when migrating streaming and file-based
>>> data source to data source v2.
>>>
>>> For the streaming side, we need a variant of DataReader/WriterFactory
>>> to add streaming specific concept like epoch id and offset. For details
>>> please see ContinuousDataReaderFactory and https://docs.google.com/do
>>> cument/d/1PJYfb68s2AG7joRWbhrgpEWhrsPqbhyRwUVl9V1wPOE/edit#
>>>
>>> But this conflicts with the special format mixin traits like
>>> SupportsScanColumnarBatch. We have to make the streaming variant of
>>> DataReader/WriterFactory to extend the original DataReader/WriterFactory,
>>> and do type cast at runtime, which is unnecessary and violate the type
>>> safety.
>>>
>>> For the file-based data source side, we have a problem with code
>>> duplication. Let's take ORC data source as an example. To support both
>>> unsafe row and columnar batch scan, we need something like
>>>
>>> // A lot of parameters to carry to the executor side
>>> class OrcUnsafeRowFactory(...) extends DataReaderFactory[UnsafeRow] {
>>>   def createDataReader ...
>>> }
>>>
>>> class OrcColumnarBatchFactory(...) extends DataReaderFactory[ColumnarBatch]
>>> {
>>>   def createDataReader ...
>>> }
>>>
>>> class OrcDataSourceReader extends DataSourceReader {
>>>   def createUnsafeRowFactories = ... // logic to prepare the parameters
>>> and create factories
>>>
>>>   def createColumnarBatchFactories = ... // logic to prepare the
>>> parameters and create factories
>>> }
>>>
>>> You can see that we have duplicated logic for preparing parameters and
>>> defining the factory.
>>>
>>> Here I propose to remove all the special format mixin traits and change
>>> the factory interface to
>>>
>>> public enum DataFormat {
>>>   ROW,
>>>   INTERNAL_ROW,
>>>   UNSAFE_ROW,
>>>   COLUMNAR_BATCH
>>> }
>>>
>>> interface DataReaderFactory {
>>>   DataFormat dataFormat;
>>>
>>>   default DataReader<Row> createRowDataReader() {
>>>     throw new IllegalStateException();
>>>   }
>>>
>>>   default DataReader<UnsafeRow> createUnsafeRowDataReader() {
>>>     throw new IllegalStateException();
>>>   }
>>>
>>>   default DataReader<ColumnarBatch> createColumnarBatchDataReader() {
>>>     throw new IllegalStateException();
>>>   }
>>> }
>>>
>>> Spark will look at the dataFormat and decide which create data reader
>>> method to call.
>>>
>>> Now we don't have the problem for the streaming side as these special
>>> format mixin traits go away. And the ORC data source can also be simplified
>>> to
>>>
>>> class OrcReaderFactory(...) extends DataReaderFactory {
>>>   def createUnsafeRowReader ...
>>>
>>>   def createColumnarBatchReader ...
>>> }
>>>
>>> class OrcDataSourceReader extends DataSourceReader {
>>>   def createReadFactories = ... // logic to prepare the parameters and
>>> create factories
>>> }
>>>
>>> We also have a potential benefit of supporting hybrid storage data
>>> source, which may keep real-time data in row format, and history data in
>>> columnar format. Then they can make some DataReaderFactory output
>>> InternalRow and some output ColumnarBatch.
>>>
>>> Thoughts?
>>>
>>
>>
>
>
> --
> Ryan Blue
> Software Engineer
> Netflix
>

Re: [discuss][data source v2] remove type parameter in DataReader/WriterFactory

Posted by Ryan Blue <rb...@netflix.com.INVALID>.
Wenchen, can you explain a bit more clearly why this is necessary? The
pseudo-code you used doesn’t clearly demonstrate why. Why couldn’t this be
handled with inheritance from an abstract Factory class? Why define
all of the createXDataReader methods, but make the DataFormat a field in
the factory?

A related issue is that I think there’s a strong case that the v2 sources
should produce only InternalRow and that Row and UnsafeRow shouldn’t be
exposed; see SPARK-23325 <https://issues.apache.org/jira/browse/SPARK-23325>.
The basic arguments are:

   - UnsafeRow is really difficult to produce without using Spark’s
   projection methods. If implementations can produce UnsafeRow, then they
   can still pass them as InternalRow and the projection Spark adds would
   be a no-op. When implementations can’t produce UnsafeRow, then it is
   better for Spark to insert the projection to unsafe. An example of a data
   format that doesn’t produce unsafe is the built-in Parquet source, which
   produces InternalRow and projects before returning the row.
   - For Row, I see no good reason to support it in a new interface when it
   will just introduce an extra transformation. The argument that Row is
   the “public” API doesn’t apply because UnsafeRow is already exposed
   through the v2 API.
   - Standardizing on InternalRow would remove the need for these
   interfaces entirely, simplify what implementers must provide, and reduce
   confusion over what to do.

Using InternalRow doesn’t cover the case where we want to produce
ColumnarBatch instead, so what you’re proposing might still be a good idea.
I just think that we can simplify either path.
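
To illustrate the kind of projection I mean, here is a minimal sketch,
not the actual scan exec code: the iterator plumbing and the sample row
are made up, but UnsafeProjection, InternalRow and UnsafeRow are the real
classes.

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{UnsafeProjection, UnsafeRow}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.unsafe.types.UTF8String

object UnsafeNormalizationSketch {
  // Spark, not the source, owns the conversion to unsafe rows.
  def toUnsafe(rows: Iterator[InternalRow], schema: StructType): Iterator[UnsafeRow] = {
    val proj = UnsafeProjection.create(schema)
    // Note: the projection reuses its output buffer, so copy() before buffering rows.
    rows.map(row => proj(row))
  }

  def main(args: Array[String]): Unit = {
    val schema = StructType(Seq(
      StructField("id", IntegerType),
      StructField("name", StringType)))
    // A source that returns generic InternalRows, like the built-in Parquet reader.
    val fromSource: Iterator[InternalRow] =
      Iterator(InternalRow(1, UTF8String.fromString("a")))
    toUnsafe(fromSource, schema).foreach(row => println(row.getInt(0)))
  }
}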
​

On Mon, Apr 16, 2018 at 11:17 PM, Wenchen Fan <cl...@gmail.com> wrote:

> Yea definitely not. The only requirement is, the DataReader/WriterFactory
> must support at least one DataFormat.
>
> >  how are we going to express capability of the given reader of its
> supported format(s), or specific support for each of “real-time data in row
> format, and history data in columnar format”?
>
> When DataSourceReader/Writer create factories, the factory must contain
> enough information to decide the data format. Let's take ORC as an example.
> In OrcReaderFactory, it knows which files to read, and which columns to
> output. Since now Spark only support columnar scan for simple types,
> OrcReaderFactory will only output ColumnarBatch if the columns to scan
> are all simple types.
>
> On Tue, Apr 17, 2018 at 11:38 AM, Felix Cheung <fe...@hotmail.com>
> wrote:
>
>> Is it required for DataReader to support all known DataFormat?
>>
>> Hopefully, not, as assumed by the ‘throw’ in the interface. Then
>> specifically how are we going to express capability of the given reader of
>> its supported format(s), or specific support for each of “real-time data in
>> row format, and history data in columnar format”?
>>
>>
>> ------------------------------
>> *From:* Wenchen Fan <cl...@gmail.com>
>> *Sent:* Sunday, April 15, 2018 7:45:01 PM
>> *To:* Spark dev list
>> *Subject:* [discuss][data source v2] remove type parameter in
>> DataReader/WriterFactory
>>
>> Hi all,
>>
>> I'd like to propose an API change to the data source v2.
>>
>> One design goal of data source v2 is API type safety. The FileFormat API
>> is a bad example, it asks the implementation to return InternalRow even
>> it's actually ColumnarBatch. In data source v2 we add a type parameter
>> to DataReader/WriterFactoty and DataReader/Writer, so that data source
>> supporting columnar scan returns ColumnarBatch at API level.
>>
>> However, we met some problems when migrating streaming and file-based
>> data source to data source v2.
>>
>> For the streaming side, we need a variant of DataReader/WriterFactory to
>> add streaming specific concept like epoch id and offset. For details please
>> see ContinuousDataReaderFactory and https://docs.google.com/do
>> cument/d/1PJYfb68s2AG7joRWbhrgpEWhrsPqbhyRwUVl9V1wPOE/edit#
>>
>> But this conflicts with the special format mixin traits like
>> SupportsScanColumnarBatch. We have to make the streaming variant of
>> DataReader/WriterFactory to extend the original DataReader/WriterFactory,
>> and do type cast at runtime, which is unnecessary and violate the type
>> safety.
>>
>> For the file-based data source side, we have a problem with code
>> duplication. Let's take ORC data source as an example. To support both
>> unsafe row and columnar batch scan, we need something like
>>
>> // A lot of parameters to carry to the executor side
>> class OrcUnsafeRowFactory(...) extends DataReaderFactory[UnsafeRow] {
>>   def createDataReader ...
>> }
>>
>> class OrcColumnarBatchFactory(...) extends DataReaderFactory[ColumnarBatch]
>> {
>>   def createDataReader ...
>> }
>>
>> class OrcDataSourceReader extends DataSourceReader {
>>   def createUnsafeRowFactories = ... // logic to prepare the parameters
>> and create factories
>>
>>   def createColumnarBatchFactories = ... // logic to prepare the
>> parameters and create factories
>> }
>>
>> You can see that we have duplicated logic for preparing parameters and
>> defining the factory.
>>
>> Here I propose to remove all the special format mixin traits and change
>> the factory interface to
>>
>> public enum DataFormat {
>>   ROW,
>>   INTERNAL_ROW,
>>   UNSAFE_ROW,
>>   COLUMNAR_BATCH
>> }
>>
>> interface DataReaderFactory {
>>   DataFormat dataFormat;
>>
>>   default DataReader<Row> createRowDataReader() {
>>     throw new IllegalStateException();
>>   }
>>
>>   default DataReader<UnsafeRow> createUnsafeRowDataReader() {
>>     throw new IllegalStateException();
>>   }
>>
>>   default DataReader<ColumnarBatch> createColumnarBatchDataReader() {
>>     throw new IllegalStateException();
>>   }
>> }
>>
>> Spark will look at the dataFormat and decide which create data reader
>> method to call.
>>
>> Now we don't have the problem for the streaming side as these special
>> format mixin traits go away. And the ORC data source can also be simplified
>> to
>>
>> class OrcReaderFactory(...) extends DataReaderFactory {
>>   def createUnsafeRowReader ...
>>
>>   def createColumnarBatchReader ...
>> }
>>
>> class OrcDataSourceReader extends DataSourceReader {
>>   def createReadFactories = ... // logic to prepare the parameters and
>> create factories
>> }
>>
>> We also have a potential benefit of supporting hybrid storage data
>> source, which may keep real-time data in row format, and history data in
>> columnar format. Then they can make some DataReaderFactory output
>> InternalRow and some output ColumnarBatch.
>>
>> Thoughts?
>>
>
>


-- 
Ryan Blue
Software Engineer
Netflix

Re: [discuss][data source v2] remove type parameter in DataReader/WriterFactory

Posted by Wenchen Fan <cl...@gmail.com>.
Yea, definitely not. The only requirement is that the
DataReader/WriterFactory must support at least one DataFormat.

>  how are we going to express capability of the given reader of its
supported format(s), or specific support for each of “real-time data in row
format, and history data in columnar format”?

When the DataSourceReader/Writer creates factories, each factory must
contain enough information to decide its data format. Let's take ORC as an
example: OrcReaderFactory knows which files to read and which columns to
output. Since Spark currently only supports columnar scans for simple
types, OrcReaderFactory will only output ColumnarBatch if the columns to
scan are all simple types.
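
Roughly like this (a sketch only; OrcReaderFactorySketch and these
DataFormat values are illustrative stand-ins, not the real ORC reader or
the final interface):

import org.apache.spark.sql.types.{ArrayType, DataType, MapType, StructType}

sealed trait DataFormat
case object UNSAFE_ROW extends DataFormat
case object COLUMNAR_BATCH extends DataFormat

class OrcReaderFactorySketch(filesToRead: Seq[String], requiredSchema: StructType) {
  // Treat anything that is not nested as a "simple" type for this sketch.
  private def isSimple(dt: DataType): Boolean = dt match {
    case _: StructType | _: ArrayType | _: MapType => false
    case _ => true
  }

  // Columnar scan only when every requested column is a simple type;
  // otherwise fall back to row-by-row output.
  def dataFormat: DataFormat =
    if (requiredSchema.fields.forall(f => isSimple(f.dataType))) COLUMNAR_BATCH
    else UNSAFE_ROW
}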

On Tue, Apr 17, 2018 at 11:38 AM, Felix Cheung <fe...@hotmail.com>
wrote:

> Is it required for DataReader to support all known DataFormat?
>
> Hopefully, not, as assumed by the ‘throw’ in the interface. Then
> specifically how are we going to express capability of the given reader of
> its supported format(s), or specific support for each of “real-time data in
> row format, and history data in columnar format”?
>
>
> ------------------------------
> *From:* Wenchen Fan <cl...@gmail.com>
> *Sent:* Sunday, April 15, 2018 7:45:01 PM
> *To:* Spark dev list
> *Subject:* [discuss][data source v2] remove type parameter in
> DataReader/WriterFactory
>
> Hi all,
>
> I'd like to propose an API change to the data source v2.
>
> One design goal of data source v2 is API type safety. The FileFormat API
> is a bad example, it asks the implementation to return InternalRow even
> it's actually ColumnarBatch. In data source v2 we add a type parameter to
> DataReader/WriterFactoty and DataReader/Writer, so that data source
> supporting columnar scan returns ColumnarBatch at API level.
>
> However, we met some problems when migrating streaming and file-based data
> source to data source v2.
>
> For the streaming side, we need a variant of DataReader/WriterFactory to
> add streaming specific concept like epoch id and offset. For details please
> see ContinuousDataReaderFactory and https://docs.google.com/document/d/
> 1PJYfb68s2AG7joRWbhrgpEWhrsPqbhyRwUVl9V1wPOE/edit#
>
> But this conflicts with the special format mixin traits like
> SupportsScanColumnarBatch. We have to make the streaming variant of
> DataReader/WriterFactory to extend the original DataReader/WriterFactory,
> and do type cast at runtime, which is unnecessary and violate the type
> safety.
>
> For the file-based data source side, we have a problem with code
> duplication. Let's take ORC data source as an example. To support both
> unsafe row and columnar batch scan, we need something like
>
> // A lot of parameters to carry to the executor side
> class OrcUnsafeRowFactory(...) extends DataReaderFactory[UnsafeRow] {
>   def createDataReader ...
> }
>
> class OrcColumnarBatchFactory(...) extends DataReaderFactory[ColumnarBatch]
> {
>   def createDataReader ...
> }
>
> class OrcDataSourceReader extends DataSourceReader {
>   def createUnsafeRowFactories = ... // logic to prepare the parameters
> and create factories
>
>   def createColumnarBatchFactories = ... // logic to prepare the
> parameters and create factories
> }
>
> You can see that we have duplicated logic for preparing parameters and
> defining the factory.
>
> Here I propose to remove all the special format mixin traits and change
> the factory interface to
>
> public enum DataFormat {
>   ROW,
>   INTERNAL_ROW,
>   UNSAFE_ROW,
>   COLUMNAR_BATCH
> }
>
> interface DataReaderFactory {
>   DataFormat dataFormat;
>
>   default DataReader<Row> createRowDataReader() {
>     throw new IllegalStateException();
>   }
>
>   default DataReader<UnsafeRow> createUnsafeRowDataReader() {
>     throw new IllegalStateException();
>   }
>
>   default DataReader<ColumnarBatch> createColumnarBatchDataReader() {
>     throw new IllegalStateException();
>   }
> }
>
> Spark will look at the dataFormat and decide which create data reader
> method to call.
>
> Now we don't have the problem for the streaming side as these special
> format mixin traits go away. And the ORC data source can also be simplified
> to
>
> class OrcReaderFactory(...) extends DataReaderFactory {
>   def createUnsafeRowReader ...
>
>   def createColumnarBatchReader ...
> }
>
> class OrcDataSourceReader extends DataSourceReader {
>   def createReadFactories = ... // logic to prepare the parameters and
> create factories
> }
>
> We also have a potential benefit of supporting hybrid storage data source,
> which may keep real-time data in row format, and history data in columnar
> format. Then they can make some DataReaderFactory output InternalRow and
> some output ColumnarBatch.
>
> Thoughts?
>

Re: [discuss][data source v2] remove type parameter in DataReader/WriterFactory

Posted by Felix Cheung <fe...@hotmail.com>.
Is it required for a DataReader to support every known DataFormat?

Hopefully not, as implied by the 'throw' in the interface. Then how, specifically, are we going to express a given reader's capabilities: its supported format(s), or specific support for each of "real-time data in row format, and history data in columnar format"?


________________________________
From: Wenchen Fan <cl...@gmail.com>
Sent: Sunday, April 15, 2018 7:45:01 PM
To: Spark dev list
Subject: [discuss][data source v2] remove type parameter in DataReader/WriterFactory

Hi all,

I'd like to propose an API change to the data source v2.

One design goal of data source v2 is API type safety. The FileFormat API is a bad example, it asks the implementation to return InternalRow even it's actually ColumnarBatch. In data source v2 we add a type parameter to DataReader/WriterFactoty and DataReader/Writer, so that data source supporting columnar scan returns ColumnarBatch at API level.

However, we met some problems when migrating streaming and file-based data source to data source v2.

For the streaming side, we need a variant of DataReader/WriterFactory to add streaming specific concept like epoch id and offset. For details please see ContinuousDataReaderFactory and https://docs.google.com/document/d/1PJYfb68s2AG7joRWbhrgpEWhrsPqbhyRwUVl9V1wPOE/edit#

But this conflicts with the special format mixin traits like SupportsScanColumnarBatch. We have to make the streaming variant of DataReader/WriterFactory to extend the original DataReader/WriterFactory, and do type cast at runtime, which is unnecessary and violate the type safety.

For the file-based data source side, we have a problem with code duplication. Let's take ORC data source as an example. To support both unsafe row and columnar batch scan, we need something like

// A lot of parameters to carry to the executor side
class OrcUnsafeRowFactory(...) extends DataReaderFactory[UnsafeRow] {
  def createDataReader ...
}

class OrcColumnarBatchFactory(...) extends DataReaderFactory[ColumnarBatch] {
  def createDataReader ...
}

class OrcDataSourceReader extends DataSourceReader {
  def createUnsafeRowFactories = ... // logic to prepare the parameters and create factories

  def createColumnarBatchFactories = ... // logic to prepare the parameters and create factories
}

You can see that we have duplicated logic for preparing parameters and defining the factory.

Here I propose to remove all the special format mixin traits and change the factory interface to

public enum DataFormat {
  ROW,
  INTERNAL_ROW,
  UNSAFE_ROW,
  COLUMNAR_BATCH
}

interface DataReaderFactory {
  DataFormat dataFormat;

  default DataReader<Row> createRowDataReader() {
    throw new IllegalStateException();
  }

  default DataReader<UnsafeRow> createUnsafeRowDataReader() {
    throw new IllegalStateException();
  }

  default DataReader<ColumnarBatch> createColumnarBatchDataReader() {
    throw new IllegalStateException();
  }
}

Spark will look at the dataFormat and decide which create data reader method to call.

Now we don't have the problem for the streaming side as these special format mixin traits go away. And the ORC data source can also be simplified to

class OrcReaderFactory(...) extends DataReaderFactory {
  def createUnsafeRowReader ...

  def createColumnarBatchReader ...
}

class OrcDataSourceReader extends DataSourceReader {
  def createReadFactories = ... // logic to prepare the parameters and create factories
}

We also have a potential benefit of supporting hybrid storage data source, which may keep real-time data in row format, and history data in columnar format. Then they can make some DataReaderFactory output InternalRow and some output ColumnarBatch.

Thoughts?