Posted to commits@spark.apache.org by hv...@apache.org on 2023/02/28 16:40:15 UTC

[spark] branch master updated: [SPARK-42615][CONNECT][PYTHON] Refactor the AnalyzePlan RPC and add `session.version`

This is an automated email from the ASF dual-hosted git repository.

hvanhovell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 0c19ea4773b [SPARK-42615][CONNECT][PYTHON] Refactor the AnalyzePlan RPC and add `session.version`
0c19ea4773b is described below

commit 0c19ea4773b8eaeedc4031cb1c5c3e97f2c2d3b9
Author: Ruifeng Zheng <ru...@apache.org>
AuthorDate: Tue Feb 28 12:39:34 2023 -0400

    [SPARK-42615][CONNECT][PYTHON] Refactor the AnalyzePlan RPC and add `session.version`
    
    ### What changes were proposed in this pull request?
    Refactor the AnalyzePlan RPC and add `session.version`
    
    ### Why are the changes needed?
    The existing implementation always returns the schema, explain string, input files, etc. together, but in most cases we only need the schema, so we should separate them to avoid unnecessary analysis, optimization, and IO (which `input files` requires).
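
    For context, a minimal sketch of the per-method request flow on the refactored Python client (method names are taken from this patch; `client` and `plan` are placeholders for a `SparkConnectClient` and a `pyspark.sql.connect.proto.Plan`):

    ```
    # Each call now sends a request carrying only the field it needs, so the
    # server answers one question per RPC instead of computing the schema,
    # explain string, input files, etc. on every AnalyzePlan call.
    schema = client._analyze(method="schema", plan=plan).schema
    is_local = client._analyze(method="is_local", plan=plan).is_local
    files = client._analyze(method="input_files", plan=plan).input_files
    ```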
    
    ### Does this PR introduce _any_ user-facing change?
    Yes, this adds a new session API:
    
    ```
    >>> spark.version
    '3.5.0-SNAPSHOT'
    >>>
    ```
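
    Under the hood, `spark.version` presumably maps onto the new `SparkVersion` analyze case; the `session.py` hunk is not included in this excerpt, so the following is only an illustrative sketch against the client API added in this patch (the `client` handle name is an assumption):

    ```
    # Illustrative only: issue a spark_version analyze request and read the
    # version string back from the AnalyzeResult; "client" stands for the
    # session's SparkConnectClient instance (attribute name assumed).
    version = client._analyze(method="spark_version").spark_version
    ```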
    
    ### How was this patch tested?
    Updated existing tests and added new ones.
    
    Closes #40210 from zhengruifeng/connect_refactor_analyze.
    
    Authored-by: Ruifeng Zheng <ru...@apache.org>
    Signed-off-by: Herman van Hovell <he...@databricks.com>
---
 .../main/scala/org/apache/spark/sql/Dataset.scala  |  52 +-
 .../scala/org/apache/spark/sql/SparkSession.scala  |   7 +-
 .../sql/connect/client/SparkConnectClient.scala    |  49 +-
 .../connect/client/SparkConnectClientSuite.scala   |  16 +-
 .../src/main/protobuf/spark/connect/base.proto     | 161 ++++--
 .../sql/connect/planner/SparkConnectPlanner.scala  |   2 +-
 .../service/SparkConnectAnalyzeHandler.scala       | 148 ++++++
 .../sql/connect/service/SparkConnectService.scala  |  62 +--
 .../connect/planner/SparkConnectServiceSuite.scala | 110 ++++-
 python/pyspark/sql/connect/client.py               | 144 ++++--
 python/pyspark/sql/connect/dataframe.py            |  16 +-
 python/pyspark/sql/connect/proto/base_pb2.py       | 320 +++++++++---
 python/pyspark/sql/connect/proto/base_pb2.pyi      | 547 +++++++++++++++++----
 python/pyspark/sql/connect/session.py              |   4 +-
 .../sql/tests/connect/test_connect_basic.py        |   7 +-
 15 files changed, 1282 insertions(+), 363 deletions(-)

diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala
index c4f54e493ee..6ef20595630 100644
--- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -234,7 +234,13 @@ class Dataset[T] private[sql] (
    */
   def schema: StructType = {
     if (encoder == UnboundRowEncoder) {
-      DataTypeProtoConverter.toCatalystType(analyze.getSchema).asInstanceOf[StructType]
+      DataTypeProtoConverter
+        .toCatalystType(
+          sparkSession
+            .analyze(plan, proto.AnalyzePlanRequest.AnalyzeCase.SCHEMA)
+            .getSchema
+            .getSchema)
+        .asInstanceOf[StructType]
     } else {
       encoder.schema
     }
@@ -272,11 +278,11 @@ class Dataset[T] private[sql] (
    */
   def explain(mode: String): Unit = {
     val protoMode = mode.trim.toLowerCase(Locale.ROOT) match {
-      case "simple" => proto.Explain.ExplainMode.SIMPLE
-      case "extended" => proto.Explain.ExplainMode.EXTENDED
-      case "codegen" => proto.Explain.ExplainMode.CODEGEN
-      case "cost" => proto.Explain.ExplainMode.COST
-      case "formatted" => proto.Explain.ExplainMode.FORMATTED
+      case "simple" => proto.AnalyzePlanRequest.Explain.ExplainMode.EXPLAIN_MODE_SIMPLE
+      case "extended" => proto.AnalyzePlanRequest.Explain.ExplainMode.EXPLAIN_MODE_EXTENDED
+      case "codegen" => proto.AnalyzePlanRequest.Explain.ExplainMode.EXPLAIN_MODE_CODEGEN
+      case "cost" => proto.AnalyzePlanRequest.Explain.ExplainMode.EXPLAIN_MODE_COST
+      case "formatted" => proto.AnalyzePlanRequest.Explain.ExplainMode.EXPLAIN_MODE_FORMATTED
       case _ => throw new IllegalArgumentException("Unsupported explain mode: " + mode)
     }
     explain(protoMode)
@@ -293,9 +299,9 @@ class Dataset[T] private[sql] (
    */
   def explain(extended: Boolean): Unit = {
     val mode = if (extended) {
-      proto.Explain.ExplainMode.EXTENDED
+      proto.AnalyzePlanRequest.Explain.ExplainMode.EXPLAIN_MODE_EXTENDED
     } else {
-      proto.Explain.ExplainMode.SIMPLE
+      proto.AnalyzePlanRequest.Explain.ExplainMode.EXPLAIN_MODE_SIMPLE
     }
     explain(mode)
   }
@@ -306,11 +312,15 @@ class Dataset[T] private[sql] (
    * @group basic
    * @since 3.4.0
    */
-  def explain(): Unit = explain(proto.Explain.ExplainMode.SIMPLE)
+  def explain(): Unit = explain(proto.AnalyzePlanRequest.Explain.ExplainMode.EXPLAIN_MODE_SIMPLE)
 
-  private def explain(mode: proto.Explain.ExplainMode): Unit = {
+  private def explain(mode: proto.AnalyzePlanRequest.Explain.ExplainMode): Unit = {
     // scalastyle:off println
-    println(sparkSession.analyze(plan, mode).getExplainString)
+    println(
+      sparkSession
+        .analyze(plan, proto.AnalyzePlanRequest.AnalyzeCase.EXPLAIN, Some(mode))
+        .getExplain
+        .getExplainString)
     // scalastyle:on println
   }
 
@@ -339,7 +349,10 @@ class Dataset[T] private[sql] (
    * @group basic
    * @since 3.4.0
    */
-  def isLocal: Boolean = analyze.getIsLocal
+  def isLocal: Boolean = sparkSession
+    .analyze(plan, proto.AnalyzePlanRequest.AnalyzeCase.IS_LOCAL)
+    .getIsLocal
+    .getIsLocal
 
   /**
    * Returns true if the `Dataset` is empty.
@@ -359,7 +372,10 @@ class Dataset[T] private[sql] (
    * @group streaming
    * @since 3.4.0
    */
-  def isStreaming: Boolean = analyze.getIsStreaming
+  def isStreaming: Boolean = sparkSession
+    .analyze(plan, proto.AnalyzePlanRequest.AnalyzeCase.IS_STREAMING)
+    .getIsStreaming
+    .getIsStreaming
 
   /**
    * Displays the Dataset in a tabular form. Strings more than 20 characters will be truncated,
@@ -2616,7 +2632,13 @@ class Dataset[T] private[sql] (
    * @group basic
    * @since 3.4.0
    */
-  def inputFiles: Array[String] = analyze.getInputFilesList.asScala.toArray
+  def inputFiles: Array[String] =
+    sparkSession
+      .analyze(plan, proto.AnalyzePlanRequest.AnalyzeCase.INPUT_FILES)
+      .getInputFiles
+      .getFilesList
+      .asScala
+      .toArray
 
   /**
    * Interface for saving the content of the non-streaming Dataset out into external storage.
@@ -2714,7 +2736,7 @@ class Dataset[T] private[sql] (
   }
 
   private[sql] def analyze: proto.AnalyzePlanResponse = {
-    sparkSession.analyze(plan, proto.Explain.ExplainMode.SIMPLE)
+    sparkSession.analyze(plan, proto.AnalyzePlanRequest.AnalyzeCase.SCHEMA)
   }
 
   def collectResult(): SparkResult[T] = sparkSession.execute(plan, encoder)
diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala
index fa13af00f14..4fe6d337d8b 100644
--- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala
+++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala
@@ -254,8 +254,11 @@ class SparkSession(
 
   private[sql] def analyze(
       plan: proto.Plan,
-      mode: proto.Explain.ExplainMode): proto.AnalyzePlanResponse =
-    client.analyze(plan, mode)
+      method: proto.AnalyzePlanRequest.AnalyzeCase,
+      explainMode: Option[proto.AnalyzePlanRequest.Explain.ExplainMode] = None)
+      : proto.AnalyzePlanResponse = {
+    client.analyze(plan, method, explainMode)
+  }
 
   private[sql] def execute[T](plan: proto.Plan, encoder: AgnosticEncoder[T]): SparkResult[T] = {
     val value = client.execute(plan)
diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala
index 8b69f75b201..3f9416f216a 100644
--- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala
+++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala
@@ -89,11 +89,50 @@ private[sql] class SparkConnectClient(
    * @return
    *   A [[proto.AnalyzePlanResponse]] from the Spark Connect server.
    */
-  def analyze(plan: proto.Plan, mode: proto.Explain.ExplainMode): proto.AnalyzePlanResponse = {
-    val request = proto.AnalyzePlanRequest
-      .newBuilder()
-      .setPlan(plan)
-      .setExplain(proto.Explain.newBuilder().setExplainMode(mode))
+  def analyze(
+      plan: proto.Plan,
+      method: proto.AnalyzePlanRequest.AnalyzeCase,
+      explainMode: Option[proto.AnalyzePlanRequest.Explain.ExplainMode] = None)
+      : proto.AnalyzePlanResponse = {
+    val builder = proto.AnalyzePlanRequest.newBuilder()
+    method match {
+      case proto.AnalyzePlanRequest.AnalyzeCase.SCHEMA =>
+        builder.setSchema(
+          proto.AnalyzePlanRequest.Schema
+            .newBuilder()
+            .setPlan(plan)
+            .build())
+      case proto.AnalyzePlanRequest.AnalyzeCase.EXPLAIN =>
+        if (explainMode.isEmpty) {
+          throw new IllegalArgumentException(s"ExplainMode is required in Explain request")
+        }
+        builder.setExplain(
+          proto.AnalyzePlanRequest.Explain
+            .newBuilder()
+            .setPlan(plan)
+            .setExplainMode(explainMode.get)
+            .build())
+      case proto.AnalyzePlanRequest.AnalyzeCase.IS_LOCAL =>
+        builder.setIsLocal(
+          proto.AnalyzePlanRequest.IsLocal
+            .newBuilder()
+            .setPlan(plan)
+            .build())
+      case proto.AnalyzePlanRequest.AnalyzeCase.IS_STREAMING =>
+        builder.setIsStreaming(
+          proto.AnalyzePlanRequest.IsStreaming
+            .newBuilder()
+            .setPlan(plan)
+            .build())
+      case proto.AnalyzePlanRequest.AnalyzeCase.INPUT_FILES =>
+        builder.setInputFiles(
+          proto.AnalyzePlanRequest.InputFiles
+            .newBuilder()
+            .setPlan(plan)
+            .build())
+      case other => throw new IllegalArgumentException(s"Unknown Analyze request $other")
+    }
+    val request = builder
       .setUserContext(userContext)
       .setClientId(sessionId)
       .setClientType(userAgent)
diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/SparkConnectClientSuite.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/SparkConnectClientSuite.scala
index 40afae83009..8cead49de0c 100644
--- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/SparkConnectClientSuite.scala
+++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/SparkConnectClientSuite.scala
@@ -207,7 +207,21 @@ class DummySparkConnectService() extends SparkConnectServiceGrpc.SparkConnectSer
       responseObserver: StreamObserver[AnalyzePlanResponse]): Unit = {
     // Reply with a dummy response using the same client ID
     val requestClientId = request.getClientId
-    inputPlan = request.getPlan
+    request.getAnalyzeCase match {
+      case proto.AnalyzePlanRequest.AnalyzeCase.SCHEMA =>
+        inputPlan = request.getSchema.getPlan
+      case proto.AnalyzePlanRequest.AnalyzeCase.EXPLAIN =>
+        inputPlan = request.getExplain.getPlan
+      case proto.AnalyzePlanRequest.AnalyzeCase.TREE_STRING =>
+        inputPlan = request.getTreeString.getPlan
+      case proto.AnalyzePlanRequest.AnalyzeCase.IS_LOCAL =>
+        inputPlan = request.getIsLocal.getPlan
+      case proto.AnalyzePlanRequest.AnalyzeCase.IS_STREAMING =>
+        inputPlan = request.getIsStreaming.getPlan
+      case proto.AnalyzePlanRequest.AnalyzeCase.INPUT_FILES =>
+        inputPlan = request.getInputFiles.getPlan
+      case _ => inputPlan = null
+    }
     val response = AnalyzePlanResponse
       .newBuilder()
       .setClientId(requestClientId)
diff --git a/connector/connect/common/src/main/protobuf/spark/connect/base.proto b/connector/connect/common/src/main/protobuf/spark/connect/base.proto
index 1ffbb8aa881..70e16bae8b3 100644
--- a/connector/connect/common/src/main/protobuf/spark/connect/base.proto
+++ b/connector/connect/common/src/main/protobuf/spark/connect/base.proto
@@ -38,35 +38,7 @@ message Plan {
   }
 }
 
-// Explains the input plan based on a configurable mode.
-message Explain {
-  // Plan explanation mode.
-  enum ExplainMode {
-    MODE_UNSPECIFIED = 0;
-
-    // Generates only physical plan.
-    SIMPLE = 1;
-
-    // Generates parsed logical plan, analyzed logical plan, optimized logical plan and physical plan.
-    // Parsed Logical plan is a unresolved plan that extracted from the query. Analyzed logical plans
-    // transforms which translates unresolvedAttribute and unresolvedRelation into fully typed objects.
-    // The optimized logical plan transforms through a set of optimization rules, resulting in the
-    // physical plan.
-    EXTENDED = 2;
-
-    // Generates code for the statement, if any and a physical plan.
-    CODEGEN = 3;
-
-    // If plan node statistics are available, generates a logical plan and also the statistics.
-    COST = 4;
-
-    // Generates a physical plan outline and also node details.
-    FORMATTED = 5;
-  }
 
-  // (Required) For analyzePlan rpc calls, configure the mode to explain plan in strings.
-  ExplainMode explain_mode= 1;
-}
 
 // User Context is used to refer to one particular user session that is executing
 // queries in the backend.
@@ -92,39 +64,136 @@ message AnalyzePlanRequest {
   // (Required) User context
   UserContext user_context = 2;
 
-  // (Required) The logical plan to be analyzed.
-  Plan plan = 3;
-
   // Provides optional information about the client sending the request. This field
   // can be used for language or version specific information and is only intended for
   // logging purposes and will not be interpreted by the server.
-  optional string client_type = 4;
+  optional string client_type = 3;
+
+  oneof analyze {
+    Schema schema = 4;
+    Explain explain = 5;
+    TreeString tree_string = 6;
+    IsLocal is_local = 7;
+    IsStreaming is_streaming = 8;
+    InputFiles input_files = 9;
+    SparkVersion spark_version = 10;
+    DDLParse ddl_parse = 11;
+  }
+
+  message Schema {
+    // (Required) The logical plan to be analyzed.
+    Plan plan = 1;
+  }
+
+  // Explains the input plan based on a configurable mode.
+  message Explain {
+    // (Required) The logical plan to be analyzed.
+    Plan plan = 1;
+
+    // (Required) For analyzePlan rpc calls, configure the mode to explain plan in strings.
+    ExplainMode explain_mode = 2;
+
+    // Plan explanation mode.
+    enum ExplainMode {
+      EXPLAIN_MODE_UNSPECIFIED = 0;
 
-  // (Optional) Get the explain string of the plan.
-  Explain explain = 5;
+      // Generates only physical plan.
+      EXPLAIN_MODE_SIMPLE = 1;
+
+      // Generates parsed logical plan, analyzed logical plan, optimized logical plan and physical plan.
+      // Parsed Logical plan is a unresolved plan that extracted from the query. Analyzed logical plans
+      // transforms which translates unresolvedAttribute and unresolvedRelation into fully typed objects.
+      // The optimized logical plan transforms through a set of optimization rules, resulting in the
+      // physical plan.
+      EXPLAIN_MODE_EXTENDED = 2;
+
+      // Generates code for the statement, if any and a physical plan.
+      EXPLAIN_MODE_CODEGEN = 3;
+
+      // If plan node statistics are available, generates a logical plan and also the statistics.
+      EXPLAIN_MODE_COST = 4;
+
+      // Generates a physical plan outline and also node details.
+      EXPLAIN_MODE_FORMATTED = 5;
+    }
+  }
+
+  message TreeString {
+    // (Required) The logical plan to be analyzed.
+    Plan plan = 1;
+  }
+
+  message IsLocal {
+    // (Required) The logical plan to be analyzed.
+    Plan plan = 1;
+  }
+
+  message IsStreaming {
+    // (Required) The logical plan to be analyzed.
+    Plan plan = 1;
+  }
+
+  message InputFiles {
+    // (Required) The logical plan to be analyzed.
+    Plan plan = 1;
+  }
+
+  message SparkVersion { }
+
+  message DDLParse {
+    // (Required) The DDL formatted string to be parsed.
+    string ddl_string = 1;
+  }
 }
 
 // Response to performing analysis of the query. Contains relevant metadata to be able to
 // reason about the performance.
 message AnalyzePlanResponse {
   string client_id = 1;
-  DataType schema = 2;
 
-  // The extended explain string as produced by Spark.
-  string explain_string = 3;
+  oneof result {
+    Schema schema = 2;
+    Explain explain = 3;
+    TreeString tree_string = 4;
+    IsLocal is_local = 5;
+    IsStreaming is_streaming = 6;
+    InputFiles input_files = 7;
+    SparkVersion spark_version = 8;
+    DDLParse ddl_parse = 9;
+  }
+
+  message Schema {
+    DataType schema = 1;
+  }
+
+  message Explain {
+    string explain_string = 1;
+  }
+
+  message TreeString {
+    string tree_string = 1;
+  }
+
+  message IsLocal {
+    bool is_local = 1;
+  }
 
-  // Get the tree string of the schema.
-  string tree_string = 4;
+  message IsStreaming {
+    bool is_streaming = 1;
+  }
 
-  // Whether the 'collect' and 'take' methods can be run locally.
-  bool is_local = 5;
+  message InputFiles {
+    // A best-effort snapshot of the files that compose this Dataset
+    repeated string files = 1;
+  }
 
-  // Whether this plan contains one or more sources that continuously
-  // return data as it arrives.
-  bool is_streaming = 6;
+  message SparkVersion {
+    string version = 1;
+  }
 
-  // A best-effort snapshot of the files that compose this Dataset
-  repeated string input_files = 7;
+  message DDLParse {
+    DataType parsed = 1;
+  }
 }
 
 // A request to be executed by the service.
diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
index 1925a41b916..d52117b469c 100644
--- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
+++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
@@ -597,7 +597,7 @@ class SparkConnectPlanner(val session: SparkSession) {
     }
   }
 
-  private def parseDatatypeString(sqlText: String): DataType = {
+  private[connect] def parseDatatypeString(sqlText: String): DataType = {
     val parser = session.sessionState.sqlParser
     try {
       parser.parseTableSchema(sqlText)
diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectAnalyzeHandler.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectAnalyzeHandler.scala
new file mode 100644
index 00000000000..90981f5b7ba
--- /dev/null
+++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectAnalyzeHandler.scala
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.service
+
+import scala.collection.JavaConverters._
+
+import io.grpc.stub.StreamObserver
+
+import org.apache.spark.connect.proto
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.{Dataset, SparkSession}
+import org.apache.spark.sql.connect.common.{DataTypeProtoConverter, InvalidPlanInput}
+import org.apache.spark.sql.connect.planner.SparkConnectPlanner
+import org.apache.spark.sql.execution.{CodegenMode, CostMode, ExtendedMode, FormattedMode, SimpleMode}
+
+private[connect] class SparkConnectAnalyzeHandler(
+    responseObserver: StreamObserver[proto.AnalyzePlanResponse])
+    extends Logging {
+
+  def handle(request: proto.AnalyzePlanRequest): Unit = {
+    val session =
+      SparkConnectService
+        .getOrCreateIsolatedSession(request.getUserContext.getUserId, request.getClientId)
+        .session
+
+    val response = process(request, session)
+    responseObserver.onNext(response)
+    responseObserver.onCompleted()
+  }
+
+  def process(
+      request: proto.AnalyzePlanRequest,
+      session: SparkSession): proto.AnalyzePlanResponse = {
+    lazy val planner = new SparkConnectPlanner(session)
+    val builder = proto.AnalyzePlanResponse.newBuilder()
+
+    request.getAnalyzeCase match {
+      case proto.AnalyzePlanRequest.AnalyzeCase.SCHEMA =>
+        val schema = Dataset
+          .ofRows(session, planner.transformRelation(request.getSchema.getPlan.getRoot))
+          .schema
+        builder.setSchema(
+          proto.AnalyzePlanResponse.Schema
+            .newBuilder()
+            .setSchema(DataTypeProtoConverter.toConnectProtoType(schema))
+            .build())
+
+      case proto.AnalyzePlanRequest.AnalyzeCase.EXPLAIN =>
+        val queryExecution = Dataset
+          .ofRows(session, planner.transformRelation(request.getExplain.getPlan.getRoot))
+          .queryExecution
+        val explainString = request.getExplain.getExplainMode match {
+          case proto.AnalyzePlanRequest.Explain.ExplainMode.EXPLAIN_MODE_SIMPLE =>
+            queryExecution.explainString(SimpleMode)
+          case proto.AnalyzePlanRequest.Explain.ExplainMode.EXPLAIN_MODE_EXTENDED =>
+            queryExecution.explainString(ExtendedMode)
+          case proto.AnalyzePlanRequest.Explain.ExplainMode.EXPLAIN_MODE_CODEGEN =>
+            queryExecution.explainString(CodegenMode)
+          case proto.AnalyzePlanRequest.Explain.ExplainMode.EXPLAIN_MODE_COST =>
+            queryExecution.explainString(CostMode)
+          case proto.AnalyzePlanRequest.Explain.ExplainMode.EXPLAIN_MODE_FORMATTED =>
+            queryExecution.explainString(FormattedMode)
+          case other => throw new UnsupportedOperationException(s"Unknown Explain Mode $other!")
+        }
+        builder.setExplain(
+          proto.AnalyzePlanResponse.Explain
+            .newBuilder()
+            .setExplainString(explainString)
+            .build())
+
+      case proto.AnalyzePlanRequest.AnalyzeCase.TREE_STRING =>
+        val treeString = Dataset
+          .ofRows(session, planner.transformRelation(request.getTreeString.getPlan.getRoot))
+          .schema
+          .treeString
+        builder.setTreeString(
+          proto.AnalyzePlanResponse.TreeString
+            .newBuilder()
+            .setTreeString(treeString)
+            .build())
+
+      case proto.AnalyzePlanRequest.AnalyzeCase.IS_LOCAL =>
+        val isLocal = Dataset
+          .ofRows(session, planner.transformRelation(request.getIsLocal.getPlan.getRoot))
+          .isLocal
+        builder.setIsLocal(
+          proto.AnalyzePlanResponse.IsLocal
+            .newBuilder()
+            .setIsLocal(isLocal)
+            .build())
+
+      case proto.AnalyzePlanRequest.AnalyzeCase.IS_STREAMING =>
+        val isStreaming = Dataset
+          .ofRows(session, planner.transformRelation(request.getIsStreaming.getPlan.getRoot))
+          .isStreaming
+        builder.setIsStreaming(
+          proto.AnalyzePlanResponse.IsStreaming
+            .newBuilder()
+            .setIsStreaming(isStreaming)
+            .build())
+
+      case proto.AnalyzePlanRequest.AnalyzeCase.INPUT_FILES =>
+        val inputFiles = Dataset
+          .ofRows(session, planner.transformRelation(request.getInputFiles.getPlan.getRoot))
+          .inputFiles
+        builder.setInputFiles(
+          proto.AnalyzePlanResponse.InputFiles
+            .newBuilder()
+            .addAllFiles(inputFiles.toSeq.asJava)
+            .build())
+
+      case proto.AnalyzePlanRequest.AnalyzeCase.SPARK_VERSION =>
+        builder.setSparkVersion(
+          proto.AnalyzePlanResponse.SparkVersion
+            .newBuilder()
+            .setVersion(session.version)
+            .build())
+
+      case proto.AnalyzePlanRequest.AnalyzeCase.DDL_PARSE =>
+        val schema = planner.parseDatatypeString(request.getDdlParse.getDdlString)
+        builder.setDdlParse(
+          proto.AnalyzePlanResponse.DDLParse
+            .newBuilder()
+            .setParsed(DataTypeProtoConverter.toConnectProtoType(schema))
+            .build())
+
+      case other => throw InvalidPlanInput(s"Unknown Analyze Method $other!")
+    }
+
+    builder.setClientId(request.getClientId)
+    builder.build()
+  }
+}
diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala
index 227067e2faf..d6446eae4b7 100644
--- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala
+++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala
@@ -20,7 +20,6 @@ package org.apache.spark.sql.connect.service
 import java.util.concurrent.TimeUnit
 
 import scala.annotation.tailrec
-import scala.collection.JavaConverters._
 import scala.collection.mutable.ArrayBuffer
 import scala.util.control.NonFatal
 
@@ -40,13 +39,9 @@ import org.json4s.jackson.JsonMethods.{compact, render}
 import org.apache.spark.{SparkEnv, SparkException, SparkThrowable}
 import org.apache.spark.api.python.PythonException
 import org.apache.spark.connect.proto
-import org.apache.spark.connect.proto.{AnalyzePlanRequest, AnalyzePlanResponse, ExecutePlanRequest, ExecutePlanResponse, SparkConnectServiceGrpc}
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.{Dataset, SparkSession}
-import org.apache.spark.sql.connect.common.DataTypeProtoConverter
+import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.connect.config.Connect.CONNECT_GRPC_BINDING_PORT
-import org.apache.spark.sql.connect.planner.SparkConnectPlanner
-import org.apache.spark.sql.execution.{CodegenMode, CostMode, ExplainMode, ExtendedMode, FormattedMode, SimpleMode}
 
 /**
  * The SparkConnectService implementation.
@@ -57,7 +52,7 @@ import org.apache.spark.sql.execution.{CodegenMode, CostMode, ExplainMode, Exten
  *   delegates debug behavior to the handlers.
  */
 class SparkConnectService(debug: Boolean)
-    extends SparkConnectServiceGrpc.SparkConnectServiceImplBase
+    extends proto.SparkConnectServiceGrpc.SparkConnectServiceImplBase
     with Logging {
 
   private def allClasses(cl: Class[_]): Seq[Class[_]] = {
@@ -143,8 +138,8 @@ class SparkConnectService(debug: Boolean)
    * @param responseObserver
    */
   override def executePlan(
-      request: ExecutePlanRequest,
-      responseObserver: StreamObserver[ExecutePlanResponse]): Unit = {
+      request: proto.ExecutePlanRequest,
+      responseObserver: StreamObserver[proto.ExecutePlanResponse]): Unit = {
     try {
       new SparkConnectStreamHandler(responseObserver).handle(request)
     } catch handleError("execute", observer = responseObserver)
@@ -163,56 +158,13 @@ class SparkConnectService(debug: Boolean)
    * @param responseObserver
    */
   override def analyzePlan(
-      request: AnalyzePlanRequest,
-      responseObserver: StreamObserver[AnalyzePlanResponse]): Unit = {
+      request: proto.AnalyzePlanRequest,
+      responseObserver: StreamObserver[proto.AnalyzePlanResponse]): Unit = {
     try {
-      if (request.getPlan.getOpTypeCase != proto.Plan.OpTypeCase.ROOT) {
-        responseObserver.onError(
-          new UnsupportedOperationException(
-            s"${request.getPlan.getOpTypeCase} not supported for analysis."))
-      }
-      val session =
-        SparkConnectService
-          .getOrCreateIsolatedSession(request.getUserContext.getUserId, request.getClientId)
-          .session
-
-      val explainMode = request.getExplain.getExplainMode match {
-        case proto.Explain.ExplainMode.SIMPLE => SimpleMode
-        case proto.Explain.ExplainMode.EXTENDED => ExtendedMode
-        case proto.Explain.ExplainMode.CODEGEN => CodegenMode
-        case proto.Explain.ExplainMode.COST => CostMode
-        case proto.Explain.ExplainMode.FORMATTED => FormattedMode
-        case _ =>
-          throw new IllegalArgumentException(
-            s"Explain mode unspecified. Accepted " +
-              "explain modes are 'simple', 'extended', 'codegen', 'cost', 'formatted'.")
-      }
-
-      val response = handleAnalyzePlanRequest(request.getPlan.getRoot, session, explainMode)
-      response.setClientId(request.getClientId)
-      responseObserver.onNext(response.build())
-      responseObserver.onCompleted()
+      new SparkConnectAnalyzeHandler(responseObserver).handle(request)
     } catch handleError("analyze", observer = responseObserver)
   }
 
-  def handleAnalyzePlanRequest(
-      relation: proto.Relation,
-      session: SparkSession,
-      explainMode: ExplainMode): proto.AnalyzePlanResponse.Builder = {
-    val logicalPlan = new SparkConnectPlanner(session).transformRelation(relation)
-
-    val ds = Dataset.ofRows(session, logicalPlan)
-    val explainString = ds.queryExecution.explainString(explainMode)
-
-    val response = proto.AnalyzePlanResponse.newBuilder()
-    response.setSchema(DataTypeProtoConverter.toConnectProtoType(ds.schema))
-    response.setExplainString(explainString)
-    response.setTreeString(ds.schema.treeString)
-    response.setIsLocal(ds.isLocal)
-    response.setIsStreaming(ds.isStreaming)
-    response.addAllInputFiles(ds.inputFiles.toSeq.asJava)
-  }
-
   /**
    * This is the main entry method for Spark Connect and all calls to update or fetch
    * configuration..
diff --git a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectServiceSuite.scala b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectServiceSuite.scala
index 9c5df253aea..4d9bea88f5c 100644
--- a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectServiceSuite.scala
+++ b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectServiceSuite.scala
@@ -27,8 +27,7 @@ import org.apache.arrow.vector.ipc.ArrowStreamReader
 import org.apache.spark.connect.proto
 import org.apache.spark.sql.connect.dsl.MockRemoteSession
 import org.apache.spark.sql.connect.dsl.plans._
-import org.apache.spark.sql.connect.service.SparkConnectService
-import org.apache.spark.sql.execution.ExplainMode
+import org.apache.spark.sql.connect.service.{SparkConnectAnalyzeHandler, SparkConnectService}
 import org.apache.spark.sql.test.SharedSparkSession
 
 /**
@@ -43,21 +42,30 @@ class SparkConnectServiceSuite extends SharedSparkSession {
           | USING parquet
           |""".stripMargin)
 
-      val instance = new SparkConnectService(false)
-      val relation = proto.Relation
+      val plan = proto.Plan
         .newBuilder()
-        .setRead(
-          proto.Read
+        .setRoot(
+          proto.Relation
             .newBuilder()
-            .setNamedTable(proto.Read.NamedTable.newBuilder.setUnparsedIdentifier("test").build())
+            .setRead(
+              proto.Read
+                .newBuilder()
+                .setNamedTable(
+                  proto.Read.NamedTable.newBuilder.setUnparsedIdentifier("test").build())
+                .build())
             .build())
         .build()
 
-      val response =
-        instance.handleAnalyzePlanRequest(relation, spark, ExplainMode.fromString("simple"))
+      val handler = new SparkConnectAnalyzeHandler(null)
 
-      assert(response.getSchema.hasStruct)
-      val schema = response.getSchema.getStruct
+      val request1 = proto.AnalyzePlanRequest
+        .newBuilder()
+        .setSchema(proto.AnalyzePlanRequest.Schema.newBuilder().setPlan(plan).build())
+        .build()
+      val response1 = handler.process(request1, spark)
+      assert(response1.hasSchema)
+      assert(response1.getSchema.getSchema.hasStruct)
+      val schema = response1.getSchema.getSchema.getStruct
       assert(schema.getFieldsCount == 2)
       assert(
         schema.getFields(0).getName == "col1"
@@ -66,14 +74,53 @@ class SparkConnectServiceSuite extends SharedSparkSession {
         schema.getFields(1).getName == "col2"
           && schema.getFields(1).getDataType.getKindCase == proto.DataType.KindCase.STRING)
 
-      assert(!response.getIsLocal)
-      assert(!response.getIsLocal)
+      val request2 = proto.AnalyzePlanRequest
+        .newBuilder()
+        .setExplain(
+          proto.AnalyzePlanRequest.Explain
+            .newBuilder()
+            .setPlan(plan)
+            .setExplainMode(proto.AnalyzePlanRequest.Explain.ExplainMode.EXPLAIN_MODE_SIMPLE)
+            .build())
+        .build()
+      val response2 = handler.process(request2, spark)
+      assert(response2.hasExplain)
+      assert(response2.getExplain.getExplainString.size > 0)
+
+      val request3 = proto.AnalyzePlanRequest
+        .newBuilder()
+        .setIsLocal(proto.AnalyzePlanRequest.IsLocal.newBuilder().setPlan(plan).build())
+        .build()
+      val response3 = handler.process(request3, spark)
+      assert(response3.hasIsLocal)
+      assert(!response3.getIsLocal.getIsLocal)
+
+      val request4 = proto.AnalyzePlanRequest
+        .newBuilder()
+        .setIsStreaming(proto.AnalyzePlanRequest.IsStreaming.newBuilder().setPlan(plan).build())
+        .build()
+      val response4 = handler.process(request4, spark)
+      assert(response4.hasIsStreaming)
+      assert(!response4.getIsStreaming.getIsStreaming)
 
-      assert(response.getTreeString.contains("root"))
-      assert(response.getTreeString.contains("|-- col1: integer (nullable = true)"))
-      assert(response.getTreeString.contains("|-- col2: string (nullable = true)"))
+      val request5 = proto.AnalyzePlanRequest
+        .newBuilder()
+        .setTreeString(proto.AnalyzePlanRequest.TreeString.newBuilder().setPlan(plan).build())
+        .build()
+      val response5 = handler.process(request5, spark)
+      assert(response5.hasTreeString)
+      val treeString = response5.getTreeString.getTreeString
+      assert(treeString.contains("root"))
+      assert(treeString.contains("|-- col1: integer (nullable = true)"))
+      assert(treeString.contains("|-- col2: string (nullable = true)"))
 
-      assert(response.getInputFilesCount === 0)
+      val request6 = proto.AnalyzePlanRequest
+        .newBuilder()
+        .setInputFiles(proto.AnalyzePlanRequest.InputFiles.newBuilder().setPlan(plan).build())
+        .build()
+      val response6 = handler.process(request6, spark)
+      assert(response6.hasInputFiles)
+      assert(response6.getInputFiles.getFilesCount === 0)
     }
   }
 
@@ -200,7 +247,6 @@ class SparkConnectServiceSuite extends SharedSparkSession {
           | CREATE TABLE test (col1 INT, col2 STRING)
           | USING parquet
           |""".stripMargin)
-      val instance = new SparkConnectService(false)
       val relation = proto.Relation
         .newBuilder()
         .setProject(
@@ -225,14 +271,26 @@ class SparkConnectServiceSuite extends SharedSparkSession {
                     proto.Read.NamedTable.newBuilder.setUnparsedIdentifier("test").build()))))
         .build()
 
-      val response =
-        instance
-          .handleAnalyzePlanRequest(relation, spark, ExplainMode.fromString("extended"))
-          .build()
-      assert(response.getExplainString.contains("Parsed Logical Plan"))
-      assert(response.getExplainString.contains("Analyzed Logical Plan"))
-      assert(response.getExplainString.contains("Optimized Logical Plan"))
-      assert(response.getExplainString.contains("Physical Plan"))
+      val plan = proto.Plan.newBuilder().setRoot(relation).build()
+
+      val handler = new SparkConnectAnalyzeHandler(null)
+
+      val request = proto.AnalyzePlanRequest
+        .newBuilder()
+        .setExplain(
+          proto.AnalyzePlanRequest.Explain
+            .newBuilder()
+            .setPlan(plan)
+            .setExplainMode(proto.AnalyzePlanRequest.Explain.ExplainMode.EXPLAIN_MODE_EXTENDED)
+            .build())
+        .build()
+
+      val response = handler.process(request, spark)
+
+      assert(response.getExplain.getExplainString.contains("Parsed Logical Plan"))
+      assert(response.getExplain.getExplainString.contains("Analyzed Logical Plan"))
+      assert(response.getExplain.getExplainString.contains("Optimized Logical Plan"))
+      assert(response.getExplain.getExplainString.contains("Physical Plan"))
     }
   }
 }
diff --git a/python/pyspark/sql/connect/client.py b/python/pyspark/sql/connect/client.py
index 91d2f96aee1..8046da409d7 100644
--- a/python/pyspark/sql/connect/client.py
+++ b/python/pyspark/sql/connect/client.py
@@ -376,29 +376,63 @@ class PlanMetrics:
 class AnalyzeResult:
     def __init__(
         self,
-        schema: pb2.DataType,
-        explain: str,
-        tree_string: str,
-        is_local: bool,
-        is_streaming: bool,
-        input_files: List[str],
+        schema: Optional[pb2.DataType],
+        explain_string: Optional[str],
+        tree_string: Optional[str],
+        is_local: Optional[bool],
+        is_streaming: Optional[bool],
+        input_files: Optional[List[str]],
+        spark_version: Optional[str],
+        parsed: Optional[pb2.DataType],
     ):
         self.schema = schema
-        self.explain_string = explain
+        self.explain_string = explain_string
         self.tree_string = tree_string
         self.is_local = is_local
         self.is_streaming = is_streaming
         self.input_files = input_files
+        self.spark_version = spark_version
+        self.parsed = parsed
 
     @classmethod
     def fromProto(cls, pb: Any) -> "AnalyzeResult":
+        schema: Optional[pb2.DataType] = None
+        explain_string: Optional[str] = None
+        tree_string: Optional[str] = None
+        is_local: Optional[bool] = None
+        is_streaming: Optional[bool] = None
+        input_files: Optional[List[str]] = None
+        spark_version: Optional[str] = None
+        parsed: Optional[pb2.DataType] = None
+
+        if pb.HasField("schema"):
+            schema = pb.schema.schema
+        elif pb.HasField("explain"):
+            explain_string = pb.explain.explain_string
+        elif pb.HasField("tree_string"):
+            tree_string = pb.tree_string.tree_string
+        elif pb.HasField("is_local"):
+            is_local = pb.is_local.is_local
+        elif pb.HasField("is_streaming"):
+            is_streaming = pb.is_streaming.is_streaming
+        elif pb.HasField("input_files"):
+            input_files = pb.input_files.files
+        elif pb.HasField("spark_version"):
+            spark_version = pb.spark_version.version
+        elif pb.HasField("ddl_parse"):
+            parsed = pb.ddl_parse.parsed
+        else:
+            raise SparkConnectException("No analyze result found!")
+
         return AnalyzeResult(
-            pb.schema,
-            pb.explain_string,
-            pb.tree_string,
-            pb.is_local,
-            pb.is_streaming,
-            pb.input_files,
+            schema,
+            explain_string,
+            tree_string,
+            is_local,
+            is_streaming,
+            input_files,
+            spark_version,
+            parsed,
         )
 
 
@@ -575,7 +609,8 @@ class SparkConnectClient(object):
         Return schema for given plan.
         """
         logger.info(f"Schema for plan: {self._proto_to_string(plan)}")
-        proto_schema = self._analyze(plan).schema
+        proto_schema = self._analyze(method="schema", plan=plan).schema
+        assert proto_schema is not None
         # Server side should populate the struct field which is the schema.
         assert proto_schema.HasField("struct")
 
@@ -600,8 +635,11 @@ class SparkConnectClient(object):
         Return explain string for given plan.
         """
         logger.info(f"Explain (mode={explain_mode}) for plan {self._proto_to_string(plan)}")
-        result = self._analyze(plan, explain_mode)
-        return result.explain_string
+        result = self._analyze(
+            method="explain", plan=plan, explain_mode=explain_mode
+        ).explain_string
+        assert result is not None
+        return result
 
     def execute_command(self, command: pb2.Command) -> None:
         """
@@ -637,40 +675,62 @@ class SparkConnectClient(object):
             req.user_context.user_id = self._user_id
         return req
 
-    def _analyze(self, plan: pb2.Plan, explain_mode: str = "extended") -> AnalyzeResult:
+    def _analyze(self, method: str, **kwargs: Any) -> AnalyzeResult:
         """
         Call the analyze RPC of Spark Connect.
 
-        Parameters
-        ----------
-        plan : :class:`pyspark.sql.connect.proto.Plan`
-           Proto representation of the plan.
-        explain_mode : str
-           Explain mode
-
         Returns
         -------
         The result of the analyze call.
         """
         req = self._analyze_plan_request_with_metadata()
-        req.plan.CopyFrom(plan)
-        if explain_mode not in ["simple", "extended", "codegen", "cost", "formatted"]:
-            raise ValueError(
-                f"""
-                Unknown explain mode: {explain_mode}. Accepted "
-                "explain modes are 'simple', 'extended', 'codegen', 'cost', 'formatted'."
-                """
-            )
-        if explain_mode == "simple":
-            req.explain.explain_mode = pb2.Explain.ExplainMode.SIMPLE
-        elif explain_mode == "extended":
-            req.explain.explain_mode = pb2.Explain.ExplainMode.EXTENDED
-        elif explain_mode == "cost":
-            req.explain.explain_mode = pb2.Explain.ExplainMode.COST
-        elif explain_mode == "codegen":
-            req.explain.explain_mode = pb2.Explain.ExplainMode.CODEGEN
-        else:  # formatted
-            req.explain.explain_mode = pb2.Explain.ExplainMode.FORMATTED
+        if method == "schema":
+            req.schema.plan.CopyFrom(cast(pb2.Plan, kwargs.get("plan")))
+        elif method == "explain":
+            req.explain.plan.CopyFrom(cast(pb2.Plan, kwargs.get("plan")))
+            explain_mode = kwargs.get("explain_mode")
+            if explain_mode not in ["simple", "extended", "codegen", "cost", "formatted"]:
+                raise ValueError(
+                    f"""
+                    Unknown explain mode: {explain_mode}. Accepted "
+                    "explain modes are 'simple', 'extended', 'codegen', 'cost', 'formatted'."
+                    """
+                )
+            if explain_mode == "simple":
+                req.explain.explain_mode = (
+                    pb2.AnalyzePlanRequest.Explain.ExplainMode.EXPLAIN_MODE_SIMPLE
+                )
+            elif explain_mode == "extended":
+                req.explain.explain_mode = (
+                    pb2.AnalyzePlanRequest.Explain.ExplainMode.EXPLAIN_MODE_EXTENDED
+                )
+            elif explain_mode == "cost":
+                req.explain.explain_mode = (
+                    pb2.AnalyzePlanRequest.Explain.ExplainMode.EXPLAIN_MODE_COST
+                )
+            elif explain_mode == "codegen":
+                req.explain.explain_mode = (
+                    pb2.AnalyzePlanRequest.Explain.ExplainMode.EXPLAIN_MODE_CODEGEN
+                )
+            else:  # formatted
+                req.explain.explain_mode = (
+                    pb2.AnalyzePlanRequest.Explain.ExplainMode.EXPLAIN_MODE_FORMATTED
+                )
+        elif method == "tree_string":
+            req.tree_string.plan.CopyFrom(cast(pb2.Plan, kwargs.get("plan")))
+        elif method == "is_local":
+            req.is_local.plan.CopyFrom(cast(pb2.Plan, kwargs.get("plan")))
+        elif method == "is_streaming":
+            req.is_streaming.plan.CopyFrom(cast(pb2.Plan, kwargs.get("plan")))
+        elif method == "input_files":
+            req.input_files.plan.CopyFrom(cast(pb2.Plan, kwargs.get("plan")))
+        elif method == "spark_version":
+            req.spark_version.SetInParent()
+        elif method == "ddl_parse":
+            req.ddl_parse.ddl_string = cast(str, kwargs.get("ddl_string"))
+        else:
+            raise ValueError(f"Unknown Analyze method: {method}")
+
         try:
             for attempt in Retrying(
                 can_retry=SparkConnectClient.retry_exception, **self._retry_policy
diff --git a/python/pyspark/sql/connect/dataframe.py b/python/pyspark/sql/connect/dataframe.py
index 0d501b0bc4d..955186787a6 100644
--- a/python/pyspark/sql/connect/dataframe.py
+++ b/python/pyspark/sql/connect/dataframe.py
@@ -1356,7 +1356,9 @@ class DataFrame:
         if self._plan is None:
             raise Exception("Cannot analyze on empty plan.")
         query = self._plan.to_proto(self._session.client)
-        return self._session.client._analyze(query).is_local
+        result = self._session.client._analyze(method="is_local", plan=query).is_local
+        assert result is not None
+        return result
 
     isLocal.__doc__ = PySparkDataFrame.isLocal.__doc__
 
@@ -1365,7 +1367,9 @@ class DataFrame:
         if self._plan is None:
             raise Exception("Cannot analyze on empty plan.")
         query = self._plan.to_proto(self._session.client)
-        return self._session.client._analyze(query).is_streaming
+        result = self._session.client._analyze(method="is_streaming", plan=query).is_streaming
+        assert result is not None
+        return result
 
     isStreaming.__doc__ = PySparkDataFrame.isStreaming.__doc__
 
@@ -1373,7 +1377,9 @@ class DataFrame:
         if self._plan is None:
             raise Exception("Cannot analyze on empty plan.")
         query = self._plan.to_proto(self._session.client)
-        return self._session.client._analyze(query).tree_string
+        result = self._session.client._analyze(method="tree_string", plan=query).tree_string
+        assert result is not None
+        return result
 
     def printSchema(self) -> None:
         print(self._tree_string())
@@ -1384,7 +1390,9 @@ class DataFrame:
         if self._plan is None:
             raise Exception("Cannot analyze on empty plan.")
         query = self._plan.to_proto(self._session.client)
-        return self._session.client._analyze(query).input_files
+        result = self._session.client._analyze(method="input_files", plan=query).input_files
+        assert result is not None
+        return result
 
     inputFiles.__doc__ = PySparkDataFrame.inputFiles.__doc__
 
diff --git a/python/pyspark/sql/connect/proto/base_pb2.py b/python/pyspark/sql/connect/proto/base_pb2.py
index 95951d8f8e3..576e4fd8ab3 100644
--- a/python/pyspark/sql/connect/proto/base_pb2.py
+++ b/python/pyspark/sql/connect/proto/base_pb2.py
@@ -36,15 +36,30 @@ from pyspark.sql.connect.proto import types_pb2 as spark_dot_connect_dot_types__
 
 
 DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(
-    b'\n\x18spark/connect/base.proto\x12\rspark.connect\x1a\x19google/protobuf/any.proto\x1a\x1cspark/connect/commands.proto\x1a\x1dspark/connect/relations.proto\x1a\x19spark/connect/types.proto"t\n\x04Plan\x12-\n\x04root\x18\x01 \x01(\x0b\x32\x17.spark.connect.RelationH\x00R\x04root\x12\x32\n\x07\x63ommand\x18\x02 \x01(\x0b\x32\x16.spark.connect.CommandH\x00R\x07\x63ommandB\t\n\x07op_type"\xb5\x01\n\x07\x45xplain\x12\x45\n\x0c\x65xplain_mode\x18\x01 \x01(\x0e\x32".spark.connect.Explain. [...]
+    b'\n\x18spark/connect/base.proto\x12\rspark.connect\x1a\x19google/protobuf/any.proto\x1a\x1cspark/connect/commands.proto\x1a\x1dspark/connect/relations.proto\x1a\x19spark/connect/types.proto"t\n\x04Plan\x12-\n\x04root\x18\x01 \x01(\x0b\x32\x17.spark.connect.RelationH\x00R\x04root\x12\x32\n\x07\x63ommand\x18\x02 \x01(\x0b\x32\x16.spark.connect.CommandH\x00R\x07\x63ommandB\t\n\x07op_type"z\n\x0bUserContext\x12\x17\n\x07user_id\x18\x01 \x01(\tR\x06userId\x12\x1b\n\tuser_name\x18\x02 \x0 [...]
 )
 
 
 _PLAN = DESCRIPTOR.message_types_by_name["Plan"]
-_EXPLAIN = DESCRIPTOR.message_types_by_name["Explain"]
 _USERCONTEXT = DESCRIPTOR.message_types_by_name["UserContext"]
 _ANALYZEPLANREQUEST = DESCRIPTOR.message_types_by_name["AnalyzePlanRequest"]
+_ANALYZEPLANREQUEST_SCHEMA = _ANALYZEPLANREQUEST.nested_types_by_name["Schema"]
+_ANALYZEPLANREQUEST_EXPLAIN = _ANALYZEPLANREQUEST.nested_types_by_name["Explain"]
+_ANALYZEPLANREQUEST_TREESTRING = _ANALYZEPLANREQUEST.nested_types_by_name["TreeString"]
+_ANALYZEPLANREQUEST_ISLOCAL = _ANALYZEPLANREQUEST.nested_types_by_name["IsLocal"]
+_ANALYZEPLANREQUEST_ISSTREAMING = _ANALYZEPLANREQUEST.nested_types_by_name["IsStreaming"]
+_ANALYZEPLANREQUEST_INPUTFILES = _ANALYZEPLANREQUEST.nested_types_by_name["InputFiles"]
+_ANALYZEPLANREQUEST_SPARKVERSION = _ANALYZEPLANREQUEST.nested_types_by_name["SparkVersion"]
+_ANALYZEPLANREQUEST_DDLPARSE = _ANALYZEPLANREQUEST.nested_types_by_name["DDLParse"]
 _ANALYZEPLANRESPONSE = DESCRIPTOR.message_types_by_name["AnalyzePlanResponse"]
+_ANALYZEPLANRESPONSE_SCHEMA = _ANALYZEPLANRESPONSE.nested_types_by_name["Schema"]
+_ANALYZEPLANRESPONSE_EXPLAIN = _ANALYZEPLANRESPONSE.nested_types_by_name["Explain"]
+_ANALYZEPLANRESPONSE_TREESTRING = _ANALYZEPLANRESPONSE.nested_types_by_name["TreeString"]
+_ANALYZEPLANRESPONSE_ISLOCAL = _ANALYZEPLANRESPONSE.nested_types_by_name["IsLocal"]
+_ANALYZEPLANRESPONSE_ISSTREAMING = _ANALYZEPLANRESPONSE.nested_types_by_name["IsStreaming"]
+_ANALYZEPLANRESPONSE_INPUTFILES = _ANALYZEPLANRESPONSE.nested_types_by_name["InputFiles"]
+_ANALYZEPLANRESPONSE_SPARKVERSION = _ANALYZEPLANRESPONSE.nested_types_by_name["SparkVersion"]
+_ANALYZEPLANRESPONSE_DDLPARSE = _ANALYZEPLANRESPONSE.nested_types_by_name["DDLParse"]
 _EXECUTEPLANREQUEST = DESCRIPTOR.message_types_by_name["ExecutePlanRequest"]
 _EXECUTEPLANRESPONSE = DESCRIPTOR.message_types_by_name["ExecutePlanResponse"]
 _EXECUTEPLANRESPONSE_ARROWBATCH = _EXECUTEPLANRESPONSE.nested_types_by_name["ArrowBatch"]
@@ -69,7 +84,9 @@ _CONFIGREQUEST_GETALL = _CONFIGREQUEST.nested_types_by_name["GetAll"]
 _CONFIGREQUEST_UNSET = _CONFIGREQUEST.nested_types_by_name["Unset"]
 _CONFIGREQUEST_ISMODIFIABLE = _CONFIGREQUEST.nested_types_by_name["IsModifiable"]
 _CONFIGRESPONSE = DESCRIPTOR.message_types_by_name["ConfigResponse"]
-_EXPLAIN_EXPLAINMODE = _EXPLAIN.enum_types_by_name["ExplainMode"]
+_ANALYZEPLANREQUEST_EXPLAIN_EXPLAINMODE = _ANALYZEPLANREQUEST_EXPLAIN.enum_types_by_name[
+    "ExplainMode"
+]
 Plan = _reflection.GeneratedProtocolMessageType(
     "Plan",
     (_message.Message,),
@@ -81,17 +98,6 @@ Plan = _reflection.GeneratedProtocolMessageType(
 )
 _sym_db.RegisterMessage(Plan)
 
-Explain = _reflection.GeneratedProtocolMessageType(
-    "Explain",
-    (_message.Message,),
-    {
-        "DESCRIPTOR": _EXPLAIN,
-        "__module__": "spark.connect.base_pb2"
-        # @@protoc_insertion_point(class_scope:spark.connect.Explain)
-    },
-)
-_sym_db.RegisterMessage(Explain)
-
 UserContext = _reflection.GeneratedProtocolMessageType(
     "UserContext",
     (_message.Message,),
@@ -107,23 +113,183 @@ AnalyzePlanRequest = _reflection.GeneratedProtocolMessageType(
     "AnalyzePlanRequest",
     (_message.Message,),
     {
+        "Schema": _reflection.GeneratedProtocolMessageType(
+            "Schema",
+            (_message.Message,),
+            {
+                "DESCRIPTOR": _ANALYZEPLANREQUEST_SCHEMA,
+                "__module__": "spark.connect.base_pb2"
+                # @@protoc_insertion_point(class_scope:spark.connect.AnalyzePlanRequest.Schema)
+            },
+        ),
+        "Explain": _reflection.GeneratedProtocolMessageType(
+            "Explain",
+            (_message.Message,),
+            {
+                "DESCRIPTOR": _ANALYZEPLANREQUEST_EXPLAIN,
+                "__module__": "spark.connect.base_pb2"
+                # @@protoc_insertion_point(class_scope:spark.connect.AnalyzePlanRequest.Explain)
+            },
+        ),
+        "TreeString": _reflection.GeneratedProtocolMessageType(
+            "TreeString",
+            (_message.Message,),
+            {
+                "DESCRIPTOR": _ANALYZEPLANREQUEST_TREESTRING,
+                "__module__": "spark.connect.base_pb2"
+                # @@protoc_insertion_point(class_scope:spark.connect.AnalyzePlanRequest.TreeString)
+            },
+        ),
+        "IsLocal": _reflection.GeneratedProtocolMessageType(
+            "IsLocal",
+            (_message.Message,),
+            {
+                "DESCRIPTOR": _ANALYZEPLANREQUEST_ISLOCAL,
+                "__module__": "spark.connect.base_pb2"
+                # @@protoc_insertion_point(class_scope:spark.connect.AnalyzePlanRequest.IsLocal)
+            },
+        ),
+        "IsStreaming": _reflection.GeneratedProtocolMessageType(
+            "IsStreaming",
+            (_message.Message,),
+            {
+                "DESCRIPTOR": _ANALYZEPLANREQUEST_ISSTREAMING,
+                "__module__": "spark.connect.base_pb2"
+                # @@protoc_insertion_point(class_scope:spark.connect.AnalyzePlanRequest.IsStreaming)
+            },
+        ),
+        "InputFiles": _reflection.GeneratedProtocolMessageType(
+            "InputFiles",
+            (_message.Message,),
+            {
+                "DESCRIPTOR": _ANALYZEPLANREQUEST_INPUTFILES,
+                "__module__": "spark.connect.base_pb2"
+                # @@protoc_insertion_point(class_scope:spark.connect.AnalyzePlanRequest.InputFiles)
+            },
+        ),
+        "SparkVersion": _reflection.GeneratedProtocolMessageType(
+            "SparkVersion",
+            (_message.Message,),
+            {
+                "DESCRIPTOR": _ANALYZEPLANREQUEST_SPARKVERSION,
+                "__module__": "spark.connect.base_pb2"
+                # @@protoc_insertion_point(class_scope:spark.connect.AnalyzePlanRequest.SparkVersion)
+            },
+        ),
+        "DDLParse": _reflection.GeneratedProtocolMessageType(
+            "DDLParse",
+            (_message.Message,),
+            {
+                "DESCRIPTOR": _ANALYZEPLANREQUEST_DDLPARSE,
+                "__module__": "spark.connect.base_pb2"
+                # @@protoc_insertion_point(class_scope:spark.connect.AnalyzePlanRequest.DDLParse)
+            },
+        ),
         "DESCRIPTOR": _ANALYZEPLANREQUEST,
         "__module__": "spark.connect.base_pb2"
         # @@protoc_insertion_point(class_scope:spark.connect.AnalyzePlanRequest)
     },
 )
 _sym_db.RegisterMessage(AnalyzePlanRequest)
+_sym_db.RegisterMessage(AnalyzePlanRequest.Schema)
+_sym_db.RegisterMessage(AnalyzePlanRequest.Explain)
+_sym_db.RegisterMessage(AnalyzePlanRequest.TreeString)
+_sym_db.RegisterMessage(AnalyzePlanRequest.IsLocal)
+_sym_db.RegisterMessage(AnalyzePlanRequest.IsStreaming)
+_sym_db.RegisterMessage(AnalyzePlanRequest.InputFiles)
+_sym_db.RegisterMessage(AnalyzePlanRequest.SparkVersion)
+_sym_db.RegisterMessage(AnalyzePlanRequest.DDLParse)
 
 AnalyzePlanResponse = _reflection.GeneratedProtocolMessageType(
     "AnalyzePlanResponse",
     (_message.Message,),
     {
+        "Schema": _reflection.GeneratedProtocolMessageType(
+            "Schema",
+            (_message.Message,),
+            {
+                "DESCRIPTOR": _ANALYZEPLANRESPONSE_SCHEMA,
+                "__module__": "spark.connect.base_pb2"
+                # @@protoc_insertion_point(class_scope:spark.connect.AnalyzePlanResponse.Schema)
+            },
+        ),
+        "Explain": _reflection.GeneratedProtocolMessageType(
+            "Explain",
+            (_message.Message,),
+            {
+                "DESCRIPTOR": _ANALYZEPLANRESPONSE_EXPLAIN,
+                "__module__": "spark.connect.base_pb2"
+                # @@protoc_insertion_point(class_scope:spark.connect.AnalyzePlanResponse.Explain)
+            },
+        ),
+        "TreeString": _reflection.GeneratedProtocolMessageType(
+            "TreeString",
+            (_message.Message,),
+            {
+                "DESCRIPTOR": _ANALYZEPLANRESPONSE_TREESTRING,
+                "__module__": "spark.connect.base_pb2"
+                # @@protoc_insertion_point(class_scope:spark.connect.AnalyzePlanResponse.TreeString)
+            },
+        ),
+        "IsLocal": _reflection.GeneratedProtocolMessageType(
+            "IsLocal",
+            (_message.Message,),
+            {
+                "DESCRIPTOR": _ANALYZEPLANRESPONSE_ISLOCAL,
+                "__module__": "spark.connect.base_pb2"
+                # @@protoc_insertion_point(class_scope:spark.connect.AnalyzePlanResponse.IsLocal)
+            },
+        ),
+        "IsStreaming": _reflection.GeneratedProtocolMessageType(
+            "IsStreaming",
+            (_message.Message,),
+            {
+                "DESCRIPTOR": _ANALYZEPLANRESPONSE_ISSTREAMING,
+                "__module__": "spark.connect.base_pb2"
+                # @@protoc_insertion_point(class_scope:spark.connect.AnalyzePlanResponse.IsStreaming)
+            },
+        ),
+        "InputFiles": _reflection.GeneratedProtocolMessageType(
+            "InputFiles",
+            (_message.Message,),
+            {
+                "DESCRIPTOR": _ANALYZEPLANRESPONSE_INPUTFILES,
+                "__module__": "spark.connect.base_pb2"
+                # @@protoc_insertion_point(class_scope:spark.connect.AnalyzePlanResponse.InputFiles)
+            },
+        ),
+        "SparkVersion": _reflection.GeneratedProtocolMessageType(
+            "SparkVersion",
+            (_message.Message,),
+            {
+                "DESCRIPTOR": _ANALYZEPLANRESPONSE_SPARKVERSION,
+                "__module__": "spark.connect.base_pb2"
+                # @@protoc_insertion_point(class_scope:spark.connect.AnalyzePlanResponse.SparkVersion)
+            },
+        ),
+        "DDLParse": _reflection.GeneratedProtocolMessageType(
+            "DDLParse",
+            (_message.Message,),
+            {
+                "DESCRIPTOR": _ANALYZEPLANRESPONSE_DDLPARSE,
+                "__module__": "spark.connect.base_pb2"
+                # @@protoc_insertion_point(class_scope:spark.connect.AnalyzePlanResponse.DDLParse)
+            },
+        ),
         "DESCRIPTOR": _ANALYZEPLANRESPONSE,
         "__module__": "spark.connect.base_pb2"
         # @@protoc_insertion_point(class_scope:spark.connect.AnalyzePlanResponse)
     },
 )
 _sym_db.RegisterMessage(AnalyzePlanResponse)
+_sym_db.RegisterMessage(AnalyzePlanResponse.Schema)
+_sym_db.RegisterMessage(AnalyzePlanResponse.Explain)
+_sym_db.RegisterMessage(AnalyzePlanResponse.TreeString)
+_sym_db.RegisterMessage(AnalyzePlanResponse.IsLocal)
+_sym_db.RegisterMessage(AnalyzePlanResponse.IsStreaming)
+_sym_db.RegisterMessage(AnalyzePlanResponse.InputFiles)
+_sym_db.RegisterMessage(AnalyzePlanResponse.SparkVersion)
+_sym_db.RegisterMessage(AnalyzePlanResponse.DDLParse)
 
 ExecutePlanRequest = _reflection.GeneratedProtocolMessageType(
     "ExecutePlanRequest",
@@ -319,52 +485,82 @@ if _descriptor._USE_C_DESCRIPTORS == False:
     _EXECUTEPLANRESPONSE_METRICS_METRICOBJECT_EXECUTIONMETRICSENTRY._serialized_options = b"8\001"
     _PLAN._serialized_start = 158
     _PLAN._serialized_end = 274
-    _EXPLAIN._serialized_start = 277
-    _EXPLAIN._serialized_end = 458
-    _EXPLAIN_EXPLAINMODE._serialized_start = 359
-    _EXPLAIN_EXPLAINMODE._serialized_end = 458
-    _USERCONTEXT._serialized_start = 460
-    _USERCONTEXT._serialized_end = 582
-    _ANALYZEPLANREQUEST._serialized_start = 585
-    _ANALYZEPLANREQUEST._serialized_end = 842
-    _ANALYZEPLANRESPONSE._serialized_start = 845
-    _ANALYZEPLANRESPONSE._serialized_end = 1111
-    _EXECUTEPLANREQUEST._serialized_start = 1114
-    _EXECUTEPLANREQUEST._serialized_end = 1321
-    _EXECUTEPLANRESPONSE._serialized_start = 1324
-    _EXECUTEPLANRESPONSE._serialized_end = 2107
-    _EXECUTEPLANRESPONSE_ARROWBATCH._serialized_start = 1526
-    _EXECUTEPLANRESPONSE_ARROWBATCH._serialized_end = 1587
-    _EXECUTEPLANRESPONSE_METRICS._serialized_start = 1590
-    _EXECUTEPLANRESPONSE_METRICS._serialized_end = 2107
-    _EXECUTEPLANRESPONSE_METRICS_METRICOBJECT._serialized_start = 1685
-    _EXECUTEPLANRESPONSE_METRICS_METRICOBJECT._serialized_end = 2017
-    _EXECUTEPLANRESPONSE_METRICS_METRICOBJECT_EXECUTIONMETRICSENTRY._serialized_start = 1894
-    _EXECUTEPLANRESPONSE_METRICS_METRICOBJECT_EXECUTIONMETRICSENTRY._serialized_end = 2017
-    _EXECUTEPLANRESPONSE_METRICS_METRICVALUE._serialized_start = 2019
-    _EXECUTEPLANRESPONSE_METRICS_METRICVALUE._serialized_end = 2107
-    _KEYVALUE._serialized_start = 2109
-    _KEYVALUE._serialized_end = 2174
-    _CONFIGREQUEST._serialized_start = 2177
-    _CONFIGREQUEST._serialized_end = 3203
-    _CONFIGREQUEST_OPERATION._serialized_start = 2395
-    _CONFIGREQUEST_OPERATION._serialized_end = 2893
-    _CONFIGREQUEST_SET._serialized_start = 2895
-    _CONFIGREQUEST_SET._serialized_end = 2947
-    _CONFIGREQUEST_GET._serialized_start = 2949
-    _CONFIGREQUEST_GET._serialized_end = 2974
-    _CONFIGREQUEST_GETWITHDEFAULT._serialized_start = 2976
-    _CONFIGREQUEST_GETWITHDEFAULT._serialized_end = 3039
-    _CONFIGREQUEST_GETOPTION._serialized_start = 3041
-    _CONFIGREQUEST_GETOPTION._serialized_end = 3072
-    _CONFIGREQUEST_GETALL._serialized_start = 3074
-    _CONFIGREQUEST_GETALL._serialized_end = 3122
-    _CONFIGREQUEST_UNSET._serialized_start = 3124
-    _CONFIGREQUEST_UNSET._serialized_end = 3151
-    _CONFIGREQUEST_ISMODIFIABLE._serialized_start = 3153
-    _CONFIGREQUEST_ISMODIFIABLE._serialized_end = 3187
-    _CONFIGRESPONSE._serialized_start = 3205
-    _CONFIGRESPONSE._serialized_end = 3325
-    _SPARKCONNECTSERVICE._serialized_start = 3328
-    _SPARKCONNECTSERVICE._serialized_end = 3600
+    _USERCONTEXT._serialized_start = 276
+    _USERCONTEXT._serialized_end = 398
+    _ANALYZEPLANREQUEST._serialized_start = 401
+    _ANALYZEPLANREQUEST._serialized_end = 1843
+    _ANALYZEPLANREQUEST_SCHEMA._serialized_start = 1172
+    _ANALYZEPLANREQUEST_SCHEMA._serialized_end = 1221
+    _ANALYZEPLANREQUEST_EXPLAIN._serialized_start = 1224
+    _ANALYZEPLANREQUEST_EXPLAIN._serialized_end = 1539
+    _ANALYZEPLANREQUEST_EXPLAIN_EXPLAINMODE._serialized_start = 1367
+    _ANALYZEPLANREQUEST_EXPLAIN_EXPLAINMODE._serialized_end = 1539
+    _ANALYZEPLANREQUEST_TREESTRING._serialized_start = 1541
+    _ANALYZEPLANREQUEST_TREESTRING._serialized_end = 1594
+    _ANALYZEPLANREQUEST_ISLOCAL._serialized_start = 1596
+    _ANALYZEPLANREQUEST_ISLOCAL._serialized_end = 1646
+    _ANALYZEPLANREQUEST_ISSTREAMING._serialized_start = 1648
+    _ANALYZEPLANREQUEST_ISSTREAMING._serialized_end = 1702
+    _ANALYZEPLANREQUEST_INPUTFILES._serialized_start = 1704
+    _ANALYZEPLANREQUEST_INPUTFILES._serialized_end = 1757
+    _ANALYZEPLANREQUEST_SPARKVERSION._serialized_start = 1759
+    _ANALYZEPLANREQUEST_SPARKVERSION._serialized_end = 1773
+    _ANALYZEPLANREQUEST_DDLPARSE._serialized_start = 1775
+    _ANALYZEPLANREQUEST_DDLPARSE._serialized_end = 1816
+    _ANALYZEPLANRESPONSE._serialized_start = 1846
+    _ANALYZEPLANRESPONSE._serialized_end = 2916
+    _ANALYZEPLANRESPONSE_SCHEMA._serialized_start = 2525
+    _ANALYZEPLANRESPONSE_SCHEMA._serialized_end = 2582
+    _ANALYZEPLANRESPONSE_EXPLAIN._serialized_start = 2584
+    _ANALYZEPLANRESPONSE_EXPLAIN._serialized_end = 2632
+    _ANALYZEPLANRESPONSE_TREESTRING._serialized_start = 2634
+    _ANALYZEPLANRESPONSE_TREESTRING._serialized_end = 2679
+    _ANALYZEPLANRESPONSE_ISLOCAL._serialized_start = 2681
+    _ANALYZEPLANRESPONSE_ISLOCAL._serialized_end = 2717
+    _ANALYZEPLANRESPONSE_ISSTREAMING._serialized_start = 2719
+    _ANALYZEPLANRESPONSE_ISSTREAMING._serialized_end = 2767
+    _ANALYZEPLANRESPONSE_INPUTFILES._serialized_start = 2769
+    _ANALYZEPLANRESPONSE_INPUTFILES._serialized_end = 2803
+    _ANALYZEPLANRESPONSE_SPARKVERSION._serialized_start = 2805
+    _ANALYZEPLANRESPONSE_SPARKVERSION._serialized_end = 2845
+    _ANALYZEPLANRESPONSE_DDLPARSE._serialized_start = 2847
+    _ANALYZEPLANRESPONSE_DDLPARSE._serialized_end = 2906
+    _EXECUTEPLANREQUEST._serialized_start = 2919
+    _EXECUTEPLANREQUEST._serialized_end = 3126
+    _EXECUTEPLANRESPONSE._serialized_start = 3129
+    _EXECUTEPLANRESPONSE._serialized_end = 3912
+    _EXECUTEPLANRESPONSE_ARROWBATCH._serialized_start = 3331
+    _EXECUTEPLANRESPONSE_ARROWBATCH._serialized_end = 3392
+    _EXECUTEPLANRESPONSE_METRICS._serialized_start = 3395
+    _EXECUTEPLANRESPONSE_METRICS._serialized_end = 3912
+    _EXECUTEPLANRESPONSE_METRICS_METRICOBJECT._serialized_start = 3490
+    _EXECUTEPLANRESPONSE_METRICS_METRICOBJECT._serialized_end = 3822
+    _EXECUTEPLANRESPONSE_METRICS_METRICOBJECT_EXECUTIONMETRICSENTRY._serialized_start = 3699
+    _EXECUTEPLANRESPONSE_METRICS_METRICOBJECT_EXECUTIONMETRICSENTRY._serialized_end = 3822
+    _EXECUTEPLANRESPONSE_METRICS_METRICVALUE._serialized_start = 3824
+    _EXECUTEPLANRESPONSE_METRICS_METRICVALUE._serialized_end = 3912
+    _KEYVALUE._serialized_start = 3914
+    _KEYVALUE._serialized_end = 3979
+    _CONFIGREQUEST._serialized_start = 3982
+    _CONFIGREQUEST._serialized_end = 5008
+    _CONFIGREQUEST_OPERATION._serialized_start = 4200
+    _CONFIGREQUEST_OPERATION._serialized_end = 4698
+    _CONFIGREQUEST_SET._serialized_start = 4700
+    _CONFIGREQUEST_SET._serialized_end = 4752
+    _CONFIGREQUEST_GET._serialized_start = 4754
+    _CONFIGREQUEST_GET._serialized_end = 4779
+    _CONFIGREQUEST_GETWITHDEFAULT._serialized_start = 4781
+    _CONFIGREQUEST_GETWITHDEFAULT._serialized_end = 4844
+    _CONFIGREQUEST_GETOPTION._serialized_start = 4846
+    _CONFIGREQUEST_GETOPTION._serialized_end = 4877
+    _CONFIGREQUEST_GETALL._serialized_start = 4879
+    _CONFIGREQUEST_GETALL._serialized_end = 4927
+    _CONFIGREQUEST_UNSET._serialized_start = 4929
+    _CONFIGREQUEST_UNSET._serialized_end = 4956
+    _CONFIGREQUEST_ISMODIFIABLE._serialized_start = 4958
+    _CONFIGREQUEST_ISMODIFIABLE._serialized_end = 4992
+    _CONFIGRESPONSE._serialized_start = 5010
+    _CONFIGRESPONSE._serialized_end = 5130
+    _SPARKCONNECTSERVICE._serialized_start = 5133
+    _SPARKCONNECTSERVICE._serialized_end = 5405
 # @@protoc_insertion_point(module_scope)
diff --git a/python/pyspark/sql/connect/proto/base_pb2.pyi b/python/pyspark/sql/connect/proto/base_pb2.pyi
index f6c402b229f..cb5606e1acd 100644
--- a/python/pyspark/sql/connect/proto/base_pb2.pyi
+++ b/python/pyspark/sql/connect/proto/base_pb2.pyi
@@ -92,71 +92,6 @@ class Plan(google.protobuf.message.Message):
 
 global___Plan = Plan
 
-class Explain(google.protobuf.message.Message):
-    """Explains the input plan based on a configurable mode."""
-
-    DESCRIPTOR: google.protobuf.descriptor.Descriptor
-
-    class _ExplainMode:
-        ValueType = typing.NewType("ValueType", builtins.int)
-        V: typing_extensions.TypeAlias = ValueType
-
-    class _ExplainModeEnumTypeWrapper(
-        google.protobuf.internal.enum_type_wrapper._EnumTypeWrapper[Explain._ExplainMode.ValueType],
-        builtins.type,
-    ):  # noqa: F821
-        DESCRIPTOR: google.protobuf.descriptor.EnumDescriptor
-        MODE_UNSPECIFIED: Explain._ExplainMode.ValueType  # 0
-        SIMPLE: Explain._ExplainMode.ValueType  # 1
-        """Generates only physical plan."""
-        EXTENDED: Explain._ExplainMode.ValueType  # 2
-        """Generates parsed logical plan, analyzed logical plan, optimized logical plan and physical plan.
-        Parsed Logical plan is a unresolved plan that extracted from the query. Analyzed logical plans
-        transforms which translates unresolvedAttribute and unresolvedRelation into fully typed objects.
-        The optimized logical plan transforms through a set of optimization rules, resulting in the
-        physical plan.
-        """
-        CODEGEN: Explain._ExplainMode.ValueType  # 3
-        """Generates code for the statement, if any and a physical plan."""
-        COST: Explain._ExplainMode.ValueType  # 4
-        """If plan node statistics are available, generates a logical plan and also the statistics."""
-        FORMATTED: Explain._ExplainMode.ValueType  # 5
-        """Generates a physical plan outline and also node details."""
-
-    class ExplainMode(_ExplainMode, metaclass=_ExplainModeEnumTypeWrapper):
-        """Plan explanation mode."""
-
-    MODE_UNSPECIFIED: Explain.ExplainMode.ValueType  # 0
-    SIMPLE: Explain.ExplainMode.ValueType  # 1
-    """Generates only physical plan."""
-    EXTENDED: Explain.ExplainMode.ValueType  # 2
-    """Generates parsed logical plan, analyzed logical plan, optimized logical plan and physical plan.
-    Parsed Logical plan is a unresolved plan that extracted from the query. Analyzed logical plans
-    transforms which translates unresolvedAttribute and unresolvedRelation into fully typed objects.
-    The optimized logical plan transforms through a set of optimization rules, resulting in the
-    physical plan.
-    """
-    CODEGEN: Explain.ExplainMode.ValueType  # 3
-    """Generates code for the statement, if any and a physical plan."""
-    COST: Explain.ExplainMode.ValueType  # 4
-    """If plan node statistics are available, generates a logical plan and also the statistics."""
-    FORMATTED: Explain.ExplainMode.ValueType  # 5
-    """Generates a physical plan outline and also node details."""
-
-    EXPLAIN_MODE_FIELD_NUMBER: builtins.int
-    explain_mode: global___Explain.ExplainMode.ValueType
-    """(Required) For analyzePlan rpc calls, configure the mode to explain plan in strings."""
-    def __init__(
-        self,
-        *,
-        explain_mode: global___Explain.ExplainMode.ValueType = ...,
-    ) -> None: ...
-    def ClearField(
-        self, field_name: typing_extensions.Literal["explain_mode", b"explain_mode"]
-    ) -> None: ...
-
-global___Explain = Explain
-
 class UserContext(google.protobuf.message.Message):
     """User Context is used to refer to one particular user session that is executing
     queries in the backend.
@@ -201,11 +136,198 @@ class AnalyzePlanRequest(google.protobuf.message.Message):
 
     DESCRIPTOR: google.protobuf.descriptor.Descriptor
 
+    class Schema(google.protobuf.message.Message):
+        DESCRIPTOR: google.protobuf.descriptor.Descriptor
+
+        PLAN_FIELD_NUMBER: builtins.int
+        @property
+        def plan(self) -> global___Plan:
+            """(Required) The logical plan to be analyzed."""
+        def __init__(
+            self,
+            *,
+            plan: global___Plan | None = ...,
+        ) -> None: ...
+        def HasField(
+            self, field_name: typing_extensions.Literal["plan", b"plan"]
+        ) -> builtins.bool: ...
+        def ClearField(self, field_name: typing_extensions.Literal["plan", b"plan"]) -> None: ...
+
+    class Explain(google.protobuf.message.Message):
+        """Explains the input plan based on a configurable mode."""
+
+        DESCRIPTOR: google.protobuf.descriptor.Descriptor
+
+        class _ExplainMode:
+            ValueType = typing.NewType("ValueType", builtins.int)
+            V: typing_extensions.TypeAlias = ValueType
+
+        class _ExplainModeEnumTypeWrapper(
+            google.protobuf.internal.enum_type_wrapper._EnumTypeWrapper[
+                AnalyzePlanRequest.Explain._ExplainMode.ValueType
+            ],
+            builtins.type,
+        ):  # noqa: F821
+            DESCRIPTOR: google.protobuf.descriptor.EnumDescriptor
+            EXPLAIN_MODE_UNSPECIFIED: AnalyzePlanRequest.Explain._ExplainMode.ValueType  # 0
+            EXPLAIN_MODE_SIMPLE: AnalyzePlanRequest.Explain._ExplainMode.ValueType  # 1
+            """Generates only physical plan."""
+            EXPLAIN_MODE_EXTENDED: AnalyzePlanRequest.Explain._ExplainMode.ValueType  # 2
+            """Generates parsed logical plan, analyzed logical plan, optimized logical plan and physical plan.
+            Parsed Logical plan is a unresolved plan that extracted from the query. Analyzed logical plans
+            transforms which translates unresolvedAttribute and unresolvedRelation into fully typed objects.
+            The optimized logical plan transforms through a set of optimization rules, resulting in the
+            physical plan.
+            """
+            EXPLAIN_MODE_CODEGEN: AnalyzePlanRequest.Explain._ExplainMode.ValueType  # 3
+            """Generates code for the statement, if any and a physical plan."""
+            EXPLAIN_MODE_COST: AnalyzePlanRequest.Explain._ExplainMode.ValueType  # 4
+            """If plan node statistics are available, generates a logical plan and also the statistics."""
+            EXPLAIN_MODE_FORMATTED: AnalyzePlanRequest.Explain._ExplainMode.ValueType  # 5
+            """Generates a physical plan outline and also node details."""
+
+        class ExplainMode(_ExplainMode, metaclass=_ExplainModeEnumTypeWrapper):
+            """Plan explanation mode."""
+
+        EXPLAIN_MODE_UNSPECIFIED: AnalyzePlanRequest.Explain.ExplainMode.ValueType  # 0
+        EXPLAIN_MODE_SIMPLE: AnalyzePlanRequest.Explain.ExplainMode.ValueType  # 1
+        """Generates only physical plan."""
+        EXPLAIN_MODE_EXTENDED: AnalyzePlanRequest.Explain.ExplainMode.ValueType  # 2
+        """Generates parsed logical plan, analyzed logical plan, optimized logical plan and physical plan.
+        Parsed Logical plan is a unresolved plan that extracted from the query. Analyzed logical plans
+        transforms which translates unresolvedAttribute and unresolvedRelation into fully typed objects.
+        The optimized logical plan transforms through a set of optimization rules, resulting in the
+        physical plan.
+        """
+        EXPLAIN_MODE_CODEGEN: AnalyzePlanRequest.Explain.ExplainMode.ValueType  # 3
+        """Generates code for the statement, if any and a physical plan."""
+        EXPLAIN_MODE_COST: AnalyzePlanRequest.Explain.ExplainMode.ValueType  # 4
+        """If plan node statistics are available, generates a logical plan and also the statistics."""
+        EXPLAIN_MODE_FORMATTED: AnalyzePlanRequest.Explain.ExplainMode.ValueType  # 5
+        """Generates a physical plan outline and also node details."""
+
+        PLAN_FIELD_NUMBER: builtins.int
+        EXPLAIN_MODE_FIELD_NUMBER: builtins.int
+        @property
+        def plan(self) -> global___Plan:
+            """(Required) The logical plan to be analyzed."""
+        explain_mode: global___AnalyzePlanRequest.Explain.ExplainMode.ValueType
+        """(Required) For analyzePlan rpc calls, configure the mode to explain plan in strings."""
+        def __init__(
+            self,
+            *,
+            plan: global___Plan | None = ...,
+            explain_mode: global___AnalyzePlanRequest.Explain.ExplainMode.ValueType = ...,
+        ) -> None: ...
+        def HasField(
+            self, field_name: typing_extensions.Literal["plan", b"plan"]
+        ) -> builtins.bool: ...
+        def ClearField(
+            self,
+            field_name: typing_extensions.Literal["explain_mode", b"explain_mode", "plan", b"plan"],
+        ) -> None: ...
+
+    class TreeString(google.protobuf.message.Message):
+        DESCRIPTOR: google.protobuf.descriptor.Descriptor
+
+        PLAN_FIELD_NUMBER: builtins.int
+        @property
+        def plan(self) -> global___Plan:
+            """(Required) The logical plan to be analyzed."""
+        def __init__(
+            self,
+            *,
+            plan: global___Plan | None = ...,
+        ) -> None: ...
+        def HasField(
+            self, field_name: typing_extensions.Literal["plan", b"plan"]
+        ) -> builtins.bool: ...
+        def ClearField(self, field_name: typing_extensions.Literal["plan", b"plan"]) -> None: ...
+
+    class IsLocal(google.protobuf.message.Message):
+        DESCRIPTOR: google.protobuf.descriptor.Descriptor
+
+        PLAN_FIELD_NUMBER: builtins.int
+        @property
+        def plan(self) -> global___Plan:
+            """(Required) The logical plan to be analyzed."""
+        def __init__(
+            self,
+            *,
+            plan: global___Plan | None = ...,
+        ) -> None: ...
+        def HasField(
+            self, field_name: typing_extensions.Literal["plan", b"plan"]
+        ) -> builtins.bool: ...
+        def ClearField(self, field_name: typing_extensions.Literal["plan", b"plan"]) -> None: ...
+
+    class IsStreaming(google.protobuf.message.Message):
+        DESCRIPTOR: google.protobuf.descriptor.Descriptor
+
+        PLAN_FIELD_NUMBER: builtins.int
+        @property
+        def plan(self) -> global___Plan:
+            """(Required) The logical plan to be analyzed."""
+        def __init__(
+            self,
+            *,
+            plan: global___Plan | None = ...,
+        ) -> None: ...
+        def HasField(
+            self, field_name: typing_extensions.Literal["plan", b"plan"]
+        ) -> builtins.bool: ...
+        def ClearField(self, field_name: typing_extensions.Literal["plan", b"plan"]) -> None: ...
+
+    class InputFiles(google.protobuf.message.Message):
+        DESCRIPTOR: google.protobuf.descriptor.Descriptor
+
+        PLAN_FIELD_NUMBER: builtins.int
+        @property
+        def plan(self) -> global___Plan:
+            """(Required) The logical plan to be analyzed."""
+        def __init__(
+            self,
+            *,
+            plan: global___Plan | None = ...,
+        ) -> None: ...
+        def HasField(
+            self, field_name: typing_extensions.Literal["plan", b"plan"]
+        ) -> builtins.bool: ...
+        def ClearField(self, field_name: typing_extensions.Literal["plan", b"plan"]) -> None: ...
+
+    class SparkVersion(google.protobuf.message.Message):
+        DESCRIPTOR: google.protobuf.descriptor.Descriptor
+
+        def __init__(
+            self,
+        ) -> None: ...
+
+    class DDLParse(google.protobuf.message.Message):
+        DESCRIPTOR: google.protobuf.descriptor.Descriptor
+
+        DDL_STRING_FIELD_NUMBER: builtins.int
+        ddl_string: builtins.str
+        """(Required) The DDL formatted string to be parsed."""
+        def __init__(
+            self,
+            *,
+            ddl_string: builtins.str = ...,
+        ) -> None: ...
+        def ClearField(
+            self, field_name: typing_extensions.Literal["ddl_string", b"ddl_string"]
+        ) -> None: ...
+
     CLIENT_ID_FIELD_NUMBER: builtins.int
     USER_CONTEXT_FIELD_NUMBER: builtins.int
-    PLAN_FIELD_NUMBER: builtins.int
     CLIENT_TYPE_FIELD_NUMBER: builtins.int
+    SCHEMA_FIELD_NUMBER: builtins.int
     EXPLAIN_FIELD_NUMBER: builtins.int
+    TREE_STRING_FIELD_NUMBER: builtins.int
+    IS_LOCAL_FIELD_NUMBER: builtins.int
+    IS_STREAMING_FIELD_NUMBER: builtins.int
+    INPUT_FILES_FIELD_NUMBER: builtins.int
+    SPARK_VERSION_FIELD_NUMBER: builtins.int
+    DDL_PARSE_FIELD_NUMBER: builtins.int
     client_id: builtins.str
     """(Required)
 
@@ -215,37 +337,67 @@ class AnalyzePlanRequest(google.protobuf.message.Message):
     @property
     def user_context(self) -> global___UserContext:
         """(Required) User context"""
-    @property
-    def plan(self) -> global___Plan:
-        """(Required) The logical plan to be analyzed."""
     client_type: builtins.str
     """Provides optional information about the client sending the request. This field
     can be used for language or version specific information and is only intended for
     logging purposes and will not be interpreted by the server.
     """
     @property
-    def explain(self) -> global___Explain:
-        """(Optional) Get the explain string of the plan."""
+    def schema(self) -> global___AnalyzePlanRequest.Schema: ...
+    @property
+    def explain(self) -> global___AnalyzePlanRequest.Explain: ...
+    @property
+    def tree_string(self) -> global___AnalyzePlanRequest.TreeString: ...
+    @property
+    def is_local(self) -> global___AnalyzePlanRequest.IsLocal: ...
+    @property
+    def is_streaming(self) -> global___AnalyzePlanRequest.IsStreaming: ...
+    @property
+    def input_files(self) -> global___AnalyzePlanRequest.InputFiles: ...
+    @property
+    def spark_version(self) -> global___AnalyzePlanRequest.SparkVersion: ...
+    @property
+    def ddl_parse(self) -> global___AnalyzePlanRequest.DDLParse: ...
     def __init__(
         self,
         *,
         client_id: builtins.str = ...,
         user_context: global___UserContext | None = ...,
-        plan: global___Plan | None = ...,
         client_type: builtins.str | None = ...,
-        explain: global___Explain | None = ...,
+        schema: global___AnalyzePlanRequest.Schema | None = ...,
+        explain: global___AnalyzePlanRequest.Explain | None = ...,
+        tree_string: global___AnalyzePlanRequest.TreeString | None = ...,
+        is_local: global___AnalyzePlanRequest.IsLocal | None = ...,
+        is_streaming: global___AnalyzePlanRequest.IsStreaming | None = ...,
+        input_files: global___AnalyzePlanRequest.InputFiles | None = ...,
+        spark_version: global___AnalyzePlanRequest.SparkVersion | None = ...,
+        ddl_parse: global___AnalyzePlanRequest.DDLParse | None = ...,
     ) -> None: ...
     def HasField(
         self,
         field_name: typing_extensions.Literal[
             "_client_type",
             b"_client_type",
+            "analyze",
+            b"analyze",
             "client_type",
             b"client_type",
+            "ddl_parse",
+            b"ddl_parse",
             "explain",
             b"explain",
-            "plan",
-            b"plan",
+            "input_files",
+            b"input_files",
+            "is_local",
+            b"is_local",
+            "is_streaming",
+            b"is_streaming",
+            "schema",
+            b"schema",
+            "spark_version",
+            b"spark_version",
+            "tree_string",
+            b"tree_string",
             "user_context",
             b"user_context",
         ],
@@ -255,21 +407,49 @@ class AnalyzePlanRequest(google.protobuf.message.Message):
         field_name: typing_extensions.Literal[
             "_client_type",
             b"_client_type",
+            "analyze",
+            b"analyze",
             "client_id",
             b"client_id",
             "client_type",
             b"client_type",
+            "ddl_parse",
+            b"ddl_parse",
             "explain",
             b"explain",
-            "plan",
-            b"plan",
+            "input_files",
+            b"input_files",
+            "is_local",
+            b"is_local",
+            "is_streaming",
+            b"is_streaming",
+            "schema",
+            b"schema",
+            "spark_version",
+            b"spark_version",
+            "tree_string",
+            b"tree_string",
             "user_context",
             b"user_context",
         ],
     ) -> None: ...
+    @typing.overload
     def WhichOneof(
         self, oneof_group: typing_extensions.Literal["_client_type", b"_client_type"]
     ) -> typing_extensions.Literal["client_type"] | None: ...
+    @typing.overload
+    def WhichOneof(
+        self, oneof_group: typing_extensions.Literal["analyze", b"analyze"]
+    ) -> typing_extensions.Literal[
+        "schema",
+        "explain",
+        "tree_string",
+        "is_local",
+        "is_streaming",
+        "input_files",
+        "spark_version",
+        "ddl_parse",
+    ] | None: ...
 
 global___AnalyzePlanRequest = AnalyzePlanRequest
 
@@ -280,64 +460,227 @@ class AnalyzePlanResponse(google.protobuf.message.Message):
 
     DESCRIPTOR: google.protobuf.descriptor.Descriptor
 
+    class Schema(google.protobuf.message.Message):
+        DESCRIPTOR: google.protobuf.descriptor.Descriptor
+
+        SCHEMA_FIELD_NUMBER: builtins.int
+        @property
+        def schema(self) -> pyspark.sql.connect.proto.types_pb2.DataType: ...
+        def __init__(
+            self,
+            *,
+            schema: pyspark.sql.connect.proto.types_pb2.DataType | None = ...,
+        ) -> None: ...
+        def HasField(
+            self, field_name: typing_extensions.Literal["schema", b"schema"]
+        ) -> builtins.bool: ...
+        def ClearField(
+            self, field_name: typing_extensions.Literal["schema", b"schema"]
+        ) -> None: ...
+
+    class Explain(google.protobuf.message.Message):
+        DESCRIPTOR: google.protobuf.descriptor.Descriptor
+
+        EXPLAIN_STRING_FIELD_NUMBER: builtins.int
+        explain_string: builtins.str
+        def __init__(
+            self,
+            *,
+            explain_string: builtins.str = ...,
+        ) -> None: ...
+        def ClearField(
+            self, field_name: typing_extensions.Literal["explain_string", b"explain_string"]
+        ) -> None: ...
+
+    class TreeString(google.protobuf.message.Message):
+        DESCRIPTOR: google.protobuf.descriptor.Descriptor
+
+        TREE_STRING_FIELD_NUMBER: builtins.int
+        tree_string: builtins.str
+        def __init__(
+            self,
+            *,
+            tree_string: builtins.str = ...,
+        ) -> None: ...
+        def ClearField(
+            self, field_name: typing_extensions.Literal["tree_string", b"tree_string"]
+        ) -> None: ...
+
+    class IsLocal(google.protobuf.message.Message):
+        DESCRIPTOR: google.protobuf.descriptor.Descriptor
+
+        IS_LOCAL_FIELD_NUMBER: builtins.int
+        is_local: builtins.bool
+        def __init__(
+            self,
+            *,
+            is_local: builtins.bool = ...,
+        ) -> None: ...
+        def ClearField(
+            self, field_name: typing_extensions.Literal["is_local", b"is_local"]
+        ) -> None: ...
+
+    class IsStreaming(google.protobuf.message.Message):
+        DESCRIPTOR: google.protobuf.descriptor.Descriptor
+
+        IS_STREAMING_FIELD_NUMBER: builtins.int
+        is_streaming: builtins.bool
+        def __init__(
+            self,
+            *,
+            is_streaming: builtins.bool = ...,
+        ) -> None: ...
+        def ClearField(
+            self, field_name: typing_extensions.Literal["is_streaming", b"is_streaming"]
+        ) -> None: ...
+
+    class InputFiles(google.protobuf.message.Message):
+        DESCRIPTOR: google.protobuf.descriptor.Descriptor
+
+        FILES_FIELD_NUMBER: builtins.int
+        @property
+        def files(
+            self,
+        ) -> google.protobuf.internal.containers.RepeatedScalarFieldContainer[builtins.str]:
+            """A best-effort snapshot of the files that compose this Dataset"""
+        def __init__(
+            self,
+            *,
+            files: collections.abc.Iterable[builtins.str] | None = ...,
+        ) -> None: ...
+        def ClearField(self, field_name: typing_extensions.Literal["files", b"files"]) -> None: ...
+
+    class SparkVersion(google.protobuf.message.Message):
+        DESCRIPTOR: google.protobuf.descriptor.Descriptor
+
+        VERSION_FIELD_NUMBER: builtins.int
+        version: builtins.str
+        def __init__(
+            self,
+            *,
+            version: builtins.str = ...,
+        ) -> None: ...
+        def ClearField(
+            self, field_name: typing_extensions.Literal["version", b"version"]
+        ) -> None: ...
+
+    class DDLParse(google.protobuf.message.Message):
+        DESCRIPTOR: google.protobuf.descriptor.Descriptor
+
+        PARSED_FIELD_NUMBER: builtins.int
+        @property
+        def parsed(self) -> pyspark.sql.connect.proto.types_pb2.DataType: ...
+        def __init__(
+            self,
+            *,
+            parsed: pyspark.sql.connect.proto.types_pb2.DataType | None = ...,
+        ) -> None: ...
+        def HasField(
+            self, field_name: typing_extensions.Literal["parsed", b"parsed"]
+        ) -> builtins.bool: ...
+        def ClearField(
+            self, field_name: typing_extensions.Literal["parsed", b"parsed"]
+        ) -> None: ...
+
     CLIENT_ID_FIELD_NUMBER: builtins.int
     SCHEMA_FIELD_NUMBER: builtins.int
-    EXPLAIN_STRING_FIELD_NUMBER: builtins.int
+    EXPLAIN_FIELD_NUMBER: builtins.int
     TREE_STRING_FIELD_NUMBER: builtins.int
     IS_LOCAL_FIELD_NUMBER: builtins.int
     IS_STREAMING_FIELD_NUMBER: builtins.int
     INPUT_FILES_FIELD_NUMBER: builtins.int
+    SPARK_VERSION_FIELD_NUMBER: builtins.int
+    DDL_PARSE_FIELD_NUMBER: builtins.int
     client_id: builtins.str
     @property
-    def schema(self) -> pyspark.sql.connect.proto.types_pb2.DataType: ...
-    explain_string: builtins.str
-    """The extended explain string as produced by Spark."""
-    tree_string: builtins.str
-    """Get the tree string of the schema."""
-    is_local: builtins.bool
-    """Whether the 'collect' and 'take' methods can be run locally."""
-    is_streaming: builtins.bool
-    """Whether this plan contains one or more sources that continuously
-    return data as it arrives.
-    """
+    def schema(self) -> global___AnalyzePlanResponse.Schema: ...
     @property
-    def input_files(
-        self,
-    ) -> google.protobuf.internal.containers.RepeatedScalarFieldContainer[builtins.str]:
-        """A best-effort snapshot of the files that compose this Dataset"""
+    def explain(self) -> global___AnalyzePlanResponse.Explain: ...
+    @property
+    def tree_string(self) -> global___AnalyzePlanResponse.TreeString: ...
+    @property
+    def is_local(self) -> global___AnalyzePlanResponse.IsLocal: ...
+    @property
+    def is_streaming(self) -> global___AnalyzePlanResponse.IsStreaming: ...
+    @property
+    def input_files(self) -> global___AnalyzePlanResponse.InputFiles: ...
+    @property
+    def spark_version(self) -> global___AnalyzePlanResponse.SparkVersion: ...
+    @property
+    def ddl_parse(self) -> global___AnalyzePlanResponse.DDLParse: ...
     def __init__(
         self,
         *,
         client_id: builtins.str = ...,
-        schema: pyspark.sql.connect.proto.types_pb2.DataType | None = ...,
-        explain_string: builtins.str = ...,
-        tree_string: builtins.str = ...,
-        is_local: builtins.bool = ...,
-        is_streaming: builtins.bool = ...,
-        input_files: collections.abc.Iterable[builtins.str] | None = ...,
+        schema: global___AnalyzePlanResponse.Schema | None = ...,
+        explain: global___AnalyzePlanResponse.Explain | None = ...,
+        tree_string: global___AnalyzePlanResponse.TreeString | None = ...,
+        is_local: global___AnalyzePlanResponse.IsLocal | None = ...,
+        is_streaming: global___AnalyzePlanResponse.IsStreaming | None = ...,
+        input_files: global___AnalyzePlanResponse.InputFiles | None = ...,
+        spark_version: global___AnalyzePlanResponse.SparkVersion | None = ...,
+        ddl_parse: global___AnalyzePlanResponse.DDLParse | None = ...,
     ) -> None: ...
     def HasField(
-        self, field_name: typing_extensions.Literal["schema", b"schema"]
+        self,
+        field_name: typing_extensions.Literal[
+            "ddl_parse",
+            b"ddl_parse",
+            "explain",
+            b"explain",
+            "input_files",
+            b"input_files",
+            "is_local",
+            b"is_local",
+            "is_streaming",
+            b"is_streaming",
+            "result",
+            b"result",
+            "schema",
+            b"schema",
+            "spark_version",
+            b"spark_version",
+            "tree_string",
+            b"tree_string",
+        ],
     ) -> builtins.bool: ...
     def ClearField(
         self,
         field_name: typing_extensions.Literal[
             "client_id",
             b"client_id",
-            "explain_string",
-            b"explain_string",
+            "ddl_parse",
+            b"ddl_parse",
+            "explain",
+            b"explain",
             "input_files",
             b"input_files",
             "is_local",
             b"is_local",
             "is_streaming",
             b"is_streaming",
+            "result",
+            b"result",
             "schema",
             b"schema",
+            "spark_version",
+            b"spark_version",
             "tree_string",
             b"tree_string",
         ],
     ) -> None: ...
+    def WhichOneof(
+        self, oneof_group: typing_extensions.Literal["result", b"result"]
+    ) -> typing_extensions.Literal[
+        "schema",
+        "explain",
+        "tree_string",
+        "is_local",
+        "is_streaming",
+        "input_files",
+        "spark_version",
+        "ddl_parse",
+    ] | None: ...
 
 global___AnalyzePlanResponse = AnalyzePlanResponse
 
diff --git a/python/pyspark/sql/connect/session.py b/python/pyspark/sql/connect/session.py
index c95279a8c8e..d6ba9a36f96 100644
--- a/python/pyspark/sql/connect/session.py
+++ b/python/pyspark/sql/connect/session.py
@@ -447,7 +447,9 @@ class SparkSession:
 
     @property
     def version(self) -> str:
-        raise NotImplementedError("version() is not implemented.")
+        result = self._client._analyze(method="spark_version").spark_version
+        assert result is not None
+        return result
 
     # SparkConnect-specific API
     @property
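
The `version` property above goes through the client's `_analyze` helper, whose diff in client.py is not reproduced in this excerpt. As a rough sketch only, assuming `_analyze` maps the `method` string onto the matching branch of the request oneof: the `spark_version` path could look roughly like the following, where `stub` is any object exposing the `AnalyzePlan` RPC, `client_id` stands in for the session identifier, and `user_context` and other request metadata are omitted for brevity. The real implementation may differ.

```
import pyspark.sql.connect.proto.base_pb2 as pb2


def fetch_spark_version(stub, client_id: str) -> str:
    """Ask the server for its version via the parameterless SparkVersion branch."""
    req = pb2.AnalyzePlanRequest(client_id=client_id)
    # SparkVersion carries no fields, so SetInParent() is enough to mark
    # this branch of the `analyze` oneof as the selected one.
    req.spark_version.SetInParent()
    resp = stub.AnalyzePlan(req)
    return resp.spark_version.version
```
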
diff --git a/python/pyspark/sql/tests/connect/test_connect_basic.py b/python/pyspark/sql/tests/connect/test_connect_basic.py
index 84c3e4f23a6..70938459f79 100644
--- a/python/pyspark/sql/tests/connect/test_connect_basic.py
+++ b/python/pyspark/sql/tests/connect/test_connect_basic.py
@@ -2751,6 +2751,12 @@ class SparkConnectBasicTests(SparkConnectSQLTestCase):
                     self.spark.read.parquet(path).schema,
                 )
 
+    def test_version(self):
+        self.assertEqual(
+            self.connect.version,
+            self.spark.version,
+        )
+
     def test_unsupported_functions(self):
         # SPARK-41225: Disable unsupported functions.
         df = self.connect.read.table(self.tbl_name)
@@ -2799,7 +2805,6 @@ class SparkConnectBasicTests(SparkConnectSQLTestCase):
             "sparkContext",
             "streams",
             "readStream",
-            "version",
         ):
             with self.assertRaises(NotImplementedError):
                 getattr(self.connect, f)()

