Posted to commits@spark.apache.org by hv...@apache.org on 2023/02/25 17:59:51 UTC

[spark] branch branch-3.4 updated: [SPARK-42573][CONNECT][SCALA] Enable binary compatibility tests on all major client APIs

This is an automated email from the ASF dual-hosted git repository.

hvanhovell pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.4 by this push:
     new 1c4bcf3a97e [SPARK-42573][CONNECT][SCALA] Enable binary compatibility tests on all major client APIs
1c4bcf3a97e is described below

commit 1c4bcf3a97e7773a3f23524e2545cd85dbd14ef1
Author: Zhen Li <zh...@users.noreply.github.com>
AuthorDate: Sat Feb 25 13:59:21 2023 -0400

    [SPARK-42573][CONNECT][SCALA] Enable binary compatibility tests on all major client APIs
    
    ### What changes were proposed in this pull request?
    Enable binary compatibility checks for SparkSession, Dataset, Column, functions, and the other major client APIs.
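    
    As a rough illustration (an editor's sketch, not part of this patch), the check in
    CompatibilitySuite amounts to collecting MiMa problems between the sql jar and the
    client jar and filtering them through include/exclude rules. The jar paths and the
    single include/exclude rule below are placeholders; the IncludeByName helper mirrors
    the one defined in the suite:
    
        import java.io.File
        import java.util.regex.Pattern
    
        import com.typesafe.tools.mima.core.{Problem, ProblemFilter, ProblemFilters}
        import com.typesafe.tools.mima.lib.MiMaLib
    
        object CompatibilityCheckSketch {
          // Glob-style filter over fully qualified names, as in the suite.
          private case class IncludeByName(name: String) extends ProblemFilter {
            private val pattern =
              Pattern.compile(name.split("\\*", -1).map(Pattern.quote).mkString(".*"))
            override def apply(problem: Problem): Boolean =
              pattern.matcher(problem.matchName.getOrElse("")).matches()
          }
    
          def main(args: Array[String]): Unit = {
            val sqlJar = new File("path/to/spark-sql.jar") // placeholder path
            val clientJar = new File("path/to/spark-connect-client-jvm.jar") // placeholder path
    
            // Collect every binary incompatibility of the client jar against the sql jar.
            val mima = new MiMaLib(Seq(clientJar, sqlJar))
            val allProblems = mima.collectProblems(sqlJar, clientJar, List.empty)
    
            // Keep problems in the APIs under test, then drop the known exclusions.
            val includes = Seq(IncludeByName("org.apache.spark.sql.Dataset.*"))
            val excludes = Seq(ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.rdd"))
            val remaining = allProblems
              .filter(p => includes.exists(rule => rule(p)))
              .filter(p => excludes.forall(rule => rule(p)))
    
            remaining.foreach(p => println(p.description("client")))
          }
        }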
    
    ### Why are the changes needed?
    This helps us get a clear picture of the current API coverage of the Scala client.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Existing tests.
    
    Closes #40168 from zhenlineo/comp-it.
    
    Authored-by: Zhen Li <zh...@users.noreply.github.com>
    Signed-off-by: Herman van Hovell <he...@databricks.com>
    (cherry picked from commit 2470b753171a3765e778ae699b4b1675ba7a023e)
    Signed-off-by: Herman van Hovell <he...@databricks.com>
---
 .../org/apache/spark/sql/DataFrameWriter.scala     |   2 +-
 .../org/apache/spark/sql/DataFrameWriterV2.scala   |   2 +-
 .../main/scala/org/apache/spark/sql/Dataset.scala  |  60 ++++----
 .../spark/sql/RelationalGroupedDataset.scala       |   2 +-
 .../sql/connect/client/CompatibilitySuite.scala    | 156 +++++++++++++++------
 5 files changed, 149 insertions(+), 73 deletions(-)

diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index b7c4ed7bcab..8434addec92 100644
--- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -252,7 +252,7 @@ final class DataFrameWriter[T] private[sql] (ds: Dataset[T]) {
       builder.putOptions(k, v)
     }
 
-    ds.session.execute(proto.Command.newBuilder().setWriteOperation(builder).build())
+    ds.sparkSession.execute(proto.Command.newBuilder().setWriteOperation(builder).build())
   }
 
   /**
diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/DataFrameWriterV2.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/DataFrameWriterV2.scala
index ed149223129..b698e1dfaa1 100644
--- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/DataFrameWriterV2.scala
+++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/DataFrameWriterV2.scala
@@ -157,7 +157,7 @@ final class DataFrameWriterV2[T] private[sql] (table: String, ds: Dataset[T])
 
     overwriteCondition.foreach(builder.setOverwriteCondition)
 
-    ds.session.execute(proto.Command.newBuilder().setWriteOperationV2(builder).build())
+    ds.sparkSession.execute(proto.Command.newBuilder().setWriteOperationV2(builder).build())
   }
 }
 
diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala
index 87aadfe437b..83ed7bdc071 100644
--- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -115,7 +115,7 @@ import org.apache.spark.util.Utils
  *
  * @since 3.4.0
  */
-class Dataset[T] private[sql] (val session: SparkSession, private[sql] val plan: proto.Plan)
+class Dataset[T] private[sql] (val sparkSession: SparkSession, private[sql] val plan: proto.Plan)
     extends Serializable {
   // Make sure we don't forget to set plan id.
   assert(plan.getRoot.getCommon.hasPlanId)
@@ -169,7 +169,7 @@ class Dataset[T] private[sql] (val session: SparkSession, private[sql] val plan:
    * @since 3.4.0
    */
   @scala.annotation.varargs
-  def toDF(colNames: String*): DataFrame = session.newDataset { builder =>
+  def toDF(colNames: String*): DataFrame = sparkSession.newDataset { builder =>
     builder.getToDfBuilder
       .setInput(plan.getRoot)
       .addAllColumnNames(colNames.asJava)
@@ -191,7 +191,7 @@ class Dataset[T] private[sql] (val session: SparkSession, private[sql] val plan:
    * @group basic
    * @since 3.4.0
    */
-  def to(schema: StructType): DataFrame = session.newDataset { builder =>
+  def to(schema: StructType): DataFrame = sparkSession.newDataset { builder =>
     builder.getToSchemaBuilder
       .setInput(plan.getRoot)
       .setSchema(DataTypeProtoConverter.toConnectProtoType(schema))
@@ -277,7 +277,7 @@ class Dataset[T] private[sql] (val session: SparkSession, private[sql] val plan:
 
   private def explain(mode: proto.Explain.ExplainMode): Unit = {
     // scalastyle:off println
-    println(session.analyze(plan, mode).getExplainString)
+    println(sparkSession.analyze(plan, mode).getExplainString)
     // scalastyle:on println
   }
 
@@ -468,7 +468,7 @@ class Dataset[T] private[sql] (val session: SparkSession, private[sql] val plan:
    * @since 3.4.0
    */
   def show(numRows: Int, truncate: Int, vertical: Boolean): Unit = {
-    val df = session.newDataset { builder =>
+    val df = sparkSession.newDataset { builder =>
       builder.getShowStringBuilder
         .setInput(plan.getRoot)
         .setNumRows(numRows)
@@ -485,7 +485,7 @@ class Dataset[T] private[sql] (val session: SparkSession, private[sql] val plan:
   }
 
   private def buildJoin(right: Dataset[_])(f: proto.Join.Builder => Unit): DataFrame = {
-    session.newDataset { builder =>
+    sparkSession.newDataset { builder =>
       val joinBuilder = builder.getJoinBuilder
       joinBuilder.setLeft(plan.getRoot).setRight(right.plan.getRoot)
       f(joinBuilder)
@@ -751,7 +751,7 @@ class Dataset[T] private[sql] (val session: SparkSession, private[sql] val plan:
   }
 
   private def buildSort(global: Boolean, sortExprs: Seq[Column]): Dataset[T] = {
-    session.newDataset { builder =>
+    sparkSession.newDataset { builder =>
       builder.getSortBuilder
         .setInput(plan.getRoot)
         .setIsGlobal(global)
@@ -859,7 +859,7 @@ class Dataset[T] private[sql] (val session: SparkSession, private[sql] val plan:
    * @since 3.4.0
    */
   @scala.annotation.varargs
-  def hint(name: String, parameters: Any*): Dataset[T] = session.newDataset { builder =>
+  def hint(name: String, parameters: Any*): Dataset[T] = sparkSession.newDataset { builder =>
     builder.getHintBuilder
       .setInput(plan.getRoot)
       .setName(name)
@@ -899,7 +899,7 @@ class Dataset[T] private[sql] (val session: SparkSession, private[sql] val plan:
    * @group typedrel
    * @since 3.4.0
    */
-  def as(alias: String): Dataset[T] = session.newDataset { builder =>
+  def as(alias: String): Dataset[T] = sparkSession.newDataset { builder =>
     builder.getSubqueryAliasBuilder
       .setInput(plan.getRoot)
       .setAlias(alias)
@@ -939,7 +939,7 @@ class Dataset[T] private[sql] (val session: SparkSession, private[sql] val plan:
    * @since 3.4.0
    */
   @scala.annotation.varargs
-  def select(cols: Column*): DataFrame = session.newDataset { builder =>
+  def select(cols: Column*): DataFrame = sparkSession.newDataset { builder =>
     builder.getProjectBuilder
       .setInput(plan.getRoot)
       .addAllExpressions(cols.map(_.expr).asJava)
@@ -989,7 +989,7 @@ class Dataset[T] private[sql] (val session: SparkSession, private[sql] val plan:
    * @group typedrel
    * @since 3.4.0
    */
-  def filter(condition: Column): Dataset[T] = session.newDataset { builder =>
+  def filter(condition: Column): Dataset[T] = sparkSession.newDataset { builder =>
     builder.getFilterBuilder.setInput(plan.getRoot).setCondition(condition.expr)
   }
 
@@ -1032,7 +1032,7 @@ class Dataset[T] private[sql] (val session: SparkSession, private[sql] val plan:
       ids: Array[Column],
       valuesOption: Option[Array[Column]],
       variableColumnName: String,
-      valueColumnName: String): DataFrame = session.newDataset { builder =>
+      valueColumnName: String): DataFrame = sparkSession.newDataset { builder =>
     val unpivot = builder.getUnpivotBuilder
       .setInput(plan.getRoot)
       .addAllIds(ids.toSeq.map(_.expr).asJava)
@@ -1393,7 +1393,7 @@ class Dataset[T] private[sql] (val session: SparkSession, private[sql] val plan:
    * @group typedrel
    * @since 3.4.0
    */
-  def limit(n: Int): Dataset[T] = session.newDataset { builder =>
+  def limit(n: Int): Dataset[T] = sparkSession.newDataset { builder =>
     builder.getLimitBuilder
       .setInput(plan.getRoot)
       .setLimit(n)
@@ -1405,7 +1405,7 @@ class Dataset[T] private[sql] (val session: SparkSession, private[sql] val plan:
    * @group typedrel
    * @since 3.4.0
    */
-  def offset(n: Int): Dataset[T] = session.newDataset { builder =>
+  def offset(n: Int): Dataset[T] = sparkSession.newDataset { builder =>
     builder.getOffsetBuilder
       .setInput(plan.getRoot)
       .setOffset(n)
@@ -1413,7 +1413,7 @@ class Dataset[T] private[sql] (val session: SparkSession, private[sql] val plan:
 
   private def buildSetOp(right: Dataset[T], setOpType: proto.SetOperation.SetOpType)(
       f: proto.SetOperation.Builder => Unit): Dataset[T] = {
-    session.newDataset { builder =>
+    sparkSession.newDataset { builder =>
       f(
         builder.getSetOpBuilder
           .setSetOpType(setOpType)
@@ -1677,7 +1677,7 @@ class Dataset[T] private[sql] (val session: SparkSession, private[sql] val plan:
    * @since 3.4.0
    */
   def sample(withReplacement: Boolean, fraction: Double, seed: Long): Dataset[T] = {
-    session.newDataset { builder =>
+    sparkSession.newDataset { builder =>
       builder.getSampleBuilder
         .setInput(plan.getRoot)
         .setWithReplacement(withReplacement)
@@ -1745,7 +1745,7 @@ class Dataset[T] private[sql] (val session: SparkSession, private[sql] val plan:
     normalizedCumWeights
       .sliding(2)
       .map { case Array(low, high) =>
-        session.newDataset[T] { builder =>
+        sparkSession.newDataset[T] { builder =>
           builder.getSampleBuilder
             .setInput(sortedInput)
             .setWithReplacement(false)
@@ -1789,7 +1789,7 @@ class Dataset[T] private[sql] (val session: SparkSession, private[sql] val plan:
     val aliases = values.zip(names).map { case (value, name) =>
       value.name(name).expr.getAlias
     }
-    session.newDataset { builder =>
+    sparkSession.newDataset { builder =>
       builder.getWithColumnsBuilder
         .setInput(plan.getRoot)
         .addAllAliases(aliases.asJava)
@@ -1880,7 +1880,7 @@ class Dataset[T] private[sql] (val session: SparkSession, private[sql] val plan:
    * @since 3.4.0
    */
   def withColumnsRenamed(colsMap: java.util.Map[String, String]): DataFrame = {
-    session.newDataset { builder =>
+    sparkSession.newDataset { builder =>
       builder.getWithColumnsRenamedBuilder
         .setInput(plan.getRoot)
         .putAllRenameColumnsMap(colsMap)
@@ -1899,7 +1899,7 @@ class Dataset[T] private[sql] (val session: SparkSession, private[sql] val plan:
       .setExpr(col(columnName).expr)
       .addName(columnName)
       .setMetadata(metadata.json)
-    session.newDataset { builder =>
+    sparkSession.newDataset { builder =>
       builder.getWithColumnsBuilder
         .setInput(plan.getRoot)
         .addAliases(newAlias)
@@ -1959,7 +1959,7 @@ class Dataset[T] private[sql] (val session: SparkSession, private[sql] val plan:
   @scala.annotation.varargs
   def drop(col: Column, cols: Column*): DataFrame = buildDrop(col +: cols)
 
-  private def buildDrop(cols: Seq[Column]): DataFrame = session.newDataset { builder =>
+  private def buildDrop(cols: Seq[Column]): DataFrame = sparkSession.newDataset { builder =>
     builder.getDropBuilder
       .setInput(plan.getRoot)
       .addAllCols(cols.map(_.expr).asJava)
@@ -1972,7 +1972,7 @@ class Dataset[T] private[sql] (val session: SparkSession, private[sql] val plan:
    * @group typedrel
    * @since 3.4.0
    */
-  def dropDuplicates(): Dataset[T] = session.newDataset { builder =>
+  def dropDuplicates(): Dataset[T] = sparkSession.newDataset { builder =>
     builder.getDeduplicateBuilder
       .setInput(plan.getRoot)
       .setAllColumnsAsKeys(true)
@@ -1985,7 +1985,7 @@ class Dataset[T] private[sql] (val session: SparkSession, private[sql] val plan:
    * @group typedrel
    * @since 3.4.0
    */
-  def dropDuplicates(colNames: Seq[String]): Dataset[T] = session.newDataset { builder =>
+  def dropDuplicates(colNames: Seq[String]): Dataset[T] = sparkSession.newDataset { builder =>
     builder.getDeduplicateBuilder
       .setInput(plan.getRoot)
       .addAllColumnNames(colNames.asJava)
@@ -2042,7 +2042,7 @@ class Dataset[T] private[sql] (val session: SparkSession, private[sql] val plan:
    * @since 3.4.0
    */
   @scala.annotation.varargs
-  def describe(cols: String*): DataFrame = session.newDataset { builder =>
+  def describe(cols: String*): DataFrame = sparkSession.newDataset { builder =>
     builder.getDescribeBuilder
       .setInput(plan.getRoot)
       .addAllCols(cols.asJava)
@@ -2117,7 +2117,7 @@ class Dataset[T] private[sql] (val session: SparkSession, private[sql] val plan:
    * @since 3.4.0
    */
   @scala.annotation.varargs
-  def summary(statistics: String*): DataFrame = session.newDataset { builder =>
+  def summary(statistics: String*): DataFrame = sparkSession.newDataset { builder =>
     builder.getSummaryBuilder
       .setInput(plan.getRoot)
       .addAllStatistics(statistics.asJava)
@@ -2185,7 +2185,7 @@ class Dataset[T] private[sql] (val session: SparkSession, private[sql] val plan:
    * @since 3.4.0
    */
   def tail(n: Int): Array[T] = {
-    val lastN = session.newDataset[T] { builder =>
+    val lastN = sparkSession.newDataset[T] { builder =>
       builder.getTailBuilder
         .setInput(plan.getRoot)
         .setLimit(n)
@@ -2257,7 +2257,7 @@ class Dataset[T] private[sql] (val session: SparkSession, private[sql] val plan:
   }
 
   private def buildRepartition(numPartitions: Int, shuffle: Boolean): Dataset[T] = {
-    session.newDataset { builder =>
+    sparkSession.newDataset { builder =>
       builder.getRepartitionBuilder
         .setInput(plan.getRoot)
         .setNumPartitions(numPartitions)
@@ -2267,7 +2267,7 @@ class Dataset[T] private[sql] (val session: SparkSession, private[sql] val plan:
 
   private def buildRepartitionByExpression(
       numPartitions: Option[Int],
-      partitionExprs: Seq[Column]): Dataset[T] = session.newDataset { builder =>
+      partitionExprs: Seq[Column]): Dataset[T] = sparkSession.newDataset { builder =>
     val repartitionBuilder = builder.getRepartitionByExpressionBuilder
       .setInput(plan.getRoot)
       .addAllPartitionExprs(partitionExprs.map(_.expr).asJava)
@@ -2462,10 +2462,10 @@ class Dataset[T] private[sql] (val session: SparkSession, private[sql] val plan:
   }
 
   private[sql] def analyze: proto.AnalyzePlanResponse = {
-    session.analyze(plan, proto.Explain.ExplainMode.SIMPLE)
+    sparkSession.analyze(plan, proto.Explain.ExplainMode.SIMPLE)
   }
 
-  def collectResult(): SparkResult = session.execute(plan)
+  def collectResult(): SparkResult = sparkSession.execute(plan)
 
   private[sql] def withResult[E](f: SparkResult => E): E = {
     val result = collectResult()
diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala
index c918061ac46..76d3ab5cf09 100644
--- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala
+++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala
@@ -42,7 +42,7 @@ class RelationalGroupedDataset protected[sql] (
     pivot: Option[proto.Aggregate.Pivot] = None) {
 
   private[this] def toDF(aggExprs: Seq[Column]): DataFrame = {
-    df.session.newDataset { builder =>
+    df.sparkSession.newDataset { builder =>
       builder.getAggregateBuilder
         .setInput(df.plan.getRoot)
         .addAllGroupingExpressions(groupingExprs.asJava)
diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CompatibilitySuite.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CompatibilitySuite.scala
index 010f3c616e6..d6d21773732 100644
--- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CompatibilitySuite.scala
+++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CompatibilitySuite.scala
@@ -69,30 +69,131 @@ class CompatibilitySuite extends AnyFunSuite { // scalastyle:ignore funsuite
     val mima = new MiMaLib(Seq(clientJar, sqlJar))
     val allProblems = mima.collectProblems(sqlJar, clientJar, List.empty)
     val includedRules = Seq(
-      IncludeByName("org.apache.spark.sql.Column"),
-      IncludeByName("org.apache.spark.sql.Column$"),
-      IncludeByName("org.apache.spark.sql.Dataset"),
-      // TODO(SPARK-42175) Add the Dataset object definition
-      // IncludeByName("org.apache.spark.sql.Dataset$"),
-      IncludeByName("org.apache.spark.sql.DataFrame"),
+      IncludeByName("org.apache.spark.sql.Column.*"),
+      IncludeByName("org.apache.spark.sql.DataFrame.*"),
       IncludeByName("org.apache.spark.sql.DataFrameReader.*"),
       IncludeByName("org.apache.spark.sql.DataFrameWriter.*"),
       IncludeByName("org.apache.spark.sql.DataFrameWriterV2.*"),
-      IncludeByName("org.apache.spark.sql.SparkSession"),
-      IncludeByName("org.apache.spark.sql.SparkSession$")) ++ includeImplementedMethods(clientJar)
+      IncludeByName("org.apache.spark.sql.Dataset.*"),
+      IncludeByName("org.apache.spark.sql.functions.*"),
+      IncludeByName("org.apache.spark.sql.RelationalGroupedDataset.*"),
+      IncludeByName("org.apache.spark.sql.SparkSession.*"))
     val excludeRules = Seq(
       // Filter unsupported rules:
-      // Two sql overloading methods are marked experimental in the API and skipped in the client.
-      ProblemFilters.exclude[Problem]("org.apache.spark.sql.SparkSession.sql"),
-      // Deprecated json methods and RDD related methods are skipped in the client.
+      // Note when muting errors for a method, checks on all overloading methods are also muted.
+
+      // Skip all shaded dependencies and proto files in the client.
+      ProblemFilters.exclude[Problem]("org.sparkproject.*"),
+      ProblemFilters.exclude[Problem]("org.apache.spark.connect.proto.*"),
+
+      // DataFrame Reader & Writer
       ProblemFilters.exclude[Problem]("org.apache.spark.sql.DataFrameReader.json"),
       ProblemFilters.exclude[Problem]("org.apache.spark.sql.DataFrameReader.csv"),
       ProblemFilters.exclude[Problem]("org.apache.spark.sql.DataFrameReader.jdbc"),
       ProblemFilters.exclude[Problem]("org.apache.spark.sql.DataFrameWriter.jdbc"),
-      // Skip all shaded dependencies in the client.
-      ProblemFilters.exclude[Problem]("org.sparkproject.*"),
-      ProblemFilters.exclude[Problem]("org.apache.spark.connect.proto.*"),
-      // Disable Range until we support typed APIs
+
+      // Dataset
+      ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.ofRows"),
+      ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.DATASET_ID_TAG"),
+      ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.COL_POS_KEY"),
+      ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.DATASET_ID_KEY"),
+      ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.curId"),
+      ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.groupBy"),
+      ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.observe"),
+      ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.queryExecution"),
+      ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.encoder"),
+      ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.sqlContext"),
+      ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.as"),
+      ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.checkpoint"),
+      ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.localCheckpoint"),
+      ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.withWatermark"),
+      ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.na"),
+      ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.stat"),
+      ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.joinWith"),
+      ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.select"),
+      ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.selectUntyped"),
+      ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.reduce"),
+      ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.groupByKey"),
+      ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.explode"), // deprecated
+      ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.filter"),
+      ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.map"),
+      ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.mapPartitions"),
+      ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.flatMap"),
+      ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.foreach"),
+      ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.foreachPartition"),
+      ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.persist"),
+      ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.cache"),
+      ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.storageLevel"),
+      ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.unpersist"),
+      ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.rdd"),
+      ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.toJavaRDD"),
+      ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.javaRDD"),
+      ProblemFilters.exclude[Problem](
+        "org.apache.spark.sql.Dataset.registerTempTable"
+      ), // deprecated
+      ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.createTempView"),
+      ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.createOrReplaceTempView"),
+      ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.createGlobalTempView"),
+      ProblemFilters.exclude[Problem](
+        "org.apache.spark.sql.Dataset.createOrReplaceGlobalTempView"),
+      ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.writeStream"),
+      ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.toJSON"),
+      ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.sameSemantics"),
+      ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.semanticHash"),
+      ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.this"),
+
+      // functions
+      ProblemFilters.exclude[Problem]("org.apache.spark.sql.functions.udf"),
+      ProblemFilters.exclude[Problem]("org.apache.spark.sql.functions.call_udf"),
+      ProblemFilters.exclude[Problem]("org.apache.spark.sql.functions.callUDF"),
+      ProblemFilters.exclude[Problem]("org.apache.spark.sql.functions.unwrap_udt"),
+      ProblemFilters.exclude[Problem]("org.apache.spark.sql.functions.udaf"),
+      ProblemFilters.exclude[Problem]("org.apache.spark.sql.functions.broadcast"),
+      ProblemFilters.exclude[Problem]("org.apache.spark.sql.functions.count"),
+      ProblemFilters.exclude[Problem]("org.apache.spark.sql.functions.typedlit"),
+      ProblemFilters.exclude[Problem]("org.apache.spark.sql.functions.typedLit"),
+
+      // RelationalGroupedDataset
+      ProblemFilters.exclude[Problem]("org.apache.spark.sql.RelationalGroupedDataset.as"),
+      ProblemFilters.exclude[Problem]("org.apache.spark.sql.RelationalGroupedDataset.pivot"),
+      ProblemFilters.exclude[Problem]("org.apache.spark.sql.RelationalGroupedDataset.this"),
+
+      // SparkSession
+      ProblemFilters.exclude[Problem]("org.apache.spark.sql.SparkSession.active"),
+      ProblemFilters.exclude[Problem]("org.apache.spark.sql.SparkSession.getDefaultSession"),
+      ProblemFilters.exclude[Problem]("org.apache.spark.sql.SparkSession.getActiveSession"),
+      ProblemFilters.exclude[Problem]("org.apache.spark.sql.SparkSession.clearDefaultSession"),
+      ProblemFilters.exclude[Problem]("org.apache.spark.sql.SparkSession.setDefaultSession"),
+      ProblemFilters.exclude[Problem]("org.apache.spark.sql.SparkSession.implicits"),
+      ProblemFilters.exclude[Problem]("org.apache.spark.sql.SparkSession.sparkContext"),
+      ProblemFilters.exclude[Problem]("org.apache.spark.sql.SparkSession.version"),
+      ProblemFilters.exclude[Problem]("org.apache.spark.sql.SparkSession.sharedState"),
+      ProblemFilters.exclude[Problem]("org.apache.spark.sql.SparkSession.sessionState"),
+      ProblemFilters.exclude[Problem]("org.apache.spark.sql.SparkSession.sqlContext"),
+      ProblemFilters.exclude[Problem]("org.apache.spark.sql.SparkSession.conf"),
+      ProblemFilters.exclude[Problem]("org.apache.spark.sql.SparkSession.listenerManager"),
+      ProblemFilters.exclude[Problem]("org.apache.spark.sql.SparkSession.experimental"),
+      ProblemFilters.exclude[Problem]("org.apache.spark.sql.SparkSession.udf"),
+      ProblemFilters.exclude[Problem]("org.apache.spark.sql.SparkSession.streams"),
+      ProblemFilters.exclude[Problem]("org.apache.spark.sql.SparkSession.newSession"),
+      ProblemFilters.exclude[Problem]("org.apache.spark.sql.SparkSession.emptyDataFrame"),
+      ProblemFilters.exclude[Problem]("org.apache.spark.sql.SparkSession.emptyDataset"),
+      ProblemFilters.exclude[Problem]("org.apache.spark.sql.SparkSession.createDataFrame"),
+      ProblemFilters.exclude[Problem](
+        "org.apache.spark.sql.SparkSession.baseRelationToDataFrame"),
+      ProblemFilters.exclude[Problem]("org.apache.spark.sql.SparkSession.createDataset"),
+      ProblemFilters.exclude[Problem]("org.apache.spark.sql.SparkSession.catalog"),
+      ProblemFilters.exclude[Problem]("org.apache.spark.sql.SparkSession.executeCommand"),
+      ProblemFilters.exclude[Problem]("org.apache.spark.sql.SparkSession.readStream"),
+      ProblemFilters.exclude[Problem]("org.apache.spark.sql.SparkSession.time"),
+      ProblemFilters.exclude[Problem]("org.apache.spark.sql.SparkSession.stop"),
+      ProblemFilters.exclude[Problem]("org.apache.spark.sql.SparkSession.this"),
+      ProblemFilters.exclude[Problem]("org.apache.spark.sql.SparkSession.setActiveSession"),
+      ProblemFilters.exclude[Problem]("org.apache.spark.sql.SparkSession.clearActiveSession"),
+      ProblemFilters.exclude[Problem]("org.apache.spark.sql.SparkSession.setDefaultSession"),
+      ProblemFilters.exclude[Problem]("org.apache.spark.sql.SparkSession.clearDefaultSession"),
+      ProblemFilters.exclude[Problem]("org.apache.spark.sql.SparkSession.getActiveSession"),
+      ProblemFilters.exclude[Problem]("org.apache.spark.sql.SparkSession.getDefaultSession"),
       ProblemFilters.exclude[Problem]("org.apache.spark.sql.SparkSession.range"))
     val problems = allProblems
       .filter { p =>
@@ -127,31 +228,6 @@ class CompatibilitySuite extends AnyFunSuite { // scalastyle:ignore funsuite
       })
   }
 
-  /**
-   * Find all methods that are implemented in the client jar. Once all major methods are
-   * implemented we can switch to include all methods under the class using ".*" e.g.
-   * "org.apache.spark.sql.Dataset.*"
-   */
-  private def includeImplementedMethods(clientJar: File): Seq[IncludeByName] = {
-    val clsNames = Seq(
-      "org.apache.spark.sql.Column",
-      // TODO(SPARK-42175) Add all overloading methods. Temporarily mute compatibility check for \
-      //  the Dataset methods, as too many overload methods are missing.
-      // "org.apache.spark.sql.Dataset",
-      "org.apache.spark.sql.SparkSession")
-
-    val clientClassLoader: URLClassLoader = new URLClassLoader(Seq(clientJar.toURI.toURL).toArray)
-    clsNames
-      .flatMap { clsName =>
-        val cls = clientClassLoader.loadClass(clsName)
-        // all distinct method names
-        cls.getMethods.map(m => s"$clsName.${m.getName}").toSet
-      }
-      .map { fullName =>
-        IncludeByName(fullName)
-      }
-  }
-
   private case class IncludeByName(name: String) extends ProblemFilter {
     private[this] val pattern =
       Pattern.compile(name.split("\\*", -1).map(Pattern.quote).mkString(".*"))

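
A small follow-on illustration (an editor's sketch, not from the patch): renaming
Dataset.session to Dataset.sparkSession gives the Connect Scala client the same
accessor name as the sql module's Dataset, so the member-level check against the sql
jar passes for that accessor without an exclusion. The helper below is hypothetical
and relies only on that shared surface:

    import org.apache.spark.sql.{Dataset, SparkSession}

    object SessionAccessor {
      // Compiles against both the sql Dataset and the Connect client Dataset,
      // because both now expose the session as `sparkSession`.
      def sessionOf(ds: Dataset[_]): SparkSession = ds.sparkSession
    }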
