Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2022/10/10 18:19:22 UTC

[GitHub] [spark] grundprinzip opened a new pull request, #38192: [CONNECT] [SPARK-40737] Add basic support for DataFrameWriter

grundprinzip opened a new pull request, #38192:
URL: https://github.com/apache/spark/pull/38192

   <!--
   Thanks for sending a pull request!  Here are some tips for you:
     1. If this is your first time, please read our contributor guidelines: https://spark.apache.org/contributing.html
     2. Ensure you have added or run the appropriate tests for your PR: https://spark.apache.org/developer-tools.html
     3. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP][SPARK-XXXX] Your PR title ...'.
     4. Be sure to keep the PR description updated to reflect all changes.
     5. Please write your PR title to summarize what this PR proposes.
     6. If possible, provide a concise example to reproduce the issue for a faster review.
     7. If you want to add a new configuration, please read the guideline first for naming configurations in
        'core/src/main/scala/org/apache/spark/internal/config/ConfigEntry.scala'.
     8. If you want to add or modify an error type or message, please read the guideline first in
        'core/src/main/resources/error/README.md'.
   -->
   
   ### What changes were proposed in this pull request?
   <!--
   Please clarify what changes you are proposing. The purpose of this section is to outline the changes and how this PR fixes the issue. 
   If possible, please consider writing useful notes for better and faster reviews in your PR. See the examples below.
     1. If you refactor some code and change classes, showing the class hierarchy will help reviewers.
     2. If you fix some SQL features, you can provide references to other DBMSes.
     3. If there is design documentation, please add the link.
     4. If there is a discussion in the mailing list, please add the link.
   -->
   
   
   ### Why are the changes needed?
   <!--
   Please clarify why the changes are needed. For instance,
     1. If you propose a new API, clarify the use case for a new API.
     2. If you fix a bug, you can clarify why it is a bug.
   -->
   
   
   ### Does this PR introduce _any_ user-facing change?
   <!--
   Note that it means *any* user-facing change including all aspects such as the documentation fix.
   If yes, please clarify the previous behavior and the change this PR proposes - provide the console output, description and/or an example to show the behavior difference if possible.
   If possible, please also clarify if this is a user-facing change compared to the released Spark versions or within the unreleased branches such as master.
   If no, write 'No'.
   -->
   
   
   ### How was this patch tested?
   <!--
   If tests were added, say they were added here. Please make sure to add some test cases that check the changes thoroughly including negative and positive cases if possible.
   If it was tested in a way different from regular unit tests, please clarify how you tested step by step, ideally copy and paste-able, so that other reviewers can test and check, and descendants can verify in the future.
   If tests were not added, please describe why they were not added and/or why it was difficult to add.
   If benchmark tests were added, please run the benchmarks in GitHub Actions for a consistent environment, following the instructions at: https://spark.apache.org/developer-tools.html#github-workflow-benchmarks.
   -->
   




[GitHub] [spark] cloud-fan commented on a diff in pull request #38192: [SPARK-40737][CONNECT] Add basic support for DataFrameWriter

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on code in PR #38192:
URL: https://github.com/apache/spark/pull/38192#discussion_r997107932


##########
connector/connect/src/main/protobuf/spark/connect/commands.proto:
##########
@@ -62,3 +65,39 @@ message CreateScalarFunction {
     FUNCTION_LANGUAGE_SCALA = 3;
   }
 }
+
+// As writes are not directly handled during analysis and planning, they are modeled as commands.
+message WriteOperation {
+  // The output of the `input` relation will be persisted according to the options.
+  Relation input = 1;
+  // Format value according to the Spark documentation. Examples are: text, parquet, delta.
+  string source = 2;
+  // The destination of the write operation must be either a path or a table.
+  oneof save_type {
+    string path = 3;
+    string table_name = 4;
+  }
+  SaveMode mode = 5;

Review Comment:
   We added `DataFrameWriterV2` because we believe `SaveMode` is a bad design. It's confusing if we write to a table, and the table location is not empty. It's unclear if the save mode should apply to table existence or location existence.
   
   Anyway, we need to support save mode in the proto definition to support the existing DF API. If we want to support `DataFrameWriterV2` in the Spark Connect client, we should probably have a new proto definition without save mode.





[GitHub] [spark] grundprinzip commented on a diff in pull request #38192: [CONNECT] [SPARK-40737] Add basic support for DataFrameWriter

Posted by GitBox <gi...@apache.org>.
grundprinzip commented on code in PR #38192:
URL: https://github.com/apache/spark/pull/38192#discussion_r991640559


##########
connector/connect/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectCommandPlannerSuite.scala:
##########
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.planner
+
+import org.apache.spark.SparkFunSuite
+
+class SparkConnectCommandPlannerSuite extends SparkFunSuite with SparkConnectPlanTest {
+
+  test("Write Relation") {

Review Comment:
   I added a bunch more tests for the command translation.





[GitHub] [spark] grundprinzip commented on a diff in pull request #38192: [SPARK-40737][CONNECT] Add basic support for DataFrameWriter

Posted by GitBox <gi...@apache.org>.
grundprinzip commented on code in PR #38192:
URL: https://github.com/apache/spark/pull/38192#discussion_r995378123


##########
connector/connect/src/main/scala/org/apache/spark/sql/connect/dsl/package.scala:
##########
@@ -34,59 +36,109 @@ package object dsl {
       val identifier = CatalystSqlParser.parseMultipartIdentifier(s)
 
       def protoAttr: proto.Expression =
-        proto.Expression.newBuilder()
+        proto.Expression
+          .newBuilder()
           .setUnresolvedAttribute(
-            proto.Expression.UnresolvedAttribute.newBuilder()
+            proto.Expression.UnresolvedAttribute
+              .newBuilder()
               .addAllParts(identifier.asJava)
               .build())
           .build()
     }
 
     implicit class DslExpression(val expr: proto.Expression) {
-      def as(alias: String): proto.Expression = proto.Expression.newBuilder().setAlias(
-        proto.Expression.Alias.newBuilder().setName(alias).setExpr(expr)).build()
-
-      def < (other: proto.Expression): proto.Expression =
-        proto.Expression.newBuilder().setUnresolvedFunction(
-          proto.Expression.UnresolvedFunction.newBuilder()
-            .addParts("<")
-            .addArguments(expr)
-            .addArguments(other)
-        ).build()
+      def as(alias: String): proto.Expression = proto.Expression

Review Comment:
   It's an annoying mix. The conflict resolution on the merge was ugly, and then I had to clean it up. Doing this manually sucks.





[GitHub] [spark] hvanhovell commented on a diff in pull request #38192: [SPARK-40737][CONNECT] Add basic support for DataFrameWriter

Posted by GitBox <gi...@apache.org>.
hvanhovell commented on code in PR #38192:
URL: https://github.com/apache/spark/pull/38192#discussion_r992203257


##########
connector/connect/src/main/scala/org/apache/spark/sql/connect/command/SparkConnectCommandPlanner.scala:
##########
@@ -24,10 +24,16 @@ import com.google.common.collect.{Lists, Maps}
 import org.apache.spark.annotation.{Since, Unstable}
 import org.apache.spark.api.python.{PythonEvalType, SimplePythonFunction}
 import org.apache.spark.connect.proto
-import org.apache.spark.sql.SparkSession
+import org.apache.spark.connect.proto.WriteOperation
+import org.apache.spark.sql.{Dataset, SparkSession}
+import org.apache.spark.sql.connect.planner.SparkConnectPlanner
 import org.apache.spark.sql.execution.python.UserDefinedPythonFunction
 import org.apache.spark.sql.types.StringType
 
+final case class InvalidCommandInput(

Review Comment:
   Should we use `IllegalArgumentException` here? Or do you feel this needs its own specific exception?





[GitHub] [spark] grundprinzip commented on a diff in pull request #38192: [SPARK-40737][CONNECT] Add basic support for DataFrameWriter

Posted by GitBox <gi...@apache.org>.
grundprinzip commented on code in PR #38192:
URL: https://github.com/apache/spark/pull/38192#discussion_r992686507


##########
connector/connect/src/main/scala/org/apache/spark/sql/connect/command/SparkConnectCommandPlanner.scala:
##########
@@ -74,4 +82,60 @@ class SparkConnectCommandPlanner(session: SparkSession, command: proto.Command)
     session.udf.registerPython(cf.getPartsList.asScala.head, udf)
   }
 
+  /**
+   * Transforms the write operation and executes it.
+   *
+   * The input write operation contains a reference to the input plan and transforms it to the
+   * corresponding logical plan. Afterwards, creates the DataFrameWriter and translates the
+   * parameters of the WriteOperation into the corresponding method calls.
+   *
+   * @param writeOperation
+   */
+  def handleWriteOperation(writeOperation: WriteOperation): Unit = {
+    // Transform the input plan into the logical plan.
+    val planner = new SparkConnectPlanner(writeOperation.getInput, session)
+    val plan = planner.transform()
+    // And create a Dataset from the plan.
+    val dataset = Dataset.ofRows(session, logicalPlan = plan)
+
+    val w = dataset.write
+    if (writeOperation.getOptionsCount > 0) {
+      writeOperation.getOptionsList.asScala.foreach(x => w.option(x.getKey, x.getValue))
+    }
+
+    if (writeOperation.getSortColumnNamesCount > 0) {
+      val names = writeOperation.getSortColumnNamesList.asScala
+      w.sortBy(names.head, names.tail.toSeq: _*)
+    }
+
+    if (writeOperation.hasBucketBy) {
+      val op = writeOperation.getBucketBy
+      val cols = op.getColumnsList.asScala
+      if (op.getBucketCount <= 0) {
+        throw InvalidCommandInput(
+          s"BucketBy must specify a bucket count > 0, received ${op.getBucketCount} instead.")
+      }
+      w.bucketBy(op.getBucketCount, cols.head, cols.tail.toSeq: _*)
+    }
+
+    if (writeOperation.getPartitionByColumnsCount > 0) {
+      val names = writeOperation.getPartitionByColumnsList.asScala
+      w.partitionBy(names.toSeq: _*)
+    }
+
+    if (writeOperation.getFormat != null) {

Review Comment:
   Yes, sorry: `hasXXX` exists only for composite objects like messages, not for scalar values.
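   
   For reference, a minimal sketch of the distinction, assuming the proto3-generated bindings for the quoted `commands.proto` and the `w` writer from the snippet above (illustrative only, not part of the diff):
   
   ```scala
   // proto3 generates has* accessors only for message-typed fields; scalar
   // fields never return null and fall back to their default value instead.
   if (writeOperation.hasBucketBy) {        // BucketBy is a message type, so has* exists
     val op = writeOperation.getBucketBy
     // ...
   }
   if (writeOperation.getFormat.nonEmpty) { // string fields default to "", never null
     w.format(writeOperation.getFormat)
   }
   ```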





[GitHub] [spark] grundprinzip commented on a diff in pull request #38192: [SPARK-40737][CONNECT] Add basic support for DataFrameWriter

Posted by GitBox <gi...@apache.org>.
grundprinzip commented on code in PR #38192:
URL: https://github.com/apache/spark/pull/38192#discussion_r995370041


##########
connector/connect/src/main/protobuf/spark/connect/commands.proto:
##########
@@ -62,3 +65,39 @@ message CreateScalarFunction {
     FUNCTION_LANGUAGE_SCALA = 3;
   }
 }
+
+// As writes are not directly handled during analysis and planning, they are modeled as commands.
+message WriteOperation {
+  // The output of the `input` relation will be persisted according to the options.
+  Relation input = 1;
+  // Format value according to the Spark documentation. Examples are: text, parquet, delta.
+  string format = 2;
+  // The destination of the write operation must be either a path or a table.
+  oneof save_type {
+    string path = 3;
+    string table_name = 4;
+  }
+  SaveMode mode = 5;
+  // List of columns to sort the output by.
+  repeated string sortColumnNames = 6;
+  // List of columns for partitioning.
+  repeated string partitionByColumns = 7;

Review Comment:
   will do





[GitHub] [spark] cloud-fan commented on a diff in pull request #38192: [SPARK-40737][CONNECT] Add basic support for DataFrameWriter

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on code in PR #38192:
URL: https://github.com/apache/spark/pull/38192#discussion_r997107932


##########
connector/connect/src/main/protobuf/spark/connect/commands.proto:
##########
@@ -62,3 +65,39 @@ message CreateScalarFunction {
     FUNCTION_LANGUAGE_SCALA = 3;
   }
 }
+
+// As writes are not directly handled during analysis and planning, they are modeled as commands.
+message WriteOperation {
+  // The output of the `input` relation will be persisted according to the options.
+  Relation input = 1;
+  // Format value according to the Spark documentation. Examples are: text, parquet, delta.
+  string source = 2;
+  // The destination of the write operation must be either a path or a table.
+  oneof save_type {
+    string path = 3;
+    string table_name = 4;
+  }
+  SaveMode mode = 5;

Review Comment:
   We added `DataFrameWriterV2` because we believe `SaveMode` is a bad design. It's confusing if we write to a table, as there are so many options: create if not exists, create or replace, replace if exists, append if exists, overwrite data if exists, etc.
   
   Anyway, we need to support save mode in the proto definition to support the existing DF API. If we want to support `DataFrameWriterV2` in the Spark Connect client, we should probably have a new proto definition without save mode.
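   
   For illustration only (not part of this PR), a minimal sketch contrasting the two public APIs, assuming an existing DataFrame `df` and a table name `t`:
   
   ```scala
   import org.apache.spark.sql.SaveMode
   
   // DataFrameWriter (v1): a single SaveMode covers both table and path writes.
   df.write.format("parquet").mode(SaveMode.Overwrite).saveAsTable("t")
   
   // DataFrameWriterV2: the verb states exactly what should happen to the table.
   df.writeTo("t").using("parquet").create() // fail if the table already exists
   df.writeTo("t").createOrReplace()         // create, or replace an existing table
   df.writeTo("t").append()                  // append to an existing table
   df.writeTo("t").overwritePartitions()     // dynamically overwrite matching partitions
   ```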





[GitHub] [spark] hvanhovell commented on a diff in pull request #38192: [CONNECT] [SPARK-40737] Add basic support for DataFrameWriter

Posted by GitBox <gi...@apache.org>.
hvanhovell commented on code in PR #38192:
URL: https://github.com/apache/spark/pull/38192#discussion_r991559720


##########
connector/connect/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectCommandPlannerSuite.scala:
##########
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.planner
+
+import org.apache.spark.SparkFunSuite
+
+class SparkConnectCommandPlannerSuite extends SparkFunSuite with SparkConnectPlanTest {
+
+  test("Write Relation") {

Review Comment:
   We probably need a little bit more coverage :)





[GitHub] [spark] HyukjinKwon commented on a diff in pull request #38192: [SPARK-40737][CONNECT] Add basic support for DataFrameWriter

Posted by GitBox <gi...@apache.org>.
HyukjinKwon commented on code in PR #38192:
URL: https://github.com/apache/spark/pull/38192#discussion_r995338414


##########
connector/connect/src/main/scala/org/apache/spark/sql/connect/command/SparkConnectCommandPlanner.scala:
##########
@@ -24,10 +24,16 @@ import com.google.common.collect.{Lists, Maps}
 import org.apache.spark.annotation.{Since, Unstable}
 import org.apache.spark.api.python.{PythonEvalType, SimplePythonFunction}
 import org.apache.spark.connect.proto
-import org.apache.spark.sql.SparkSession
+import org.apache.spark.connect.proto.WriteOperation
+import org.apache.spark.sql.{Dataset, SparkSession}
+import org.apache.spark.sql.connect.planner.SparkConnectPlanner
 import org.apache.spark.sql.execution.python.UserDefinedPythonFunction
 import org.apache.spark.sql.types.StringType
 
+final case class InvalidCommandInput(

Review Comment:
   If this is a user-facing error, we should actually leverage the error framework we have. cc @gengliangwang @MaxGekk @itholic





[GitHub] [spark] HyukjinKwon commented on a diff in pull request #38192: [SPARK-40737][CONNECT] Add basic support for DataFrameWriter

Posted by GitBox <gi...@apache.org>.
HyukjinKwon commented on code in PR #38192:
URL: https://github.com/apache/spark/pull/38192#discussion_r995398147


##########
connector/connect/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectCommandPlannerSuite.scala:
##########
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.planner
+
+import java.nio.file.{Files, Paths}
+import java.util.UUID
+
+import org.apache.spark.SparkClassNotFoundException
+import org.apache.spark.connect.proto
+import org.apache.spark.sql.{AnalysisException, SaveMode}
+import org.apache.spark.sql.catalyst.dsl.expressions._
+import org.apache.spark.sql.connect.command.{InvalidCommandInput, SparkConnectCommandPlanner}
+import org.apache.spark.sql.connect.dsl.commands._
+import org.apache.spark.sql.test.{SharedSparkSession, SQLTestUtils}
+
+class SparkConnectCommandPlannerSuite
+    extends SQLTestUtils
+    with SparkConnectPlanTest
+    with SharedSparkSession {
+
+  lazy val localRelation = createLocalRelationProto(Seq($"id".int))
+
+  /**
+   * Returns a unique path name on every invocation.
+   * @return
+   */
+  private def path(): String = s"/tmp/${UUID.randomUUID()}"
+
+  /**
+   * Returns a unique valid table name identifier on each invocation.
+   * @return
+   */
+  private def table(): String = s"table${UUID.randomUUID().toString.replace("-", "")}"

Review Comment:
   I believe we don't need this because the tests won't run in parallel, and `withTable` will drop the table.
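   
   For concreteness, a minimal sketch of the suggested shape, reusing the helpers quoted above together with `SQLTestUtils.withTable` (the table name is purely illustrative):
   
   ```scala
   test("Write to Table") {
     withTable("connect_write_table") {
       val cmd = localRelation.write(format = Some("parquet"), tableName = Some("connect_write_table"))
       transform(cmd)
       // The write is considered successful if the table is queryable afterwards;
       // withTable drops it once the block finishes.
       spark.sql("select count(*) from connect_write_table").collect()
     }
   }
   ```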



##########
connector/connect/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectCommandPlannerSuite.scala:
##########
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.planner
+
+import java.nio.file.{Files, Paths}
+import java.util.UUID
+
+import org.apache.spark.SparkClassNotFoundException
+import org.apache.spark.connect.proto
+import org.apache.spark.sql.{AnalysisException, SaveMode}
+import org.apache.spark.sql.catalyst.dsl.expressions._
+import org.apache.spark.sql.connect.command.{InvalidCommandInput, SparkConnectCommandPlanner}
+import org.apache.spark.sql.connect.dsl.commands._
+import org.apache.spark.sql.test.{SharedSparkSession, SQLTestUtils}
+
+class SparkConnectCommandPlannerSuite
+    extends SQLTestUtils
+    with SparkConnectPlanTest
+    with SharedSparkSession {
+
+  lazy val localRelation = createLocalRelationProto(Seq($"id".int))
+
+  /**
+   * Returns a unique path name on every invocation.
+   * @return
+   */
+  private def path(): String = s"/tmp/${UUID.randomUUID()}"

Review Comment:
   This one too. I believe we don't need a random path now. We could just inline any path into the test.



##########
connector/connect/src/main/scala/org/apache/spark/sql/connect/dsl/package.scala:
##########
@@ -34,59 +36,109 @@ package object dsl {
       val identifier = CatalystSqlParser.parseMultipartIdentifier(s)
 
       def protoAttr: proto.Expression =
-        proto.Expression.newBuilder()
+        proto.Expression
+          .newBuilder()
           .setUnresolvedAttribute(
-            proto.Expression.UnresolvedAttribute.newBuilder()
+            proto.Expression.UnresolvedAttribute
+              .newBuilder()
               .addAllParts(identifier.asJava)
               .build())
           .build()
     }
 
     implicit class DslExpression(val expr: proto.Expression) {
-      def as(alias: String): proto.Expression = proto.Expression.newBuilder().setAlias(
-        proto.Expression.Alias.newBuilder().setName(alias).setExpr(expr)).build()
-
-      def < (other: proto.Expression): proto.Expression =
-        proto.Expression.newBuilder().setUnresolvedFunction(
-          proto.Expression.UnresolvedFunction.newBuilder()
-            .addParts("<")
-            .addArguments(expr)
-            .addArguments(other)
-        ).build()
+      def as(alias: String): proto.Expression = proto.Expression
+        .newBuilder()
+        .setAlias(proto.Expression.Alias.newBuilder().setName(alias).setExpr(expr))
+        .build()
+
+      def <(other: proto.Expression): proto.Expression =
+        proto.Expression
+          .newBuilder()
+          .setUnresolvedFunction(
+            proto.Expression.UnresolvedFunction
+              .newBuilder()
+              .addParts("<")
+              .addArguments(expr)
+              .addArguments(other))
+          .build()
     }
 
     implicit def intToLiteral(i: Int): proto.Expression =
-      proto.Expression.newBuilder().setLiteral(
-        proto.Expression.Literal.newBuilder().setI32(i)
-      ).build()
+      proto.Expression
+        .newBuilder()
+        .setLiteral(proto.Expression.Literal.newBuilder().setI32(i))
+        .build()
+  }
+
+  object commands { // scalastyle:ignore
+    implicit class DslCommands(val logicalPlan: proto.Relation) {
+      def write(
+          format: Option[String] = None,
+          path: Option[String] = None,
+          tableName: Option[String] = None,
+          mode: Option[String] = None,
+          sortByColumns: Seq[String] = Seq.empty,
+          partitionByCols: Seq[String] = Seq.empty,
+          bucketByCols: Seq[String] = Seq.empty,
+          numBuckets: Option[Int] = None): proto.Command = {
+        val writeOp = proto.WriteOperation.newBuilder()
+        format.foreach(writeOp.setSource(_))
+
+        mode
+          .map(SaveMode.valueOf(_))
+          .map(DataTypeProtoConverter.toSaveModeProto(_))
+          .foreach(writeOp.setMode(_))
+
+        if (tableName.nonEmpty) {
+          tableName.foreach(writeOp.setTableName(_))
+        } else {
+          path.foreach(writeOp.setPath(_))
+        }
+        sortByColumns.foreach(writeOp.addSortColumnNames(_))
+        partitionByCols.foreach(writeOp.addPartitioningColumns(_))
+
+        if (numBuckets.nonEmpty && bucketByCols.nonEmpty) {
+          val op = proto.WriteOperation.BucketBy.newBuilder()
+          numBuckets.foreach(op.setNumBuckets(_))
+          bucketByCols.foreach(op.addBucketColumnNames(_))
+          writeOp.setBucketBy(op.build())
+        }
+        writeOp.setInput(logicalPlan)
+        proto.Command.newBuilder().setWriteOperation(writeOp.build()).build()
+      }
+    }
   }
 
   object plans { // scalastyle:ignore
     implicit class DslLogicalPlan(val logicalPlan: proto.Relation) {
       def select(exprs: proto.Expression*): proto.Relation = {
-        proto.Relation.newBuilder().setProject(

Review Comment:
   I would remove these style changes since the previous code already follows the style guide we have.



##########
connector/connect/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectPlannerSuite.scala:
##########
@@ -88,16 +108,20 @@ class SparkConnectPlannerSuite extends SparkFunSuite with SparkConnectPlanTest {
   }
 
   test("Simple Project") {
-    val readWithTable = proto.Read.newBuilder()
+    val readWithTable = proto.Read
+      .newBuilder()
       .setNamedTable(proto.Read.NamedTable.newBuilder.addParts("name").build())
       .build()
     val project =
-      proto.Project.newBuilder()
+      proto.Project
+        .newBuilder()
         .setInput(proto.Relation.newBuilder().setRead(readWithTable).build())
         .addExpressions(
-          proto.Expression.newBuilder()
-            .setUnresolvedStar(UnresolvedStar.newBuilder().build()).build()
-        ).build()
+          proto.Expression
+            .newBuilder()
+            .setUnresolvedStar(UnresolvedStar.newBuilder().build())
+            .build())
+        .build()

Review Comment:
   Maybe we should get rid of these changes too?



##########
connector/connect/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectCommandPlannerSuite.scala:
##########
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.planner
+
+import java.nio.file.{Files, Paths}
+import java.util.UUID
+
+import org.apache.spark.SparkClassNotFoundException
+import org.apache.spark.connect.proto
+import org.apache.spark.sql.{AnalysisException, SaveMode}
+import org.apache.spark.sql.catalyst.dsl.expressions._
+import org.apache.spark.sql.connect.command.{InvalidCommandInput, SparkConnectCommandPlanner}
+import org.apache.spark.sql.connect.dsl.commands._
+import org.apache.spark.sql.test.{SharedSparkSession, SQLTestUtils}
+
+class SparkConnectCommandPlannerSuite
+    extends SQLTestUtils
+    with SparkConnectPlanTest
+    with SharedSparkSession {
+
+  lazy val localRelation = createLocalRelationProto(Seq($"id".int))
+
+  /**
+   * Returns a unique path name on every invocation.
+   * @return
+   */
+  private def path(): String = s"/tmp/${UUID.randomUUID()}"
+
+  /**
+   * Returns a unique valid table name identifier on each invocation.
+   * @return
+   */
+  private def table(): String = s"table${UUID.randomUUID().toString.replace("-", "")}"
+
+  def transform(cmd: proto.Command): Unit = {
+    new SparkConnectCommandPlanner(spark, cmd).process()
+  }
+
+  test("Writes fails without path or table") {
+    assertThrows[UnsupportedOperationException] {
+      transform(localRelation.write())
+    }
+  }
+
+  test("Write fails with unknown table - AnalysisException") {
+    val cmd = readRel.write(tableName = Some("dest"))
+    assertThrows[AnalysisException] {
+      transform(cmd)
+    }
+  }
+
+  test("Write with partitions") {
+    val name = table()

Review Comment:
   maybe `withTable`





[GitHub] [spark] hvanhovell commented on a diff in pull request #38192: [SPARK-40737][CONNECT] Add basic support for DataFrameWriter

Posted by GitBox <gi...@apache.org>.
hvanhovell commented on code in PR #38192:
URL: https://github.com/apache/spark/pull/38192#discussion_r992201281


##########
connector/connect/src/main/scala/org/apache/spark/sql/connect/command/SparkConnectCommandPlanner.scala:
##########
@@ -24,10 +24,16 @@ import com.google.common.collect.{Lists, Maps}
 import org.apache.spark.annotation.{Since, Unstable}
 import org.apache.spark.api.python.{PythonEvalType, SimplePythonFunction}
 import org.apache.spark.connect.proto
-import org.apache.spark.sql.SparkSession
+import org.apache.spark.connect.proto.WriteOperation
+import org.apache.spark.sql.{Dataset, SparkSession}
+import org.apache.spark.sql.connect.planner.SparkConnectPlanner
 import org.apache.spark.sql.execution.python.UserDefinedPythonFunction
 import org.apache.spark.sql.types.StringType
 
+final case class InvalidCommandInput(
+    private val message: String = "",
+    private val cause: Throwable = None.orNull)

Review Comment:
   use `null`?





[GitHub] [spark] AmplabJenkins commented on pull request #38192: [SPARK-40737][CONNECT] Add basic support for DataFrameWriter

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on PR #38192:
URL: https://github.com/apache/spark/pull/38192#issuecomment-1275403753

   Can one of the admins verify this patch?




[GitHub] [spark] grundprinzip commented on a diff in pull request #38192: [SPARK-40737][CONNECT] Add basic support for DataFrameWriter

Posted by GitBox <gi...@apache.org>.
grundprinzip commented on code in PR #38192:
URL: https://github.com/apache/spark/pull/38192#discussion_r995369875


##########
connector/connect/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectCommandPlannerSuite.scala:
##########
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.planner
+
+import java.io.File
+import java.nio.file.{Files, Paths}
+import java.util.UUID
+
+import org.apache.commons.io.FileUtils
+
+import org.apache.spark.{SparkClassNotFoundException, SparkFunSuite}
+import org.apache.spark.connect.proto
+import org.apache.spark.sql.{AnalysisException, SaveMode}
+import org.apache.spark.sql.catalyst.dsl.expressions._
+import org.apache.spark.sql.connect.command.{InvalidCommandInput, SparkConnectCommandPlanner}
+import org.apache.spark.sql.connect.dsl.commands._
+import org.apache.spark.sql.test.SharedSparkSession
+
+class SparkConnectCommandPlannerSuite
+    extends SparkFunSuite
+    with SparkConnectPlanTest
+    with SharedSparkSession {
+
+  lazy val localRelation = createLocalRelationProto(Seq($"id".int))
+
+  /**
+   * Returns a unique path name on every invocation.
+   * @return
+   */
+  private def path(): String = s"/tmp/${UUID.randomUUID()}"
+
+  /**
+   * Returns a unique valid table name identifier on each invocation.
+   * @return
+   */
+  private def table(): String = s"table${UUID.randomUUID().toString.replace("-", "")}"
+
+  /**
+   * Helper method that takes a closure as an argument to handle cleanup of the resource created.
+   * @param thunk
+   */
+  def withTable(thunk: String => Any): Unit = {
+    val name = table()
+    thunk(name)
+    spark.sql(s"drop table if exists ${name}")
+  }
+
+  /**
+   * Helper method that takes a closure as an argument and handles cleanup of the file system
+   * resource created.
+   * @param thunk
+   */
+  def withPath(thunk: String => Any): Unit = {
+    val name = path()
+    thunk(name)
+    FileUtils.deleteDirectory(new File(name))
+  }
+
+  def transform(cmd: proto.Command): Unit = {
+    new SparkConnectCommandPlanner(spark, cmd).process()
+  }
+
+  test("Writes fails without path or table") {
+    assertThrows[UnsupportedOperationException] {
+      transform(localRelation.write())
+    }
+  }
+
+  test("Write fails with unknown table - AnalysisException") {
+    val cmd = readRel.write(tableName = Some("dest"))
+    assertThrows[AnalysisException] {
+      transform(cmd)
+    }
+  }
+
+  test("Write with partitions") {
+    val name = table()
+    val cmd = localRelation.write(
+      tableName = Some(name),
+      format = Some("parquet"),
+      partitionByCols = Seq("noid"))
+    assertThrows[AnalysisException] {
+      transform(cmd)
+    }
+  }
+
+  test("Write with invalid bucketBy configuration") {
+    val cmd = localRelation.write(bucketByCols = Seq("id"), numBuckets = Some(0))
+    assertThrows[InvalidCommandInput] {
+      transform(cmd)
+    }
+  }
+
+  test("Write to Path") {
+    withPath { name =>
+      val cmd = localRelation.write(format = Some("parquet"), path = Some(name))
+      transform(cmd)
+      assert(Files.exists(Paths.get(name)), s"Output file must exist: ${name}")
+    }
+  }
+
+  test("Write to Path with invalid input") {
+    // Wrong data source.
+    assertThrows[SparkClassNotFoundException](
+      transform(localRelation.write(path = Some(path), format = Some("ThisAintNoFormat"))))
+
+    // Default data source not found.
+    assertThrows[SparkClassNotFoundException](transform(localRelation.write(path = Some(path))))
+  }
+
+  test("Write with sortBy") {
+    // Sort by existing column.
+    transform(
+      localRelation.write(
+        tableName = Some(table),
+        format = Some("parquet"),
+        sortByColumns = Seq("id"),
+        bucketByCols = Seq("id"),
+        numBuckets = Some(10)))
+
+    // Sort by non-existing column
+    assertThrows[AnalysisException](
+      transform(
+        localRelation
+          .write(
+            tableName = Some(table),
+            format = Some("parquet"),
+            sortByColumns = Seq("noid"),
+            bucketByCols = Seq("id"),
+            numBuckets = Some(10))))
+  }
+
+  test("Write to Table") {
+    withTable { name =>
+      val cmd = localRelation.write(format = Some("parquet"), tableName = Some(name))
+      transform(cmd)
+      // Check that we can find and drop the table.
+      spark.sql(s"select count(*) from ${name}").collect()

Review Comment:
   The table is empty and the assumption is that the write operation was successful if the table can be found, because the write is already implemented and tested.



##########
connector/connect/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectCommandPlannerSuite.scala:
##########
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.planner
+
+import java.io.File
+import java.nio.file.{Files, Paths}
+import java.util.UUID
+
+import org.apache.commons.io.FileUtils
+
+import org.apache.spark.{SparkClassNotFoundException, SparkFunSuite}
+import org.apache.spark.connect.proto
+import org.apache.spark.sql.{AnalysisException, SaveMode}
+import org.apache.spark.sql.catalyst.dsl.expressions._
+import org.apache.spark.sql.connect.command.{InvalidCommandInput, SparkConnectCommandPlanner}
+import org.apache.spark.sql.connect.dsl.commands._
+import org.apache.spark.sql.test.SharedSparkSession
+
+class SparkConnectCommandPlannerSuite
+    extends SparkFunSuite
+    with SparkConnectPlanTest
+    with SharedSparkSession {
+
+  lazy val localRelation = createLocalRelationProto(Seq($"id".int))
+
+  /**
+   * Returns a unique path name on every invocation.
+   * @return
+   */
+  private def path(): String = s"/tmp/${UUID.randomUUID()}"
+
+  /**
+   * Returns a unique valid table name identifier on each invocation.
+   * @return
+   */
+  private def table(): String = s"table${UUID.randomUUID().toString.replace("-", "")}"
+
+  /**
+   * Helper method that takes a closure as an argument to handle cleanup of the resource created.
+   * @param thunk
+   */
+  def withTable(thunk: String => Any): Unit = {

Review Comment:
   Fixing that.





[GitHub] [spark] grundprinzip commented on a diff in pull request #38192: [SPARK-40737][CONNECT] Add basic support for DataFrameWriter

Posted by GitBox <gi...@apache.org>.
grundprinzip commented on code in PR #38192:
URL: https://github.com/apache/spark/pull/38192#discussion_r995420905


##########
connector/connect/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectCommandPlannerSuite.scala:
##########
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.planner
+
+import java.nio.file.{Files, Paths}
+import java.util.UUID
+
+import org.apache.spark.SparkClassNotFoundException
+import org.apache.spark.connect.proto
+import org.apache.spark.sql.{AnalysisException, SaveMode}
+import org.apache.spark.sql.catalyst.dsl.expressions._
+import org.apache.spark.sql.connect.command.{InvalidCommandInput, SparkConnectCommandPlanner}
+import org.apache.spark.sql.connect.dsl.commands._
+import org.apache.spark.sql.test.{SharedSparkSession, SQLTestUtils}
+
+class SparkConnectCommandPlannerSuite
+    extends SQLTestUtils
+    with SparkConnectPlanTest
+    with SharedSparkSession {
+
+  lazy val localRelation = createLocalRelationProto(Seq($"id".int))
+
+  /**
+   * Returns a unique path name on every invocation.
+   * @return
+   */
+  private def path(): String = s"/tmp/${UUID.randomUUID()}"
+
+  /**
+   * Returns a unique valid table name identifier on each invocation.
+   * @return
+   */
+  private def table(): String = s"table${UUID.randomUUID().toString.replace("-", "")}"

Review Comment:
   removing it





[GitHub] [spark] hvanhovell commented on a diff in pull request #38192: [CONNECT] [SPARK-40737] Add basic support for DataFrameWriter

Posted by GitBox <gi...@apache.org>.
hvanhovell commented on code in PR #38192:
URL: https://github.com/apache/spark/pull/38192#discussion_r991558435


##########
connector/connect/src/main/scala/org/apache/spark/sql/connect/command/SparkConnectCommandPlanner.scala:
##########
@@ -74,4 +77,49 @@ class SparkConnectCommandPlanner(session: SparkSession, command: proto.Command)
     session.udf.registerPython(cf.getPartsList.asScala.head, udf)
   }
 
+  /**
+   * Transforms the write operation and executes it.
+   *
+   * The input write operation contains a reference to the input plan and transforms it to
+   * the corresponding logical plan. Afterwards, creates the DataFrameWriter and translates
+   * the parameters of the WriteOperation into the corresponding methods calls.
+   *
+   * @param writeOperation
+   */
+  def handleWriteOperation(writeOperation: WriteOperation): Unit = {

Review Comment:
   It is a bit weird to have this in the SparkPlanner node, but I guess this is the consequence of the builder() API we have in the DataFrameWriter.
   
   @cloud-fan AFAIK you have been working on making writes more declarative (i.e. planned writes). Do you see a way to improve this?





[GitHub] [spark] grundprinzip commented on a diff in pull request #38192: [SPARK-40737][CONNECT] Add basic support for DataFrameWriter

Posted by GitBox <gi...@apache.org>.
grundprinzip commented on code in PR #38192:
URL: https://github.com/apache/spark/pull/38192#discussion_r992686049


##########
connector/connect/src/main/protobuf/spark/connect/commands.proto:
##########
@@ -62,3 +65,29 @@ message CreateScalarFunction {
     FUNCTION_LANGUAGE_SCALA = 3;
   }
 }
+
+// As writes are not directly handled during analysis and planning, they are modeled as commands.
+message WriteOperation {
+  Relation input = 1;
+  string format = 2;
+
+  oneof save_type {
+    string path = 3;
+    string table_name = 4;
+  }
+  string mode = 5;
+  repeated string sortColumnNames = 6;
+  repeated string partitionByColumns = 7;
+  BucketBy bucketBy = 8;
+  repeated WriteOptions options = 9;

Review Comment:
   The interesting question is that options, at least in the Scala source, support different types. It should probably be a `map<string, expressions.Literal>`, but I was too lazy. I can change it.
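   
   As a point of reference, a minimal sketch of the typed `option` overloads on `DataFrameWriter` that a plain string key/value pair cannot capture; the keys below are purely illustrative and `df` is assumed to be an existing DataFrame:
   
   ```scala
   val w = df.write.format("parquet")
   w.option("compression", "snappy")     // String overload
   w.option("mergeSchema", true)         // Boolean overload
   w.option("maxRecordsPerFile", 10000L) // Long overload
   w.option("sampleRatio", 0.5d)         // Double overload
   ```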





[GitHub] [spark] HyukjinKwon commented on a diff in pull request #38192: [SPARK-40737][CONNECT] Add basic support for DataFrameWriter

Posted by GitBox <gi...@apache.org>.
HyukjinKwon commented on code in PR #38192:
URL: https://github.com/apache/spark/pull/38192#discussion_r995357975


##########
connector/connect/src/main/scala/org/apache/spark/sql/connect/command/SparkConnectCommandPlanner.scala:
##########
@@ -40,17 +46,19 @@ class SparkConnectCommandPlanner(session: SparkSession, command: proto.Command)
     command.getCommandTypeCase match {
       case proto.Command.CommandTypeCase.CREATE_FUNCTION =>
         handleCreateScalarFunction(command.getCreateFunction)
+      case proto.Command.CommandTypeCase.WRITE_OPERATION =>
+        handleWriteOperation(command.getWriteOperation)
       case _ => throw new UnsupportedOperationException(s"$command not supported.")
     }
   }
 
   /**
    * This is a helper function that registers a new Python function in the SparkSession.
    *
-   * Right now this function is very rudimentary and bare-bones just to showcase how it
-   * is possible to remotely serialize a Python function and execute it on the Spark cluster.
-   * If the Python version on the client and server diverge, the execution of the function that
-   * is serialized will most likely fail.
+   * Right now this function is very rudimentary and bare-bones just to showcase how it is
+   * possible to remotely serialize a Python function and execute it on the Spark cluster. If the
+   * Python version on the client and server diverge, the execution of the function that is
+   * serialized will most likely fail.

Review Comment:
   I think we had better exclude these changes; I suspect they come from scalafmt(?).





[GitHub] [spark] HyukjinKwon commented on a diff in pull request #38192: [SPARK-40737][CONNECT] Add basic support for DataFrameWriter

Posted by GitBox <gi...@apache.org>.
HyukjinKwon commented on code in PR #38192:
URL: https://github.com/apache/spark/pull/38192#discussion_r995360999


##########
connector/connect/src/main/scala/org/apache/spark/sql/connect/command/SparkConnectCommandPlanner.scala:
##########
@@ -74,4 +77,49 @@ class SparkConnectCommandPlanner(session: SparkSession, command: proto.Command)
     session.udf.registerPython(cf.getPartsList.asScala.head, udf)
   }
 
+  /**
+   * Transforms the write operation and executes it.
+   *
+   * The input write operation contains a reference to the input plan and transforms it to
+   * the corresponding logical plan. Afterwards, creates the DataFrameWriter and translates
+   * the parameters of the WriteOperation into the corresponding methods calls.
+   *
+   * @param writeOperation
+   */
+  def handleWriteOperation(writeOperation: WriteOperation): Unit = {

Review Comment:
   cc @allisonwang-db FYI





[GitHub] [spark] grundprinzip commented on a diff in pull request #38192: [CONNECT] [SPARK-40737] Add basic support for DataFrameWriter

Posted by GitBox <gi...@apache.org>.
grundprinzip commented on code in PR #38192:
URL: https://github.com/apache/spark/pull/38192#discussion_r991647838


##########
connector/connect/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##########
@@ -23,7 +23,13 @@ import org.apache.spark.annotation.{Since, Unstable}
 import org.apache.spark.connect.proto
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.{expressions, plans}
-import org.apache.spark.sql.catalyst.analysis.{UnresolvedAlias, UnresolvedAttribute, UnresolvedFunction, UnresolvedRelation, UnresolvedStar}
+import org.apache.spark.sql.catalyst.analysis.{

Review Comment:
   fix





[GitHub] [spark] HyukjinKwon commented on a diff in pull request #38192: [SPARK-40737][CONNECT] Add basic support for DataFrameWriter

Posted by GitBox <gi...@apache.org>.
HyukjinKwon commented on code in PR #38192:
URL: https://github.com/apache/spark/pull/38192#discussion_r995364802


##########
connector/connect/src/main/protobuf/spark/connect/commands.proto:
##########
@@ -62,3 +65,39 @@ message CreateScalarFunction {
     FUNCTION_LANGUAGE_SCALA = 3;
   }
 }
+
+// As writes are not directly handled during analysis and planning, they are modeled as commands.
+message WriteOperation {
+  // The output of the `input` relation will be persisted according to the options.
+  Relation input = 1;
+  // Format value according to the Spark documentation. Examples are: text, parquet, delta.
+  string format = 2;
+  // The destination of the write operation must be either a path or a table.
+  oneof save_type {
+    string path = 3;
+    string table_name = 4;
+  }
+  SaveMode mode = 5;
+  // List of columns to sort the output by.
+  repeated string sortColumnNames = 6;
+  // List of columns for partitioning.
+  repeated string partitionByColumns = 7;

Review Comment:
   no biggie, but I would match the names with what DataFrameWriter has, e.g. `partitioningColumns`, `bucketColumnNames`, `sortColumnNames`, `numBuckets` and `extraOptions`.





[GitHub] [spark] HyukjinKwon commented on a diff in pull request #38192: [SPARK-40737][CONNECT] Add basic support for DataFrameWriter

Posted by GitBox <gi...@apache.org>.
HyukjinKwon commented on code in PR #38192:
URL: https://github.com/apache/spark/pull/38192#discussion_r995357356


##########
connector/connect/src/main/scala/org/apache/spark/sql/connect/dsl/package.scala:
##########
@@ -34,59 +36,109 @@ package object dsl {
       val identifier = CatalystSqlParser.parseMultipartIdentifier(s)
 
       def protoAttr: proto.Expression =
-        proto.Expression.newBuilder()
+        proto.Expression
+          .newBuilder()
           .setUnresolvedAttribute(
-            proto.Expression.UnresolvedAttribute.newBuilder()
+            proto.Expression.UnresolvedAttribute
+              .newBuilder()
               .addAllParts(identifier.asJava)
               .build())
           .build()
     }
 
     implicit class DslExpression(val expr: proto.Expression) {
-      def as(alias: String): proto.Expression = proto.Expression.newBuilder().setAlias(
-        proto.Expression.Alias.newBuilder().setName(alias).setExpr(expr)).build()
-
-      def < (other: proto.Expression): proto.Expression =
-        proto.Expression.newBuilder().setUnresolvedFunction(
-          proto.Expression.UnresolvedFunction.newBuilder()
-            .addParts("<")
-            .addArguments(expr)
-            .addArguments(other)
-        ).build()
+      def as(alias: String): proto.Expression = proto.Expression

Review Comment:
   qq: is this just a style change? Maybe it's better to exclude these unrelated changes.





[GitHub] [spark] grundprinzip commented on pull request #38192: [SPARK-40737][CONNECT] Add basic support for DataFrameWriter

Posted by GitBox <gi...@apache.org>.
grundprinzip commented on PR #38192:
URL: https://github.com/apache/spark/pull/38192#issuecomment-1276995603

   @hvanhovell @cloud-fan @HyukjinKwon can you please have a look?




[GitHub] [spark] hvanhovell closed pull request #38192: [SPARK-40737][CONNECT] Add basic support for DataFrameWriter

Posted by GitBox <gi...@apache.org>.
hvanhovell closed pull request #38192: [SPARK-40737][CONNECT] Add basic support for DataFrameWriter
URL: https://github.com/apache/spark/pull/38192




[GitHub] [spark] cloud-fan commented on a diff in pull request #38192: [SPARK-40737][CONNECT] Add basic support for DataFrameWriter

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on code in PR #38192:
URL: https://github.com/apache/spark/pull/38192#discussion_r997113086


##########
connector/connect/src/main/scala/org/apache/spark/sql/connect/command/SparkConnectCommandPlanner.scala:
##########
@@ -74,4 +77,49 @@ class SparkConnectCommandPlanner(session: SparkSession, command: proto.Command)
     session.udf.registerPython(cf.getPartsList.asScala.head, udf)
   }
 
+  /**
+   * Transforms the write operation and executes it.
+   *
+   * The input write operation contains a reference to the input plan and transforms it to
+   * the corresponding logical plan. Afterwards, creates the DataFrameWriter and translates
+   * the parameters of the WriteOperation into the corresponding methods calls.
+   *
+   * @param writeOperation
+   */
+  def handleWriteOperation(writeOperation: WriteOperation): Unit = {

Review Comment:
   This does more than plan the write. We need to create a logical plan for the DF write, instead of putting implementation code in the DF write APIs.





[GitHub] [spark] HyukjinKwon commented on a diff in pull request #38192: [SPARK-40737][CONNECT] Add basic support for DataFrameWriter

Posted by GitBox <gi...@apache.org>.
HyukjinKwon commented on code in PR #38192:
URL: https://github.com/apache/spark/pull/38192#discussion_r995364802


##########
connector/connect/src/main/protobuf/spark/connect/commands.proto:
##########
@@ -62,3 +65,39 @@ message CreateScalarFunction {
     FUNCTION_LANGUAGE_SCALA = 3;
   }
 }
+
+// As writes are not directly handled during analysis and planning, they are modeled as commands.
+message WriteOperation {
+  // The output of the `input` relation will be persisted according to the options.
+  Relation input = 1;
+  // Format value according to the Spark documentation. Examples are: text, parquet, delta.
+  string format = 2;
+  // The destination of the write operation must be either a path or a table.
+  oneof save_type {
+    string path = 3;
+    string table_name = 4;
+  }
+  SaveMode mode = 5;
+  // List of columns to sort the output by.
+  repeated string sortColumnNames = 6;
+  // List of columns for partitioning.
+  repeated string partitionByColumns = 7;

Review Comment:
   no biggie, but I would match the names with what DataFrameWriter has, e.g. `partitioningColumns`, `bucketColumnNames`, `sortColumnNames` and `extraOptions`.





[GitHub] [spark] HyukjinKwon commented on a diff in pull request #38192: [SPARK-40737][CONNECT] Add basic support for DataFrameWriter

Posted by GitBox <gi...@apache.org>.
HyukjinKwon commented on code in PR #38192:
URL: https://github.com/apache/spark/pull/38192#discussion_r995361583


##########
connector/connect/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectCommandPlannerSuite.scala:
##########
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.planner
+
+import java.io.File
+import java.nio.file.{Files, Paths}
+import java.util.UUID
+
+import org.apache.commons.io.FileUtils
+
+import org.apache.spark.{SparkClassNotFoundException, SparkFunSuite}
+import org.apache.spark.connect.proto
+import org.apache.spark.sql.{AnalysisException, SaveMode}
+import org.apache.spark.sql.catalyst.dsl.expressions._
+import org.apache.spark.sql.connect.command.{InvalidCommandInput, SparkConnectCommandPlanner}
+import org.apache.spark.sql.connect.dsl.commands._
+import org.apache.spark.sql.test.SharedSparkSession
+
+class SparkConnectCommandPlannerSuite
+    extends SparkFunSuite
+    with SparkConnectPlanTest
+    with SharedSparkSession {
+
+  lazy val localRelation = createLocalRelationProto(Seq($"id".int))
+
+  /**
+   * Returns a unique path name on every invocation.
+   * @return
+   */
+  private def path(): String = s"/tmp/${UUID.randomUUID()}"
+
+  /**
+   * Returns a unique valid table name identifier on each invocation.
+   * @return
+   */
+  private def table(): String = s"table${UUID.randomUUID().toString.replace("-", "")}"
+
+  /**
+   * Helper method that takes a closure as an argument to handle cleanup of the resource created.
+   * @param thunk
+   */
+  def withTable(thunk: String => Any): Unit = {

Review Comment:
   I think we can leverage `withTempPath` at `SQLHelper`
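
   For reference, a minimal sketch of how the "Write to Path" test could use that helper instead of the hand-rolled `path()` plus `FileUtils` cleanup (this assumes `withTempPath` is in scope for the suite; the assertion is illustrative only):

   ```scala
   // Sketch only: SQLHelper.withTempPath(f: File => Unit) hands the test a temporary path
   // and removes whatever was created there after the body runs, so no manual
   // FileUtils.deleteDirectory is needed.
   test("Write to Path") {
     withTempPath { file =>
       val cmd = localRelation.write(format = Some("parquet"), path = Some(file.getCanonicalPath))
       transform(cmd)
       assert(file.exists(), s"Output file must exist: $file")
     }
   }
   ```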





[GitHub] [spark] grundprinzip commented on a diff in pull request #38192: [SPARK-40737][CONNECT] Add basic support for DataFrameWriter

Posted by GitBox <gi...@apache.org>.
grundprinzip commented on code in PR #38192:
URL: https://github.com/apache/spark/pull/38192#discussion_r995411321


##########
connector/connect/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectPlannerSuite.scala:
##########
@@ -88,16 +108,20 @@ class SparkConnectPlannerSuite extends SparkFunSuite with SparkConnectPlanTest {
   }
 
   test("Simple Project") {
-    val readWithTable = proto.Read.newBuilder()
+    val readWithTable = proto.Read
+      .newBuilder()
       .setNamedTable(proto.Read.NamedTable.newBuilder.addParts("name").build())
       .build()
     val project =
-      proto.Project.newBuilder()
+      proto.Project
+        .newBuilder()
         .setInput(proto.Relation.newBuilder().setRead(readWithTable).build())
         .addExpressions(
-          proto.Expression.newBuilder()
-            .setUnresolvedStar(UnresolvedStar.newBuilder().build()).build()
-        ).build()
+          proto.Expression
+            .newBuilder()
+            .setUnresolvedStar(UnresolvedStar.newBuilder().build())
+            .build())
+        .build()

Review Comment:
   Ok





[GitHub] [spark] hvanhovell commented on a diff in pull request #38192: [SPARK-40737][CONNECT] Add basic support for DataFrameWriter

Posted by GitBox <gi...@apache.org>.
hvanhovell commented on code in PR #38192:
URL: https://github.com/apache/spark/pull/38192#discussion_r993754109


##########
connector/connect/src/main/scala/org/apache/spark/sql/connect/command/SparkConnectCommandPlanner.scala:
##########
@@ -74,4 +82,64 @@ class SparkConnectCommandPlanner(session: SparkSession, command: proto.Command)
     session.udf.registerPython(cf.getPartsList.asScala.head, udf)
   }
 
+  /**
+   * Transforms the write operation and executes it.
+   *
+   * The input write operation contains a reference to the input plan and transforms it to the
+   * corresponding logical plan. Afterwards, creates the DataFrameWriter and translates the
+   * parameters of the WriteOperation into the corresponding methods calls.
+   *
+   * @param writeOperation
+   */
+  def handleWriteOperation(writeOperation: WriteOperation): Unit = {
+    // Transform the input plan into the logical plan.
+    val planner = new SparkConnectPlanner(writeOperation.getInput, session)
+    val plan = planner.transform()
+    // And create a Dataset from the plan.
+    val dataset = Dataset.ofRows(session, logicalPlan = plan)
+
+    val w = dataset.write
+    if (writeOperation.getMode != proto.WriteOperation.SaveMode.SAVE_MODE_UNSPECIFIED) {
+      w.mode(DataTypeProtoConverter.toSaveMode(writeOperation.getMode))
+    }
+
+    if (writeOperation.getOptionsCount > 0) {
+      writeOperation.getOptionsMap.asScala.foreach { case (key, value) => w.option(key, value) }
+    }
+
+    if (writeOperation.getSortColumnNamesCount > 0) {
+      val names = writeOperation.getSortColumnNamesList.asScala
+      w.sortBy(names.head, names.tail.toSeq: _*)
+    }
+
+    if (writeOperation.hasBucketBy) {
+      val op = writeOperation.getBucketBy
+      val cols = op.getColumnsList.asScala
+      if (op.getBucketCount <= 0) {
+        throw InvalidCommandInput(
+          s"BucketBy must specify a bucket count > 0, received ${op.getBucketCount} instead.")
+      }
+      w.bucketBy(op.getBucketCount, cols.head, cols.tail.toSeq: _*)
+    }
+
+    if (writeOperation.getPartitionByColumnsCount > 0) {
+      val names = writeOperation.getPartitionByColumnsList.asScala
+      w.partitionBy(names.toSeq: _*)
+    }
+
+    if (writeOperation.getFormat != null) {
+      w.format(writeOperation.getFormat)
+    }
+
+    writeOperation.getSaveTypeCase match {
+      case proto.WriteOperation.SaveTypeCase.PATH => w.save(writeOperation.getPath)
+      case proto.WriteOperation.SaveTypeCase.TABLE_NAME =>
+        w.saveAsTable(writeOperation.getTableName)
+      case _ =>
+        throw new UnsupportedOperationException(
+          s"WriteOperation:SaveTypeCase not supported "
+            + "${writeOperation.getSaveTypeCase.getNumber}")

Review Comment:
   Number? Can we try to show the name?





[GitHub] [spark] grundprinzip commented on a diff in pull request #38192: [SPARK-40737][CONNECT] Add basic support for DataFrameWriter

Posted by GitBox <gi...@apache.org>.
grundprinzip commented on code in PR #38192:
URL: https://github.com/apache/spark/pull/38192#discussion_r993783613


##########
connector/connect/src/main/scala/org/apache/spark/sql/connect/command/SparkConnectCommandPlanner.scala:
##########
@@ -74,4 +82,64 @@ class SparkConnectCommandPlanner(session: SparkSession, command: proto.Command)
     session.udf.registerPython(cf.getPartsList.asScala.head, udf)
   }
 
+  /**
+   * Transforms the write operation and executes it.
+   *
+   * The input write operation contains a reference to the input plan and transforms it to the
+   * corresponding logical plan. Afterwards, creates the DataFrameWriter and translates the
+   * parameters of the WriteOperation into the corresponding methods calls.
+   *
+   * @param writeOperation
+   */
+  def handleWriteOperation(writeOperation: WriteOperation): Unit = {
+    // Transform the input plan into the logical plan.
+    val planner = new SparkConnectPlanner(writeOperation.getInput, session)
+    val plan = planner.transform()
+    // And create a Dataset from the plan.
+    val dataset = Dataset.ofRows(session, logicalPlan = plan)
+
+    val w = dataset.write
+    if (writeOperation.getMode != proto.WriteOperation.SaveMode.SAVE_MODE_UNSPECIFIED) {
+      w.mode(DataTypeProtoConverter.toSaveMode(writeOperation.getMode))
+    }
+
+    if (writeOperation.getOptionsCount > 0) {
+      writeOperation.getOptionsMap.asScala.foreach { case (key, value) => w.option(key, value) }
+    }
+
+    if (writeOperation.getSortColumnNamesCount > 0) {
+      val names = writeOperation.getSortColumnNamesList.asScala
+      w.sortBy(names.head, names.tail.toSeq: _*)
+    }
+
+    if (writeOperation.hasBucketBy) {
+      val op = writeOperation.getBucketBy
+      val cols = op.getColumnsList.asScala
+      if (op.getBucketCount <= 0) {
+        throw InvalidCommandInput(
+          s"BucketBy must specify a bucket count > 0, received ${op.getBucketCount} instead.")
+      }
+      w.bucketBy(op.getBucketCount, cols.head, cols.tail.toSeq: _*)
+    }
+
+    if (writeOperation.getPartitionByColumnsCount > 0) {
+      val names = writeOperation.getPartitionByColumnsList.asScala
+      w.partitionBy(names.toSeq: _*)
+    }
+
+    if (writeOperation.getFormat != null) {
+      w.format(writeOperation.getFormat)
+    }
+
+    writeOperation.getSaveTypeCase match {
+      case proto.WriteOperation.SaveTypeCase.PATH => w.save(writeOperation.getPath)
+      case proto.WriteOperation.SaveTypeCase.TABLE_NAME =>
+        w.saveAsTable(writeOperation.getTableName)
+      case _ =>
+        throw new UnsupportedOperationException(
+          s"WriteOperation:SaveTypeCase not supported "
+            + "${writeOperation.getSaveTypeCase.getNumber}")

Review Comment:
   In case the proto does not have the enum value, there is no name.





[GitHub] [spark] grundprinzip commented on pull request #38192: [SPARK-40737][CONNECT] Add basic support for DataFrameWriter

Posted by GitBox <gi...@apache.org>.
grundprinzip commented on PR #38192:
URL: https://github.com/apache/spark/pull/38192#issuecomment-1274526832

   @cloud-fan @amaliujia @hvanhovell please take a look!




[GitHub] [spark] grundprinzip commented on a diff in pull request #38192: [SPARK-40737][CONNECT] Add basic support for DataFrameWriter

Posted by GitBox <gi...@apache.org>.
grundprinzip commented on code in PR #38192:
URL: https://github.com/apache/spark/pull/38192#discussion_r993687765


##########
connector/connect/src/main/protobuf/spark/connect/commands.proto:
##########
@@ -62,3 +65,29 @@ message CreateScalarFunction {
     FUNCTION_LANGUAGE_SCALA = 3;
   }
 }
+
+// As writes are not directly handled during analysis and planning, they are modeled as commands.
+message WriteOperation {
+  Relation input = 1;
+  string format = 2;
+
+  oneof save_type {
+    string path = 3;
+    string table_name = 4;
+  }
+  string mode = 5;
+  repeated string sortColumnNames = 6;
+  repeated string partitionByColumns = 7;
+  BucketBy bucketBy = 8;
+  repeated WriteOptions options = 9;

Review Comment:
   I mapped this to `map<string, string>` because behind the scenes it's converted to strings anyway.
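
   A small illustrative snippet of why the string map is a faithful mapping: `DataFrameWriter` keeps options as string key/value pairs and its typed `option` overloads just stringify the value (the output path and option values below are placeholders):

   ```scala
   // Illustrative only; the path and option values are made up for the example.
   val df = spark.range(10)
   df.write
     .format("parquet")
     .option("compression", "snappy")    // already a string
     .option("maxRecordsPerFile", 1000)  // Long overload, stored internally as the string "1000"
     .save("/tmp/write-options-example")
   ```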





[GitHub] [spark] cloud-fan commented on a diff in pull request #38192: [SPARK-40737][CONNECT] Add basic support for DataFrameWriter

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on code in PR #38192:
URL: https://github.com/apache/spark/pull/38192#discussion_r997102043


##########
connector/connect/src/main/protobuf/spark/connect/commands.proto:
##########
@@ -62,3 +65,39 @@ message CreateScalarFunction {
     FUNCTION_LANGUAGE_SCALA = 3;
   }
 }
+
+// As writes are not directly handled during analysis and planning, they are modeled as commands.
+message WriteOperation {
+  // The output of the `input` relation will be persisted according to the options.
+  Relation input = 1;
+  // Format value according to the Spark documentation. Examples are: text, parquet, delta.
+  string source = 2;
+  // The destination of the write operation must be either a path or a table.

Review Comment:
   in the DF API, people can do `df.write.format("jdbc").option("table", ...).save()`, so the destination is neither a path nor a table. I think an optional table name is sufficient. If the table name is not given, the destination will be figured out from the write options (path is just one write option).
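
   A hedged example of such a write, where the destination is carried entirely by options (the connection details are placeholders, and `dbtable` is used as the JDBC option name here):

   ```scala
   // Neither a path nor a table name is passed to save(); the target comes from options.
   val df = spark.range(10).toDF("id")
   df.write
     .format("jdbc")
     .option("url", "jdbc:postgresql://localhost/testdb")  // placeholder connection string
     .option("dbtable", "target_table")                    // placeholder destination table
     .mode("append")
     .save()
   ```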





[GitHub] [spark] hvanhovell commented on a diff in pull request #38192: [CONNECT] [SPARK-40737] Add basic support for DataFrameWriter

Posted by GitBox <gi...@apache.org>.
hvanhovell commented on code in PR #38192:
URL: https://github.com/apache/spark/pull/38192#discussion_r991556451


##########
connector/connect/src/main/protobuf/spark/connect/commands.proto:
##########
@@ -62,3 +65,29 @@ message CreateScalarFunction {
     FUNCTION_LANGUAGE_SCALA = 3;
   }
 }
+
+// As writes are not directly handled during analysis and planning, they are modeled as commands.

Review Comment:
   We want this to be eagerly executed, so - IMO - making this a command is your only option.





[GitHub] [spark] HyukjinKwon commented on a diff in pull request #38192: [SPARK-40737][CONNECT] Add basic support for DataFrameWriter

Posted by GitBox <gi...@apache.org>.
HyukjinKwon commented on code in PR #38192:
URL: https://github.com/apache/spark/pull/38192#discussion_r995363641


##########
connector/connect/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectCommandPlannerSuite.scala:
##########
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.planner
+
+import java.io.File
+import java.nio.file.{Files, Paths}
+import java.util.UUID
+
+import org.apache.commons.io.FileUtils
+
+import org.apache.spark.{SparkClassNotFoundException, SparkFunSuite}
+import org.apache.spark.connect.proto
+import org.apache.spark.sql.{AnalysisException, SaveMode}
+import org.apache.spark.sql.catalyst.dsl.expressions._
+import org.apache.spark.sql.connect.command.{InvalidCommandInput, SparkConnectCommandPlanner}
+import org.apache.spark.sql.connect.dsl.commands._
+import org.apache.spark.sql.test.SharedSparkSession
+
+class SparkConnectCommandPlannerSuite
+    extends SparkFunSuite
+    with SparkConnectPlanTest
+    with SharedSparkSession {
+
+  lazy val localRelation = createLocalRelationProto(Seq($"id".int))
+
+  /**
+   * Returns a unique path name on every invocation.
+   * @return
+   */
+  private def path(): String = s"/tmp/${UUID.randomUUID()}"
+
+  /**
+   * Returns a unique valid table name identifier on each invocation.
+   * @return
+   */
+  private def table(): String = s"table${UUID.randomUUID().toString.replace("-", "")}"
+
+  /**
+   * Helper method that takes a closure as an argument to handle cleanup of the resource created.
+   * @param thunk
+   */
+  def withTable(thunk: String => Any): Unit = {
+    val name = table()
+    thunk(name)
+    spark.sql(s"drop table if exists ${name}")
+  }
+
+  /**
+   * Helper method that takes a closure as an argument and handles cleanup of the file system
+   * resource created.
+   * @param thunk
+   */
+  def withPath(thunk: String => Any): Unit = {
+    val name = path()
+    thunk(name)
+    FileUtils.deleteDirectory(new File(name))
+  }
+
+  def transform(cmd: proto.Command): Unit = {
+    new SparkConnectCommandPlanner(spark, cmd).process()
+  }
+
+  test("Writes fails without path or table") {
+    assertThrows[UnsupportedOperationException] {
+      transform(localRelation.write())
+    }
+  }
+
+  test("Write fails with unknown table - AnalysisException") {
+    val cmd = readRel.write(tableName = Some("dest"))
+    assertThrows[AnalysisException] {
+      transform(cmd)
+    }
+  }
+
+  test("Write with partitions") {
+    val name = table()
+    val cmd = localRelation.write(
+      tableName = Some(name),
+      format = Some("parquet"),
+      partitionByCols = Seq("noid"))
+    assertThrows[AnalysisException] {
+      transform(cmd)
+    }
+  }
+
+  test("Write with invalid bucketBy configuration") {
+    val cmd = localRelation.write(bucketByCols = Seq("id"), numBuckets = Some(0))
+    assertThrows[InvalidCommandInput] {
+      transform(cmd)
+    }
+  }
+
+  test("Write to Path") {
+    withPath { name =>
+      val cmd = localRelation.write(format = Some("parquet"), path = Some(name))
+      transform(cmd)
+      assert(Files.exists(Paths.get(name)), s"Output file must exist: ${name}")
+    }
+  }
+
+  test("Write to Path with invalid input") {
+    // Wrong data source.
+    assertThrows[SparkClassNotFoundException](
+      transform(localRelation.write(path = Some(path), format = Some("ThisAintNoFormat"))))
+
+    // Default data source not found.
+    assertThrows[SparkClassNotFoundException](transform(localRelation.write(path = Some(path))))
+  }
+
+  test("Write with sortBy") {
+    // Sort by existing column.
+    transform(
+      localRelation.write(
+        tableName = Some(table),
+        format = Some("parquet"),
+        sortByColumns = Seq("id"),
+        bucketByCols = Seq("id"),
+        numBuckets = Some(10)))
+
+    // Sort by non-existing column
+    assertThrows[AnalysisException](
+      transform(
+        localRelation
+          .write(
+            tableName = Some(table),
+            format = Some("parquet"),
+            sortByColumns = Seq("noid"),
+            bucketByCols = Seq("id"),
+            numBuckets = Some(10))))
+  }
+
+  test("Write to Table") {
+    withTable { name =>
+      val cmd = localRelation.write(format = Some("parquet"), tableName = Some(name))
+      transform(cmd)
+      // Check that we can find and drop the table.
+      spark.sql(s"select count(*) from ${name}").collect()

Review Comment:
   Should we compare the results? Could leverage `checkAnswer` at `QueryTest`
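
   A minimal sketch of what that could look like, assuming the suite has `QueryTest.checkAnswer` in scope; the expected row is an assumption based on the local relation carrying no rows:

   ```scala
   // Sketch only: requires checkAnswer (QueryTest) and org.apache.spark.sql.Row.
   withTable { name =>
     val cmd = localRelation.write(format = Some("parquet"), tableName = Some(name))
     transform(cmd)
     checkAnswer(spark.sql(s"select count(*) from $name"), Seq(Row(0L)))
   }
   ```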





[GitHub] [spark] grundprinzip commented on a diff in pull request #38192: [SPARK-40737][CONNECT] Add basic support for DataFrameWriter

Posted by GitBox <gi...@apache.org>.
grundprinzip commented on code in PR #38192:
URL: https://github.com/apache/spark/pull/38192#discussion_r995393706


##########
connector/connect/src/main/scala/org/apache/spark/sql/connect/command/SparkConnectCommandPlanner.scala:
##########
@@ -40,17 +46,19 @@ class SparkConnectCommandPlanner(session: SparkSession, command: proto.Command)
     command.getCommandTypeCase match {
       case proto.Command.CommandTypeCase.CREATE_FUNCTION =>
         handleCreateScalarFunction(command.getCreateFunction)
+      case proto.Command.CommandTypeCase.WRITE_OPERATION =>
+        handleWriteOperation(command.getWriteOperation)
       case _ => throw new UnsupportedOperationException(s"$command not supported.")
     }
   }
 
   /**
    * This is a helper function that registers a new Python function in the SparkSession.
    *
-   * Right now this function is very rudimentary and bare-bones just to showcase how it
-   * is possible to remotely serialize a Python function and execute it on the Spark cluster.
-   * If the Python version on the client and server diverge, the execution of the function that
-   * is serialized will most likely fail.
+   * Right now this function is very rudimentary and bare-bones just to showcase how it is
+   * possible to remotely serialize a Python function and execute it on the Spark cluster. If the
+   * Python version on the client and server diverge, the execution of the function that is
+   * serialized will most likely fail.

Review Comment:
   done





[GitHub] [spark] HyukjinKwon commented on a diff in pull request #38192: [SPARK-40737][CONNECT] Add basic support for DataFrameWriter

Posted by GitBox <gi...@apache.org>.
HyukjinKwon commented on code in PR #38192:
URL: https://github.com/apache/spark/pull/38192#discussion_r995356725


##########
connector/connect/src/main/scala/org/apache/spark/sql/connect/command/SparkConnectCommandPlanner.scala:
##########
@@ -74,4 +82,64 @@ class SparkConnectCommandPlanner(session: SparkSession, command: proto.Command)
     session.udf.registerPython(cf.getPartsList.asScala.head, udf)
   }
 
+  /**
+   * Transforms the write operation and executes it.
+   *
+   * The input write operation contains a reference to the input plan and transforms it to the
+   * corresponding logical plan. Afterwards, creates the DataFrameWriter and translates the
+   * parameters of the WriteOperation into the corresponding methods calls.
+   *
+   * @param writeOperation
+   */
+  def handleWriteOperation(writeOperation: WriteOperation): Unit = {
+    // Transform the input plan into the logical plan.
+    val planner = new SparkConnectPlanner(writeOperation.getInput, session)
+    val plan = planner.transform()
+    // And create a Dataset from the plan.
+    val dataset = Dataset.ofRows(session, logicalPlan = plan)
+
+    val w = dataset.write
+    if (writeOperation.getMode != proto.WriteOperation.SaveMode.SAVE_MODE_UNSPECIFIED) {
+      w.mode(DataTypeProtoConverter.toSaveMode(writeOperation.getMode))
+    }
+
+    if (writeOperation.getOptionsCount > 0) {
+      writeOperation.getOptionsMap.asScala.foreach { case (key, value) => w.option(key, value) }
+    }
+
+    if (writeOperation.getSortColumnNamesCount > 0) {
+      val names = writeOperation.getSortColumnNamesList.asScala
+      w.sortBy(names.head, names.tail.toSeq: _*)
+    }
+
+    if (writeOperation.hasBucketBy) {
+      val op = writeOperation.getBucketBy
+      val cols = op.getColumnsList.asScala
+      if (op.getBucketCount <= 0) {
+        throw InvalidCommandInput(
+          s"BucketBy must specify a bucket count > 0, received ${op.getBucketCount} instead.")
+      }
+      w.bucketBy(op.getBucketCount, cols.head, cols.tail.toSeq: _*)
+    }
+
+    if (writeOperation.getPartitionByColumnsCount > 0) {
+      val names = writeOperation.getPartitionByColumnsList.asScala
+      w.partitionBy(names.toSeq: _*)
+    }
+
+    if (writeOperation.getFormat != null) {
+      w.format(writeOperation.getFormat)
+    }
+
+    writeOperation.getSaveTypeCase match {
+      case proto.WriteOperation.SaveTypeCase.PATH => w.save(writeOperation.getPath)
+      case proto.WriteOperation.SaveTypeCase.TABLE_NAME =>
+        w.saveAsTable(writeOperation.getTableName)
+      case _ =>
+        throw new UnsupportedOperationException(
+          s"WriteOperation:SaveTypeCase not supported "
+            + "${writeOperation.getSaveTypeCase.getNumber}")

Review Comment:
   ```suggestion
             "WriteOperation:SaveTypeCase not supported "
               + s"${writeOperation.getSaveTypeCase.getNumber}")
   ```





[GitHub] [spark] amaliujia commented on a diff in pull request #38192: [SPARK-40737][CONNECT] Add basic support for DataFrameWriter

Posted by GitBox <gi...@apache.org>.
amaliujia commented on code in PR #38192:
URL: https://github.com/apache/spark/pull/38192#discussion_r992640334


##########
connector/connect/src/main/scala/org/apache/spark/sql/connect/command/SparkConnectCommandPlanner.scala:
##########
@@ -74,4 +82,60 @@ class SparkConnectCommandPlanner(session: SparkSession, command: proto.Command)
     session.udf.registerPython(cf.getPartsList.asScala.head, udf)
   }
 
+  /**
+   * Transforms the write operation and executes it.
+   *
+   * The input write operation contains a reference to the input plan and transforms it to the
+   * corresponding logical plan. Afterwards, creates the DataFrameWriter and translates the
+   * parameters of the WriteOperation into the corresponding methods calls.
+   *
+   * @param writeOperation
+   */
+  def handleWriteOperation(writeOperation: WriteOperation): Unit = {
+    // Transform the input plan into the logical plan.
+    val planner = new SparkConnectPlanner(writeOperation.getInput, session)
+    val plan = planner.transform()
+    // And create a Dataset from the plan.
+    val dataset = Dataset.ofRows(session, logicalPlan = plan)
+
+    val w = dataset.write
+    if (writeOperation.getOptionsCount > 0) {
+      writeOperation.getOptionsList.asScala.foreach(x => w.option(x.getKey, x.getValue))
+    }
+
+    if (writeOperation.getSortColumnNamesCount > 0) {
+      val names = writeOperation.getSortColumnNamesList.asScala
+      w.sortBy(names.head, names.tail.toSeq: _*)
+    }
+
+    if (writeOperation.hasBucketBy) {
+      val op = writeOperation.getBucketBy
+      val cols = op.getColumnsList.asScala
+      if (op.getBucketCount <= 0) {
+        throw InvalidCommandInput(
+          s"BucketBy must specify a bucket count > 0, received ${op.getBucketCount} instead.")
+      }
+      w.bucketBy(op.getBucketCount, cols.head, cols.tail.toSeq: _*)
+    }
+
+    if (writeOperation.getPartitionByColumnsCount > 0) {
+      val names = writeOperation.getPartitionByColumnsList.asScala
+      w.partitionBy(names.toSeq: _*)
+    }
+
+    if (writeOperation.getFormat != null) {

Review Comment:
   yeah, I found that for `string` there is no `hasXXX` call, thus we probably need this.
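
   For context, a short sketch of the usual proto3 idiom: singular `string` fields have no presence tracking and default to the empty string (never null in the generated bindings), so the common check is `nonEmpty` rather than a null comparison:

   ```scala
   // Sketch of the idiom; writeOperation and w are the values from the diff above.
   if (writeOperation.getFormat.nonEmpty) {  // proto3 strings default to "", not null
     w.format(writeOperation.getFormat)
   }
   ```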





[GitHub] [spark] grundprinzip commented on a diff in pull request #38192: [SPARK-40737][CONNECT] Add basic support for DataFrameWriter

Posted by GitBox <gi...@apache.org>.
grundprinzip commented on code in PR #38192:
URL: https://github.com/apache/spark/pull/38192#discussion_r995374758


##########
connector/connect/src/main/protobuf/spark/connect/commands.proto:
##########
@@ -62,3 +65,39 @@ message CreateScalarFunction {
     FUNCTION_LANGUAGE_SCALA = 3;
   }
 }
+
+// As writes are not directly handled during analysis and planning, they are modeled as commands.
+message WriteOperation {
+  // The output of the `input` relation will be persisted according to the options.
+  Relation input = 1;
+  // Format value according to the Spark documentation. Examples are: text, parquet, delta.
+  string format = 2;
+  // The destination of the write operation must be either a path or a table.
+  oneof save_type {
+    string path = 3;
+    string table_name = 4;
+  }
+  SaveMode mode = 5;
+  // List of columns to sort the output by.
+  repeated string sortColumnNames = 6;
+  // List of columns for partitioning.
+  repeated string partitionByColumns = 7;

Review Comment:
   I changed this accordingly, with a minor exception for bucketing, where I wrap it into the `BucketBy` message to provide a better UX for setting the two values together.





[GitHub] [spark] grundprinzip commented on a diff in pull request #38192: [SPARK-40737][CONNECT] Add basic support for DataFrameWriter

Posted by GitBox <gi...@apache.org>.
grundprinzip commented on code in PR #38192:
URL: https://github.com/apache/spark/pull/38192#discussion_r995421139


##########
connector/connect/src/main/scala/org/apache/spark/sql/connect/dsl/package.scala:
##########
@@ -34,59 +36,109 @@ package object dsl {
       val identifier = CatalystSqlParser.parseMultipartIdentifier(s)
 
       def protoAttr: proto.Expression =
-        proto.Expression.newBuilder()
+        proto.Expression
+          .newBuilder()
           .setUnresolvedAttribute(
-            proto.Expression.UnresolvedAttribute.newBuilder()
+            proto.Expression.UnresolvedAttribute
+              .newBuilder()
               .addAllParts(identifier.asJava)
               .build())
           .build()
     }
 
     implicit class DslExpression(val expr: proto.Expression) {
-      def as(alias: String): proto.Expression = proto.Expression.newBuilder().setAlias(
-        proto.Expression.Alias.newBuilder().setName(alias).setExpr(expr)).build()
-
-      def < (other: proto.Expression): proto.Expression =
-        proto.Expression.newBuilder().setUnresolvedFunction(
-          proto.Expression.UnresolvedFunction.newBuilder()
-            .addParts("<")
-            .addArguments(expr)
-            .addArguments(other)
-        ).build()
+      def as(alias: String): proto.Expression = proto.Expression
+        .newBuilder()
+        .setAlias(proto.Expression.Alias.newBuilder().setName(alias).setExpr(expr))
+        .build()
+
+      def <(other: proto.Expression): proto.Expression =
+        proto.Expression
+          .newBuilder()
+          .setUnresolvedFunction(
+            proto.Expression.UnresolvedFunction
+              .newBuilder()
+              .addParts("<")
+              .addArguments(expr)
+              .addArguments(other))
+          .build()
     }
 
     implicit def intToLiteral(i: Int): proto.Expression =
-      proto.Expression.newBuilder().setLiteral(
-        proto.Expression.Literal.newBuilder().setI32(i)
-      ).build()
+      proto.Expression
+        .newBuilder()
+        .setLiteral(proto.Expression.Literal.newBuilder().setI32(i))
+        .build()
+  }
+
+  object commands { // scalastyle:ignore
+    implicit class DslCommands(val logicalPlan: proto.Relation) {
+      def write(
+          format: Option[String] = None,
+          path: Option[String] = None,
+          tableName: Option[String] = None,
+          mode: Option[String] = None,
+          sortByColumns: Seq[String] = Seq.empty,
+          partitionByCols: Seq[String] = Seq.empty,
+          bucketByCols: Seq[String] = Seq.empty,
+          numBuckets: Option[Int] = None): proto.Command = {
+        val writeOp = proto.WriteOperation.newBuilder()
+        format.foreach(writeOp.setSource(_))
+
+        mode
+          .map(SaveMode.valueOf(_))
+          .map(DataTypeProtoConverter.toSaveModeProto(_))
+          .foreach(writeOp.setMode(_))
+
+        if (tableName.nonEmpty) {
+          tableName.foreach(writeOp.setTableName(_))
+        } else {
+          path.foreach(writeOp.setPath(_))
+        }
+        sortByColumns.foreach(writeOp.addSortColumnNames(_))
+        partitionByCols.foreach(writeOp.addPartitioningColumns(_))
+
+        if (numBuckets.nonEmpty && bucketByCols.nonEmpty) {
+          val op = proto.WriteOperation.BucketBy.newBuilder()
+          numBuckets.foreach(op.setNumBuckets(_))
+          bucketByCols.foreach(op.addBucketColumnNames(_))
+          writeOp.setBucketBy(op.build())
+        }
+        writeOp.setInput(logicalPlan)
+        proto.Command.newBuilder().setWriteOperation(writeOp.build()).build()
+      }
+    }
   }
 
   object plans { // scalastyle:ignore
     implicit class DslLogicalPlan(val logicalPlan: proto.Relation) {
       def select(exprs: proto.Expression*): proto.Relation = {
-        proto.Relation.newBuilder().setProject(

Review Comment:
   Done





[GitHub] [spark] HyukjinKwon commented on a diff in pull request #38192: [SPARK-40737][CONNECT] Add basic support for DataFrameWriter

Posted by GitBox <gi...@apache.org>.
HyukjinKwon commented on code in PR #38192:
URL: https://github.com/apache/spark/pull/38192#discussion_r995361997


##########
connector/connect/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectCommandPlannerSuite.scala:
##########
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.planner
+
+import java.io.File
+import java.nio.file.{Files, Paths}
+import java.util.UUID
+
+import org.apache.commons.io.FileUtils
+
+import org.apache.spark.{SparkClassNotFoundException, SparkFunSuite}
+import org.apache.spark.connect.proto
+import org.apache.spark.sql.{AnalysisException, SaveMode}
+import org.apache.spark.sql.catalyst.dsl.expressions._
+import org.apache.spark.sql.connect.command.{InvalidCommandInput, SparkConnectCommandPlanner}
+import org.apache.spark.sql.connect.dsl.commands._
+import org.apache.spark.sql.test.SharedSparkSession
+
+class SparkConnectCommandPlannerSuite
+    extends SparkFunSuite
+    with SparkConnectPlanTest
+    with SharedSparkSession {
+
+  lazy val localRelation = createLocalRelationProto(Seq($"id".int))
+
+  /**
+   * Returns a unique path name on every invocation.
+   * @return
+   */
+  private def path(): String = s"/tmp/${UUID.randomUUID()}"
+
+  /**
+   * Returns a unique valid table name indentifier on each invocation.
+   * @return
+   */
+  private def table(): String = s"table${UUID.randomUUID().toString.replace("-", "")}"
+
+  /**
+   * Helper method that takes a closure as an argument to handle cleanup of the resource created.
+   * @param thunk
+   */
+  def withTable(thunk: String => Any): Unit = {

Review Comment:
   and `withTable` at `SQLTestUtils`
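
   For reference, a brief sketch of that variant, assuming the `SQLTestUtils.withTable(tableNames: String*)(f: => Unit)` overload (the table name below is a placeholder):

   ```scala
   // Sketch only; SQLTestUtils.withTable drops the listed tables after the body runs.
   test("Write to Table") {
     withTable("connect_write_tbl") {
       val cmd = localRelation.write(format = Some("parquet"), tableName = Some("connect_write_tbl"))
       transform(cmd)
       spark.sql("select count(*) from connect_write_tbl").collect()
     }
   }
   ```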





[GitHub] [spark] grundprinzip commented on a diff in pull request #38192: [SPARK-40737][CONNECT] Add basic support for DataFrameWriter

Posted by GitBox <gi...@apache.org>.
grundprinzip commented on code in PR #38192:
URL: https://github.com/apache/spark/pull/38192#discussion_r995411114


##########
connector/connect/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectCommandPlannerSuite.scala:
##########
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.planner
+
+import java.nio.file.{Files, Paths}
+import java.util.UUID
+
+import org.apache.spark.SparkClassNotFoundException
+import org.apache.spark.connect.proto
+import org.apache.spark.sql.{AnalysisException, SaveMode}
+import org.apache.spark.sql.catalyst.dsl.expressions._
+import org.apache.spark.sql.connect.command.{InvalidCommandInput, SparkConnectCommandPlanner}
+import org.apache.spark.sql.connect.dsl.commands._
+import org.apache.spark.sql.test.{SharedSparkSession, SQLTestUtils}
+
+class SparkConnectCommandPlannerSuite
+    extends SQLTestUtils
+    with SparkConnectPlanTest
+    with SharedSparkSession {
+
+  lazy val localRelation = createLocalRelationProto(Seq($"id".int))
+
+  /**
+   * Returns a unique path name on every invocation.
+   * @return
+   */
+  private def path(): String = s"/tmp/${UUID.randomUUID()}"
+
+  /**
+   * Returns a unique valid table name identifier on each invocation.
+   * @return
+   */
+  private def table(): String = s"table${UUID.randomUUID().toString.replace("-", "")}"
+
+  def transform(cmd: proto.Command): Unit = {
+    new SparkConnectCommandPlanner(spark, cmd).process()
+  }
+
+  test("Writes fails without path or table") {
+    assertThrows[UnsupportedOperationException] {
+      transform(localRelation.write())
+    }
+  }
+
+  test("Write fails with unknown table - AnalysisException") {
+    val cmd = readRel.write(tableName = Some("dest"))
+    assertThrows[AnalysisException] {
+      transform(cmd)
+    }
+  }
+
+  test("Write with partitions") {
+    val name = table()

Review Comment:
   I'll refactor the cases so that when we use names and errors it doesn't matter.





[GitHub] [spark] grundprinzip commented on a diff in pull request #38192: [SPARK-40737][CONNECT] Add basic support for DataFrameWriter

Posted by GitBox <gi...@apache.org>.
grundprinzip commented on code in PR #38192:
URL: https://github.com/apache/spark/pull/38192#discussion_r995412032


##########
connector/connect/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectCommandPlannerSuite.scala:
##########
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.planner
+
+import java.nio.file.{Files, Paths}
+import java.util.UUID
+
+import org.apache.spark.SparkClassNotFoundException
+import org.apache.spark.connect.proto
+import org.apache.spark.sql.{AnalysisException, SaveMode}
+import org.apache.spark.sql.catalyst.dsl.expressions._
+import org.apache.spark.sql.connect.command.{InvalidCommandInput, SparkConnectCommandPlanner}
+import org.apache.spark.sql.connect.dsl.commands._
+import org.apache.spark.sql.test.{SharedSparkSession, SQLTestUtils}
+
+class SparkConnectCommandPlannerSuite
+    extends SQLTestUtils
+    with SparkConnectPlanTest
+    with SharedSparkSession {
+
+  lazy val localRelation = createLocalRelationProto(Seq($"id".int))
+
+  /**
+   * Returns a unique path name on every invocation.
+   * @return
+   */
+  private def path(): String = s"/tmp/${UUID.randomUUID()}"
+
+  /**
+   * Returns a unique valid table name identifier on each invocation.
+   * @return
+   */
+  private def table(): String = s"table${UUID.randomUUID().toString.replace("-", "")}"

Review Comment:
   Do we not run any test in parallel? Is this guaranteed?





[GitHub] [spark] grundprinzip commented on a diff in pull request #38192: [SPARK-40737][CONNECT] Add basic support for DataFrameWriter

Posted by GitBox <gi...@apache.org>.
grundprinzip commented on code in PR #38192:
URL: https://github.com/apache/spark/pull/38192#discussion_r993806060


##########
connector/connect/src/main/scala/org/apache/spark/sql/connect/command/SparkConnectCommandPlanner.scala:
##########
@@ -24,10 +24,16 @@ import com.google.common.collect.{Lists, Maps}
 import org.apache.spark.annotation.{Since, Unstable}
 import org.apache.spark.api.python.{PythonEvalType, SimplePythonFunction}
 import org.apache.spark.connect.proto
-import org.apache.spark.sql.SparkSession
+import org.apache.spark.connect.proto.WriteOperation
+import org.apache.spark.sql.{Dataset, SparkSession}
+import org.apache.spark.sql.connect.planner.SparkConnectPlanner
 import org.apache.spark.sql.execution.python.UserDefinedPythonFunction
 import org.apache.spark.sql.types.StringType
 
+final case class InvalidCommandInput(

Review Comment:
   I wanted to have a custom exception for when we rethrow.
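   
   Something along these lines (illustrative only, and assuming `InvalidCommandInput` extends `Exception(message, cause)`; `writer` and `writeOperation` are the names used elsewhere in this diff):
   
       // Wrap low-level failures so the service layer can distinguish
       // "the client sent a bad command" from genuine internal errors.
       try {
         writer.save(writeOperation.getPath)
       } catch {
         case e: IllegalArgumentException =>
           throw InvalidCommandInput(s"Invalid write command: ${e.getMessage}", e)
       }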



##########
connector/connect/src/main/scala/org/apache/spark/sql/connect/command/SparkConnectCommandPlanner.scala:
##########
@@ -24,10 +24,16 @@ import com.google.common.collect.{Lists, Maps}
 import org.apache.spark.annotation.{Since, Unstable}
 import org.apache.spark.api.python.{PythonEvalType, SimplePythonFunction}
 import org.apache.spark.connect.proto
-import org.apache.spark.sql.SparkSession
+import org.apache.spark.connect.proto.WriteOperation
+import org.apache.spark.sql.{Dataset, SparkSession}
+import org.apache.spark.sql.connect.planner.SparkConnectPlanner
 import org.apache.spark.sql.execution.python.UserDefinedPythonFunction
 import org.apache.spark.sql.types.StringType
 
+final case class InvalidCommandInput(
+    private val message: String = "",
+    private val cause: Throwable = None.orNull)

Review Comment:
   done.





[GitHub] [spark] grundprinzip commented on a diff in pull request #38192: [SPARK-40737][CONNECT] Add basic support for DataFrameWriter

Posted by GitBox <gi...@apache.org>.
grundprinzip commented on code in PR #38192:
URL: https://github.com/apache/spark/pull/38192#discussion_r995348121


##########
connector/connect/src/main/scala/org/apache/spark/sql/connect/command/SparkConnectCommandPlanner.scala:
##########
@@ -24,10 +24,16 @@ import com.google.common.collect.{Lists, Maps}
 import org.apache.spark.annotation.{Since, Unstable}
 import org.apache.spark.api.python.{PythonEvalType, SimplePythonFunction}
 import org.apache.spark.connect.proto
-import org.apache.spark.sql.SparkSession
+import org.apache.spark.connect.proto.WriteOperation
+import org.apache.spark.sql.{Dataset, SparkSession}
+import org.apache.spark.sql.connect.planner.SparkConnectPlanner
 import org.apache.spark.sql.execution.python.UserDefinedPythonFunction
 import org.apache.spark.sql.types.StringType
 
+final case class InvalidCommandInput(

Review Comment:
   I'm happy to fix this as a follow-up; does that make sense?
   
   The errors are reported back through gRPC. If you point me to the right base class, I can fix it then.
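   
   To illustrate what "reported back through gRPC" means (rough sketch, not the actual service code): the stream observer turns the exception into a gRPC status.
   
       import io.grpc.Status
       import io.grpc.stub.StreamObserver
   
       // Map a planner-side failure onto a gRPC status for the client.
       def reportError[T](observer: StreamObserver[T], e: Throwable): Unit = e match {
         case _: InvalidCommandInput =>
           observer.onError(
             Status.INVALID_ARGUMENT.withDescription(e.getMessage).withCause(e).asRuntimeException())
         case _ =>
           observer.onError(
             Status.INTERNAL.withDescription(e.getMessage).withCause(e).asRuntimeException())
       }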





[GitHub] [spark] grundprinzip commented on a diff in pull request #38192: [SPARK-40737][CONNECT] Add basic support for DataFrameWriter

Posted by GitBox <gi...@apache.org>.
grundprinzip commented on code in PR #38192:
URL: https://github.com/apache/spark/pull/38192#discussion_r995414508


##########
connector/connect/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectCommandPlannerSuite.scala:
##########
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.planner
+
+import java.nio.file.{Files, Paths}
+import java.util.UUID
+
+import org.apache.spark.SparkClassNotFoundException
+import org.apache.spark.connect.proto
+import org.apache.spark.sql.{AnalysisException, SaveMode}
+import org.apache.spark.sql.catalyst.dsl.expressions._
+import org.apache.spark.sql.connect.command.{InvalidCommandInput, SparkConnectCommandPlanner}
+import org.apache.spark.sql.connect.dsl.commands._
+import org.apache.spark.sql.test.{SharedSparkSession, SQLTestUtils}
+
+class SparkConnectCommandPlannerSuite
+    extends SQLTestUtils
+    with SparkConnectPlanTest
+    with SharedSparkSession {
+
+  lazy val localRelation = createLocalRelationProto(Seq($"id".int))
+
+  /**
+   * Returns a unique path name on every invocation.
+   * @return
+   */
+  private def path(): String = s"/tmp/${UUID.randomUUID()}"

Review Comment:
   ok





[GitHub] [spark] amaliujia commented on a diff in pull request #38192: [SPARK-40737][CONNECT] Add basic support for DataFrameWriter

Posted by GitBox <gi...@apache.org>.
amaliujia commented on code in PR #38192:
URL: https://github.com/apache/spark/pull/38192#discussion_r992647666


##########
connector/connect/src/main/protobuf/spark/connect/commands.proto:
##########
@@ -62,3 +65,29 @@ message CreateScalarFunction {
     FUNCTION_LANGUAGE_SCALA = 3;
   }
 }
+
+// As writes are not directly handled during analysis and planning, they are modeled as commands.
+message WriteOperation {
+  Relation input = 1;
+  string format = 2;
+
+  oneof save_type {
+    string path = 3;
+    string table_name = 4;
+  }
+  string mode = 5;
+  repeated string sortColumnNames = 6;
+  repeated string partitionByColumns = 7;
+  BucketBy bucketBy = 8;
+  repeated WriteOptions options = 9;

Review Comment:
   Why not just a map<string, string>? Does the proto map have an iterator? (If so, later usage is the same as with `repeated`.)
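   
   For example (untested, just to illustrate the suggestion; `writeOp` and `df` are placeholder names): if the field were declared as `map<string, string> options = 9;`, the generated `getOptionsMap()` is a plain `java.util.Map`, so iterating it is as easy as iterating a repeated field.
   
       import scala.collection.JavaConverters._
   
       // Copy every option from the proto into the DataFrameWriter.
       val writer = df.write
       writeOp.getOptionsMap.asScala.foreach { case (key, value) =>
         writer.option(key, value)
       }
       // DataFrameWriter.options also accepts the java.util.Map directly:
       // writer.options(writeOp.getOptionsMap)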





[GitHub] [spark] cloud-fan commented on a diff in pull request #38192: [SPARK-40737][CONNECT] Add basic support for DataFrameWriter

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on code in PR #38192:
URL: https://github.com/apache/spark/pull/38192#discussion_r997110469


##########
connector/connect/src/main/protobuf/spark/connect/commands.proto:
##########
@@ -62,3 +65,39 @@ message CreateScalarFunction {
     FUNCTION_LANGUAGE_SCALA = 3;
   }
 }
+
+// As writes are not directly handled during analysis and planning, they are modeled as commands.
+message WriteOperation {
+  // The output of the `input` relation will be persisted according to the options.
+  Relation input = 1;
+  // Format value according to the Spark documentation. Examples are: text, parquet, delta.
+  string source = 2;
+  // The destination of the write operation must be either a path or a table.
+  oneof save_type {
+    string path = 3;
+    string table_name = 4;
+  }
+  SaveMode mode = 5;
+  // List of columns to sort the output by.
+  repeated string sort_column_names = 6;

Review Comment:
   This should be part of the BucketBy





[GitHub] [spark] hvanhovell commented on pull request #38192: [SPARK-40737][CONNECT] Add basic support for DataFrameWriter

Posted by GitBox <gi...@apache.org>.
hvanhovell commented on PR #38192:
URL: https://github.com/apache/spark/pull/38192#issuecomment-1279789307

   Merging this one.

