Posted to commits@spark.apache.org by hv...@apache.org on 2023/03/01 00:25:10 UTC

[spark] branch master updated: [SPARK-42543][CONNECT] Specify protocol for UDF artifact transfer in JVM/Scala client

This is an automated email from the ASF dual-hosted git repository.

hvanhovell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new d110d8a4e23 [SPARK-42543][CONNECT] Specify protocol for UDF artifact transfer in JVM/Scala client
d110d8a4e23 is described below

commit d110d8a4e23882d53bad87e4992b62b42ff1de23
Author: vicennial <ve...@databricks.com>
AuthorDate: Tue Feb 28 20:24:56 2023 -0400

    [SPARK-42543][CONNECT] Specify protocol for UDF artifact transfer in JVM/Scala client
    
    ### What changes were proposed in this pull request?
    
    This PR introduces a new client-streaming RPC "AddArtifacts" on the SparkConnectService to handle the transfer of artifacts from the client to the server. New message types `AddArtifactsRequest` and `AddArtifactsResponse` are added to specify the format of artifact transfer.
    
    An artifact is defined by its `name` and `data` fields.
    
    - `name`
      - The name of the artifact is expected to be in the form of a "Relative Path" made up of a sequence of directories and a final file element.
      - Examples of "Relative Path"s: `jars/test.jar`, `classes/xyz.class`, `abc.xyz`, `a/b/X.jar`.
      - The server is expected to maintain the hierarchy of files as defined by their names (i.e., the relative path of the file on the server's filesystem will be the same as the name of the provided artifact).
    - `data`
      - The raw data of the artifact.
    
    The intention behind the `name` format is extensibility. Through this scheme, the server can maintain the hierarchy/grouping of files in any way the client specifies, and different "forms" of artifacts can be transferred without any updates to the protocol or code itself.
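    
    As an illustration (not part of this change), a client could use the generated Python stubs added in this commit to build a single batched request. The CRC-32 choice (`zlib.crc32`), the session id and the sample artifact are assumptions; the protocol only states that `crc` is used to verify chunk integrity:
    
    ```python
    import zlib
    
    from pyspark.sql.connect.proto import base_pb2 as pb2
    
    def single_chunk_artifact(name: str, data: bytes):
        # A small artifact (e.g. a REPL-generated class file) that fits in one chunk.
        chunk = pb2.AddArtifactsRequest.ArtifactChunk(data=data, crc=zlib.crc32(data))
        return pb2.AddArtifactsRequest.SingleChunkArtifact(name=name, data=chunk)
    
    # Several small artifacts can be batched into a single AddArtifactsRequest.
    request = pb2.AddArtifactsRequest(
        client_id="example-session",  # hypothetical session identifier
        batch=pb2.AddArtifactsRequest.Batch(
            artifacts=[single_chunk_artifact("classes/Foo.class", b"\xca\xfe\xba\xbe")]
        ),
    )
    ```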
    
    The protocol supports batching of small artifacts and chunking of large artifacts (to stay within gRPC message-size limits) as required.
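    
    For a large artifact, a hedged sketch of the chunked flow looks as follows: the first request carries a `BeginChunkedArtifact` with the total size, chunk count and initial chunk, and subsequent requests carry bare `ArtifactChunk`s. The 1 MiB chunk size and helper name are illustrative assumptions, not values mandated by the protocol:
    
    ```python
    import zlib
    
    from pyspark.sql.connect.proto import base_pb2 as pb2
    
    CHUNK_SIZE = 1024 * 1024  # assumed chunk size; a real client may pick a different limit
    
    def chunked_requests(name: str, data: bytes, client_id: str):
        # Split the payload; an empty artifact still produces one (empty) chunk.
        chunks = [data[i:i + CHUNK_SIZE] for i in range(0, len(data), CHUNK_SIZE)] or [b""]
        # First request: metadata plus the initial chunk.
        yield pb2.AddArtifactsRequest(
            client_id=client_id,
            begin_chunk=pb2.AddArtifactsRequest.BeginChunkedArtifact(
                name=name,
                total_bytes=len(data),
                num_chunks=len(chunks),
                initial_chunk=pb2.AddArtifactsRequest.ArtifactChunk(
                    data=chunks[0], crc=zlib.crc32(chunks[0])
                ),
            ),
        )
        # Remaining requests: bare chunks without metadata.
        for piece in chunks[1:]:
            yield pb2.AddArtifactsRequest(
                client_id=client_id,
                chunk=pb2.AddArtifactsRequest.ArtifactChunk(data=piece, crc=zlib.crc32(piece)),
            )
    ```
    
    The resulting request iterator could then be passed to the client-streaming stub generated in `base_pb2_grpc.py` (e.g. `stub.AddArtifacts(chunked_requests(...))`); any returned `ArtifactSummary` with `is_crc_successful` set to false could be handled by resending the named artifact.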
    
    ### Why are the changes needed?
    
    In the decoupled client-server architecture of Spark Connect, a remote client may use a local JAR or a newly defined class in its UDF that is not present on the server. To handle these cases of missing "artifacts", a transfer protocol is needed to move the required artifacts from the client side to the server side.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    N/A
    
    Closes #40147 from vicennial/artifactProtocol.
    
    Authored-by: vicennial <ve...@databricks.com>
    Signed-off-by: Herman van Hovell <he...@databricks.com>
---
 .../src/main/protobuf/spark/connect/base.proto     |  86 +++++++
 python/pyspark/sql/connect/proto/base_pb2.py       | 105 ++++++++-
 python/pyspark/sql/connect/proto/base_pb2.pyi      | 256 +++++++++++++++++++++
 python/pyspark/sql/connect/proto/base_pb2_grpc.py  |  47 ++++
 4 files changed, 491 insertions(+), 3 deletions(-)

diff --git a/connector/connect/common/src/main/protobuf/spark/connect/base.proto b/connector/connect/common/src/main/protobuf/spark/connect/base.proto
index 70e16bae8b3..3eacd0cc482 100644
--- a/connector/connect/common/src/main/protobuf/spark/connect/base.proto
+++ b/connector/connect/common/src/main/protobuf/spark/connect/base.proto
@@ -343,6 +343,88 @@ message ConfigResponse {
   repeated string warnings = 3;
 }
 
+// Request to transfer client-local artifacts.
+message AddArtifactsRequest {
+
+  // The client_id is set by the client to be able to collate streaming responses from
+  // different queries.
+  string client_id = 1;
+
+  // User context
+  UserContext user_context = 2;
+
+  // A chunk of an Artifact.
+  message ArtifactChunk {
+    // Data chunk.
+    bytes data = 1;
+    // CRC to allow server to verify integrity of the chunk.
+    int64 crc = 2;
+  }
+
+  // An artifact that is contained in a single `ArtifactChunk`.
+  // Generally, this message represents tiny artifacts such as REPL-generated class files.
+  message SingleChunkArtifact {
+    // The name of the artifact is expected in the form of a "Relative Path" that is made up of a
+    // sequence of directories and the final file element.
+    // Examples of "Relative Path"s: "jars/test.jar", "classes/xyz.class", "abc.xyz", "a/b/X.jar".
+    // The server is expected to maintain the hierarchy of files as defined by their name. (i.e
+    // The relative path of the file on the server's filesystem will be the same as the name of
+    // the provided artifact)
+    string name = 1;
+    // A single data chunk.
+    ArtifactChunk data = 2;
+  }
+
+  // A number of `SingleChunkArtifact` batched into a single RPC.
+  message Batch {
+    repeated SingleChunkArtifact artifacts = 1;
+  }
+
+  // Signals the beginning/start of a chunked artifact.
+  // A large artifact is transferred through a payload of `BeginChunkedArtifact` followed by a
+  // sequence of `ArtifactChunk`s.
+  message BeginChunkedArtifact {
+    // Name of the artifact undergoing chunking. Follows the same conventions as the `name` in
+    // the `Artifact` message.
+    string name = 1;
+    // Total size of the artifact in bytes.
+    int64 total_bytes = 2;
+    // Number of chunks the artifact is split into.
+    // This includes the `initial_chunk`.
+    int64 num_chunks = 3;
+    // The first/initial chunk.
+    ArtifactChunk initial_chunk = 4;
+  }
+
+  // The payload is either a batch of artifacts or a partial chunk of a large artifact.
+  oneof payload {
+    Batch batch = 3;
+    // The metadata and the initial chunk of a large artifact chunked into multiple requests.
+    // The server side is notified about the total size of the large artifact as well as the
+    // number of chunks to expect.
+    BeginChunkedArtifact begin_chunk = 4;
+    // A chunk of an artifact excluding metadata. This can be any chunk of a large artifact
+    // excluding the first chunk (which is included in `BeginChunkedArtifact`).
+    ArtifactChunk chunk = 5;
+  }
+}
+
+// Response to adding an artifact. Contains relevant metadata to verify successful transfer of
+// artifact(s).
+message AddArtifactsResponse {
+  // Metadata of an artifact.
+  message ArtifactSummary {
+    string name = 1;
+    // Whether the CRC (Cyclic Redundancy Check) is successful on server verification.
+    // The server discards any artifact that fails the CRC.
+    // If false, the client may choose to resend the artifact specified by `name`.
+    bool is_crc_successful = 2;
+  }
+
+  // The list of artifact(s) seen by the server.
+  repeated ArtifactSummary artifacts = 1;
+}
+
 // Main interface for the SparkConnect service.
 service SparkConnectService {
 
@@ -356,5 +438,9 @@ service SparkConnectService {
 
   // Update or fetch the configurations and returns a [[ConfigResponse]] containing the result.
   rpc Config(ConfigRequest) returns (ConfigResponse) {}
+
+  // Add artifacts to the session and returns a [[AddArtifactsResponse]] containing metadata about
+  // the added artifacts.
+  rpc AddArtifacts(stream AddArtifactsRequest) returns (AddArtifactsResponse) {}
 }
 
diff --git a/python/pyspark/sql/connect/proto/base_pb2.py b/python/pyspark/sql/connect/proto/base_pb2.py
index 576e4fd8ab3..c43619facb6 100644
--- a/python/pyspark/sql/connect/proto/base_pb2.py
+++ b/python/pyspark/sql/connect/proto/base_pb2.py
@@ -36,7 +36,7 @@ from pyspark.sql.connect.proto import types_pb2 as spark_dot_connect_dot_types__
 
 
 DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(
-    b'\n\x18spark/connect/base.proto\x12\rspark.connect\x1a\x19google/protobuf/any.proto\x1a\x1cspark/connect/commands.proto\x1a\x1dspark/connect/relations.proto\x1a\x19spark/connect/types.proto"t\n\x04Plan\x12-\n\x04root\x18\x01 \x01(\x0b\x32\x17.spark.connect.RelationH\x00R\x04root\x12\x32\n\x07\x63ommand\x18\x02 \x01(\x0b\x32\x16.spark.connect.CommandH\x00R\x07\x63ommandB\t\n\x07op_type"z\n\x0bUserContext\x12\x17\n\x07user_id\x18\x01 \x01(\tR\x06userId\x12\x1b\n\tuser_name\x18\x02 \x0 [...]
+    b'\n\x18spark/connect/base.proto\x12\rspark.connect\x1a\x19google/protobuf/any.proto\x1a\x1cspark/connect/commands.proto\x1a\x1dspark/connect/relations.proto\x1a\x19spark/connect/types.proto"t\n\x04Plan\x12-\n\x04root\x18\x01 \x01(\x0b\x32\x17.spark.connect.RelationH\x00R\x04root\x12\x32\n\x07\x63ommand\x18\x02 \x01(\x0b\x32\x16.spark.connect.CommandH\x00R\x07\x63ommandB\t\n\x07op_type"z\n\x0bUserContext\x12\x17\n\x07user_id\x18\x01 \x01(\tR\x06userId\x12\x1b\n\tuser_name\x18\x02 \x0 [...]
 )
 
 
@@ -84,6 +84,19 @@ _CONFIGREQUEST_GETALL = _CONFIGREQUEST.nested_types_by_name["GetAll"]
 _CONFIGREQUEST_UNSET = _CONFIGREQUEST.nested_types_by_name["Unset"]
 _CONFIGREQUEST_ISMODIFIABLE = _CONFIGREQUEST.nested_types_by_name["IsModifiable"]
 _CONFIGRESPONSE = DESCRIPTOR.message_types_by_name["ConfigResponse"]
+_ADDARTIFACTSREQUEST = DESCRIPTOR.message_types_by_name["AddArtifactsRequest"]
+_ADDARTIFACTSREQUEST_ARTIFACTCHUNK = _ADDARTIFACTSREQUEST.nested_types_by_name["ArtifactChunk"]
+_ADDARTIFACTSREQUEST_SINGLECHUNKARTIFACT = _ADDARTIFACTSREQUEST.nested_types_by_name[
+    "SingleChunkArtifact"
+]
+_ADDARTIFACTSREQUEST_BATCH = _ADDARTIFACTSREQUEST.nested_types_by_name["Batch"]
+_ADDARTIFACTSREQUEST_BEGINCHUNKEDARTIFACT = _ADDARTIFACTSREQUEST.nested_types_by_name[
+    "BeginChunkedArtifact"
+]
+_ADDARTIFACTSRESPONSE = DESCRIPTOR.message_types_by_name["AddArtifactsResponse"]
+_ADDARTIFACTSRESPONSE_ARTIFACTSUMMARY = _ADDARTIFACTSRESPONSE.nested_types_by_name[
+    "ArtifactSummary"
+]
 _ANALYZEPLANREQUEST_EXPLAIN_EXPLAINMODE = _ANALYZEPLANREQUEST_EXPLAIN.enum_types_by_name[
     "ExplainMode"
 ]
@@ -476,6 +489,78 @@ ConfigResponse = _reflection.GeneratedProtocolMessageType(
 )
 _sym_db.RegisterMessage(ConfigResponse)
 
+AddArtifactsRequest = _reflection.GeneratedProtocolMessageType(
+    "AddArtifactsRequest",
+    (_message.Message,),
+    {
+        "ArtifactChunk": _reflection.GeneratedProtocolMessageType(
+            "ArtifactChunk",
+            (_message.Message,),
+            {
+                "DESCRIPTOR": _ADDARTIFACTSREQUEST_ARTIFACTCHUNK,
+                "__module__": "spark.connect.base_pb2"
+                # @@protoc_insertion_point(class_scope:spark.connect.AddArtifactsRequest.ArtifactChunk)
+            },
+        ),
+        "SingleChunkArtifact": _reflection.GeneratedProtocolMessageType(
+            "SingleChunkArtifact",
+            (_message.Message,),
+            {
+                "DESCRIPTOR": _ADDARTIFACTSREQUEST_SINGLECHUNKARTIFACT,
+                "__module__": "spark.connect.base_pb2"
+                # @@protoc_insertion_point(class_scope:spark.connect.AddArtifactsRequest.SingleChunkArtifact)
+            },
+        ),
+        "Batch": _reflection.GeneratedProtocolMessageType(
+            "Batch",
+            (_message.Message,),
+            {
+                "DESCRIPTOR": _ADDARTIFACTSREQUEST_BATCH,
+                "__module__": "spark.connect.base_pb2"
+                # @@protoc_insertion_point(class_scope:spark.connect.AddArtifactsRequest.Batch)
+            },
+        ),
+        "BeginChunkedArtifact": _reflection.GeneratedProtocolMessageType(
+            "BeginChunkedArtifact",
+            (_message.Message,),
+            {
+                "DESCRIPTOR": _ADDARTIFACTSREQUEST_BEGINCHUNKEDARTIFACT,
+                "__module__": "spark.connect.base_pb2"
+                # @@protoc_insertion_point(class_scope:spark.connect.AddArtifactsRequest.BeginChunkedArtifact)
+            },
+        ),
+        "DESCRIPTOR": _ADDARTIFACTSREQUEST,
+        "__module__": "spark.connect.base_pb2"
+        # @@protoc_insertion_point(class_scope:spark.connect.AddArtifactsRequest)
+    },
+)
+_sym_db.RegisterMessage(AddArtifactsRequest)
+_sym_db.RegisterMessage(AddArtifactsRequest.ArtifactChunk)
+_sym_db.RegisterMessage(AddArtifactsRequest.SingleChunkArtifact)
+_sym_db.RegisterMessage(AddArtifactsRequest.Batch)
+_sym_db.RegisterMessage(AddArtifactsRequest.BeginChunkedArtifact)
+
+AddArtifactsResponse = _reflection.GeneratedProtocolMessageType(
+    "AddArtifactsResponse",
+    (_message.Message,),
+    {
+        "ArtifactSummary": _reflection.GeneratedProtocolMessageType(
+            "ArtifactSummary",
+            (_message.Message,),
+            {
+                "DESCRIPTOR": _ADDARTIFACTSRESPONSE_ARTIFACTSUMMARY,
+                "__module__": "spark.connect.base_pb2"
+                # @@protoc_insertion_point(class_scope:spark.connect.AddArtifactsResponse.ArtifactSummary)
+            },
+        ),
+        "DESCRIPTOR": _ADDARTIFACTSRESPONSE,
+        "__module__": "spark.connect.base_pb2"
+        # @@protoc_insertion_point(class_scope:spark.connect.AddArtifactsResponse)
+    },
+)
+_sym_db.RegisterMessage(AddArtifactsResponse)
+_sym_db.RegisterMessage(AddArtifactsResponse.ArtifactSummary)
+
 _SPARKCONNECTSERVICE = DESCRIPTOR.services_by_name["SparkConnectService"]
 if _descriptor._USE_C_DESCRIPTORS == False:
 
@@ -561,6 +646,20 @@ if _descriptor._USE_C_DESCRIPTORS == False:
     _CONFIGREQUEST_ISMODIFIABLE._serialized_end = 4992
     _CONFIGRESPONSE._serialized_start = 5010
     _CONFIGRESPONSE._serialized_end = 5130
-    _SPARKCONNECTSERVICE._serialized_start = 5133
-    _SPARKCONNECTSERVICE._serialized_end = 5405
+    _ADDARTIFACTSREQUEST._serialized_start = 5133
+    _ADDARTIFACTSREQUEST._serialized_end = 5948
+    _ADDARTIFACTSREQUEST_ARTIFACTCHUNK._serialized_start = 5480
+    _ADDARTIFACTSREQUEST_ARTIFACTCHUNK._serialized_end = 5533
+    _ADDARTIFACTSREQUEST_SINGLECHUNKARTIFACT._serialized_start = 5535
+    _ADDARTIFACTSREQUEST_SINGLECHUNKARTIFACT._serialized_end = 5646
+    _ADDARTIFACTSREQUEST_BATCH._serialized_start = 5648
+    _ADDARTIFACTSREQUEST_BATCH._serialized_end = 5741
+    _ADDARTIFACTSREQUEST_BEGINCHUNKEDARTIFACT._serialized_start = 5744
+    _ADDARTIFACTSREQUEST_BEGINCHUNKEDARTIFACT._serialized_end = 5937
+    _ADDARTIFACTSRESPONSE._serialized_start = 5951
+    _ADDARTIFACTSRESPONSE._serialized_end = 6139
+    _ADDARTIFACTSRESPONSE_ARTIFACTSUMMARY._serialized_start = 6058
+    _ADDARTIFACTSRESPONSE_ARTIFACTSUMMARY._serialized_end = 6139
+    _SPARKCONNECTSERVICE._serialized_start = 6142
+    _SPARKCONNECTSERVICE._serialized_end = 6507
 # @@protoc_insertion_point(module_scope)
diff --git a/python/pyspark/sql/connect/proto/base_pb2.pyi b/python/pyspark/sql/connect/proto/base_pb2.pyi
index cb5606e1acd..677f101aa47 100644
--- a/python/pyspark/sql/connect/proto/base_pb2.pyi
+++ b/python/pyspark/sql/connect/proto/base_pb2.pyi
@@ -1255,3 +1255,259 @@ class ConfigResponse(google.protobuf.message.Message):
     ) -> None: ...
 
 global___ConfigResponse = ConfigResponse
+
+class AddArtifactsRequest(google.protobuf.message.Message):
+    """Request to transfer client-local artifacts."""
+
+    DESCRIPTOR: google.protobuf.descriptor.Descriptor
+
+    class ArtifactChunk(google.protobuf.message.Message):
+        """A chunk of an Artifact."""
+
+        DESCRIPTOR: google.protobuf.descriptor.Descriptor
+
+        DATA_FIELD_NUMBER: builtins.int
+        CRC_FIELD_NUMBER: builtins.int
+        data: builtins.bytes
+        """Data chunk."""
+        crc: builtins.int
+        """CRC to allow server to verify integrity of the chunk."""
+        def __init__(
+            self,
+            *,
+            data: builtins.bytes = ...,
+            crc: builtins.int = ...,
+        ) -> None: ...
+        def ClearField(
+            self, field_name: typing_extensions.Literal["crc", b"crc", "data", b"data"]
+        ) -> None: ...
+
+    class SingleChunkArtifact(google.protobuf.message.Message):
+        """An artifact that is contained in a single `ArtifactChunk`.
+        Generally, this message represents tiny artifacts such as REPL-generated class files.
+        """
+
+        DESCRIPTOR: google.protobuf.descriptor.Descriptor
+
+        NAME_FIELD_NUMBER: builtins.int
+        DATA_FIELD_NUMBER: builtins.int
+        name: builtins.str
+        """The name of the artifact is expected in the form of a "Relative Path" that is made up of a
+        sequence of directories and the final file element.
+        Examples of "Relative Path"s: "jars/test.jar", "classes/xyz.class", "abc.xyz", "a/b/X.jar".
+        The server is expected to maintain the hierarchy of files as defined by their name. (i.e
+        The relative path of the file on the server's filesystem will be the same as the name of
+        the provided artifact)
+        """
+        @property
+        def data(self) -> global___AddArtifactsRequest.ArtifactChunk:
+            """A single data chunk."""
+        def __init__(
+            self,
+            *,
+            name: builtins.str = ...,
+            data: global___AddArtifactsRequest.ArtifactChunk | None = ...,
+        ) -> None: ...
+        def HasField(
+            self, field_name: typing_extensions.Literal["data", b"data"]
+        ) -> builtins.bool: ...
+        def ClearField(
+            self, field_name: typing_extensions.Literal["data", b"data", "name", b"name"]
+        ) -> None: ...
+
+    class Batch(google.protobuf.message.Message):
+        """A number of `SingleChunkArtifact` batched into a single RPC."""
+
+        DESCRIPTOR: google.protobuf.descriptor.Descriptor
+
+        ARTIFACTS_FIELD_NUMBER: builtins.int
+        @property
+        def artifacts(
+            self,
+        ) -> google.protobuf.internal.containers.RepeatedCompositeFieldContainer[
+            global___AddArtifactsRequest.SingleChunkArtifact
+        ]: ...
+        def __init__(
+            self,
+            *,
+            artifacts: collections.abc.Iterable[global___AddArtifactsRequest.SingleChunkArtifact]
+            | None = ...,
+        ) -> None: ...
+        def ClearField(
+            self, field_name: typing_extensions.Literal["artifacts", b"artifacts"]
+        ) -> None: ...
+
+    class BeginChunkedArtifact(google.protobuf.message.Message):
+        """Signals the beginning/start of a chunked artifact.
+        A large artifact is transferred through a payload of `BeginChunkedArtifact` followed by a
+        sequence of `ArtifactChunk`s.
+        """
+
+        DESCRIPTOR: google.protobuf.descriptor.Descriptor
+
+        NAME_FIELD_NUMBER: builtins.int
+        TOTAL_BYTES_FIELD_NUMBER: builtins.int
+        NUM_CHUNKS_FIELD_NUMBER: builtins.int
+        INITIAL_CHUNK_FIELD_NUMBER: builtins.int
+        name: builtins.str
+        """Name of the artifact undergoing chunking. Follows the same conventions as the `name` in
+        the `Artifact` message.
+        """
+        total_bytes: builtins.int
+        """Total size of the artifact in bytes."""
+        num_chunks: builtins.int
+        """Number of chunks the artifact is split into.
+        This includes the `initial_chunk`.
+        """
+        @property
+        def initial_chunk(self) -> global___AddArtifactsRequest.ArtifactChunk:
+            """The first/initial chunk."""
+        def __init__(
+            self,
+            *,
+            name: builtins.str = ...,
+            total_bytes: builtins.int = ...,
+            num_chunks: builtins.int = ...,
+            initial_chunk: global___AddArtifactsRequest.ArtifactChunk | None = ...,
+        ) -> None: ...
+        def HasField(
+            self, field_name: typing_extensions.Literal["initial_chunk", b"initial_chunk"]
+        ) -> builtins.bool: ...
+        def ClearField(
+            self,
+            field_name: typing_extensions.Literal[
+                "initial_chunk",
+                b"initial_chunk",
+                "name",
+                b"name",
+                "num_chunks",
+                b"num_chunks",
+                "total_bytes",
+                b"total_bytes",
+            ],
+        ) -> None: ...
+
+    CLIENT_ID_FIELD_NUMBER: builtins.int
+    USER_CONTEXT_FIELD_NUMBER: builtins.int
+    BATCH_FIELD_NUMBER: builtins.int
+    BEGIN_CHUNK_FIELD_NUMBER: builtins.int
+    CHUNK_FIELD_NUMBER: builtins.int
+    client_id: builtins.str
+    """The client_id is set by the client to be able to collate streaming responses from
+    different queries.
+    """
+    @property
+    def user_context(self) -> global___UserContext:
+        """User context"""
+    @property
+    def batch(self) -> global___AddArtifactsRequest.Batch: ...
+    @property
+    def begin_chunk(self) -> global___AddArtifactsRequest.BeginChunkedArtifact:
+        """The metadata and the initial chunk of a large artifact chunked into multiple requests.
+        The server side is notified about the total size of the large artifact as well as the
+        number of chunks to expect.
+        """
+    @property
+    def chunk(self) -> global___AddArtifactsRequest.ArtifactChunk:
+        """A chunk of an artifact excluding metadata. This can be any chunk of a large artifact
+        excluding the first chunk (which is included in `BeginChunkedArtifact`).
+        """
+    def __init__(
+        self,
+        *,
+        client_id: builtins.str = ...,
+        user_context: global___UserContext | None = ...,
+        batch: global___AddArtifactsRequest.Batch | None = ...,
+        begin_chunk: global___AddArtifactsRequest.BeginChunkedArtifact | None = ...,
+        chunk: global___AddArtifactsRequest.ArtifactChunk | None = ...,
+    ) -> None: ...
+    def HasField(
+        self,
+        field_name: typing_extensions.Literal[
+            "batch",
+            b"batch",
+            "begin_chunk",
+            b"begin_chunk",
+            "chunk",
+            b"chunk",
+            "payload",
+            b"payload",
+            "user_context",
+            b"user_context",
+        ],
+    ) -> builtins.bool: ...
+    def ClearField(
+        self,
+        field_name: typing_extensions.Literal[
+            "batch",
+            b"batch",
+            "begin_chunk",
+            b"begin_chunk",
+            "chunk",
+            b"chunk",
+            "client_id",
+            b"client_id",
+            "payload",
+            b"payload",
+            "user_context",
+            b"user_context",
+        ],
+    ) -> None: ...
+    def WhichOneof(
+        self, oneof_group: typing_extensions.Literal["payload", b"payload"]
+    ) -> typing_extensions.Literal["batch", "begin_chunk", "chunk"] | None: ...
+
+global___AddArtifactsRequest = AddArtifactsRequest
+
+class AddArtifactsResponse(google.protobuf.message.Message):
+    """Response to adding an artifact. Contains relevant metadata to verify successful transfer of
+    artifact(s).
+    """
+
+    DESCRIPTOR: google.protobuf.descriptor.Descriptor
+
+    class ArtifactSummary(google.protobuf.message.Message):
+        """Metadata of an artifact."""
+
+        DESCRIPTOR: google.protobuf.descriptor.Descriptor
+
+        NAME_FIELD_NUMBER: builtins.int
+        IS_CRC_SUCCESSFUL_FIELD_NUMBER: builtins.int
+        name: builtins.str
+        is_crc_successful: builtins.bool
+        """Whether the CRC (Cyclic Redundancy Check) is successful on server verification.
+        The server discards any artifact that fails the CRC.
+        If false, the client may choose to resend the artifact specified by `name`.
+        """
+        def __init__(
+            self,
+            *,
+            name: builtins.str = ...,
+            is_crc_successful: builtins.bool = ...,
+        ) -> None: ...
+        def ClearField(
+            self,
+            field_name: typing_extensions.Literal[
+                "is_crc_successful", b"is_crc_successful", "name", b"name"
+            ],
+        ) -> None: ...
+
+    ARTIFACTS_FIELD_NUMBER: builtins.int
+    @property
+    def artifacts(
+        self,
+    ) -> google.protobuf.internal.containers.RepeatedCompositeFieldContainer[
+        global___AddArtifactsResponse.ArtifactSummary
+    ]:
+        """The list of artifact(s) seen by the server."""
+    def __init__(
+        self,
+        *,
+        artifacts: collections.abc.Iterable[global___AddArtifactsResponse.ArtifactSummary]
+        | None = ...,
+    ) -> None: ...
+    def ClearField(
+        self, field_name: typing_extensions.Literal["artifacts", b"artifacts"]
+    ) -> None: ...
+
+global___AddArtifactsResponse = AddArtifactsResponse
diff --git a/python/pyspark/sql/connect/proto/base_pb2_grpc.py b/python/pyspark/sql/connect/proto/base_pb2_grpc.py
index 007e31fd0ea..c372cbcc487 100644
--- a/python/pyspark/sql/connect/proto/base_pb2_grpc.py
+++ b/python/pyspark/sql/connect/proto/base_pb2_grpc.py
@@ -45,6 +45,11 @@ class SparkConnectServiceStub(object):
             request_serializer=spark_dot_connect_dot_base__pb2.ConfigRequest.SerializeToString,
             response_deserializer=spark_dot_connect_dot_base__pb2.ConfigResponse.FromString,
         )
+        self.AddArtifacts = channel.stream_unary(
+            "/spark.connect.SparkConnectService/AddArtifacts",
+            request_serializer=spark_dot_connect_dot_base__pb2.AddArtifactsRequest.SerializeToString,
+            response_deserializer=spark_dot_connect_dot_base__pb2.AddArtifactsResponse.FromString,
+        )
 
 
 class SparkConnectServiceServicer(object):
@@ -71,6 +76,14 @@ class SparkConnectServiceServicer(object):
         context.set_details("Method not implemented!")
         raise NotImplementedError("Method not implemented!")
 
+    def AddArtifacts(self, request_iterator, context):
+        """Add artifacts to the session and returns a [[AddArtifactsResponse]] containing metadata about
+        the added artifacts.
+        """
+        context.set_code(grpc.StatusCode.UNIMPLEMENTED)
+        context.set_details("Method not implemented!")
+        raise NotImplementedError("Method not implemented!")
+
 
 def add_SparkConnectServiceServicer_to_server(servicer, server):
     rpc_method_handlers = {
@@ -89,6 +102,11 @@ def add_SparkConnectServiceServicer_to_server(servicer, server):
             request_deserializer=spark_dot_connect_dot_base__pb2.ConfigRequest.FromString,
             response_serializer=spark_dot_connect_dot_base__pb2.ConfigResponse.SerializeToString,
         ),
+        "AddArtifacts": grpc.stream_unary_rpc_method_handler(
+            servicer.AddArtifacts,
+            request_deserializer=spark_dot_connect_dot_base__pb2.AddArtifactsRequest.FromString,
+            response_serializer=spark_dot_connect_dot_base__pb2.AddArtifactsResponse.SerializeToString,
+        ),
     }
     generic_handler = grpc.method_handlers_generic_handler(
         "spark.connect.SparkConnectService", rpc_method_handlers
@@ -186,3 +204,32 @@ class SparkConnectService(object):
             timeout,
             metadata,
         )
+
+    @staticmethod
+    def AddArtifacts(
+        request_iterator,
+        target,
+        options=(),
+        channel_credentials=None,
+        call_credentials=None,
+        insecure=False,
+        compression=None,
+        wait_for_ready=None,
+        timeout=None,
+        metadata=None,
+    ):
+        return grpc.experimental.stream_unary(
+            request_iterator,
+            target,
+            "/spark.connect.SparkConnectService/AddArtifacts",
+            spark_dot_connect_dot_base__pb2.AddArtifactsRequest.SerializeToString,
+            spark_dot_connect_dot_base__pb2.AddArtifactsResponse.FromString,
+            options,
+            channel_credentials,
+            insecure,
+            call_credentials,
+            compression,
+            wait_for_ready,
+            timeout,
+            metadata,
+        )

