Posted to reviews@spark.apache.org by "HyukjinKwon (via GitHub)" <gi...@apache.org> on 2023/12/11 23:20:52 UTC

[PR] [SPARK-45597][PYTHON][SQL] Support creating table using a Python data source in SQL (DSv2 exec) [spark]

HyukjinKwon opened a new pull request, #44305:
URL: https://github.com/apache/spark/pull/44305

   ### What changes were proposed in this pull request?
   
   This PR is the same as https://github.com/apache/spark/pull/44233, except that it uses the original DSv2 interface instead of `V1Table`, reusing the UDTF execution code.
   
   ### Why are the changes needed?
   
   So that Python Data Sources can be used everywhere else as well, including from SparkR and Scala.
   
   ### Does this PR introduce _any_ user-facing change?
   
   Yes. Users can register their Python Data Sources and use them in SQL, SparkR, etc.
   
   ### How was this patch tested?
   
   Unit tests were added, and the change was also tested manually.
   
   ### Was this patch authored or co-authored using generative AI tooling?
   
   No.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-45597][PYTHON][SQL] Support creating table using a Python data source in SQL (DSv2 exec) [spark]

Posted by "allisonwang-db (via GitHub)" <gi...@apache.org>.
allisonwang-db commented on code in PR #44305:
URL: https://github.com/apache/spark/pull/44305#discussion_r1427730595


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/python/UserDefinedPythonDataSource.scala:
##########
@@ -20,58 +20,199 @@ package org.apache.spark.sql.execution.python
 import java.io.{DataInputStream, DataOutputStream}
 
 import scala.collection.mutable.ArrayBuffer
+import scala.jdk.CollectionConverters._
 
 import net.razorvine.pickle.Pickler
 
-import org.apache.spark.api.python.{PythonFunction, PythonWorkerUtils, SimplePythonFunction, SpecialLengths}
-import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
-import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, PythonDataSource}
+import org.apache.spark.JobArtifactSet
+import org.apache.spark.api.python.{ChainedPythonFunctions, PythonEvalType, PythonFunction, PythonWorkerUtils, SimplePythonFunction, SpecialLengths}
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.PythonUDF
 import org.apache.spark.sql.catalyst.types.DataTypeUtils.toAttributes
 import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
+import org.apache.spark.sql.connector.catalog.{SupportsRead, Table, TableCapability, TableProvider}
+import org.apache.spark.sql.connector.catalog.TableCapability.BATCH_READ
+import org.apache.spark.sql.connector.expressions.Transform
+import org.apache.spark.sql.connector.read.{Batch, InputPartition, PartitionReader, PartitionReaderFactory, Scan, ScanBuilder}
 import org.apache.spark.sql.errors.QueryCompilationErrors
 import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.types.{DataType, StructType}
+import org.apache.spark.sql.types.{BinaryType, DataType, StructType}
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
 import org.apache.spark.util.ArrayImplicits._
 
+/**
+ * Data Source V2 wrapper for Python Data Source.
+ */
+class PythonTableProvider(shortName: String) extends TableProvider {
+  private var dataSourceInPython: PythonDataSourceCreationResult = _
+  private[this] val jobArtifactUUID = JobArtifactSet.getCurrentJobArtifactState.map(_.uuid)
+  private lazy val source: UserDefinedPythonDataSource =
+    SparkSession.active.sessionState.dataSourceManager.lookupDataSource(shortName)
+  override def inferSchema(options: CaseInsensitiveStringMap): StructType = {
+    if (dataSourceInPython == null) {
+      dataSourceInPython = source.createDataSourceInPython(shortName, options, None)
+    }
+    dataSourceInPython.schema
+  }
+
+  override def getTable(
+      schema: StructType,
+      partitioning: Array[Transform],
+      properties: java.util.Map[String, String]): Table = {
+    val outputSchema = schema
+    new Table with SupportsRead {
+      override def name(): String = shortName
+
+      override def capabilities(): java.util.Set[TableCapability] = java.util.EnumSet.of(
+        BATCH_READ)
+
+      override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = {
+        new ScanBuilder with Batch with Scan {
+
+          private lazy val infoInPython: PythonDataSourceReadInfo = {
+            if (dataSourceInPython == null) {
+              dataSourceInPython = source
+                .createDataSourceInPython(shortName, options, Some(outputSchema))
+            }
+            source.createReadInfoInPython(dataSourceInPython, outputSchema)
+          }
+
+          override def build(): Scan = this
+
+          override def toBatch: Batch = this
+
+          override def readSchema(): StructType = outputSchema
+
+          override def planInputPartitions(): Array[InputPartition] =
+            infoInPython.partitions.zipWithIndex.map(p => PythonInputPartition(p._2, p._1)).toArray
+
+          override def createReaderFactory(): PartitionReaderFactory = {
+            val readerFunc = infoInPython.func
+            new PythonPartitionReaderFactory(
+              source, readerFunc, outputSchema, jobArtifactUUID)
+          }
+        }
+      }
+
+      override def schema(): StructType = outputSchema
+    }
+  }
+
+  override def supportsExternalMetadata(): Boolean = true

Review Comment:
   I am actually wondering whether we should expose this as an API in the Python data source.
   If a data source cannot handle external metadata, then `.schema(...)` or `CREATE TABLE table(...)` should fail, rather than the failure surfacing only when the query is executed.
   But I am not sure whether this would make the Python API too complicated. WDYT?
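
A minimal sketch of the idea in this comment, not the actual PySpark API: a data source advertises whether it accepts a user-specified schema, and the caller checks that flag while resolving the schema, so the error is raised at `.schema(...)` / `CREATE TABLE` time instead of at query execution. Every name below (`SketchDataSource`, `FlexibleDataSource`, `supports_external_metadata`, `resolve_schema`) is hypothetical and the code does not depend on Spark.

```python
from typing import Optional


class SketchDataSource:
    """Hypothetical stand-in for a Python data source; not the PySpark class."""

    def supports_external_metadata(self) -> bool:
        # Assumed default: the source only knows its own schema.
        return False

    def schema(self) -> str:
        # Schema the source reports when the user does not supply one.
        return "id INT, partition INT"


class FlexibleDataSource(SketchDataSource):
    """A hypothetical source that opts in to user-specified schemas."""

    def supports_external_metadata(self) -> bool:
        return True


def resolve_schema(source: SketchDataSource, user_schema: Optional[str]) -> str:
    """Pick the schema up front, failing early if the source cannot honor one."""
    if user_schema is None:
        return source.schema()
    if not source.supports_external_metadata():
        raise ValueError(
            f"{type(source).__name__} does not accept a user-specified schema"
        )
    return user_schema
```

The trade-off raised in the comment shows up here as one extra method that every data source author would need to know about.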





Re: [PR] [SPARK-45597][PYTHON][SQL] Support creating table using a Python data source in SQL (DSv2 exec) [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #44305:
URL: https://github.com/apache/spark/pull/44305#discussion_r1428350647


##########
sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala:
##########
@@ -780,7 +780,7 @@ class SparkSession private(
     DataSource.lookupDataSource(runner, sessionState.conf) match {
       case source if classOf[ExternalCommandRunner].isAssignableFrom(source) =>
         Dataset.ofRows(self, ExternalCommandExecutor(
-          source.getDeclaredConstructor().newInstance()
+          DataSource.newDataSourceInstance(runner, source)

Review Comment:
   It is arguable whether this is a breaking change. People now need to worry about Python data sources in code that is meant to deal with DS v1 only.





Re: [PR] [SPARK-45597][PYTHON][SQL] Support creating table using a Python data source in SQL (DSv2 exec) [spark]

Posted by "HyukjinKwon (via GitHub)" <gi...@apache.org>.
HyukjinKwon commented on code in PR #44305:
URL: https://github.com/apache/spark/pull/44305#discussion_r1427567309


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala:
##########
@@ -105,13 +106,14 @@ case class DataSource(
     // [[FileDataSourceV2]] will still be used if we call the load()/save() method in
     // [[DataFrameReader]]/[[DataFrameWriter]], since they use method `lookupDataSource`
     // instead of `providingClass`.
-    cls.getDeclaredConstructor().newInstance() match {
+    DataSource.newDataSourceInstance(className, cls) match {
       case f: FileDataSourceV2 => f.fallbackFileFormat

Review Comment:
   and `tables.scala` as well:
   
   ```scala
       if (DDLUtils.isDatasourceTable(catalogTable)) {
         DataSource.newDataSourceInstance(
             catalogTable.provider.get,
             DataSource.lookupDataSource(catalogTable.provider.get, conf)) match {
           // For datasource table, this command can only support the following File format.
           // TextFileFormat only default to one column "value"
           // Hive type is already considered as hive serde table, so the logic will not
           // come in here.
           case _: CSVFileFormat | _: JsonFileFormat | _: ParquetFileFormat =>
           case _: JsonDataSourceV2 | _: CSVDataSourceV2 |
                _: OrcDataSourceV2 | _: ParquetDataSourceV2 =>
           case s if s.getClass.getCanonicalName.endsWith("OrcFileFormat") =>
           case s =>
             throw QueryCompilationErrors.alterAddColNotSupportDatasourceTableError(s, table)
         }
       }
       catalogTable
   ```





Re: [PR] [SPARK-45597][PYTHON][SQL] Support creating table using a Python data source in SQL (DSv2 exec) [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #44305:
URL: https://github.com/apache/spark/pull/44305#discussion_r1425859733


##########
sql/core/src/test/scala/org/apache/spark/sql/execution/python/PythonDataSourceSuite.scala:
##########
@@ -231,8 +212,9 @@ class PythonDataSourceSuite extends QueryTest with SharedSparkSession {
     val schema = StructType.fromDDL("id INT, partition INT")
     val dataSource = createUserDefinedPythonDataSource(
       name = dataSourceName, pythonScript = dataSourceScript)
+    spark.dataSource.registerPython(dataSourceName, dataSource)

Review Comment:
   why do we need the extra registration?





Re: [PR] [SPARK-45597][PYTHON][SQL] Support creating table using a Python data source in SQL (DSv2 exec) [spark]

Posted by "HyukjinKwon (via GitHub)" <gi...@apache.org>.
HyukjinKwon commented on code in PR #44305:
URL: https://github.com/apache/spark/pull/44305#discussion_r1425856795


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala:
##########
@@ -708,17 +732,18 @@ object DataSource extends Logging {
   def lookupDataSourceV2(provider: String, conf: SQLConf): Option[TableProvider] = {

Review Comment:
   yeah I like that idea. Can I do it in a followup though? I would like to extract some changes from your PR, and make another PR.





Re: [PR] [SPARK-45597][PYTHON][SQL] Support creating table using a Python data source in SQL (DSv2 exec) [spark]

Posted by "allisonwang-db (via GitHub)" <gi...@apache.org>.
allisonwang-db commented on code in PR #44305:
URL: https://github.com/apache/spark/pull/44305#discussion_r1427738441


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/python/UserDefinedPythonDataSource.scala:
##########
@@ -20,58 +20,199 @@ package org.apache.spark.sql.execution.python
 import java.io.{DataInputStream, DataOutputStream}
 
 import scala.collection.mutable.ArrayBuffer
+import scala.jdk.CollectionConverters._
 
 import net.razorvine.pickle.Pickler
 
-import org.apache.spark.api.python.{PythonFunction, PythonWorkerUtils, SimplePythonFunction, SpecialLengths}
-import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
-import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, PythonDataSource}
+import org.apache.spark.JobArtifactSet
+import org.apache.spark.api.python.{ChainedPythonFunctions, PythonEvalType, PythonFunction, PythonWorkerUtils, SimplePythonFunction, SpecialLengths}
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.PythonUDF
 import org.apache.spark.sql.catalyst.types.DataTypeUtils.toAttributes
 import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
+import org.apache.spark.sql.connector.catalog.{SupportsRead, Table, TableCapability, TableProvider}
+import org.apache.spark.sql.connector.catalog.TableCapability.BATCH_READ
+import org.apache.spark.sql.connector.expressions.Transform
+import org.apache.spark.sql.connector.read.{Batch, InputPartition, PartitionReader, PartitionReaderFactory, Scan, ScanBuilder}
 import org.apache.spark.sql.errors.QueryCompilationErrors
 import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.types.{DataType, StructType}
+import org.apache.spark.sql.types.{BinaryType, DataType, StructType}
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
 import org.apache.spark.util.ArrayImplicits._
 
+/**
+ * Data Source V2 wrapper for Python Data Source.
+ */
+class PythonTableProvider(shortName: String) extends TableProvider {
+  private var dataSourceInPython: PythonDataSourceCreationResult = _
+  private[this] val jobArtifactUUID = JobArtifactSet.getCurrentJobArtifactState.map(_.uuid)
+  private lazy val source: UserDefinedPythonDataSource =
+    SparkSession.active.sessionState.dataSourceManager.lookupDataSource(shortName)
+  override def inferSchema(options: CaseInsensitiveStringMap): StructType = {
+    if (dataSourceInPython == null) {
+      dataSourceInPython = source.createDataSourceInPython(shortName, options, None)
+    }
+    dataSourceInPython.schema
+  }
+
+  override def getTable(
+      schema: StructType,
+      partitioning: Array[Transform],
+      properties: java.util.Map[String, String]): Table = {
+    val outputSchema = schema
+    new Table with SupportsRead {
+      override def name(): String = shortName
+
+      override def capabilities(): java.util.Set[TableCapability] = java.util.EnumSet.of(
+        BATCH_READ)
+
+      override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = {
+        new ScanBuilder with Batch with Scan {
+
+          private lazy val infoInPython: PythonDataSourceReadInfo = {
+            if (dataSourceInPython == null) {
+              dataSourceInPython = source
+                .createDataSourceInPython(shortName, options, Some(outputSchema))
+            }
+            source.createReadInfoInPython(dataSourceInPython, outputSchema)
+          }
+
+          override def build(): Scan = this
+
+          override def toBatch: Batch = this
+
+          override def readSchema(): StructType = outputSchema
+
+          override def planInputPartitions(): Array[InputPartition] =
+            infoInPython.partitions.zipWithIndex.map(p => PythonInputPartition(p._2, p._1)).toArray
+
+          override def createReaderFactory(): PartitionReaderFactory = {
+            val readerFunc = infoInPython.func
+            new PythonPartitionReaderFactory(
+              source, readerFunc, outputSchema, jobArtifactUUID)
+          }
+        }
+      }
+
+      override def schema(): StructType = outputSchema
+    }
+  }
+
+  override def supportsExternalMetadata(): Boolean = true

Review Comment:
   I think we can set this to false (the default) for now to disallow specifying a schema. It's much easier to implement a data source without having to consider a user-specified schema.





Re: [PR] [SPARK-45597][PYTHON][SQL] Support creating table using a Python data source in SQL (DSv2 exec) [spark]

Posted by "HyukjinKwon (via GitHub)" <gi...@apache.org>.
HyukjinKwon commented on code in PR #44305:
URL: https://github.com/apache/spark/pull/44305#discussion_r1425865126


##########
sql/core/src/test/scala/org/apache/spark/sql/execution/python/PythonDataSourceSuite.scala:
##########
@@ -231,8 +212,9 @@ class PythonDataSourceSuite extends QueryTest with SharedSparkSession {
     val schema = StructType.fromDDL("id INT, partition INT")
     val dataSource = createUserDefinedPythonDataSource(
       name = dataSourceName, pythonScript = dataSourceScript)
+    spark.dataSource.registerPython(dataSourceName, dataSource)

Review Comment:
   Previously `UserDefinedPythonDataSource` was able to create a `DataFrame` directly (from `LogicalRelation`) via `UserDefinedPythonDataSource.apply`.
   
   That is no longer possible because we're using DSv2, so here we register the data source and load it via `DataFrameReader` to create a `DataFrame` for the test.





Re: [PR] [SPARK-45597][PYTHON][SQL] Support creating table using a Python data source in SQL (DSv2 exec) [spark]

Posted by "HyukjinKwon (via GitHub)" <gi...@apache.org>.
HyukjinKwon commented on code in PR #44305:
URL: https://github.com/apache/spark/pull/44305#discussion_r1425868807


##########
sql/core/src/test/scala/org/apache/spark/sql/execution/python/PythonDataSourceSuite.scala:
##########
@@ -145,34 +148,6 @@ class PythonDataSourceSuite extends QueryTest with SharedSparkSession {
     val dataSource = createUserDefinedPythonDataSource(dataSourceName, dataSourceScript)
     spark.dataSource.registerPython(dataSourceName, dataSource)
     assert(spark.sessionState.dataSourceManager.dataSourceExists(dataSourceName))
-    val ds1 = spark.sessionState.dataSourceManager.lookupDataSource(dataSourceName)
-    checkAnswer(
-      ds1(spark, dataSourceName, None, CaseInsensitiveMap(Map.empty)),
-      Seq(Row(0, 0), Row(0, 1), Row(1, 0), Row(1, 1), Row(2, 0), Row(2, 1)))
-
-    // Should be able to override an already registered data source.

Review Comment:
   oops, I think I should recover this test.





Re: [PR] [SPARK-45597][PYTHON][SQL] Support creating table using a Python data source in SQL (DSv2 exec) [spark]

Posted by "HyukjinKwon (via GitHub)" <gi...@apache.org>.
HyukjinKwon commented on code in PR #44305:
URL: https://github.com/apache/spark/pull/44305#discussion_r1426971568


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/python/UserDefinedPythonDataSource.scala:
##########
@@ -20,58 +20,200 @@ package org.apache.spark.sql.execution.python
 import java.io.{DataInputStream, DataOutputStream}
 
 import scala.collection.mutable.ArrayBuffer
+import scala.jdk.CollectionConverters._
 
 import net.razorvine.pickle.Pickler
 
-import org.apache.spark.api.python.{PythonFunction, PythonWorkerUtils, SimplePythonFunction, SpecialLengths}
-import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
-import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, PythonDataSource}
+import org.apache.spark.JobArtifactSet
+import org.apache.spark.api.python.{ChainedPythonFunctions, PythonEvalType, PythonFunction, PythonWorkerUtils, SimplePythonFunction, SpecialLengths}
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.PythonUDF
 import org.apache.spark.sql.catalyst.types.DataTypeUtils.toAttributes
 import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
+import org.apache.spark.sql.connector.catalog.{SupportsRead, Table, TableCapability, TableProvider}
+import org.apache.spark.sql.connector.catalog.TableCapability.{BATCH_READ, BATCH_WRITE}
+import org.apache.spark.sql.connector.expressions.Transform
+import org.apache.spark.sql.connector.read.{Batch, InputPartition, PartitionReader, PartitionReaderFactory, Scan, ScanBuilder}
 import org.apache.spark.sql.errors.QueryCompilationErrors
 import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.types.{DataType, StructType}
+import org.apache.spark.sql.types.{BinaryType, DataType, StructType}
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
 import org.apache.spark.util.ArrayImplicits._
 
+/**
+ * Data Source V2 wrapper for Python Data Source.
+ */
+class PythonTableProvider(shortName: String) extends TableProvider {
+  private var dataSourceInPython: PythonDataSourceCreationResult = _
+  private[this] val jobArtifactUUID = JobArtifactSet.getCurrentJobArtifactState.map(_.uuid)
+  private lazy val source: UserDefinedPythonDataSource =
+    SparkSession.active.sessionState.dataSourceManager.lookupDataSource(shortName)
+  override def inferSchema(options: CaseInsensitiveStringMap): StructType = {
+    if (dataSourceInPython == null) {
+      dataSourceInPython = source.createDataSourceInPython(shortName, options, None)
+    }
+    dataSourceInPython.schema
+  }
+
+  override def getTable(
+      schema: StructType,
+      partitioning: Array[Transform],
+      properties: java.util.Map[String, String]): Table = {
+    assert(partitioning.isEmpty)
+    val outputSchema = schema
+    new Table with SupportsRead {

Review Comment:
   Actually, I intentionally kept it together because we should cache `dataSourceInPython`, the result returned from the Python worker (which contains both the schema and the pickled data source), so that it can be used once for schema inference and once for the actual reading. This keeps the code more readable and localizes the scope of the cache. In addition, I don't think we are likely to extend this Python Table class/instance.
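
The caching described above can be pictured with a small Spark-free sketch. It is an illustration under stated assumptions, not the code in this PR: `CachingProvider`, `create_in_worker`, `infer_schema`, and `plan_read` are hypothetical names, and the "worker" is any callable that returns a `(schema, pickled data source)` pair. The point is only that the expensive creation call runs once and both later steps reuse its result.

```python
from typing import Callable, Optional, Tuple

# (schema, pickled data source), as returned by a hypothetical worker call.
CreationResult = Tuple[str, bytes]


class CachingProvider:
    """Creates the data source lazily and keeps the result in one place."""

    def __init__(self, create_in_worker: Callable[[Optional[str]], CreationResult]):
        self._create_in_worker = create_in_worker
        self._cached: Optional[CreationResult] = None

    def _data_source(self, user_schema: Optional[str]) -> CreationResult:
        # Only the first caller pays for the worker round trip.
        if self._cached is None:
            self._cached = self._create_in_worker(user_schema)
        return self._cached

    def infer_schema(self) -> str:
        # Schema inference passes no user schema.
        return self._data_source(None)[0]

    def plan_read(self, schema: str) -> bytes:
        # Reading reuses the cached result if inference already ran.
        return self._data_source(schema)[1]
```

Keeping the cache as a private field of the one object that uses it mirrors the "localize the scope of the cache" argument in the comment.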





Re: [PR] [SPARK-45597][PYTHON][SQL] Support creating table using a Python data source in SQL (DSv2 exec) [spark]

Posted by "HyukjinKwon (via GitHub)" <gi...@apache.org>.
HyukjinKwon commented on code in PR #44305:
URL: https://github.com/apache/spark/pull/44305#discussion_r1427565917


##########
sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala:
##########
@@ -780,7 +780,7 @@ class SparkSession private(
     DataSource.lookupDataSource(runner, sessionState.conf) match {

Review Comment:
   Actually @cloud-fan that would not work .. E.g., if PythonDataSource implements `ExternalCommandRunner`, we should load it here.





Re: [PR] [SPARK-45597][PYTHON][SQL] Support creating table using a Python data source in SQL (DSv2 exec) [spark]

Posted by "HyukjinKwon (via GitHub)" <gi...@apache.org>.
HyukjinKwon commented on code in PR #44305:
URL: https://github.com/apache/spark/pull/44305#discussion_r1423230523


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonUDTFExec.scala:
##########
@@ -55,84 +55,102 @@ trait EvalPythonUDTFExec extends UnaryExecNode {
     val inputRDD = child.execute().map(_.copy())
 
     inputRDD.mapPartitions { iter =>
-      val context = TaskContext.get()
-
-      // The queue used to buffer input rows so we can drain it to
-      // combine input with output from Python.
-      val queue = HybridRowQueue(context.taskMemoryManager(),
-        new File(Utils.getLocalDir(SparkEnv.get.conf)), child.output.length)
-      context.addTaskCompletionListener[Unit] { ctx =>
-        queue.close()
-      }
+      EvalPythonUDTFExec.execute(
+        iter, evaluate, udtf, output.length, child.output, requiredChildOutput, output)
+    }
+  }
+}
 
-      // flatten all the arguments
-      val allInputs = new ArrayBuffer[Expression]
-      val dataTypes = new ArrayBuffer[DataType]
-      val argMetas = udtf.children.map { e =>
-        val (key, value) = e match {
-          case NamedArgumentExpression(key, value) =>
-            (Some(key), value)
-          case _ =>
-            (None, e)
-        }
-        if (allInputs.exists(_.semanticEquals(value))) {
-          ArgumentMetadata(allInputs.indexWhere(_.semanticEquals(value)), key)
-        } else {
-          allInputs += value
-          dataTypes += value.dataType
-          ArgumentMetadata(allInputs.length - 1, key)
-        }
-      }.toArray
-      val projection = MutableProjection.create(allInputs.toSeq, child.output)
-      projection.initialize(context.partitionId())
-      val schema = StructType(dataTypes.zipWithIndex.map { case (dt, i) =>
-        StructField(s"_$i", dt)
-      }.toArray)
-
-      // Add rows to the queue to join later with the result.
-      // Also keep track of the number rows added to the queue.
-      // This is needed to process extra output rows from the `terminate()` call of the UDTF.
-      var count = 0L
-      val projectedRowIter = iter.map { inputRow =>
-        queue.add(inputRow.asInstanceOf[UnsafeRow])
-        count += 1
-        projection(inputRow)
+object EvalPythonUDTFExec {

Review Comment:
   Just moving the code around. It's the same.





Re: [PR] [SPARK-45597][PYTHON][SQL] Support creating table using a Python data source in SQL (DSv2 exec) [spark]

Posted by "HyukjinKwon (via GitHub)" <gi...@apache.org>.
HyukjinKwon commented on code in PR #44305:
URL: https://github.com/apache/spark/pull/44305#discussion_r1426971568


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/python/UserDefinedPythonDataSource.scala:
##########
@@ -20,58 +20,200 @@ package org.apache.spark.sql.execution.python
 import java.io.{DataInputStream, DataOutputStream}
 
 import scala.collection.mutable.ArrayBuffer
+import scala.jdk.CollectionConverters._
 
 import net.razorvine.pickle.Pickler
 
-import org.apache.spark.api.python.{PythonFunction, PythonWorkerUtils, SimplePythonFunction, SpecialLengths}
-import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
-import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, PythonDataSource}
+import org.apache.spark.JobArtifactSet
+import org.apache.spark.api.python.{ChainedPythonFunctions, PythonEvalType, PythonFunction, PythonWorkerUtils, SimplePythonFunction, SpecialLengths}
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.PythonUDF
 import org.apache.spark.sql.catalyst.types.DataTypeUtils.toAttributes
 import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
+import org.apache.spark.sql.connector.catalog.{SupportsRead, Table, TableCapability, TableProvider}
+import org.apache.spark.sql.connector.catalog.TableCapability.{BATCH_READ, BATCH_WRITE}
+import org.apache.spark.sql.connector.expressions.Transform
+import org.apache.spark.sql.connector.read.{Batch, InputPartition, PartitionReader, PartitionReaderFactory, Scan, ScanBuilder}
 import org.apache.spark.sql.errors.QueryCompilationErrors
 import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.types.{DataType, StructType}
+import org.apache.spark.sql.types.{BinaryType, DataType, StructType}
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
 import org.apache.spark.util.ArrayImplicits._
 
+/**
+ * Data Source V2 wrapper for Python Data Source.
+ */
+class PythonTableProvider(shortName: String) extends TableProvider {
+  private var dataSourceInPython: PythonDataSourceCreationResult = _
+  private[this] val jobArtifactUUID = JobArtifactSet.getCurrentJobArtifactState.map(_.uuid)
+  private lazy val source: UserDefinedPythonDataSource =
+    SparkSession.active.sessionState.dataSourceManager.lookupDataSource(shortName)
+  override def inferSchema(options: CaseInsensitiveStringMap): StructType = {
+    if (dataSourceInPython == null) {
+      dataSourceInPython = source.createDataSourceInPython(shortName, options, None)
+    }
+    dataSourceInPython.schema
+  }
+
+  override def getTable(
+      schema: StructType,
+      partitioning: Array[Transform],
+      properties: java.util.Map[String, String]): Table = {
+    assert(partitioning.isEmpty)
+    val outputSchema = schema
+    new Table with SupportsRead {

Review Comment:
   Actually, I intentionally kept them together because we should cache `dataSourceInPython`, the result returned from the Python worker (it contains both the schema and the pickled data source), and use it once for schema inference and once for partition reading. Keeping them together is more readable and localizes the scope of the cache. In addition, I don't think we are likely to extend this Python Table class/instance.
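   
   A minimal sketch of the caching idea described above, written in plain Scala without Spark. `CreationResult`, `CachingProvider`, and `CachingDemo` are hypothetical names used only for illustration; the `create` function stands in for the round trip to the Python worker and is an assumption, not the PR's code.
   
   ```scala
   // Hypothetical stand-in for the result returned by the Python worker.
   final case class CreationResult(schema: Seq[String], pickledSource: Array[Byte])
   
   // Simplified provider: `create` models the expensive call to the Python worker.
   class CachingProvider(create: () => CreationResult) {
     // Cache scoped to the provider, shared by schema inference and partition reading.
     private var cached: Option[CreationResult] = None
   
     private def getOrCreate(): CreationResult = cached match {
       case Some(result) => result
       case None =>
         val result = create()
         cached = Some(result)
         result
     }
   
     // First consumer of the cached result.
     def inferSchema(): Seq[String] = getOrCreate().schema
   
     // Second consumer of the cached result.
     def pickledSource(): Array[Byte] = getOrCreate().pickledSource
   }
   
   object CachingDemo {
     // Counts how often the (simulated) Python worker is invoked.
     var calls = 0
     val provider = new CachingProvider(() => {
       calls += 1
       CreationResult(Seq("id", "value"), Array[Byte](1, 2, 3))
     })
   }
   ```
   
   Calling `inferSchema()` and then `pickledSource()` on `CachingDemo.provider` should invoke `create` only once, which is the point of keeping the cache next to both consumers.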



Re: [PR] [SPARK-45597][PYTHON][SQL] Support creating table using a Python data source in SQL (DSv2 exec) [spark]

Posted by "allisonwang-db (via GitHub)" <gi...@apache.org>.
allisonwang-db commented on code in PR #44305:
URL: https://github.com/apache/spark/pull/44305#discussion_r1427752943


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/python/UserDefinedPythonDataSource.scala:
##########
@@ -20,58 +20,199 @@ package org.apache.spark.sql.execution.python
 import java.io.{DataInputStream, DataOutputStream}
 
 import scala.collection.mutable.ArrayBuffer
+import scala.jdk.CollectionConverters._
 
 import net.razorvine.pickle.Pickler
 
-import org.apache.spark.api.python.{PythonFunction, PythonWorkerUtils, SimplePythonFunction, SpecialLengths}
-import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
-import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, PythonDataSource}
+import org.apache.spark.JobArtifactSet
+import org.apache.spark.api.python.{ChainedPythonFunctions, PythonEvalType, PythonFunction, PythonWorkerUtils, SimplePythonFunction, SpecialLengths}
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.PythonUDF
 import org.apache.spark.sql.catalyst.types.DataTypeUtils.toAttributes
 import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
+import org.apache.spark.sql.connector.catalog.{SupportsRead, Table, TableCapability, TableProvider}
+import org.apache.spark.sql.connector.catalog.TableCapability.BATCH_READ
+import org.apache.spark.sql.connector.expressions.Transform
+import org.apache.spark.sql.connector.read.{Batch, InputPartition, PartitionReader, PartitionReaderFactory, Scan, ScanBuilder}
 import org.apache.spark.sql.errors.QueryCompilationErrors
 import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.types.{DataType, StructType}
+import org.apache.spark.sql.types.{BinaryType, DataType, StructType}
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
 import org.apache.spark.util.ArrayImplicits._
 
+/**
+ * Data Source V2 wrapper for Python Data Source.
+ */
+class PythonTableProvider(shortName: String) extends TableProvider {
+  private var dataSourceInPython: PythonDataSourceCreationResult = _
+  private[this] val jobArtifactUUID = JobArtifactSet.getCurrentJobArtifactState.map(_.uuid)
+  private lazy val source: UserDefinedPythonDataSource =
+    SparkSession.active.sessionState.dataSourceManager.lookupDataSource(shortName)
+  override def inferSchema(options: CaseInsensitiveStringMap): StructType = {
+    if (dataSourceInPython == null) {
+      dataSourceInPython = source.createDataSourceInPython(shortName, options, None)
+    }
+    dataSourceInPython.schema
+  }
+
+  override def getTable(
+      schema: StructType,
+      partitioning: Array[Transform],
+      properties: java.util.Map[String, String]): Table = {
+    val outputSchema = schema
+    new Table with SupportsRead {
+      override def name(): String = shortName
+
+      override def capabilities(): java.util.Set[TableCapability] = java.util.EnumSet.of(
+        BATCH_READ)
+
+      override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = {
+        new ScanBuilder with Batch with Scan {
+
+          private lazy val infoInPython: PythonDataSourceReadInfo = {
+            if (dataSourceInPython == null) {
+              dataSourceInPython = source
+                .createDataSourceInPython(shortName, options, Some(outputSchema))
+            }
+            source.createReadInfoInPython(dataSourceInPython, outputSchema)
+          }
+
+          override def build(): Scan = this
+
+          override def toBatch: Batch = this
+
+          override def readSchema(): StructType = outputSchema
+
+          override def planInputPartitions(): Array[InputPartition] =
+            infoInPython.partitions.zipWithIndex.map(p => PythonInputPartition(p._2, p._1)).toArray
+
+          override def createReaderFactory(): PartitionReaderFactory = {
+            val readerFunc = infoInPython.func
+            new PythonPartitionReaderFactory(
+              source, readerFunc, outputSchema, jobArtifactUUID)
+          }
+        }
+      }
+
+      override def schema(): StructType = outputSchema
+    }
+  }
+
+  override def supportsExternalMetadata(): Boolean = true

Review Comment:
   For simplicity, I think we can set it to false (the default value) for now. It can actually be difficult to implement a data source that supports a user-specified schema.



Re: [PR] [SPARK-45597][PYTHON][SQL] Support creating table using a Python data source in SQL (DSv2 exec) [spark]

Posted by "HyukjinKwon (via GitHub)" <gi...@apache.org>.
HyukjinKwon commented on code in PR #44305:
URL: https://github.com/apache/spark/pull/44305#discussion_r1428616348


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/python/MapInBatchEvaluatorFactory.scala:
##########
@@ -36,7 +36,7 @@ class MapInBatchEvaluatorFactory(
     sessionLocalTimeZone: String,
     largeVarTypes: Boolean,
     pythonRunnerConf: Map[String, String],
-    pythonMetrics: Map[String, SQLMetric],
+    pythonMetrics: Option[Map[String, SQLMetric]],

Review Comment:
   Here: https://github.com/apache/spark/pull/44375



Re: [PR] [SPARK-45597][PYTHON][SQL] Support creating table using a Python data source in SQL (DSv2 exec) [spark]

Posted by "HyukjinKwon (via GitHub)" <gi...@apache.org>.
HyukjinKwon commented on code in PR #44305:
URL: https://github.com/apache/spark/pull/44305#discussion_r1427560724


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala:
##########
@@ -708,17 +732,18 @@ object DataSource extends Logging {
   def lookupDataSourceV2(provider: String, conf: SQLConf): Option[TableProvider] = {

Review Comment:
   Oh, okie dokie. I was actually thinking about porting more changes from your PR. I will fix just that one here for now.



Re: [PR] [SPARK-45597][PYTHON][SQL] Support creating table using a Python data source in SQL (DSv2 exec) [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #44305:
URL: https://github.com/apache/spark/pull/44305#discussion_r1428348881


##########
sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala:
##########
@@ -780,7 +780,7 @@ class SparkSession private(
     DataSource.lookupDataSource(runner, sessionState.conf) match {

Review Comment:
   Let's worry about it when we actually add this ability to the Python data source. We may never add it, for simplicity.



Re: [PR] [SPARK-45597][PYTHON][SQL] Support creating table using a Python data source in SQL (DSv2 exec) [spark]

Posted by "HyukjinKwon (via GitHub)" <gi...@apache.org>.
HyukjinKwon closed pull request #44305: [SPARK-45597][PYTHON][SQL] Support creating table using a Python data source in SQL (DSv2 exec)
URL: https://github.com/apache/spark/pull/44305


Re: [PR] [SPARK-45597][PYTHON][SQL] Support creating table using a Python data source in SQL (DSv2 exec) [spark]

Posted by "allisonwang-db (via GitHub)" <gi...@apache.org>.
allisonwang-db commented on code in PR #44305:
URL: https://github.com/apache/spark/pull/44305#discussion_r1427738441


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/python/UserDefinedPythonDataSource.scala:
##########
@@ -20,58 +20,199 @@ package org.apache.spark.sql.execution.python
 import java.io.{DataInputStream, DataOutputStream}
 
 import scala.collection.mutable.ArrayBuffer
+import scala.jdk.CollectionConverters._
 
 import net.razorvine.pickle.Pickler
 
-import org.apache.spark.api.python.{PythonFunction, PythonWorkerUtils, SimplePythonFunction, SpecialLengths}
-import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
-import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, PythonDataSource}
+import org.apache.spark.JobArtifactSet
+import org.apache.spark.api.python.{ChainedPythonFunctions, PythonEvalType, PythonFunction, PythonWorkerUtils, SimplePythonFunction, SpecialLengths}
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.PythonUDF
 import org.apache.spark.sql.catalyst.types.DataTypeUtils.toAttributes
 import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
+import org.apache.spark.sql.connector.catalog.{SupportsRead, Table, TableCapability, TableProvider}
+import org.apache.spark.sql.connector.catalog.TableCapability.BATCH_READ
+import org.apache.spark.sql.connector.expressions.Transform
+import org.apache.spark.sql.connector.read.{Batch, InputPartition, PartitionReader, PartitionReaderFactory, Scan, ScanBuilder}
 import org.apache.spark.sql.errors.QueryCompilationErrors
 import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.types.{DataType, StructType}
+import org.apache.spark.sql.types.{BinaryType, DataType, StructType}
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
 import org.apache.spark.util.ArrayImplicits._
 
+/**
+ * Data Source V2 wrapper for Python Data Source.
+ */
+class PythonTableProvider(shortName: String) extends TableProvider {
+  private var dataSourceInPython: PythonDataSourceCreationResult = _
+  private[this] val jobArtifactUUID = JobArtifactSet.getCurrentJobArtifactState.map(_.uuid)
+  private lazy val source: UserDefinedPythonDataSource =
+    SparkSession.active.sessionState.dataSourceManager.lookupDataSource(shortName)
+  override def inferSchema(options: CaseInsensitiveStringMap): StructType = {
+    if (dataSourceInPython == null) {
+      dataSourceInPython = source.createDataSourceInPython(shortName, options, None)
+    }
+    dataSourceInPython.schema
+  }
+
+  override def getTable(
+      schema: StructType,
+      partitioning: Array[Transform],
+      properties: java.util.Map[String, String]): Table = {
+    val outputSchema = schema
+    new Table with SupportsRead {
+      override def name(): String = shortName
+
+      override def capabilities(): java.util.Set[TableCapability] = java.util.EnumSet.of(
+        BATCH_READ)
+
+      override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = {
+        new ScanBuilder with Batch with Scan {
+
+          private lazy val infoInPython: PythonDataSourceReadInfo = {
+            if (dataSourceInPython == null) {
+              dataSourceInPython = source
+                .createDataSourceInPython(shortName, options, Some(outputSchema))
+            }
+            source.createReadInfoInPython(dataSourceInPython, outputSchema)
+          }
+
+          override def build(): Scan = this
+
+          override def toBatch: Batch = this
+
+          override def readSchema(): StructType = outputSchema
+
+          override def planInputPartitions(): Array[InputPartition] =
+            infoInPython.partitions.zipWithIndex.map(p => PythonInputPartition(p._2, p._1)).toArray
+
+          override def createReaderFactory(): PartitionReaderFactory = {
+            val readerFunc = infoInPython.func
+            new PythonPartitionReaderFactory(
+              source, readerFunc, outputSchema, jobArtifactUUID)
+          }
+        }
+      }
+
+      override def schema(): StructType = outputSchema
+    }
+  }
+
+  override def supportsExternalMetadata(): Boolean = true

Review Comment:
   I think we can set this to false (the default) for now to disallow specifying a schema. It's much easier to implement the data source without considering a user-specified schema.
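   
   A rough sketch of what leaving the flag at `false` means for schema resolution, in plain Scala without Spark. `SimpleProvider` and `SchemaResolution` are hypothetical names, and the rule that a user-specified schema is rejected when the flag is false is an assumption of this simplified model rather than a quote of Spark's implementation.
   
   ```scala
   // Hypothetical, simplified model of a table provider; not Spark's TableProvider.
   trait SimpleProvider {
     def inferSchema(): Seq[String]
     // Defaults to false: the source does not accept a user-specified schema.
     def supportsExternalMetadata: Boolean = false
   }
   
   object SchemaResolution {
     // Returns the schema to use, or an error message when a user schema is not allowed.
     def resolve(
         provider: SimpleProvider,
         userSchema: Option[Seq[String]]): Either[String, Seq[String]] =
       userSchema match {
         // The source opted in, so the user schema wins.
         case Some(schema) if provider.supportsExternalMetadata => Right(schema)
         // Assumed behavior: reject the user schema when the source did not opt in.
         case Some(_) => Left("user-specified schema is not supported by this source")
         // No user schema: fall back to inference.
         case None => Right(provider.inferSchema())
       }
   }
   ```
   
   With the default in place, the data source only has to produce a schema through `inferSchema()` and never has to reconcile it with one supplied by the user.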



Re: [PR] [SPARK-45597][PYTHON][SQL] Support creating table using a Python data source in SQL (DSv2 exec) [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #44305:
URL: https://github.com/apache/spark/pull/44305#discussion_r1425842196


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala:
##########
@@ -708,17 +732,18 @@ object DataSource extends Logging {
   def lookupDataSourceV2(provider: String, conf: SQLConf): Option[TableProvider] = {

Review Comment:
   What do you think of my idea to only put the Python data source handling in this method? https://github.com/apache/spark/pull/44269/files#diff-2a3ed194aac77f3de25418a74a756d8d821feb2b3d38f4fec144f312e022801aR709



Re: [PR] [SPARK-45597][PYTHON][SQL] Support creating table using a Python data source in SQL (DSv2 exec) [spark]

Posted by "HyukjinKwon (via GitHub)" <gi...@apache.org>.
HyukjinKwon commented on code in PR #44305:
URL: https://github.com/apache/spark/pull/44305#discussion_r1427566941


##########
sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala:
##########
@@ -780,7 +780,7 @@ class SparkSession private(
     DataSource.lookupDataSource(runner, sessionState.conf) match {

Review Comment:
   Let me fix it separately. Reading the code path, I think it more or less won't affect anything.



Re: [PR] [SPARK-45597][PYTHON][SQL] Support creating table using a Python data source in SQL (DSv2 exec) [spark]

Posted by "HyukjinKwon (via GitHub)" <gi...@apache.org>.
HyukjinKwon commented on code in PR #44305:
URL: https://github.com/apache/spark/pull/44305#discussion_r1428368292


##########
sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala:
##########
@@ -780,7 +780,7 @@ class SparkSession private(
     DataSource.lookupDataSource(runner, sessionState.conf) match {
       case source if classOf[ExternalCommandRunner].isAssignableFrom(source) =>
         Dataset.ofRows(self, ExternalCommandExecutor(
-          source.getDeclaredConstructor().newInstance()
+          DataSource.newDataSourceInstance(runner, source)

Review Comment:
   `ExternalCommandRunner` is a DSv2 API.



Re: [PR] [SPARK-45597][PYTHON][SQL] Support creating table using a Python data source in SQL (DSv2 exec) [spark]

Posted by "HyukjinKwon (via GitHub)" <gi...@apache.org>.
HyukjinKwon commented on PR #44305:
URL: https://github.com/apache/spark/pull/44305#issuecomment-1858359128

   Merged to master.


Re: [PR] [SPARK-45597][PYTHON][SQL] Support creating table using a Python data source in SQL (DSv2 exec) [spark]

Posted by "HyukjinKwon (via GitHub)" <gi...@apache.org>.
HyukjinKwon commented on code in PR #44305:
URL: https://github.com/apache/spark/pull/44305#discussion_r1427566069


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala:
##########
@@ -105,13 +106,14 @@ case class DataSource(
     // [[FileDataSourceV2]] will still be used if we call the load()/save() method in
     // [[DataFrameReader]]/[[DataFrameWriter]], since they use method `lookupDataSource`
     // instead of `providingClass`.
-    cls.getDeclaredConstructor().newInstance() match {
+    DataSource.newDataSourceInstance(className, cls) match {
       case f: FileDataSourceV2 => f.fallbackFileFormat

Review Comment:
   and here too



Re: [PR] [SPARK-45597][PYTHON][SQL] Support creating table using a Python data source in SQL (DSv2 exec) [spark]

Posted by "HyukjinKwon (via GitHub)" <gi...@apache.org>.
HyukjinKwon commented on code in PR #44305:
URL: https://github.com/apache/spark/pull/44305#discussion_r1427568443


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala:
##########
@@ -105,13 +106,14 @@ case class DataSource(
     // [[FileDataSourceV2]] will still be used if we call the load()/save() method in
     // [[DataFrameReader]]/[[DataFrameWriter]], since they use method `lookupDataSource`
     // instead of `providingClass`.
-    cls.getDeclaredConstructor().newInstance() match {
+    DataSource.newDataSourceInstance(className, cls) match {
       case f: FileDataSourceV2 => f.fallbackFileFormat

Review Comment:
   and DataStreamWriter:
   
   ```scala
         val cls = DataSource.lookupDataSource(source, df.sparkSession.sessionState.conf)
         val disabledSources =
           Utils.stringToSeq(df.sparkSession.sessionState.conf.disabledV2StreamingWriters)
         val useV1Source = disabledSources.contains(cls.getCanonicalName) ||
           // file source v2 does not support streaming yet.
           classOf[FileDataSourceV2].isAssignableFrom(cls)
   
         val optionsWithPath = if (path.isEmpty) {
           extraOptions
         } else {
           extraOptions + ("path" -> path.get)
         }
   
         val sink = if (classOf[TableProvider].isAssignableFrom(cls) && !useV1Source) {
   ```



Re: [PR] [SPARK-45597][PYTHON][SQL] Support creating table using a Python data source in SQL (DSv2 exec) [spark]

Posted by "HyukjinKwon (via GitHub)" <gi...@apache.org>.
HyukjinKwon commented on code in PR #44305:
URL: https://github.com/apache/spark/pull/44305#discussion_r1427568014


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala:
##########
@@ -105,13 +106,14 @@ case class DataSource(
     // [[FileDataSourceV2]] will still be used if we call the load()/save() method in
     // [[DataFrameReader]]/[[DataFrameWriter]], since they use method `lookupDataSource`
     // instead of `providingClass`.
-    cls.getDeclaredConstructor().newInstance() match {
+    DataSource.newDataSourceInstance(className, cls) match {
       case f: FileDataSourceV2 => f.fallbackFileFormat

Review Comment:
   and DataStreamReader:
   
   ```scala
       val v1DataSource = DataSource(
         sparkSession,
         userSpecifiedSchema = userSpecifiedSchema,
         className = source,
         options = optionsWithPath.originalMap)
       val v1Relation = ds match {
         case _: StreamSourceProvider => Some(StreamingRelation(v1DataSource))
         case _ => None
       }
       ds match {
         // file source v2 does not support streaming yet.
         case provider: TableProvider if !provider.isInstanceOf[FileDataSourceV2] =>
   ```



Re: [PR] [SPARK-45597][PYTHON][SQL] Support creating table using a Python data source in SQL (DSv2 exec) [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #44305:
URL: https://github.com/apache/spark/pull/44305#discussion_r1427550280


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala:
##########
@@ -708,17 +732,18 @@ object DataSource extends Logging {
   def lookupDataSourceV2(provider: String, conf: SQLConf): Option[TableProvider] = {

Review Comment:
   It's not a followup... I have a concern about changing `lookupDataSource`, which is only used for the DS v1 path. Let's avoid the risk of breaking anything. It's also a smaller code change if we only instantiate the `PythonTableProvider` here, so that the existing callers of `lookupDataSource` can still instantiate the objects directly instead of calling the new `newDataSourceInstance` function.
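   
   The shape of this suggestion can be sketched in plain Scala without Spark. Everything here is hypothetical: `Provider`, `BuiltinProvider`, `PythonProvider`, and the `pythonSources` registry are illustrative stand-ins, and a no-argument factory stands in for the class that the real lookup resolves and instantiates by reflection.
   
   ```scala
   // Hypothetical stand-ins; none of these are Spark classes.
   trait Provider { def name: String }
   final class BuiltinProvider extends Provider { val name = "builtin" }
   final class PythonProvider(shortName: String) extends Provider { val name = shortName }
   
   object Lookup {
     // Names registered as Python data sources (illustrative registry).
     val pythonSources: Set[String] = Set("my_python_source")
   
     // v1-style lookup stays untouched: callers keep instantiating the result
     // directly, with no extra arguments.
     def lookupDataSource(provider: String): () => Provider =
       () => new BuiltinProvider
   
     // Only the v2 lookup knows about Python sources, which need the short name
     // passed to their constructor.
     def lookupDataSourceV2(provider: String): Option[Provider] =
       if (pythonSources.contains(provider)) Some(new PythonProvider(provider))
       else Some(lookupDataSource(provider)())
   }
   ```
   
   In this arrangement the special case lives in one method, so nothing that already calls `lookupDataSource` has to change.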



Re: [PR] [SPARK-45597][PYTHON][SQL] Support creating table using a Python data source in SQL (DSv2 exec) [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #44305:
URL: https://github.com/apache/spark/pull/44305#discussion_r1425859297


##########
sql/core/src/test/scala/org/apache/spark/sql/execution/python/PythonDataSourceSuite.scala:
##########
@@ -145,34 +148,6 @@ class PythonDataSourceSuite extends QueryTest with SharedSparkSession {
     val dataSource = createUserDefinedPythonDataSource(dataSourceName, dataSourceScript)
     spark.dataSource.registerPython(dataSourceName, dataSource)
     assert(spark.sessionState.dataSourceManager.dataSourceExists(dataSourceName))
-    val ds1 = spark.sessionState.dataSourceManager.lookupDataSource(dataSourceName)
-    checkAnswer(
-      ds1(spark, dataSourceName, None, CaseInsensitiveMap(Map.empty)),
-      Seq(Row(0, 0), Row(0, 1), Row(1, 0), Row(1, 1), Row(2, 0), Row(2, 1)))
-
-    // Should be able to override an already registered data source.

Review Comment:
   This test is broken now?



Re: [PR] [SPARK-45597][PYTHON][SQL] Support creating table using a Python data source in SQL (DSv2 exec) [spark]

Posted by "HyukjinKwon (via GitHub)" <gi...@apache.org>.
HyukjinKwon commented on code in PR #44305:
URL: https://github.com/apache/spark/pull/44305#discussion_r1426984033


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/python/MapInBatchEvaluatorFactory.scala:
##########
@@ -36,7 +36,7 @@ class MapInBatchEvaluatorFactory(
     sessionLocalTimeZone: String,
     largeVarTypes: Boolean,
     pythonRunnerConf: Map[String, String],
-    pythonMetrics: Map[String, SQLMetric],
+    pythonMetrics: Option[Map[String, SQLMetric]],

Review Comment:
   This is needed to reuse `MapInBatchEvaluatorFactory` to read the data on the executor side. We should integrate this with `Scan.reportDriverMetrics`, though.





Re: [PR] [SPARK-45597][PYTHON][SQL] Support creating table using a Python data source in SQL (DSv2 exec) [spark]

Posted by "allisonwang-db (via GitHub)" <gi...@apache.org>.
allisonwang-db commented on code in PR #44305:
URL: https://github.com/apache/spark/pull/44305#discussion_r1426808206


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/python/UserDefinedPythonDataSource.scala:
##########
@@ -20,58 +20,200 @@ package org.apache.spark.sql.execution.python
 import java.io.{DataInputStream, DataOutputStream}
 
 import scala.collection.mutable.ArrayBuffer
+import scala.jdk.CollectionConverters._
 
 import net.razorvine.pickle.Pickler
 
-import org.apache.spark.api.python.{PythonFunction, PythonWorkerUtils, SimplePythonFunction, SpecialLengths}
-import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
-import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, PythonDataSource}
+import org.apache.spark.JobArtifactSet
+import org.apache.spark.api.python.{ChainedPythonFunctions, PythonEvalType, PythonFunction, PythonWorkerUtils, SimplePythonFunction, SpecialLengths}
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.PythonUDF
 import org.apache.spark.sql.catalyst.types.DataTypeUtils.toAttributes
 import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
+import org.apache.spark.sql.connector.catalog.{SupportsRead, Table, TableCapability, TableProvider}
+import org.apache.spark.sql.connector.catalog.TableCapability.{BATCH_READ, BATCH_WRITE}
+import org.apache.spark.sql.connector.expressions.Transform
+import org.apache.spark.sql.connector.read.{Batch, InputPartition, PartitionReader, PartitionReaderFactory, Scan, ScanBuilder}
 import org.apache.spark.sql.errors.QueryCompilationErrors
 import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.types.{DataType, StructType}
+import org.apache.spark.sql.types.{BinaryType, DataType, StructType}
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
 import org.apache.spark.util.ArrayImplicits._
 
+/**
+ * Data Source V2 wrapper for Python Data Source.
+ */
+class PythonTableProvider(shortName: String) extends TableProvider {
+  private var dataSourceInPython: PythonDataSourceCreationResult = _
+  private[this] val jobArtifactUUID = JobArtifactSet.getCurrentJobArtifactState.map(_.uuid)
+  private lazy val source: UserDefinedPythonDataSource =
+    SparkSession.active.sessionState.dataSourceManager.lookupDataSource(shortName)
+  override def inferSchema(options: CaseInsensitiveStringMap): StructType = {
+    if (dataSourceInPython == null) {
+      dataSourceInPython = source.createDataSourceInPython(shortName, options, None)
+    }
+    dataSourceInPython.schema
+  }
+
+  override def getTable(
+      schema: StructType,
+      partitioning: Array[Transform],
+      properties: java.util.Map[String, String]): Table = {
+    assert(partitioning.isEmpty)

Review Comment:
   Do we need this assertion? Or shall we throw an exception when partitioning is not empty?
   What if a user creates a table with partitioning:
   ```
   CREATE TABLE test (i INT, j INT) USING my-python-data-source
   PARTITIONED BY (i)
   ```
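
   To illustrate the second option, here is a minimal sketch of an explicit check that throws instead of asserting. `Transform` below is a simplified stand-in for Spark's `org.apache.spark.sql.connector.expressions.Transform`, and `requireNoPartitioning`, the exception type, and the message are all hypothetical; the PR may well raise a different error.

   ```scala
   // Stand-in for org.apache.spark.sql.connector.expressions.Transform
   // (hypothetical and simplified, for illustration only).
   final case class Transform(description: String)

   object PartitioningCheckSketch {
     // Reject user-specified partitioning with a regular exception that carries
     // a readable message, instead of relying on `assert`.
     def requireNoPartitioning(sourceName: String, partitioning: Array[Transform]): Unit = {
       if (partitioning.nonEmpty) {
         throw new UnsupportedOperationException(
           s"Python data source '$sourceName' does not support PARTITIONED BY: " +
             partitioning.map(_.description).mkString(", "))
       }
     }
   }
   ```

   Scala's `assert` throws an `AssertionError` and can be elided at compile time, so an explicit exception gives the user a clearer and more reliable failure.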
   



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/python/UserDefinedPythonDataSource.scala:
##########
@@ -20,58 +20,200 @@ package org.apache.spark.sql.execution.python
 import java.io.{DataInputStream, DataOutputStream}
 
 import scala.collection.mutable.ArrayBuffer
+import scala.jdk.CollectionConverters._
 
 import net.razorvine.pickle.Pickler
 
-import org.apache.spark.api.python.{PythonFunction, PythonWorkerUtils, SimplePythonFunction, SpecialLengths}
-import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
-import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, PythonDataSource}
+import org.apache.spark.JobArtifactSet
+import org.apache.spark.api.python.{ChainedPythonFunctions, PythonEvalType, PythonFunction, PythonWorkerUtils, SimplePythonFunction, SpecialLengths}
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.PythonUDF
 import org.apache.spark.sql.catalyst.types.DataTypeUtils.toAttributes
 import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
+import org.apache.spark.sql.connector.catalog.{SupportsRead, Table, TableCapability, TableProvider}
+import org.apache.spark.sql.connector.catalog.TableCapability.{BATCH_READ, BATCH_WRITE}
+import org.apache.spark.sql.connector.expressions.Transform
+import org.apache.spark.sql.connector.read.{Batch, InputPartition, PartitionReader, PartitionReaderFactory, Scan, ScanBuilder}
 import org.apache.spark.sql.errors.QueryCompilationErrors
 import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.types.{DataType, StructType}
+import org.apache.spark.sql.types.{BinaryType, DataType, StructType}
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
 import org.apache.spark.util.ArrayImplicits._
 
+/**
+ * Data Source V2 wrapper for Python Data Source.
+ */
+class PythonTableProvider(shortName: String) extends TableProvider {
+  private var dataSourceInPython: PythonDataSourceCreationResult = _
+  private[this] val jobArtifactUUID = JobArtifactSet.getCurrentJobArtifactState.map(_.uuid)
+  private lazy val source: UserDefinedPythonDataSource =
+    SparkSession.active.sessionState.dataSourceManager.lookupDataSource(shortName)
+  override def inferSchema(options: CaseInsensitiveStringMap): StructType = {
+    if (dataSourceInPython == null) {
+      dataSourceInPython = source.createDataSourceInPython(shortName, options, None)
+    }
+    dataSourceInPython.schema
+  }
+
+  override def getTable(
+      schema: StructType,
+      partitioning: Array[Transform],
+      properties: java.util.Map[String, String]): Table = {
+    assert(partitioning.isEmpty)
+    val outputSchema = schema
+    new Table with SupportsRead {
+      override def name(): String = shortName
+
+      override def capabilities(): java.util.Set[TableCapability] = java.util.EnumSet.of(
+        BATCH_READ, BATCH_WRITE)

Review Comment:
   `BATCH_WRITE` is currently not supported.



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/python/MapInBatchEvaluatorFactory.scala:
##########
@@ -36,7 +36,7 @@ class MapInBatchEvaluatorFactory(
     sessionLocalTimeZone: String,
     largeVarTypes: Boolean,
     pythonRunnerConf: Map[String, String],
-    pythonMetrics: Map[String, SQLMetric],
+    pythonMetrics: Option[Map[String, SQLMetric]],

Review Comment:
   Why do we need to change this to `Option`?



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/python/UserDefinedPythonDataSource.scala:
##########
@@ -20,58 +20,200 @@ package org.apache.spark.sql.execution.python
 import java.io.{DataInputStream, DataOutputStream}
 
 import scala.collection.mutable.ArrayBuffer
+import scala.jdk.CollectionConverters._
 
 import net.razorvine.pickle.Pickler
 
-import org.apache.spark.api.python.{PythonFunction, PythonWorkerUtils, SimplePythonFunction, SpecialLengths}
-import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
-import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, PythonDataSource}
+import org.apache.spark.JobArtifactSet
+import org.apache.spark.api.python.{ChainedPythonFunctions, PythonEvalType, PythonFunction, PythonWorkerUtils, SimplePythonFunction, SpecialLengths}
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.PythonUDF
 import org.apache.spark.sql.catalyst.types.DataTypeUtils.toAttributes
 import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
+import org.apache.spark.sql.connector.catalog.{SupportsRead, Table, TableCapability, TableProvider}
+import org.apache.spark.sql.connector.catalog.TableCapability.{BATCH_READ, BATCH_WRITE}
+import org.apache.spark.sql.connector.expressions.Transform
+import org.apache.spark.sql.connector.read.{Batch, InputPartition, PartitionReader, PartitionReaderFactory, Scan, ScanBuilder}
 import org.apache.spark.sql.errors.QueryCompilationErrors
 import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.types.{DataType, StructType}
+import org.apache.spark.sql.types.{BinaryType, DataType, StructType}
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
 import org.apache.spark.util.ArrayImplicits._
 
+/**
+ * Data Source V2 wrapper for Python Data Source.
+ */
+class PythonTableProvider(shortName: String) extends TableProvider {
+  private var dataSourceInPython: PythonDataSourceCreationResult = _
+  private[this] val jobArtifactUUID = JobArtifactSet.getCurrentJobArtifactState.map(_.uuid)
+  private lazy val source: UserDefinedPythonDataSource =
+    SparkSession.active.sessionState.dataSourceManager.lookupDataSource(shortName)
+  override def inferSchema(options: CaseInsensitiveStringMap): StructType = {
+    if (dataSourceInPython == null) {
+      dataSourceInPython = source.createDataSourceInPython(shortName, options, None)
+    }
+    dataSourceInPython.schema
+  }
+
+  override def getTable(
+      schema: StructType,
+      partitioning: Array[Transform],
+      properties: java.util.Map[String, String]): Table = {
+    assert(partitioning.isEmpty)
+    val outputSchema = schema
+    new Table with SupportsRead {

Review Comment:
   We can create a new class `PythonTable` to make it more extensible in the future.
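
   As a rough sketch of that suggestion, assuming simplified stand-ins for the DSv2 interfaces: `Capability`, `BatchRead`, `BatchWrite`, and the `Table` trait below are hypothetical and only mirror the shape of Spark's `TableCapability` and `Table`. The named class also advertises only batch reads, in line with the comment about `BATCH_WRITE` above.

   ```scala
   // Simplified stand-ins for Spark's DSv2 types (hypothetical; the real ones
   // live in org.apache.spark.sql.connector.catalog and have more members).
   sealed trait Capability
   case object BatchRead extends Capability
   case object BatchWrite extends Capability

   trait Table {
     def name(): String
     def capabilities(): Set[Capability]
   }

   // A named class instead of an anonymous `new Table with SupportsRead { ... }`,
   // so that further behavior can be added in one place later.
   class PythonTable(shortName: String) extends Table {
     override def name(): String = shortName

     // Only advertise batch reads until writes are actually implemented.
     override def capabilities(): Set[Capability] = Set(BatchRead)
   }
   ```

   With a named class, `getTable` would only need to return `new PythonTable(shortName)` plus whatever state the real implementation requires.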





Re: [PR] [SPARK-45597][PYTHON][SQL] Support creating table using a Python data source in SQL (DSv2 exec) [spark]

Posted by "HyukjinKwon (via GitHub)" <gi...@apache.org>.
HyukjinKwon commented on code in PR #44305:
URL: https://github.com/apache/spark/pull/44305#discussion_r1426984033


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/python/MapInBatchEvaluatorFactory.scala:
##########
@@ -36,7 +36,7 @@ class MapInBatchEvaluatorFactory(
     sessionLocalTimeZone: String,
     largeVarTypes: Boolean,
     pythonRunnerConf: Map[String, String],
-    pythonMetrics: Map[String, SQLMetric],
+    pythonMetrics: Option[Map[String, SQLMetric]],

Review Comment:
   This is needed to reuse `MapInBatchEvaluatorFactory` to read the data on the executor side. We should integrate this with `Scan.supportedCustomMetrics`, though.
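
   A minimal sketch of what an optional metrics map could look like at the point of use, assuming a simplified `Metric` class in place of Spark's `SQLMetric`. `countingIterator` and the `numOutputRows` key are hypothetical names for illustration, not the code in this PR.

   ```scala
   // Stand-in for org.apache.spark.sql.execution.metric.SQLMetric
   // (hypothetical and simplified, for illustration only).
   final class Metric {
     private var total: Long = 0L
     def add(v: Long): Unit = total += v
     def value: Long = total
   }

   object OptionalMetricsSketch {
     // Passes rows through unchanged and bumps a row counter only when the
     // caller supplied metrics; with `None`, no metric is touched.
     def countingIterator[T](
         rows: Iterator[T],
         metrics: Option[Map[String, Metric]]): Iterator[T] = {
       rows.map { row =>
         metrics.flatMap(_.get("numOutputRows")).foreach(_.add(1L))
         row
       }
     }
   }
   ```

   A caller that has no SQL metrics available, such as a DSv2 partition reader in this sketch, would pass `None`, while existing physical operators would keep passing `Some(metrics)`.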


