You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2022/11/21 11:23:31 UTC

[GitHub] [spark] zhengruifeng opened a new pull request, #38742: [SPARK-41216][CONNECT][PYTHON] Make AnalyzePlan support multiple analysis tasks And implement isLocal/isStreaming/printSchema/semanticHash/sameSemantics/inputFiles

zhengruifeng opened a new pull request, #38742:
URL: https://github.com/apache/spark/pull/38742

   ### What changes were proposed in this pull request?
   1, Make AnalyzePlan support multiple analysis tasks
   2, implement isLocal/isStreaming/printSchema/semanticHash/sameSemantics/inputFiles
   
   
   ### Why are the changes needed?
   for API coverage
   
   
   ### Does this PR introduce _any_ user-facing change?
   yes, new APIs
   
   
   ### How was this patch tested?
   added UTs


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] amaliujia commented on a diff in pull request #38742: [SPARK-41216][CONNECT][PYTHON] Implement `DataFrame.{isLocal, isStreaming, printSchema, inputFiles}`

Posted by GitBox <gi...@apache.org>.
amaliujia commented on code in PR #38742:
URL: https://github.com/apache/spark/pull/38742#discussion_r1032619631


##########
python/pyspark/sql/connect/dataframe.py:
##########
@@ -822,6 +821,83 @@ def schema(self) -> StructType:
         else:
             return self._schema
 
+    @property
+    def isLocal(self) -> bool:
+        """Returns ``True`` if the :func:`collect` and :func:`take` methods can be run locally
+        (without any Spark executors).
+
+        .. versionadded:: 3.4.0
+
+        Returns
+        -------
+        bool
+        """
+        if self._plan is None:
+            raise Exception("Cannot analyze on empty plan.")
+        query = self._plan.to_proto(self._session)
+        return self._session._analyze(query).is_local

Review Comment:
   We literally can cache everything for each DataFrame since it is immutable. But I guess we need a design/discussion to clarify details of how.



##########
python/pyspark/sql/connect/dataframe.py:
##########
@@ -822,6 +821,83 @@ def schema(self) -> StructType:
         else:
             return self._schema
 
+    @property
+    def isLocal(self) -> bool:
+        """Returns ``True`` if the :func:`collect` and :func:`take` methods can be run locally
+        (without any Spark executors).
+
+        .. versionadded:: 3.4.0
+
+        Returns
+        -------
+        bool
+        """
+        if self._plan is None:
+            raise Exception("Cannot analyze on empty plan.")
+        query = self._plan.to_proto(self._session)
+        return self._session._analyze(query).is_local

Review Comment:
   We literally can cache everything for each DataFrame since it is immutable. But I guess we need a design/discussion to clarify details of how and when.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] zhengruifeng closed pull request #38742: [SPARK-41216][CONNECT][PYTHON] Implement `DataFrame.{isLocal, isStreaming, printSchema, inputFiles}`

Posted by GitBox <gi...@apache.org>.
zhengruifeng closed pull request #38742: [SPARK-41216][CONNECT][PYTHON] Implement `DataFrame.{isLocal, isStreaming, printSchema, inputFiles}`
URL: https://github.com/apache/spark/pull/38742


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] zhengruifeng commented on pull request #38742: [SPARK-41216][CONNECT][PYTHON] Implement `DataFrame.{isLocal, isStreaming, printSchema, inputFiles}`

Posted by GitBox <gi...@apache.org>.
zhengruifeng commented on PR #38742:
URL: https://github.com/apache/spark/pull/38742#issuecomment-1326998077

   Merged into master, thank you all!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] zhengruifeng commented on a diff in pull request #38742: [SPARK-41216][CONNECT][PYTHON] Make AnalyzePlan support multiple analysis tasks And implement isLocal/isStreaming/printSchema/semanticHash/sameSemantics/inputFiles

Posted by GitBox <gi...@apache.org>.
zhengruifeng commented on code in PR #38742:
URL: https://github.com/apache/spark/pull/38742#discussion_r1029953075


##########
connector/connect/src/main/protobuf/spark/connect/base.proto:
##########
@@ -100,18 +70,138 @@ message AnalyzePlanRequest {
   // logging purposes and will not be interpreted by the server.
   optional string client_type = 4;
 
-  // (Optional) Get the explain string of the plan.
-  Explain explain = 5;
+  repeated AnalysisTask tasks = 5;
+
+  message AnalysisTask {
+    oneof task {
+      // Get the schema
+      Schema schema = 1;
+
+      // Is local
+      IsLocal is_local = 2;
+
+      // Is Streaming
+      IsStreaming is_streaming = 3;
+
+      // Get the explain string of the plan.
+      Explain explain = 4;
+
+      // Get the tree string of the schema.
+      TreeString tree_string = 5;
+
+      // Get the input files.
+      InputFiles input_files = 6;
+
+      // Get the semantic hash
+      SemanticHash semantic_hash = 7;
+
+      // Check whether plans are equal.
+      SameSemantics same_semantics = 8;
+    }
+  }
+
+  // Analyze the input plan and return the schema.
+  message Schema { }
+
+  // Returns true if the `collect` and `take` methods can be run locally.
+  message IsLocal { }
+
+  // Returns true if this Dataset contains one or more sources that continuously
+  // return data as it arrives.
+  message IsStreaming { }
+
+  // Explains the input plan based on a configurable mode.
+  message Explain {
+    // Plan explanation mode.
+    enum ExplainMode {
+      MODE_UNSPECIFIED = 0;
+
+      // Generates only physical plan.
+      SIMPLE = 1;
+
+      // Generates parsed logical plan, analyzed logical plan, optimized logical plan and physical plan.
+      // Parsed Logical plan is a unresolved plan that extracted from the query. Analyzed logical plans
+      // transforms which translates unresolvedAttribute and unresolvedRelation into fully typed objects.
+      // The optimized logical plan transforms through a set of optimization rules, resulting in the
+      // physical plan.
+      EXTENDED = 2;
+
+      // Generates code for the statement, if any and a physical plan.
+      CODEGEN = 3;
+
+      // If plan node statistics are available, generates a logical plan and also the statistics.
+      COST = 4;
+
+      // Generates a physical plan outline and also node details.
+      FORMATTED = 5;
+    }
+
+    // (Required) For analyzePlan rpc calls, configure the mode to explain plan in strings.
+    ExplainMode explain_mode= 1;
+  }
+
+  // Generate a string to express the schema in a nice tree format.
+  // It will invoke 'StructType.treeString' (same as 'Dataset.printSchema')
+  // to compute the results.
+  message TreeString {
+
+    // (Optional) The level to generate the string.
+    optional int32 level = 1;
+  }
+
+  // Returns a best-effort snapshot of the files that compose this Dataset.
+  // It will invoke 'Dataset.inputFiles' to compute the results.
+  message InputFiles { }
+
+  // Returns a `hashCode` of the logical query plan.
+  // It will invoke 'Dataset.semanticHash' to compute the results.
+  message SemanticHash { }
+
+  // Returns `true` when the logical query plans inside both Datasets are equal.
+  // It will invoke 'Dataset.sameSemantics' to compute the results.
+  message SameSemantics {
+    Relation other = 1;
+  }
 }
 
 // Response to performing analysis of the query. Contains relevant metadata to be able to
 // reason about the performance.
 message AnalyzePlanResponse {
   string client_id = 1;
-  DataType schema = 2;
 
-  // The extended explain string as produced by Spark.
-  string explain_string = 3;
+  repeated AnalysisResult results = 2;
+
+  message AnalysisResult {
+    oneof result {
+      // The analyzed schema.
+      DataType schema = 1;
+
+      // Is local
+      bool is_local = 2;
+
+      // Is Streaming
+      bool is_streaming = 3;
+
+      // The extended explain string as produced by Spark.
+      string explain_string = 4;
+
+      // Get the tree string of the schema.
+      string tree_string = 5;
+
+      // Get the input files.
+      InputFiles input_files = 6;
+
+      // Get the semantic hash code.
+      int32 semantic_hash = 7;

Review Comment:
   the methods added here are all public API, and used by the users



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] zhengruifeng commented on a diff in pull request #38742: [SPARK-41216][CONNECT][PYTHON] Make AnalyzePlan support multiple analysis tasks And implement isLocal/isStreaming/printSchema/semanticHash/sameSemantics/inputFiles

Posted by GitBox <gi...@apache.org>.
zhengruifeng commented on code in PR #38742:
URL: https://github.com/apache/spark/pull/38742#discussion_r1028639472


##########
connector/connect/src/main/protobuf/spark/connect/base.proto:
##########
@@ -100,18 +70,135 @@ message AnalyzePlanRequest {
   // logging purposes and will not be interpreted by the server.
   optional string client_type = 4;
 
-  // (Optional) Get the explain string of the plan.
-  Explain explain = 5;
+  repeated AnalysisTask tasks = 5;
+
+  message AnalysisTask {
+    oneof task {

Review Comment:
   ~~I think we can also put catalog methods like `listTables`/`getTable` in `AnalysisTask `~~
   
   catalog apis don't require a `plan`, maybe better to have a separate rpc



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] grundprinzip commented on a diff in pull request #38742: [SPARK-41216][CONNECT][PYTHON] Make AnalyzePlan support multiple analysis tasks And implement isLocal/isStreaming/printSchema/semanticHash/sameSemantics/inputFiles

Posted by GitBox <gi...@apache.org>.
grundprinzip commented on code in PR #38742:
URL: https://github.com/apache/spark/pull/38742#discussion_r1029502971


##########
connector/connect/src/main/protobuf/spark/connect/base.proto:
##########
@@ -100,18 +70,138 @@ message AnalyzePlanRequest {
   // logging purposes and will not be interpreted by the server.
   optional string client_type = 4;
 
-  // (Optional) Get the explain string of the plan.
-  Explain explain = 5;
+  repeated AnalysisTask tasks = 5;

Review Comment:
   What does this actually mean here? What is the use case for multiple analysis tasks?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] amaliujia commented on a diff in pull request #38742: [SPARK-41216][CONNECT][PYTHON] Make AnalyzePlan support multiple analysis tasks And implement isLocal/isStreaming/printSchema/semanticHash/sameSemantics/inputFiles

Posted by GitBox <gi...@apache.org>.
amaliujia commented on code in PR #38742:
URL: https://github.com/apache/spark/pull/38742#discussion_r1030992346


##########
connector/connect/src/main/protobuf/spark/connect/base.proto:
##########
@@ -100,18 +70,138 @@ message AnalyzePlanRequest {
   // logging purposes and will not be interpreted by the server.
   optional string client_type = 4;
 
-  // (Optional) Get the explain string of the plan.
-  Explain explain = 5;
+  repeated AnalysisTask tasks = 5;
+
+  message AnalysisTask {
+    oneof task {
+      // Get the schema
+      Schema schema = 1;
+
+      // Is local
+      IsLocal is_local = 2;
+
+      // Is Streaming
+      IsStreaming is_streaming = 3;
+
+      // Get the explain string of the plan.
+      Explain explain = 4;
+
+      // Get the tree string of the schema.
+      TreeString tree_string = 5;
+
+      // Get the input files.
+      InputFiles input_files = 6;
+
+      // Get the semantic hash
+      SemanticHash semantic_hash = 7;
+
+      // Check whether plans are equal.
+      SameSemantics same_semantics = 8;
+    }
+  }
+
+  // Analyze the input plan and return the schema.
+  message Schema { }
+
+  // Returns true if the `collect` and `take` methods can be run locally.
+  message IsLocal { }
+
+  // Returns true if this Dataset contains one or more sources that continuously
+  // return data as it arrives.
+  message IsStreaming { }
+
+  // Explains the input plan based on a configurable mode.
+  message Explain {
+    // Plan explanation mode.
+    enum ExplainMode {
+      MODE_UNSPECIFIED = 0;
+
+      // Generates only physical plan.
+      SIMPLE = 1;
+
+      // Generates parsed logical plan, analyzed logical plan, optimized logical plan and physical plan.
+      // Parsed Logical plan is a unresolved plan that extracted from the query. Analyzed logical plans
+      // transforms which translates unresolvedAttribute and unresolvedRelation into fully typed objects.
+      // The optimized logical plan transforms through a set of optimization rules, resulting in the
+      // physical plan.
+      EXTENDED = 2;
+
+      // Generates code for the statement, if any and a physical plan.
+      CODEGEN = 3;
+
+      // If plan node statistics are available, generates a logical plan and also the statistics.
+      COST = 4;
+
+      // Generates a physical plan outline and also node details.
+      FORMATTED = 5;
+    }
+
+    // (Required) For analyzePlan rpc calls, configure the mode to explain plan in strings.
+    ExplainMode explain_mode= 1;
+  }
+
+  // Generate a string to express the schema in a nice tree format.
+  // It will invoke 'StructType.treeString' (same as 'Dataset.printSchema')
+  // to compute the results.
+  message TreeString {
+
+    // (Optional) The level to generate the string.

Review Comment:
   Document what is the default value?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] HyukjinKwon commented on a diff in pull request #38742: [SPARK-41216][CONNECT][PYTHON] Make AnalyzePlan support multiple analysis tasks And implement isLocal/isStreaming/printSchema/semanticHash/sameSemantics/inputFiles

Posted by GitBox <gi...@apache.org>.
HyukjinKwon commented on code in PR #38742:
URL: https://github.com/apache/spark/pull/38742#discussion_r1028915567


##########
python/pyspark/sql/connect/dataframe.py:
##########
@@ -736,6 +736,19 @@ def toPandas(self) -> Optional["pandas.DataFrame"]:
         query = self._plan.to_proto(self._session)
         return self._session._to_pandas(query)
 
+    def _basic_analyze(self) -> None:
+        # update isLocal, isStreaming, explain_string, tree_string, semantic_hash
+        if self._schema is None:
+            if self._plan is not None:
+                query = self._plan.to_proto(self._session)
+                if self._session is None:
+                    raise Exception("Cannot analyze without RemoteSparkSession.")
+                results = self._session.basic_analyze(query)
+                for k, v in results.items():
+                    self._cache[k] = v

Review Comment:
   Let's don't do caching stuff for now



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] grundprinzip commented on a diff in pull request #38742: [SPARK-41216][CONNECT][PYTHON] Make AnalyzePlan support multiple analysis tasks And implement isLocal/isStreaming/printSchema/semanticHash/sameSemantics/inputFiles

Posted by GitBox <gi...@apache.org>.
grundprinzip commented on code in PR #38742:
URL: https://github.com/apache/spark/pull/38742#discussion_r1029504790


##########
connector/connect/src/main/protobuf/spark/connect/base.proto:
##########
@@ -100,18 +70,138 @@ message AnalyzePlanRequest {
   // logging purposes and will not be interpreted by the server.
   optional string client_type = 4;
 
-  // (Optional) Get the explain string of the plan.
-  Explain explain = 5;
+  repeated AnalysisTask tasks = 5;
+
+  message AnalysisTask {
+    oneof task {
+      // Get the schema
+      Schema schema = 1;
+
+      // Is local
+      IsLocal is_local = 2;
+
+      // Is Streaming
+      IsStreaming is_streaming = 3;
+
+      // Get the explain string of the plan.
+      Explain explain = 4;
+
+      // Get the tree string of the schema.
+      TreeString tree_string = 5;
+
+      // Get the input files.
+      InputFiles input_files = 6;
+
+      // Get the semantic hash
+      SemanticHash semantic_hash = 7;
+
+      // Check whether plans are equal.
+      SameSemantics same_semantics = 8;
+    }
+  }
+
+  // Analyze the input plan and return the schema.
+  message Schema { }
+
+  // Returns true if the `collect` and `take` methods can be run locally.
+  message IsLocal { }
+
+  // Returns true if this Dataset contains one or more sources that continuously
+  // return data as it arrives.
+  message IsStreaming { }
+
+  // Explains the input plan based on a configurable mode.
+  message Explain {
+    // Plan explanation mode.
+    enum ExplainMode {
+      MODE_UNSPECIFIED = 0;
+
+      // Generates only physical plan.
+      SIMPLE = 1;
+
+      // Generates parsed logical plan, analyzed logical plan, optimized logical plan and physical plan.
+      // Parsed Logical plan is a unresolved plan that extracted from the query. Analyzed logical plans
+      // transforms which translates unresolvedAttribute and unresolvedRelation into fully typed objects.
+      // The optimized logical plan transforms through a set of optimization rules, resulting in the
+      // physical plan.
+      EXTENDED = 2;
+
+      // Generates code for the statement, if any and a physical plan.
+      CODEGEN = 3;
+
+      // If plan node statistics are available, generates a logical plan and also the statistics.
+      COST = 4;
+
+      // Generates a physical plan outline and also node details.
+      FORMATTED = 5;
+    }
+
+    // (Required) For analyzePlan rpc calls, configure the mode to explain plan in strings.
+    ExplainMode explain_mode= 1;
+  }
+
+  // Generate a string to express the schema in a nice tree format.
+  // It will invoke 'StructType.treeString' (same as 'Dataset.printSchema')
+  // to compute the results.
+  message TreeString {
+
+    // (Optional) The level to generate the string.
+    optional int32 level = 1;
+  }
+
+  // Returns a best-effort snapshot of the files that compose this Dataset.
+  // It will invoke 'Dataset.inputFiles' to compute the results.
+  message InputFiles { }
+
+  // Returns a `hashCode` of the logical query plan.
+  // It will invoke 'Dataset.semanticHash' to compute the results.
+  message SemanticHash { }
+
+  // Returns `true` when the logical query plans inside both Datasets are equal.
+  // It will invoke 'Dataset.sameSemantics' to compute the results.
+  message SameSemantics {
+    Relation other = 1;
+  }
 }
 
 // Response to performing analysis of the query. Contains relevant metadata to be able to
 // reason about the performance.
 message AnalyzePlanResponse {
   string client_id = 1;
-  DataType schema = 2;
 
-  // The extended explain string as produced by Spark.
-  string explain_string = 3;
+  repeated AnalysisResult results = 2;
+
+  message AnalysisResult {
+    oneof result {
+      // The analyzed schema.
+      DataType schema = 1;
+
+      // Is local
+      bool is_local = 2;
+
+      // Is Streaming
+      bool is_streaming = 3;
+
+      // The extended explain string as produced by Spark.
+      string explain_string = 4;
+
+      // Get the tree string of the schema.
+      string tree_string = 5;
+
+      // Get the input files.
+      InputFiles input_files = 6;
+
+      // Get the semantic hash code.
+      int32 semantic_hash = 7;

Review Comment:
   there is no symmetry to the request so it should't be in the request. What is the value of this for the customer? Is this part of the Spark public API?
   
   Do we need this for Spark Connect now?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] zhengruifeng commented on a diff in pull request #38742: [SPARK-41216][CONNECT][PYTHON] Make AnalyzePlan support multiple analysis tasks And implement isLocal/isStreaming/printSchema/semanticHash/sameSemantics/inputFiles

Posted by GitBox <gi...@apache.org>.
zhengruifeng commented on code in PR #38742:
URL: https://github.com/apache/spark/pull/38742#discussion_r1028639472


##########
connector/connect/src/main/protobuf/spark/connect/base.proto:
##########
@@ -100,18 +70,135 @@ message AnalyzePlanRequest {
   // logging purposes and will not be interpreted by the server.
   optional string client_type = 4;
 
-  // (Optional) Get the explain string of the plan.
-  Explain explain = 5;
+  repeated AnalysisTask tasks = 5;
+
+  message AnalysisTask {
+    oneof task {

Review Comment:
   I think we can also put catalog methods like `listTables`/`getTable` in `AnalysisTask `



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] grundprinzip commented on a diff in pull request #38742: [SPARK-41216][CONNECT][PYTHON] Make AnalyzePlan support multiple analysis tasks And implement isLocal/isStreaming/printSchema/semanticHash/sameSemantics/inputFiles

Posted by GitBox <gi...@apache.org>.
grundprinzip commented on code in PR #38742:
URL: https://github.com/apache/spark/pull/38742#discussion_r1029497376


##########
connector/connect/src/main/protobuf/spark/connect/base.proto:
##########
@@ -100,18 +70,138 @@ message AnalyzePlanRequest {
   // logging purposes and will not be interpreted by the server.
   optional string client_type = 4;
 
-  // (Optional) Get the explain string of the plan.
-  Explain explain = 5;
+  repeated AnalysisTask tasks = 5;
+
+  message AnalysisTask {
+    oneof task {
+      // Get the schema
+      Schema schema = 1;
+
+      // Is local
+      IsLocal is_local = 2;
+
+      // Is Streaming
+      IsStreaming is_streaming = 3;
+
+      // Get the explain string of the plan.
+      Explain explain = 4;
+
+      // Get the tree string of the schema.
+      TreeString tree_string = 5;
+
+      // Get the input files.
+      InputFiles input_files = 6;

Review Comment:
   given that the message below is empty, what does this actually mean?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] zhengruifeng commented on a diff in pull request #38742: [SPARK-41216][CONNECT][PYTHON] Make AnalyzePlan support multiple analysis tasks And implement isLocal/isStreaming/printSchema/semanticHash/sameSemantics/inputFiles

Posted by GitBox <gi...@apache.org>.
zhengruifeng commented on code in PR #38742:
URL: https://github.com/apache/spark/pull/38742#discussion_r1029952200


##########
connector/connect/src/main/protobuf/spark/connect/base.proto:
##########
@@ -100,18 +70,138 @@ message AnalyzePlanRequest {
   // logging purposes and will not be interpreted by the server.
   optional string client_type = 4;
 
-  // (Optional) Get the explain string of the plan.
-  Explain explain = 5;
+  repeated AnalysisTask tasks = 5;
+
+  message AnalysisTask {
+    oneof task {
+      // Get the schema
+      Schema schema = 1;
+
+      // Is local
+      IsLocal is_local = 2;
+
+      // Is Streaming
+      IsStreaming is_streaming = 3;
+
+      // Get the explain string of the plan.
+      Explain explain = 4;

Review Comment:
   the message for `Explain` was not changed,  just moved



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] zhengruifeng commented on pull request #38742: [SPARK-41216][CONNECT][PYTHON] Make AnalyzePlan support multiple analysis tasks And implement isLocal/isStreaming/printSchema/semanticHash/sameSemantics/inputFiles

Posted by GitBox <gi...@apache.org>.
zhengruifeng commented on PR #38742:
URL: https://github.com/apache/spark/pull/38742#issuecomment-1322842091

   cc @HyukjinKwon @cloud-fan @amaliujia @grundprinzip 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] amaliujia commented on a diff in pull request #38742: [SPARK-41216][CONNECT][PYTHON] Make AnalyzePlan support multiple analysis tasks And implement isLocal/isStreaming/printSchema/semanticHash/sameSemantics/inputFiles

Posted by GitBox <gi...@apache.org>.
amaliujia commented on code in PR #38742:
URL: https://github.com/apache/spark/pull/38742#discussion_r1029756854


##########
connector/connect/src/main/protobuf/spark/connect/base.proto:
##########
@@ -100,18 +70,135 @@ message AnalyzePlanRequest {
   // logging purposes and will not be interpreted by the server.
   optional string client_type = 4;
 
-  // (Optional) Get the explain string of the plan.
-  Explain explain = 5;
+  repeated AnalysisTask tasks = 5;
+
+  message AnalysisTask {
+    oneof task {

Review Comment:
   Yes that is why I actually find model each of the method as RPC because that is more closer to the nature of RPC.



##########
connector/connect/src/main/protobuf/spark/connect/base.proto:
##########
@@ -100,18 +70,135 @@ message AnalyzePlanRequest {
   // logging purposes and will not be interpreted by the server.
   optional string client_type = 4;
 
-  // (Optional) Get the explain string of the plan.
-  Explain explain = 5;
+  repeated AnalysisTask tasks = 5;
+
+  message AnalysisTask {
+    oneof task {

Review Comment:
   Yes that is why I actually wanted to model each of the Catalog method as RPC because that is more closer to the nature of RPC.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] cloud-fan commented on a diff in pull request #38742: [SPARK-41216][CONNECT][PYTHON] Implement `DataFrame.{isLocal, isStreaming, printSchema, inputFiles}`

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on code in PR #38742:
URL: https://github.com/apache/spark/pull/38742#discussion_r1032016700


##########
python/pyspark/sql/connect/dataframe.py:
##########
@@ -822,6 +821,83 @@ def schema(self) -> StructType:
         else:
             return self._schema
 
+    @property
+    def isLocal(self) -> bool:
+        """Returns ``True`` if the :func:`collect` and :func:`take` methods can be run locally
+        (without any Spark executors).
+
+        .. versionadded:: 3.4.0
+
+        Returns
+        -------
+        bool
+        """
+        if self._plan is None:
+            raise Exception("Cannot analyze on empty plan.")
+        query = self._plan.to_proto(self._session)
+        return self._session._analyze(query).is_local

Review Comment:
   are we going to cache the analyze result later?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] zhengruifeng commented on a diff in pull request #38742: [SPARK-41216][CONNECT][PYTHON] Make AnalyzePlan support multiple analysis tasks And implement isLocal/isStreaming/printSchema/semanticHash/sameSemantics/inputFiles

Posted by GitBox <gi...@apache.org>.
zhengruifeng commented on code in PR #38742:
URL: https://github.com/apache/spark/pull/38742#discussion_r1031003410


##########
connector/connect/src/main/protobuf/spark/connect/base.proto:
##########
@@ -100,18 +70,138 @@ message AnalyzePlanRequest {
   // logging purposes and will not be interpreted by the server.
   optional string client_type = 4;
 
-  // (Optional) Get the explain string of the plan.
-  Explain explain = 5;
+  repeated AnalysisTask tasks = 5;
+
+  message AnalysisTask {
+    oneof task {
+      // Get the schema
+      Schema schema = 1;
+
+      // Is local
+      IsLocal is_local = 2;
+
+      // Is Streaming
+      IsStreaming is_streaming = 3;
+
+      // Get the explain string of the plan.
+      Explain explain = 4;
+
+      // Get the tree string of the schema.
+      TreeString tree_string = 5;
+
+      // Get the input files.
+      InputFiles input_files = 6;
+
+      // Get the semantic hash
+      SemanticHash semantic_hash = 7;
+
+      // Check whether plans are equal.
+      SameSemantics same_semantics = 8;
+    }
+  }
+
+  // Analyze the input plan and return the schema.
+  message Schema { }
+
+  // Returns true if the `collect` and `take` methods can be run locally.
+  message IsLocal { }
+
+  // Returns true if this Dataset contains one or more sources that continuously
+  // return data as it arrives.
+  message IsStreaming { }
+
+  // Explains the input plan based on a configurable mode.
+  message Explain {
+    // Plan explanation mode.
+    enum ExplainMode {
+      MODE_UNSPECIFIED = 0;
+
+      // Generates only physical plan.
+      SIMPLE = 1;
+
+      // Generates parsed logical plan, analyzed logical plan, optimized logical plan and physical plan.
+      // Parsed Logical plan is a unresolved plan that extracted from the query. Analyzed logical plans
+      // transforms which translates unresolvedAttribute and unresolvedRelation into fully typed objects.
+      // The optimized logical plan transforms through a set of optimization rules, resulting in the
+      // physical plan.
+      EXTENDED = 2;
+
+      // Generates code for the statement, if any and a physical plan.
+      CODEGEN = 3;
+
+      // If plan node statistics are available, generates a logical plan and also the statistics.
+      COST = 4;
+
+      // Generates a physical plan outline and also node details.
+      FORMATTED = 5;
+    }
+
+    // (Required) For analyzePlan rpc calls, configure the mode to explain plan in strings.
+    ExplainMode explain_mode= 1;
+  }
+
+  // Generate a string to express the schema in a nice tree format.
+  // It will invoke 'StructType.treeString' (same as 'Dataset.printSchema')
+  // to compute the results.
+  message TreeString {
+
+    // (Optional) The level to generate the string.

Review Comment:
   will do



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] hvanhovell commented on a diff in pull request #38742: [SPARK-41216][CONNECT][PYTHON] Make AnalyzePlan support multiple analysis tasks And implement isLocal/isStreaming/printSchema/semanticHash/sameSemantics/inputFiles

Posted by GitBox <gi...@apache.org>.
hvanhovell commented on code in PR #38742:
URL: https://github.com/apache/spark/pull/38742#discussion_r1029448341


##########
connector/connect/src/main/protobuf/spark/connect/base.proto:
##########
@@ -100,18 +70,138 @@ message AnalyzePlanRequest {
   // logging purposes and will not be interpreted by the server.
   optional string client_type = 4;
 
-  // (Optional) Get the explain string of the plan.
-  Explain explain = 5;
+  repeated AnalysisTask tasks = 5;
+
+  message AnalysisTask {
+    oneof task {
+      // Get the schema
+      Schema schema = 1;
+
+      // Is local
+      IsLocal is_local = 2;
+
+      // Is Streaming
+      IsStreaming is_streaming = 3;
+
+      // Get the explain string of the plan.
+      Explain explain = 4;
+
+      // Get the tree string of the schema.
+      TreeString tree_string = 5;

Review Comment:
   Honestly this is a client side thing. They already have the schema, so they can construct it themselves. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] zhengruifeng commented on a diff in pull request #38742: [SPARK-41216][CONNECT][PYTHON] Make AnalyzePlan support multiple analysis tasks And implement isLocal/isStreaming/printSchema/semanticHash/sameSemantics/inputFiles

Posted by GitBox <gi...@apache.org>.
zhengruifeng commented on code in PR #38742:
URL: https://github.com/apache/spark/pull/38742#discussion_r1029975064


##########
connector/connect/src/main/protobuf/spark/connect/base.proto:
##########
@@ -100,18 +70,138 @@ message AnalyzePlanRequest {
   // logging purposes and will not be interpreted by the server.
   optional string client_type = 4;
 
-  // (Optional) Get the explain string of the plan.
-  Explain explain = 5;
+  repeated AnalysisTask tasks = 5;
+
+  message AnalysisTask {
+    oneof task {
+      // Get the schema
+      Schema schema = 1;
+
+      // Is local
+      IsLocal is_local = 2;
+
+      // Is Streaming
+      IsStreaming is_streaming = 3;
+
+      // Get the explain string of the plan.
+      Explain explain = 4;
+
+      // Get the tree string of the schema.
+      TreeString tree_string = 5;

Review Comment:
   we also ask the server to provide the string for `df.show` and `df.explain`, maybe simpler to also do this for `printSchema`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] grundprinzip commented on a diff in pull request #38742: [SPARK-41216][CONNECT][PYTHON] Make AnalyzePlan support multiple analysis tasks And implement isLocal/isStreaming/printSchema/semanticHash/sameSemantics/inputFiles

Posted by GitBox <gi...@apache.org>.
grundprinzip commented on code in PR #38742:
URL: https://github.com/apache/spark/pull/38742#discussion_r1029493565


##########
connector/connect/src/main/protobuf/spark/connect/base.proto:
##########
@@ -100,18 +70,138 @@ message AnalyzePlanRequest {
   // logging purposes and will not be interpreted by the server.
   optional string client_type = 4;
 
-  // (Optional) Get the explain string of the plan.
-  Explain explain = 5;
+  repeated AnalysisTask tasks = 5;
+
+  message AnalysisTask {
+    oneof task {
+      // Get the schema
+      Schema schema = 1;
+
+      // Is local
+      IsLocal is_local = 2;
+
+      // Is Streaming
+      IsStreaming is_streaming = 3;
+
+      // Get the explain string of the plan.
+      Explain explain = 4;
+
+      // Get the tree string of the schema.
+      TreeString tree_string = 5;
+
+      // Get the input files.
+      InputFiles input_files = 6;
+
+      // Get the semantic hash
+      SemanticHash semantic_hash = 7;
+
+      // Check whether plans are equal.
+      SameSemantics same_semantics = 8;
+    }
+  }
+
+  // Analyze the input plan and return the schema.
+  message Schema { }
+
+  // Returns true if the `collect` and `take` methods can be run locally.
+  message IsLocal { }
+
+  // Returns true if this Dataset contains one or more sources that continuously
+  // return data as it arrives.
+  message IsStreaming { }
+
+  // Explains the input plan based on a configurable mode.
+  message Explain {
+    // Plan explanation mode.
+    enum ExplainMode {
+      MODE_UNSPECIFIED = 0;
+
+      // Generates only physical plan.
+      SIMPLE = 1;
+
+      // Generates parsed logical plan, analyzed logical plan, optimized logical plan and physical plan.
+      // Parsed Logical plan is a unresolved plan that extracted from the query. Analyzed logical plans
+      // transforms which translates unresolvedAttribute and unresolvedRelation into fully typed objects.
+      // The optimized logical plan transforms through a set of optimization rules, resulting in the
+      // physical plan.
+      EXTENDED = 2;
+
+      // Generates code for the statement, if any and a physical plan.
+      CODEGEN = 3;
+
+      // If plan node statistics are available, generates a logical plan and also the statistics.
+      COST = 4;
+
+      // Generates a physical plan outline and also node details.
+      FORMATTED = 5;
+    }
+
+    // (Required) For analyzePlan rpc calls, configure the mode to explain plan in strings.
+    ExplainMode explain_mode= 1;
+  }
+
+  // Generate a string to express the schema in a nice tree format.
+  // It will invoke 'StructType.treeString' (same as 'Dataset.printSchema')
+  // to compute the results.
+  message TreeString {
+
+    // (Optional) The level to generate the string.
+    optional int32 level = 1;
+  }
+
+  // Returns a best-effort snapshot of the files that compose this Dataset.
+  // It will invoke 'Dataset.inputFiles' to compute the results.
+  message InputFiles { }

Review Comment:
   is this really useful here?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] amaliujia commented on a diff in pull request #38742: [SPARK-41216][CONNECT][PYTHON] Implement `DataFrame.{isLocal, isStreaming, printSchema, inputFiles}`

Posted by GitBox <gi...@apache.org>.
amaliujia commented on code in PR #38742:
URL: https://github.com/apache/spark/pull/38742#discussion_r1032620049


##########
python/pyspark/sql/connect/dataframe.py:
##########
@@ -822,6 +821,83 @@ def schema(self) -> StructType:
         else:
             return self._schema
 
+    @property
+    def isLocal(self) -> bool:
+        """Returns ``True`` if the :func:`collect` and :func:`take` methods can be run locally
+        (without any Spark executors).
+
+        .. versionadded:: 3.4.0
+
+        Returns
+        -------
+        bool
+        """
+        if self._plan is None:
+            raise Exception("Cannot analyze on empty plan.")
+        query = self._plan.to_proto(self._session)
+        return self._session._analyze(query).is_local

Review Comment:
   There is another interesting question is if we want to do caching on the server side.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] grundprinzip commented on a diff in pull request #38742: [SPARK-41216][CONNECT][PYTHON] Make AnalyzePlan support multiple analysis tasks And implement isLocal/isStreaming/printSchema/semanticHash/sameSemantics/inputFiles

Posted by GitBox <gi...@apache.org>.
grundprinzip commented on code in PR #38742:
URL: https://github.com/apache/spark/pull/38742#discussion_r1029496454


##########
connector/connect/src/main/protobuf/spark/connect/base.proto:
##########
@@ -100,18 +70,138 @@ message AnalyzePlanRequest {
   // logging purposes and will not be interpreted by the server.
   optional string client_type = 4;
 
-  // (Optional) Get the explain string of the plan.
-  Explain explain = 5;
+  repeated AnalysisTask tasks = 5;

Review Comment:
   given the complexity of the type this should have a more explicit documentation.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] zhengruifeng commented on a diff in pull request #38742: [SPARK-41216][CONNECT][PYTHON] Make AnalyzePlan support multiple analysis tasks And implement isLocal/isStreaming/printSchema/semanticHash/sameSemantics/inputFiles

Posted by GitBox <gi...@apache.org>.
zhengruifeng commented on code in PR #38742:
URL: https://github.com/apache/spark/pull/38742#discussion_r1031022749


##########
connector/connect/src/main/protobuf/spark/connect/base.proto:
##########
@@ -100,18 +70,138 @@ message AnalyzePlanRequest {
   // logging purposes and will not be interpreted by the server.
   optional string client_type = 4;
 
-  // (Optional) Get the explain string of the plan.
-  Explain explain = 5;
+  repeated AnalysisTask tasks = 5;
+
+  message AnalysisTask {
+    oneof task {
+      // Get the schema
+      Schema schema = 1;
+
+      // Is local
+      IsLocal is_local = 2;
+
+      // Is Streaming
+      IsStreaming is_streaming = 3;
+
+      // Get the explain string of the plan.
+      Explain explain = 4;
+
+      // Get the tree string of the schema.
+      TreeString tree_string = 5;
+
+      // Get the input files.
+      InputFiles input_files = 6;
+
+      // Get the semantic hash
+      SemanticHash semantic_hash = 7;
+
+      // Check whether plans are equal.
+      SameSemantics same_semantics = 8;
+    }
+  }
+
+  // Analyze the input plan and return the schema.
+  message Schema { }
+
+  // Returns true if the `collect` and `take` methods can be run locally.
+  message IsLocal { }
+
+  // Returns true if this Dataset contains one or more sources that continuously
+  // return data as it arrives.
+  message IsStreaming { }
+
+  // Explains the input plan based on a configurable mode.
+  message Explain {
+    // Plan explanation mode.
+    enum ExplainMode {
+      MODE_UNSPECIFIED = 0;
+
+      // Generates only physical plan.
+      SIMPLE = 1;
+
+      // Generates parsed logical plan, analyzed logical plan, optimized logical plan and physical plan.
+      // Parsed Logical plan is a unresolved plan that extracted from the query. Analyzed logical plans
+      // transforms which translates unresolvedAttribute and unresolvedRelation into fully typed objects.
+      // The optimized logical plan transforms through a set of optimization rules, resulting in the
+      // physical plan.
+      EXTENDED = 2;
+
+      // Generates code for the statement, if any and a physical plan.
+      CODEGEN = 3;
+
+      // If plan node statistics are available, generates a logical plan and also the statistics.
+      COST = 4;
+
+      // Generates a physical plan outline and also node details.
+      FORMATTED = 5;
+    }
+
+    // (Required) For analyzePlan rpc calls, configure the mode to explain plan in strings.
+    ExplainMode explain_mode= 1;
+  }
+
+  // Generate a string to express the schema in a nice tree format.
+  // It will invoke 'StructType.treeString' (same as 'Dataset.printSchema')
+  // to compute the results.
+  message TreeString {
+
+    // (Optional) The level to generate the string.
+    optional int32 level = 1;
+  }
+
+  // Returns a best-effort snapshot of the files that compose this Dataset.
+  // It will invoke 'Dataset.inputFiles' to compute the results.
+  message InputFiles { }

Review Comment:
   it had some usages anyway



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] zhengruifeng commented on a diff in pull request #38742: [SPARK-41216][CONNECT][PYTHON] Make AnalyzePlan support multiple analysis tasks And implement isLocal/isStreaming/printSchema/semanticHash/sameSemantics/inputFiles

Posted by GitBox <gi...@apache.org>.
zhengruifeng commented on code in PR #38742:
URL: https://github.com/apache/spark/pull/38742#discussion_r1031023071


##########
connector/connect/src/main/protobuf/spark/connect/base.proto:
##########
@@ -100,18 +70,138 @@ message AnalyzePlanRequest {
   // logging purposes and will not be interpreted by the server.
   optional string client_type = 4;
 
-  // (Optional) Get the explain string of the plan.
-  Explain explain = 5;
+  repeated AnalysisTask tasks = 5;
+
+  message AnalysisTask {
+    oneof task {
+      // Get the schema
+      Schema schema = 1;
+
+      // Is local
+      IsLocal is_local = 2;
+
+      // Is Streaming
+      IsStreaming is_streaming = 3;
+
+      // Get the explain string of the plan.
+      Explain explain = 4;
+
+      // Get the tree string of the schema.
+      TreeString tree_string = 5;
+
+      // Get the input files.
+      InputFiles input_files = 6;
+
+      // Get the semantic hash
+      SemanticHash semantic_hash = 7;
+
+      // Check whether plans are equal.

Review Comment:
   will remove semantic_hash and same_semantics since they are developer apis, although they were also in pyspark



##########
connector/connect/src/main/protobuf/spark/connect/base.proto:
##########
@@ -100,18 +70,138 @@ message AnalyzePlanRequest {
   // logging purposes and will not be interpreted by the server.
   optional string client_type = 4;
 
-  // (Optional) Get the explain string of the plan.
-  Explain explain = 5;
+  repeated AnalysisTask tasks = 5;
+
+  message AnalysisTask {
+    oneof task {
+      // Get the schema
+      Schema schema = 1;
+
+      // Is local
+      IsLocal is_local = 2;
+
+      // Is Streaming
+      IsStreaming is_streaming = 3;
+
+      // Get the explain string of the plan.
+      Explain explain = 4;
+
+      // Get the tree string of the schema.
+      TreeString tree_string = 5;
+
+      // Get the input files.
+      InputFiles input_files = 6;
+
+      // Get the semantic hash
+      SemanticHash semantic_hash = 7;

Review Comment:
   will remove it



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] hvanhovell commented on a diff in pull request #38742: [SPARK-41216][CONNECT][PYTHON] Make AnalyzePlan support multiple analysis tasks And implement isLocal/isStreaming/printSchema/semanticHash/sameSemantics/inputFiles

Posted by GitBox <gi...@apache.org>.
hvanhovell commented on code in PR #38742:
URL: https://github.com/apache/spark/pull/38742#discussion_r1029446996


##########
connector/connect/src/main/protobuf/spark/connect/base.proto:
##########
@@ -100,18 +70,138 @@ message AnalyzePlanRequest {
   // logging purposes and will not be interpreted by the server.
   optional string client_type = 4;
 
-  // (Optional) Get the explain string of the plan.
-  Explain explain = 5;
+  repeated AnalysisTask tasks = 5;
+
+  message AnalysisTask {
+    oneof task {
+      // Get the schema
+      Schema schema = 1;
+
+      // Is local
+      IsLocal is_local = 2;
+
+      // Is Streaming
+      IsStreaming is_streaming = 3;
+
+      // Get the explain string of the plan.
+      Explain explain = 4;
+
+      // Get the tree string of the schema.
+      TreeString tree_string = 5;
+
+      // Get the input files.
+      InputFiles input_files = 6;
+
+      // Get the semantic hash
+      SemanticHash semantic_hash = 7;
+
+      // Check whether plans are equal.

Review Comment:
   Are they equal or do the produce the same result? 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] zhengruifeng commented on a diff in pull request #38742: [SPARK-41216][CONNECT][PYTHON] Make AnalyzePlan support multiple analysis tasks And implement isLocal/isStreaming/printSchema/semanticHash/sameSemantics/inputFiles

Posted by GitBox <gi...@apache.org>.
zhengruifeng commented on code in PR #38742:
URL: https://github.com/apache/spark/pull/38742#discussion_r1031021916


##########
python/pyspark/sql/connect/dataframe.py:
##########
@@ -797,6 +796,137 @@ def schema(self) -> StructType:
         else:
             return self._schema
 
+    @property
+    def isLocal(self) -> bool:
+        """Returns ``True`` if the :func:`collect` and :func:`take` methods can be run locally
+        (without any Spark executors).
+
+        .. versionadded:: 3.4.0
+
+        Returns
+        -------
+        bool
+        """
+        if self._plan is None:
+            raise Exception("Cannot analyze on empty plan.")
+        query = self._plan.to_proto(self._session)
+        return self._session.is_local(query)
+
+    @property
+    def isStreaming(self) -> bool:
+        """Returns ``True`` if this :class:`DataFrame` contains one or more sources that
+        continuously return data as it arrives. A :class:`DataFrame` that reads data from a
+        streaming source must be executed as a :class:`StreamingQuery` using the :func:`start`
+        method in :class:`DataStreamWriter`.  Methods that return a single answer, (e.g.,
+        :func:`count` or :func:`collect`) will throw an :class:`AnalysisException` when there
+        is a streaming source present.
+
+        .. versionadded:: 3.4.0
+
+        Notes
+        -----
+        This API is evolving.
+
+        Returns
+        -------
+        bool
+            Whether it's streaming DataFrame or not.
+        """
+        if self._plan is None:
+            raise Exception("Cannot analyze on empty plan.")
+        query = self._plan.to_proto(self._session)
+        return self._session.is_streaming(query)
+
+    def printSchema(self) -> None:
+        """Prints out the schema in the tree format.
+
+        .. versionadded:: 3.4.0
+
+        Returns
+        -------
+        None
+        """
+        if self._plan is None:
+            raise Exception("Cannot analyze on empty plan.")
+        query = self._plan.to_proto(self._session)
+        print(self._session.tree_string(query))
+
+    def semanticHash(self) -> int:

Review Comment:
   oh, I did not notice that, I am fine to remove `sameSemantics` and `semanticHash`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] zhengruifeng commented on a diff in pull request #38742: [SPARK-41216][CONNECT][PYTHON] Make AnalyzePlan support multiple analysis tasks And implement isLocal/isStreaming/printSchema/semanticHash/sameSemantics/inputFiles

Posted by GitBox <gi...@apache.org>.
zhengruifeng commented on code in PR #38742:
URL: https://github.com/apache/spark/pull/38742#discussion_r1028935043


##########
python/pyspark/sql/connect/dataframe.py:
##########
@@ -736,6 +736,19 @@ def toPandas(self) -> Optional["pandas.DataFrame"]:
         query = self._plan.to_proto(self._session)
         return self._session._to_pandas(query)
 
+    def _basic_analyze(self) -> None:
+        # update isLocal, isStreaming, explain_string, tree_string, semantic_hash
+        if self._schema is None:
+            if self._plan is not None:
+                query = self._plan.to_proto(self._session)
+                if self._session is None:
+                    raise Exception("Cannot analyze without RemoteSparkSession.")
+                results = self._session.basic_analyze(query)
+                for k, v in results.items():
+                    self._cache[k] = v

Review Comment:
   got it, will update 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] grundprinzip commented on a diff in pull request #38742: [SPARK-41216][CONNECT][PYTHON] Make AnalyzePlan support multiple analysis tasks And implement isLocal/isStreaming/printSchema/semanticHash/sameSemantics/inputFiles

Posted by GitBox <gi...@apache.org>.
grundprinzip commented on code in PR #38742:
URL: https://github.com/apache/spark/pull/38742#discussion_r1029498101


##########
connector/connect/src/main/protobuf/spark/connect/base.proto:
##########
@@ -100,18 +70,138 @@ message AnalyzePlanRequest {
   // logging purposes and will not be interpreted by the server.
   optional string client_type = 4;
 
-  // (Optional) Get the explain string of the plan.
-  Explain explain = 5;
+  repeated AnalysisTask tasks = 5;
+
+  message AnalysisTask {

Review Comment:
   I think this is overcomplicating the message type. For elements that are just plain empty we should not push them to the client.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] zhengruifeng commented on a diff in pull request #38742: [SPARK-41216][CONNECT][PYTHON] Make AnalyzePlan support multiple analysis tasks And implement isLocal/isStreaming/printSchema/semanticHash/sameSemantics/inputFiles

Posted by GitBox <gi...@apache.org>.
zhengruifeng commented on code in PR #38742:
URL: https://github.com/apache/spark/pull/38742#discussion_r1029953393


##########
connector/connect/src/main/protobuf/spark/connect/base.proto:
##########
@@ -100,18 +70,138 @@ message AnalyzePlanRequest {
   // logging purposes and will not be interpreted by the server.
   optional string client_type = 4;
 
-  // (Optional) Get the explain string of the plan.
-  Explain explain = 5;
+  repeated AnalysisTask tasks = 5;
+
+  message AnalysisTask {
+    oneof task {
+      // Get the schema
+      Schema schema = 1;
+
+      // Is local
+      IsLocal is_local = 2;
+
+      // Is Streaming
+      IsStreaming is_streaming = 3;
+
+      // Get the explain string of the plan.
+      Explain explain = 4;
+
+      // Get the tree string of the schema.
+      TreeString tree_string = 5;
+
+      // Get the input files.
+      InputFiles input_files = 6;
+
+      // Get the semantic hash
+      SemanticHash semantic_hash = 7;
+
+      // Check whether plans are equal.
+      SameSemantics same_semantics = 8;
+    }
+  }
+
+  // Analyze the input plan and return the schema.
+  message Schema { }
+
+  // Returns true if the `collect` and `take` methods can be run locally.
+  message IsLocal { }
+
+  // Returns true if this Dataset contains one or more sources that continuously
+  // return data as it arrives.
+  message IsStreaming { }
+
+  // Explains the input plan based on a configurable mode.
+  message Explain {
+    // Plan explanation mode.
+    enum ExplainMode {
+      MODE_UNSPECIFIED = 0;
+
+      // Generates only physical plan.
+      SIMPLE = 1;
+
+      // Generates parsed logical plan, analyzed logical plan, optimized logical plan and physical plan.
+      // Parsed Logical plan is a unresolved plan that extracted from the query. Analyzed logical plans
+      // transforms which translates unresolvedAttribute and unresolvedRelation into fully typed objects.
+      // The optimized logical plan transforms through a set of optimization rules, resulting in the
+      // physical plan.
+      EXTENDED = 2;
+
+      // Generates code for the statement, if any and a physical plan.
+      CODEGEN = 3;
+
+      // If plan node statistics are available, generates a logical plan and also the statistics.
+      COST = 4;
+
+      // Generates a physical plan outline and also node details.
+      FORMATTED = 5;
+    }
+
+    // (Required) For analyzePlan rpc calls, configure the mode to explain plan in strings.
+    ExplainMode explain_mode= 1;
+  }
+
+  // Generate a string to express the schema in a nice tree format.
+  // It will invoke 'StructType.treeString' (same as 'Dataset.printSchema')
+  // to compute the results.
+  message TreeString {
+
+    // (Optional) The level to generate the string.
+    optional int32 level = 1;
+  }
+
+  // Returns a best-effort snapshot of the files that compose this Dataset.
+  // It will invoke 'Dataset.inputFiles' to compute the results.
+  message InputFiles { }
+
+  // Returns a `hashCode` of the logical query plan.
+  // It will invoke 'Dataset.semanticHash' to compute the results.
+  message SemanticHash { }
+
+  // Returns `true` when the logical query plans inside both Datasets are equal.
+  // It will invoke 'Dataset.sameSemantics' to compute the results.
+  message SameSemantics {
+    Relation other = 1;
+  }
 }
 
 // Response to performing analysis of the query. Contains relevant metadata to be able to
 // reason about the performance.
 message AnalyzePlanResponse {
   string client_id = 1;
-  DataType schema = 2;
 
-  // The extended explain string as produced by Spark.
-  string explain_string = 3;
+  repeated AnalysisResult results = 2;
+
+  message AnalysisResult {
+    oneof result {
+      // The analyzed schema.
+      DataType schema = 1;
+
+      // Is local
+      bool is_local = 2;
+
+      // Is Streaming
+      bool is_streaming = 3;
+
+      // The extended explain string as produced by Spark.
+      string explain_string = 4;
+
+      // Get the tree string of the schema.
+      string tree_string = 5;
+
+      // Get the input files.
+      InputFiles input_files = 6;
+
+      // Get the semantic hash code.
+      int32 semantic_hash = 7;

Review Comment:
   printSchema is frequently used, but I also add others by the way



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] zhengruifeng commented on a diff in pull request #38742: [SPARK-41216][CONNECT][PYTHON] Make AnalyzePlan support multiple analysis tasks And implement isLocal/isStreaming/printSchema/semanticHash/sameSemantics/inputFiles

Posted by GitBox <gi...@apache.org>.
zhengruifeng commented on code in PR #38742:
URL: https://github.com/apache/spark/pull/38742#discussion_r1029949868


##########
connector/connect/src/main/protobuf/spark/connect/base.proto:
##########
@@ -100,18 +70,138 @@ message AnalyzePlanRequest {
   // logging purposes and will not be interpreted by the server.
   optional string client_type = 4;
 
-  // (Optional) Get the explain string of the plan.
-  Explain explain = 5;
+  repeated AnalysisTask tasks = 5;
+
+  message AnalysisTask {
+    oneof task {
+      // Get the schema
+      Schema schema = 1;
+
+      // Is local
+      IsLocal is_local = 2;
+
+      // Is Streaming
+      IsStreaming is_streaming = 3;
+
+      // Get the explain string of the plan.
+      Explain explain = 4;
+
+      // Get the tree string of the schema.
+      TreeString tree_string = 5;
+
+      // Get the input files.
+      InputFiles input_files = 6;
+
+      // Get the semantic hash
+      SemanticHash semantic_hash = 7;
+
+      // Check whether plans are equal.

Review Comment:
   one e2e test was added for it



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] grundprinzip commented on a diff in pull request #38742: [SPARK-41216][CONNECT][PYTHON] Make AnalyzePlan support multiple analysis tasks And implement isLocal/isStreaming/printSchema/semanticHash/sameSemantics/inputFiles

Posted by GitBox <gi...@apache.org>.
grundprinzip commented on code in PR #38742:
URL: https://github.com/apache/spark/pull/38742#discussion_r1029503512


##########
connector/connect/src/main/protobuf/spark/connect/base.proto:
##########
@@ -100,18 +70,138 @@ message AnalyzePlanRequest {
   // logging purposes and will not be interpreted by the server.
   optional string client_type = 4;
 
-  // (Optional) Get the explain string of the plan.
-  Explain explain = 5;
+  repeated AnalysisTask tasks = 5;
+
+  message AnalysisTask {
+    oneof task {

Review Comment:
   Why would the request contain so much detail?



##########
connector/connect/src/main/protobuf/spark/connect/base.proto:
##########
@@ -100,18 +70,138 @@ message AnalyzePlanRequest {
   // logging purposes and will not be interpreted by the server.
   optional string client_type = 4;
 
-  // (Optional) Get the explain string of the plan.
-  Explain explain = 5;
+  repeated AnalysisTask tasks = 5;
+
+  message AnalysisTask {
+    oneof task {
+      // Get the schema
+      Schema schema = 1;
+
+      // Is local
+      IsLocal is_local = 2;
+
+      // Is Streaming
+      IsStreaming is_streaming = 3;
+
+      // Get the explain string of the plan.
+      Explain explain = 4;
+
+      // Get the tree string of the schema.
+      TreeString tree_string = 5;
+
+      // Get the input files.
+      InputFiles input_files = 6;
+
+      // Get the semantic hash
+      SemanticHash semantic_hash = 7;
+
+      // Check whether plans are equal.
+      SameSemantics same_semantics = 8;
+    }
+  }
+
+  // Analyze the input plan and return the schema.
+  message Schema { }
+
+  // Returns true if the `collect` and `take` methods can be run locally.
+  message IsLocal { }
+
+  // Returns true if this Dataset contains one or more sources that continuously
+  // return data as it arrives.
+  message IsStreaming { }
+
+  // Explains the input plan based on a configurable mode.
+  message Explain {
+    // Plan explanation mode.
+    enum ExplainMode {
+      MODE_UNSPECIFIED = 0;
+
+      // Generates only physical plan.
+      SIMPLE = 1;
+
+      // Generates parsed logical plan, analyzed logical plan, optimized logical plan and physical plan.
+      // Parsed Logical plan is a unresolved plan that extracted from the query. Analyzed logical plans
+      // transforms which translates unresolvedAttribute and unresolvedRelation into fully typed objects.
+      // The optimized logical plan transforms through a set of optimization rules, resulting in the
+      // physical plan.
+      EXTENDED = 2;
+
+      // Generates code for the statement, if any and a physical plan.
+      CODEGEN = 3;
+
+      // If plan node statistics are available, generates a logical plan and also the statistics.
+      COST = 4;
+
+      // Generates a physical plan outline and also node details.
+      FORMATTED = 5;
+    }
+
+    // (Required) For analyzePlan rpc calls, configure the mode to explain plan in strings.
+    ExplainMode explain_mode= 1;
+  }
+
+  // Generate a string to express the schema in a nice tree format.
+  // It will invoke 'StructType.treeString' (same as 'Dataset.printSchema')
+  // to compute the results.
+  message TreeString {
+
+    // (Optional) The level to generate the string.
+    optional int32 level = 1;
+  }
+
+  // Returns a best-effort snapshot of the files that compose this Dataset.
+  // It will invoke 'Dataset.inputFiles' to compute the results.
+  message InputFiles { }
+
+  // Returns a `hashCode` of the logical query plan.
+  // It will invoke 'Dataset.semanticHash' to compute the results.
+  message SemanticHash { }
+
+  // Returns `true` when the logical query plans inside both Datasets are equal.
+  // It will invoke 'Dataset.sameSemantics' to compute the results.
+  message SameSemantics {
+    Relation other = 1;
+  }
 }
 
 // Response to performing analysis of the query. Contains relevant metadata to be able to
 // reason about the performance.
 message AnalyzePlanResponse {
   string client_id = 1;
-  DataType schema = 2;
 
-  // The extended explain string as produced by Spark.
-  string explain_string = 3;
+  repeated AnalysisResult results = 2;
+
+  message AnalysisResult {

Review Comment:
   doc



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] zhengruifeng commented on a diff in pull request #38742: [SPARK-41216][CONNECT][PYTHON] Implement `DataFrame.{isLocal, isStreaming, printSchema, inputFiles}`

Posted by GitBox <gi...@apache.org>.
zhengruifeng commented on code in PR #38742:
URL: https://github.com/apache/spark/pull/38742#discussion_r1032103353


##########
python/pyspark/sql/connect/dataframe.py:
##########
@@ -822,6 +821,83 @@ def schema(self) -> StructType:
         else:
             return self._schema
 
+    @property
+    def isLocal(self) -> bool:
+        """Returns ``True`` if the :func:`collect` and :func:`take` methods can be run locally
+        (without any Spark executors).
+
+        .. versionadded:: 3.4.0
+
+        Returns
+        -------
+        bool
+        """
+        if self._plan is None:
+            raise Exception("Cannot analyze on empty plan.")
+        query = self._plan.to_proto(self._session)
+        return self._session._analyze(query).is_local

Review Comment:
   https://github.com/apache/spark/pull/38742#discussion_r1028915567
   
   I think we will do the caching in near future.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] zhengruifeng commented on a diff in pull request #38742: [SPARK-41216][CONNECT][PYTHON] Make AnalyzePlan support multiple analysis tasks And implement isLocal/isStreaming/printSchema/semanticHash/sameSemantics/inputFiles

Posted by GitBox <gi...@apache.org>.
zhengruifeng commented on code in PR #38742:
URL: https://github.com/apache/spark/pull/38742#discussion_r1027903577


##########
connector/connect/src/main/protobuf/spark/connect/base.proto:
##########
@@ -100,18 +70,135 @@ message AnalyzePlanRequest {
   // logging purposes and will not be interpreted by the server.
   optional string client_type = 4;
 
-  // (Optional) Get the explain string of the plan.
-  Explain explain = 5;
+  repeated AnalysisTask tasks = 5;
+
+  message AnalysisTask {
+    oneof task {

Review Comment:
   I think we will put catalog methods like `getTable`/`listTables` in it



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] hvanhovell commented on a diff in pull request #38742: [SPARK-41216][CONNECT][PYTHON] Make AnalyzePlan support multiple analysis tasks And implement isLocal/isStreaming/printSchema/semanticHash/sameSemantics/inputFiles

Posted by GitBox <gi...@apache.org>.
hvanhovell commented on code in PR #38742:
URL: https://github.com/apache/spark/pull/38742#discussion_r1029446204


##########
connector/connect/src/main/protobuf/spark/connect/base.proto:
##########
@@ -100,18 +70,138 @@ message AnalyzePlanRequest {
   // logging purposes and will not be interpreted by the server.
   optional string client_type = 4;
 
-  // (Optional) Get the explain string of the plan.
-  Explain explain = 5;
+  repeated AnalysisTask tasks = 5;
+
+  message AnalysisTask {
+    oneof task {
+      // Get the schema
+      Schema schema = 1;
+
+      // Is local
+      IsLocal is_local = 2;
+
+      // Is Streaming
+      IsStreaming is_streaming = 3;
+
+      // Get the explain string of the plan.
+      Explain explain = 4;
+
+      // Get the tree string of the schema.
+      TreeString tree_string = 5;
+
+      // Get the input files.
+      InputFiles input_files = 6;
+
+      // Get the semantic hash
+      SemanticHash semantic_hash = 7;

Review Comment:
   Do we really want to expose this in connect? The problem is hash stability. The same client can connect to different spark versions and get different hashes for this same plan.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] grundprinzip commented on a diff in pull request #38742: [SPARK-41216][CONNECT][PYTHON] Make AnalyzePlan support multiple analysis tasks And implement isLocal/isStreaming/printSchema/semanticHash/sameSemantics/inputFiles

Posted by GitBox <gi...@apache.org>.
grundprinzip commented on code in PR #38742:
URL: https://github.com/apache/spark/pull/38742#discussion_r1029492909


##########
connector/connect/src/main/protobuf/spark/connect/base.proto:
##########
@@ -100,18 +70,138 @@ message AnalyzePlanRequest {
   // logging purposes and will not be interpreted by the server.
   optional string client_type = 4;
 
-  // (Optional) Get the explain string of the plan.
-  Explain explain = 5;
+  repeated AnalysisTask tasks = 5;
+
+  message AnalysisTask {
+    oneof task {
+      // Get the schema
+      Schema schema = 1;
+
+      // Is local
+      IsLocal is_local = 2;
+
+      // Is Streaming
+      IsStreaming is_streaming = 3;
+
+      // Get the explain string of the plan.
+      Explain explain = 4;
+
+      // Get the tree string of the schema.
+      TreeString tree_string = 5;
+
+      // Get the input files.
+      InputFiles input_files = 6;
+
+      // Get the semantic hash
+      SemanticHash semantic_hash = 7;
+
+      // Check whether plans are equal.
+      SameSemantics same_semantics = 8;
+    }
+  }
+
+  // Analyze the input plan and return the schema.
+  message Schema { }
+
+  // Returns true if the `collect` and `take` methods can be run locally.
+  message IsLocal { }
+
+  // Returns true if this Dataset contains one or more sources that continuously
+  // return data as it arrives.
+  message IsStreaming { }
+
+  // Explains the input plan based on a configurable mode.
+  message Explain {
+    // Plan explanation mode.
+    enum ExplainMode {
+      MODE_UNSPECIFIED = 0;
+
+      // Generates only physical plan.
+      SIMPLE = 1;
+
+      // Generates parsed logical plan, analyzed logical plan, optimized logical plan and physical plan.
+      // Parsed Logical plan is a unresolved plan that extracted from the query. Analyzed logical plans
+      // transforms which translates unresolvedAttribute and unresolvedRelation into fully typed objects.
+      // The optimized logical plan transforms through a set of optimization rules, resulting in the
+      // physical plan.
+      EXTENDED = 2;
+
+      // Generates code for the statement, if any and a physical plan.
+      CODEGEN = 3;
+
+      // If plan node statistics are available, generates a logical plan and also the statistics.
+      COST = 4;
+
+      // Generates a physical plan outline and also node details.
+      FORMATTED = 5;
+    }
+
+    // (Required) For analyzePlan rpc calls, configure the mode to explain plan in strings.
+    ExplainMode explain_mode= 1;
+  }
+
+  // Generate a string to express the schema in a nice tree format.
+  // It will invoke 'StructType.treeString' (same as 'Dataset.printSchema')
+  // to compute the results.
+  message TreeString {

Review Comment:
   for just having one optional int that is a weird message



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] grundprinzip commented on a diff in pull request #38742: [SPARK-41216][CONNECT][PYTHON] Make AnalyzePlan support multiple analysis tasks And implement isLocal/isStreaming/printSchema/semanticHash/sameSemantics/inputFiles

Posted by GitBox <gi...@apache.org>.
grundprinzip commented on code in PR #38742:
URL: https://github.com/apache/spark/pull/38742#discussion_r1029500646


##########
connector/connect/src/main/protobuf/spark/connect/base.proto:
##########
@@ -100,18 +70,138 @@ message AnalyzePlanRequest {
   // logging purposes and will not be interpreted by the server.
   optional string client_type = 4;
 
-  // (Optional) Get the explain string of the plan.
-  Explain explain = 5;
+  repeated AnalysisTask tasks = 5;
+
+  message AnalysisTask {
+    oneof task {
+      // Get the schema
+      Schema schema = 1;
+
+      // Is local
+      IsLocal is_local = 2;
+
+      // Is Streaming
+      IsStreaming is_streaming = 3;
+
+      // Get the explain string of the plan.
+      Explain explain = 4;

Review Comment:
   again, why an extra message type just to encapsulate an enum



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] zhengruifeng commented on a diff in pull request #38742: [SPARK-41216][CONNECT][PYTHON] Make AnalyzePlan support multiple analysis tasks And implement isLocal/isStreaming/printSchema/semanticHash/sameSemantics/inputFiles

Posted by GitBox <gi...@apache.org>.
zhengruifeng commented on code in PR #38742:
URL: https://github.com/apache/spark/pull/38742#discussion_r1030992308


##########
connector/connect/src/main/protobuf/spark/connect/base.proto:
##########
@@ -100,18 +70,138 @@ message AnalyzePlanRequest {
   // logging purposes and will not be interpreted by the server.
   optional string client_type = 4;
 
-  // (Optional) Get the explain string of the plan.
-  Explain explain = 5;
+  repeated AnalysisTask tasks = 5;

Review Comment:
   multiple analysis tasks is for this case:  user can get all attributes in single RPC and then cache them for reusing.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] cloud-fan commented on a diff in pull request #38742: [SPARK-41216][CONNECT][PYTHON] Make AnalyzePlan support multiple analysis tasks And implement isLocal/isStreaming/printSchema/semanticHash/sameSemantics/inputFiles

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on code in PR #38742:
URL: https://github.com/apache/spark/pull/38742#discussion_r1031014545


##########
python/pyspark/sql/connect/dataframe.py:
##########
@@ -797,6 +796,137 @@ def schema(self) -> StructType:
         else:
             return self._schema
 
+    @property
+    def isLocal(self) -> bool:
+        """Returns ``True`` if the :func:`collect` and :func:`take` methods can be run locally
+        (without any Spark executors).
+
+        .. versionadded:: 3.4.0
+
+        Returns
+        -------
+        bool
+        """
+        if self._plan is None:
+            raise Exception("Cannot analyze on empty plan.")
+        query = self._plan.to_proto(self._session)
+        return self._session.is_local(query)
+
+    @property
+    def isStreaming(self) -> bool:
+        """Returns ``True`` if this :class:`DataFrame` contains one or more sources that
+        continuously return data as it arrives. A :class:`DataFrame` that reads data from a
+        streaming source must be executed as a :class:`StreamingQuery` using the :func:`start`
+        method in :class:`DataStreamWriter`.  Methods that return a single answer, (e.g.,
+        :func:`count` or :func:`collect`) will throw an :class:`AnalysisException` when there
+        is a streaming source present.
+
+        .. versionadded:: 3.4.0
+
+        Notes
+        -----
+        This API is evolving.
+
+        Returns
+        -------
+        bool
+            Whether it's streaming DataFrame or not.
+        """
+        if self._plan is None:
+            raise Exception("Cannot analyze on empty plan.")
+        query = self._plan.to_proto(self._session)
+        return self._session.is_streaming(query)
+
+    def printSchema(self) -> None:
+        """Prints out the schema in the tree format.
+
+        .. versionadded:: 3.4.0
+
+        Returns
+        -------
+        None
+        """
+        if self._plan is None:
+            raise Exception("Cannot analyze on empty plan.")
+        query = self._plan.to_proto(self._session)
+        print(self._session.tree_string(query))
+
+    def semanticHash(self) -> int:

Review Comment:
   This is a developer API in Dataset, do we really need to provide it in Spark connect?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org