You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Rafael (Jira)" <ji...@apache.org> on 2020/08/14 20:43:00 UTC

[jira] [Commented] (SPARK-25390) Data source V2 API refactoring

    [ https://issues.apache.org/jira/browse/SPARK-25390?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17178052#comment-17178052 ] 

Rafael commented on SPARK-25390:
--------------------------------

Hey guys, 

I'm trying to migrate the package where I was using *import org.apache.spark.sql.sources.v2._ into Spark3.0.0*

and haven't found a good guide so may I ask my questions here.

 

Here my migration plan can you highlight what interfaces should I use now

 

1. Cannot find what should I use instead of ReadSupport, ReadSupport, DataSourceReader,

if instead of ReadSupport we have to use now Scan then what happened to method createReader? 
{code:java}

class DefaultSource extends ReadSupport { 
  override def createReader(options: DataSourceOptions): DataSourceReader = new GeneratingReader() 
}
{code}
2. 

Here instead of

 
{code:java}
import org.apache.spark.sql.sources.v2.reader.partitioning.{Distribution, Partitioning}
import org.apache.spark.sql.sources.v2.reader.{InputPartition, SupportsReportPartitioning}
{code}
 

I should use

 
{code:java}
import org.apache.spark.sql.connector.read.partitioning.{Distribution, Partitioning}
import org.apache.spark.sql.connector.read.{InputPartition, SupportsReportPartitioning}
{code}
 

right?
{code:java}
class GeneratingReader() extends DataSourceReader {
  override def readSchema(): StructType = {...}
  override def planInputPartitions(): util.List[InputPartition[InternalRow]] = {
    val partitions = new util.ArrayList[InputPartition[InternalRow]]()
    ...
    partitions.add(new GeneratingInputPartition(...))
  }
  override def outputPartitioning(): Partitioning = {...}
}
{code}
3.

Haven't found what should I use instead of
{code:java}
import org.apache.spark.sql.sources.v2.reader.InputPartition
import org.apache.spark.sql.sources.v2.reader.InputPartitionReader{code}
Interface like it has totally different contract
{code:java}
import org.apache.spark.sql.connector.read.InputPartition{code}
{code:java}
class GeneratingInputPartition() extends InputPartition[InternalRow] {
  override def createPartitionReader(): InputPartitionReader[InternalRow] = new  GeneratingInputPartitionReader(...)
}

class GeneratingInputPartitionReader() extends InputPartitionReader[InternalRow] {
   override def next(): Boolean = ...
   override def get(): InternalRow = ...
   override def close(): Unit = ...
}
{code}

> Data source V2 API refactoring
> ------------------------------
>
>                 Key: SPARK-25390
>                 URL: https://issues.apache.org/jira/browse/SPARK-25390
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>    Affects Versions: 3.0.0
>            Reporter: Wenchen Fan
>            Assignee: Wenchen Fan
>            Priority: Major
>             Fix For: 3.0.0
>
>
> Currently it's not very clear how we should abstract data source v2 API. The abstraction should be unified between batch and streaming, or similar but have a well-defined difference between batch and streaming. And the abstraction should also include catalog/table.
> An example of the abstraction:
> {code}
> batch: catalog -> table -> scan
> streaming: catalog -> table -> stream -> scan
> {code}
> We should refactor the data source v2 API according to the abstraction



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org