Posted to issues@spark.apache.org by "Rafael (Jira)" <ji...@apache.org> on 2020/08/14 20:43:00 UTC
[jira] [Commented] (SPARK-25390) Data source V2 API refactoring
[ https://issues.apache.org/jira/browse/SPARK-25390?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17178052#comment-17178052 ]
Rafael commented on SPARK-25390:
--------------------------------
Hey guys,
I'm trying to migrate a package that uses *import org.apache.spark.sql.sources.v2._* to Spark 3.0.0
and haven't found a good guide, so may I ask my questions here?
Here is my migration plan. Can you point out which interfaces I should use now?
1. I cannot find what I should use instead of ReadSupport and DataSourceReader.
If we now have to use Scan instead of ReadSupport, what happened to the createReader method?
{code:java}
class DefaultSource extends ReadSupport {
  override def createReader(options: DataSourceOptions): DataSourceReader = new GeneratingReader()
}
{code}
2. Here, instead of
{code:java}
import org.apache.spark.sql.sources.v2.reader.partitioning.{Distribution, Partitioning}
import org.apache.spark.sql.sources.v2.reader.{InputPartition, SupportsReportPartitioning}
{code}
I should use
{code:java}
import org.apache.spark.sql.connector.read.partitioning.{Distribution, Partitioning}
import org.apache.spark.sql.connector.read.{InputPartition, SupportsReportPartitioning}
{code}
right?
{code:java}
class GeneratingReader() extends DataSourceReader with SupportsReportPartitioning {
  override def readSchema(): StructType = {...}
  override def planInputPartitions(): util.List[InputPartition[InternalRow]] = {
    val partitions = new util.ArrayList[InputPartition[InternalRow]]()
    ...
    partitions.add(new GeneratingInputPartition(...))
    partitions // return the list; add() on its own yields a Boolean
  }
  override def outputPartitioning(): Partitioning = {...}
}
{code}
3. I haven't found what I should use instead of
{code:java}
import org.apache.spark.sql.sources.v2.reader.InputPartition
import org.apache.spark.sql.sources.v2.reader.InputPartitionReader
{code}
The similarly named interface in the new package has a totally different contract:
{code:java}
import org.apache.spark.sql.connector.read.InputPartition
{code}
This is the code I need to migrate:
{code:java}
class GeneratingInputPartition() extends InputPartition[InternalRow] {
  override def createPartitionReader(): InputPartitionReader[InternalRow] = new GeneratingInputPartitionReader(...)
}
class GeneratingInputPartitionReader() extends InputPartitionReader[InternalRow] {
  override def next(): Boolean = ...
  override def get(): InternalRow = ...
  override def close(): Unit = ...
}
{code}
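For reference, here is a rough sketch of how the three snippets above might map onto the org.apache.spark.sql.connector packages in Spark 3.0.0, following the catalog -> table -> scan chain from the issue description. It is an illustration under assumptions, not a confirmed migration guide: the one-column schema, the table name "generating", the two hard-coded partition ranges, and the classes GeneratingTable, GeneratingScan, GeneratingReaderFactory and GeneratingPartitionReader are all made up for the example. Corrections are welcome if the mapping is wrong.
{code:scala}
import java.util

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.connector.catalog.{SupportsRead, Table, TableCapability, TableProvider}
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.connector.read.{Batch, InputPartition, PartitionReader, PartitionReaderFactory, Scan, ScanBuilder}
import org.apache.spark.sql.types.{IntegerType, StructType}
import org.apache.spark.sql.util.CaseInsensitiveStringMap

// Entry point: takes the place of "DefaultSource extends ReadSupport".
class DefaultSource extends TableProvider {
  // Hypothetical schema: a single integer column named "value".
  override def inferSchema(options: CaseInsensitiveStringMap): StructType =
    new StructType().add("value", IntegerType)

  override def getTable(
      schema: StructType,
      partitioning: Array[Transform],
      properties: util.Map[String, String]): Table = new GeneratingTable(schema)
}

// A readable table hands out a ScanBuilder; this is where createReader used to sit.
class GeneratingTable(tableSchema: StructType) extends Table with SupportsRead {
  override def name(): String = "generating"
  override def schema(): StructType = tableSchema
  override def capabilities(): util.Set[TableCapability] =
    util.Collections.singleton(TableCapability.BATCH_READ)
  override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder =
    new ScanBuilder {
      override def build(): Scan = new GeneratingScan(tableSchema)
    }
}

// Scan carries readSchema(); Batch carries planInputPartitions().
// Together they cover what DataSourceReader did for batch reads.
class GeneratingScan(schema: StructType) extends Scan with Batch {
  override def readSchema(): StructType = schema
  override def toBatch(): Batch = this
  // Hypothetical split of the range [0, 10) into two partitions.
  override def planInputPartitions(): Array[InputPartition] =
    Array(GeneratingInputPartition(0, 5), GeneratingInputPartition(5, 10))
  override def createReaderFactory(): PartitionReaderFactory = new GeneratingReaderFactory
}

// InputPartition is now a plain serializable description of a split, with no type parameter.
case class GeneratingInputPartition(start: Int, end: Int) extends InputPartition

// The factory takes over the old InputPartition.createPartitionReader().
class GeneratingReaderFactory extends PartitionReaderFactory {
  override def createReader(partition: InputPartition): PartitionReader[InternalRow] = {
    val p = partition.asInstanceOf[GeneratingInputPartition]
    new GeneratingPartitionReader(p.start, p.end)
  }
}

// PartitionReader keeps the next()/get()/close() contract of InputPartitionReader.
class GeneratingPartitionReader(start: Int, end: Int) extends PartitionReader[InternalRow] {
  private var current = start - 1
  override def next(): Boolean = { current += 1; current < end }
  override def get(): InternalRow = InternalRow(current)
  override def close(): Unit = ()
}
{code}
If this reading is right, SupportsReportPartitioning in the new package is mixed into the Scan rather than the reader, so outputPartitioning() would move to GeneratingScan.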
> Data source V2 API refactoring
> ------------------------------
>
> Key: SPARK-25390
> URL: https://issues.apache.org/jira/browse/SPARK-25390
> Project: Spark
> Issue Type: Improvement
> Components: SQL
> Affects Versions: 3.0.0
> Reporter: Wenchen Fan
> Assignee: Wenchen Fan
> Priority: Major
> Fix For: 3.0.0
>
>
> Currently it's not very clear how we should abstract data source v2 API. The abstraction should be unified between batch and streaming, or similar but have a well-defined difference between batch and streaming. And the abstraction should also include catalog/table.
> An example of the abstraction:
> {code}
> batch: catalog -> table -> scan
> streaming: catalog -> table -> stream -> scan
> {code}
> We should refactor the data source v2 API according to the abstraction