Posted to commits@spark.apache.org by hv...@apache.org on 2023/03/06 19:17:31 UTC
[spark] branch branch-3.4 updated: [SPARK-42559][CONNECT] Implement DataFrameNaFunctions
This is an automated email from the ASF dual-hosted git repository.
hvanhovell pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.4 by this push:
new 7e4742c4d58 [SPARK-42559][CONNECT] Implement DataFrameNaFunctions
7e4742c4d58 is described below
commit 7e4742c4d5839cb2014958e8f0bc5156c7b86192
Author: panbingkun <pb...@gmail.com>
AuthorDate: Mon Mar 6 15:17:00 2023 -0400
[SPARK-42559][CONNECT] Implement DataFrameNaFunctions
### What changes were proposed in this pull request?
This PR aims to implement DataFrameNaFunctions for the Spark Connect Scala client.
### Why are the changes needed?
API coverage.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Added new unit tests.
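For reference, a minimal usage sketch of the new API (the DataFrame `df` and its column names are hypothetical; the calls mirror the tests added below):
    df.na.drop()                        // drop rows containing any null or NaN values
    df.na.drop(5, Seq("id", "a"))       // drop rows with fewer than 5 non-null values in "id" and "a"
    df.na.fill(8L, Seq("id"))           // replace nulls in the numeric column "id" with 8
    df.na.replace("id", Map(1L -> 8L))  // replace occurrences of 1 with 8 in "id"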
Closes #40217 from panbingkun/SPARK-42559.
Authored-by: panbingkun <pb...@gmail.com>
Signed-off-by: Herman van Hovell <he...@databricks.com>
(cherry picked from commit 9bf174f9722e34f13bfaede5e59f989bf2a511e9)
Signed-off-by: Herman van Hovell <he...@databricks.com>
---
.../apache/spark/sql/DataFrameNaFunctions.scala | 441 +++++++++++++++++++++
.../main/scala/org/apache/spark/sql/Dataset.scala | 12 +
.../spark/sql/DataFrameNaFunctionSuite.scala | 402 +++++++++++++++++++
.../apache/spark/sql/PlanGenerationTestSuite.scala | 12 +
.../CheckConnectJvmClientCompatibility.scala | 5 +-
.../spark/sql/connect/client/util/QueryTest.scala | 209 ++++++++++
.../query-tests/explain-results/drop.explain | 2 +
.../query-tests/explain-results/fill.explain | 2 +
.../query-tests/explain-results/replace.explain | 2 +
.../test/resources/query-tests/queries/drop.json | 17 +
.../resources/query-tests/queries/drop.proto.bin | Bin 0 -> 58 bytes
.../test/resources/query-tests/queries/fill.json | 19 +
.../resources/query-tests/queries/fill.proto.bin | Bin 0 -> 57 bytes
.../resources/query-tests/queries/replace.json | 24 ++
.../query-tests/queries/replace.proto.bin | Bin 0 -> 77 bytes
15 files changed, 1146 insertions(+), 1 deletion(-)
diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/DataFrameNaFunctions.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/DataFrameNaFunctions.scala
new file mode 100644
index 00000000000..17b95018f89
--- /dev/null
+++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/DataFrameNaFunctions.scala
@@ -0,0 +1,441 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import java.util.Locale
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.connect.proto.{NAReplace, Relation}
+import org.apache.spark.connect.proto.Expression.{Literal => GLiteral}
+import org.apache.spark.connect.proto.NAReplace.Replacement
+
+/**
+ * Functionality for working with missing data in `DataFrame`s.
+ *
+ * @since 3.4.0
+ */
+final class DataFrameNaFunctions private[sql] (sparkSession: SparkSession, root: Relation) {
+
+ /**
+ * Returns a new `DataFrame` that drops rows containing any null or NaN values.
+ *
+ * @since 3.4.0
+ */
+ def drop(): DataFrame = buildDropDataFrame(None, None)
+
+ /**
+ * Returns a new `DataFrame` that drops rows containing null or NaN values.
+ *
+ * If `how` is "any", then drop rows containing any null or NaN values. If `how` is "all", then
+ * drop rows only if every column is null or NaN for that row.
+ *
+ * @since 3.4.0
+ */
+ def drop(how: String): DataFrame = {
+ buildDropDataFrame(None, buildMinNonNulls(how))
+ }
+
+ /**
+ * Returns a new `DataFrame` that drops rows containing any null or NaN values in the specified
+ * columns.
+ *
+ * @since 3.4.0
+ */
+ def drop(cols: Array[String]): DataFrame = drop(cols.toSeq)
+
+ /**
+ * (Scala-specific) Returns a new `DataFrame` that drops rows containing any null or NaN values
+ * in the specified columns.
+ *
+ * @since 3.4.0
+ */
+ def drop(cols: Seq[String]): DataFrame = buildDropDataFrame(Some(cols), None)
+
+ /**
+ * Returns a new `DataFrame` that drops rows containing null or NaN values in the specified
+ * columns.
+ *
+ * If `how` is "any", then drop rows containing any null or NaN values in the specified columns.
+ * If `how` is "all", then drop rows only if every specified column is null or NaN for that row.
+ *
+ * @since 3.4.0
+ */
+ def drop(how: String, cols: Array[String]): DataFrame = drop(how, cols.toSeq)
+
+ /**
+ * (Scala-specific) Returns a new `DataFrame` that drops rows containing null or NaN values in
+ * the specified columns.
+ *
+ * If `how` is "any", then drop rows containing any null or NaN values in the specified columns.
+ * If `how` is "all", then drop rows only if every specified column is null or NaN for that row.
+ *
+ * @since 3.4.0
+ */
+ def drop(how: String, cols: Seq[String]): DataFrame = {
+ buildDropDataFrame(Some(cols), buildMinNonNulls(how))
+ }
+
+ /**
+ * Returns a new `DataFrame` that drops rows containing fewer than `minNonNulls` non-null and
+ * non-NaN values.
+ *
+ * @since 3.4.0
+ */
+ def drop(minNonNulls: Int): DataFrame = {
+ buildDropDataFrame(None, Some(minNonNulls))
+ }
+
+ /**
+ * Returns a new `DataFrame` that drops rows containing fewer than `minNonNulls` non-null and
+ * non-NaN values in the specified columns.
+ *
+ * @since 3.4.0
+ */
+ def drop(minNonNulls: Int, cols: Array[String]): DataFrame = drop(minNonNulls, cols.toSeq)
+
+ /**
+ * (Scala-specific) Returns a new `DataFrame` that drops rows containing fewer than `minNonNulls`
+ * non-null and non-NaN values in the specified columns.
+ *
+ * @since 3.4.0
+ */
+ def drop(minNonNulls: Int, cols: Seq[String]): DataFrame = {
+ buildDropDataFrame(Some(cols), Some(minNonNulls))
+ }
+
+ private def buildMinNonNulls(how: String): Option[Int] = {
+ how.toLowerCase(Locale.ROOT) match {
+ case "any" => None // No-Op. Do nothing.
+ case "all" => Some(1)
+ case _ => throw new IllegalArgumentException(s"how ($how) must be 'any' or 'all'")
+ }
+ }
+
+ private def buildDropDataFrame(
+ cols: Option[Seq[String]],
+ minNonNulls: Option[Int]): DataFrame = {
+ sparkSession.newDataFrame { builder =>
+ val dropNaBuilder = builder.getDropNaBuilder.setInput(root)
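+ // Set only the fields that were provided; unset fields fall back to the server-side defaults.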
+ cols.foreach(c => dropNaBuilder.addAllCols(c.asJava))
+ minNonNulls.foreach(dropNaBuilder.setMinNonNulls)
+ }
+ }
+
+ /**
+ * Returns a new `DataFrame` that replaces null or NaN values in numeric columns with `value`.
+ *
+ * @since 3.4.0
+ */
+ def fill(value: Long): DataFrame = {
+ buildFillDataFrame(None, GLiteral.newBuilder().setLong(value).build())
+ }
+
+ /**
+ * Returns a new `DataFrame` that replaces null or NaN values in specified numeric columns. If a
+ * specified column is not a numeric column, it is ignored.
+ *
+ * @since 3.4.0
+ */
+ def fill(value: Long, cols: Array[String]): DataFrame = fill(value, cols.toSeq)
+
+ /**
+ * (Scala-specific) Returns a new `DataFrame` that replaces null or NaN values in specified
+ * numeric columns. If a specified column is not a numeric column, it is ignored.
+ *
+ * @since 3.4.0
+ */
+ def fill(value: Long, cols: Seq[String]): DataFrame = {
+ buildFillDataFrame(Some(cols), GLiteral.newBuilder().setLong(value).build())
+ }
+
+ /**
+ * Returns a new `DataFrame` that replaces null or NaN values in numeric columns with `value`.
+ *
+ * @since 3.4.0
+ */
+ def fill(value: Double): DataFrame = {
+ buildFillDataFrame(None, GLiteral.newBuilder().setDouble(value).build())
+ }
+
+ /**
+ * Returns a new `DataFrame` that replaces null or NaN values in specified numeric columns. If a
+ * specified column is not a numeric column, it is ignored.
+ *
+ * @since 3.4.0
+ */
+ def fill(value: Double, cols: Array[String]): DataFrame = fill(value, cols.toSeq)
+
+ /**
+ * (Scala-specific) Returns a new `DataFrame` that replaces null or NaN values in specified
+ * numeric columns. If a specified column is not a numeric column, it is ignored.
+ *
+ * @since 3.4.0
+ */
+ def fill(value: Double, cols: Seq[String]): DataFrame = {
+ buildFillDataFrame(Some(cols), GLiteral.newBuilder().setDouble(value).build())
+ }
+
+ /**
+ * Returns a new `DataFrame` that replaces null values in string columns with `value`.
+ *
+ * @since 3.4.0
+ */
+ def fill(value: String): DataFrame = {
+ buildFillDataFrame(None, GLiteral.newBuilder().setString(value).build())
+ }
+
+ /**
+ * Returns a new `DataFrame` that replaces null values in specified string columns. If a
+ * specified column is not a string column, it is ignored.
+ *
+ * @since 3.4.0
+ */
+ def fill(value: String, cols: Array[String]): DataFrame = fill(value, cols.toSeq)
+
+ /**
+ * (Scala-specific) Returns a new `DataFrame` that replaces null values in specified string
+ * columns. If a specified column is not a string column, it is ignored.
+ *
+ * @since 3.4.0
+ */
+ def fill(value: String, cols: Seq[String]): DataFrame = {
+ buildFillDataFrame(Some(cols), GLiteral.newBuilder().setString(value).build())
+ }
+
+ /**
+ * Returns a new `DataFrame` that replaces null values in boolean columns with `value`.
+ *
+ * @since 3.4.0
+ */
+ def fill(value: Boolean): DataFrame = {
+ buildFillDataFrame(None, GLiteral.newBuilder().setBoolean(value).build())
+ }
+
+ /**
+ * Returns a new `DataFrame` that replaces null values in specified boolean columns. If a
+ * specified column is not a boolean column, it is ignored.
+ *
+ * @since 3.4.0
+ */
+ def fill(value: Boolean, cols: Array[String]): DataFrame = fill(value, cols.toSeq)
+
+ /**
+ * (Scala-specific) Returns a new `DataFrame` that replaces null values in specified boolean
+ * columns. If a specified column is not a boolean column, it is ignored.
+ *
+ * @since 3.4.0
+ */
+ def fill(value: Boolean, cols: Seq[String]): DataFrame = {
+ buildFillDataFrame(Some(cols), GLiteral.newBuilder().setBoolean(value).build())
+ }
+
+ private def buildFillDataFrame(cols: Option[Seq[String]], value: GLiteral): DataFrame = {
+ sparkSession.newDataFrame { builder =>
+ val fillNaBuilder = builder.getFillNaBuilder.setInput(root)
+ fillNaBuilder.addValues(value)
+ cols.foreach(c => fillNaBuilder.addAllCols(c.asJava))
+ }
+ }
+
+ /**
+ * Returns a new `DataFrame` that replaces null values.
+ *
+ * The key of the map is the column name, and the value of the map is the replacement value. The
+ * value must be of one of the following types: `Integer`, `Long`, `Float`, `Double`, `String`,
+ * `Boolean`. Replacement values are cast to the column data type.
+ *
+ * For example, the following replaces null values in column "A" with string "unknown", and null
+ * values in column "B" with numeric value 1.0.
+ * {{{
+ * import com.google.common.collect.ImmutableMap;
+ * df.na.fill(ImmutableMap.of("A", "unknown", "B", 1.0));
+ * }}}
+ *
+ * @since 3.4.0
+ */
+ def fill(valueMap: java.util.Map[String, Any]): DataFrame = fillMap(valueMap.asScala.toSeq)
+
+ /**
+ * Returns a new `DataFrame` that replaces null values.
+ *
+ * The key of the map is the column name, and the value of the map is the replacement value. The
+ * value must be of one of the following types: `Integer`, `Long`, `Float`, `Double`, `String`,
+ * `Boolean`. Replacement values are cast to the column data type.
+ *
+ * For example, the following replaces null values in column "A" with string "unknown", and null
+ * values in column "B" with numeric value 1.0.
+ * {{{
+ * import com.google.common.collect.ImmutableMap;
+ * df.na.fill(ImmutableMap.of("A", "unknown", "B", 1.0));
+ * }}}
+ *
+ * @since 3.4.0
+ */
+ def fill(valueMap: Map[String, Any]): DataFrame = fillMap(valueMap.toSeq)
+
+ private def fillMap(values: Seq[(String, Any)]): DataFrame = {
+ sparkSession.newDataFrame { builder =>
+ val fillNaBuilder = builder.getFillNaBuilder.setInput(root)
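+ // For each (column, value) pair, register the column name and a matching literal;
+ // the value is cast to the column's data type.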
+ values.foreach { case (colName, replaceValue) =>
+ fillNaBuilder.addCols(colName).addValues(functions.lit(replaceValue).expr.getLiteral)
+ }
+ }
+ }
+
+ /**
+ * Replaces values matching keys in `replacement` map with the corresponding values.
+ *
+ * {{{
+ * import com.google.common.collect.ImmutableMap;
+ *
+ * // Replaces all occurrences of 1.0 with 2.0 in column "height".
+ * df.na.replace("height", ImmutableMap.of(1.0, 2.0));
+ *
+ * // Replaces all occurrences of "UNKNOWN" with "unnamed" in column "name".
+ * df.na.replace("name", ImmutableMap.of("UNKNOWN", "unnamed"));
+ *
+ * // Replaces all occurrences of "UNKNOWN" with "unnamed" in all string columns.
+ * df.na.replace("*", ImmutableMap.of("UNKNOWN", "unnamed"));
+ * }}}
+ *
+ * @param col
+ * name of the column to apply the value replacement to. If `col` is "*", replacement is applied
+ * on all string, numeric or boolean columns.
+ * @param replacement
+ * value replacement map. Key and value of `replacement` map must have the same type, and can
+ * only be doubles, strings or booleans. The map value can have nulls.
+ * @since 3.4.0
+ */
+ def replace[T](col: String, replacement: java.util.Map[T, T]): DataFrame =
+ replace(col, replacement.asScala.toMap)
+
+ /**
+ * (Scala-specific) Replaces values matching keys in `replacement` map.
+ *
+ * {{{
+ * // Replaces all occurrences of 1.0 with 2.0 in column "height".
+ * df.na.replace("height", Map(1.0 -> 2.0));
+ *
+ * // Replaces all occurrences of "UNKNOWN" with "unnamed" in column "name".
+ * df.na.replace("name", Map("UNKNOWN" -> "unnamed"));
+ *
+ * // Replaces all occurrences of "UNKNOWN" with "unnamed" in all string columns.
+ * df.na.replace("*", Map("UNKNOWN" -> "unnamed"));
+ * }}}
+ *
+ * @param col
+ * name of the column to apply the value replacement to. If `col` is "*", replacement is applied
+ * on all string, numeric or boolean columns.
+ * @param replacement
+ * value replacement map. Key and value of `replacement` map must have the same type, and can
+ * only be doubles, strings or booleans. The map value can have nulls.
+ * @since 3.4.0
+ */
+ def replace[T](col: String, replacement: Map[T, T]): DataFrame = {
+ val cols = if (col != "*") Some(Seq(col)) else None
+ buildReplaceDataFrame(cols, buildReplacement(replacement))
+ }
+
+ /**
+ * Replaces values matching keys in `replacement` map with the corresponding values.
+ *
+ * {{{
+ * import com.google.common.collect.ImmutableMap;
+ *
+ * // Replaces all occurrences of 1.0 with 2.0 in column "height" and "weight".
+ * df.na.replace(new String[] {"height", "weight"}, ImmutableMap.of(1.0, 2.0));
+ *
+ * // Replaces all occurrences of "UNKNOWN" with "unnamed" in column "firstname" and "lastname".
+ * df.na.replace(new String[] {"firstname", "lastname"}, ImmutableMap.of("UNKNOWN", "unnamed"));
+ * }}}
+ *
+ * @param cols
+ * list of columns to apply the value replacement to. If a column name is "*", replacement is applied on
+ * all string, numeric or boolean columns.
+ * @param replacement
+ * value replacement map. Key and value of `replacement` map must have the same type, and can
+ * only be doubles, strings or booleans. The map value can have nulls.
+ * @since 3.4.0
+ */
+ def replace[T](cols: Array[String], replacement: java.util.Map[T, T]): DataFrame = {
+ replace(cols.toSeq, replacement.asScala.toMap)
+ }
+
+ /**
+ * (Scala-specific) Replaces values matching keys in `replacement` map.
+ *
+ * {{{
+ * // Replaces all occurrences of 1.0 with 2.0 in column "height" and "weight".
+ * df.na.replace("height" :: "weight" :: Nil, Map(1.0 -> 2.0));
+ *
+ * // Replaces all occurrences of "UNKNOWN" with "unnamed" in column "firstname" and "lastname".
+ * df.na.replace("firstname" :: "lastname" :: Nil, Map("UNKNOWN" -> "unnamed"));
+ * }}}
+ *
+ * @param cols
+ * list of columns to apply the value replacement to. If a column name is "*", replacement is applied on
+ * all string, numeric or boolean columns.
+ * @param replacement
+ * value replacement map. Key and value of `replacement` map must have the same type, and can
+ * only be doubles, strings or booleans. The map value can have nulls.
+ * @since 3.4.0
+ */
+ def replace[T](cols: Seq[String], replacement: Map[T, T]): DataFrame = {
+ buildReplaceDataFrame(Some(cols), buildReplacement(replacement))
+ }
+
+ private def buildReplaceDataFrame(
+ cols: Option[Seq[String]],
+ replacements: Iterable[NAReplace.Replacement]): DataFrame = {
+ sparkSession.newDataFrame { builder =>
+ val replaceBuilder = builder.getReplaceBuilder.setInput(root)
+ replaceBuilder.addAllReplacements(replacements.asJava)
+ cols.foreach(c => replaceBuilder.addAllCols(c.asJava))
+ }
+ }
+
+ private def buildReplacement[T](replacement: Map[T, T]): Iterable[NAReplace.Replacement] = {
+ // Convert NumericType values in the replacement map to DoubleType,
+ // while leaving StringType, BooleanType and null untouched.
+ val replacementMap: Map[_, _] = replacement.map {
+ case (k, v: String) => (k, v)
+ case (k, v: Boolean) => (k, v)
+ case (k: String, null) => (k, null)
+ case (k: Boolean, null) => (k, null)
+ case (k, null) => (convertToDouble(k), null)
+ case (k, v) => (convertToDouble(k), convertToDouble(v))
+ }
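+ // Encode each (old, new) pair as a proto Replacement holding literal expressions.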
+ replacementMap.map { case (oldValue, newValue) =>
+ Replacement
+ .newBuilder()
+ .setOldValue(functions.lit(oldValue).expr.getLiteral)
+ .setNewValue(functions.lit(newValue).expr.getLiteral)
+ .build()
+ }
+ }
+
+ private def convertToDouble(v: Any): Double = v match {
+ case v: Float => v.toDouble
+ case v: Double => v
+ case v: Long => v.toDouble
+ case v: Int => v.toDouble
+ case v =>
+ throw new IllegalArgumentException(s"Unsupported value type ${v.getClass.getName} ($v).")
+ }
+}
diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala
index 74dd58f4903..13dff3a874f 100644
--- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -534,6 +534,18 @@ class Dataset[T] private[sql] (
}
}
+ /**
+ * Returns a [[DataFrameNaFunctions]] for working with missing data.
+ * {{{
+ * // Dropping rows containing any null values.
+ * ds.na.drop()
+ * }}}
+ *
+ * @group untypedrel
+ * @since 3.4.0
+ */
+ def na: DataFrameNaFunctions = new DataFrameNaFunctions(sparkSession, plan.getRoot)
+
/**
* Returns a [[DataFrameStatFunctions]] for working with statistic functions.
* {{{
diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/DataFrameNaFunctionSuite.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/DataFrameNaFunctionSuite.scala
new file mode 100644
index 00000000000..5049147678b
--- /dev/null
+++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/DataFrameNaFunctionSuite.scala
@@ -0,0 +1,402 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql.connect.client.util.QueryTest
+import org.apache.spark.sql.types.{StringType, StructType}
+
+class DataFrameNaFunctionSuite extends QueryTest {
+ private def createDF(): DataFrame = {
+ val sparkSession = spark
+ import sparkSession.implicits._
+ Seq[(String, java.lang.Integer, java.lang.Double)](
+ ("Bob", 16, 176.5),
+ ("Alice", null, 164.3),
+ ("David", 60, null),
+ ("Nina", 25, Double.NaN),
+ ("Amy", null, null),
+ (null, null, null)).toDF("name", "age", "height")
+ }
+
+ def createNaNDF(): DataFrame = {
+ val sparkSession = spark
+ import sparkSession.implicits._
+ Seq[(
+ java.lang.Integer,
+ java.lang.Long,
+ java.lang.Short,
+ java.lang.Byte,
+ java.lang.Float,
+ java.lang.Double)](
+ (1, 1L, 1.toShort, 1.toByte, 1.0f, 1.0),
+ (0, 0L, 0.toShort, 0.toByte, Float.NaN, Double.NaN)).toDF(
+ "int",
+ "long",
+ "short",
+ "byte",
+ "float",
+ "double")
+ }
+
+ def createDFWithNestedColumns: DataFrame = {
+ val schema = new StructType()
+ .add(
+ "c1",
+ new StructType()
+ .add("c1-1", StringType)
+ .add("c1-2", StringType))
+ val data = Seq(Row(Row(null, "a2")), Row(Row("b1", "b2")), Row(null))
+ spark.createDataFrame(data.asJava, schema)
+ }
+
+ test("drop") {
+ val input = createDF()
+ val rows = input.collect()
+
+ val result1 = input.na.drop("name" :: Nil).select("name")
+ val expected1 = Array(Row("Bob"), Row("Alice"), Row("David"), Row("Nina"), Row("Amy"))
+ checkAnswer(result1, expected1)
+
+ val result2 = input.na.drop("age" :: Nil).select("name")
+ val expected2 = Array(Row("Bob"), Row("David"), Row("Nina"))
+ checkAnswer(result2, expected2)
+
+ val result3 = input.na.drop("age" :: "height" :: Nil)
+ val expected3 = Array(rows(0))
+ checkAnswer(result3, expected3)
+
+ val result4 = input.na.drop()
+ checkAnswer(result4, expected3)
+
+ // dropna on a DataFrame with no rows should return an empty DataFrame.
+ val empty = input.filter("age > 100")
+ assert(empty.na.drop().count() === 0L)
+
+ // Make sure the columns are properly named.
+ assert(input.na.drop().columns.toSeq === input.columns.toSeq)
+ }
+
+ test("drop with how") {
+ val input = createDF()
+ val rows = input.collect()
+
+ checkAnswer(
+ input.na.drop("all").select("name"),
+ Row("Bob") :: Row("Alice") :: Row("David") :: Row("Nina") :: Row("Amy") :: Nil)
+
+ checkAnswer(input.na.drop("any"), rows(0) :: Nil)
+
+ checkAnswer(input.na.drop("any", Seq("age", "height")), rows(0) :: Nil)
+
+ checkAnswer(
+ input.na.drop("all", Seq("age", "height")).select("name"),
+ Row("Bob") :: Row("Alice") :: Row("David") :: Row("Nina") :: Nil)
+ }
+
+ test("drop with threshold") {
+ val input = createDF()
+ val rows = input.collect()
+
+ checkAnswer(input.na.drop(2, Seq("age", "height")), rows(0) :: Nil)
+
+ checkAnswer(input.na.drop(3, Seq("name", "age", "height")), rows(0))
+
+ // Make sure the columns are properly named.
+ assert(input.na.drop(2, Seq("age", "height")).columns.toSeq === input.columns.toSeq)
+ }
+
+ test("fill") {
+ val sparkSession = spark
+ import sparkSession.implicits._
+
+ val input = createDF()
+
+ val boolInput = Seq[(String, java.lang.Boolean)](
+ ("Bob", false),
+ ("Alice", null),
+ ("Mallory", true),
+ (null, null)).toDF("name", "spy")
+
+ val fillNumeric = input.na.fill(50.6)
+ checkAnswer(
+ fillNumeric,
+ Row("Bob", 16, 176.5) ::
+ Row("Alice", 50, 164.3) ::
+ Row("David", 60, 50.6) ::
+ Row("Nina", 25, 50.6) ::
+ Row("Amy", 50, 50.6) ::
+ Row(null, 50, 50.6) :: Nil)
+
+ // Make sure the columns are properly named.
+ assert(fillNumeric.columns.toSeq === input.columns.toSeq)
+
+ // string
+ checkAnswer(
+ input.na.fill("unknown").select("name"),
+ Row("Bob") :: Row("Alice") :: Row("David") ::
+ Row("Nina") :: Row("Amy") :: Row("unknown") :: Nil)
+ assert(input.na.fill("unknown").columns.toSeq === input.columns.toSeq)
+
+ // boolean
+ checkAnswer(
+ boolInput.na.fill(true).select("spy"),
+ Row(false) :: Row(true) :: Row(true) :: Row(true) :: Nil)
+ assert(boolInput.na.fill(true).columns.toSeq === boolInput.columns.toSeq)
+
+ // fill double with subset columns
+ checkAnswer(
+ input.na.fill(50.6, "age" :: Nil).select("name", "age"),
+ Row("Bob", 16) ::
+ Row("Alice", 50) ::
+ Row("David", 60) ::
+ Row("Nina", 25) ::
+ Row("Amy", 50) ::
+ Row(null, 50) :: Nil)
+
+ // fill boolean with subset columns
+ checkAnswer(
+ boolInput.na.fill(true, "spy" :: Nil).select("name", "spy"),
+ Row("Bob", false) ::
+ Row("Alice", true) ::
+ Row("Mallory", true) ::
+ Row(null, true) :: Nil)
+
+ // fill string with subset columns
+ checkAnswer(
+ Seq[(String, String)]((null, null)).toDF("col1", "col2").na.fill("test", "col1" :: Nil),
+ Row("test", null))
+
+ checkAnswer(
+ Seq[(Long, Long)]((1, 2), (-1, -2), (9123146099426677101L, 9123146560113991650L))
+ .toDF("a", "b")
+ .na
+ .fill(0),
+ Row(1, 2) :: Row(-1, -2) :: Row(9123146099426677101L, 9123146560113991650L) :: Nil)
+
+ checkAnswer(
+ Seq[(java.lang.Long, java.lang.Double)](
+ (null, 3.14),
+ (9123146099426677101L, null),
+ (9123146560113991650L, 1.6),
+ (null, null)).toDF("a", "b").na.fill(0.2),
+ Row(0, 3.14) :: Row(9123146099426677101L, 0.2) :: Row(9123146560113991650L, 1.6)
+ :: Row(0, 0.2) :: Nil)
+
+ checkAnswer(
+ Seq[(java.lang.Long, java.lang.Float)](
+ (null, 3.14f),
+ (9123146099426677101L, null),
+ (9123146560113991650L, 1.6f),
+ (null, null)).toDF("a", "b").na.fill(0.2),
+ Row(0, 3.14f) :: Row(9123146099426677101L, 0.2f) :: Row(9123146560113991650L, 1.6f)
+ :: Row(0, 0.2f) :: Nil)
+
+ checkAnswer(
+ Seq[(java.lang.Long, java.lang.Double)]((null, 1.23), (3L, null), (4L, 3.45))
+ .toDF("a", "b")
+ .na
+ .fill(2.34),
+ Row(2, 1.23) :: Row(3, 2.34) :: Row(4, 3.45) :: Nil)
+
+ checkAnswer(
+ Seq[(java.lang.Long, java.lang.Double)]((null, 1.23), (3L, null), (4L, 3.45))
+ .toDF("a", "b")
+ .na
+ .fill(5),
+ Row(5, 1.23) :: Row(3, 5.0) :: Row(4, 3.45) :: Nil)
+ }
+
+ test("fill with map") {
+ val sparkSession = spark
+ import sparkSession.implicits._
+
+ val df = Seq[(
+ String,
+ String,
+ java.lang.Integer,
+ java.lang.Long,
+ java.lang.Float,
+ java.lang.Double,
+ java.lang.Boolean)]((null, null, null, null, null, null, null))
+ .toDF(
+ "stringFieldA",
+ "stringFieldB",
+ "integerField",
+ "longField",
+ "floatField",
+ "doubleField",
+ "booleanField")
+
+ val fillMap = Map(
+ "stringFieldA" -> "test",
+ "integerField" -> 1,
+ "longField" -> 2L,
+ "floatField" -> 3.3f,
+ "doubleField" -> 4.4d,
+ "booleanField" -> false)
+
+ val expectedRow = Row("test", null, 1, 2L, 3.3f, 4.4d, false)
+ checkAnswer(df.na.fill(fillMap), expectedRow)
+ checkAnswer(df.na.fill(fillMap.asJava), expectedRow) // Test Java version
+
+ // Ensure replacement values are cast to the column data type.
+ checkAnswer(
+ df.na.fill(
+ Map("integerField" -> 1d, "longField" -> 2d, "floatField" -> 3d, "doubleField" -> 4d)),
+ Row(null, null, 1, 2L, 3f, 4d, null))
+
+ // Ensure column types do not change. Columns that have null values replaced
+ // will no longer be flagged as nullable, so do not compare schemas directly.
+ assert(
+ df.na.fill(fillMap).schema.fields.map(_.dataType) ===
+ df.schema.fields.map(_.dataType))
+ }
+
+ test("fill with col(*)") {
+ val df = createDF()
+ // If columns are specified with "*", they are ignored.
+ checkAnswer(df.na.fill("new name", Seq("*")), df.collect())
+ }
+
+ test("drop with col(*)") {
+ val df = createDF()
+ val ex = intercept[RuntimeException] {
+ df.na.drop("any", Seq("*")).collect()
+ }
+ assert(ex.getMessage.contains("UNRESOLVED_COLUMN.WITH_SUGGESTION"))
+ }
+
+ test("fill with nested columns") {
+ val df = createDFWithNestedColumns
+ checkAnswer(df.na.fill("a1", Seq("c1.c1-1")), df)
+ }
+
+ test("drop with nested columns") {
+ val df = createDFWithNestedColumns
+
+ // Rows where the specified nested column is null are dropped.
+ assert(df.count == 3)
+ checkAnswer(df.na.drop("any", Seq("c1.c1-1")), Seq(Row(Row("b1", "b2"))))
+ }
+
+ test("replace") {
+ val input = createDF()
+
+ val result1 = input.na
+ .replace(
+ Seq("age", "height"),
+ Map(
+ 16 -> 61,
+ 60 -> 6,
+ 164.3 -> 461.3 // Alice is really tall
+ ))
+ .collect()
+ assert(result1(0) === Row("Bob", 61, 176.5))
+ assert(result1(1) === Row("Alice", null, 461.3))
+ assert(result1(2) === Row("David", 6, null))
+ assert(result1(3).get(2).asInstanceOf[Double].isNaN)
+ assert(result1(4) === Row("Amy", null, null))
+ assert(result1(5) === Row(null, null, null))
+
+ // Replace only the age column
+ val result2 = input.na
+ .replace(
+ "age",
+ Map(
+ 16 -> 61,
+ 60 -> 6,
+ 164.3 -> 461.3 // Alice is really tall
+ ))
+ .collect()
+ assert(result2(0) === Row("Bob", 61, 176.5))
+ assert(result2(1) === Row("Alice", null, 164.3))
+ assert(result2(2) === Row("David", 6, null))
+ assert(result2(3).get(2).asInstanceOf[Double].isNaN)
+ assert(result2(4) === Row("Amy", null, null))
+ assert(result2(5) === Row(null, null, null))
+ }
+
+ test("replace with null") {
+ val input = spark.sql(
+ "select name, height, married from (values " +
+ "('Bob', 176.5, true), " +
+ "('Alice', 164.3, false), " +
+ "('David', null, true))" +
+ "as t(name, height, married)")
+
+ // Replace String with String and null
+ val result1 = input.na.replace("name", Map("Bob" -> "Bravo", "Alice" -> null))
+
+ checkAnswer(
+ result1,
+ Row("Bravo", 176.5, true) ::
+ Row(null, 164.3, false) ::
+ Row("David", null, true) :: Nil)
+
+ // Replace Double with null
+ val result2 = input.na.replace("height", Map[Any, Any](164.3 -> null))
+ checkAnswer(
+ result2,
+ Row("Bob", 176.5, true) ::
+ Row("Alice", null, false) ::
+ Row("David", null, true) :: Nil)
+
+ // Replace Boolean with null
+ checkAnswer(
+ input.na.replace("*", Map[Any, Any](false -> null)),
+ Row("Bob", 176.5, true) ::
+ Row("Alice", 164.3, null) ::
+ Row("David", null, true) :: Nil)
+
+ // Replace String with null and then drop rows containing null
+ checkAnswer(
+ input.na.replace("name", Map("Bob" -> null)).na.drop("name" :: Nil).select("name"),
+ Row("Alice") :: Row("David") :: Nil)
+ }
+
+ test("replace nan with float") {
+ checkAnswer(
+ createNaNDF().na.replace("*", Map(Float.NaN -> 10.0f)),
+ Row(1, 1L, 1.toShort, 1.toByte, 1.0f, 1.0) ::
+ Row(0, 0L, 0.toShort, 0.toByte, 10.0f, 10.0) :: Nil)
+ }
+
+ test("replace nan with double") {
+ checkAnswer(
+ createNaNDF().na.replace("*", Map(Double.NaN -> 10.0)),
+ Row(1, 1L, 1.toShort, 1.toByte, 1.0f, 1.0) ::
+ Row(0, 0L, 0.toShort, 0.toByte, 10.0f, 10.0) :: Nil)
+ }
+
+ test("replace float with nan") {
+ checkAnswer(
+ createNaNDF().na.replace("*", Map(1.0f -> Float.NaN)),
+ Row(0, 0L, 0.toShort, 0.toByte, Float.NaN, Double.NaN) ::
+ Row(0, 0L, 0.toShort, 0.toByte, Float.NaN, Double.NaN) :: Nil)
+ }
+
+ test("replace double with nan") {
+ checkAnswer(
+ createNaNDF().na.replace("*", Map(1.0 -> Double.NaN)),
+ Row(0, 0L, 0.toShort, 0.toByte, Float.NaN, Double.NaN) ::
+ Row(0, 0L, 0.toShort, 0.toByte, Float.NaN, Double.NaN) :: Nil)
+ }
+
+}
diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/PlanGenerationTestSuite.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/PlanGenerationTestSuite.scala
index b59c783aec9..f5ffaf9b73a 100755
--- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/PlanGenerationTestSuite.scala
+++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/PlanGenerationTestSuite.scala
@@ -2058,4 +2058,16 @@ class PlanGenerationTestSuite
test("sampleBy") {
simple.stat.sampleBy("id", Map(0 -> 0.1, 1 -> 0.2), 0L)
}
+
+ test("drop") {
+ simple.na.drop(5, Seq("id", "a"))
+ }
+
+ test("fill") {
+ simple.na.fill(8L, Seq("id"))
+ }
+
+ test("replace") {
+ simple.na.replace[Long]("id", Map(1L -> 8L))
+ }
}
diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CheckConnectJvmClientCompatibility.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CheckConnectJvmClientCompatibility.scala
index b684721243a..868e7ae7b74 100644
--- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CheckConnectJvmClientCompatibility.scala
+++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CheckConnectJvmClientCompatibility.scala
@@ -109,6 +109,7 @@ object CheckConnectJvmClientCompatibility {
IncludeByName("org.apache.spark.sql.ColumnName.*"),
IncludeByName("org.apache.spark.sql.DataFrame.*"),
IncludeByName("org.apache.spark.sql.DataFrameReader.*"),
+ IncludeByName("org.apache.spark.sql.DataFrameNaFunctions.*"),
IncludeByName("org.apache.spark.sql.DataFrameStatFunctions.*"),
IncludeByName("org.apache.spark.sql.DataFrameWriter.*"),
IncludeByName("org.apache.spark.sql.DataFrameWriterV2.*"),
@@ -134,6 +135,9 @@ object CheckConnectJvmClientCompatibility {
ProblemFilters.exclude[Problem]("org.apache.spark.sql.DataFrameReader.jdbc"),
ProblemFilters.exclude[Problem]("org.apache.spark.sql.DataFrameWriter.jdbc"),
+ // DataFrameNaFunctions
+ ProblemFilters.exclude[Problem]("org.apache.spark.sql.DataFrameNaFunctions.this"),
+
// DataFrameStatFunctions
ProblemFilters.exclude[Problem]("org.apache.spark.sql.DataFrameStatFunctions.bloomFilter"),
ProblemFilters.exclude[Problem]("org.apache.spark.sql.DataFrameStatFunctions.this"),
@@ -148,7 +152,6 @@ object CheckConnectJvmClientCompatibility {
ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.queryExecution"),
ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.encoder"),
ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.sqlContext"),
- ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.na"),
ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.joinWith"),
ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.select"),
ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.selectUntyped"),
diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/util/QueryTest.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/util/QueryTest.scala
new file mode 100644
index 00000000000..1c3f49f897f
--- /dev/null
+++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/util/QueryTest.scala
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.client.util
+
+import java.util.TimeZone
+
+import org.scalatest.Assertions
+
+import org.apache.spark.sql.{DataFrame, Row}
+import org.apache.spark.sql.catalyst.util.sideBySide
+
+abstract class QueryTest extends RemoteSparkSession {
+
+ /**
+ * Runs the plan and makes sure the answer matches the expected result.
+ *
+ * @param df
+ * the [[DataFrame]] to be executed
+ * @param expectedAnswer
+ * the expected result in a [[Seq]] of [[Row]]s.
+ */
+ protected def checkAnswer(df: => DataFrame, expectedAnswer: Seq[Row]): Unit = {
+ QueryTest.checkAnswer(df, expectedAnswer)
+ }
+
+ protected def checkAnswer(df: => DataFrame, expectedAnswer: Row): Unit = {
+ checkAnswer(df, Seq(expectedAnswer))
+ }
+
+ protected def checkAnswer(df: => DataFrame, expectedAnswer: DataFrame): Unit = {
+ checkAnswer(df, expectedAnswer.collect())
+ }
+}
+
+object QueryTest extends Assertions {
+
+ /**
+ * Runs the plan and makes sure the answer matches the expected result.
+ *
+ * @param df
+ * the DataFrame to be executed
+ * @param expectedAnswer
+ * the expected result in a Seq of Rows.
+ */
+ def checkAnswer(df: DataFrame, expectedAnswer: Seq[Row], isSorted: Boolean = false): Unit = {
+ getErrorMessageInCheckAnswer(df, expectedAnswer, isSorted) match {
+ case Some(errorMessage) => fail(errorMessage)
+ case None =>
+ }
+ }
+
+ /**
+ * Runs the plan and makes sure the answer matches the expected result. If an exception is
+ * thrown during execution, or the contents of the DataFrame do not match the expected result,
+ * an error message is returned. Otherwise, None is returned.
+ *
+ * @param df
+ * the DataFrame to be executed
+ * @param expectedAnswer
+ * the expected result in a Seq of Rows.
+ */
+ def getErrorMessageInCheckAnswer(
+ df: DataFrame,
+ expectedAnswer: Seq[Row],
+ isSorted: Boolean = false): Option[String] = {
+ val sparkAnswer =
+ try df.collect().toSeq
+ catch {
+ case e: Exception =>
+ val errorMessage =
+ s"""
+ |Exception thrown while executing query:
+ |${df.analyze}
+ |== Exception ==
+ |$e
+ |${org.apache.spark.sql.catalyst.util.stackTraceToString(e)}
+ """.stripMargin
+ return Some(errorMessage)
+ }
+
+ sameRows(expectedAnswer, sparkAnswer, isSorted).map { results =>
+ s"""
+ |Results do not match for query:
+ |Timezone: ${TimeZone.getDefault}
+ |Timezone Env: ${sys.env.getOrElse("TZ", "")}
+ |
+ |${df.analyze}
+ |== Results ==
+ |$results
+ """.stripMargin
+ }
+ }
+
+ def prepareAnswer(answer: Seq[Row], isSorted: Boolean): Seq[Row] = {
+ // Converts data to types that we can do equality comparison using Scala collections.
+ // For BigDecimal type, the Scala type has a better definition of equality test (similar to
+ // Java's java.math.BigDecimal.compareTo).
+ // For binary arrays, we convert them to Seq to avoid calling java.util.Arrays.equals for
+ // the equality test.
+ val converted: Seq[Row] = answer.map(prepareRow)
+ if (!isSorted) converted.sortBy(_.toString()) else converted
+ }
+
+ // We need to call prepareRow recursively to handle schemas with struct types.
+ def prepareRow(row: Row): Row = {
+ Row.fromSeq(row.toSeq.map {
+ case null => null
+ case bd: java.math.BigDecimal => BigDecimal(bd)
+ // Equality of WrappedArray differs for AnyVal and AnyRef in Scala 2.12.2+
+ case seq: Seq[_] =>
+ seq.map {
+ case b: java.lang.Byte => b.byteValue
+ case s: java.lang.Short => s.shortValue
+ case i: java.lang.Integer => i.intValue
+ case l: java.lang.Long => l.longValue
+ case f: java.lang.Float => f.floatValue
+ case d: java.lang.Double => d.doubleValue
+ case x => x
+ }
+ // Convert array to Seq for easy equality check.
+ case b: Array[_] => b.toSeq
+ case r: Row => prepareRow(r)
+ case o => o
+ })
+ }
+
+ private def genError(
+ expectedAnswer: Seq[Row],
+ sparkAnswer: Seq[Row],
+ isSorted: Boolean = false): String = {
+ val getRowType: Option[Row] => String = row =>
+ row
+ .map(row =>
+ if (row.schema == null) {
+ "struct<>"
+ } else {
+ s"${row.schema.catalogString}"
+ })
+ .getOrElse("struct<>")
+
+ s"""
+ |== Results ==
+ |${sideBySide(
+ s"== Correct Answer - ${expectedAnswer.size} ==" +:
+ getRowType(expectedAnswer.headOption) +:
+ prepareAnswer(expectedAnswer, isSorted).map(_.toString()),
+ s"== Spark Answer - ${sparkAnswer.size} ==" +:
+ getRowType(sparkAnswer.headOption) +:
+ prepareAnswer(sparkAnswer, isSorted).map(_.toString())).mkString("\n")}
+ """.stripMargin
+ }
+
+ def includesRows(expectedRows: Seq[Row], sparkAnswer: Seq[Row]): Option[String] = {
+ if (!prepareAnswer(expectedRows, true).toSet.subsetOf(
+ prepareAnswer(sparkAnswer, true).toSet)) {
+ return Some(genError(expectedRows, sparkAnswer, true))
+ }
+ None
+ }
+
+ def compare(obj1: Any, obj2: Any): Boolean = (obj1, obj2) match {
+ case (null, null) => true
+ case (null, _) => false
+ case (_, null) => false
+ case (a: Array[_], b: Array[_]) =>
+ a.length == b.length && a.zip(b).forall { case (l, r) => compare(l, r) }
+ case (a: Map[_, _], b: Map[_, _]) =>
+ a.size == b.size && a.keys.forall { aKey =>
+ b.keys.find(bKey => compare(aKey, bKey)).exists(bKey => compare(a(aKey), b(bKey)))
+ }
+ case (a: Iterable[_], b: Iterable[_]) =>
+ a.size == b.size && a.zip(b).forall { case (l, r) => compare(l, r) }
+ case (a: Product, b: Product) =>
+ compare(a.productIterator.toSeq, b.productIterator.toSeq)
+ case (a: Row, b: Row) =>
+ compare(a.toSeq, b.toSeq)
+ // 0.0 == -0.0, turn float/double to bits before comparison, to distinguish 0.0 and -0.0.
+ case (a: Double, b: Double) =>
+ java.lang.Double.doubleToRawLongBits(a) == java.lang.Double.doubleToRawLongBits(b)
+ case (a: Float, b: Float) =>
+ java.lang.Float.floatToRawIntBits(a) == java.lang.Float.floatToRawIntBits(b)
+ case (a, b) => a == b
+ }
+
+ def sameRows(
+ expectedAnswer: Seq[Row],
+ sparkAnswer: Seq[Row],
+ isSorted: Boolean = false): Option[String] = {
+ if (!compare(prepareAnswer(expectedAnswer, isSorted), prepareAnswer(sparkAnswer, isSorted))) {
+ return Some(genError(expectedAnswer, sparkAnswer, isSorted))
+ }
+ None
+ }
+}
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/drop.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/drop.explain
new file mode 100644
index 00000000000..85a15dfab8d
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/drop.explain
@@ -0,0 +1,2 @@
+Filter atleastnnonnulls(5, id#0L, a#0)
++- LocalRelation <empty>, [id#0L, a#0, b#0]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/fill.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/fill.explain
new file mode 100644
index 00000000000..12d9bff0e8a
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/fill.explain
@@ -0,0 +1,2 @@
+Project [coalesce(id#0L, cast(8 as bigint)) AS id#0L, a#0, b#0]
++- LocalRelation <empty>, [id#0L, a#0, b#0]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/replace.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/replace.explain
new file mode 100644
index 00000000000..ef3de21e881
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/replace.explain
@@ -0,0 +1,2 @@
+Project [CASE WHEN (cast(id#0L as double) = 1.0) THEN cast(8.0 as bigint) ELSE id#0L END AS id#0L, a#0, b#0]
++- LocalRelation <empty>, [id#0L, a#0, b#0]
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/drop.json b/connector/connect/common/src/test/resources/query-tests/queries/drop.json
new file mode 100644
index 00000000000..a5176b25b05
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/drop.json
@@ -0,0 +1,17 @@
+{
+ "common": {
+ "planId": "1"
+ },
+ "dropNa": {
+ "input": {
+ "common": {
+ "planId": "0"
+ },
+ "localRelation": {
+ "schema": "struct\u003cid:bigint,a:int,b:double\u003e"
+ }
+ },
+ "cols": ["id", "a"],
+ "minNonNulls": 5
+ }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/drop.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/drop.proto.bin
new file mode 100644
index 00000000000..9e18d02afbc
Binary files /dev/null and b/connector/connect/common/src/test/resources/query-tests/queries/drop.proto.bin differ
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/fill.json b/connector/connect/common/src/test/resources/query-tests/queries/fill.json
new file mode 100644
index 00000000000..8308af1f579
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/fill.json
@@ -0,0 +1,19 @@
+{
+ "common": {
+ "planId": "1"
+ },
+ "fillNa": {
+ "input": {
+ "common": {
+ "planId": "0"
+ },
+ "localRelation": {
+ "schema": "struct\u003cid:bigint,a:int,b:double\u003e"
+ }
+ },
+ "cols": ["id"],
+ "values": [{
+ "long": "8"
+ }]
+ }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/fill.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/fill.proto.bin
new file mode 100644
index 00000000000..b034c5e64a8
Binary files /dev/null and b/connector/connect/common/src/test/resources/query-tests/queries/fill.proto.bin differ
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/replace.json b/connector/connect/common/src/test/resources/query-tests/queries/replace.json
new file mode 100644
index 00000000000..d0e39d340c0
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/replace.json
@@ -0,0 +1,24 @@
+{
+ "common": {
+ "planId": "1"
+ },
+ "replace": {
+ "input": {
+ "common": {
+ "planId": "0"
+ },
+ "localRelation": {
+ "schema": "struct\u003cid:bigint,a:int,b:double\u003e"
+ }
+ },
+ "cols": ["id"],
+ "replacements": [{
+ "oldValue": {
+ "double": 1.0
+ },
+ "newValue": {
+ "double": 8.0
+ }
+ }]
+ }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/replace.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/replace.proto.bin
new file mode 100644
index 00000000000..d1868cee7bf
Binary files /dev/null and b/connector/connect/common/src/test/resources/query-tests/queries/replace.proto.bin differ