Posted to commits@spark.apache.org by hv...@apache.org on 2023/02/25 17:59:51 UTC
[spark] branch branch-3.4 updated: [SPARK-42573][CONNECT][SCALA] Enable binary compatibility tests on all major client APIs
This is an automated email from the ASF dual-hosted git repository.
hvanhovell pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.4 by this push:
new 1c4bcf3a97e [SPARK-42573][CONNECT][SCALA] Enable binary compatibility tests on all major client APIs
1c4bcf3a97e is described below
commit 1c4bcf3a97e7773a3f23524e2545cd85dbd14ef1
Author: Zhen Li <zh...@users.noreply.github.com>
AuthorDate: Sat Feb 25 13:59:21 2023 -0400
[SPARK-42573][CONNECT][SCALA] Enable binary compatibility tests on all major client APIs
### What changes were proposed in this pull request?
Enable the binary compatibility check for SparkSession/Dataset/Column/functions, etc.
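The check is driven by MiMa: the suite loads the client jar and the spark-sql jar, collects all binary problems between them, keeps only the problems matched by the IncludeByName rules, and drops the ones muted by the ProblemFilters excludes (see CompatibilitySuite in the diff below). A condensed, self-contained sketch of that flow follows; the jar paths, the object name, and the single include/exclude rule on each side are placeholders, while the real suite carries the full rule lists shown in the diff.

import java.io.File
import java.util.regex.Pattern

import com.typesafe.tools.mima.core.{Problem, ProblemFilter, ProblemFilters}
import com.typesafe.tools.mima.lib.MiMaLib

object CompatibilityCheckSketch {
  // Glob-style include rule: matches fully-qualified member names against a "*" pattern,
  // built the same way as the IncludeByName case class in the suite.
  private case class IncludeByName(name: String) extends ProblemFilter {
    private val pattern =
      Pattern.compile(name.split("\\*", -1).map(Pattern.quote).mkString(".*"))
    override def apply(problem: Problem): Boolean =
      pattern.matcher(problem.matchName.getOrElse("")).matches()
  }

  def main(args: Array[String]): Unit = {
    // Placeholder jar locations; the real suite resolves these from the build output.
    val clientJar = new File("spark-connect-client-jvm.jar")
    val sqlJar = new File("spark-sql.jar")

    // Collect every binary difference between the reference sql API and the client API.
    val mima = new MiMaLib(Seq(clientJar, sqlJar))
    val allProblems = mima.collectProblems(sqlJar, clientJar, List.empty)

    // Wildcard includes now cover whole classes (Dataset, functions, SparkSession, ...),
    // while explicit excludes mute members the client intentionally does not implement yet.
    val includedRules = Seq(IncludeByName("org.apache.spark.sql.Dataset.*"))
    val excludeRules =
      Seq(ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.rdd"))

    // Keep problems matched by an include rule and not muted by any exclude rule.
    val problems = allProblems
      .filter(p => includedRules.exists(rule => rule(p)))
      .filter(p => excludeRules.forall(rule => rule(p)))
    assert(problems.isEmpty, problems.mkString("\n"))
  }
}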
### Why are the changes needed?
This helps us understand the current API coverage of the Scala client.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Existing tests.
Closes #40168 from zhenlineo/comp-it.
Authored-by: Zhen Li <zh...@users.noreply.github.com>
Signed-off-by: Herman van Hovell <he...@databricks.com>
(cherry picked from commit 2470b753171a3765e778ae699b4b1675ba7a023e)
Signed-off-by: Herman van Hovell <he...@databricks.com>
---
.../org/apache/spark/sql/DataFrameWriter.scala | 2 +-
.../org/apache/spark/sql/DataFrameWriterV2.scala | 2 +-
.../main/scala/org/apache/spark/sql/Dataset.scala | 60 ++++----
.../spark/sql/RelationalGroupedDataset.scala | 2 +-
.../sql/connect/client/CompatibilitySuite.scala | 156 +++++++++++++++------
5 files changed, 149 insertions(+), 73 deletions(-)
diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index b7c4ed7bcab..8434addec92 100644
--- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -252,7 +252,7 @@ final class DataFrameWriter[T] private[sql] (ds: Dataset[T]) {
builder.putOptions(k, v)
}
- ds.session.execute(proto.Command.newBuilder().setWriteOperation(builder).build())
+ ds.sparkSession.execute(proto.Command.newBuilder().setWriteOperation(builder).build())
}
/**
diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/DataFrameWriterV2.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/DataFrameWriterV2.scala
index ed149223129..b698e1dfaa1 100644
--- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/DataFrameWriterV2.scala
+++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/DataFrameWriterV2.scala
@@ -157,7 +157,7 @@ final class DataFrameWriterV2[T] private[sql] (table: String, ds: Dataset[T])
overwriteCondition.foreach(builder.setOverwriteCondition)
- ds.session.execute(proto.Command.newBuilder().setWriteOperationV2(builder).build())
+ ds.sparkSession.execute(proto.Command.newBuilder().setWriteOperationV2(builder).build())
}
}
diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala
index 87aadfe437b..83ed7bdc071 100644
--- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -115,7 +115,7 @@ import org.apache.spark.util.Utils
*
* @since 3.4.0
*/
-class Dataset[T] private[sql] (val session: SparkSession, private[sql] val plan: proto.Plan)
+class Dataset[T] private[sql] (val sparkSession: SparkSession, private[sql] val plan: proto.Plan)
extends Serializable {
// Make sure we don't forget to set plan id.
assert(plan.getRoot.getCommon.hasPlanId)
@@ -169,7 +169,7 @@ class Dataset[T] private[sql] (val session: SparkSession, private[sql] val plan:
* @since 3.4.0
*/
@scala.annotation.varargs
- def toDF(colNames: String*): DataFrame = session.newDataset { builder =>
+ def toDF(colNames: String*): DataFrame = sparkSession.newDataset { builder =>
builder.getToDfBuilder
.setInput(plan.getRoot)
.addAllColumnNames(colNames.asJava)
@@ -191,7 +191,7 @@ class Dataset[T] private[sql] (val session: SparkSession, private[sql] val plan:
* @group basic
* @since 3.4.0
*/
- def to(schema: StructType): DataFrame = session.newDataset { builder =>
+ def to(schema: StructType): DataFrame = sparkSession.newDataset { builder =>
builder.getToSchemaBuilder
.setInput(plan.getRoot)
.setSchema(DataTypeProtoConverter.toConnectProtoType(schema))
@@ -277,7 +277,7 @@ class Dataset[T] private[sql] (val session: SparkSession, private[sql] val plan:
private def explain(mode: proto.Explain.ExplainMode): Unit = {
// scalastyle:off println
- println(session.analyze(plan, mode).getExplainString)
+ println(sparkSession.analyze(plan, mode).getExplainString)
// scalastyle:on println
}
@@ -468,7 +468,7 @@ class Dataset[T] private[sql] (val session: SparkSession, private[sql] val plan:
* @since 3.4.0
*/
def show(numRows: Int, truncate: Int, vertical: Boolean): Unit = {
- val df = session.newDataset { builder =>
+ val df = sparkSession.newDataset { builder =>
builder.getShowStringBuilder
.setInput(plan.getRoot)
.setNumRows(numRows)
@@ -485,7 +485,7 @@ class Dataset[T] private[sql] (val session: SparkSession, private[sql] val plan:
}
private def buildJoin(right: Dataset[_])(f: proto.Join.Builder => Unit): DataFrame = {
- session.newDataset { builder =>
+ sparkSession.newDataset { builder =>
val joinBuilder = builder.getJoinBuilder
joinBuilder.setLeft(plan.getRoot).setRight(right.plan.getRoot)
f(joinBuilder)
@@ -751,7 +751,7 @@ class Dataset[T] private[sql] (val session: SparkSession, private[sql] val plan:
}
private def buildSort(global: Boolean, sortExprs: Seq[Column]): Dataset[T] = {
- session.newDataset { builder =>
+ sparkSession.newDataset { builder =>
builder.getSortBuilder
.setInput(plan.getRoot)
.setIsGlobal(global)
@@ -859,7 +859,7 @@ class Dataset[T] private[sql] (val session: SparkSession, private[sql] val plan:
* @since 3.4.0
*/
@scala.annotation.varargs
- def hint(name: String, parameters: Any*): Dataset[T] = session.newDataset { builder =>
+ def hint(name: String, parameters: Any*): Dataset[T] = sparkSession.newDataset { builder =>
builder.getHintBuilder
.setInput(plan.getRoot)
.setName(name)
@@ -899,7 +899,7 @@ class Dataset[T] private[sql] (val session: SparkSession, private[sql] val plan:
* @group typedrel
* @since 3.4.0
*/
- def as(alias: String): Dataset[T] = session.newDataset { builder =>
+ def as(alias: String): Dataset[T] = sparkSession.newDataset { builder =>
builder.getSubqueryAliasBuilder
.setInput(plan.getRoot)
.setAlias(alias)
@@ -939,7 +939,7 @@ class Dataset[T] private[sql] (val session: SparkSession, private[sql] val plan:
* @since 3.4.0
*/
@scala.annotation.varargs
- def select(cols: Column*): DataFrame = session.newDataset { builder =>
+ def select(cols: Column*): DataFrame = sparkSession.newDataset { builder =>
builder.getProjectBuilder
.setInput(plan.getRoot)
.addAllExpressions(cols.map(_.expr).asJava)
@@ -989,7 +989,7 @@ class Dataset[T] private[sql] (val session: SparkSession, private[sql] val plan:
* @group typedrel
* @since 3.4.0
*/
- def filter(condition: Column): Dataset[T] = session.newDataset { builder =>
+ def filter(condition: Column): Dataset[T] = sparkSession.newDataset { builder =>
builder.getFilterBuilder.setInput(plan.getRoot).setCondition(condition.expr)
}
@@ -1032,7 +1032,7 @@ class Dataset[T] private[sql] (val session: SparkSession, private[sql] val plan:
ids: Array[Column],
valuesOption: Option[Array[Column]],
variableColumnName: String,
- valueColumnName: String): DataFrame = session.newDataset { builder =>
+ valueColumnName: String): DataFrame = sparkSession.newDataset { builder =>
val unpivot = builder.getUnpivotBuilder
.setInput(plan.getRoot)
.addAllIds(ids.toSeq.map(_.expr).asJava)
@@ -1393,7 +1393,7 @@ class Dataset[T] private[sql] (val session: SparkSession, private[sql] val plan:
* @group typedrel
* @since 3.4.0
*/
- def limit(n: Int): Dataset[T] = session.newDataset { builder =>
+ def limit(n: Int): Dataset[T] = sparkSession.newDataset { builder =>
builder.getLimitBuilder
.setInput(plan.getRoot)
.setLimit(n)
@@ -1405,7 +1405,7 @@ class Dataset[T] private[sql] (val session: SparkSession, private[sql] val plan:
* @group typedrel
* @since 3.4.0
*/
- def offset(n: Int): Dataset[T] = session.newDataset { builder =>
+ def offset(n: Int): Dataset[T] = sparkSession.newDataset { builder =>
builder.getOffsetBuilder
.setInput(plan.getRoot)
.setOffset(n)
@@ -1413,7 +1413,7 @@ class Dataset[T] private[sql] (val session: SparkSession, private[sql] val plan:
private def buildSetOp(right: Dataset[T], setOpType: proto.SetOperation.SetOpType)(
f: proto.SetOperation.Builder => Unit): Dataset[T] = {
- session.newDataset { builder =>
+ sparkSession.newDataset { builder =>
f(
builder.getSetOpBuilder
.setSetOpType(setOpType)
@@ -1677,7 +1677,7 @@ class Dataset[T] private[sql] (val session: SparkSession, private[sql] val plan:
* @since 3.4.0
*/
def sample(withReplacement: Boolean, fraction: Double, seed: Long): Dataset[T] = {
- session.newDataset { builder =>
+ sparkSession.newDataset { builder =>
builder.getSampleBuilder
.setInput(plan.getRoot)
.setWithReplacement(withReplacement)
@@ -1745,7 +1745,7 @@ class Dataset[T] private[sql] (val session: SparkSession, private[sql] val plan:
normalizedCumWeights
.sliding(2)
.map { case Array(low, high) =>
- session.newDataset[T] { builder =>
+ sparkSession.newDataset[T] { builder =>
builder.getSampleBuilder
.setInput(sortedInput)
.setWithReplacement(false)
@@ -1789,7 +1789,7 @@ class Dataset[T] private[sql] (val session: SparkSession, private[sql] val plan:
val aliases = values.zip(names).map { case (value, name) =>
value.name(name).expr.getAlias
}
- session.newDataset { builder =>
+ sparkSession.newDataset { builder =>
builder.getWithColumnsBuilder
.setInput(plan.getRoot)
.addAllAliases(aliases.asJava)
@@ -1880,7 +1880,7 @@ class Dataset[T] private[sql] (val session: SparkSession, private[sql] val plan:
* @since 3.4.0
*/
def withColumnsRenamed(colsMap: java.util.Map[String, String]): DataFrame = {
- session.newDataset { builder =>
+ sparkSession.newDataset { builder =>
builder.getWithColumnsRenamedBuilder
.setInput(plan.getRoot)
.putAllRenameColumnsMap(colsMap)
@@ -1899,7 +1899,7 @@ class Dataset[T] private[sql] (val session: SparkSession, private[sql] val plan:
.setExpr(col(columnName).expr)
.addName(columnName)
.setMetadata(metadata.json)
- session.newDataset { builder =>
+ sparkSession.newDataset { builder =>
builder.getWithColumnsBuilder
.setInput(plan.getRoot)
.addAliases(newAlias)
@@ -1959,7 +1959,7 @@ class Dataset[T] private[sql] (val session: SparkSession, private[sql] val plan:
@scala.annotation.varargs
def drop(col: Column, cols: Column*): DataFrame = buildDrop(col +: cols)
- private def buildDrop(cols: Seq[Column]): DataFrame = session.newDataset { builder =>
+ private def buildDrop(cols: Seq[Column]): DataFrame = sparkSession.newDataset { builder =>
builder.getDropBuilder
.setInput(plan.getRoot)
.addAllCols(cols.map(_.expr).asJava)
@@ -1972,7 +1972,7 @@ class Dataset[T] private[sql] (val session: SparkSession, private[sql] val plan:
* @group typedrel
* @since 3.4.0
*/
- def dropDuplicates(): Dataset[T] = session.newDataset { builder =>
+ def dropDuplicates(): Dataset[T] = sparkSession.newDataset { builder =>
builder.getDeduplicateBuilder
.setInput(plan.getRoot)
.setAllColumnsAsKeys(true)
@@ -1985,7 +1985,7 @@ class Dataset[T] private[sql] (val session: SparkSession, private[sql] val plan:
* @group typedrel
* @since 3.4.0
*/
- def dropDuplicates(colNames: Seq[String]): Dataset[T] = session.newDataset { builder =>
+ def dropDuplicates(colNames: Seq[String]): Dataset[T] = sparkSession.newDataset { builder =>
builder.getDeduplicateBuilder
.setInput(plan.getRoot)
.addAllColumnNames(colNames.asJava)
@@ -2042,7 +2042,7 @@ class Dataset[T] private[sql] (val session: SparkSession, private[sql] val plan:
* @since 3.4.0
*/
@scala.annotation.varargs
- def describe(cols: String*): DataFrame = session.newDataset { builder =>
+ def describe(cols: String*): DataFrame = sparkSession.newDataset { builder =>
builder.getDescribeBuilder
.setInput(plan.getRoot)
.addAllCols(cols.asJava)
@@ -2117,7 +2117,7 @@ class Dataset[T] private[sql] (val session: SparkSession, private[sql] val plan:
* @since 3.4.0
*/
@scala.annotation.varargs
- def summary(statistics: String*): DataFrame = session.newDataset { builder =>
+ def summary(statistics: String*): DataFrame = sparkSession.newDataset { builder =>
builder.getSummaryBuilder
.setInput(plan.getRoot)
.addAllStatistics(statistics.asJava)
@@ -2185,7 +2185,7 @@ class Dataset[T] private[sql] (val session: SparkSession, private[sql] val plan:
* @since 3.4.0
*/
def tail(n: Int): Array[T] = {
- val lastN = session.newDataset[T] { builder =>
+ val lastN = sparkSession.newDataset[T] { builder =>
builder.getTailBuilder
.setInput(plan.getRoot)
.setLimit(n)
@@ -2257,7 +2257,7 @@ class Dataset[T] private[sql] (val session: SparkSession, private[sql] val plan:
}
private def buildRepartition(numPartitions: Int, shuffle: Boolean): Dataset[T] = {
- session.newDataset { builder =>
+ sparkSession.newDataset { builder =>
builder.getRepartitionBuilder
.setInput(plan.getRoot)
.setNumPartitions(numPartitions)
@@ -2267,7 +2267,7 @@ class Dataset[T] private[sql] (val session: SparkSession, private[sql] val plan:
private def buildRepartitionByExpression(
numPartitions: Option[Int],
- partitionExprs: Seq[Column]): Dataset[T] = session.newDataset { builder =>
+ partitionExprs: Seq[Column]): Dataset[T] = sparkSession.newDataset { builder =>
val repartitionBuilder = builder.getRepartitionByExpressionBuilder
.setInput(plan.getRoot)
.addAllPartitionExprs(partitionExprs.map(_.expr).asJava)
@@ -2462,10 +2462,10 @@ class Dataset[T] private[sql] (val session: SparkSession, private[sql] val plan:
}
private[sql] def analyze: proto.AnalyzePlanResponse = {
- session.analyze(plan, proto.Explain.ExplainMode.SIMPLE)
+ sparkSession.analyze(plan, proto.Explain.ExplainMode.SIMPLE)
}
- def collectResult(): SparkResult = session.execute(plan)
+ def collectResult(): SparkResult = sparkSession.execute(plan)
private[sql] def withResult[E](f: SparkResult => E): E = {
val result = collectResult()
diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala
index c918061ac46..76d3ab5cf09 100644
--- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala
+++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala
@@ -42,7 +42,7 @@ class RelationalGroupedDataset protected[sql] (
pivot: Option[proto.Aggregate.Pivot] = None) {
private[this] def toDF(aggExprs: Seq[Column]): DataFrame = {
- df.session.newDataset { builder =>
+ df.sparkSession.newDataset { builder =>
builder.getAggregateBuilder
.setInput(df.plan.getRoot)
.addAllGroupingExpressions(groupingExprs.asJava)
diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CompatibilitySuite.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CompatibilitySuite.scala
index 010f3c616e6..d6d21773732 100644
--- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CompatibilitySuite.scala
+++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CompatibilitySuite.scala
@@ -69,30 +69,131 @@ class CompatibilitySuite extends AnyFunSuite { // scalastyle:ignore funsuite
val mima = new MiMaLib(Seq(clientJar, sqlJar))
val allProblems = mima.collectProblems(sqlJar, clientJar, List.empty)
val includedRules = Seq(
- IncludeByName("org.apache.spark.sql.Column"),
- IncludeByName("org.apache.spark.sql.Column$"),
- IncludeByName("org.apache.spark.sql.Dataset"),
- // TODO(SPARK-42175) Add the Dataset object definition
- // IncludeByName("org.apache.spark.sql.Dataset$"),
- IncludeByName("org.apache.spark.sql.DataFrame"),
+ IncludeByName("org.apache.spark.sql.Column.*"),
+ IncludeByName("org.apache.spark.sql.DataFrame.*"),
IncludeByName("org.apache.spark.sql.DataFrameReader.*"),
IncludeByName("org.apache.spark.sql.DataFrameWriter.*"),
IncludeByName("org.apache.spark.sql.DataFrameWriterV2.*"),
- IncludeByName("org.apache.spark.sql.SparkSession"),
- IncludeByName("org.apache.spark.sql.SparkSession$")) ++ includeImplementedMethods(clientJar)
+ IncludeByName("org.apache.spark.sql.Dataset.*"),
+ IncludeByName("org.apache.spark.sql.functions.*"),
+ IncludeByName("org.apache.spark.sql.RelationalGroupedDataset.*"),
+ IncludeByName("org.apache.spark.sql.SparkSession.*"))
val excludeRules = Seq(
// Filter unsupported rules:
- // Two sql overloading methods are marked experimental in the API and skipped in the client.
- ProblemFilters.exclude[Problem]("org.apache.spark.sql.SparkSession.sql"),
- // Deprecated json methods and RDD related methods are skipped in the client.
+ // Note when muting errors for a method, checks on all overloading methods are also muted.
+
+ // Skip all shaded dependencies and proto files in the client.
+ ProblemFilters.exclude[Problem]("org.sparkproject.*"),
+ ProblemFilters.exclude[Problem]("org.apache.spark.connect.proto.*"),
+
+ // DataFrame Reader & Writer
ProblemFilters.exclude[Problem]("org.apache.spark.sql.DataFrameReader.json"),
ProblemFilters.exclude[Problem]("org.apache.spark.sql.DataFrameReader.csv"),
ProblemFilters.exclude[Problem]("org.apache.spark.sql.DataFrameReader.jdbc"),
ProblemFilters.exclude[Problem]("org.apache.spark.sql.DataFrameWriter.jdbc"),
- // Skip all shaded dependencies in the client.
- ProblemFilters.exclude[Problem]("org.sparkproject.*"),
- ProblemFilters.exclude[Problem]("org.apache.spark.connect.proto.*"),
- // Disable Range until we support typed APIs
+
+ // Dataset
+ ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.ofRows"),
+ ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.DATASET_ID_TAG"),
+ ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.COL_POS_KEY"),
+ ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.DATASET_ID_KEY"),
+ ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.curId"),
+ ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.groupBy"),
+ ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.observe"),
+ ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.queryExecution"),
+ ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.encoder"),
+ ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.sqlContext"),
+ ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.as"),
+ ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.checkpoint"),
+ ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.localCheckpoint"),
+ ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.withWatermark"),
+ ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.na"),
+ ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.stat"),
+ ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.joinWith"),
+ ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.select"),
+ ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.selectUntyped"),
+ ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.reduce"),
+ ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.groupByKey"),
+ ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.explode"), // deprecated
+ ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.filter"),
+ ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.map"),
+ ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.mapPartitions"),
+ ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.flatMap"),
+ ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.foreach"),
+ ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.foreachPartition"),
+ ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.persist"),
+ ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.cache"),
+ ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.storageLevel"),
+ ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.unpersist"),
+ ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.rdd"),
+ ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.toJavaRDD"),
+ ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.javaRDD"),
+ ProblemFilters.exclude[Problem](
+ "org.apache.spark.sql.Dataset.registerTempTable"
+ ), // deprecated
+ ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.createTempView"),
+ ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.createOrReplaceTempView"),
+ ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.createGlobalTempView"),
+ ProblemFilters.exclude[Problem](
+ "org.apache.spark.sql.Dataset.createOrReplaceGlobalTempView"),
+ ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.writeStream"),
+ ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.toJSON"),
+ ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.sameSemantics"),
+ ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.semanticHash"),
+ ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.this"),
+
+ // functions
+ ProblemFilters.exclude[Problem]("org.apache.spark.sql.functions.udf"),
+ ProblemFilters.exclude[Problem]("org.apache.spark.sql.functions.call_udf"),
+ ProblemFilters.exclude[Problem]("org.apache.spark.sql.functions.callUDF"),
+ ProblemFilters.exclude[Problem]("org.apache.spark.sql.functions.unwrap_udt"),
+ ProblemFilters.exclude[Problem]("org.apache.spark.sql.functions.udaf"),
+ ProblemFilters.exclude[Problem]("org.apache.spark.sql.functions.broadcast"),
+ ProblemFilters.exclude[Problem]("org.apache.spark.sql.functions.count"),
+ ProblemFilters.exclude[Problem]("org.apache.spark.sql.functions.typedlit"),
+ ProblemFilters.exclude[Problem]("org.apache.spark.sql.functions.typedLit"),
+
+ // RelationalGroupedDataset
+ ProblemFilters.exclude[Problem]("org.apache.spark.sql.RelationalGroupedDataset.as"),
+ ProblemFilters.exclude[Problem]("org.apache.spark.sql.RelationalGroupedDataset.pivot"),
+ ProblemFilters.exclude[Problem]("org.apache.spark.sql.RelationalGroupedDataset.this"),
+
+ // SparkSession
+ ProblemFilters.exclude[Problem]("org.apache.spark.sql.SparkSession.active"),
+ ProblemFilters.exclude[Problem]("org.apache.spark.sql.SparkSession.getDefaultSession"),
+ ProblemFilters.exclude[Problem]("org.apache.spark.sql.SparkSession.getActiveSession"),
+ ProblemFilters.exclude[Problem]("org.apache.spark.sql.SparkSession.clearDefaultSession"),
+ ProblemFilters.exclude[Problem]("org.apache.spark.sql.SparkSession.setDefaultSession"),
+ ProblemFilters.exclude[Problem]("org.apache.spark.sql.SparkSession.implicits"),
+ ProblemFilters.exclude[Problem]("org.apache.spark.sql.SparkSession.sparkContext"),
+ ProblemFilters.exclude[Problem]("org.apache.spark.sql.SparkSession.version"),
+ ProblemFilters.exclude[Problem]("org.apache.spark.sql.SparkSession.sharedState"),
+ ProblemFilters.exclude[Problem]("org.apache.spark.sql.SparkSession.sessionState"),
+ ProblemFilters.exclude[Problem]("org.apache.spark.sql.SparkSession.sqlContext"),
+ ProblemFilters.exclude[Problem]("org.apache.spark.sql.SparkSession.conf"),
+ ProblemFilters.exclude[Problem]("org.apache.spark.sql.SparkSession.listenerManager"),
+ ProblemFilters.exclude[Problem]("org.apache.spark.sql.SparkSession.experimental"),
+ ProblemFilters.exclude[Problem]("org.apache.spark.sql.SparkSession.udf"),
+ ProblemFilters.exclude[Problem]("org.apache.spark.sql.SparkSession.streams"),
+ ProblemFilters.exclude[Problem]("org.apache.spark.sql.SparkSession.newSession"),
+ ProblemFilters.exclude[Problem]("org.apache.spark.sql.SparkSession.emptyDataFrame"),
+ ProblemFilters.exclude[Problem]("org.apache.spark.sql.SparkSession.emptyDataset"),
+ ProblemFilters.exclude[Problem]("org.apache.spark.sql.SparkSession.createDataFrame"),
+ ProblemFilters.exclude[Problem](
+ "org.apache.spark.sql.SparkSession.baseRelationToDataFrame"),
+ ProblemFilters.exclude[Problem]("org.apache.spark.sql.SparkSession.createDataset"),
+ ProblemFilters.exclude[Problem]("org.apache.spark.sql.SparkSession.catalog"),
+ ProblemFilters.exclude[Problem]("org.apache.spark.sql.SparkSession.executeCommand"),
+ ProblemFilters.exclude[Problem]("org.apache.spark.sql.SparkSession.readStream"),
+ ProblemFilters.exclude[Problem]("org.apache.spark.sql.SparkSession.time"),
+ ProblemFilters.exclude[Problem]("org.apache.spark.sql.SparkSession.stop"),
+ ProblemFilters.exclude[Problem]("org.apache.spark.sql.SparkSession.this"),
+ ProblemFilters.exclude[Problem]("org.apache.spark.sql.SparkSession.setActiveSession"),
+ ProblemFilters.exclude[Problem]("org.apache.spark.sql.SparkSession.clearActiveSession"),
+ ProblemFilters.exclude[Problem]("org.apache.spark.sql.SparkSession.setDefaultSession"),
+ ProblemFilters.exclude[Problem]("org.apache.spark.sql.SparkSession.clearDefaultSession"),
+ ProblemFilters.exclude[Problem]("org.apache.spark.sql.SparkSession.getActiveSession"),
+ ProblemFilters.exclude[Problem]("org.apache.spark.sql.SparkSession.getDefaultSession"),
ProblemFilters.exclude[Problem]("org.apache.spark.sql.SparkSession.range"))
val problems = allProblems
.filter { p =>
@@ -127,31 +228,6 @@ class CompatibilitySuite extends AnyFunSuite { // scalastyle:ignore funsuite
})
}
- /**
- * Find all methods that are implemented in the client jar. Once all major methods are
- * implemented we can switch to include all methods under the class using ".*" e.g.
- * "org.apache.spark.sql.Dataset.*"
- */
- private def includeImplementedMethods(clientJar: File): Seq[IncludeByName] = {
- val clsNames = Seq(
- "org.apache.spark.sql.Column",
- // TODO(SPARK-42175) Add all overloading methods. Temporarily mute compatibility check for \
- // the Dataset methods, as too many overload methods are missing.
- // "org.apache.spark.sql.Dataset",
- "org.apache.spark.sql.SparkSession")
-
- val clientClassLoader: URLClassLoader = new URLClassLoader(Seq(clientJar.toURI.toURL).toArray)
- clsNames
- .flatMap { clsName =>
- val cls = clientClassLoader.loadClass(clsName)
- // all distinct method names
- cls.getMethods.map(m => s"$clsName.${m.getName}").toSet
- }
- .map { fullName =>
- IncludeByName(fullName)
- }
- }
-
private case class IncludeByName(name: String) extends ProblemFilter {
private[this] val pattern =
Pattern.compile(name.split("\\*", -1).map(Pattern.quote).mkString(".*"))