Posted to commits@spark.apache.org by we...@apache.org on 2022/11/10 05:02:59 UTC

[spark] branch master updated: [SPARK-41046][CONNECT] Support CreateView in Connect DSL

This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new f5101319c1a [SPARK-41046][CONNECT] Support CreateView in Connect DSL
f5101319c1a is described below

commit f5101319c1a83a754d899e10a367356af069ca66
Author: Rui Wang <ru...@databricks.com>
AuthorDate: Thu Nov 10 13:02:45 2022 +0800

    [SPARK-41046][CONNECT] Support CreateView in Connect DSL
    
    ### What changes were proposed in this pull request?
    
    This PR adds support for creating a global temp view or a local temp view through the Connect DSL.
    
    In the proto, the view creation is modeled as a command that is executed immediately on the server side (see the usage sketch after the file summary below).
    
    ### Why are the changes needed?
    
    Improve API coverage.
    
    ### Does this PR introduce _any_ user-facing change?
    
    NO
    
    ### How was this patch tested?
    
    Unit tests.
    
    Closes #38566 from amaliujia/create_view_api.
    
    Authored-by: Rui Wang <ru...@databricks.com>
    Signed-off-by: Wenchen Fan <we...@databricks.com>
---
 .../src/main/protobuf/spark/connect/commands.proto | 19 ++++++++
 .../command/SparkConnectCommandPlanner.scala       | 32 +++++++++++++
 .../org/apache/spark/sql/connect/dsl/package.scala | 13 ++++++
 .../planner/SparkConnectCommandPlannerSuite.scala  | 19 ++++++++
 python/pyspark/sql/connect/proto/commands_pb2.py   | 30 ++++++------
 python/pyspark/sql/connect/proto/commands_pb2.pyi  | 54 +++++++++++++++++++++-
 6 files changed, 152 insertions(+), 15 deletions(-)
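
A rough usage sketch, not part of this patch's diff: a client builds the new command through the DSL helper added below. `localRelation` is a stand-in proto.Relation builder borrowed from the test suite, and imports are omitted.

    // Hedged sketch (assumed helpers, imports omitted):
    val cmd: proto.Command =
      localRelation.createView("people", global = false, replace = true)
    // The server executes the command eagerly via SparkConnectCommandPlanner;
    // no relation is returned to the client.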

diff --git a/connector/connect/src/main/protobuf/spark/connect/commands.proto b/connector/connect/src/main/protobuf/spark/connect/commands.proto
index 79c6cffdf60..086d4d0cc92 100644
--- a/connector/connect/src/main/protobuf/spark/connect/commands.proto
+++ b/connector/connect/src/main/protobuf/spark/connect/commands.proto
@@ -31,6 +31,7 @@ message Command {
   oneof command_type {
     CreateScalarFunction create_function = 1;
     WriteOperation write_operation = 2;
+    CreateDataFrameViewCommand create_dataframe_view = 3;
   }
 }
 
@@ -65,6 +66,24 @@ message CreateScalarFunction {
   }
 }
 
+// A command that can create DataFrame global temp view or local temp view.
+message CreateDataFrameViewCommand {
+  // Required. The relation that this view will be built on.
+  Relation input = 1;
+
+  // Required. View name.
+  string name = 2;
+
+  // Required. Whether this is global temp view or local temp view.
+  bool is_global = 3;
+
+  // Required.
+  //
+  // If true, and if the view already exists, updates it; if false, and if the view
+  // already exists, throws exception.
+  bool replace = 4;
+}
+
 // As writes are not directly handled during analysis and planning, they are modeled as commands.
 message WriteOperation {
   // The output of the `input` relation will be persisted according to the options.
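
For orientation, the two booleans line up with the four existing DataFrame view methods. This mapping is inferred from the flag names and the planner change below; it is not stated in the diff itself.

    // Assumed mapping from the proto flags to the classic Dataset API,
    // where `df` is any DataFrame:
    df.createTempView("v")                  // is_global = false, replace = false
    df.createOrReplaceTempView("v")         // is_global = false, replace = true
    df.createGlobalTempView("v")            // is_global = true,  replace = false
    df.createOrReplaceGlobalTempView("v")   // is_global = true,  replace = true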
diff --git a/connector/connect/src/main/scala/org/apache/spark/sql/connect/command/SparkConnectCommandPlanner.scala b/connector/connect/src/main/scala/org/apache/spark/sql/connect/command/SparkConnectCommandPlanner.scala
index 80c36a4773e..11090976c7f 100644
--- a/connector/connect/src/main/scala/org/apache/spark/sql/connect/command/SparkConnectCommandPlanner.scala
+++ b/connector/connect/src/main/scala/org/apache/spark/sql/connect/command/SparkConnectCommandPlanner.scala
@@ -25,7 +25,11 @@ import org.apache.spark.api.python.{PythonEvalType, SimplePythonFunction}
 import org.apache.spark.connect.proto
 import org.apache.spark.connect.proto.WriteOperation
 import org.apache.spark.sql.{Dataset, SparkSession}
+import org.apache.spark.sql.catalyst.analysis.{GlobalTempView, LocalTempView}
+import org.apache.spark.sql.catalyst.parser.ParseException
 import org.apache.spark.sql.connect.planner.{DataTypeProtoConverter, SparkConnectPlanner}
+import org.apache.spark.sql.errors.QueryCompilationErrors
+import org.apache.spark.sql.execution.command.CreateViewCommand
 import org.apache.spark.sql.execution.python.UserDefinedPythonFunction
 import org.apache.spark.sql.types.StringType
 
@@ -45,6 +49,8 @@ class SparkConnectCommandPlanner(session: SparkSession, command: proto.Command)
         handleCreateScalarFunction(command.getCreateFunction)
       case proto.Command.CommandTypeCase.WRITE_OPERATION =>
         handleWriteOperation(command.getWriteOperation)
+      case proto.Command.CommandTypeCase.CREATE_DATAFRAME_VIEW =>
+        handleCreateViewCommand(command.getCreateDataframeView)
       case _ => throw new UnsupportedOperationException(s"$command not supported.")
     }
   }
@@ -79,6 +85,32 @@ class SparkConnectCommandPlanner(session: SparkSession, command: proto.Command)
     session.udf.registerPython(cf.getPartsList.asScala.head, udf)
   }
 
+  def handleCreateViewCommand(createView: proto.CreateDataFrameViewCommand): Unit = {
+    val viewType = if (createView.getIsGlobal) GlobalTempView else LocalTempView
+
+    val tableIdentifier =
+      try {
+        session.sessionState.sqlParser.parseTableIdentifier(createView.getName)
+      } catch {
+        case _: ParseException =>
+          throw QueryCompilationErrors.invalidViewNameError(createView.getName)
+      }
+
+    val plan = CreateViewCommand(
+      name = tableIdentifier,
+      userSpecifiedColumns = Nil,
+      comment = None,
+      properties = Map.empty,
+      originalText = None,
+      plan = new SparkConnectPlanner(createView.getInput, session).transform(),
+      allowExisting = false,
+      replace = createView.getReplace,
+      viewType = viewType,
+      isAnalyzed = true)
+
+    Dataset.ofRows(session, plan).queryExecution.commandExecuted
+  }
+
   /**
    * Transforms the write operation and executes it.
    *
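
Since the name is parsed eagerly with `parseTableIdentifier`, a malformed view name fails when the command runs. A hedged sketch of the expected failure, reusing the `transform` and `localRelation` helpers from the test suite below:

    // Sketch: an unparsable identifier is rethrown as an AnalysisException
    // via QueryCompilationErrors.invalidViewNameError.
    assertThrows[AnalysisException] {
      transform(localRelation.createView("not a valid name", global = false, replace = false))
    }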
diff --git a/connector/connect/src/main/scala/org/apache/spark/sql/connect/dsl/package.scala b/connector/connect/src/main/scala/org/apache/spark/sql/connect/dsl/package.scala
index 416b20f2ba9..ec14333fdc3 100644
--- a/connector/connect/src/main/scala/org/apache/spark/sql/connect/dsl/package.scala
+++ b/connector/connect/src/main/scala/org/apache/spark/sql/connect/dsl/package.scala
@@ -182,6 +182,19 @@ package object dsl {
         writeOp.setInput(logicalPlan)
         Command.newBuilder().setWriteOperation(writeOp.build()).build()
       }
+
+      def createView(name: String, global: Boolean, replace: Boolean): Command = {
+        Command
+          .newBuilder()
+          .setCreateDataframeView(
+            CreateDataFrameViewCommand
+              .newBuilder()
+              .setName(name)
+              .setIsGlobal(global)
+              .setReplace(replace)
+              .setInput(logicalPlan))
+          .build()
+      }
     }
   }
 
diff --git a/connector/connect/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectCommandPlannerSuite.scala b/connector/connect/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectCommandPlannerSuite.scala
index e5ca670e4dd..8ab8e0599fc 100644
--- a/connector/connect/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectCommandPlannerSuite.scala
+++ b/connector/connect/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectCommandPlannerSuite.scala
@@ -138,4 +138,23 @@ class SparkConnectCommandPlannerSuite
     }
   }
 
+  test("Test CreateView") {
+    withView("view1", "view2", "view3", "view4") {
+      transform(localRelation.createView("view1", global = true, replace = true))
+      assert(spark.catalog.tableExists("global_temp.view1"))
+
+      transform(localRelation.createView("view2", global = false, replace = true))
+      assert(spark.catalog.tableExists("view2"))
+
+      transform(localRelation.createView("view3", global = true, replace = false))
+      assertThrows[AnalysisException] {
+        transform(localRelation.createView("view3", global = true, replace = false))
+      }
+
+      transform(localRelation.createView("view4", global = false, replace = false))
+      assertThrows[AnalysisException] {
+        transform(localRelation.createView("view4", global = false, replace = false))
+      }
+    }
+  }
 }
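
One note behind the assertions above, standard Spark semantics rather than anything introduced by this patch: global temp views register under the reserved `global_temp` database, while local temp views resolve unqualified.

    // Resolution of the views created above (sketch, assumes an active session):
    spark.sql("SELECT * FROM global_temp.view1").show()  // global temp view
    spark.table("view2").show()                          // local temp view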
diff --git a/python/pyspark/sql/connect/proto/commands_pb2.py b/python/pyspark/sql/connect/proto/commands_pb2.py
index fa05b6ff76c..11f53322ce7 100644
--- a/python/pyspark/sql/connect/proto/commands_pb2.py
+++ b/python/pyspark/sql/connect/proto/commands_pb2.py
@@ -33,7 +33,7 @@ from pyspark.sql.connect.proto import types_pb2 as spark_dot_connect_dot_types__
 
 
 DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(
-    b'\n\x1cspark/connect/commands.proto\x12\rspark.connect\x1a\x1dspark/connect/relations.proto\x1a\x19spark/connect/types.proto"\xb3\x01\n\x07\x43ommand\x12N\n\x0f\x63reate_function\x18\x01 \x01(\x0b\x32#.spark.connect.CreateScalarFunctionH\x00R\x0e\x63reateFunction\x12H\n\x0fwrite_operation\x18\x02 \x01(\x0b\x32\x1d.spark.connect.WriteOperationH\x00R\x0ewriteOperationB\x0e\n\x0c\x63ommand_type"\x97\x04\n\x14\x43reateScalarFunction\x12\x14\n\x05parts\x18\x01 \x03(\tR\x05parts\x12P\n\x0 [...]
+    b'\n\x1cspark/connect/commands.proto\x12\rspark.connect\x1a\x1dspark/connect/relations.proto\x1a\x19spark/connect/types.proto"\x94\x02\n\x07\x43ommand\x12N\n\x0f\x63reate_function\x18\x01 \x01(\x0b\x32#.spark.connect.CreateScalarFunctionH\x00R\x0e\x63reateFunction\x12H\n\x0fwrite_operation\x18\x02 \x01(\x0b\x32\x1d.spark.connect.WriteOperationH\x00R\x0ewriteOperation\x12_\n\x15\x63reate_dataframe_view\x18\x03 \x01(\x0b\x32).spark.connect.CreateDataFrameViewCommandH\x00R\x13\x63reateD [...]
 )
 
 _builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, globals())
@@ -45,17 +45,19 @@ if _descriptor._USE_C_DESCRIPTORS == False:
     _WRITEOPERATION_OPTIONSENTRY._options = None
     _WRITEOPERATION_OPTIONSENTRY._serialized_options = b"8\001"
     _COMMAND._serialized_start = 106
-    _COMMAND._serialized_end = 285
-    _CREATESCALARFUNCTION._serialized_start = 288
-    _CREATESCALARFUNCTION._serialized_end = 823
-    _CREATESCALARFUNCTION_FUNCTIONLANGUAGE._serialized_start = 661
-    _CREATESCALARFUNCTION_FUNCTIONLANGUAGE._serialized_end = 800
-    _WRITEOPERATION._serialized_start = 826
-    _WRITEOPERATION._serialized_end = 1568
-    _WRITEOPERATION_OPTIONSENTRY._serialized_start = 1264
-    _WRITEOPERATION_OPTIONSENTRY._serialized_end = 1322
-    _WRITEOPERATION_BUCKETBY._serialized_start = 1324
-    _WRITEOPERATION_BUCKETBY._serialized_end = 1415
-    _WRITEOPERATION_SAVEMODE._serialized_start = 1418
-    _WRITEOPERATION_SAVEMODE._serialized_end = 1555
+    _COMMAND._serialized_end = 382
+    _CREATESCALARFUNCTION._serialized_start = 385
+    _CREATESCALARFUNCTION._serialized_end = 920
+    _CREATESCALARFUNCTION_FUNCTIONLANGUAGE._serialized_start = 758
+    _CREATESCALARFUNCTION_FUNCTIONLANGUAGE._serialized_end = 897
+    _CREATEDATAFRAMEVIEWCOMMAND._serialized_start = 923
+    _CREATEDATAFRAMEVIEWCOMMAND._serialized_end = 1073
+    _WRITEOPERATION._serialized_start = 1076
+    _WRITEOPERATION._serialized_end = 1818
+    _WRITEOPERATION_OPTIONSENTRY._serialized_start = 1514
+    _WRITEOPERATION_OPTIONSENTRY._serialized_end = 1572
+    _WRITEOPERATION_BUCKETBY._serialized_start = 1574
+    _WRITEOPERATION_BUCKETBY._serialized_end = 1665
+    _WRITEOPERATION_SAVEMODE._serialized_start = 1668
+    _WRITEOPERATION_SAVEMODE._serialized_end = 1805
 # @@protoc_insertion_point(module_scope)
diff --git a/python/pyspark/sql/connect/proto/commands_pb2.pyi b/python/pyspark/sql/connect/proto/commands_pb2.pyi
index 0ac8da335b5..9b9880e0b93 100644
--- a/python/pyspark/sql/connect/proto/commands_pb2.pyi
+++ b/python/pyspark/sql/connect/proto/commands_pb2.pyi
@@ -60,21 +60,27 @@ class Command(google.protobuf.message.Message):
 
     CREATE_FUNCTION_FIELD_NUMBER: builtins.int
     WRITE_OPERATION_FIELD_NUMBER: builtins.int
+    CREATE_DATAFRAME_VIEW_FIELD_NUMBER: builtins.int
     @property
     def create_function(self) -> global___CreateScalarFunction: ...
     @property
     def write_operation(self) -> global___WriteOperation: ...
+    @property
+    def create_dataframe_view(self) -> global___CreateDataFrameViewCommand: ...
     def __init__(
         self,
         *,
         create_function: global___CreateScalarFunction | None = ...,
         write_operation: global___WriteOperation | None = ...,
+        create_dataframe_view: global___CreateDataFrameViewCommand | None = ...,
     ) -> None: ...
     def HasField(
         self,
         field_name: typing_extensions.Literal[
             "command_type",
             b"command_type",
+            "create_dataframe_view",
+            b"create_dataframe_view",
             "create_function",
             b"create_function",
             "write_operation",
@@ -86,6 +92,8 @@ class Command(google.protobuf.message.Message):
         field_name: typing_extensions.Literal[
             "command_type",
             b"command_type",
+            "create_dataframe_view",
+            b"create_dataframe_view",
             "create_function",
             b"create_function",
             "write_operation",
@@ -94,7 +102,9 @@ class Command(google.protobuf.message.Message):
     ) -> None: ...
     def WhichOneof(
         self, oneof_group: typing_extensions.Literal["command_type", b"command_type"]
-    ) -> typing_extensions.Literal["create_function", "write_operation"] | None: ...
+    ) -> typing_extensions.Literal[
+        "create_function", "write_operation", "create_dataframe_view"
+    ] | None: ...
 
 global___Command = Command
 
@@ -210,6 +220,48 @@ class CreateScalarFunction(google.protobuf.message.Message):
 
 global___CreateScalarFunction = CreateScalarFunction
 
+class CreateDataFrameViewCommand(google.protobuf.message.Message):
+    """A command that can create DataFrame global temp view or local temp view."""
+
+    DESCRIPTOR: google.protobuf.descriptor.Descriptor
+
+    INPUT_FIELD_NUMBER: builtins.int
+    NAME_FIELD_NUMBER: builtins.int
+    IS_GLOBAL_FIELD_NUMBER: builtins.int
+    REPLACE_FIELD_NUMBER: builtins.int
+    @property
+    def input(self) -> pyspark.sql.connect.proto.relations_pb2.Relation:
+        """Required. The relation that this view will be built on."""
+    name: builtins.str
+    """Required. View name."""
+    is_global: builtins.bool
+    """Required. Whether this is global temp view or local temp view."""
+    replace: builtins.bool
+    """Required.
+
+    If true, and if the view already exists, updates it; if false, and if the view
+    already exists, throws exception.
+    """
+    def __init__(
+        self,
+        *,
+        input: pyspark.sql.connect.proto.relations_pb2.Relation | None = ...,
+        name: builtins.str = ...,
+        is_global: builtins.bool = ...,
+        replace: builtins.bool = ...,
+    ) -> None: ...
+    def HasField(
+        self, field_name: typing_extensions.Literal["input", b"input"]
+    ) -> builtins.bool: ...
+    def ClearField(
+        self,
+        field_name: typing_extensions.Literal[
+            "input", b"input", "is_global", b"is_global", "name", b"name", "replace", b"replace"
+        ],
+    ) -> None: ...
+
+global___CreateDataFrameViewCommand = CreateDataFrameViewCommand
+
 class WriteOperation(google.protobuf.message.Message):
     """As writes are not directly handled during analysis and planning, they are modeled as commands."""
 

