Posted to reviews@spark.apache.org by "allisonwang-db (via GitHub)" <gi...@apache.org> on 2023/10/13 03:18:16 UTC

[PR] [SPARK-45524][PYTHON][SQL] Initial support for Python data source read API [spark]

allisonwang-db opened a new pull request, #43360:
URL: https://github.com/apache/spark/pull/43360

   
   
   ### What changes were proposed in this pull request?
   This PR adds a new Python data source API and adds initial support for data source read. 
   
   Here is an overview of the Python data source API. Please see the code (`python/pyspark/sql/datasource.py`) for more details.
   ```python
   class DataSource:
       def __init__(self, options: Dict):
           self.options = options
   
       @property
       def name(self) -> str:
           ...
   
       def schema(self) -> Union[StructType, str]:
           ...
   
       def reader(self, schema: StructType) -> "DataSourceReader":
           ...
   
       def writer(self, schema: StructType, saveMode: str) -> "DataSourceWriter":
           ...
   ```
   DataSourceReader (this PR):
   ```python
   class DataSourceReader(ABC):
       @abstractmethod
       def read(self, partition) -> Iterator:
           ...
   
       def partitions(self) -> Iterator[Any]:
           ...
   ```
   DataSourceWriter will be supported in a subsequent PR.
   
   Users can extend these classes to create their own Python data source readers (and, later, writers) and use them in Spark:
   ```python
   class MyReader(DataSourceReader):
       def read(self, partition):
           yield (0, 1)
   
   class MyDataSource(DataSource):
       def schema(self):
           return "id INT, value INT"
   
       def reader(self, schema):
           return MyReader()
   
   df = spark.read.format(MyDataSource).load()
   df.show()
   +---+-----+
   | id|value|
   +---+-----+
   |  0|    1|
   +---+-----+
   ```
   
   Note that this is the initial PR to support Python data sources. It does not yet support:
   1. Calling `load` with arguments, e.g. `load(path)`
   2. Specifying a user-provided schema with `.schema()`
   3. Spark Connect
   4. Registering a Python data source and referencing it by name in `.format()`
   
   These will be covered in subsequent PRs to make the review easier.
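
To illustrate how `partitions()` and `read(partition)` relate, here is a minimal pure-Python sketch. It does not use Spark: `SketchReader`, `RangeReader`, and `collect_rows` are hypothetical stand-ins, and the fallback to a single `read(None)` call when no partitions are defined is an assumption inferred from the unit tests in this PR, not a statement about the final implementation.

```python
from abc import ABC, abstractmethod
from typing import Any, Iterator, List, Optional, Tuple


class SketchReader(ABC):
    """Hypothetical stand-in for DataSourceReader (no Spark dependency)."""

    def partitions(self) -> Optional[List[Any]]:
        # Assumption: returning None means "no explicit partitioning".
        return None

    @abstractmethod
    def read(self, partition: Any) -> Iterator[Tuple]:
        ...


class RangeReader(SketchReader):
    def partitions(self) -> List[Any]:
        return [1, 2]

    def read(self, partition: Any) -> Iterator[Tuple]:
        # Each row is a tuple matching the schema "id INT, value INT".
        yield (partition, 0)
        yield (partition, 1)


def collect_rows(reader: SketchReader) -> List[Tuple]:
    """Call read() once per partition; Spark would run these calls as separate tasks."""
    parts = reader.partitions()
    if parts is None:
        parts = [None]  # assumed fallback: one read with no partition value
    return [row for part in parts for row in reader.read(part)]


rows = collect_rows(RangeReader())
```

In this sketch each partition value is passed unchanged to `read`, so the partition list is the only thing that decides how many `read` calls happen.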
   
   ### Why are the changes needed?
   To make Spark more accessible to Python users. For more information, please see this [SPIP](https://docs.google.com/document/d/1oYrCKEKHzznljYfJO4kx5K_Npcgt1Slyfph3NEk7JRU/edit?usp=sharing).
   
   ### Does this PR introduce _any_ user-facing change?
   Yes. Users can use the new Python data source API to create their own data source.
   
   ### How was this patch tested?
   New unit tests.
   
   ### Was this patch authored or co-authored using generative AI tooling?
   No


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-45524][PYTHON][SQL] Initial support for Python data source read API [spark]

Posted by "allisonwang-db (via GitHub)" <gi...@apache.org>.
allisonwang-db commented on code in PR #43360:
URL: https://github.com/apache/spark/pull/43360#discussion_r1364352410


##########
python/pyspark/sql/datasource.py:
##########
@@ -0,0 +1,184 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+from abc import ABC, abstractmethod
+from typing import Any, Dict, Iterator, Tuple, Union, TYPE_CHECKING
+
+from pyspark.sql import Row
+from pyspark.sql.types import StructType
+
+if TYPE_CHECKING:
+    from pyspark.sql._typing import OptionalPrimitiveType
+
+
+__all__ = ["DataSource", "DataSourceReader"]
+
+
+class DataSource:
+    """
+    A base class for data sources.
+
+    This class represents a custom data source that allows for reading from and
+    writing to it. The data source provides methods to create readers and writers
+    for reading and writing data, respectively. At least one of the methods `reader`
+    or `writer` must be implemented by any subclass to make the data source either
+    readable or writable.
+
+    After implementing this interface, you can start to load your data source using
+    `spark.read.format(...).load()` and save data using `df.write.format(...).save()`.

Review Comment:
   This is nice but currently not doable :) I've created a new ticket here to keep track: [SPARK-45597](https://issues.apache.org/jira/browse/SPARK-45597)





Re: [PR] [SPARK-45524][PYTHON][SQL] Initial support for Python data source read API [spark]

Posted by "ueshin (via GitHub)" <gi...@apache.org>.
ueshin commented on code in PR #43360:
URL: https://github.com/apache/spark/pull/43360#discussion_r1361303331


##########
python/pyspark/sql/readwriter.py:
##########
@@ -303,6 +329,50 @@ def load(
         if schema is not None:
             self.schema(schema)
         self.options(**options)
+
+        # Load a Python data source
+        if isinstance(self._format, Callable):

Review Comment:
   Ah, then, how about using `issubclass(self._format, DataSource)`?
   - https://docs.python.org/3/library/functions.html#issubclass
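
One caveat with `issubclass` is that it raises `TypeError` when its first argument is not a class, for example when the format is a string such as `"json"`. A minimal sketch of a guarded check follows; `DataSource` here is an empty stand-in and `is_python_data_source` is a hypothetical helper, not code from this PR.

```python
from typing import Any


class DataSource:
    """Empty stand-in for pyspark.sql.datasource.DataSource."""


class MyDataSource(DataSource):
    pass


def is_python_data_source(fmt: Any) -> bool:
    # issubclass() raises TypeError for non-class arguments such as "json",
    # so confirm that fmt is a class before calling it.
    return isinstance(fmt, type) and issubclass(fmt, DataSource)


# callable() alone is too broad: unrelated classes and functions pass it too.
assert callable(str) and callable(print)
```

With this guard, a format string, an unrelated class such as `str`, and an instance of a data source are all rejected, while a `DataSource` subclass is accepted.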





Re: [PR] [SPARK-45524][PYTHON][SQL] Initial support for Python data source read API [spark]

Posted by "allisonwang-db (via GitHub)" <gi...@apache.org>.
allisonwang-db commented on code in PR #43360:
URL: https://github.com/apache/spark/pull/43360#discussion_r1361275795


##########
python/pyspark/sql/readwriter.py:
##########
@@ -303,6 +329,50 @@ def load(
         if schema is not None:
             self.schema(schema)
         self.options(**options)
+
+        # Load a Python data source
+        if isinstance(self._format, Callable):

Review Comment:
   Ah, but `str` is also an instance of `Type`...





Re: [PR] [SPARK-45524][PYTHON][SQL] Initial support for Python data source read API [spark]

Posted by "ueshin (via GitHub)" <gi...@apache.org>.
ueshin commented on code in PR #43360:
URL: https://github.com/apache/spark/pull/43360#discussion_r1358824516


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/python/UserDefinedPythonDataSource.scala:
##########
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.python
+
+import java.io.{DataInputStream, DataOutputStream}
+
+import scala.collection.mutable.ArrayBuffer
+
+import net.razorvine.pickle.Pickler
+
+import org.apache.spark.api.python.{PythonFunction, PythonWorkerUtils, SpecialLengths}
+import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
+import org.apache.spark.sql.catalyst.plans.logical.PythonDataSource
+import org.apache.spark.sql.catalyst.types.DataTypeUtils.toAttributes
+import org.apache.spark.sql.errors.QueryCompilationErrors
+import org.apache.spark.sql.types.StructType
+
+/**
+ * A user-defined Python data source. This is used by the Python API.
+ */
+case class UserDefinedPythonDataSource(
+    dataSource: PythonFunction,
+    schema: StructType) {
+  def apply(session: SparkSession): DataFrame = {
+    val source = PythonDataSource(dataSource, schema, output = toAttributes(schema))
+    Dataset.ofRows(session, source)
+  }
+}
+
+case class PythonDataSourceReadInfo(
+    func: Array[Byte],
+    partitions: Seq[Array[Byte]])
+
+class UserDefinedPythonDataSourceReadRunner(
+    func: PythonFunction,
+    schema: StructType) extends PythonPlannerRunner[PythonDataSourceReadInfo](func) {
+
+  override val workerModule = "pyspark.sql.worker.plan_data_source_read"
+
+  override protected def writeToPython(dataOut: DataOutputStream, pickler: Pickler): Unit = {
+    // Send Python data source
+    PythonWorkerUtils.writePythonFunction(func, dataOut)
+
+    // Send schema
+    PythonWorkerUtils.writeUTF(schema.json, dataOut)
+  }
+
+  override protected def receiveFromPython(dataIn: DataInputStream): PythonDataSourceReadInfo = {
+    // Receive the pickled reader or an exception raised in the Python worker.
+    val length = dataIn.readInt()
+    if (length == SpecialLengths.PYTHON_EXCEPTION_THROWN) {
+      val msg = PythonWorkerUtils.readUTF(dataIn)
+      throw QueryCompilationErrors.tableValuedFunctionFailedToAnalyseInPythonError(msg)
+    }
+
+    // Receive the pickled 'read' function.
+    val pickledFunction: Array[Byte] = {
+      val obj = new Array[Byte](length)
+      dataIn.readFully(obj)
+      obj
+    }

Review Comment:
   ```suggestion
       val pickledFunction = PythonWorkerUtils.readBytes(length, dataIn)
   ```
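
The framing read here is a 4-byte length followed by that many bytes, with a negative length acting as an error marker. A rough Python sketch of the same receive logic follows. The marker value `-2` and the length-prefixed UTF-8 error message are illustrative assumptions, and `read_exactly`, `read_int`, and `receive_pickled_function` are hypothetical names rather than Spark APIs.

```python
import io
import struct
from typing import BinaryIO

PYTHON_EXCEPTION_THROWN = -2  # assumed marker value, for illustration only


def read_exactly(stream: BinaryIO, n: int) -> bytes:
    """Read exactly n bytes or raise EOFError (similar in spirit to readFully)."""
    data = stream.read(n)
    if len(data) != n:
        raise EOFError(f"expected {n} bytes, got {len(data)}")
    return data


def read_int(stream: BinaryIO) -> int:
    # Signed 32-bit big-endian integer, matching java.io.DataInputStream.readInt.
    return struct.unpack(">i", read_exactly(stream, 4))[0]


def receive_pickled_function(stream: BinaryIO) -> bytes:
    length = read_int(stream)
    if length == PYTHON_EXCEPTION_THROWN:
        # Assumption: the error message is itself a length-prefixed UTF-8 string.
        msg_len = read_int(stream)
        raise RuntimeError(read_exactly(stream, msg_len).decode("utf-8"))
    return read_exactly(stream, length)


payload = b"pickled-bytes"
ok_stream = io.BytesIO(struct.pack(">i", len(payload)) + payload)
result = receive_pickled_function(ok_stream)
```

Keeping the "read exactly N bytes" step in one helper is the same motivation as the suggestion above: the length check and the byte read live in a single place.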



##########
python/pyspark/sql/readwriter.py:
##########
@@ -303,6 +329,50 @@ def load(
         if schema is not None:
             self.schema(schema)
         self.options(**options)
+
+        # Load a Python data source
+        if isinstance(self._format, Callable):
+            # TODO: support path in options.
+
+            # Create an instance of the data source.
+            data_source_cls = cast(Type[DataSource], self._format)
+            data_source = data_source_cls(self._options)
+
+            # Get schema of the data source
+            schema = self._schema or data_source.schema()
+            if isinstance(schema, str):
+                schema = _parse_datatype_string(schema)
+                # Check if the schema is a valid StructType.
+                if not isinstance(schema, StructType):
+                    raise PySparkTypeError(
+                        error_class="NOT_STR_OR_STRUCT",
+                        message_parameters={
+                            "arg_name": "schema",
+                            "arg_type": type(schema).__name__,
+                        },
+                    )
+
+            jschema = self._spark._jsparkSession.parseDataType(schema.json())
+            sc = self._spark._sc
+            pickled_command, broadcast_vars, env, includes = _prepare_for_python_RDD(
+                sc, data_source
+            )
+            assert sc._jvm is not None
+            func = sc._jvm.SimplePythonFunction(
+                bytearray(pickled_command),
+                env,
+                includes,
+                sc.pythonExec,
+                sc.pythonVer,
+                broadcast_vars,
+                sc._javaAccumulator,
+            )

Review Comment:
   Can't we use `_wrap_function` in `udf.py`?



##########
python/pyspark/sql/readwriter.py:
##########
@@ -303,6 +329,50 @@ def load(
         if schema is not None:
             self.schema(schema)
         self.options(**options)
+
+        # Load a Python data source
+        if isinstance(self._format, Callable):

Review Comment:
   Can't this be `DataSource` instead of `Callable`?



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonDataSourcePartitionExec.scala:
##########
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.python
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.execution.{InputRDDCodegen, LeafExecNode, SQLExecution}
+import org.apache.spark.sql.execution.metric.SQLMetrics
+
+case class PythonDataSourcePartitionExec(
+    output: Seq[Attribute],
+    partitions: Seq[Array[Byte]])
+    extends LeafExecNode with InputRDDCodegen with PythonSQLMetrics {

Review Comment:
   nit: style; 2 white spaces for `extends`.



##########
python/pyspark/sql/tests/test_python_datasource.py:
##########
@@ -0,0 +1,85 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+import unittest
+
+from pyspark.sql.types import (
+    IntegerType,
+    Row,
+    StructField,
+    StructType,
+)
+from pyspark.testing import assertDataFrameEqual
+from pyspark.sql.datasource import DataSource, DataSourceReader
+from pyspark.testing.sqlutils import ReusedSQLTestCase
+
+
+class TestDataSourceReader(DataSourceReader):
+    def read(self, partition):
+        yield (partition, 0)
+        yield (partition, 1)
+
+
+class DataSourcePartitionReader(TestDataSourceReader):
+    def partitions(self):
+        return [1, 2]
+
+
+class TestDataSource(DataSource):
+    def schema(self):
+        return StructType([StructField("id", IntegerType()), StructField("value", IntegerType())])
+
+    def reader(self, schema):
+        return TestDataSourceReader()
+
+
+class TestPartitionedDataSource(TestDataSource):
+    def reader(self, schema):
+        return DataSourcePartitionReader()
+
+
+class BasePythonDataSourceTestsMixin:
+    def test_data_source_read(self):
+        df = self.spark.read.format(TestDataSource).load()
+        assertDataFrameEqual(df, [Row(id=None, value=0), Row(id=None, value=1)])
+
+    # TODO: support read with schema

Review Comment:
   Shall we have a JIRA ID for TODO?
   ```py
   # TODO(SPARK-xxx): ...
   ```





Re: [PR] [SPARK-45524][PYTHON][SQL] Initial support for Python data source read API [spark]

Posted by "allisonwang-db (via GitHub)" <gi...@apache.org>.
allisonwang-db commented on code in PR #43360:
URL: https://github.com/apache/spark/pull/43360#discussion_r1362485135


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PythonDataSourceStrategy.scala:
##########
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+
+import org.apache.spark.api.python.{PythonEvalType, PythonFunction, SimplePythonFunction}
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.expressions.PythonUDTF
+import org.apache.spark.sql.catalyst.plans.logical
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.python.UserDefinedPythonDataSourceReadRunner
+
+object PythonDataSourceStrategy extends Strategy with Logging {

Review Comment:
   Sounds good!





Re: [PR] [SPARK-45524][PYTHON][SQL] Initial support for Python data source read API [spark]

Posted by "allisonwang-db (via GitHub)" <gi...@apache.org>.
allisonwang-db commented on code in PR #43360:
URL: https://github.com/apache/spark/pull/43360#discussion_r1362690595


##########
python/pyspark/sql/datasource.py:
##########
@@ -0,0 +1,184 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+from abc import ABC, abstractmethod
+from typing import Any, Dict, Iterator, Tuple, Union, TYPE_CHECKING
+
+from pyspark.sql import Row
+from pyspark.sql.types import StructType
+
+if TYPE_CHECKING:
+    from pyspark.sql._typing import OptionalPrimitiveType
+
+
+__all__ = ["DataSource", "DataSourceReader"]
+
+
+class DataSource:
+    """
+    A base class for data sources.
+
+    This class represents a custom data source that allows for reading from and
+    writing to it. The data source provides methods to create readers and writers
+    for reading and writing data, respectively. At least one of the methods `reader`
+    or `writer` must be implemented by any subclass to make the data source either
+    readable or writable.
+
+    After implementing this interface, you can start to load your data source using
+    `spark.read.format(...).load()` and save data using `df.write.format(...).save()`.
+    """
+
+    def __init__(self, options: Dict[str, "OptionalPrimitiveType"]):
+        """
+        Initializes the data source with user-provided options.
+
+        Parameters
+        ----------
+        options : dict
+            A dictionary representing the options for this data source.
+
+        Notes
+        -----
+        This method should not contain any non-serializable objects.
+        """
+        self.options = options
+
+    @property
+    def name(self) -> str:
+        """
+        Returns a string representing the short name of this data source.
+        """
+        return self.__class__.__name__
+
+    def schema(self) -> Union[StructType, str]:
+        """
+        Returns the schema of the data source.
+
+        It can reference the `options` field to infer the data source's schema when
+        users do not explicitly specify it. This method is invoked once when calling
+        `spark.read.format(...).load()` to get the schema for a data source read operation.

Review Comment:
   Good to know!





Re: [PR] [SPARK-45524][PYTHON][SQL] Initial support for Python data source read API [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #43360:
URL: https://github.com/apache/spark/pull/43360#discussion_r1363396985


##########
python/pyspark/sql/datasource.py:
##########
@@ -0,0 +1,185 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+from abc import ABC, abstractmethod
+from typing import Any, Dict, Iterator, Tuple, Union, TYPE_CHECKING
+
+from pyspark.sql import Row
+from pyspark.sql.types import StructType
+
+if TYPE_CHECKING:
+    from pyspark.sql._typing import OptionalPrimitiveType
+
+
+__all__ = ["DataSource", "DataSourceReader"]
+
+
+class DataSource(ABC):
+    """
+    A base class for data sources.
+
+    This class represents a custom data source that allows for reading from and
+    writing to it. The data source provides methods to create readers and writers
+    for reading and writing data, respectively. At least one of the methods ``reader``
+    or ``writer`` must be implemented by any subclass to make the data source either
+    readable or writable.
+
+    After implementing this interface, you can start to load your data source using
+    ``spark.read.format(...).load()`` and save data using ``df.write.format(...).save()``.
+    """
+
+    def __init__(self, options: Dict[str, "OptionalPrimitiveType"]):
+        """
+        Initializes the data source with user-provided options.
+
+        Parameters
+        ----------
+        options : dict
+            A dictionary representing the options for this data source.
+
+        Notes
+        -----
+        This method should not contain any non-serializable objects.
+        """
+        self.options = options
+
+    @property
+    def name(self) -> str:
+        """
+        Returns a string representing the short name of this data source.
+        """
+        return self.__class__.__name__
+
+    def schema(self) -> Union[StructType, str]:
+        """
+        Returns the schema of the data source.
+
+        It can reference the ``options`` field to infer the data source's schema when
+        users do not explicitly specify it. This method is invoked once when calling
+        ``spark.read.format(...).load()`` to get the schema for a data source read
+        operation. If this method is not implemented, and a user does not provide a
+        schema when reading the data source, an exception will be thrown.
+
+        Returns
+        -------
+        schema : StructType or str
+            The schema of this data source, or a DDL string representing the schema.
+
+        Examples
+        --------
+        Returns a DDL string:
+
+        >>> def schema(self):
+        ...    return "a INT, b STRING"
+
+        Returns a StructType:
+
+        >>> def schema(self):
+        ...   return StructType().add("a", "int").add("b", "string")
+        """
+        ...

Review Comment:
   is it the same as `raise NotImplementedError`?
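
For reference, the two are not equivalent in plain Python: a body of `...` evaluates the `Ellipsis` literal and the method then returns `None`, whereas `raise NotImplementedError` fails at call time. A small sketch with hypothetical class names:

```python
class WithEllipsis:
    def schema(self):
        ...  # evaluates Ellipsis and falls through, so the call returns None


class WithRaise:
    def schema(self):
        raise NotImplementedError("schema is not implemented")


ellipsis_result = WithEllipsis().schema()

try:
    WithRaise().schema()
    raised = False
except NotImplementedError:
    raised = True
```

A caller that relies on an exception to detect a missing `schema` would therefore need an explicit `None` check for the first form.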





Re: [PR] [SPARK-45524][PYTHON][SQL] Initial support for Python data source read API [spark]

Posted by "allisonwang-db (via GitHub)" <gi...@apache.org>.
allisonwang-db commented on code in PR #43360:
URL: https://github.com/apache/spark/pull/43360#discussion_r1361272415


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/python/UserDefinedPythonDataSource.scala:
##########
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.python
+
+import java.io.{DataInputStream, DataOutputStream}
+
+import scala.collection.mutable.ArrayBuffer
+
+import net.razorvine.pickle.Pickler
+
+import org.apache.spark.api.python.{PythonFunction, PythonWorkerUtils, SpecialLengths}
+import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
+import org.apache.spark.sql.catalyst.plans.logical.PythonDataSource
+import org.apache.spark.sql.catalyst.types.DataTypeUtils.toAttributes
+import org.apache.spark.sql.errors.QueryCompilationErrors
+import org.apache.spark.sql.types.StructType
+
+/**
+ * A user-defined Python data source. This is used by the Python API.
+ */
+case class UserDefinedPythonDataSource(
+    dataSource: PythonFunction,
+    schema: StructType) {
+  def apply(session: SparkSession): DataFrame = {
+    val source = PythonDataSource(dataSource, schema, output = toAttributes(schema))
+    Dataset.ofRows(session, source)
+  }
+}
+
+case class PythonDataSourceReadInfo(
+    func: Array[Byte],
+    partitions: Seq[Array[Byte]])
+
+class UserDefinedPythonDataSourceReadRunner(
+    func: PythonFunction,
+    schema: StructType) extends PythonPlannerRunner[PythonDataSourceReadInfo](func) {
+
+  override val workerModule = "pyspark.sql.worker.plan_data_source_read"
+
+  override protected def writeToPython(dataOut: DataOutputStream, pickler: Pickler): Unit = {
+    // Send Python data source
+    PythonWorkerUtils.writePythonFunction(func, dataOut)
+
+    // Send schema
+    PythonWorkerUtils.writeUTF(schema.json, dataOut)
+  }
+
+  override protected def receiveFromPython(dataIn: DataInputStream): PythonDataSourceReadInfo = {
+    // Receive the picked reader or an exception raised in Python worker.
+    val length = dataIn.readInt()
+    if (length == SpecialLengths.PYTHON_EXCEPTION_THROWN) {
+      val msg = PythonWorkerUtils.readUTF(dataIn)
+      throw QueryCompilationErrors.tableValuedFunctionFailedToAnalyseInPythonError(msg)
+    }
+
+    // Receive the pickled 'read' function.
+    val pickledFunction: Array[Byte] = {
+      val obj = new Array[Byte](length)
+      dataIn.readFully(obj)
+      obj
+    }

Review Comment:
   Nice utility function!





Re: [PR] [SPARK-45524][PYTHON][SQL] Initial support for Python data source read API [spark]

Posted by "allisonwang-db (via GitHub)" <gi...@apache.org>.
allisonwang-db commented on code in PR #43360:
URL: https://github.com/apache/spark/pull/43360#discussion_r1364325976


##########
python/pyspark/sql/datasource.py:
##########
@@ -0,0 +1,168 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+from abc import ABC, abstractmethod
+from typing import Any, Dict, Iterator, Tuple, Union, TYPE_CHECKING
+
+from pyspark.sql import Row
+from pyspark.sql.types import StructType
+
+if TYPE_CHECKING:
+    from pyspark.sql._typing import OptionalPrimitiveType
+
+
+__all__ = ["DataSource", "DataSourceReader"]
+
+
+class DataSource:
+    def __init__(self, options: Dict[str, "OptionalPrimitiveType"]):
+        """
+        Initializes the data source with user-provided options.
+
+        Parameters
+        ----------
+        options : dict
+            A dictionary representing the options for this data source.
+
+        Notes
+        -----
+        This method should not contain any non-serializable objects.
+        """
+        self.options = options
+
+    @property
+    def name(self) -> str:
+        """
+        Returns a string represents the short name of this data source.
+        """
+        return self.__class__.__name__
+
+    def schema(self) -> Union[StructType, str]:
+        """
+        Returns the schema of the data source.
+
+        It can reference the `options` field to infer the data source's schema when
+        users do not explicitly specify it. This method is invoked once when calling
+        `spark.read.format(...).load()` to get the schema for a data source read operation.
+        If this method is not implemented, and a user does not provide a schema when
+        reading the data source, an exception will be thrown.
+
+        Returns
+        -------
+        schema : StructType or str
+            The schema of this data source or a DDL string represents the schema
+
+        Examples
+        --------
+        Returns a DDL string:
+
+        >>> def schema(self):
+        >>>    return "a INT, b STRING"
+
+        Returns a StructType:
+
+        >>> def schema(self):
+        >>>   return StructType().add("a", "int").add("b", "string")
+        """
+        ...
+
+    def reader(self, schema: StructType) -> "DataSourceReader":
+        """
+        Returns a DataSourceReader instance for reading data.
+
+        This method is required for readable data sources. It will be called once during
+        the physical planning stage in the Spark planner.
+
+        Parameters
+        ----------
+        schema : StructType
+            The schema of the data to be read.
+
+        Returns
+        -------
+        reader : DataSourceReader
+            A reader instance for this data source.
+        """
+        raise NotImplementedError
+
+
+class DataSourceReader(ABC):
+    """
+    A base class for data source readers. Data source readers are responsible for
+    outputting data from a data source.
+    """
+
+    def partitions(self) -> Iterator[Any]:
+        """
+        Returns a list of partitions for this data source.
+
+        This method is called once during the physical planning stage in the Spark planner
+        to generate a list of partitions. Note, partition values must be serializable.
+
+        The planner then creates an RDD for each partition. Each partition value will be
+        passed to `read(partition)` to read the data from this data source.
+
+        If this method is not implemented, or returns an empty list, Spark will create one

Review Comment:
   The default partition value should be `None` if the method is not implemented (indicating 1 partition).  I am opting to make `def partitions()` **optional** to implement from a usability perspective, making it easier for new users to get started.
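
The fallback described above can be sketched in plain Python. This is a minimal illustration under stated assumptions: `SimpleReader`, `RangeReader`, and `plan_partitions` are hypothetical names, and `plan_partitions` copies the two-line fallback from the `plan_data_source_read.py` diff quoted in this thread rather than any final implementation.

```python
from typing import Any, Iterator, List, Optional


class SimpleReader:
    """Hypothetical reader that provides no partitioning information."""

    def partitions(self) -> Optional[Iterator[Any]]:
        return None

    def read(self, partition: Any) -> Iterator[tuple]:
        # A single call with partition=None reads the whole source.
        yield (0, partition)


class RangeReader(SimpleReader):
    """Hypothetical reader that splits the source into three partitions."""

    def partitions(self) -> Optional[Iterator[Any]]:
        return iter([0, 1, 2])


def plan_partitions(reader: SimpleReader) -> List[Any]:
    # Same fallback as the quoted worker code: no partitions means [None].
    partitions = list(reader.partitions() or [])
    if len(partitions) == 0:
        partitions = [None]
    return partitions


single = plan_partitions(SimpleReader())
multiple = plan_partitions(RangeReader())
```

A reader that skips `partitions()` still gets exactly one `read(None)` call, which is what makes the method optional for new users.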





Re: [PR] [SPARK-45524][PYTHON][SQL] Initial support for Python data source read API [spark]

Posted by "allisonwang-db (via GitHub)" <gi...@apache.org>.
allisonwang-db commented on code in PR #43360:
URL: https://github.com/apache/spark/pull/43360#discussion_r1364357646


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PythonDataSourceStrategy.scala:
##########
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+
+import org.apache.spark.api.python.{PythonEvalType, PythonFunction, SimplePythonFunction}
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.expressions.PythonUDTF
+import org.apache.spark.sql.catalyst.plans.logical
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.python.UserDefinedPythonDataSourceReadRunner
+
+object PythonDataSourceStrategy extends Strategy with Logging {
+  def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
+    case p @ logical.PythonDataSource(dataSource: PythonFunction, schema, _) =>
+      val info = new UserDefinedPythonDataSourceReadRunner(dataSource, schema).runInPython()
+
+      val readerFunc = SimplePythonFunction(
+        command = info.func.toSeq,
+        envVars = dataSource.envVars,
+        pythonIncludes = dataSource.pythonIncludes,
+        pythonExec = dataSource.pythonExec,
+        pythonVer = dataSource.pythonVer,
+        broadcastVars = dataSource.broadcastVars,
+        accumulator = dataSource.accumulator
+      )
+
+      val partitionPlan = logical.PythonDataSourcePartition(info.partitions)
+
+      // Construct a Python UDTF for the reader function.
+      val pythonUDTF = PythonUDTF(
+        name = "PythonDataSourceReaderUDTF",
+        func = readerFunc,
+        elementSchema = schema,
+        children = partitionPlan.output,
+        evalType = PythonEvalType.SQL_TABLE_UDF,
+        udfDeterministic = false,

Review Comment:
   Maybe we can add an API in the data source or the reader / writer class?





Re: [PR] [SPARK-45524][PYTHON][SQL] Initial support for Python data source read API [spark]

Posted by "allisonwang-db (via GitHub)" <gi...@apache.org>.
allisonwang-db commented on code in PR #43360:
URL: https://github.com/apache/spark/pull/43360#discussion_r1364356820


##########
python/pyspark/sql/worker/plan_data_source_read.py:
##########
@@ -0,0 +1,131 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import os
+import sys
+from typing import Any, IO, Iterator
+
+from pyspark.accumulators import _accumulatorRegistry
+from pyspark.errors import PySparkRuntimeError
+from pyspark.java_gateway import local_connect_and_auth
+from pyspark.serializers import (
+    read_int,
+    write_int,
+    SpecialLengths,
+    CloudPickleSerializer,
+)
+from pyspark.sql.datasource import DataSource
+from pyspark.sql.types import _parse_datatype_json_string, StructType
+from pyspark.util import handle_worker_exception
+from pyspark.worker_util import (
+    check_python_version,
+    read_command,
+    pickleSer,
+    send_accumulator_updates,
+    setup_broadcasts,
+    setup_memory_limits,
+    setup_spark_files,
+    utf8_deserializer,
+)
+
+
+def main(infile: IO, outfile: IO) -> None:
+    """
+    Plan Python data source read.
+    """
+    try:
+        check_python_version(infile)
+
+        memory_limit_mb = int(os.environ.get("PYSPARK_PLANNER_MEMORY_MB", "-1"))
+        setup_memory_limits(memory_limit_mb)
+
+        setup_spark_files(infile)
+        setup_broadcasts(infile)
+
+        _accumulatorRegistry.clear()
+
+        # Receive the data source instance.
+        data_source = read_command(pickleSer, infile)
+        if not isinstance(data_source, DataSource):
+            raise PySparkRuntimeError(

Review Comment:
   Good point! These should be assertion errors.





Re: [PR] [SPARK-45524][PYTHON][SQL] Initial support for Python data source read API [spark]

Posted by "allisonwang-db (via GitHub)" <gi...@apache.org>.
allisonwang-db commented on code in PR #43360:
URL: https://github.com/apache/spark/pull/43360#discussion_r1364355768


##########
python/pyspark/sql/readwriter.py:
##########
@@ -303,6 +330,40 @@ def load(
         if schema is not None:
             self.schema(schema)
         self.options(**options)
+
+        # Load a Python data source
+        if isinstance(self._format, Callable):  # type: ignore[arg-type]
+            # TODO(SPARK-45560): support load() with non-empty path.
+
+            # Create an instance of the data source.
+            data_source_cls = cast(Type[DataSource], self._format)
+            data_source = data_source_cls(self._options)
+
+            # Get schema of the data source
+            schema = self._schema or data_source.schema()

Review Comment:
   The `def schema()` method here can access `self.options` to generate schema based on the user inputs.
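
As a rough illustration of the point above, a `schema()` method might derive a DDL string from user-supplied options. This is a sketch in plain Python, not the PySpark class: `OptionDrivenSource` and the `columns` option are made-up names for illustration only.

```python
from typing import Dict, Optional


class OptionDrivenSource:
    """Hypothetical data source; stands in for a DataSource subclass."""

    def __init__(self, options: Dict[str, Optional[str]]):
        self.options = options

    def schema(self) -> str:
        # 'columns' is an assumed option name: a comma-separated column list.
        columns = (self.options.get("columns") or "value").split(",")
        # Build a DDL string such as "a STRING, b STRING".
        return ", ".join(f"{name.strip()} STRING" for name in columns)


with_columns = OptionDrivenSource({"columns": "a, b"}).schema()
default_schema = OptionDrivenSource({}).schema()
```

When the user passes an explicit schema to the reader, this method would not need to be consulted at all.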





Re: [PR] [SPARK-45524][PYTHON][SQL] Initial support for Python data source read API [spark]

Posted by "HyukjinKwon (via GitHub)" <gi...@apache.org>.
HyukjinKwon commented on code in PR #43360:
URL: https://github.com/apache/spark/pull/43360#discussion_r1361857907


##########
python/pyspark/sql/readwriter.py:
##########
@@ -303,6 +330,40 @@ def load(
         if schema is not None:
             self.schema(schema)
         self.options(**options)
+
+        # Load a Python data source
+        if isinstance(self._format, Callable):  # type: ignore[arg-type]
+            # TODO(SPARK-45560): support load() with non-empty path.
+
+            # Create an instance of the data source.
+            data_source_cls = cast(Type[DataSource], self._format)
+            data_source = data_source_cls(self._options)
+
+            # Get schema of the data source
+            schema = self._schema or data_source.schema()

Review Comment:
   This is a bit hacky. For example, if we run this in Spark Connect, the schema inference would happen within the Python Spark Connect client.





Re: [PR] [SPARK-45524][PYTHON][SQL] Initial support for Python data source read API [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #43360:
URL: https://github.com/apache/spark/pull/43360#discussion_r1361436612


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PythonDataSourceStrategy.scala:
##########
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+
+import org.apache.spark.api.python.{PythonEvalType, PythonFunction, SimplePythonFunction}
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.expressions.PythonUDTF
+import org.apache.spark.sql.catalyst.plans.logical
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.python.UserDefinedPythonDataSourceReadRunner
+
+object PythonDataSourceStrategy extends Strategy with Logging {

Review Comment:
   Shall we make it a logical rule, similar to `V2ScanRelationPushDown`?





Re: [PR] [SPARK-45524][PYTHON][SQL] Initial support for Python data source read API [spark]

Posted by "allisonwang-db (via GitHub)" <gi...@apache.org>.
allisonwang-db commented on code in PR #43360:
URL: https://github.com/apache/spark/pull/43360#discussion_r1364340178


##########
python/pyspark/sql/readwriter.py:
##########
@@ -69,20 +83,26 @@ class DataFrameReader(OptionUtils):
     def __init__(self, spark: "SparkSession"):
         self._jreader = spark._jsparkSession.read()
         self._spark = spark
+        self._format: Optional[Union[str, Type[DataSource]]] = None
+        self._schema: Optional[Union[str, StructType]] = None
+        self._options: Dict[str, "OptionalPrimitiveType"] = dict()
 
     def _df(self, jdf: JavaObject) -> "DataFrame":
         from pyspark.sql.dataframe import DataFrame
 
         return DataFrame(jdf, self._spark)
 
-    def format(self, source: str) -> "DataFrameReader":
+    def format(self, source: Union[str, Type[DataSource]]) -> "DataFrameReader":

Review Comment:
   Ah, you mean we can't pass a Python data source object when using it from Scala, right? For example, this won't work: `val df = spark.read.format(MyPythonDataSource).load()`. This is a valid point.
   
   @HyukjinKwon just brainstorming here: in order to use the short name, a data source must be registered somewhere. Then we first need to have an API to register a data source, something like `spark.dataSource.register(...)` similar to UDF.  
   
   But can we make it more Pythonic? What if I just want to import a Python data source from a package and use it directly without registering it? With this `DataSource` object, we can easily allow this use case:
   
   ```python
   from src.my.datasources import MyDataSource
   
   df = spark.read.format(MyDataSource).load()
   ``` 
   
   This is similar to UDFs, where we don't need to register them to use them in PySpark.
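
The two lookup paths discussed above (a registered short name versus a class passed directly) can be sketched together. This is purely illustrative plain Python: `register`, `resolve_format`, and the module-level `_registry` are hypothetical and do not correspond to any existing Spark API.

```python
from typing import Dict, Union


class MyDataSource:
    """Hypothetical data source class."""

    def __init__(self, options: dict):
        self.options = options


# Hypothetical registry mapping short names to data source classes.
_registry: Dict[str, type] = {}


def register(name: str, cls: type) -> None:
    _registry[name] = cls


def resolve_format(source: Union[str, type]) -> type:
    # A string is treated as a registered short name; a class is used as-is.
    if isinstance(source, str):
        return _registry[source]
    return source


register("my_source", MyDataSource)
by_name = resolve_format("my_source")
by_class = resolve_format(MyDataSource)
```

Under this assumption, the short-name path would serve Scala and SQL callers, while the class path keeps the import-and-use workflow for Python users.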





Re: [PR] [SPARK-45524][PYTHON][SQL] Initial support for Python data source read API [spark]

Posted by "allisonwang-db (via GitHub)" <gi...@apache.org>.
allisonwang-db commented on code in PR #43360:
URL: https://github.com/apache/spark/pull/43360#discussion_r1364732463


##########
python/pyspark/sql/worker/plan_data_source_read.py:
##########
@@ -0,0 +1,131 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import os
+import sys
+from typing import Any, IO, Iterator
+
+from pyspark.accumulators import _accumulatorRegistry
+from pyspark.errors import PySparkRuntimeError
+from pyspark.java_gateway import local_connect_and_auth
+from pyspark.serializers import (
+    read_int,
+    write_int,
+    SpecialLengths,
+    CloudPickleSerializer,
+)
+from pyspark.sql.datasource import DataSource
+from pyspark.sql.types import _parse_datatype_json_string, StructType
+from pyspark.util import handle_worker_exception
+from pyspark.worker_util import (
+    check_python_version,
+    read_command,
+    pickleSer,
+    send_accumulator_updates,
+    setup_broadcasts,
+    setup_memory_limits,
+    setup_spark_files,
+    utf8_deserializer,
+)
+
+
+def main(infile: IO, outfile: IO) -> None:
+    """
+    Plan Python data source read.
+    """
+    try:
+        check_python_version(infile)
+
+        memory_limit_mb = int(os.environ.get("PYSPARK_PLANNER_MEMORY_MB", "-1"))
+        setup_memory_limits(memory_limit_mb)
+
+        setup_spark_files(infile)
+        setup_broadcasts(infile)
+
+        _accumulatorRegistry.clear()
+
+        # Receive the data source instance.
+        data_source = read_command(pickleSer, infile)
+        if not isinstance(data_source, DataSource):
+            raise PySparkRuntimeError(
+                f"Expected a Python data source instance of type 'DataSource', but "
+                f"got '{type(data_source).__name__}'."
+            )
+
+        # Receive the data source output schema.
+        schema_json = utf8_deserializer.loads(infile)
+        schema = _parse_datatype_json_string(schema_json)
+        if not isinstance(schema, StructType):
+            raise PySparkRuntimeError(
+                f"Expected a Python data source schema of type 'StructType', but "
+                f"got '{type(schema).__name__}'."
+            )
+
+        # Instantiate data source reader.
+        try:
+            reader = data_source.reader(schema=schema)
+        except NotImplementedError:
+            raise PySparkRuntimeError(
+                "Unable to create the Python data source reader because the 'reader' "
+                "method hasn't been implemented."
+            )
+        except Exception as e:
+            raise PySparkRuntimeError(f"Unable to create the Python data source reader: {str(e)}")
+
+        # Generate all partitions.
+        partitions = list(reader.partitions() or [])
+        if len(partitions) == 0:
+            partitions = [None]
+
+        # Construct a UDTF.
+        class PythonDataSourceReaderUDTF:
+            def __init__(self) -> None:
+                self.ser = CloudPickleSerializer()
+
+            def eval(self, partition_bytes: Any) -> Iterator:
+                partition = self.ser.loads(partition_bytes)
+                yield from reader.read(partition)
+
+        command = PythonDataSourceReaderUDTF
+        pickleSer._write_with_length(command, outfile)
+
+        # Return the serialized partition values.
+        write_int(len(partitions), outfile)
+        for partition in partitions:
+            pickleSer._write_with_length(partition, outfile)
+
+    except BaseException as e:
+        handle_worker_exception(e, outfile)
+        sys.exit(-1)
+
+    send_accumulator_updates(outfile)

Review Comment:
   This is from `analyze_udtf.py` cc @ueshin 





Re: [PR] [SPARK-45524][PYTHON][SQL] Initial support for Python data source read API [spark]

Posted by "HyukjinKwon (via GitHub)" <gi...@apache.org>.
HyukjinKwon commented on code in PR #43360:
URL: https://github.com/apache/spark/pull/43360#discussion_r1361859922


##########
python/pyspark/sql/datasource.py:
##########
@@ -0,0 +1,184 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+from abc import ABC, abstractmethod
+from typing import Any, Dict, Iterator, Tuple, Union, TYPE_CHECKING
+
+from pyspark.sql import Row
+from pyspark.sql.types import StructType
+
+if TYPE_CHECKING:
+    from pyspark.sql._typing import OptionalPrimitiveType
+
+
+__all__ = ["DataSource", "DataSourceReader"]
+
+
+class DataSource:
+    """
+    A base class for data sources.
+
+    This class represents a custom data source that allows for reading from and
+    writing to it. The data source provides methods to create readers and writers
+    for reading and writing data, respectively. At least one of the methods `reader`
+    or `writer` must be implemented by any subclass to make the data source either
+    readable or writable.
+
+    After implementing this interface, you can start to load your data source using
+    `spark.read.format(...).load()` and save data using `df.write.format(...).save()`.
+    """
+
+    def __init__(self, options: Dict[str, "OptionalPrimitiveType"]):
+        """
+        Initializes the data source with user-provided options.
+
+        Parameters
+        ----------
+        options : dict
+            A dictionary representing the options for this data source.
+
+        Notes
+        -----
+        This method should not contain any non-serializable objects.
+        """
+        self.options = options
+
+    @property
+    def name(self) -> str:
+        """
+        Returns a string represents the short name of this data source.
+        """
+        return self.__class__.__name__
+
+    def schema(self) -> Union[StructType, str]:
+        """
+        Returns the schema of the data source.
+
+        It can reference the `options` field to infer the data source's schema when
+        users do not explicitly specify it. This method is invoked once when calling
+        `spark.read.format(...).load()` to get the schema for a data source read operation.

Review Comment:
   Let's use two backquotes so it renders as an actual code block in the documentation:
   
   ```suggestion
           ``spark.read.format(...).load()`` to get the schema for a data source read operation.
   ```





Re: [PR] [SPARK-45524][PYTHON][SQL] Initial support for Python data source read API [spark]

Posted by "HyukjinKwon (via GitHub)" <gi...@apache.org>.
HyukjinKwon commented on code in PR #43360:
URL: https://github.com/apache/spark/pull/43360#discussion_r1361877692


##########
python/pyspark/sql/readwriter.py:
##########
@@ -303,6 +330,40 @@ def load(
         if schema is not None:
             self.schema(schema)
         self.options(**options)
+
+        # Load a Python data source
+        if isinstance(self._format, Callable):  # type: ignore[arg-type]
+            # TODO(SPARK-45560): support load() with non-empty path.
+
+            # Create an instance of the data source.
+            data_source_cls = cast(Type[DataSource], self._format)
+            data_source = data_source_cls(self._options)
+
+            # Get schema of the data source
+            schema = self._schema or data_source.schema()

Review Comment:
   Also, I don't think we can support SQL use cases (registering it and invoking it in SQL) if we do this here?





Re: [PR] [SPARK-45524][PYTHON][SQL] Initial support for Python data source read API [spark]

Posted by "HyukjinKwon (via GitHub)" <gi...@apache.org>.
HyukjinKwon commented on code in PR #43360:
URL: https://github.com/apache/spark/pull/43360#discussion_r1361773633


##########
python/pyspark/sql/datasource.py:
##########
@@ -0,0 +1,168 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+from abc import ABC, abstractmethod
+from typing import Any, Dict, Iterator, Tuple, Union, TYPE_CHECKING
+
+from pyspark.sql import Row
+from pyspark.sql.types import StructType
+
+if TYPE_CHECKING:
+    from pyspark.sql._typing import OptionalPrimitiveType
+
+
+__all__ = ["DataSource", "DataSourceReader"]
+
+
+class DataSource:
+    def __init__(self, options: Dict[str, "OptionalPrimitiveType"]):
+        """
+        Initializes the data source with user-provided options.
+
+        Parameters
+        ----------
+        options : dict
+            A dictionary representing the options for this data source.
+
+        Notes
+        -----
+        This method should not contain any non-serializable objects.
+        """
+        self.options = options
+
+    @property
+    def name(self) -> str:
+        """
+        Returns a string represents the short name of this data source.
+        """
+        return self.__class__.__name__
+
+    def schema(self) -> Union[StructType, str]:
+        """
+        Returns the schema of the data source.
+
+        It can reference the `options` field to infer the data source's schema when
+        users do not explicitly specify it. This method is invoked once when calling
+        `spark.read.format(...).load()` to get the schema for a data source read operation.
+        If this method is not implemented, and a user does not provide a schema when
+        reading the data source, an exception will be thrown.
+
+        Returns
+        -------
+        schema : StructType or str
+            The schema of this data source or a DDL string represents the schema
+
+        Examples
+        --------
+        Returns a DDL string:
+
+        >>> def schema(self):
+        >>>    return "a INT, b STRING"
+
+        Returns a StructType:
+
+        >>> def schema(self):
+        >>>   return StructType().add("a", "int").add("b", "string")
+        """
+        ...
+
+    def reader(self, schema: StructType) -> "DataSourceReader":
+        """
+        Returns a DataSourceReader instance for reading data.
+
+        This method is required for readable data sources. It will be called once during
+        the physical planning stage in the Spark planner.
+
+        Parameters
+        ----------
+        schema : StructType
+            The schema of the data to be read.
+
+        Returns
+        -------
+        reader : DataSourceReader
+            A reader instance for this data source.
+        """
+        raise NotImplementedError
+
+
+class DataSourceReader(ABC):
+    """
+    A base class for data source readers. Data source readers are responsible for
+    outputting data from a data source.
+    """
+
+    def partitions(self) -> Iterator[Any]:
+        """
+        Returns a list of partitions for this data source.
+
+        This method is called once during the physical planning stage in the Spark planner
+        to generate a list of partitions. Note, partition values must be serializable.
+
+        The planner then creates an RDD for each partition. Each partition value will be
+        passed to `read(partition)` to read the data from this data source.
+
+        If this method is not implemented, or returns an empty list, Spark will create one

Review Comment:
   I believe there is a use case. For example, in Scala, we have `LocalScan`, which doesn't require partitions.
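
   As a rough illustration of that use case (a sketch with stand-in names, not the PR's code): a reader that leaves `partitions()` unimplemented and serves a small in-memory result from a single `read(None)` call. `ReaderBase` and `LocalRowsReader` are hypothetical and only mirror the shape of the `DataSourceReader` in the diff.

```python
from abc import ABC, abstractmethod
from typing import Any, Iterator, Tuple


class ReaderBase(ABC):
    """Stand-in for the DataSourceReader in the diff; partitions() is optional."""

    def partitions(self):
        # Default: no partitions are defined.
        return None

    @abstractmethod
    def read(self, partition: Any) -> Iterator[Tuple]:
        ...


class LocalRowsReader(ReaderBase):
    """Hypothetical reader that yields a fixed set of local rows."""

    def __init__(self, rows):
        self.rows = list(rows)

    def read(self, partition: Any) -> Iterator[Tuple]:
        # With no partitions defined, the partition argument is None.
        assert partition is None
        yield from self.rows
```

   A reader like this never needs to split its input, so requiring `partitions()` would only force it to return a dummy value.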





Re: [PR] [SPARK-45524][PYTHON][SQL] Initial support for Python data source read API [spark]

Posted by "allisonwang-db (via GitHub)" <gi...@apache.org>.
allisonwang-db commented on PR #43360:
URL: https://github.com/apache/spark/pull/43360#issuecomment-1760701636

   cc @HyukjinKwon @ueshin @cloud-fan @dtenedor 




Re: [PR] [SPARK-45524][PYTHON][SQL] Initial support for Python data source read API [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #43360:
URL: https://github.com/apache/spark/pull/43360#discussion_r1363408274


##########
python/pyspark/sql/datasource.py:
##########
@@ -0,0 +1,185 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+from abc import ABC, abstractmethod
+from typing import Any, Dict, Iterator, Tuple, Union, TYPE_CHECKING
+
+from pyspark.sql import Row
+from pyspark.sql.types import StructType
+
+if TYPE_CHECKING:
+    from pyspark.sql._typing import OptionalPrimitiveType
+
+
+__all__ = ["DataSource", "DataSourceReader"]
+
+
+class DataSource(ABC):
+    """
+    A base class for data sources.
+
+    This class represents a custom data source that allows for reading from and
+    writing to it. The data source provides methods to create readers and writers
+    for reading and writing data, respectively. At least one of the methods ``reader``
+    or ``writer`` must be implemented by any subclass to make the data source either
+    readable or writable.
+
+    After implementing this interface, you can start to load your data source using
+    ``spark.read.format(...).load()`` and save data using ``df.write.format(...).save()``.
+    """
+
+    def __init__(self, options: Dict[str, "OptionalPrimitiveType"]):
+        """
+        Initializes the data source with user-provided options.
+
+        Parameters
+        ----------
+        options : dict
+            A dictionary representing the options for this data source.
+
+        Notes
+        -----
+        This method should not contain any non-serializable objects.
+        """
+        self.options = options
+
+    @property
+    def name(self) -> str:
+        """
+        Returns a string represents the short name of this data source.
+        """
+        return self.__class__.__name__
+
+    def schema(self) -> Union[StructType, str]:
+        """
+        Returns the schema of the data source.
+
+        It can reference the ``options`` field to infer the data source's schema when
+        users do not explicitly specify it. This method is invoked once when calling
+        ``spark.read.format(...).load()`` to get the schema for a data source read
+        operation. If this method is not implemented, and a user does not provide a
+        schema when reading the data source, an exception will be thrown.
+
+        Returns
+        -------
+        schema : StructType or str
+            The schema of this data source or a DDL string represents the schema
+
+        Examples
+        --------
+        Returns a DDL string:
+
+        >>> def schema(self):
+        ...    return "a INT, b STRING"
+
+        Returns a StructType:
+
+        >>> def schema(self):
+        ...   return StructType().add("a", "int").add("b", "string")
+        """
+        ...
+
+    def reader(self, schema: StructType) -> "DataSourceReader":
+        """
+        Returns a DataSourceReader instance for reading data.
+
+        This method is required for readable data sources. It will be called once during
+        the physical planning stage in the Spark planner.
+
+        Parameters
+        ----------
+        schema : StructType
+            The schema of the data to be read.
+
+        Returns
+        -------
+        reader : DataSourceReader
+            A reader instance for this data source.
+        """
+        raise NotImplementedError
+
+
+class DataSourceReader(ABC):
+    """
+    A base class for data source readers. Data source readers are responsible for
+    outputting data from a data source.
+    """
+
+    def partitions(self) -> Iterator[Any]:
+        """
+        Returns a list of partitions for this data source.
+
+        This method is called once during the physical planning stage to generate a list
+        of partitions. If the method returns N partitions, then the planner will create
+        N tasks. Each task will then execute ``read(partition)`` in parallel, using each
+        partition value to read the data from this data source.
+
+        If this method is not implemented, or returns an empty list, Spark will create one
+        partition for the result DataFrame and use one single task to read the data.
+
+        Returns
+        -------
+        partitions : list
+            A list of partitions for this data source. The partition can be any arbitrary
+            serializable objects.
+
+        Notes
+        -----
+        This method should not return any un-picklable objects.
+
+        Examples
+        --------
+        Returns a list of integers:
+
+        >>> def partitions(self):
+        ...     return [1, 2, 3]
+
+        Returns a list of string:
+
+        >>> def partitions(self):
+        ...     return ["a", "b", "c"]
+
+        Returns a list of tuples:
+
+        >>> def partitions(self):
+        ...     return [("a", 1), ("b", 2), ("c", 3)]
+        """
+        ...
+
+    @abstractmethod
+    def read(self, partition: Any) -> Iterator[Union[Tuple, Row]]:
+        """
+        Generates data for a given partition and returns an iterator of tuples or rows.
+
+        This method is invoked once per partition by Spark tasks to read the data.
+        You can initialize any non-serializable resources required for reading data from
+        the data source within this method.
+
+        Implementing this method is required for readable data sources.
+
+        Parameters
+        ----------
+        partition : object
+            The partition to read. It must be one of the partition values returned by
+            ``partitions()`` or None if ``partitions()`` method is not implemented.

Review Comment:
   Need to update this if `partitions()` is required.
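
   For reference, the contract the docstring currently describes can be sketched as below. `plan_reads` is a hypothetical stand-in for what Spark does with a reader, and both readers are toy examples. If `partitions()` becomes required, the `read(None)` fallback and the matching sentence in the docstring would go away.

```python
from typing import Any, Iterator, List, Tuple


class RangeReader:
    """Toy reader: each partition is a (start, end) pair."""

    def partitions(self) -> List[Tuple[int, int]]:
        return [(0, 2), (2, 4)]

    def read(self, partition: Any) -> Iterator[Tuple[int]]:
        start, end = partition
        for i in range(start, end):
            yield (i,)


class SingleTaskReader:
    """Toy reader that defines no partitions."""

    def partitions(self) -> list:
        return []

    def read(self, partition: Any) -> Iterator[Tuple[int]]:
        yield (42,)


def plan_reads(reader) -> List[List[tuple]]:
    # One read() call per partition; fall back to a single read(None)
    # when partitions() returns nothing.
    parts = list(reader.partitions() or [])
    if not parts:
        return [list(reader.read(None))]
    return [list(reader.read(p)) for p in parts]
```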





Re: [PR] [SPARK-45524][PYTHON][SQL] Initial support for Python data source read API [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #43360:
URL: https://github.com/apache/spark/pull/43360#discussion_r1363396200


##########
python/pyspark/sql/datasource.py:
##########
@@ -0,0 +1,185 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+from abc import ABC, abstractmethod
+from typing import Any, Dict, Iterator, Tuple, Union, TYPE_CHECKING
+
+from pyspark.sql import Row
+from pyspark.sql.types import StructType
+
+if TYPE_CHECKING:
+    from pyspark.sql._typing import OptionalPrimitiveType
+
+
+__all__ = ["DataSource", "DataSourceReader"]
+
+
+class DataSource(ABC):
+    """
+    A base class for data sources.
+
+    This class represents a custom data source that allows for reading from and
+    writing to it. The data source provides methods to create readers and writers
+    for reading and writing data, respectively. At least one of the methods ``reader``
+    or ``writer`` must be implemented by any subclass to make the data source either
+    readable or writable.
+
+    After implementing this interface, you can start to load your data source using
+    ``spark.read.format(...).load()`` and save data using ``df.write.format(...).save()``.
+    """
+
+    def __init__(self, options: Dict[str, "OptionalPrimitiveType"]):
+        """
+        Initializes the data source with user-provided options.
+
+        Parameters
+        ----------
+        options : dict
+            A dictionary representing the options for this data source.
+
+        Notes
+        -----
+        This method should not contain any non-serializable objects.
+        """
+        self.options = options
+
+    @property
+    def name(self) -> str:
+        """
+        Returns a string represents the short name of this data source.
+        """
+        return self.__class__.__name__
+
+    def schema(self) -> Union[StructType, str]:
+        """
+        Returns the schema of the data source.
+
+        It can reference the ``options`` field to infer the data source's schema when
+        users do not explicitly specify it. This method is invoked once when calling
+        ``spark.read.format(...).load()`` to get the schema for a data source read
+        operation. If this method is not implemented, and a user does not provide a
+        schema when reading the data source, an exception will be thrown.
+
+        Returns
+        -------
+        schema : StructType or str
+            The schema of this data source or a DDL string represents the schema
+
+        Examples
+        --------
+        Returns a DDL string:
+
+        >>> def schema(self):
+        ...    return "a INT, b STRING"
+
+        Returns a StructType:
+
+        >>> def schema(self):
+        ...   return StructType().add("a", "int").add("b", "string")
+        """
+        ...
+
+    def reader(self, schema: StructType) -> "DataSourceReader":
+        """
+        Returns a DataSourceReader instance for reading data.
+
+        This method is required for readable data sources. It will be called once during
+        the physical planning stage in the Spark planner.

Review Comment:
   I think it's better to not expose too many internal details. Just say it will be called only once when reading the data source.





Re: [PR] [SPARK-45524][PYTHON][SQL] Initial support for Python data source read API [spark]

Posted by "allisonwang-db (via GitHub)" <gi...@apache.org>.
allisonwang-db commented on code in PR #43360:
URL: https://github.com/apache/spark/pull/43360#discussion_r1361260428


##########
python/pyspark/sql/datasource.py:
##########
@@ -0,0 +1,168 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+from abc import ABC, abstractmethod
+from typing import Any, Dict, Iterator, Tuple, Union, TYPE_CHECKING
+
+from pyspark.sql import Row
+from pyspark.sql.types import StructType
+
+if TYPE_CHECKING:
+    from pyspark.sql._typing import OptionalPrimitiveType
+
+
+__all__ = ["DataSource", "DataSourceReader"]
+
+
+class DataSource:
+    def __init__(self, options: Dict[str, "OptionalPrimitiveType"]):
+        """
+        Initializes the data source with user-provided options.
+
+        Parameters
+        ----------
+        options : dict
+            A dictionary representing the options for this data source.
+
+        Notes
+        -----
+        This method should not contain any non-serializable objects.
+        """
+        self.options = options
+
+    @property
+    def name(self) -> str:
+        """
+        Returns a string represents the short name of this data source.
+        """
+        return self.__class__.__name__
+
+    def schema(self) -> Union[StructType, str]:
+        """
+        Returns the schema of the data source.
+
+        It can reference the `options` field to infer the data source's schema when
+        users do not explicitly specify it. This method is invoked once when calling
+        `spark.read.format(...).load()` to get the schema for a data source read operation.
+        If this method is not implemented, and a user does not provide a schema when
+        reading the data source, an exception will be thrown.
+
+        Returns
+        -------
+        schema : StructType or str
+            The schema of this data source or a DDL string represents the schema
+
+        Examples
+        --------
+        Returns a DDL string:
+
+        >>> def schema(self):
+        >>>    return "a INT, b STRING"
+
+        Returns a StructType:
+
+        >>> def schema(self):
+        >>>   return StructType().add("a", "int").add("b", "string")
+        """
+        ...
+
+    def reader(self, schema: StructType) -> "DataSourceReader":
+        """
+        Returns a DataSourceReader instance for reading data.
+
+        This method is required for readable data sources. It will be called once during

Review Comment:
   Good point. Added to the docstring of the data source class.





Re: [PR] [SPARK-45524][PYTHON][SQL] Initial support for Python data source read API [spark]

Posted by "HyukjinKwon (via GitHub)" <gi...@apache.org>.
HyukjinKwon commented on PR #43360:
URL: https://github.com/apache/spark/pull/43360#issuecomment-1770465246

   Can we address https://github.com/apache/spark/pull/43360#discussion_r1365227126 here? We can't improve this incrementally with the current approach once we add support for other languages, etc.




Re: [PR] [SPARK-45524][PYTHON][SQL] Initial support for Python data source read API [spark]

Posted by "allisonwang-db (via GitHub)" <gi...@apache.org>.
allisonwang-db commented on code in PR #43360:
URL: https://github.com/apache/spark/pull/43360#discussion_r1369086390


##########
python/pyspark/sql/datasource.py:
##########
@@ -0,0 +1,185 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+from abc import ABC, abstractmethod
+from typing import Any, Dict, Iterator, Tuple, Union, TYPE_CHECKING
+
+from pyspark.sql import Row
+from pyspark.sql.types import StructType
+
+if TYPE_CHECKING:
+    from pyspark.sql._typing import OptionalPrimitiveType
+
+
+__all__ = ["DataSource", "DataSourceReader"]
+
+
+class DataSource(ABC):
+    """
+    A base class for data sources.
+
+    This class represents a custom data source that allows for reading from and
+    writing to it. The data source provides methods to create readers and writers
+    for reading and writing data, respectively. At least one of the methods ``reader``
+    or ``writer`` must be implemented by any subclass to make the data source either
+    readable or writable.
+
+    After implementing this interface, you can start to load your data source using
+    ``spark.read.format(...).load()`` and save data using ``df.write.format(...).save()``.
+    """
+
+    def __init__(self, options: Dict[str, "OptionalPrimitiveType"]):
+        """
+        Initializes the data source with user-provided options.
+
+        Parameters
+        ----------
+        options : dict
+            A dictionary representing the options for this data source.
+
+        Notes
+        -----
+        This method should not contain any non-serializable objects.
+        """
+        self.options = options
+
+    @property
+    def name(self) -> str:
+        """
+        Returns a string represents the short name of this data source.
+        """
+        return self.__class__.__name__
+
+    def schema(self) -> Union[StructType, str]:
+        """
+        Returns the schema of the data source.
+
+        It can reference the ``options`` field to infer the data source's schema when
+        users do not explicitly specify it. This method is invoked once when calling
+        ``spark.read.format(...).load()`` to get the schema for a data source read
+        operation. If this method is not implemented, and a user does not provide a
+        schema when reading the data source, an exception will be thrown.
+
+        Returns
+        -------
+        schema : StructType or str
+            The schema of this data source or a DDL string represents the schema
+
+        Examples
+        --------
+        Returns a DDL string:
+
+        >>> def schema(self):
+        ...    return "a INT, b STRING"
+
+        Returns a StructType:
+
+        >>> def schema(self):
+        ...   return StructType().add("a", "int").add("b", "string")
+        """
+        ...

Review Comment:
   Actually, we should raise a `NotImplementedError` here, since the return type of `schema` should not be None.
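
   A minimal sketch of the difference, using stand-in classes rather than the real `DataSource` (all names below are hypothetical): a body of `...` silently returns None, while raising `NotImplementedError` makes a missing override explicit. `resolve_schema` only loosely mirrors the `self._schema or data_source.schema()` line in `readwriter.py`.

```python
from typing import Optional


class EllipsisSource:
    """Stand-in whose schema() body is `...`, so calling it returns None."""

    def schema(self) -> Optional[str]:
        ...


class RaisingSource:
    """Stand-in whose schema() raises, so a missing override is explicit."""

    def schema(self) -> str:
        raise NotImplementedError


class MySource(RaisingSource):
    """Subclass that overrides schema() with a DDL string."""

    def schema(self) -> str:
        return "a INT, b STRING"


def resolve_schema(source, user_schema: Optional[str] = None) -> str:
    # Hypothetical helper: prefer the user-provided schema, else ask the source.
    if user_schema is not None:
        return user_schema
    return source.schema()
```

   With the raising variant, a read without a user-provided schema fails at the point where the schema is resolved instead of passing None further down.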



##########
python/pyspark/sql/datasource.py:
##########
@@ -0,0 +1,185 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+from abc import ABC, abstractmethod
+from typing import Any, Dict, Iterator, Tuple, Union, TYPE_CHECKING
+
+from pyspark.sql import Row
+from pyspark.sql.types import StructType
+
+if TYPE_CHECKING:
+    from pyspark.sql._typing import OptionalPrimitiveType
+
+
+__all__ = ["DataSource", "DataSourceReader"]
+
+
+class DataSource(ABC):
+    """
+    A base class for data sources.
+
+    This class represents a custom data source that allows for reading from and
+    writing to it. The data source provides methods to create readers and writers
+    for reading and writing data, respectively. At least one of the methods ``reader``
+    or ``writer`` must be implemented by any subclass to make the data source either
+    readable or writable.
+
+    After implementing this interface, you can start to load your data source using
+    ``spark.read.format(...).load()`` and save data using ``df.write.format(...).save()``.
+    """
+
+    def __init__(self, options: Dict[str, "OptionalPrimitiveType"]):
+        """
+        Initializes the data source with user-provided options.
+
+        Parameters
+        ----------
+        options : dict
+            A dictionary representing the options for this data source.
+
+        Notes
+        -----
+        This method should not contain any non-serializable objects.
+        """
+        self.options = options
+
+    @property
+    def name(self) -> str:
+        """
+        Returns a string represents the short name of this data source.
+        """
+        return self.__class__.__name__
+
+    def schema(self) -> Union[StructType, str]:
+        """
+        Returns the schema of the data source.
+
+        It can reference the ``options`` field to infer the data source's schema when
+        users do not explicitly specify it. This method is invoked once when calling
+        ``spark.read.format(...).load()`` to get the schema for a data source read
+        operation. If this method is not implemented, and a user does not provide a
+        schema when reading the data source, an exception will be thrown.
+
+        Returns
+        -------
+        schema : StructType or str
+            The schema of this data source or a DDL string represents the schema
+
+        Examples
+        --------
+        Returns a DDL string:
+
+        >>> def schema(self):
+        ...    return "a INT, b STRING"
+
+        Returns a StructType:
+
+        >>> def schema(self):
+        ...   return StructType().add("a", "int").add("b", "string")
+        """
+        ...
+
+    def reader(self, schema: StructType) -> "DataSourceReader":
+        """
+        Returns a DataSourceReader instance for reading data.
+
+        This method is required for readable data sources. It will be called once during
+        the physical planning stage in the Spark planner.
+
+        Parameters
+        ----------
+        schema : StructType
+            The schema of the data to be read.
+
+        Returns
+        -------
+        reader : DataSourceReader
+            A reader instance for this data source.
+        """
+        raise NotImplementedError
+
+
+class DataSourceReader(ABC):
+    """
+    A base class for data source readers. Data source readers are responsible for
+    outputting data from a data source.
+    """
+
+    def partitions(self) -> Iterator[Any]:
+        """
+        Returns a list of partitions for this data source.
+
+        This method is called once during the physical planning stage to generate a list
+        of partitions. If the method returns N partitions, then the planner will create
+        N tasks. Each task will then execute ``read(partition)`` in parallel, using each
+        partition value to read the data from this data source.
+
+        If this method is not implemented, or returns an empty list, Spark will create one
+        partition for the result DataFrame and use one single task to read the data.
+
+        Returns
+        -------
+        partitions : list
+            A list of partitions for this data source. The partition can be any arbitrary
+            serializable objects.
+
+        Notes
+        -----
+        This method should not return any un-picklable objects.
+
+        Examples
+        --------
+        Returns a list of integers:
+
+        >>> def partitions(self):
+        ...     return [1, 2, 3]
+
+        Returns a list of string:
+
+        >>> def partitions(self):
+        ...     return ["a", "b", "c"]
+
+        Returns a list of tuples:
+
+        >>> def partitions(self):
+        ...     return [("a", 1), ("b", 2), ("c", 3)]
+        """
+        ...
+
+    @abstractmethod
+    def read(self, partition: Any) -> Iterator[Union[Tuple, Row]]:
+        """
+        Generates data for a given partition and returns an iterator of tuples or rows.
+
+        This method is invoked once per partition by Spark tasks to read the data.
+        You can initialize any non-serializable resources required for reading data from
+        the data source within this method.
+
+        Implementing this method is required for readable data sources.
+
+        Parameters
+        ----------
+        partition : object
+            The partition to read. It must be one of the partition values returned by
+            ``partitions()`` or None if ``partitions()`` method is not implemented.

Review Comment:
   Updated the default implementation of `partitions()` 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-45524][PYTHON][SQL] Initial support for Python data source read API [spark]

Posted by "ueshin (via GitHub)" <gi...@apache.org>.
ueshin commented on code in PR #43360:
URL: https://github.com/apache/spark/pull/43360#discussion_r1369321618


##########
python/pyspark/sql/worker/plan_data_source_read.py:
##########
@@ -0,0 +1,131 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import os
+import sys
+from typing import Any, IO, Iterator
+
+from pyspark.accumulators import _accumulatorRegistry
+from pyspark.errors import PySparkRuntimeError
+from pyspark.java_gateway import local_connect_and_auth
+from pyspark.serializers import (
+    read_int,
+    write_int,
+    SpecialLengths,
+    CloudPickleSerializer,
+)
+from pyspark.sql.datasource import DataSource
+from pyspark.sql.types import _parse_datatype_json_string, StructType
+from pyspark.util import handle_worker_exception
+from pyspark.worker_util import (
+    check_python_version,
+    read_command,
+    pickleSer,
+    send_accumulator_updates,
+    setup_broadcasts,
+    setup_memory_limits,
+    setup_spark_files,
+    utf8_deserializer,
+)
+
+
+def main(infile: IO, outfile: IO) -> None:
+    """
+    Plan Python data source read.
+    """
+    try:
+        check_python_version(infile)
+
+        memory_limit_mb = int(os.environ.get("PYSPARK_PLANNER_MEMORY_MB", "-1"))
+        setup_memory_limits(memory_limit_mb)
+
+        setup_spark_files(infile)
+        setup_broadcasts(infile)
+
+        _accumulatorRegistry.clear()
+
+        # Receive the data source instance.
+        data_source = read_command(pickleSer, infile)
+        if not isinstance(data_source, DataSource):
+            raise PySparkRuntimeError(
+                f"Expected a Python data source instance of type 'DataSource', but "
+                f"got '{type(data_source).__name__}'."
+            )
+
+        # Receive the data source output schema.
+        schema_json = utf8_deserializer.loads(infile)
+        schema = _parse_datatype_json_string(schema_json)
+        if not isinstance(schema, StructType):
+            raise PySparkRuntimeError(
+                f"Expected a Python data source schema of type 'StructType', but "
+                f"got '{type(schema).__name__}'."
+            )
+
+        # Instantiate data source reader.
+        try:
+            reader = data_source.reader(schema=schema)
+        except NotImplementedError:
+            raise PySparkRuntimeError(
+                "Unable to create the Python data source reader because the 'reader' "
+                "method hasn't been implemented."
+            )
+        except Exception as e:
+            raise PySparkRuntimeError(f"Unable to create the Python data source reader: {str(e)}")
+
+        # Generate all partitions.
+        partitions = list(reader.partitions() or [])
+        if len(partitions) == 0:
+            partitions = [None]
+
+        # Construct a UDTF.
+        class PythonDataSourceReaderUDTF:
+            def __init__(self) -> None:
+                self.ser = CloudPickleSerializer()
+
+            def eval(self, partition_bytes: Any) -> Iterator:
+                partition = self.ser.loads(partition_bytes)
+                yield from reader.read(partition)
+
+        command = PythonDataSourceReaderUDTF
+        pickleSer._write_with_length(command, outfile)
+
+        # Return the serialized partition values.
+        write_int(len(partitions), outfile)
+        for partition in partitions:
+            pickleSer._write_with_length(partition, outfile)
+
+    except BaseException as e:
+        handle_worker_exception(e, outfile)
+        sys.exit(-1)
+
+    send_accumulator_updates(outfile)

Review Comment:
   It's needed if accumulators are allowed in the data source definition.
   This file is executed in a Python process separate from the driver's Python process.





Re: [PR] [SPARK-45524][PYTHON][SQL] Initial support for Python data source read API [spark]

Posted by "dtenedor (via GitHub)" <gi...@apache.org>.
dtenedor commented on code in PR #43360:
URL: https://github.com/apache/spark/pull/43360#discussion_r1358666425


##########
python/pyspark/sql/datasource.py:
##########
@@ -0,0 +1,168 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+from abc import ABC, abstractmethod
+from typing import Any, Dict, Iterator, Tuple, Union, TYPE_CHECKING
+
+from pyspark.sql import Row
+from pyspark.sql.types import StructType
+
+if TYPE_CHECKING:
+    from pyspark.sql._typing import OptionalPrimitiveType
+
+
+__all__ = ["DataSource", "DataSourceReader"]
+
+
+class DataSource:
+    def __init__(self, options: Dict[str, "OptionalPrimitiveType"]):
+        """
+        Initializes the data source with user-provided options.
+
+        Parameters
+        ----------
+        options : dict
+            A dictionary representing the options for this data source.
+
+        Notes
+        -----
+        This method should not contain any non-serializable objects.
+        """
+        self.options = options
+
+    @property
+    def name(self) -> str:
+        """
+        Returns a string represents the short name of this data source.
+        """
+        return self.__class__.__name__
+
+    def schema(self) -> Union[StructType, str]:
+        """
+        Returns the schema of the data source.
+
+        It can reference the `options` field to infer the data source's schema when
+        users do not explicitly specify it. This method is invoked once when calling
+        `spark.read.format(...).load()` to get the schema for a data source read operation.
+        If this method is not implemented, and a user does not provide a schema when
+        reading the data source, an exception will be thrown.
+
+        Returns
+        -------
+        schema : StructType or str
+            The schema of this data source or a DDL string represents the schema
+
+        Examples
+        --------
+        Returns a DDL string:
+
+        >>> def schema(self):
+        >>>    return "a INT, b STRING"
+
+        Returns a StructType:
+
+        >>> def schema(self):
+        >>>   return StructType().add("a", "int").add("b", "string")
+        """
+        ...
+
+    def reader(self, schema: StructType) -> "DataSourceReader":
+        """
+        Returns a DataSourceReader instance for reading data.
+
+        This method is required for readable data sources. It will be called once during

Review Comment:
   If we want to create a user-defined Python materialization in the future that has no corresponding reader implementation, this API would preclude this. Should we instead change the invariant to "at least one of 'reader' and/or 'writer' must be implemented"?
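
A minimal sketch of the proposed invariant, for illustration only. The `writer` method is not part of this PR, and `DataSourceSketch`, `implemented_operations` and `check_invariant` are hypothetical names, not PySpark APIs. The check treats a method as implemented when a subclass overrides the base-class stub.

```python
from typing import List


class DataSourceSketch:
    """Hypothetical base class; 'writer' is not part of the PR under review."""

    def reader(self, schema: str) -> object:
        raise NotImplementedError

    def writer(self, schema: str) -> object:
        raise NotImplementedError


def implemented_operations(source: DataSourceSketch) -> List[str]:
    """List which of 'reader' and 'writer' the subclass overrides."""
    cls = type(source)
    names = ["reader", "writer"]
    return [n for n in names if getattr(cls, n) is not getattr(DataSourceSketch, n)]


def check_invariant(source: DataSourceSketch) -> None:
    """Raise if the subclass overrides neither method."""
    if not implemented_operations(source):
        raise TypeError("a data source must implement 'reader' and/or 'writer'")


class WriteOnlySource(DataSourceSketch):
    """A write-only source: it overrides 'writer' and leaves 'reader' alone."""

    def writer(self, schema: str) -> object:
        return object()


check_invariant(WriteOnlySource())
print(implemented_operations(WriteOnlySource()))
```

Under this rule a write-only source passes the check, while a class that overrides neither method is rejected.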



##########
python/pyspark/sql/datasource.py:
##########
@@ -0,0 +1,168 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+from abc import ABC, abstractmethod
+from typing import Any, Dict, Iterator, Tuple, Union, TYPE_CHECKING
+
+from pyspark.sql import Row
+from pyspark.sql.types import StructType
+
+if TYPE_CHECKING:
+    from pyspark.sql._typing import OptionalPrimitiveType
+
+
+__all__ = ["DataSource", "DataSourceReader"]
+
+
+class DataSource:

Review Comment:
   ```suggestion
   class DataSource:
       """
       This represents a user-defined table implemented in Python.
       Subsequent Spark programs can then query from and/or write to the table.
       This class defines the table's schema and declares an interface to define
       logic to generate the rows for a scan from the table, and to respond to
       writes of rows into the table later.
       
       """
   ```



##########
python/pyspark/sql/datasource.py:
##########
@@ -0,0 +1,168 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+from abc import ABC, abstractmethod
+from typing import Any, Dict, Iterator, Tuple, Union, TYPE_CHECKING
+
+from pyspark.sql import Row
+from pyspark.sql.types import StructType
+
+if TYPE_CHECKING:
+    from pyspark.sql._typing import OptionalPrimitiveType
+
+
+__all__ = ["DataSource", "DataSourceReader"]
+
+
+class DataSource:
+    def __init__(self, options: Dict[str, "OptionalPrimitiveType"]):
+        """
+        Initializes the data source with user-provided options.
+
+        Parameters
+        ----------
+        options : dict
+            A dictionary representing the options for this data source.
+
+        Notes
+        -----
+        This method should not contain any non-serializable objects.
+        """
+        self.options = options
+
+    @property
+    def name(self) -> str:
+        """
+        Returns a string represents the short name of this data source.
+        """
+        return self.__class__.__name__
+
+    def schema(self) -> Union[StructType, str]:
+        """
+        Returns the schema of the data source.
+
+        It can reference the `options` field to infer the data source's schema when
+        users do not explicitly specify it. This method is invoked once when calling
+        `spark.read.format(...).load()` to get the schema for a data source read operation.
+        If this method is not implemented, and a user does not provide a schema when
+        reading the data source, an exception will be thrown.
+
+        Returns
+        -------
+        schema : StructType or str
+            The schema of this data source or a DDL string represents the schema
+
+        Examples
+        --------
+        Returns a DDL string:
+
+        >>> def schema(self):
+        >>>    return "a INT, b STRING"
+
+        Returns a StructType:
+
+        >>> def schema(self):
+        >>>   return StructType().add("a", "int").add("b", "string")
+        """
+        ...
+
+    def reader(self, schema: StructType) -> "DataSourceReader":
+        """
+        Returns a DataSourceReader instance for reading data.
+
+        This method is required for readable data sources. It will be called once during
+        the physical planning stage in the Spark planner.
+
+        Parameters
+        ----------
+        schema : StructType
+            The schema of the data to be read.
+
+        Returns
+        -------
+        reader : DataSourceReader
+            A reader instance for this data source.
+        """
+        raise NotImplementedError
+
+
+class DataSourceReader(ABC):
+    """
+    A base class for data source readers. Data source readers are responsible for
+    outputting data from a data source.
+    """
+
+    def partitions(self) -> Iterator[Any]:
+        """
+        Returns a list of partitions for this data source.
+
+        This method is called once during the physical planning stage in the Spark planner
+        to generate a list of partitions. Note, partition values must be serializable.

Review Comment:
   Should we mention which serialization protocol must be supported? Is it the default Python serialization?
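
For context, the planner worker quoted in this thread (`plan_data_source_read.py`) writes each partition value with `pickleSer` and loads it back with `CloudPickleSerializer`, so partition values appear to need to be picklable. The sketch below shows one way an author could check this up front. It uses the standard-library `pickle` module as a stand-in for cloudpickle, which is an assumption, and `unpicklable_partitions` is a hypothetical helper, not a PySpark function.

```python
import pickle
from typing import Any, List


def unpicklable_partitions(partitions: List[Any]) -> List[int]:
    """Return the indexes of partition values that fail a pickle round trip."""
    bad = []
    for index, value in enumerate(partitions):
        try:
            pickle.loads(pickle.dumps(value))
        except Exception:
            bad.append(index)
    return bad


# Plain values such as ints, strings and tuples survive the round trip.
ok_partitions = [1, "a", ("b", 2)]
# A generator object cannot be pickled, so its index is reported.
bad_partitions = [1, (x for x in range(3))]

print(unpicklable_partitions(ok_partitions))
print(unpicklable_partitions(bad_partitions))
```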



##########
python/pyspark/sql/datasource.py:
##########
@@ -0,0 +1,168 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+from abc import ABC, abstractmethod
+from typing import Any, Dict, Iterator, Tuple, Union, TYPE_CHECKING
+
+from pyspark.sql import Row
+from pyspark.sql.types import StructType
+
+if TYPE_CHECKING:
+    from pyspark.sql._typing import OptionalPrimitiveType
+
+
+__all__ = ["DataSource", "DataSourceReader"]
+
+
+class DataSource:
+    def __init__(self, options: Dict[str, "OptionalPrimitiveType"]):
+        """
+        Initializes the data source with user-provided options.
+
+        Parameters
+        ----------
+        options : dict
+            A dictionary representing the options for this data source.
+
+        Notes
+        -----
+        This method should not contain any non-serializable objects.
+        """
+        self.options = options
+
+    @property
+    def name(self) -> str:
+        """
+        Returns a string represents the short name of this data source.
+        """
+        return self.__class__.__name__
+
+    def schema(self) -> Union[StructType, str]:
+        """
+        Returns the schema of the data source.
+
+        It can reference the `options` field to infer the data source's schema when
+        users do not explicitly specify it. This method is invoked once when calling
+        `spark.read.format(...).load()` to get the schema for a data source read operation.
+        If this method is not implemented, and a user does not provide a schema when
+        reading the data source, an exception will be thrown.
+
+        Returns
+        -------
+        schema : StructType or str
+            The schema of this data source or a DDL string represents the schema
+
+        Examples
+        --------
+        Returns a DDL string:
+
+        >>> def schema(self):
+        >>>    return "a INT, b STRING"
+
+        Returns a StructType:
+
+        >>> def schema(self):
+        >>>   return StructType().add("a", "int").add("b", "string")
+        """
+        ...
+
+    def reader(self, schema: StructType) -> "DataSourceReader":
+        """
+        Returns a DataSourceReader instance for reading data.
+
+        This method is required for readable data sources. It will be called once during
+        the physical planning stage in the Spark planner.
+
+        Parameters
+        ----------
+        schema : StructType
+            The schema of the data to be read.
+
+        Returns
+        -------
+        reader : DataSourceReader
+            A reader instance for this data source.
+        """
+        raise NotImplementedError
+
+
+class DataSourceReader(ABC):
+    """
+    A base class for data source readers. Data source readers are responsible for
+    outputting data from a data source.
+    """
+
+    def partitions(self) -> Iterator[Any]:
+        """
+        Returns a list of partitions for this data source.
+
+        This method is called once during the physical planning stage in the Spark planner
+        to generate a list of partitions. Note, partition values must be serializable.
+
+        The planner then creates an RDD for each partition. Each partition value will be

Review Comment:
   Optional: we're trying to move away from RDDs for future use of Spark. Maybe just mention that if this method returns N partitions, then query execution will create N instances of this class, each initialized with one of the partition buffers created earlier?
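
The relationship between N partitions and N reads can be sketched without Spark. This is a local simulation only. `RangeReader` and `run_locally` are hypothetical names, the class deliberately does not subclass the PR's `DataSourceReader` so that it runs standalone, and real execution would run each `read(partition)` call in a separate task rather than in a loop.

```python
from typing import Any, Iterator, List, Tuple


class RangeReader:
    """Hypothetical reader that splits the range [0, 6) into three partitions."""

    def partitions(self) -> List[Tuple[int, int]]:
        return [(0, 2), (2, 4), (4, 6)]

    def read(self, partition: Any) -> Iterator[Tuple[int]]:
        start, end = partition
        for value in range(start, end):
            yield (value,)


def run_locally(reader: RangeReader) -> List[List[Tuple[int]]]:
    """Call partitions() once, then read() once per partition value."""
    return [list(reader.read(p)) for p in reader.partitions()]


rows_per_task = run_locally(RangeReader())
print(len(rows_per_task))
print(rows_per_task)
```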



##########
python/pyspark/sql/datasource.py:
##########
@@ -0,0 +1,168 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+from abc import ABC, abstractmethod
+from typing import Any, Dict, Iterator, Tuple, Union, TYPE_CHECKING
+
+from pyspark.sql import Row
+from pyspark.sql.types import StructType
+
+if TYPE_CHECKING:
+    from pyspark.sql._typing import OptionalPrimitiveType
+
+
+__all__ = ["DataSource", "DataSourceReader"]
+
+
+class DataSource:
+    def __init__(self, options: Dict[str, "OptionalPrimitiveType"]):
+        """
+        Initializes the data source with user-provided options.
+
+        Parameters
+        ----------
+        options : dict
+            A dictionary representing the options for this data source.
+
+        Notes
+        -----
+        This method should not contain any non-serializable objects.
+        """
+        self.options = options
+
+    @property
+    def name(self) -> str:
+        """
+        Returns a string represents the short name of this data source.
+        """
+        return self.__class__.__name__
+
+    def schema(self) -> Union[StructType, str]:
+        """
+        Returns the schema of the data source.
+
+        It can reference the `options` field to infer the data source's schema when
+        users do not explicitly specify it. This method is invoked once when calling
+        `spark.read.format(...).load()` to get the schema for a data source read operation.
+        If this method is not implemented, and a user does not provide a schema when
+        reading the data source, an exception will be thrown.
+
+        Returns
+        -------
+        schema : StructType or str
+            The schema of this data source or a DDL string represents the schema
+
+        Examples
+        --------
+        Returns a DDL string:
+
+        >>> def schema(self):
+        >>>    return "a INT, b STRING"
+
+        Returns a StructType:
+
+        >>> def schema(self):
+        >>>   return StructType().add("a", "int").add("b", "string")
+        """
+        ...
+
+    def reader(self, schema: StructType) -> "DataSourceReader":
+        """
+        Returns a DataSourceReader instance for reading data.
+
+        This method is required for readable data sources. It will be called once during
+        the physical planning stage in the Spark planner.
+
+        Parameters
+        ----------
+        schema : StructType
+            The schema of the data to be read.
+
+        Returns
+        -------
+        reader : DataSourceReader
+            A reader instance for this data source.
+        """
+        raise NotImplementedError
+
+
+class DataSourceReader(ABC):
+    """
+    A base class for data source readers. Data source readers are responsible for
+    outputting data from a data source.
+    """
+
+    def partitions(self) -> Iterator[Any]:
+        """
+        Returns a list of partitions for this data source.
+
+        This method is called once during the physical planning stage in the Spark planner
+        to generate a list of partitions. Note, partition values must be serializable.
+
+        The planner then creates an RDD for each partition. Each partition value will be
+        passed to `read(partition)` to read the data from this data source.
+
+        If this method is not implemented, or returns an empty list, Spark will create one

Review Comment:
   I would recommend making this method required. User-defined data sources without partitioning support for Spark don't make much sense, and forcing users to implement this method acts as a safeguard that performance will remain high because the data source evaluation can take place using multiple executors concurrently. If users just want to return a single stream of values, they can create a UDTF that accepts scalar arguments.
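
For reference, the fallback being discussed can be sketched as follows. `plan_partitions` mirrors the two lines in the quoted planner worker (`list(reader.partitions() or [])`, then `[None]` when the list is empty). `SinglePartitionReader` and `plan_partitions` are hypothetical names used only for this illustration.

```python
from typing import Any, Iterator, List, Optional, Tuple


class SinglePartitionReader:
    """Hypothetical reader that does not define any partitioning."""

    def partitions(self) -> Optional[List[Any]]:
        return None

    def read(self, partition: Any) -> Iterator[Tuple[str]]:
        # With no partitioning, the only partition value passed in is None.
        assert partition is None
        yield ("only row",)


def plan_partitions(reader: SinglePartitionReader) -> List[Any]:
    """Mirror the fallback in the quoted worker: empty or missing -> [None]."""
    partitions = list(reader.partitions() or [])
    if len(partitions) == 0:
        partitions = [None]
    return partitions


planned = plan_partitions(SinglePartitionReader())
rows = [row for p in planned for row in SinglePartitionReader().read(p)]
print(planned, rows)
```

With this fallback the read runs as a single task, which is the case the comment above argues should be expressed as a UDTF instead.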





Re: [PR] [SPARK-45524][PYTHON][SQL] Initial support for Python data source read API [spark]

Posted by "allisonwang-db (via GitHub)" <gi...@apache.org>.
allisonwang-db commented on code in PR #43360:
URL: https://github.com/apache/spark/pull/43360#discussion_r1364576961


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/python/UserDefinedPythonDataSource.scala:
##########
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.python
+
+import java.io.{DataInputStream, DataOutputStream}
+
+import scala.collection.mutable.ArrayBuffer
+
+import net.razorvine.pickle.Pickler
+
+import org.apache.spark.api.python.{PythonFunction, PythonWorkerUtils, SpecialLengths}
+import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
+import org.apache.spark.sql.catalyst.plans.logical.PythonDataSource
+import org.apache.spark.sql.catalyst.types.DataTypeUtils.toAttributes
+import org.apache.spark.sql.errors.QueryCompilationErrors
+import org.apache.spark.sql.types.StructType
+
+/**
+ * A user-defined Python data source. This is used by the Python API.
+ */
+case class UserDefinedPythonDataSource(
+    dataSource: PythonFunction,
+    schema: StructType) {
+  def apply(session: SparkSession): DataFrame = {
+    val source = PythonDataSource(dataSource, schema, output = toAttributes(schema))
+    Dataset.ofRows(session, source)
+  }
+}
+
+case class PythonDataSourceReadInfo(
+    func: Array[Byte],
+    partitions: Seq[Array[Byte]])
+
+class UserDefinedPythonDataSourceReadRunner(
+    func: PythonFunction,
+    schema: StructType) extends PythonPlannerRunner[PythonDataSourceReadInfo](func) {
+
+  override val workerModule = "pyspark.sql.worker.plan_data_source_read"
+
+  override protected def writeToPython(dataOut: DataOutputStream, pickler: Pickler): Unit = {
+    // Send Python data source
+    PythonWorkerUtils.writePythonFunction(func, dataOut)
+
+    // Send schema
+    PythonWorkerUtils.writeUTF(schema.json, dataOut)
+  }
+
+  override protected def receiveFromPython(dataIn: DataInputStream): PythonDataSourceReadInfo = {
+    // Receive the picked reader or an exception raised in Python worker.
+    val length = dataIn.readInt()
+    if (length == SpecialLengths.PYTHON_EXCEPTION_THROWN) {
+      val msg = PythonWorkerUtils.readUTF(dataIn)
+      throw QueryCompilationErrors.tableValuedFunctionFailedToAnalyseInPythonError(msg)

Review Comment:
   Will add some tests (and also change the name of the error)





Re: [PR] [SPARK-45524][PYTHON][SQL] Initial support for Python data source read API [spark]

Posted by "ueshin (via GitHub)" <gi...@apache.org>.
ueshin closed pull request #43360: [SPARK-45524][PYTHON][SQL] Initial support for Python data source read API
URL: https://github.com/apache/spark/pull/43360




Re: [PR] [SPARK-45524][PYTHON][SQL] Initial support for Python data source read API [spark]

Posted by "dtenedor (via GitHub)" <gi...@apache.org>.
dtenedor commented on code in PR #43360:
URL: https://github.com/apache/spark/pull/43360#discussion_r1362562965


##########
python/pyspark/sql/datasource.py:
##########
@@ -0,0 +1,168 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+from abc import ABC, abstractmethod
+from typing import Any, Dict, Iterator, Tuple, Union, TYPE_CHECKING
+
+from pyspark.sql import Row
+from pyspark.sql.types import StructType
+
+if TYPE_CHECKING:
+    from pyspark.sql._typing import OptionalPrimitiveType
+
+
+__all__ = ["DataSource", "DataSourceReader"]
+
+
+class DataSource:
+    def __init__(self, options: Dict[str, "OptionalPrimitiveType"]):
+        """
+        Initializes the data source with user-provided options.
+
+        Parameters
+        ----------
+        options : dict
+            A dictionary representing the options for this data source.
+
+        Notes
+        -----
+        This method should not contain any non-serializable objects.
+        """
+        self.options = options
+
+    @property
+    def name(self) -> str:
+        """
+        Returns a string represents the short name of this data source.
+        """
+        return self.__class__.__name__
+
+    def schema(self) -> Union[StructType, str]:
+        """
+        Returns the schema of the data source.
+
+        It can reference the `options` field to infer the data source's schema when
+        users do not explicitly specify it. This method is invoked once when calling
+        `spark.read.format(...).load()` to get the schema for a data source read operation.
+        If this method is not implemented, and a user does not provide a schema when
+        reading the data source, an exception will be thrown.
+
+        Returns
+        -------
+        schema : StructType or str
+            The schema of this data source or a DDL string represents the schema
+
+        Examples
+        --------
+        Returns a DDL string:
+
+        >>> def schema(self):
+        >>>    return "a INT, b STRING"
+
+        Returns a StructType:
+
+        >>> def schema(self):
+        >>>   return StructType().add("a", "int").add("b", "string")
+        """
+        ...
+
+    def reader(self, schema: StructType) -> "DataSourceReader":
+        """
+        Returns a DataSourceReader instance for reading data.
+
+        This method is required for readable data sources. It will be called once during
+        the physical planning stage in the Spark planner.
+
+        Parameters
+        ----------
+        schema : StructType
+            The schema of the data to be read.
+
+        Returns
+        -------
+        reader : DataSourceReader
+            A reader instance for this data source.
+        """
+        raise NotImplementedError
+
+
+class DataSourceReader(ABC):
+    """
+    A base class for data source readers. Data source readers are responsible for
+    outputting data from a data source.
+    """
+
+    def partitions(self) -> Iterator[Any]:
+        """
+        Returns a list of partitions for this data source.
+
+        This method is called once during the physical planning stage in the Spark planner
+        to generate a list of partitions. Note, partition values must be serializable.
+
+        The planner then creates an RDD for each partition. Each partition value will be
+        passed to `read(partition)` to read the data from this data source.
+
+        If this method is not implemented, or returns an empty list, Spark will create one

Review Comment:
   Yes, it's possible, but you can always make a UDTF for that use case. That's very simple; you just have to make a class like this:
   
   ```
   class MyDataSource:
     def eval(self, arg, other):
       # yield rows here, for example:
       yield (arg, other)
   ```
   
   The user-defined data source can also explicitly define exactly one partition.
   Both options are easy, and they are worth it to prevent the performance harm of not partitioning a large data source. I have seen that harm before, and it often makes the query fail completely.
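
The second option mentioned above (a reader that explicitly defines exactly one partition) could look roughly like the sketch below. It only assumes the `partitions()` and `read(partition)` method shapes shown in the quoted diff; `SinglePartitionReader` and `scan` are hypothetical names for illustration, and the class is a plain-Python stand-in rather than a subclass of the pyspark `DataSourceReader`.

```python
from typing import Any, Iterator, List, Tuple


class SinglePartitionReader:
    """Hypothetical stand-in for a DataSourceReader subclass (not the pyspark class)."""

    def __init__(self, rows: List[Tuple[Any, ...]]):
        self.rows = rows

    def partitions(self) -> Iterator[Any]:
        # Explicitly declare exactly one partition. The value must be serializable.
        return iter([0])

    def read(self, partition: Any) -> Iterator[Tuple[Any, ...]]:
        # With a single partition, this one call produces every row.
        yield from self.rows


def scan(reader: SinglePartitionReader) -> List[Tuple[Any, ...]]:
    # Hypothetical driver loop: call read() once per declared partition.
    out: List[Tuple[Any, ...]] = []
    for partition in reader.partitions():
        out.extend(reader.read(partition))
    return out


rows = scan(SinglePartitionReader([(1, "a"), (2, "b")]))
```

Declaring the single partition explicitly makes the lack of parallelism a visible choice by the data source author instead of a silent default.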





Re: [PR] [SPARK-45524][PYTHON][SQL] Initial support for Python data source read API [spark]

Posted by "HyukjinKwon (via GitHub)" <gi...@apache.org>.
HyukjinKwon commented on code in PR #43360:
URL: https://github.com/apache/spark/pull/43360#discussion_r1363404498


##########
python/pyspark/sql/readwriter.py:
##########
@@ -69,20 +82,26 @@ class DataFrameReader(OptionUtils):
     def __init__(self, spark: "SparkSession"):
         self._jreader = spark._jsparkSession.read()
         self._spark = spark
+        self._format: Optional[Union[str, Type[DataSource]]] = None
+        self._schema: Optional[Union[str, StructType]] = None
+        self._options: Dict[str, "OptionalPrimitiveType"] = dict()

Review Comment:
   All of these have to be removed. In theory, `DataFrameReader` should not change at all, and Python sources should only be referenced by their `format` short name.
   
   So yes, I would like to separate them somewhere to make them easier to remove.





Re: [PR] [SPARK-45524][PYTHON][SQL] Initial support for Python data source read API [spark]

Posted by "HyukjinKwon (via GitHub)" <gi...@apache.org>.
HyukjinKwon commented on code in PR #43360:
URL: https://github.com/apache/spark/pull/43360#discussion_r1363398618


##########
python/pyspark/sql/readwriter.py:
##########
@@ -69,20 +83,26 @@ class DataFrameReader(OptionUtils):
     def __init__(self, spark: "SparkSession"):
         self._jreader = spark._jsparkSession.read()
         self._spark = spark
+        self._format: Optional[Union[str, Type[DataSource]]] = None
+        self._schema: Optional[Union[str, StructType]] = None
+        self._options: Dict[str, "OptionalPrimitiveType"] = dict()
 
     def _df(self, jdf: JavaObject) -> "DataFrame":
         from pyspark.sql.dataframe import DataFrame
 
         return DataFrame(jdf, self._spark)
 
-    def format(self, source: str) -> "DataFrameReader":
+    def format(self, source: Union[str, Type[DataSource]]) -> "DataFrameReader":

Review Comment:
   I actually think we should also remove support for passing `DataSource` directly. This implementation can't work with short names, and it also won't work if we want to support other languages.





Re: [PR] [SPARK-45524][PYTHON][SQL] Initial support for Python data source read API [spark]

Posted by "allisonwang-db (via GitHub)" <gi...@apache.org>.
allisonwang-db commented on code in PR #43360:
URL: https://github.com/apache/spark/pull/43360#discussion_r1369079177


##########
python/pyspark/sql/readwriter.py:
##########
@@ -69,20 +83,26 @@ class DataFrameReader(OptionUtils):
     def __init__(self, spark: "SparkSession"):
         self._jreader = spark._jsparkSession.read()
         self._spark = spark
+        self._format: Optional[Union[str, Type[DataSource]]] = None
+        self._schema: Optional[Union[str, StructType]] = None
+        self._options: Dict[str, "OptionalPrimitiveType"] = dict()
 
     def _df(self, jdf: JavaObject) -> "DataFrame":
         from pyspark.sql.dataframe import DataFrame
 
         return DataFrame(jdf, self._spark)
 
-    def format(self, source: str) -> "DataFrameReader":
+    def format(self, source: Union[str, Type[DataSource]]) -> "DataFrameReader":

Review Comment:
   Discussed offline. We should make it work with short names instead of changing the API here. I will revert the change in this file and work on this part in the next PR. This PR will focus on the execution and API of the data source.
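
One possible reading of "make it work with short names" is a lookup table from a short name to a data source class. The sketch below is only an illustration under that assumption: `register_data_source`, `lookup_data_source`, and the stand-in `DataSource` class are hypothetical names and are not part of this PR or of pyspark.

```python
from typing import Any, Dict, Type


class DataSource:
    """Stand-in for the class under review; not imported from pyspark."""

    def __init__(self, options: Dict[str, Any]):
        self.options = options

    @property
    def name(self) -> str:
        # Same default as in the quoted diff: the class name is the short name.
        return self.__class__.__name__


# Hypothetical in-process registry: short name -> data source class.
_REGISTRY: Dict[str, Type[DataSource]] = {}


def register_data_source(cls: Type[DataSource]) -> Type[DataSource]:
    # Key by class name so it matches the default `name` property.
    _REGISTRY[cls.__name__] = cls
    return cls


def lookup_data_source(short_name: str) -> Type[DataSource]:
    try:
        return _REGISTRY[short_name]
    except KeyError:
        raise ValueError(f"Unknown data source: {short_name!r}") from None


@register_data_source
class MyDataSource(DataSource):
    pass


# format() would then only ever need to receive the string "MyDataSource".
source = lookup_data_source("MyDataSource")({"path": "/tmp/x"})
```

With a table like this, the `format` signature can stay `str`-only, which is the direction the comment above describes.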





Re: [PR] [SPARK-45524][PYTHON][SQL] Initial support for Python data source read API [spark]

Posted by "allisonwang-db (via GitHub)" <gi...@apache.org>.
allisonwang-db commented on code in PR #43360:
URL: https://github.com/apache/spark/pull/43360#discussion_r1362753695


##########
python/pyspark/sql/readwriter.py:
##########
@@ -69,20 +82,26 @@ class DataFrameReader(OptionUtils):
     def __init__(self, spark: "SparkSession"):
         self._jreader = spark._jsparkSession.read()
         self._spark = spark
+        self._format: Optional[Union[str, Type[DataSource]]] = None
+        self._schema: Optional[Union[str, StructType]] = None
+        self._options: Dict[str, "OptionalPrimitiveType"] = dict()

Review Comment:
   @HyukjinKwon do you think we should define a separate class (like PyDataSourceReader) that contains all the logic related to Python data sources, so that we don't need to expose these variables in the DataFrameReader class?





Re: [PR] [SPARK-45524][PYTHON][SQL] Initial support for Python data source read API [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #43360:
URL: https://github.com/apache/spark/pull/43360#discussion_r1363396200


##########
python/pyspark/sql/datasource.py:
##########
@@ -0,0 +1,185 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+from abc import ABC, abstractmethod
+from typing import Any, Dict, Iterator, Tuple, Union, TYPE_CHECKING
+
+from pyspark.sql import Row
+from pyspark.sql.types import StructType
+
+if TYPE_CHECKING:
+    from pyspark.sql._typing import OptionalPrimitiveType
+
+
+__all__ = ["DataSource", "DataSourceReader"]
+
+
+class DataSource(ABC):
+    """
+    A base class for data sources.
+
+    This class represents a custom data source that allows for reading from and
+    writing to it. The data source provides methods to create readers and writers
+    for reading and writing data, respectively. At least one of the methods ``reader``
+    or ``writer`` must be implemented by any subclass to make the data source either
+    readable or writable.
+
+    After implementing this interface, you can start to load your data source using
+    ``spark.read.format(...).load()`` and save data using ``df.write.format(...).save()``.
+    """
+
+    def __init__(self, options: Dict[str, "OptionalPrimitiveType"]):
+        """
+        Initializes the data source with user-provided options.
+
+        Parameters
+        ----------
+        options : dict
+            A dictionary representing the options for this data source.
+
+        Notes
+        -----
+        This method should not contain any non-serializable objects.
+        """
+        self.options = options
+
+    @property
+    def name(self) -> str:
+        """
+        Returns a string represents the short name of this data source.
+        """
+        return self.__class__.__name__
+
+    def schema(self) -> Union[StructType, str]:
+        """
+        Returns the schema of the data source.
+
+        It can reference the ``options`` field to infer the data source's schema when
+        users do not explicitly specify it. This method is invoked once when calling
+        ``spark.read.format(...).load()`` to get the schema for a data source read
+        operation. If this method is not implemented, and a user does not provide a
+        schema when reading the data source, an exception will be thrown.
+
+        Returns
+        -------
+        schema : StructType or str
+            The schema of this data source or a DDL string represents the schema
+
+        Examples
+        --------
+        Returns a DDL string:
+
+        >>> def schema(self):
+        ...    return "a INT, b STRING"
+
+        Returns a StructType:
+
+        >>> def schema(self):
+        ...   return StructType().add("a", "int").add("b", "string")
+        """
+        ...
+
+    def reader(self, schema: StructType) -> "DataSourceReader":
+        """
+        Returns a DataSourceReader instance for reading data.
+
+        This method is required for readable data sources. It will be called once during
+        the physical planning stage in the Spark planner.

Review Comment:
   I think it's better to not expose too many internal details. Just say it will be called only once when reading the data source.





Re: [PR] [SPARK-45524][PYTHON][SQL] Initial support for Python data source read API [spark]

Posted by "allisonwang-db (via GitHub)" <gi...@apache.org>.
allisonwang-db commented on code in PR #43360:
URL: https://github.com/apache/spark/pull/43360#discussion_r1364731275


##########
python/pyspark/sql/readwriter.py:
##########
@@ -151,8 +173,10 @@ def schema(self, schema: Union[StructType, str]) -> "DataFrameReader":
         if isinstance(schema, StructType):
             jschema = spark._jsparkSession.parseDataType(schema.json())
             self._jreader = self._jreader.schema(jschema)
+            self._schema = schema

Review Comment:
   I got a Mypy error here. I will refactor how this information is stored: [SPARK-45600](https://issues.apache.org/jira/browse/SPARK-45600)





Re: [PR] [SPARK-45524][PYTHON][SQL] Initial support for Python data source read API [spark]

Posted by "allisonwang-db (via GitHub)" <gi...@apache.org>.
allisonwang-db commented on code in PR #43360:
URL: https://github.com/apache/spark/pull/43360#discussion_r1364499268


##########
python/pyspark/sql/readwriter.py:
##########
@@ -303,6 +330,40 @@ def load(
         if schema is not None:
             self.schema(schema)
         self.options(**options)
+
+        # Load a Python data source
+        if isinstance(self._format, Callable):  # type: ignore[arg-type]
+            # TODO(SPARK-45560): support load() with non-empty path.
+
+            # Create an instance of the data source.
+            data_source_cls = cast(Type[DataSource], self._format)
+            data_source = data_source_cls(self._options)
+
+            # Get schema of the data source
+            schema = self._schema or data_source.schema()
+            if isinstance(schema, str):
+                dt = _parse_datatype_string(schema)

Review Comment:
   Yup. Added a TODO to make schema robust: [SPARK-45559](https://issues.apache.org/jira/browse/SPARK-45559)





Re: [PR] [SPARK-45524][PYTHON][SQL] Initial support for Python data source read API [spark]

Posted by "allisonwang-db (via GitHub)" <gi...@apache.org>.
allisonwang-db commented on PR #43360:
URL: https://github.com/apache/spark/pull/43360#issuecomment-1776011615

   @HyukjinKwon @cloud-fan @dtenedor thanks for the review! I've addressed the comments and removed the change in `readwriter` from this PR. It will be addressed in the next PR: [SPARK-45639](https://issues.apache.org/jira/browse/SPARK-45639)




Re: [PR] [SPARK-45524][PYTHON][SQL] Initial support for Python data source read API [spark]

Posted by "HyukjinKwon (via GitHub)" <gi...@apache.org>.
HyukjinKwon commented on code in PR #43360:
URL: https://github.com/apache/spark/pull/43360#discussion_r1361728845


##########
python/pyspark/sql/readwriter.py:
##########
@@ -69,20 +83,26 @@ class DataFrameReader(OptionUtils):
     def __init__(self, spark: "SparkSession"):
         self._jreader = spark._jsparkSession.read()
         self._spark = spark
+        self._format: Optional[Union[str, Type[DataSource]]] = None
+        self._schema: Optional[Union[str, StructType]] = None
+        self._options: Dict[str, "OptionalPrimitiveType"] = dict()
 
     def _df(self, jdf: JavaObject) -> "DataFrame":
         from pyspark.sql.dataframe import DataFrame
 
         return DataFrame(jdf, self._spark)
 
-    def format(self, source: str) -> "DataFrameReader":
+    def format(self, source: Union[str, Type[DataSource]]) -> "DataFrameReader":

Review Comment:
   Can we support a short name?





Re: [PR] [SPARK-45524][PYTHON][SQL] Initial support for Python data source read API [spark]

Posted by "HyukjinKwon (via GitHub)" <gi...@apache.org>.
HyukjinKwon commented on code in PR #43360:
URL: https://github.com/apache/spark/pull/43360#discussion_r1361734807


##########
python/pyspark/sql/datasource.py:
##########
@@ -0,0 +1,184 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+from abc import ABC, abstractmethod
+from typing import Any, Dict, Iterator, Tuple, Union, TYPE_CHECKING
+
+from pyspark.sql import Row
+from pyspark.sql.types import StructType
+
+if TYPE_CHECKING:
+    from pyspark.sql._typing import OptionalPrimitiveType
+
+
+__all__ = ["DataSource", "DataSourceReader"]
+
+
+class DataSource:

Review Comment:
   Should we make this an ABC class too?
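
For context on what the ABC question changes in practice, here is a small standard-library sketch. The class names are hypothetical and only mirror the shape of `reader` in the quoted diff. Note that inheriting from `ABC` alone does not block instantiation; only methods marked with `@abstractmethod` do.

```python
from abc import ABC, abstractmethod


class PlainSource:
    # Plain class: the error only appears when reader() is actually called.
    def reader(self, schema):
        raise NotImplementedError


class AbstractSource(ABC):
    # ABC with an abstract method: incomplete subclasses cannot be instantiated.
    @abstractmethod
    def reader(self, schema):
        ...


class IncompleteSource(AbstractSource):
    pass


class AbcWithoutAbstractMethods(ABC):
    # Inheriting from ABC without any @abstractmethod changes nothing here.
    def reader(self, schema):
        raise NotImplementedError


def fails_at_instantiation(cls) -> bool:
    try:
        cls()
    except TypeError:
        return True
    return False


results = {
    cls.__name__: fails_at_instantiation(cls)
    for cls in (PlainSource, IncompleteSource, AbcWithoutAbstractMethods)
}
```

Since the docstring in the diff says at least one of `reader` or `writer` must be implemented, marking either one as abstract would force subclasses to implement both, which may be why `reader` keeps `raise NotImplementedError` in the quoted code.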





Re: [PR] [SPARK-45524][PYTHON][SQL] Initial support for Python data source read API [spark]

Posted by "HyukjinKwon (via GitHub)" <gi...@apache.org>.
HyukjinKwon commented on code in PR #43360:
URL: https://github.com/apache/spark/pull/43360#discussion_r1364871198


##########
python/pyspark/sql/readwriter.py:
##########
@@ -69,20 +83,26 @@ class DataFrameReader(OptionUtils):
     def __init__(self, spark: "SparkSession"):
         self._jreader = spark._jsparkSession.read()
         self._spark = spark
+        self._format: Optional[Union[str, Type[DataSource]]] = None
+        self._schema: Optional[Union[str, StructType]] = None
+        self._options: Dict[str, "OptionalPrimitiveType"] = dict()
 
     def _df(self, jdf: JavaObject) -> "DataFrame":
         from pyspark.sql.dataframe import DataFrame
 
         return DataFrame(jdf, self._spark)
 
-    def format(self, source: str) -> "DataFrameReader":
+    def format(self, source: Union[str, Type[DataSource]]) -> "DataFrameReader":

Review Comment:
   Yes, we should have a registration mechanism like the one we have in Scala: https://github.com/databricks/spark-xml/blob/master/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
   
   Can we investigate how other Python libraries do this? One way is to use an INI-style file, as in the entry points specification: https://packaging.python.org/en/latest/specifications/entry-points/
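
As a rough illustration of the entry points approach, the sketch below uses the standard-library `importlib.metadata` to discover classes that installed packages advertise under a group. The group name `pyspark.datasources` and the function names are assumptions made up for this example; nothing in this PR defines them.

```python
from importlib.metadata import entry_points
from typing import Any, Dict

# Hypothetical entry point group; a package would list its data sources under it.
GROUP = "pyspark.datasources"


def discover_data_sources(group: str = GROUP) -> Dict[str, Any]:
    """Map short name -> EntryPoint for every installed package in the group."""
    eps = entry_points()
    if hasattr(eps, "select"):
        # Python 3.10+ returns an object with a select() method.
        selected = eps.select(group=group)
    else:
        # Python 3.8 and 3.9 return a plain dict keyed by group name.
        selected = eps.get(group, ())
    return {ep.name: ep for ep in selected}


def load_data_source(short_name: str, group: str = GROUP) -> Any:
    """Import and return the object registered under the given short name."""
    found = discover_data_sources(group)
    if short_name not in found:
        raise ValueError(f"No data source registered as {short_name!r}")
    return found[short_name].load()
```

A package would opt in by declaring an entry point in that group in its packaging metadata, so `format("my_source")` could resolve the class without the user importing it first.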





Re: [PR] [SPARK-45524][PYTHON][SQL] Initial support for Python data source read API [spark]

Posted by "allisonwang-db (via GitHub)" <gi...@apache.org>.
allisonwang-db commented on code in PR #43360:
URL: https://github.com/apache/spark/pull/43360#discussion_r1362686941


##########
python/pyspark/sql/datasource.py:
##########
@@ -0,0 +1,184 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+from abc import ABC, abstractmethod
+from typing import Any, Dict, Iterator, Tuple, Union, TYPE_CHECKING
+
+from pyspark.sql import Row
+from pyspark.sql.types import StructType
+
+if TYPE_CHECKING:
+    from pyspark.sql._typing import OptionalPrimitiveType
+
+
+__all__ = ["DataSource", "DataSourceReader"]
+
+
+class DataSource:
+    """
+    A base class for data sources.
+
+    This class represents a custom data source that allows for reading from and
+    writing to it. The data source provides methods to create readers and writers
+    for reading and writing data, respectively. At least one of the methods `reader`
+    or `writer` must be implemented by any subclass to make the data source either
+    readable or writable.
+
+    After implementing this interface, you can start to load your data source using
+    `spark.read.format(...).load()` and save data using `df.write.format(...).save()`.
+    """
+
+    def __init__(self, options: Dict[str, "OptionalPrimitiveType"]):
+        """
+        Initializes the data source with user-provided options.
+
+        Parameters
+        ----------
+        options : dict
+            A dictionary representing the options for this data source.
+
+        Notes
+        -----
+        This method should not contain any non-serializable objects.
+        """
+        self.options = options
+
+    @property
+    def name(self) -> str:
+        """
+        Returns a string represents the short name of this data source.
+        """
+        return self.__class__.__name__
+
+    def schema(self) -> Union[StructType, str]:
+        """
+        Returns the schema of the data source.
+
+        It can reference the `options` field to infer the data source's schema when
+        users do not explicitly specify it. This method is invoked once when calling
+        `spark.read.format(...).load()` to get the schema for a data source read operation.
+        If this method is not implemented, and a user does not provide a schema when
+        reading the data source, an exception will be thrown.
+
+        Returns
+        -------
+        schema : StructType or str
+            The schema of this data source, or a DDL string that represents the schema.
+
+        Examples
+        --------
+        Returns a DDL string:
+
+        >>> def schema(self):
+        ...     return "a INT, b STRING"
+
+        Returns a StructType:
+
+        >>> def schema(self):
+        ...     return StructType().add("a", "int").add("b", "string")
+        """
+        ...
+
+    def reader(self, schema: StructType) -> "DataSourceReader":
+        """
+        Returns a DataSourceReader instance for reading data.
+
+        This method is required for readable data sources. It will be called once during
+        the physical planning stage in the Spark planner.
+
+        Parameters
+        ----------
+        schema : StructType
+            The schema of the data to be read.
+
+        Returns
+        -------
+        reader : DataSourceReader
+            A reader instance for this data source.
+        """
+        raise NotImplementedError
+
+
+class DataSourceReader(ABC):
+    """
+    A base class for data source readers. Data source readers are responsible for
+    outputting data from a data source.
+    """
+
+    def partitions(self) -> Iterator[Any]:
+        """
+        Returns a list of partitions for this data source.
+
+        This method is called once during the physical planning stage to generate a list
+        of partitions. If the method returns N partitions, then the planner will create
+        N tasks. Each task will then execute `read(partition)` in parallel, using each
+        partition value to read the data from this data source.
+
+        If this method is not implemented, or returns an empty list, Spark will create one
+        partition for the result DataFrame and use one single task to read the data.
+
+        Returns
+        -------
+        partitions : list
+            A list of partitions for this data source. The partition can be any arbitrary
+            serializable objects.
+
+        Notes
+        -----
+        This method should not return any un-picklable objects.
+
+        Examples
+        --------
+        Returns a list of integers:
+
+        >>> def partitions(self):
+        ...     return [1, 2, 3]

Review Comment:
   Good catch!
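
To make the contract in the `partitions` docstring quoted above concrete, here is a minimal sketch in plain Python with no Spark involved. `RangeReader` and `plan_and_read` are hypothetical names, the tasks run sequentially rather than in parallel, and passing `None` to `read` when no partitions are returned is an assumption for illustration, not something the PR confirms.

```python
from typing import Any, Iterator, List, Optional, Tuple


class RangeReader:
    """Hypothetical reader that splits the range [0, total) into chunks."""

    def __init__(self, total: int, chunk: int) -> None:
        self.total = total
        self.chunk = chunk

    def partitions(self) -> List[Tuple[int, int]]:
        # Each partition value is a plain, picklable (start, end) tuple.
        return [
            (start, min(start + self.chunk, self.total))
            for start in range(0, self.total, self.chunk)
        ]

    def read(self, partition: Optional[Tuple[int, int]]) -> Iterator[Tuple[int]]:
        # Assumption: None means "no partitioning", so read the whole range.
        start, end = partition if partition is not None else (0, self.total)
        for i in range(start, end):
            yield (i,)


def plan_and_read(reader: RangeReader) -> List[Tuple[Any, ...]]:
    """Toy stand-in for the planner: one task per partition, run one by one."""
    parts: List[Any] = list(reader.partitions())
    if not parts:
        parts = [None]  # empty list: fall back to a single task
    rows: List[Tuple[Any, ...]] = []
    for part in parts:
        rows.extend(reader.read(part))
    return rows
```

With `RangeReader(5, 2)` the sketch plans three tasks, one per `(start, end)` tuple, and the concatenated output covers every value in the range exactly once.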



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-45524][PYTHON][SQL] Initial support for Python data source read API [spark]

Posted by "allisonwang-db (via GitHub)" <gi...@apache.org>.
allisonwang-db commented on code in PR #43360:
URL: https://github.com/apache/spark/pull/43360#discussion_r1362690212


##########
python/pyspark/sql/readwriter.py:
##########
@@ -303,6 +330,40 @@ def load(
         if schema is not None:
             self.schema(schema)
         self.options(**options)
+
+        # Load a Python data source
+        if isinstance(self._format, Callable):  # type: ignore[arg-type]
+            # TODO(SPARK-45560): support load() with non-empty path.
+
+            # Create an instance of the data source.
+            data_source_cls = cast(Type[DataSource], self._format)
+            data_source = data_source_cls(self._options)
+
+            # Get schema of the data source
+            schema = self._schema or data_source.schema()

Review Comment:
   Yes, if a Python data source is registered and used in SQL, we need a separate Python process to instantiate the Python class and get the schema.





Re: [PR] [SPARK-45524][PYTHON][SQL] Initial support for Python data source read API [spark]

Posted by "allisonwang-db (via GitHub)" <gi...@apache.org>.
allisonwang-db commented on code in PR #43360:
URL: https://github.com/apache/spark/pull/43360#discussion_r1364623031


##########
python/pyspark/sql/readwriter.py:
##########
@@ -69,20 +82,26 @@ class DataFrameReader(OptionUtils):
     def __init__(self, spark: "SparkSession"):
         self._jreader = spark._jsparkSession.read()
         self._spark = spark
+        self._format: Optional[Union[str, Type[DataSource]]] = None
+        self._schema: Optional[Union[str, StructType]] = None
+        self._options: Dict[str, "OptionalPrimitiveType"] = dict()

Review Comment:
   OK, sounds good. I will refactor this in the next PR to make the review easier. Added a TODO.





Re: [PR] [SPARK-45524][PYTHON][SQL] Initial support for Python data source read API [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #43360:
URL: https://github.com/apache/spark/pull/43360#discussion_r1363410031


##########
python/pyspark/sql/readwriter.py:
##########
@@ -151,8 +173,10 @@ def schema(self, schema: Union[StructType, str]) -> "DataFrameReader":
         if isinstance(schema, StructType):
             jschema = spark._jsparkSession.parseDataType(schema.json())
             self._jreader = self._jreader.schema(jschema)
+            self._schema = schema

Review Comment:
   Why do we assign it in both the if and the else branch instead of assigning it once after the if-else?
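
A minimal sketch of the single-assignment shape this comment asks about. `FakeStructType` and `ReaderSketch` are hypothetical stand-ins (no JVM, no pyspark import), and the `calls` list only records what would have been forwarded to the JVM reader.

```python
from typing import List, Optional, Tuple, Union


class FakeStructType:
    """Placeholder for pyspark.sql.types.StructType (assumption, illustration only)."""

    def __init__(self, ddl: str) -> None:
        self.ddl = ddl


class ReaderSketch:
    """Hypothetical stand-in for DataFrameReader."""

    def __init__(self) -> None:
        self._schema: Optional[Union[str, FakeStructType]] = None
        self.calls: List[Tuple[str, str]] = []

    def schema(self, schema: Union[str, FakeStructType]) -> "ReaderSketch":
        if isinstance(schema, FakeStructType):
            self.calls.append(("struct", schema.ddl))
        elif isinstance(schema, str):
            self.calls.append(("ddl", schema))
        else:
            raise TypeError("schema should be a StructType or a string")
        # Single assignment: only reached when one of the valid branches ran,
        # because the invalid case raises above.
        self._schema = schema
        return self
```

Because the invalid branch raises, placing the assignment after the if-else keeps `_schema` untouched on error while avoiding the duplicated line.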





Re: [PR] [SPARK-45524][PYTHON][SQL] Initial support for Python data source read API [spark]

Posted by "dtenedor (via GitHub)" <gi...@apache.org>.
dtenedor commented on code in PR #43360:
URL: https://github.com/apache/spark/pull/43360#discussion_r1364354405


##########
python/pyspark/sql/datasource.py:
##########
@@ -0,0 +1,184 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+from abc import ABC, abstractmethod
+from typing import Any, Dict, Iterator, Tuple, Union, TYPE_CHECKING
+
+from pyspark.sql import Row
+from pyspark.sql.types import StructType
+
+if TYPE_CHECKING:
+    from pyspark.sql._typing import OptionalPrimitiveType
+
+
+__all__ = ["DataSource", "DataSourceReader"]
+
+
+class DataSource:
+    """
+    A base class for data sources.
+
+    This class represents a custom data source that allows for reading from and
+    writing to it. The data source provides methods to create readers and writers
+    for reading and writing data, respectively. At least one of the methods `reader`
+    or `writer` must be implemented by any subclass to make the data source either
+    readable or writable.
+
+    After implementing this interface, you can start to load your data source using
+    `spark.read.format(...).load()` and save data using `df.write.format(...).save()`.

Review Comment:
   Cool, thanks for making a separate Jira for it :)





Re: [PR] [SPARK-45524][PYTHON][SQL] Initial support for Python data source read API [spark]

Posted by "dtenedor (via GitHub)" <gi...@apache.org>.
dtenedor commented on code in PR #43360:
URL: https://github.com/apache/spark/pull/43360#discussion_r1361389676


##########
python/pyspark/sql/readwriter.py:
##########
@@ -303,6 +329,50 @@ def load(
         if schema is not None:
             self.schema(schema)
         self.options(**options)
+
+        # Load a Python data source
+        if isinstance(self._format, Callable):

Review Comment:
   Sounds good to me!





Re: [PR] [SPARK-45524][PYTHON][SQL] Initial support for Python data source read API [spark]

Posted by "allisonwang-db (via GitHub)" <gi...@apache.org>.
allisonwang-db commented on code in PR #43360:
URL: https://github.com/apache/spark/pull/43360#discussion_r1361274421


##########
python/pyspark/sql/readwriter.py:
##########
@@ -303,6 +329,50 @@ def load(
         if schema is not None:
             self.schema(schema)
         self.options(**options)
+
+        # Load a Python data source
+        if isinstance(self._format, Callable):

Review Comment:
   Actually, the `self._format` here is not an instance of `DataSource`; it's the DataSource class itself (`Type[DataSource]`). Maybe I can just check whether it's an instance of `Type`.
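
The class-versus-instance distinction can be sketched in plain Python as follows. `DataSourceBase`, `MySource` and `is_data_source_class` are hypothetical names; the check uses the built-in `type` together with `issubclass`, which is one possible way to narrow a callable check and not necessarily what the PR ended up with.

```python
from typing import Any, Dict


class DataSourceBase:
    """Placeholder for pyspark.sql.datasource.DataSource (assumption)."""

    def __init__(self, options: Dict[str, Any]) -> None:
        self.options = options


class MySource(DataSourceBase):
    """Hypothetical user-defined data source."""


def is_data_source_class(fmt: Any) -> bool:
    # True only for the class object itself, not for instances, strings,
    # or arbitrary callables such as functions.
    return isinstance(fmt, type) and issubclass(fmt, DataSourceBase)
```

A plain callable check is broader: `callable(len)` is true, yet `len` is not a data source class, so `is_data_source_class(len)` returns `False`.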





Re: [PR] [SPARK-45524][PYTHON][SQL] Initial support for Python data source read API [spark]

Posted by "HyukjinKwon (via GitHub)" <gi...@apache.org>.
HyukjinKwon commented on code in PR #43360:
URL: https://github.com/apache/spark/pull/43360#discussion_r1365227126


##########
python/pyspark/sql/readwriter.py:
##########
@@ -69,20 +83,26 @@ class DataFrameReader(OptionUtils):
     def __init__(self, spark: "SparkSession"):
         self._jreader = spark._jsparkSession.read()
         self._spark = spark
+        self._format: Optional[Union[str, Type[DataSource]]] = None
+        self._schema: Optional[Union[str, StructType]] = None
+        self._options: Dict[str, "OptionalPrimitiveType"] = dict()
 
     def _df(self, jdf: JavaObject) -> "DataFrame":
         from pyspark.sql.dataframe import DataFrame
 
         return DataFrame(jdf, self._spark)
 
-    def format(self, source: str) -> "DataFrameReader":
+    def format(self, source: Union[str, Type[DataSource]]) -> "DataFrameReader":

Review Comment:
   1. Create an INI file either under the Spark conf directory or in the current working directory (maybe optionally define the location via a static configuration).
   2. Read it in `DataSource.lookupDataSource` and load the class. We will likely need a wrapper class that inherits from the Scala data source classes.
   3. The wrapper data source invokes the Python interface to read, write, etc.
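
Steps 1 and 2 of the proposal above could look roughly like this on the Python side. Everything here is an assumption made for illustration: the `[datasources]` section name, the `short_name = dotted.path.ClassName` value format, and the helper names `load_registry` and `lookup_data_source`. The JVM wrapper from step 3 is out of scope for the sketch.

```python
import configparser
import importlib
from typing import Dict


def load_registry(ini_text: str) -> Dict[str, str]:
    """Parse a hypothetical INI registry mapping short names to dotted class paths."""
    parser = configparser.ConfigParser()
    parser.read_string(ini_text)
    if not parser.has_section("datasources"):
        return {}
    return dict(parser["datasources"])


def lookup_data_source(registry: Dict[str, str], short_name: str) -> type:
    """Import the module named in the registry and return the class object."""
    module_name, _, class_name = registry[short_name].rpartition(".")
    module = importlib.import_module(module_name)
    return getattr(module, class_name)
```

For example, a registry containing the line `ordered = collections.OrderedDict` under `[datasources]` resolves the short name `ordered` to the standard library class `OrderedDict`; a real registry would point at user-defined data source classes instead.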





Re: [PR] [SPARK-45524][PYTHON][SQL] Initial support for Python data source read API [spark]

Posted by "ueshin (via GitHub)" <gi...@apache.org>.
ueshin commented on PR #43360:
URL: https://github.com/apache/spark/pull/43360#issuecomment-1777793417

   Thanks! merging to master.




Re: [PR] [SPARK-45524][PYTHON][SQL] Initial support for Python data source read API [spark]

Posted by "allisonwang-db (via GitHub)" <gi...@apache.org>.
allisonwang-db commented on code in PR #43360:
URL: https://github.com/apache/spark/pull/43360#discussion_r1369210083


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonDataSourcePartitionExec.scala:
##########
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.python
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.execution.{InputRDDCodegen, LeafExecNode, SQLExecution}
+import org.apache.spark.sql.execution.metric.SQLMetrics
+
+case class PythonDataSourcePartitionExec(
+    output: Seq[Attribute],
+    partitions: Seq[Array[Byte]])
+  extends LeafExecNode with InputRDDCodegen with PythonSQLMetrics {
+
+  override lazy val metrics = Map(
+    "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"))
+
+  @transient private lazy val unsafeRows: Array[InternalRow] = {
+    if (partitions.isEmpty) {
+      Array.empty
+    } else {
+      val proj = UnsafeProjection.create(output, output)
+      partitions.map(p => proj(InternalRow(p)).copy()).toArray
+    }
+  }
+
+  @transient private lazy val rdd: RDD[InternalRow] = {
+    val numPartitions = partitions.size
+    if (numPartitions == 0) {
+      sparkContext.emptyRDD
+    } else {
+      sparkContext.parallelize(unsafeRows, numPartitions)
+    }
+  }
+
+  override def inputRDD: RDD[InternalRow] = rdd
+
+  // Input is InternalRow, has to be turned into UnsafeRows.
+  override protected val createUnsafeProjection: Boolean = true
+
+  protected override def doExecute(): RDD[InternalRow] = {
+    val numOutputRows = longMetric("numOutputRows")
+    rdd.map { r =>
+      numOutputRows += 1
+      r
+    }
+  }
+
+  override def executeCollect(): Array[InternalRow] = {

Review Comment:
   That's right. This node should never be a root node. I will remove these methods.





Re: [PR] [SPARK-45524][PYTHON][SQL] Initial support for Python data source read API [spark]

Posted by "allisonwang-db (via GitHub)" <gi...@apache.org>.
allisonwang-db commented on code in PR #43360:
URL: https://github.com/apache/spark/pull/43360#discussion_r1362627217


##########
python/pyspark/sql/readwriter.py:
##########
@@ -69,20 +83,26 @@ class DataFrameReader(OptionUtils):
     def __init__(self, spark: "SparkSession"):
         self._jreader = spark._jsparkSession.read()
         self._spark = spark
+        self._format: Optional[Union[str, Type[DataSource]]] = None
+        self._schema: Optional[Union[str, StructType]] = None
+        self._options: Dict[str, "OptionalPrimitiveType"] = dict()
 
     def _df(self, jdf: JavaObject) -> "DataFrame":
         from pyspark.sql.dataframe import DataFrame
 
         return DataFrame(jdf, self._spark)
 
-    def format(self, source: str) -> "DataFrameReader":
+    def format(self, source: Union[str, Type[DataSource]]) -> "DataFrameReader":

Review Comment:
   Yes, we should also support short names, but we need to support registering the data source first. This is a TODO in the PR description: `4. Register a Python data source and use its name to reference it in .format()`





Re: [PR] [SPARK-45524][PYTHON][SQL] Initial support for Python data source read API [spark]

Posted by "allisonwang-db (via GitHub)" <gi...@apache.org>.
allisonwang-db commented on code in PR #43360:
URL: https://github.com/apache/spark/pull/43360#discussion_r1362699616


##########
python/pyspark/sql/readwriter.py:
##########
@@ -303,6 +330,40 @@ def load(
         if schema is not None:
             self.schema(schema)
         self.options(**options)
+
+        # Load a Python data source
+        if isinstance(self._format, Callable):  # type: ignore[arg-type]
+            # TODO(SPARK-45560): support load() with non-empty path.
+
+            # Create an instance of the data source.
+            data_source_cls = cast(Type[DataSource], self._format)
+            data_source = data_source_cls(self._options)
+
+            # Get schema of the data source
+            schema = self._schema or data_source.schema()

Review Comment:
   Indeed it's a bit hacky right now. I will give this further thought when supporting `spark.read.schema(...)` in SPARK-45559. Currently, it does not work with the `schema(...)` clause.





Re: [PR] [SPARK-45524][PYTHON][SQL] Initial support for Python data source read API [spark]

Posted by "dtenedor (via GitHub)" <gi...@apache.org>.
dtenedor commented on code in PR #43360:
URL: https://github.com/apache/spark/pull/43360#discussion_r1362706226


##########
python/pyspark/sql/datasource.py:
##########
@@ -0,0 +1,184 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+from abc import ABC, abstractmethod
+from typing import Any, Dict, Iterator, Tuple, Union, TYPE_CHECKING
+
+from pyspark.sql import Row
+from pyspark.sql.types import StructType
+
+if TYPE_CHECKING:
+    from pyspark.sql._typing import OptionalPrimitiveType
+
+
+__all__ = ["DataSource", "DataSourceReader"]
+
+
+class DataSource:
+    """
+    A base class for data sources.
+
+    This class represents a custom data source that allows for reading from and
+    writing to it. The data source provides methods to create readers and writers
+    for reading and writing data, respectively. At least one of the methods `reader`
+    or `writer` must be implemented by any subclass to make the data source either
+    readable or writable.
+
+    After implementing this interface, you can start to load your data source using
+    `spark.read.format(...).load()` and save data using `df.write.format(...).save()`.

Review Comment:
   ```suggestion
       `spark.read.format(...).load()` and save data using `df.write.format(...).save()`.
       You can also create tables using SQL with:
       `CREATE TABLE tableName(<columnList>) USING <dataSourceName> OPTIONS <options>`
   ```



##########
python/pyspark/sql/datasource.py:
##########
@@ -0,0 +1,184 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+from abc import ABC, abstractmethod
+from typing import Any, Dict, Iterator, Tuple, Union, TYPE_CHECKING
+
+from pyspark.sql import Row
+from pyspark.sql.types import StructType
+
+if TYPE_CHECKING:
+    from pyspark.sql._typing import OptionalPrimitiveType
+
+
+__all__ = ["DataSource", "DataSourceReader"]
+
+
+class DataSource:
+    """
+    A base class for data sources.
+
+    This class represents a custom data source that allows for reading from and

Review Comment:
   ```suggestion
       This class represents a custom data source that allows for reading from and/or
   ```



##########
python/pyspark/sql/worker/plan_data_source_read.py:
##########
@@ -0,0 +1,131 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import os
+import sys
+from typing import Any, IO, Iterator
+
+from pyspark.accumulators import _accumulatorRegistry
+from pyspark.errors import PySparkRuntimeError
+from pyspark.java_gateway import local_connect_and_auth
+from pyspark.serializers import (
+    read_int,
+    write_int,
+    SpecialLengths,
+    CloudPickleSerializer,
+)
+from pyspark.sql.datasource import DataSource
+from pyspark.sql.types import _parse_datatype_json_string, StructType
+from pyspark.util import handle_worker_exception
+from pyspark.worker_util import (
+    check_python_version,
+    read_command,
+    pickleSer,
+    send_accumulator_updates,
+    setup_broadcasts,
+    setup_memory_limits,
+    setup_spark_files,
+    utf8_deserializer,
+)
+
+
+def main(infile: IO, outfile: IO) -> None:
+    """
+    Plan Python data source read.
+    """
+    try:
+        check_python_version(infile)
+
+        memory_limit_mb = int(os.environ.get("PYSPARK_PLANNER_MEMORY_MB", "-1"))
+        setup_memory_limits(memory_limit_mb)
+
+        setup_spark_files(infile)
+        setup_broadcasts(infile)
+
+        _accumulatorRegistry.clear()
+
+        # Receive the data source instance.
+        data_source = read_command(pickleSer, infile)
+        if not isinstance(data_source, DataSource):
+            raise PySparkRuntimeError(

Review Comment:
   In general, we should give each of these an error class. But since this should never happen, should we just turn it into an assert instead? The same applies to the other `PySparkRuntimeError` cases below.
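
The two options being weighed here can be sketched side by side. `DataSource` below is a bare placeholder class, `RuntimeError` stands in for `PySparkRuntimeError`, and both function names are hypothetical.

```python
class DataSource:
    """Placeholder for pyspark.sql.datasource.DataSource (assumption)."""


def receive_with_error(obj: object) -> DataSource:
    # Explicit check: always enforced, and the message reaches the user.
    if not isinstance(obj, DataSource):
        raise RuntimeError(
            f"Expected a DataSource instance, got {type(obj).__name__}."
        )
    return obj


def receive_with_assert(obj: object) -> DataSource:
    # Internal invariant: documents a "should never happen" condition.
    assert isinstance(obj, DataSource), f"unexpected type: {type(obj).__name__}"
    return obj
```

One trade-off to keep in mind: Python removes `assert` statements when run with the `-O` flag, so the assert form only guards the invariant in non-optimized runs.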



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PythonDataSourceStrategy.scala:
##########
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+
+import org.apache.spark.api.python.{PythonEvalType, PythonFunction, SimplePythonFunction}
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.expressions.PythonUDTF
+import org.apache.spark.sql.catalyst.plans.logical
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.python.UserDefinedPythonDataSourceReadRunner
+
+object PythonDataSourceStrategy extends Strategy with Logging {
+  def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
+    case p @ logical.PythonDataSource(dataSource: PythonFunction, schema, _) =>
+      val info = new UserDefinedPythonDataSourceReadRunner(dataSource, schema).runInPython()
+
+      val readerFunc = SimplePythonFunction(
+        command = info.func.toSeq,
+        envVars = dataSource.envVars,
+        pythonIncludes = dataSource.pythonIncludes,
+        pythonExec = dataSource.pythonExec,
+        pythonVer = dataSource.pythonVer,
+        broadcastVars = dataSource.broadcastVars,
+        accumulator = dataSource.accumulator
+      )
+
+      val partitionPlan = logical.PythonDataSourcePartition(info.partitions)
+
+      // Construct a Python UDTF for the reader function.
+      val pythonUDTF = PythonUDTF(
+        name = "PythonDataSourceReaderUDTF",
+        func = readerFunc,
+        elementSchema = schema,
+        children = partitionPlan.output,
+        evalType = PythonEvalType.SQL_TABLE_UDF,
+        udfDeterministic = false,

Review Comment:
   We should probably add an option for the user to toggle this. The default of `false` sounds good.



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonDataSourcePartitionExec.scala:
##########
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.python
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.execution.{InputRDDCodegen, LeafExecNode, SQLExecution}
+import org.apache.spark.sql.execution.metric.SQLMetrics
+
+case class PythonDataSourcePartitionExec(

Review Comment:
   Please add a class comment.



##########
python/pyspark/sql/readwriter.py:
##########
@@ -303,6 +330,40 @@ def load(
         if schema is not None:
             self.schema(schema)
         self.options(**options)
+
+        # Load a Python data source
+        if isinstance(self._format, Callable):  # type: ignore[arg-type]
+            # TODO(SPARK-45560): support load() with non-empty path.
+
+            # Create an instance of the data source.
+            data_source_cls = cast(Type[DataSource], self._format)
+            data_source = data_source_cls(self._options)
+
+            # Get schema of the data source
+            schema = self._schema or data_source.schema()

Review Comment:
   It will be necessary to inspect the values of the provided options for each call in order to determine which schema to generate. This is analogous to the Python UDTF `analyze` call we designed which runs on the driver.
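To illustrate the point, here is a minimal sketch of a data source whose schema depends on the options passed for a given read. `OptionsDrivenSource` and the `include_ts` option are hypothetical names, and the class is a plain-Python stand-in rather than a subclass of the `DataSource` class in this PR, so it runs without Spark.

```python
from typing import Dict, Optional


class OptionsDrivenSource:
    """Hypothetical stand-in for a DataSource subclass (not the PR's class)."""

    def __init__(self, options: Dict[str, Optional[str]]):
        self.options = options

    def schema(self) -> str:
        # The schema cannot be fixed ahead of time: it is derived from the
        # options supplied for this particular read.
        columns = ["id INT", "value STRING"]
        if str(self.options.get("include_ts", "false")).lower() == "true":
            columns.append("ts TIMESTAMP")
        return ", ".join(columns)


print(OptionsDrivenSource({}).schema())
print(OptionsDrivenSource({"include_ts": "true"}).schema())
```

Because the result varies with the options, `schema()` has to be evaluated on the driver for each `load()` call rather than cached per class.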



##########
python/pyspark/sql/worker/plan_data_source_read.py:
##########
@@ -0,0 +1,131 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import os
+import sys
+from typing import Any, IO, Iterator
+
+from pyspark.accumulators import _accumulatorRegistry
+from pyspark.errors import PySparkRuntimeError
+from pyspark.java_gateway import local_connect_and_auth
+from pyspark.serializers import (
+    read_int,
+    write_int,
+    SpecialLengths,
+    CloudPickleSerializer,
+)
+from pyspark.sql.datasource import DataSource
+from pyspark.sql.types import _parse_datatype_json_string, StructType
+from pyspark.util import handle_worker_exception
+from pyspark.worker_util import (
+    check_python_version,
+    read_command,
+    pickleSer,
+    send_accumulator_updates,
+    setup_broadcasts,
+    setup_memory_limits,
+    setup_spark_files,
+    utf8_deserializer,
+)
+
+
+def main(infile: IO, outfile: IO) -> None:
+    """
+    Plan Python data source read.

Review Comment:
   Can we expand this to mention when it is called during the query processing sequence, what it receives as input, and what it returns as output?



##########
python/pyspark/sql/worker/plan_data_source_read.py:
##########
@@ -0,0 +1,131 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import os
+import sys
+from typing import Any, IO, Iterator
+
+from pyspark.accumulators import _accumulatorRegistry
+from pyspark.errors import PySparkRuntimeError
+from pyspark.java_gateway import local_connect_and_auth
+from pyspark.serializers import (
+    read_int,
+    write_int,
+    SpecialLengths,
+    CloudPickleSerializer,
+)
+from pyspark.sql.datasource import DataSource
+from pyspark.sql.types import _parse_datatype_json_string, StructType
+from pyspark.util import handle_worker_exception
+from pyspark.worker_util import (
+    check_python_version,
+    read_command,
+    pickleSer,
+    send_accumulator_updates,
+    setup_broadcasts,
+    setup_memory_limits,
+    setup_spark_files,
+    utf8_deserializer,
+)
+
+
+def main(infile: IO, outfile: IO) -> None:
+    """
+    Plan Python data source read.
+    """
+    try:
+        check_python_version(infile)
+
+        memory_limit_mb = int(os.environ.get("PYSPARK_PLANNER_MEMORY_MB", "-1"))
+        setup_memory_limits(memory_limit_mb)
+
+        setup_spark_files(infile)
+        setup_broadcasts(infile)
+
+        _accumulatorRegistry.clear()
+
+        # Receive the data source instance.
+        data_source = read_command(pickleSer, infile)
+        if not isinstance(data_source, DataSource):
+            raise PySparkRuntimeError(
+                f"Expected a Python data source instance of type 'DataSource', but "
+                f"got '{type(data_source).__name__}'."
+            )
+
+        # Receive the data source output schema.
+        schema_json = utf8_deserializer.loads(infile)
+        schema = _parse_datatype_json_string(schema_json)
+        if not isinstance(schema, StructType):
+            raise PySparkRuntimeError(
+                f"Expected a Python data source schema of type 'StructType', but "
+                f"got '{type(schema).__name__}'."
+            )
+
+        # Instantiate data source reader.
+        try:
+            reader = data_source.reader(schema=schema)
+        except NotImplementedError:
+            raise PySparkRuntimeError(

Review Comment:
   This path can be exercised if the data source is implemented incorrectly; can we add an error class and a test case? Same for L87 below?
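A rough sketch of what such a test could exercise, written in plain Python so it runs without Spark. `ReaderCreationError`, `NoReaderSource`, and `create_reader` are hypothetical names used only for illustration; the actual error class and test harness would follow the PySpark error framework.

```python
class ReaderCreationError(RuntimeError):
    """Hypothetical error type standing in for a dedicated error class."""


class NoReaderSource:
    """Hypothetical data source that does not implement reader()."""

    def reader(self, schema):
        raise NotImplementedError


def create_reader(data_source, schema):
    # Same try/except shape as the worker code under review: translate the
    # bare NotImplementedError into an error that names the missing method.
    try:
        return data_source.reader(schema=schema)
    except NotImplementedError:
        raise ReaderCreationError(
            "Unable to create the Python data source reader because the "
            "'reader' method hasn't been implemented."
        ) from None
    except Exception as e:
        raise ReaderCreationError(
            f"Unable to create the Python data source reader: {e}"
        ) from e
```

A test can then assert on the error type and message for a source like `NoReaderSource`, and separately for a source whose `reader` raises some other exception.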



##########
python/pyspark/sql/readwriter.py:
##########
@@ -303,6 +330,40 @@ def load(
         if schema is not None:
             self.schema(schema)
         self.options(**options)
+
+        # Load a Python data source
+        if isinstance(self._format, Callable):  # type: ignore[arg-type]
+            # TODO(SPARK-45560): support load() with non-empty path.
+
+            # Create an instance of the data source.
+            data_source_cls = cast(Type[DataSource], self._format)
+            data_source = data_source_cls(self._options)
+
+            # Get schema of the data source
+            schema = self._schema or data_source.schema()
+            if isinstance(schema, str):
+                dt = _parse_datatype_string(schema)

Review Comment:
   We will want to check for parsing failures here and return a reasonable error message.
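One possible shape for this, sketched without Spark. `toy_ddl_parser` is a hypothetical stand-in for the real parser (`_parse_datatype_string` in the diff above), and `parse_schema_or_raise` is an illustrative helper name. The exception types the real parser raises are not shown in this thread, so the sketch catches `Exception` broadly as an assumption.

```python
from typing import Callable, List, Tuple


def toy_ddl_parser(ddl: str) -> List[Tuple[str, str]]:
    """Hypothetical stand-in parser: accepts 'name TYPE, name TYPE' only."""
    fields = []
    for part in ddl.split(","):
        tokens = part.split()
        if len(tokens) != 2:
            raise ValueError(f"cannot parse field definition {part.strip()!r}")
        fields.append((tokens[0], tokens[1].upper()))
    return fields


def parse_schema_or_raise(ddl: str, parser: Callable[[str], object]) -> object:
    # Wrap the parser so that a failure names the offending schema string
    # instead of surfacing only a low-level parser error.
    try:
        return parser(ddl)
    except Exception as e:
        raise ValueError(
            f"Unable to parse the data source schema string {ddl!r}: {e}"
        ) from e
```

The same helper also gives the positive and negative string-schema test cases something concrete to assert against.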



##########
python/pyspark/sql/datasource.py:
##########
@@ -0,0 +1,184 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+from abc import ABC, abstractmethod
+from typing import Any, Dict, Iterator, Tuple, Union, TYPE_CHECKING
+
+from pyspark.sql import Row
+from pyspark.sql.types import StructType
+
+if TYPE_CHECKING:
+    from pyspark.sql._typing import OptionalPrimitiveType
+
+
+__all__ = ["DataSource", "DataSourceReader"]
+
+
+class DataSource:
+    """
+    A base class for data sources.
+
+    This class represents a custom data source that allows for reading from and
+    writing to it. The data source provides methods to create readers and writers
+    for reading and writing data, respectively. At least one of the methods `reader`
+    or `writer` must be implemented by any subclass to make the data source either
+    readable or writable.

Review Comment:
   ```suggestion
       readable or writable (or both).
   ```



##########
python/pyspark/sql/datasource.py:
##########
@@ -0,0 +1,184 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+from abc import ABC, abstractmethod
+from typing import Any, Dict, Iterator, Tuple, Union, TYPE_CHECKING
+
+from pyspark.sql import Row
+from pyspark.sql.types import StructType
+
+if TYPE_CHECKING:
+    from pyspark.sql._typing import OptionalPrimitiveType
+
+
+__all__ = ["DataSource", "DataSourceReader"]
+
+
+class DataSource:
+    """
+    A base class for data sources.
+
+    This class represents a custom data source that allows for reading from and
+    writing to it. The data source provides methods to create readers and writers
+    for reading and writing data, respectively. At least one of the methods `reader`
+    or `writer` must be implemented by any subclass to make the data source either
+    readable or writable.
+
+    After implementing this interface, you can start to load your data source using
+    `spark.read.format(...).load()` and save data using `df.write.format(...).save()`.
+    """
+
+    def __init__(self, options: Dict[str, "OptionalPrimitiveType"]):
+        """
+        Initializes the data source with user-provided options.
+
+        Parameters
+        ----------
+        options : dict
+            A dictionary representing the options for this data source.
+
+        Notes
+        -----
+        This method should not contain any non-serializable objects.
+        """
+        self.options = options
+
+    @property
+    def name(self) -> str:
+        """
+        Returns a string represents the short name of this data source.
+        """
+        return self.__class__.__name__
+
+    def schema(self) -> Union[StructType, str]:
+        """
+        Returns the schema of the data source.
+
+        It can reference the `options` field to infer the data source's schema when
+        users do not explicitly specify it. This method is invoked once when calling
+        `spark.read.format(...).load()` to get the schema for a data source read operation.
+        If this method is not implemented, and a user does not provide a schema when
+        reading the data source, an exception will be thrown.
+
+        Returns
+        -------
+        schema : StructType or str
+            The schema of this data source or a DDL string represents the schema
+
+        Examples
+        --------
+        Returns a DDL string:
+
+        >>> def schema(self):
+        >>>    return "a INT, b STRING"

Review Comment:
   Can we add test cases where the schema is sent as a string instead of a StructType: a positive case, and a negative case where the string fails to parse with fromDDL?



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/python/UserDefinedPythonDataSource.scala:
##########
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.python
+
+import java.io.{DataInputStream, DataOutputStream}
+
+import scala.collection.mutable.ArrayBuffer
+
+import net.razorvine.pickle.Pickler
+
+import org.apache.spark.api.python.{PythonFunction, PythonWorkerUtils, SpecialLengths}
+import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
+import org.apache.spark.sql.catalyst.plans.logical.PythonDataSource
+import org.apache.spark.sql.catalyst.types.DataTypeUtils.toAttributes
+import org.apache.spark.sql.errors.QueryCompilationErrors
+import org.apache.spark.sql.types.StructType
+
+/**
+ * A user-defined Python data source. This is used by the Python API.
+ */
+case class UserDefinedPythonDataSource(
+    dataSource: PythonFunction,
+    schema: StructType) {
+  def apply(session: SparkSession): DataFrame = {
+    val source = PythonDataSource(dataSource, schema, output = toAttributes(schema))
+    Dataset.ofRows(session, source)
+  }
+}
+
+case class PythonDataSourceReadInfo(
+    func: Array[Byte],
+    partitions: Seq[Array[Byte]])
+
+class UserDefinedPythonDataSourceReadRunner(
+    func: PythonFunction,
+    schema: StructType) extends PythonPlannerRunner[PythonDataSourceReadInfo](func) {
+
+  override val workerModule = "pyspark.sql.worker.plan_data_source_read"
+
+  override protected def writeToPython(dataOut: DataOutputStream, pickler: Pickler): Unit = {
+    // Send Python data source
+    PythonWorkerUtils.writePythonFunction(func, dataOut)
+
+    // Send schema
+    PythonWorkerUtils.writeUTF(schema.json, dataOut)
+  }
+
+  override protected def receiveFromPython(dataIn: DataInputStream): PythonDataSourceReadInfo = {
+    // Receive the picked reader or an exception raised in Python worker.
+    val length = dataIn.readInt()
+    if (length == SpecialLengths.PYTHON_EXCEPTION_THROWN) {
+      val msg = PythonWorkerUtils.readUTF(dataIn)
+      throw QueryCompilationErrors.tableValuedFunctionFailedToAnalyseInPythonError(msg)

Review Comment:
   Can we test it?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-45524][PYTHON][SQL] Initial support for Python data source read API [spark]

Posted by "allisonwang-db (via GitHub)" <gi...@apache.org>.
allisonwang-db commented on code in PR #43360:
URL: https://github.com/apache/spark/pull/43360#discussion_r1361271332


##########
python/pyspark/sql/datasource.py:
##########
@@ -0,0 +1,168 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+from abc import ABC, abstractmethod
+from typing import Any, Dict, Iterator, Tuple, Union, TYPE_CHECKING
+
+from pyspark.sql import Row
+from pyspark.sql.types import StructType
+
+if TYPE_CHECKING:
+    from pyspark.sql._typing import OptionalPrimitiveType
+
+
+__all__ = ["DataSource", "DataSourceReader"]
+
+
+class DataSource:
+    def __init__(self, options: Dict[str, "OptionalPrimitiveType"]):
+        """
+        Initializes the data source with user-provided options.
+
+        Parameters
+        ----------
+        options : dict
+            A dictionary representing the options for this data source.
+
+        Notes
+        -----
+        This method should not contain any non-serializable objects.
+        """
+        self.options = options
+
+    @property
+    def name(self) -> str:
+        """
+        Returns a string represents the short name of this data source.
+        """
+        return self.__class__.__name__
+
+    def schema(self) -> Union[StructType, str]:
+        """
+        Returns the schema of the data source.
+
+        It can reference the `options` field to infer the data source's schema when
+        users do not explicitly specify it. This method is invoked once when calling
+        `spark.read.format(...).load()` to get the schema for a data source read operation.
+        If this method is not implemented, and a user does not provide a schema when
+        reading the data source, an exception will be thrown.
+
+        Returns
+        -------
+        schema : StructType or str
+            The schema of this data source or a DDL string represents the schema
+
+        Examples
+        --------
+        Returns a DDL string:
+
+        >>> def schema(self):
+        >>>    return "a INT, b STRING"
+
+        Returns a StructType:
+
+        >>> def schema(self):
+        >>>   return StructType().add("a", "int").add("b", "string")
+        """
+        ...
+
+    def reader(self, schema: StructType) -> "DataSourceReader":
+        """
+        Returns a DataSourceReader instance for reading data.
+
+        This method is required for readable data sources. It will be called once during
+        the physical planning stage in the Spark planner.
+
+        Parameters
+        ----------
+        schema : StructType
+            The schema of the data to be read.
+
+        Returns
+        -------
+        reader : DataSourceReader
+            A reader instance for this data source.
+        """
+        raise NotImplementedError
+
+
+class DataSourceReader(ABC):
+    """
+    A base class for data source readers. Data source readers are responsible for
+    outputting data from a data source.
+    """
+
+    def partitions(self) -> Iterator[Any]:
+        """
+        Returns a list of partitions for this data source.
+
+        This method is called once during the physical planning stage in the Spark planner
+        to generate a list of partitions. Note, partition values must be serializable.

Review Comment:
   I am not sure how many people are familiar with the serialization protocol here. I can change `serializable` to `picklable` to make it clearer.
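For reference, a small sketch of what "picklable" means in practice for partition values. It uses the standard-library `pickle` module as an approximation; the serializer Spark actually uses may accept more kinds of objects. `check_picklable` is a hypothetical helper name, not part of this PR.

```python
import pickle
import threading
from typing import Any, Iterable, List


def check_picklable(partitions: Iterable[Any]) -> List[bytes]:
    """Pickle each partition value, failing early with the offending index."""
    serialized = []
    for i, partition in enumerate(partitions):
        try:
            serialized.append(pickle.dumps(partition))
        except Exception as e:
            raise TypeError(
                f"Partition {i} ({type(partition).__name__}) is not picklable: {e}"
            ) from e
    return serialized


# Plain values such as ints, tuples, and dicts round-trip without trouble.
blobs = check_picklable([0, ("a", 1), {"start": 0, "end": 10}])
print([pickle.loads(b) for b in blobs])

# Objects holding OS-level resources, such as a lock, do not.
try:
    check_picklable([threading.Lock()])
except TypeError as e:
    print(e)
```

Documenting the requirement as "picklable" and pointing at examples like these may be clearer for users who have not seen the serialization protocol.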



##########
python/pyspark/sql/datasource.py:
##########
@@ -0,0 +1,168 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+from abc import ABC, abstractmethod
+from typing import Any, Dict, Iterator, Tuple, Union, TYPE_CHECKING
+
+from pyspark.sql import Row
+from pyspark.sql.types import StructType
+
+if TYPE_CHECKING:
+    from pyspark.sql._typing import OptionalPrimitiveType
+
+
+__all__ = ["DataSource", "DataSourceReader"]
+
+
+class DataSource:
+    def __init__(self, options: Dict[str, "OptionalPrimitiveType"]):
+        """
+        Initializes the data source with user-provided options.
+
+        Parameters
+        ----------
+        options : dict
+            A dictionary representing the options for this data source.
+
+        Notes
+        -----
+        This method should not contain any non-serializable objects.
+        """
+        self.options = options
+
+    @property
+    def name(self) -> str:
+        """
+        Returns a string represents the short name of this data source.
+        """
+        return self.__class__.__name__
+
+    def schema(self) -> Union[StructType, str]:
+        """
+        Returns the schema of the data source.
+
+        It can reference the `options` field to infer the data source's schema when
+        users do not explicitly specify it. This method is invoked once when calling
+        `spark.read.format(...).load()` to get the schema for a data source read operation.
+        If this method is not implemented, and a user does not provide a schema when
+        reading the data source, an exception will be thrown.
+
+        Returns
+        -------
+        schema : StructType or str
+            The schema of this data source or a DDL string represents the schema
+
+        Examples
+        --------
+        Returns a DDL string:
+
+        >>> def schema(self):
+        >>>    return "a INT, b STRING"
+
+        Returns a StructType:
+
+        >>> def schema(self):
+        >>>   return StructType().add("a", "int").add("b", "string")
+        """
+        ...
+
+    def reader(self, schema: StructType) -> "DataSourceReader":
+        """
+        Returns a DataSourceReader instance for reading data.
+
+        This method is required for readable data sources. It will be called once during
+        the physical planning stage in the Spark planner.
+
+        Parameters
+        ----------
+        schema : StructType
+            The schema of the data to be read.
+
+        Returns
+        -------
+        reader : DataSourceReader
+            A reader instance for this data source.
+        """
+        raise NotImplementedError
+
+
+class DataSourceReader(ABC):
+    """
+    A base class for data source readers. Data source readers are responsible for
+    outputting data from a data source.
+    """
+
+    def partitions(self) -> Iterator[Any]:
+        """
+        Returns a list of partitions for this data source.
+
+        This method is called once during the physical planning stage in the Spark planner
+        to generate a list of partitions. Note, partition values must be serializable.
+
+        The planner then creates an RDD for each partition. Each partition value will be

Review Comment:
   Good point. Will update.





Re: [PR] [SPARK-45524][PYTHON][SQL] Initial support for Python data source read API [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #43360:
URL: https://github.com/apache/spark/pull/43360#discussion_r1363959026


##########
python/pyspark/sql/worker/plan_data_source_read.py:
##########
@@ -0,0 +1,131 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import os
+import sys
+from typing import Any, IO, Iterator
+
+from pyspark.accumulators import _accumulatorRegistry
+from pyspark.errors import PySparkRuntimeError
+from pyspark.java_gateway import local_connect_and_auth
+from pyspark.serializers import (
+    read_int,
+    write_int,
+    SpecialLengths,
+    CloudPickleSerializer,
+)
+from pyspark.sql.datasource import DataSource
+from pyspark.sql.types import _parse_datatype_json_string, StructType
+from pyspark.util import handle_worker_exception
+from pyspark.worker_util import (
+    check_python_version,
+    read_command,
+    pickleSer,
+    send_accumulator_updates,
+    setup_broadcasts,
+    setup_memory_limits,
+    setup_spark_files,
+    utf8_deserializer,
+)
+
+
+def main(infile: IO, outfile: IO) -> None:
+    """
+    Plan Python data source read.
+    """
+    try:
+        check_python_version(infile)
+
+        memory_limit_mb = int(os.environ.get("PYSPARK_PLANNER_MEMORY_MB", "-1"))
+        setup_memory_limits(memory_limit_mb)
+
+        setup_spark_files(infile)
+        setup_broadcasts(infile)
+
+        _accumulatorRegistry.clear()
+
+        # Receive the data source instance.
+        data_source = read_command(pickleSer, infile)
+        if not isinstance(data_source, DataSource):
+            raise PySparkRuntimeError(
+                f"Expected a Python data source instance of type 'DataSource', but "
+                f"got '{type(data_source).__name__}'."
+            )
+
+        # Receive the data source output schema.
+        schema_json = utf8_deserializer.loads(infile)
+        schema = _parse_datatype_json_string(schema_json)
+        if not isinstance(schema, StructType):
+            raise PySparkRuntimeError(
+                f"Expected a Python data source schema of type 'StructType', but "
+                f"got '{type(schema).__name__}'."
+            )
+
+        # Instantiate data source reader.
+        try:
+            reader = data_source.reader(schema=schema)
+        except NotImplementedError:
+            raise PySparkRuntimeError(
+                "Unable to create the Python data source reader because the 'reader' "
+                "method hasn't been implemented."
+            )
+        except Exception as e:
+            raise PySparkRuntimeError(f"Unable to create the Python data source reader: {str(e)}")
+
+        # Generate all partitions.
+        partitions = list(reader.partitions() or [])
+        if len(partitions) == 0:
+            partitions = [None]
+
+        # Construct a UDTF.
+        class PythonDataSourceReaderUDTF:
+            def __init__(self) -> None:
+                self.ser = CloudPickleSerializer()
+
+            def eval(self, partition_bytes: Any) -> Iterator:
+                partition = self.ser.loads(partition_bytes)
+                yield from reader.read(partition)
+
+        command = PythonDataSourceReaderUDTF
+        pickleSer._write_with_length(command, outfile)
+
+        # Return the serialized partition values.
+        write_int(len(partitions), outfile)
+        for partition in partitions:
+            pickleSer._write_with_length(partition, outfile)
+
+    except BaseException as e:
+        handle_worker_exception(e, outfile)
+        sys.exit(-1)
+
+    send_accumulator_updates(outfile)

Review Comment:
   Is this really needed? This Python process runs on the driver side.





Re: [PR] [SPARK-45524][PYTHON][SQL] Initial support for Python data source read API [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #43360:
URL: https://github.com/apache/spark/pull/43360#discussion_r1363961257


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala:
##########
@@ -101,7 +102,8 @@ class SparkOptimizer(
     V2ScanRelationPushDown.ruleName :+
     V2ScanPartitioningAndOrdering.ruleName :+
     V2Writes.ruleName :+
-    ReplaceCTERefWithRepartition.ruleName
+    ReplaceCTERefWithRepartition.ruleName :+
+    V2ScanRelationPushDown.ruleName

Review Comment:
   Shall we put `PlanPythonDataSourceScan` here as well?





Re: [PR] [SPARK-45524][PYTHON][SQL] Initial support for Python data source read API [spark]

Posted by "dtenedor (via GitHub)" <gi...@apache.org>.
dtenedor commented on code in PR #43360:
URL: https://github.com/apache/spark/pull/43360#discussion_r1365851628


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PythonDataSourceStrategy.scala:
##########
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+
+import org.apache.spark.api.python.{PythonEvalType, PythonFunction, SimplePythonFunction}
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.expressions.PythonUDTF
+import org.apache.spark.sql.catalyst.plans.logical
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.python.UserDefinedPythonDataSourceReadRunner
+
+object PythonDataSourceStrategy extends Strategy with Logging {
+  def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
+    case p @ logical.PythonDataSource(dataSource: PythonFunction, schema, _) =>
+      val info = new UserDefinedPythonDataSourceReadRunner(dataSource, schema).runInPython()
+
+      val readerFunc = SimplePythonFunction(
+        command = info.func.toSeq,
+        envVars = dataSource.envVars,
+        pythonIncludes = dataSource.pythonIncludes,
+        pythonExec = dataSource.pythonExec,
+        pythonVer = dataSource.pythonVer,
+        broadcastVars = dataSource.broadcastVars,
+        accumulator = dataSource.accumulator
+      )
+
+      val partitionPlan = logical.PythonDataSourcePartition(info.partitions)
+
+      // Construct a Python UDTF for the reader function.
+      val pythonUDTF = PythonUDTF(
+        name = "PythonDataSourceReaderUDTF",
+        func = readerFunc,
+        elementSchema = schema,
+        children = partitionPlan.output,
+        evalType = PythonEvalType.SQL_TABLE_UDF,
+        udfDeterministic = false,

Review Comment:
   Sure, this sounds good.





Re: [PR] [SPARK-45524][PYTHON][SQL] Initial support for Python data source read API [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #43360:
URL: https://github.com/apache/spark/pull/43360#discussion_r1363984382


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonDataSourcePartitionExec.scala:
##########
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.python
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.execution.{InputRDDCodegen, LeafExecNode, SQLExecution}
+import org.apache.spark.sql.execution.metric.SQLMetrics
+
+case class PythonDataSourcePartitionExec(
+    output: Seq[Attribute],
+    partitions: Seq[Array[Byte]])
+  extends LeafExecNode with InputRDDCodegen with PythonSQLMetrics {
+
+  override lazy val metrics = Map(
+    "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"))
+
+  @transient private lazy val unsafeRows: Array[InternalRow] = {
+    if (partitions.isEmpty) {
+      Array.empty
+    } else {
+      val proj = UnsafeProjection.create(output, output)
+      partitions.map(p => proj(InternalRow(p)).copy()).toArray
+    }
+  }
+
+  @transient private lazy val rdd: RDD[InternalRow] = {
+    val numPartitions = partitions.size
+    if (numPartitions == 0) {
+      sparkContext.emptyRDD
+    } else {
+      sparkContext.parallelize(unsafeRows, numPartitions)
+    }
+  }
+
+  override def inputRDD: RDD[InternalRow] = rdd
+
+  // Input is InternalRow, has to be turned into UnsafeRows.

Review Comment:
   The input is already an unsafe row, isn't it? We invoke the unsafe projection in `lazy val unsafeRows`.





Re: [PR] [SPARK-45524][PYTHON][SQL] Initial support for Python data source read API [spark]

Posted by "allisonwang-db (via GitHub)" <gi...@apache.org>.
allisonwang-db commented on code in PR #43360:
URL: https://github.com/apache/spark/pull/43360#discussion_r1364359184


##########
python/pyspark/sql/datasource.py:
##########
@@ -0,0 +1,185 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+from abc import ABC, abstractmethod
+from typing import Any, Dict, Iterator, Tuple, Union, TYPE_CHECKING
+
+from pyspark.sql import Row
+from pyspark.sql.types import StructType
+
+if TYPE_CHECKING:
+    from pyspark.sql._typing import OptionalPrimitiveType
+
+
+__all__ = ["DataSource", "DataSourceReader"]
+
+
+class DataSource(ABC):
+    """
+    A base class for data sources.
+
+    This class represents a custom data source that allows for reading from and
+    writing to it. The data source provides methods to create readers and writers
+    for reading and writing data, respectively. At least one of the methods ``reader``
+    or ``writer`` must be implemented by any subclass to make the data source either
+    readable or writable.
+
+    After implementing this interface, you can start to load your data source using
+    ``spark.read.format(...).load()`` and save data using ``df.write.format(...).save()``.
+    """
+
+    def __init__(self, options: Dict[str, "OptionalPrimitiveType"]):
+        """
+        Initializes the data source with user-provided options.
+
+        Parameters
+        ----------
+        options : dict
+            A dictionary representing the options for this data source.
+
+        Notes
+        -----
+        This method should not contain any non-serializable objects.
+        """
+        self.options = options
+
+    @property
+    def name(self) -> str:
+        """
+        Returns a string represents the short name of this data source.
+        """
+        return self.__class__.__name__
+
+    def schema(self) -> Union[StructType, str]:
+        """
+        Returns the schema of the data source.
+
+        It can reference the ``options`` field to infer the data source's schema when
+        users do not explicitly specify it. This method is invoked once when calling
+        ``spark.read.format(...).load()`` to get the schema for a data source read
+        operation. If this method is not implemented, and a user does not provide a
+        schema when reading the data source, an exception will be thrown.
+
+        Returns
+        -------
+        schema : StructType or str
+            The schema of this data source or a DDL string represents the schema
+
+        Examples
+        --------
+        Returns a DDL string:
+
+        >>> def schema(self):
+        ...    return "a INT, b STRING"
+
+        Returns a StructType:
+
+        >>> def schema(self):
+        ...   return StructType().add("a", "int").add("b", "string")
+        """
+        ...

Review Comment:
   This is equivalent to returning None.
   ```python
   def test():
       ...

   v = test()
   print(v is None)  # True
   ```





Re: [PR] [SPARK-45524][PYTHON][SQL] Initial support for Python data source read API [spark]

Posted by "ueshin (via GitHub)" <gi...@apache.org>.
ueshin commented on code in PR #43360:
URL: https://github.com/apache/spark/pull/43360#discussion_r1361303331


##########
python/pyspark/sql/readwriter.py:
##########
@@ -303,6 +329,50 @@ def load(
         if schema is not None:
             self.schema(schema)
         self.options(**options)
+
+        # Load a Python data source
+        if isinstance(self._format, Callable):

Review Comment:
   Ah, then, how about:
   ```py
   isinstance(self._format, Type) and issubclass(self._format, DataSource)
   ```
   ?
   - https://docs.python.org/3/library/functions.html#issubclass
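A minimal sketch of why the class check has to come before `issubclass`: `issubclass` raises `TypeError` when its first argument is not a class, so a plain string format name such as `"json"` must be filtered out first. A bare callable check would also accept ordinary functions. The `DataSource` stand-in, `MyDataSource`, and the helper `is_python_data_source` are hypothetical names used only for illustration, and the builtin `type` is assumed here for the class check.

```python
class DataSource:
    """Stand-in for pyspark.sql.datasource.DataSource (illustration only)."""


class MyDataSource(DataSource):
    pass


def is_python_data_source(fmt: object) -> bool:
    # Hypothetical helper: isinstance(..., type) short-circuits for
    # non-classes, so issubclass is only ever called with a class.
    return isinstance(fmt, type) and issubclass(fmt, DataSource)


print(is_python_data_source(MyDataSource))  # a DataSource subclass
print(is_python_data_source("json"))        # a built-in format name
print(is_python_data_source(len))           # callable, but not a class

# Without the guard, a string format name raises instead of returning False.
try:
    issubclass("json", DataSource)
except TypeError as exc:
    print("TypeError:", exc)
```

With this shape, string format names and arbitrary callables fall through to the existing non-Python code path instead of being instantiated as data sources.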





Re: [PR] [SPARK-45524][PYTHON][SQL] Initial support for Python data source read API [spark]

Posted by "HyukjinKwon (via GitHub)" <gi...@apache.org>.
HyukjinKwon commented on code in PR #43360:
URL: https://github.com/apache/spark/pull/43360#discussion_r1361749110


##########
python/pyspark/sql/datasource.py:
##########
@@ -0,0 +1,184 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+from abc import ABC, abstractmethod
+from typing import Any, Dict, Iterator, Tuple, Union, TYPE_CHECKING
+
+from pyspark.sql import Row
+from pyspark.sql.types import StructType
+
+if TYPE_CHECKING:
+    from pyspark.sql._typing import OptionalPrimitiveType
+
+
+__all__ = ["DataSource", "DataSourceReader"]
+
+
+class DataSource:
+    """
+    A base class for data sources.
+
+    This class represents a custom data source that allows for reading from and
+    writing to it. The data source provides methods to create readers and writers
+    for reading and writing data, respectively. At least one of the methods `reader`
+    or `writer` must be implemented by any subclass to make the data source either
+    readable or writable.
+
+    After implementing this interface, you can start to load your data source using
+    `spark.read.format(...).load()` and save data using `df.write.format(...).save()`.
+    """
+
+    def __init__(self, options: Dict[str, "OptionalPrimitiveType"]):
+        """
+        Initializes the data source with user-provided options.
+
+        Parameters
+        ----------
+        options : dict
+            A dictionary representing the options for this data source.
+
+        Notes
+        -----
+        This method should not contain any non-serializable objects.
+        """
+        self.options = options
+
+    @property
+    def name(self) -> str:
+        """
+        Returns a string represents the short name of this data source.
+        """
+        return self.__class__.__name__
+
+    def schema(self) -> Union[StructType, str]:
+        """
+        Returns the schema of the data source.
+
+        It can reference the `options` field to infer the data source's schema when
+        users do not explicitly specify it. This method is invoked once when calling
+        `spark.read.format(...).load()` to get the schema for a data source read operation.
+        If this method is not implemented, and a user does not provide a schema when
+        reading the data source, an exception will be thrown.
+
+        Returns
+        -------
+        schema : StructType or str
+            The schema of this data source or a DDL string represents the schema
+
+        Examples
+        --------
+        Returns a DDL string:
+
+        >>> def schema(self):
+        >>>    return "a INT, b STRING"
+
+        Returns a StructType:
+
+        >>> def schema(self):
+        >>>   return StructType().add("a", "int").add("b", "string")
+        """
+        ...
+
+    def reader(self, schema: StructType) -> "DataSourceReader":
+        """
+        Returns a DataSourceReader instance for reading data.
+
+        This method is required for readable data sources. It will be called once during
+        the physical planning stage in the Spark planner.
+
+        Parameters
+        ----------
+        schema : StructType
+            The schema of the data to be read.
+
+        Returns
+        -------
+        reader : DataSourceReader
+            A reader instance for this data source.
+        """
+        raise NotImplementedError
+
+
+class DataSourceReader(ABC):
+    """
+    A base class for data source readers. Data source readers are responsible for
+    outputting data from a data source.
+    """
+
+    def partitions(self) -> Iterator[Any]:
+        """
+        Returns a list of partitions for this data source.
+
+        This method is called once during the physical planning stage to generate a list
+        of partitions. If the method returns N partitions, then the planner will create
+        N tasks. Each task will then execute `read(partition)` in parallel, using each
+        partition value to read the data from this data source.
+
+        If this method is not implemented, or returns an empty list, Spark will create one
+        partition for the result DataFrame and use one single task to read the data.
+
+        Returns
+        -------
+        partitions : list
+            A list of partitions for this data source. The partition can be any arbitrary
+            serializable objects.
+
+        Notes
+        -----
+        This method should not return any un-picklable objects.
+
+        Examples
+        --------
+        Returns a list of integers:
+
+        >>> def partitions(self):
+        >>>     return [1, 2, 3]

Review Comment:
   ```suggestion
           ...     return [1, 2, 3]
   ```
   
   and should update below too
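To illustrate the suggestion above: Python's `doctest` parser treats `>>>` as the start of a new example and `...` as a continuation of the previous one, so a function body prefixed with `>>>` is split into separate statements. The sketch below uses the standard-library `doctest.DocTestParser`; the variable names are illustrative only.

```python
import doctest

# Continuation line written with "..." (the suggested form).
good = ">>> def partitions(self):\n...     return [1, 2, 3]\n"
# Continuation line written with ">>>" (the form in the docstring under review).
bad = ">>> def partitions(self):\n>>>     return [1, 2, 3]\n"

parser = doctest.DocTestParser()
good_examples = parser.get_examples(good)
bad_examples = parser.get_examples(bad)

print(len(good_examples))            # number of examples in the "..." form
print(len(bad_examples))             # number of examples in the ">>>" form
print(repr(bad_examples[1].source))  # the orphaned function body
```

The `...` form is parsed as one example holding the whole function, while the `>>>` form yields two examples, the second being a bare indented `return` that would not compile if the doctests were ever executed.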





Re: [PR] [SPARK-45524][PYTHON][SQL] Initial support for Python data source read API [spark]

Posted by "HyukjinKwon (via GitHub)" <gi...@apache.org>.
HyukjinKwon commented on code in PR #43360:
URL: https://github.com/apache/spark/pull/43360#discussion_r1361877692


##########
python/pyspark/sql/readwriter.py:
##########
@@ -303,6 +330,40 @@ def load(
         if schema is not None:
             self.schema(schema)
         self.options(**options)
+
+        # Load a Python data source
+        if isinstance(self._format, Callable):  # type: ignore[arg-type]
+            # TODO(SPARK-45560): support load() with non-empty path.
+
+            # Create an instance of the data source.
+            data_source_cls = cast(Type[DataSource], self._format)
+            data_source = data_source_cls(self._options)
+
+            # Get schema of the data source
+            schema = self._schema or data_source.schema()

Review Comment:
   Also, I don't think we can support SQL use cases if we do this here?





Re: [PR] [SPARK-45524][PYTHON][SQL] Initial support for Python data source read API [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #43360:
URL: https://github.com/apache/spark/pull/43360#discussion_r1363400814


##########
python/pyspark/sql/datasource.py:
##########
@@ -0,0 +1,168 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+from abc import ABC, abstractmethod
+from typing import Any, Dict, Iterator, Tuple, Union, TYPE_CHECKING
+
+from pyspark.sql import Row
+from pyspark.sql.types import StructType
+
+if TYPE_CHECKING:
+    from pyspark.sql._typing import OptionalPrimitiveType
+
+
+__all__ = ["DataSource", "DataSourceReader"]
+
+
+class DataSource:
+    def __init__(self, options: Dict[str, "OptionalPrimitiveType"]):
+        """
+        Initializes the data source with user-provided options.
+
+        Parameters
+        ----------
+        options : dict
+            A dictionary representing the options for this data source.
+
+        Notes
+        -----
+        This method should not contain any non-serializable objects.
+        """
+        self.options = options
+
+    @property
+    def name(self) -> str:
+        """
+        Returns a string represents the short name of this data source.
+        """
+        return self.__class__.__name__
+
+    def schema(self) -> Union[StructType, str]:
+        """
+        Returns the schema of the data source.
+
+        It can reference the `options` field to infer the data source's schema when
+        users do not explicitly specify it. This method is invoked once when calling
+        `spark.read.format(...).load()` to get the schema for a data source read operation.
+        If this method is not implemented, and a user does not provide a schema when
+        reading the data source, an exception will be thrown.
+
+        Returns
+        -------
+        schema : StructType or str
+            The schema of this data source or a DDL string represents the schema
+
+        Examples
+        --------
+        Returns a DDL string:
+
+        >>> def schema(self):
+        >>>    return "a INT, b STRING"
+
+        Returns a StructType:
+
+        >>> def schema(self):
+        >>>   return StructType().add("a", "int").add("b", "string")
+        """
+        ...
+
+    def reader(self, schema: StructType) -> "DataSourceReader":
+        """
+        Returns a DataSourceReader instance for reading data.
+
+        This method is required for readable data sources. It will be called once during
+        the physical planning stage in the Spark planner.
+
+        Parameters
+        ----------
+        schema : StructType
+            The schema of the data to be read.
+
+        Returns
+        -------
+        reader : DataSourceReader
+            A reader instance for this data source.
+        """
+        raise NotImplementedError
+
+
+class DataSourceReader(ABC):
+    """
+    A base class for data source readers. Data source readers are responsible for
+    outputting data from a data source.
+    """
+
+    def partitions(self) -> Iterator[Any]:
+        """
+        Returns a list of partitions for this data source.
+
+        This method is called once during the physical planning stage in the Spark planner
+        to generate a list of partitions. Note, partition values must be serializable.
+
+        The planner then creates an RDD for each partition. Each partition value will be
+        passed to `read(partition)` to read the data from this data source.
+
+        If this method is not implemented, or returns an empty list, Spark will create one

Review Comment:
   +1 to make this required. I don't think we have a reasonable default value for the partition (use null?).
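One way "required" could be enforced in Python is with `abc.abstractmethod`, sketched below under stated assumptions. `SketchReader`, `RangeReader`, and `IncompleteReader` are hypothetical classes, not the PR's `DataSourceReader`, and the PR may settle on a different mechanism.

```python
from abc import ABC, abstractmethod
from typing import Any, Iterator, List, Tuple


class SketchReader(ABC):
    """Hypothetical reader base class; not the PR's DataSourceReader."""

    @abstractmethod
    def partitions(self) -> List[Any]:
        """Return the partition values; one read task per value."""

    @abstractmethod
    def read(self, partition: Any) -> Iterator[Tuple]:
        """Yield rows for a single partition value."""


class RangeReader(SketchReader):
    def partitions(self) -> List[Any]:
        return [0, 1, 2]

    def read(self, partition: Any) -> Iterator[Tuple]:
        yield (partition, partition * 10)


class IncompleteReader(SketchReader):
    # partitions() is deliberately missing.
    def read(self, partition: Any) -> Iterator[Tuple]:
        yield (partition,)


reader = RangeReader()
# Simulate the planner locally: one read() call per partition value.
rows = [row for p in reader.partitions() for row in reader.read(p)]
print(rows)

# A subclass that omits partitions() cannot be instantiated at all.
try:
    IncompleteReader()
except TypeError as exc:
    print("TypeError:", exc)
```

With this approach a missing `partitions` implementation fails when the reader is constructed, so no default partition value (such as null) has to be invented.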





Re: [PR] [SPARK-45524][PYTHON][SQL] Initial support for Python data source read API [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #43360:
URL: https://github.com/apache/spark/pull/43360#discussion_r1363960136


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/pythonLogicalOperators.scala:
##########
@@ -78,6 +80,42 @@ case class PythonMapInArrow(
     copy(child = newChild)
 }
 
+/**
+ * Represents a Python data source.
+ */
+case class PythonDataSource(
+    dataSource: PythonFunction,
+    outputSchema: StructType,
+    override val output: Seq[Attribute]) extends LeafNode {
+  require(output.forall(_.resolved),
+    "Unresolved attributes found when constructing PythonDataSource.")
+  override protected def stringArgs: Iterator[Any] = {
+    Iterator(output)
+  }
+  final override val nodePatterns: Seq[TreePattern] = Seq(PYTHON_DATA_SOURCE)
+}
+
+/**
+ * Represents a list of Python data source partitions.
+ */
+case class PythonDataSourcePartition(

Review Comment:
   ```suggestion
   case class PythonDataSourcePartitions(
   ```





Re: [PR] [SPARK-45524][PYTHON][SQL] Initial support for Python data source read API [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #43360:
URL: https://github.com/apache/spark/pull/43360#discussion_r1363987031


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonDataSourcePartitionExec.scala:
##########
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.python
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.execution.{InputRDDCodegen, LeafExecNode, SQLExecution}
+import org.apache.spark.sql.execution.metric.SQLMetrics
+
+case class PythonDataSourcePartitionExec(
+    output: Seq[Attribute],
+    partitions: Seq[Array[Byte]])
+  extends LeafExecNode with InputRDDCodegen with PythonSQLMetrics {
+
+  override lazy val metrics = Map(
+    "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"))
+
+  @transient private lazy val unsafeRows: Array[InternalRow] = {
+    if (partitions.isEmpty) {
+      Array.empty
+    } else {
+      val proj = UnsafeProjection.create(output, output)
+      partitions.map(p => proj(InternalRow(p)).copy()).toArray
+    }
+  }
+
+  @transient private lazy val rdd: RDD[InternalRow] = {
+    val numPartitions = partitions.size
+    if (numPartitions == 0) {
+      sparkContext.emptyRDD
+    } else {
+      sparkContext.parallelize(unsafeRows, numPartitions)
+    }
+  }
+
+  override def inputRDD: RDD[InternalRow] = rdd
+
+  // Input is InternalRow, has to be turned into UnsafeRows.
+  override protected val createUnsafeProjection: Boolean = true
+
+  protected override def doExecute(): RDD[InternalRow] = {
+    val numOutputRows = longMetric("numOutputRows")
+    rdd.map { r =>
+      numOutputRows += 1
+      r
+    }
+  }
+
+  override def executeCollect(): Array[InternalRow] = {

Review Comment:
   Do we really need to override these methods? `PythonDataSourcePartitionExec` will never be the root node that can trigger the fast collect code path.





Re: [PR] [SPARK-45524][PYTHON][SQL] Initial support for Python data source read API [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #43360:
URL: https://github.com/apache/spark/pull/43360#discussion_r1363985526


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonDataSourcePartitionExec.scala:
##########
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.python
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.execution.{InputRDDCodegen, LeafExecNode, SQLExecution}
+import org.apache.spark.sql.execution.metric.SQLMetrics
+
+case class PythonDataSourcePartitionExec(
+    output: Seq[Attribute],
+    partitions: Seq[Array[Byte]])
+  extends LeafExecNode with InputRDDCodegen with PythonSQLMetrics {
+
+  override lazy val metrics = Map(
+    "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"))
+
+  @transient private lazy val unsafeRows: Array[InternalRow] = {
+    if (partitions.isEmpty) {
+      Array.empty
+    } else {
+      val proj = UnsafeProjection.create(output, output)
+      partitions.map(p => proj(InternalRow(p)).copy()).toArray
+    }
+  }
+
+  @transient private lazy val rdd: RDD[InternalRow] = {
+    val numPartitions = partitions.size
+    if (numPartitions == 0) {
+      sparkContext.emptyRDD
+    } else {
+      sparkContext.parallelize(unsafeRows, numPartitions)
+    }
+  }
+
+  override def inputRDD: RDD[InternalRow] = rdd
+
+  // Input is InternalRow, has to be turned into UnsafeRows.
+  override protected val createUnsafeProjection: Boolean = true
+
+  protected override def doExecute(): RDD[InternalRow] = {
+    val numOutputRows = longMetric("numOutputRows")

Review Comment:
   I think this can be a driver-side metric, as it equals the number of partitions, which is known on the driver side.





Re: [PR] [SPARK-45524][PYTHON][SQL] Initial support for Python data source read API [spark]

Posted by "allisonwang-db (via GitHub)" <gi...@apache.org>.
allisonwang-db commented on code in PR #43360:
URL: https://github.com/apache/spark/pull/43360#discussion_r1364353580


##########
python/pyspark/sql/datasource.py:
##########
@@ -0,0 +1,184 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+from abc import ABC, abstractmethod
+from typing import Any, Dict, Iterator, Tuple, Union, TYPE_CHECKING
+
+from pyspark.sql import Row
+from pyspark.sql.types import StructType
+
+if TYPE_CHECKING:
+    from pyspark.sql._typing import OptionalPrimitiveType
+
+
+__all__ = ["DataSource", "DataSourceReader"]
+
+
+class DataSource:
+    """
+    A base class for data sources.
+
+    This class represents a custom data source that allows for reading from and
+    writing to it. The data source provides methods to create readers and writers
+    for reading and writing data, respectively. At least one of the methods `reader`
+    or `writer` must be implemented by any subclass to make the data source either
+    readable or writable.
+
+    After implementing this interface, you can start to load your data source using
+    `spark.read.format(...).load()` and save data using `df.write.format(...).save()`.
+    """
+
+    def __init__(self, options: Dict[str, "OptionalPrimitiveType"]):
+        """
+        Initializes the data source with user-provided options.
+
+        Parameters
+        ----------
+        options : dict
+            A dictionary representing the options for this data source.
+
+        Notes
+        -----
+        This method should not contain any non-serializable objects.
+        """
+        self.options = options
+
+    @property
+    def name(self) -> str:
+        """
+        Returns a string represents the short name of this data source.
+        """
+        return self.__class__.__name__
+
+    def schema(self) -> Union[StructType, str]:
+        """
+        Returns the schema of the data source.
+
+        It can reference the `options` field to infer the data source's schema when
+        users do not explicitly specify it. This method is invoked once when calling
+        `spark.read.format(...).load()` to get the schema for a data source read operation.
+        If this method is not implemented, and a user does not provide a schema when
+        reading the data source, an exception will be thrown.
+
+        Returns
+        -------
+        schema : StructType or str
+            The schema of this data source or a DDL string represents the schema
+
+        Examples
+        --------
+        Returns a DDL string:
+
+        >>> def schema(self):
+        >>>    return "a INT, b STRING"

Review Comment:
   Yes. This PR does not support `.schema(...)`. Here is the follow-up task: [SPARK-45559](https://issues.apache.org/jira/browse/SPARK-45559). I will add more test cases for schema.
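
For readers following the thread, the schema contract described in the `schema()` docstring above can be sketched in plain Python. This is only an illustration, not the PR's implementation: `ExampleSource`, `NoSchemaSource`, and `resolve_schema` are hypothetical names, nothing here imports pyspark, and the rule that a user-supplied schema takes precedence is an assumption about the follow-up work rather than behavior confirmed in this PR.

```python
from typing import Any, Dict, Optional


class ExampleSource:
    """Hypothetical stand-in for a DataSource subclass that declares its schema."""

    def __init__(self, options: Dict[str, Any]) -> None:
        self.options = options

    def schema(self) -> str:
        # A DDL string, as in the docstring example.
        return "a INT, b STRING"


class NoSchemaSource:
    """Hypothetical stand-in for a DataSource that leaves schema() unimplemented."""

    def __init__(self, options: Dict[str, Any]) -> None:
        self.options = options

    def schema(self) -> None:
        # Mirrors the `...` body of the base class, which returns None.
        return None


def resolve_schema(source: Any, user_schema: Optional[str] = None) -> str:
    """Pick the schema for a read (assumed precedence: user schema first)."""
    if user_schema is not None:
        return user_schema
    inferred = source.schema()
    if inferred is None:
        # The docstring says an exception is thrown in this case; the exact
        # exception type used by Spark is not shown in this thread.
        raise ValueError("No schema provided and schema() is not implemented")
    return inferred
```

With this sketch, `resolve_schema(ExampleSource({}))` falls back to the DDL string declared by the source, while a source without `schema()` and without a user schema raises.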





Re: [PR] [SPARK-45524][PYTHON][SQL] Initial support for Python data source read API [spark]

Posted by "allisonwang-db (via GitHub)" <gi...@apache.org>.
allisonwang-db commented on code in PR #43360:
URL: https://github.com/apache/spark/pull/43360#discussion_r1364356233


##########
python/pyspark/sql/worker/plan_data_source_read.py:
##########
@@ -0,0 +1,131 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import os
+import sys
+from typing import Any, IO, Iterator
+
+from pyspark.accumulators import _accumulatorRegistry
+from pyspark.errors import PySparkRuntimeError
+from pyspark.java_gateway import local_connect_and_auth
+from pyspark.serializers import (
+    read_int,
+    write_int,
+    SpecialLengths,
+    CloudPickleSerializer,
+)
+from pyspark.sql.datasource import DataSource
+from pyspark.sql.types import _parse_datatype_json_string, StructType
+from pyspark.util import handle_worker_exception
+from pyspark.worker_util import (
+    check_python_version,
+    read_command,
+    pickleSer,
+    send_accumulator_updates,
+    setup_broadcasts,
+    setup_memory_limits,
+    setup_spark_files,
+    utf8_deserializer,
+)
+
+
+def main(infile: IO, outfile: IO) -> None:
+    """
+    Plan Python data source read.

Review Comment:
   Sure!





Re: [PR] [SPARK-45524][PYTHON][SQL] Initial support for Python data source read API [spark]

Posted by "dtenedor (via GitHub)" <gi...@apache.org>.
dtenedor commented on code in PR #43360:
URL: https://github.com/apache/spark/pull/43360#discussion_r1364339033


##########
python/pyspark/sql/datasource.py:
##########
@@ -0,0 +1,168 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+from abc import ABC, abstractmethod
+from typing import Any, Dict, Iterator, Tuple, Union, TYPE_CHECKING
+
+from pyspark.sql import Row
+from pyspark.sql.types import StructType
+
+if TYPE_CHECKING:
+    from pyspark.sql._typing import OptionalPrimitiveType
+
+
+__all__ = ["DataSource", "DataSourceReader"]
+
+
+class DataSource:
+    def __init__(self, options: Dict[str, "OptionalPrimitiveType"]):
+        """
+        Initializes the data source with user-provided options.
+
+        Parameters
+        ----------
+        options : dict
+            A dictionary representing the options for this data source.
+
+        Notes
+        -----
+        This method should not contain any non-serializable objects.
+        """
+        self.options = options
+
+    @property
+    def name(self) -> str:
+        """
+        Returns a string represents the short name of this data source.
+        """
+        return self.__class__.__name__
+
+    def schema(self) -> Union[StructType, str]:
+        """
+        Returns the schema of the data source.
+
+        It can reference the `options` field to infer the data source's schema when
+        users do not explicitly specify it. This method is invoked once when calling
+        `spark.read.format(...).load()` to get the schema for a data source read operation.
+        If this method is not implemented, and a user does not provide a schema when
+        reading the data source, an exception will be thrown.
+
+        Returns
+        -------
+        schema : StructType or str
+            The schema of this data source or a DDL string represents the schema
+
+        Examples
+        --------
+        Returns a DDL string:
+
+        >>> def schema(self):
+        >>>    return "a INT, b STRING"
+
+        Returns a StructType:
+
+        >>> def schema(self):
+        >>>   return StructType().add("a", "int").add("b", "string")
+        """
+        ...
+
+    def reader(self, schema: StructType) -> "DataSourceReader":
+        """
+        Returns a DataSourceReader instance for reading data.
+
+        This method is required for readable data sources. It will be called once during
+        the physical planning stage in the Spark planner.
+
+        Parameters
+        ----------
+        schema : StructType
+            The schema of the data to be read.
+
+        Returns
+        -------
+        reader : DataSourceReader
+            A reader instance for this data source.
+        """
+        raise NotImplementedError
+
+
+class DataSourceReader(ABC):
+    """
+    A base class for data source readers. Data source readers are responsible for
+    outputting data from a data source.
+    """
+
+    def partitions(self) -> Iterator[Any]:
+        """
+        Returns a list of partitions for this data source.
+
+        This method is called once during the physical planning stage in the Spark planner
+        to generate a list of partitions. Note, partition values must be serializable.
+
+        The planner then creates an RDD for each partition. Each partition value will be
+        passed to `read(partition)` to read the data from this data source.
+
+        If this method is not implemented, or returns an empty list, Spark will create one

Review Comment:
   If you want to do that, please comment very carefully in the `DataSourceReader` class docstring itself that performance is likely to be limited for large data sets if the `partitions` method is not implemented or returns one partition (but it is OK to skip it if the result data set is small). We will want to make sure that every reader of this abstract base class thinks about this in order to prevent unintentionally making large data sources without any parallelism.
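
To make the parallelism concern concrete, here is a minimal sketch of a reader that implements `partitions`. It is written in plain Python with no pyspark import: `RangeReader` and `simulate_scan` are hypothetical names, and the pickle round trip only approximates the requirement that partition values be serializable; it is not how the Spark planner is implemented.

```python
import pickle
from typing import Any, Iterator, List, Tuple


class RangeReader:
    """Hypothetical stand-in for a DataSourceReader over the integers [start, end)."""

    def __init__(self, start: int, end: int, num_partitions: int) -> None:
        self.start = start
        self.end = end
        self.num_partitions = num_partitions

    def partitions(self) -> List[Tuple[int, int]]:
        # Split [start, end) into contiguous, roughly equal (lo, hi) ranges.
        # Returning several ranges is what allows the reads to run in parallel.
        step, extra = divmod(self.end - self.start, self.num_partitions)
        bounds = []
        lo = self.start
        for i in range(self.num_partitions):
            hi = lo + step + (1 if i < extra else 0)
            if hi > lo:  # skip empty ranges when there are more partitions than rows
                bounds.append((lo, hi))
            lo = hi
        return bounds

    def read(self, partition: Tuple[int, int]) -> Iterator[Tuple[int]]:
        # Each call only touches its own slice of the data.
        lo, hi = partition
        for value in range(lo, hi):
            yield (value,)


def simulate_scan(reader: Any) -> List[List[Tuple[int]]]:
    """Serialize each partition value, then read each partition independently."""
    results = []
    for part in reader.partitions():
        restored = pickle.loads(pickle.dumps(part))
        results.append(list(reader.read(restored)))
    return results
```

A reader that returned a single `(start, end)` range here would still produce the same rows, but all of them would come from one `read` call, which is the no-parallelism case the docstring should warn about.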


