You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2022/11/25 19:35:06 UTC

[GitHub] [spark] bjornjorgensen commented on a diff in pull request #38801: [SPARK-40539] [CONNECT] [PYTHON] Add basic support for DataFrameWriter

bjornjorgensen commented on code in PR #38801:
URL: https://github.com/apache/spark/pull/38801#discussion_r1032662076


##########
python/pyspark/sql/connect/writer.py:
##########
@@ -0,0 +1,922 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+
+from typing import Optional, Union, List, overload, Tuple, cast, Any
+from typing import TYPE_CHECKING
+
+from pyspark.sql.connect.plan import WriteOperation, LogicalPlan
+from pyspark.sql.types import StructType
+from pyspark.sql.utils import to_str
+
+if TYPE_CHECKING:
+    from pyspark.sql.connect._typing import OptionalPrimitiveType
+    from pyspark.sql.connect.client import RemoteSparkSession
+
+
+PathOrPaths = Union[str, List[str]]
+TupleOrListOfString = Union[List[str], Tuple[str, ...]]
+
+
+class OptionUtils:
+    def _set_opts(
+        self,
+        schema: Optional[Union[StructType, str]] = None,
+        **options: "OptionalPrimitiveType",
+    ) -> None:
+        """
+        Set named options (filter out those the value is None)
+        """
+        if schema is not None:
+            self.schema(schema)  # type: ignore[attr-defined]
+        for k, v in options.items():
+            if v is not None:
+                self.option(k, v)  # type: ignore[attr-defined]
+
+
+class DataFrameWriter(OptionUtils):
+    """
+    Interface used to write a :class:`DataFrame` to external storage systems
+    (e.g. file systems, key-value stores, etc). Use :attr:`DataFrame.write`
+    to access this.
+
+    .. versionadded:: 3.4.0
+    """
+
+    def __init__(self, plan: "LogicalPlan", session: "RemoteSparkSession"):
+        self._df: "LogicalPlan" = plan
+        self._spark: "RemoteSparkSession" = session
+        self._write: "WriteOperation" = WriteOperation(self._df)
+
+    def mode(self, saveMode: Optional[str]) -> "DataFrameWriter":
+        """Specifies the behavior when data or table already exists.
+
+        Options include:
+
+        * `append`: Append contents of this :class:`DataFrame` to existing data.
+        * `overwrite`: Overwrite existing data.
+        * `error` or `errorifexists`: Throw an exception if data already exists.
+        * `ignore`: Silently ignore this operation if data already exists.
+
+        .. versionadded:: 3.4.0
+
+        Examples
+        --------
+        Raise an error when writing to an existing path.
+
+        >>> import tempfile
+        >>> with tempfile.TemporaryDirectory() as d:
+        ...     spark.createDataFrame(
+        ...         [{"age": 80, "name": "Xinrong Meng"}]
+        ...     ).write.mode("error").format("parquet").save(d)
+        Traceback (most recent call last):
+            ...
+        pyspark.sql.utils.AnalysisException: ...
+
+        Write a Parquet file back with various options, and read it back.
+
+        >>> with tempfile.TemporaryDirectory() as d:
+        ...     # Overwrite the path with a new Parquet file
+        ...     spark.createDataFrame(
+        ...         [{"age": 100, "name": "Hyukjin Kwon"}]
+        ...     ).write.mode("overwrite").format("parquet").save(d)
+        ...
+        ...     # Append another DataFrame into the Parquet file
+        ...     spark.createDataFrame(
+        ...         [{"age": 120, "name": "Takuya Ueshin"}]
+        ...     ).write.mode("append").format("parquet").save(d)
+        ...
+        ...     # Append another DataFrame into the Parquet file
+        ...     spark.createDataFrame(
+        ...         [{"age": 140, "name": "Haejoon Lee"}]
+        ...     ).write.mode("ignore").format("parquet").save(d)
+        ...
+        ...     # Read the Parquet file as a DataFrame.
+        ...     spark.read.parquet(d).show()
+        +---+-------------+
+        |age|         name|
+        +---+-------------+
+        |120|Takuya Ueshin|
+        |100| Hyukjin Kwon|
+        +---+-------------+
+        """
+        # At the JVM side, the default value of mode is already set to "error".
+        # So, if the given saveMode is None, we will not call JVM-side's mode method.
+        if saveMode is not None:
+            self._write.mode = saveMode
+        return self
+
+    def format(self, source: str) -> "DataFrameWriter":
+        """Specifies the underlying output data source.
+
+        .. versionadded:: 3.4.0
+
+        Parameters
+        ----------
+        source : str
+            string, name of the data source, e.g. 'json', 'parquet'.
+
+        Examples
+        --------
+        >>> spark.range(1).write.format('parquet')
+        <pyspark.sql.readwriter.DataFrameWriter object ...>
+
+        Write a DataFrame into a Parquet file and read it back.
+
+        >>> import tempfile
+        >>> with tempfile.TemporaryDirectory() as d:
+        ...     # Write a DataFrame into a Parquet file
+        ...     spark.createDataFrame(
+        ...         [{"age": 100, "name": "Hyukjin Kwon"}]
+        ...     ).write.mode("overwrite").format("parquet").save(d)
+        ...
+        ...     # Read the Parquet file as a DataFrame.
+        ...     spark.read.format('parquet').load(d).show()
+        +---+------------+
+        |age|        name|
+        +---+------------+
+        |100|Hyukjin Kwon|
+        +---+------------+
+        """
+        self._write.source = source
+        return self
+
+    def option(self, key: str, value: "OptionalPrimitiveType") -> "DataFrameWriter":
+        """
+        Adds a output option for the underlying data source.

Review Comment:
   Adds a output



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org