Posted to reviews@spark.apache.org by "allisonwang-db (via GitHub)" <gi...@apache.org> on 2023/12/07 18:59:18 UTC

Re: [PR] [SPARK-45597][PYTHON][SQL] Support creating table using a Python data source in SQL (single wrapper) [spark]

allisonwang-db commented on code in PR #44233:
URL: https://github.com/apache/spark/pull/44233#discussion_r1419468729


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceManager.scala:
##########
@@ -81,3 +93,60 @@ class DataSourceManager extends Logging {
     manager
   }
 }
+
+/**
+ * Data Source V2 wrapper for Python Data Source.
+ */
+class PythonTableProvider(shortName: String) extends TableProvider {
+  private var sourceDataFrame: DataFrame = _
+
+  private def getOrCreateSourceDataFrame(
+      options: CaseInsensitiveStringMap, maybeSchema: Option[StructType]): DataFrame = {
+    if (sourceDataFrame != null) return sourceDataFrame
+    // TODO(SPARK-45600): should be session-based.
+    val builder = SparkSession.active.sessionState.dataSourceManager.lookupDataSource(shortName)
+    val plan = builder(
+      SparkSession.active,

Review Comment:
   Does it get the correct session for Spark Connect?
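
   For context, SparkSession.active resolves the thread-local active session and otherwise falls back to the default session, so when several sessions share one JVM (as under Spark Connect) the fallback can silently pick a session other than the caller's. A minimal sketch of failing fast instead (hypothetical, not this PR's code):

       import org.apache.spark.sql.SparkSession

       object SessionResolutionSketch {
         // Fail fast when no session is active on the current thread rather
         // than falling back to the default session, so a mis-threaded
         // Connect request surfaces as an error instead of running against
         // the wrong session.
         def resolveSession(): SparkSession =
           SparkSession.getActiveSession.getOrElse(
             throw new IllegalStateException("No active SparkSession found"))
       }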



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceManager.scala:
##########
@@ -81,3 +93,60 @@ class DataSourceManager extends Logging {
     manager
   }
 }
+
+/**
+ * Data Source V2 wrapper for Python Data Source.
+ */
+class PythonTableProvider(shortName: String) extends TableProvider {
+  private var sourceDataFrame: DataFrame = _
+
+  private def getOrCreateSourceDataFrame(
+      options: CaseInsensitiveStringMap, maybeSchema: Option[StructType]): DataFrame = {
+    if (sourceDataFrame != null) return sourceDataFrame
+    // TODO(SPARK-45600): should be session-based.

Review Comment:
   Should this TODO (SPARK-45600) be addressed now?



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceManager.scala:
##########
@@ -81,3 +93,60 @@ class DataSourceManager extends Logging {
     manager
   }
 }
+
+/**
+ * Data Source V2 wrapper for Python Data Source.
+ */
+class PythonTableProvider(shortName: String) extends TableProvider {

Review Comment:
   Should we support external metadata for this data source? I.e., users could create a table using a Python data source with a user-defined table schema.
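
   For reference, DSv2 providers opt into user-specified schemas by overriding supportsExternalMetadata; when it returns true, Spark passes the user-supplied schema to getTable instead of calling inferSchema. A minimal sketch (hypothetical; the placeholder method bodies are assumptions, not this PR's code):

       import java.util
       import org.apache.spark.sql.connector.catalog.{Table, TableProvider}
       import org.apache.spark.sql.connector.expressions.Transform
       import org.apache.spark.sql.types.StructType
       import org.apache.spark.sql.util.CaseInsensitiveStringMap

       class PythonTableProviderSketch(shortName: String) extends TableProvider {
         // Returning true lets CREATE TABLE ... USING accept a user-defined
         // schema, which Spark then hands to getTable directly.
         override def supportsExternalMetadata(): Boolean = true

         override def inferSchema(options: CaseInsensitiveStringMap): StructType =
           new StructType() // placeholder: would ask the Python source for its schema

         override def getTable(
             schema: StructType,
             partitioning: Array[Transform],
             properties: util.Map[String, String]): Table =
           // placeholder: would build a Table backed by the Python data source
           throw new UnsupportedOperationException("sketch only")
       }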



##########
sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala:
##########
@@ -156,8 +156,11 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo
       extraOptions + ("path" -> path.get)
     }
 
-    val ds = DataSource.lookupDataSource(source, sparkSession.sessionState.conf).
-      getConstructor().newInstance()
+    val ds = DataSource.lookupDataSource(source, sparkSession.sessionState.conf) match {
+      case cls if classOf[PythonTableProvider].isAssignableFrom(cls) =>
+        cls.getConstructor(classOf[String]).newInstance(source)
+      case cls => cls.getDeclaredConstructor().newInstance()
+    }

Review Comment:
   Shall we make this a helper function?
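
   For illustration, a hedged sketch of such a helper (hypothetical name and placement, e.g. in a shared utility; the match logic mirrors the diff above):

       // Hypothetical helper so DataFrameReader, DataStreamReader, and other
       // call sites can share the constructor-selection logic.
       private def newDataSourceInstance(source: String, cls: Class[_]): Any =
         cls match {
           // PythonTableProvider's constructor takes the data source's short name.
           case c if classOf[PythonTableProvider].isAssignableFrom(c) =>
             c.getConstructor(classOf[String]).newInstance(source)
           // All other providers keep the existing no-arg construction path.
           case c => c.getDeclaredConstructor().newInstance()
         }

   The DataStreamReader change would then shrink to:

       val ds = newDataSourceInstance(
         source, DataSource.lookupDataSource(source, sparkSession.sessionState.conf))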


