Posted to commits@spark.apache.org by we...@apache.org on 2022/11/10 05:02:59 UTC
[spark] branch master updated: [SPARK-41046][CONNECT] Support CreateView in Connect DSL
This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new f5101319c1a [SPARK-41046][CONNECT] Support CreateView in Connect DSL
f5101319c1a is described below
commit f5101319c1a83a754d899e10a367356af069ca66
Author: Rui Wang <ru...@databricks.com>
AuthorDate: Thu Nov 10 13:02:45 2022 +0800
[SPARK-41046][CONNECT] Support CreateView in Connect DSL
### What changes were proposed in this pull request?
This PR adds support for creating a global temp view or a local temp view in the Connect DSL.
In the proto, view creation is modeled as a command that is executed immediately on the server side.
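For orientation, a minimal sketch of the new DSL call (hedged: `relation` is a hypothetical placeholder for an existing `proto.Relation`, and the implicit conversions from `org.apache.spark.sql.connect.dsl` are assumed to be in scope):

```scala
import org.apache.spark.connect.proto

// `relation` is a placeholder for an existing proto.Relation.
// Builds a Command that the server executes eagerly, registering
// a local temp view named "my_view" (replacing any existing one).
val cmd: proto.Command =
  relation.createView("my_view", global = false, replace = true)
```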
### Why are the changes needed?
Improve API coverage.
### Does this PR introduce _any_ user-facing change?
NO
### How was this patch tested?
UT (unit tests added to SparkConnectCommandPlannerSuite).
Closes #38566 from amaliujia/create_view_api.
Authored-by: Rui Wang <ru...@databricks.com>
Signed-off-by: Wenchen Fan <we...@databricks.com>
---
.../src/main/protobuf/spark/connect/commands.proto | 19 ++++++++
.../command/SparkConnectCommandPlanner.scala | 32 +++++++++++++
.../org/apache/spark/sql/connect/dsl/package.scala | 13 ++++++
.../planner/SparkConnectCommandPlannerSuite.scala | 19 ++++++++
python/pyspark/sql/connect/proto/commands_pb2.py | 30 ++++++------
python/pyspark/sql/connect/proto/commands_pb2.pyi | 54 +++++++++++++++++++++-
6 files changed, 152 insertions(+), 15 deletions(-)
diff --git a/connector/connect/src/main/protobuf/spark/connect/commands.proto b/connector/connect/src/main/protobuf/spark/connect/commands.proto
index 79c6cffdf60..086d4d0cc92 100644
--- a/connector/connect/src/main/protobuf/spark/connect/commands.proto
+++ b/connector/connect/src/main/protobuf/spark/connect/commands.proto
@@ -31,6 +31,7 @@ message Command {
oneof command_type {
CreateScalarFunction create_function = 1;
WriteOperation write_operation = 2;
+ CreateDataFrameViewCommand create_dataframe_view = 3;
}
}
@@ -65,6 +66,24 @@ message CreateScalarFunction {
}
}
+// A command that creates a DataFrame global temp view or a local temp view.
+message CreateDataFrameViewCommand {
+ // Required. The relation that this view will be built on.
+ Relation input = 1;
+
+ // Required. View name.
+ string name = 2;
+
+ // Required. Whether this is a global temp view or a local temp view.
+ bool is_global = 3;
+
+ // Required.
+ //
+ // If true and the view already exists, it is replaced; if false and the view
+ // already exists, an exception is thrown.
+ bool replace = 4;
+}
+
// As writes are not directly handled during analysis and planning, they are modeled as commands.
message WriteOperation {
// The output of the `input` relation will be persisted according to the options.
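The same command can also be assembled directly with the generated builder; a minimal sketch in Scala, where `input` is a hypothetical placeholder for an existing `proto.Relation` (the DSL helper added later in this commit wraps exactly this):

```scala
import org.apache.spark.connect.proto

// `input` is a placeholder for an existing proto.Relation.
val command: proto.Command = proto.Command
  .newBuilder()
  .setCreateDataframeView(
    proto.CreateDataFrameViewCommand
      .newBuilder()
      .setInput(input)
      .setName("my_view")
      .setIsGlobal(false) // local temp view
      .setReplace(true))  // replace the view if it already exists
  .build()
```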
diff --git a/connector/connect/src/main/scala/org/apache/spark/sql/connect/command/SparkConnectCommandPlanner.scala b/connector/connect/src/main/scala/org/apache/spark/sql/connect/command/SparkConnectCommandPlanner.scala
index 80c36a4773e..11090976c7f 100644
--- a/connector/connect/src/main/scala/org/apache/spark/sql/connect/command/SparkConnectCommandPlanner.scala
+++ b/connector/connect/src/main/scala/org/apache/spark/sql/connect/command/SparkConnectCommandPlanner.scala
@@ -25,7 +25,11 @@ import org.apache.spark.api.python.{PythonEvalType, SimplePythonFunction}
import org.apache.spark.connect.proto
import org.apache.spark.connect.proto.WriteOperation
import org.apache.spark.sql.{Dataset, SparkSession}
+import org.apache.spark.sql.catalyst.analysis.{GlobalTempView, LocalTempView}
+import org.apache.spark.sql.catalyst.parser.ParseException
import org.apache.spark.sql.connect.planner.{DataTypeProtoConverter, SparkConnectPlanner}
+import org.apache.spark.sql.errors.QueryCompilationErrors
+import org.apache.spark.sql.execution.command.CreateViewCommand
import org.apache.spark.sql.execution.python.UserDefinedPythonFunction
import org.apache.spark.sql.types.StringType
@@ -45,6 +49,8 @@ class SparkConnectCommandPlanner(session: SparkSession, command: proto.Command)
handleCreateScalarFunction(command.getCreateFunction)
case proto.Command.CommandTypeCase.WRITE_OPERATION =>
handleWriteOperation(command.getWriteOperation)
+ case proto.Command.CommandTypeCase.CREATE_DATAFRAME_VIEW =>
+ handleCreateViewCommand(command.getCreateDataframeView)
case _ => throw new UnsupportedOperationException(s"$command not supported.")
}
}
@@ -79,6 +85,32 @@ class SparkConnectCommandPlanner(session: SparkSession, command: proto.Command)
session.udf.registerPython(cf.getPartsList.asScala.head, udf)
}
+ def handleCreateViewCommand(createView: proto.CreateDataFrameViewCommand): Unit = {
+ val viewType = if (createView.getIsGlobal) GlobalTempView else LocalTempView
+
+ val tableIdentifier =
+ try {
+ session.sessionState.sqlParser.parseTableIdentifier(createView.getName)
+ } catch {
+ case _: ParseException =>
+ throw QueryCompilationErrors.invalidViewNameError(createView.getName)
+ }
+
+ val plan = CreateViewCommand(
+ name = tableIdentifier,
+ userSpecifiedColumns = Nil,
+ comment = None,
+ properties = Map.empty,
+ originalText = None,
+ plan = new SparkConnectPlanner(createView.getInput, session).transform(),
+ allowExisting = false,
+ replace = createView.getReplace,
+ viewType = viewType,
+ isAnalyzed = true)
+
+ Dataset.ofRows(session, plan).queryExecution.commandExecuted
+ }
+
/**
* Transforms the write operation and executes it.
*
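Read this way, the two proto flags line up with the four classic Dataset view APIs; a rough correspondence for orientation (`df` stands for any DataFrame; the mapping is this editor's reading of the handler above, not spelled out in the commit):

```scala
// is_global = false, replace = false
df.createTempView("v1")                // throws if v1 already exists
// is_global = false, replace = true
df.createOrReplaceTempView("v2")
// is_global = true,  replace = false
df.createGlobalTempView("v3")          // throws if v3 already exists
// is_global = true,  replace = true
df.createOrReplaceGlobalTempView("v4")
```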
diff --git a/connector/connect/src/main/scala/org/apache/spark/sql/connect/dsl/package.scala b/connector/connect/src/main/scala/org/apache/spark/sql/connect/dsl/package.scala
index 416b20f2ba9..ec14333fdc3 100644
--- a/connector/connect/src/main/scala/org/apache/spark/sql/connect/dsl/package.scala
+++ b/connector/connect/src/main/scala/org/apache/spark/sql/connect/dsl/package.scala
@@ -182,6 +182,19 @@ package object dsl {
writeOp.setInput(logicalPlan)
Command.newBuilder().setWriteOperation(writeOp.build()).build()
}
+
+ def createView(name: String, global: Boolean, replace: Boolean): Command = {
+ Command
+ .newBuilder()
+ .setCreateDataframeView(
+ CreateDataFrameViewCommand
+ .newBuilder()
+ .setName(name)
+ .setIsGlobal(global)
+ .setReplace(replace)
+ .setInput(logicalPlan))
+ .build()
+ }
}
}
diff --git a/connector/connect/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectCommandPlannerSuite.scala b/connector/connect/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectCommandPlannerSuite.scala
index e5ca670e4dd..8ab8e0599fc 100644
--- a/connector/connect/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectCommandPlannerSuite.scala
+++ b/connector/connect/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectCommandPlannerSuite.scala
@@ -138,4 +138,23 @@ class SparkConnectCommandPlannerSuite
}
}
+ test("Test CreateView") {
+ withView("view1", "view2", "view3", "view4") {
+ transform(localRelation.createView("view1", global = true, replace = true))
+ assert(spark.catalog.tableExists("global_temp.view1"))
+
+ transform(localRelation.createView("view2", global = false, replace = true))
+ assert(spark.catalog.tableExists("view2"))
+
+ transform(localRelation.createView("view3", global = true, replace = false))
+ assertThrows[AnalysisException] {
+ transform(localRelation.createView("view3", global = true, replace = false))
+ }
+
+ transform(localRelation.createView("view4", global = false, replace = false))
+ assertThrows[AnalysisException] {
+ transform(localRelation.createView("view4", global = false, replace = false))
+ }
+ }
+ }
}
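As the test exercises, global temp views are registered under the reserved `global_temp` database, while local temp views resolve unqualified. A short hedged sketch of inspecting and cleaning them up through the standard catalog API (`spark` being the test session):

```scala
spark.catalog.tableExists("global_temp.view1") // global temp views live in `global_temp`
spark.catalog.tableExists("view2")             // local temp views resolve unqualified
spark.catalog.dropGlobalTempView("view1")      // returns true if the view existed
spark.catalog.dropTempView("view2")
```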
diff --git a/python/pyspark/sql/connect/proto/commands_pb2.py b/python/pyspark/sql/connect/proto/commands_pb2.py
index fa05b6ff76c..11f53322ce7 100644
--- a/python/pyspark/sql/connect/proto/commands_pb2.py
+++ b/python/pyspark/sql/connect/proto/commands_pb2.py
@@ -33,7 +33,7 @@ from pyspark.sql.connect.proto import types_pb2 as spark_dot_connect_dot_types__
DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(
- b'\n\x1cspark/connect/commands.proto\x12\rspark.connect\x1a\x1dspark/connect/relations.proto\x1a\x19spark/connect/types.proto"\xb3\x01\n\x07\x43ommand\x12N\n\x0f\x63reate_function\x18\x01 \x01(\x0b\x32#.spark.connect.CreateScalarFunctionH\x00R\x0e\x63reateFunction\x12H\n\x0fwrite_operation\x18\x02 \x01(\x0b\x32\x1d.spark.connect.WriteOperationH\x00R\x0ewriteOperationB\x0e\n\x0c\x63ommand_type"\x97\x04\n\x14\x43reateScalarFunction\x12\x14\n\x05parts\x18\x01 \x03(\tR\x05parts\x12P\n\x0 [...]
+ b'\n\x1cspark/connect/commands.proto\x12\rspark.connect\x1a\x1dspark/connect/relations.proto\x1a\x19spark/connect/types.proto"\x94\x02\n\x07\x43ommand\x12N\n\x0f\x63reate_function\x18\x01 \x01(\x0b\x32#.spark.connect.CreateScalarFunctionH\x00R\x0e\x63reateFunction\x12H\n\x0fwrite_operation\x18\x02 \x01(\x0b\x32\x1d.spark.connect.WriteOperationH\x00R\x0ewriteOperation\x12_\n\x15\x63reate_dataframe_view\x18\x03 \x01(\x0b\x32).spark.connect.CreateDataFrameViewCommandH\x00R\x13\x63reateD [...]
)
_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, globals())
@@ -45,17 +45,19 @@ if _descriptor._USE_C_DESCRIPTORS == False:
_WRITEOPERATION_OPTIONSENTRY._options = None
_WRITEOPERATION_OPTIONSENTRY._serialized_options = b"8\001"
_COMMAND._serialized_start = 106
- _COMMAND._serialized_end = 285
- _CREATESCALARFUNCTION._serialized_start = 288
- _CREATESCALARFUNCTION._serialized_end = 823
- _CREATESCALARFUNCTION_FUNCTIONLANGUAGE._serialized_start = 661
- _CREATESCALARFUNCTION_FUNCTIONLANGUAGE._serialized_end = 800
- _WRITEOPERATION._serialized_start = 826
- _WRITEOPERATION._serialized_end = 1568
- _WRITEOPERATION_OPTIONSENTRY._serialized_start = 1264
- _WRITEOPERATION_OPTIONSENTRY._serialized_end = 1322
- _WRITEOPERATION_BUCKETBY._serialized_start = 1324
- _WRITEOPERATION_BUCKETBY._serialized_end = 1415
- _WRITEOPERATION_SAVEMODE._serialized_start = 1418
- _WRITEOPERATION_SAVEMODE._serialized_end = 1555
+ _COMMAND._serialized_end = 382
+ _CREATESCALARFUNCTION._serialized_start = 385
+ _CREATESCALARFUNCTION._serialized_end = 920
+ _CREATESCALARFUNCTION_FUNCTIONLANGUAGE._serialized_start = 758
+ _CREATESCALARFUNCTION_FUNCTIONLANGUAGE._serialized_end = 897
+ _CREATEDATAFRAMEVIEWCOMMAND._serialized_start = 923
+ _CREATEDATAFRAMEVIEWCOMMAND._serialized_end = 1073
+ _WRITEOPERATION._serialized_start = 1076
+ _WRITEOPERATION._serialized_end = 1818
+ _WRITEOPERATION_OPTIONSENTRY._serialized_start = 1514
+ _WRITEOPERATION_OPTIONSENTRY._serialized_end = 1572
+ _WRITEOPERATION_BUCKETBY._serialized_start = 1574
+ _WRITEOPERATION_BUCKETBY._serialized_end = 1665
+ _WRITEOPERATION_SAVEMODE._serialized_start = 1668
+ _WRITEOPERATION_SAVEMODE._serialized_end = 1805
# @@protoc_insertion_point(module_scope)
diff --git a/python/pyspark/sql/connect/proto/commands_pb2.pyi b/python/pyspark/sql/connect/proto/commands_pb2.pyi
index 0ac8da335b5..9b9880e0b93 100644
--- a/python/pyspark/sql/connect/proto/commands_pb2.pyi
+++ b/python/pyspark/sql/connect/proto/commands_pb2.pyi
@@ -60,21 +60,27 @@ class Command(google.protobuf.message.Message):
CREATE_FUNCTION_FIELD_NUMBER: builtins.int
WRITE_OPERATION_FIELD_NUMBER: builtins.int
+ CREATE_DATAFRAME_VIEW_FIELD_NUMBER: builtins.int
@property
def create_function(self) -> global___CreateScalarFunction: ...
@property
def write_operation(self) -> global___WriteOperation: ...
+ @property
+ def create_dataframe_view(self) -> global___CreateDataFrameViewCommand: ...
def __init__(
self,
*,
create_function: global___CreateScalarFunction | None = ...,
write_operation: global___WriteOperation | None = ...,
+ create_dataframe_view: global___CreateDataFrameViewCommand | None = ...,
) -> None: ...
def HasField(
self,
field_name: typing_extensions.Literal[
"command_type",
b"command_type",
+ "create_dataframe_view",
+ b"create_dataframe_view",
"create_function",
b"create_function",
"write_operation",
@@ -86,6 +92,8 @@ class Command(google.protobuf.message.Message):
field_name: typing_extensions.Literal[
"command_type",
b"command_type",
+ "create_dataframe_view",
+ b"create_dataframe_view",
"create_function",
b"create_function",
"write_operation",
@@ -94,7 +102,9 @@ class Command(google.protobuf.message.Message):
) -> None: ...
def WhichOneof(
self, oneof_group: typing_extensions.Literal["command_type", b"command_type"]
- ) -> typing_extensions.Literal["create_function", "write_operation"] | None: ...
+ ) -> typing_extensions.Literal[
+ "create_function", "write_operation", "create_dataframe_view"
+ ] | None: ...
global___Command = Command
@@ -210,6 +220,48 @@ class CreateScalarFunction(google.protobuf.message.Message):
global___CreateScalarFunction = CreateScalarFunction
+class CreateDataFrameViewCommand(google.protobuf.message.Message):
+ """A command that can create DataFrame global temp view or local temp view."""
+
+ DESCRIPTOR: google.protobuf.descriptor.Descriptor
+
+ INPUT_FIELD_NUMBER: builtins.int
+ NAME_FIELD_NUMBER: builtins.int
+ IS_GLOBAL_FIELD_NUMBER: builtins.int
+ REPLACE_FIELD_NUMBER: builtins.int
+ @property
+ def input(self) -> pyspark.sql.connect.proto.relations_pb2.Relation:
+ """Required. The relation that this view will be built on."""
+ name: builtins.str
+ """Required. View name."""
+ is_global: builtins.bool
+ """Required. Whether this is global temp view or local temp view."""
+ replace: builtins.bool
+ """Required.
+
+ If true and the view already exists, it is replaced; if false and the view
+ already exists, an exception is thrown.
+ """
+ def __init__(
+ self,
+ *,
+ input: pyspark.sql.connect.proto.relations_pb2.Relation | None = ...,
+ name: builtins.str = ...,
+ is_global: builtins.bool = ...,
+ replace: builtins.bool = ...,
+ ) -> None: ...
+ def HasField(
+ self, field_name: typing_extensions.Literal["input", b"input"]
+ ) -> builtins.bool: ...
+ def ClearField(
+ self,
+ field_name: typing_extensions.Literal[
+ "input", b"input", "is_global", b"is_global", "name", b"name", "replace", b"replace"
+ ],
+ ) -> None: ...
+
+global___CreateDataFrameViewCommand = CreateDataFrameViewCommand
+
class WriteOperation(google.protobuf.message.Message):
"""As writes are not directly handled during analysis and planning, they are modeled as commands."""