Posted to reviews@spark.apache.org by "HyukjinKwon (via GitHub)" <gi...@apache.org> on 2023/12/06 08:11:51 UTC

Re: [PR] [SPARK-45597][PYTHON][SQL] Support creating table using a Python data source in SQL [spark]

HyukjinKwon commented on code in PR #43784:
URL: https://github.com/apache/spark/pull/43784#discussion_r1416880108


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceManager.scala:
##########
@@ -81,3 +100,125 @@ class DataSourceManager extends Logging {
     manager
   }
 }
+
+/**
+ * Data Source V2 default source wrapper for Python Data Source.
+ */
+abstract class PythonDefaultSource
+    extends TableProvider
+    with DataSourceRegister {
+
+  private var sourceDataFrame: DataFrame = _
+
+  private def getOrCreateSourceDataFrame(
+      options: CaseInsensitiveStringMap, maybeSchema: Option[StructType]): DataFrame = {
+    if (sourceDataFrame != null) return sourceDataFrame
+    // TODO(SPARK-45600): should be session-based.
+    val builder = SparkSession.active.sessionState.dataSourceManager.lookupDataSource(shortName())
+    val plan = builder(
+      SparkSession.active,
+      shortName(),
+      maybeSchema,
+      CaseInsensitiveMap(options.asCaseSensitiveMap().asScala.toMap))
+    sourceDataFrame = Dataset.ofRows(SparkSession.active, plan)
+    sourceDataFrame
+  }
+
+  override def inferSchema(options: CaseInsensitiveStringMap): StructType =
+    getOrCreateSourceDataFrame(options, None).schema
+
+  override def getTable(
+      schema: StructType,
+      partitioning: Array[Transform],
+      properties: java.util.Map[String, String]): Table = {
+    val givenSchema = schema
+    new Table with SupportsRead {
+      override def name(): String = shortName()
+
+      override def capabilities(): java.util.Set[TableCapability] = java.util.EnumSet.of(BATCH_READ)
+
+      override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = {
+        new ScanBuilder with V1Scan {

Review Comment:
   @cloud-fan and @allisonwang-db, this still uses the `V1Scan` interface for now.
   
   To fully leverage DSv2, we should refactor both `PlanPythonDataSourceScan` and `UserDefinedPythonDataSource`:
   1. First, remove the `PlanPythonDataSourceScan` rule so that `DataSourceV2Strategy` can resolve the DSv2 source.
   2. Second, port the partitioning/reading logic from `UserDefinedPythonDataSource` to this `Scan` and `ScanBuilder` implementation.
   
   I don't think this is a problem right now, but I believe we will need to do it eventually, e.g. for the write path (?). I would like to do that separately if you don't mind (and I would like to focus on the static/runtime registration part).
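
For illustration only, here is a minimal sketch of the shape step 2 might take: a `ScanBuilder` that returns a `Scan` backed by the DSv2 `Batch`, `InputPartition`, and `PartitionReader` interfaces instead of `V1Scan`. This is not code from the PR. `RangePartition`, `RangePartitionReader`, `RangeReaderFactory`, `RangeScan`, and `RangeScanBuilder` are hypothetical names, and the integer-range data is a stand-in for the partitioning/reading logic that currently lives in `UserDefinedPythonDataSource`.

```scala
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.connector.read.{Batch, InputPartition, PartitionReader, PartitionReaderFactory, Scan, ScanBuilder}
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

// Hypothetical partition descriptor: a half-open range of integers [start, end).
// A Python data source would instead carry whatever identifies its partition.
case class RangePartition(start: Int, end: Int) extends InputPartition

// Hypothetical reader: emits one single-column row per integer in the range.
class RangePartitionReader(partition: RangePartition) extends PartitionReader[InternalRow] {
  private var current = partition.start - 1

  // Advances to the next value; returns false once the range is exhausted.
  override def next(): Boolean = {
    current += 1
    current < partition.end
  }

  override def get(): InternalRow = InternalRow(current)

  override def close(): Unit = ()
}

// The factory is serialized and sent to executors, so it is a standalone class
// that holds no reference to the driver-side scan.
class RangeReaderFactory extends PartitionReaderFactory {
  override def createReader(partition: InputPartition): PartitionReader[InternalRow] =
    new RangePartitionReader(partition.asInstanceOf[RangePartition])
}

// One object plays both the logical Scan and the physical Batch role.
class RangeScan(schema: StructType, numPartitions: Int, rowsPerPartition: Int)
    extends Scan with Batch {

  override def readSchema(): StructType = schema

  override def toBatch(): Batch = this

  // Partition planning happens on the driver.
  override def planInputPartitions(): Array[InputPartition] =
    Array.tabulate[InputPartition](numPartitions) { i =>
      RangePartition(i * rowsPerPartition, (i + 1) * rowsPerPartition)
    }

  override def createReaderFactory(): PartitionReaderFactory = new RangeReaderFactory
}

// What `newScanBuilder` could return in place of `new ScanBuilder with V1Scan`.
// The partition counts are fixed here only to keep the sketch small.
class RangeScanBuilder(schema: StructType) extends ScanBuilder {
  override def build(): Scan = new RangeScan(schema, numPartitions = 2, rowsPerPartition = 3)
}
```

With a scan of this shape, planning and reading go through the standard DSv2 batch interfaces, so in principle no dedicated rule such as `PlanPythonDataSourceScan` would be needed to plan it. How the Python worker invocation would map onto `planInputPartitions` and `createReader` is left open here.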



