Posted to commits@spark.apache.org by hv...@apache.org on 2023/03/06 19:17:31 UTC

[spark] branch branch-3.4 updated: [SPARK-42559][CONNECT] Implement DataFrameNaFunctions

This is an automated email from the ASF dual-hosted git repository.

hvanhovell pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.4 by this push:
     new 7e4742c4d58 [SPARK-42559][CONNECT] Implement DataFrameNaFunctions
7e4742c4d58 is described below

commit 7e4742c4d5839cb2014958e8f0bc5156c7b86192
Author: panbingkun <pb...@gmail.com>
AuthorDate: Mon Mar 6 15:17:00 2023 -0400

    [SPARK-42559][CONNECT] Implement DataFrameNaFunctions
    
    ### What changes were proposed in this pull request?
    This PR implements DataFrameNaFunctions for the Spark Connect Scala client.
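    
    A minimal usage sketch of the new client-side API (the data below is illustrative and not part of this patch):
    ```scala
    // Assumes `spark` is a Spark Connect SparkSession.
    val df = spark.sql(
      "SELECT * FROM VALUES ('Bob', 16, 176.5), ('Alice', NULL, 164.3) AS t(name, age, height)")
    df.na.drop()                                   // drop rows containing any null or NaN value
    df.na.fill(0L, Seq("age"))                     // fill nulls in "age" with 0
    df.na.replace("height", Map(164.3 -> 165.0))   // replace 164.3 with 165.0 in "height"
    ```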
    
    ### Why are the changes needed?
    API coverage.
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    ### How was this patch tested?
    Added new unit tests.
    
    Closes #40217 from panbingkun/SPARK-42559.
    
    Authored-by: panbingkun <pb...@gmail.com>
    Signed-off-by: Herman van Hovell <he...@databricks.com>
    (cherry picked from commit 9bf174f9722e34f13bfaede5e59f989bf2a511e9)
    Signed-off-by: Herman van Hovell <he...@databricks.com>
---
 .../apache/spark/sql/DataFrameNaFunctions.scala    | 441 +++++++++++++++++++++
 .../main/scala/org/apache/spark/sql/Dataset.scala  |  12 +
 .../spark/sql/DataFrameNaFunctionSuite.scala       | 402 +++++++++++++++++++
 .../apache/spark/sql/PlanGenerationTestSuite.scala |  12 +
 .../CheckConnectJvmClientCompatibility.scala       |   5 +-
 .../spark/sql/connect/client/util/QueryTest.scala  | 209 ++++++++++
 .../query-tests/explain-results/drop.explain       |   2 +
 .../query-tests/explain-results/fill.explain       |   2 +
 .../query-tests/explain-results/replace.explain    |   2 +
 .../test/resources/query-tests/queries/drop.json   |  17 +
 .../resources/query-tests/queries/drop.proto.bin   | Bin 0 -> 58 bytes
 .../test/resources/query-tests/queries/fill.json   |  19 +
 .../resources/query-tests/queries/fill.proto.bin   | Bin 0 -> 57 bytes
 .../resources/query-tests/queries/replace.json     |  24 ++
 .../query-tests/queries/replace.proto.bin          | Bin 0 -> 77 bytes
 15 files changed, 1146 insertions(+), 1 deletion(-)
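
For context, the golden files added below show how these client calls map to analyzed plans on the server. For example, the "drop" query test added in PlanGenerationTestSuite:

    simple.na.drop(5, Seq("id", "a"))

is recorded in drop.explain as:

    Filter atleastnnonnulls(5, id#0L, a#0)
    +- LocalRelation <empty>, [id#0L, a#0, b#0]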

diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/DataFrameNaFunctions.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/DataFrameNaFunctions.scala
new file mode 100644
index 00000000000..17b95018f89
--- /dev/null
+++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/DataFrameNaFunctions.scala
@@ -0,0 +1,441 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import java.util.Locale
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.connect.proto.{NAReplace, Relation}
+import org.apache.spark.connect.proto.Expression.{Literal => GLiteral}
+import org.apache.spark.connect.proto.NAReplace.Replacement
+
+/**
+ * Functionality for working with missing data in `DataFrame`s.
+ *
+ * @since 3.4.0
+ */
+final class DataFrameNaFunctions private[sql] (sparkSession: SparkSession, root: Relation) {
+
+  /**
+   * Returns a new `DataFrame` that drops rows containing any null or NaN values.
+   *
+   * @since 3.4.0
+   */
+  def drop(): DataFrame = buildDropDataFrame(None, None)
+
+  /**
+   * Returns a new `DataFrame` that drops rows containing null or NaN values.
+   *
+   * If `how` is "any", then drop rows containing any null or NaN values. If `how` is "all", then
+   * drop rows only if every column is null or NaN for that row.
+   *
+   * @since 3.4.0
+   */
+  def drop(how: String): DataFrame = {
+    buildDropDataFrame(None, buildMinNonNulls(how))
+  }
+
+  /**
+   * Returns a new `DataFrame` that drops rows containing any null or NaN values in the specified
+   * columns.
+   *
+   * @since 3.4.0
+   */
+  def drop(cols: Array[String]): DataFrame = drop(cols.toSeq)
+
+  /**
+   * (Scala-specific) Returns a new `DataFrame` that drops rows containing any null or NaN values
+   * in the specified columns.
+   *
+   * @since 3.4.0
+   */
+  def drop(cols: Seq[String]): DataFrame = buildDropDataFrame(Some(cols), None)
+
+  /**
+   * Returns a new `DataFrame` that drops rows containing null or NaN values in the specified
+   * columns.
+   *
+   * If `how` is "any", then drop rows containing any null or NaN values in the specified columns.
+   * If `how` is "all", then drop rows only if every specified column is null or NaN for that row.
+   *
+   * @since 3.4.0
+   */
+  def drop(how: String, cols: Array[String]): DataFrame = drop(how, cols.toSeq)
+
+  /**
+   * (Scala-specific) Returns a new `DataFrame` that drops rows containing null or NaN values in
+   * the specified columns.
+   *
+   * If `how` is "any", then drop rows containing any null or NaN values in the specified columns.
+   * If `how` is "all", then drop rows only if every specified column is null or NaN for that row.
+   *
+   * @since 3.4.0
+   */
+  def drop(how: String, cols: Seq[String]): DataFrame = {
+    buildDropDataFrame(Some(cols), buildMinNonNulls(how))
+  }
+
+  /**
+   * Returns a new `DataFrame` that drops rows containing less than `minNonNulls` non-null and
+   * non-NaN values.
+   *
+   * @since 3.4.0
+   */
+  def drop(minNonNulls: Int): DataFrame = {
+    buildDropDataFrame(None, Some(minNonNulls))
+  }
+
+  /**
+   * Returns a new `DataFrame` that drops rows containing less than `minNonNulls` non-null and
+   * non-NaN values in the specified columns.
+   *
+   * @since 3.4.0
+   */
+  def drop(minNonNulls: Int, cols: Array[String]): DataFrame = drop(minNonNulls, cols.toSeq)
+
+  /**
+   * (Scala-specific) Returns a new `DataFrame` that drops rows containing less than `minNonNulls`
+   * non-null and non-NaN values in the specified columns.
+   *
+   * @since 3.4.0
+   */
+  def drop(minNonNulls: Int, cols: Seq[String]): DataFrame = {
+    buildDropDataFrame(Some(cols), Some(minNonNulls))
+  }
+
+  private def buildMinNonNulls(how: String): Option[Int] = {
+    how.toLowerCase(Locale.ROOT) match {
+      case "any" => None // No-Op. Do nothing.
+      case "all" => Some(1)
+      case _ => throw new IllegalArgumentException(s"how ($how) must be 'any' or 'all'")
+    }
+  }
+
+  private def buildDropDataFrame(
+      cols: Option[Seq[String]],
+      minNonNulls: Option[Int]): DataFrame = {
+    sparkSession.newDataFrame { builder =>
+      val dropNaBuilder = builder.getDropNaBuilder.setInput(root)
+      cols.foreach(c => dropNaBuilder.addAllCols(c.asJava))
+      minNonNulls.foreach(dropNaBuilder.setMinNonNulls)
+    }
+  }
+
+  /**
+   * Returns a new `DataFrame` that replaces null or NaN values in numeric columns with `value`.
+   *
+   * @since 3.4.0
+   */
+  def fill(value: Long): DataFrame = {
+    buildFillDataFrame(None, GLiteral.newBuilder().setLong(value).build())
+  }
+
+  /**
+   * Returns a new `DataFrame` that replaces null or NaN values in specified numeric columns. If a
+   * specified column is not a numeric column, it is ignored.
+   *
+   * @since 3.4.0
+   */
+  def fill(value: Long, cols: Array[String]): DataFrame = fill(value, cols.toSeq)
+
+  /**
+   * (Scala-specific) Returns a new `DataFrame` that replaces null or NaN values in specified
+   * numeric columns. If a specified column is not a numeric column, it is ignored.
+   *
+   * @since 3.4.0
+   */
+  def fill(value: Long, cols: Seq[String]): DataFrame = {
+    buildFillDataFrame(Some(cols), GLiteral.newBuilder().setLong(value).build())
+  }
+
+  /**
+   * Returns a new `DataFrame` that replaces null or NaN values in numeric columns with `value`.
+   *
+   * @since 3.4.0
+   */
+  def fill(value: Double): DataFrame = {
+    buildFillDataFrame(None, GLiteral.newBuilder().setDouble(value).build())
+  }
+
+  /**
+   * Returns a new `DataFrame` that replaces null or NaN values in specified numeric columns. If a
+   * specified column is not a numeric column, it is ignored.
+   *
+   * @since 3.4.0
+   */
+  def fill(value: Double, cols: Array[String]): DataFrame = fill(value, cols.toSeq)
+
+  /**
+   * (Scala-specific) Returns a new `DataFrame` that replaces null or NaN values in specified
+   * numeric columns. If a specified column is not a numeric column, it is ignored.
+   *
+   * @since 3.4.0
+   */
+  def fill(value: Double, cols: Seq[String]): DataFrame = {
+    buildFillDataFrame(Some(cols), GLiteral.newBuilder().setDouble(value).build())
+  }
+
+  /**
+   * Returns a new `DataFrame` that replaces null values in string columns with `value`.
+   *
+   * @since 3.4.0
+   */
+  def fill(value: String): DataFrame = {
+    buildFillDataFrame(None, GLiteral.newBuilder().setString(value).build())
+  }
+
+  /**
+   * Returns a new `DataFrame` that replaces null values in specified string columns. If a
+   * specified column is not a string column, it is ignored.
+   *
+   * @since 3.4.0
+   */
+  def fill(value: String, cols: Array[String]): DataFrame = fill(value, cols.toSeq)
+
+  /**
+   * (Scala-specific) Returns a new `DataFrame` that replaces null values in specified string
+   * columns. If a specified column is not a string column, it is ignored.
+   *
+   * @since 3.4.0
+   */
+  def fill(value: String, cols: Seq[String]): DataFrame = {
+    buildFillDataFrame(Some(cols), GLiteral.newBuilder().setString(value).build())
+  }
+
+  /**
+   * Returns a new `DataFrame` that replaces null values in boolean columns with `value`.
+   *
+   * @since 3.4.0
+   */
+  def fill(value: Boolean): DataFrame = {
+    buildFillDataFrame(None, GLiteral.newBuilder().setBoolean(value).build())
+  }
+
+  /**
+   * Returns a new `DataFrame` that replaces null values in specified boolean columns. If a
+   * specified column is not a boolean column, it is ignored.
+   *
+   * @since 3.4.0
+   */
+  def fill(value: Boolean, cols: Array[String]): DataFrame = fill(value, cols.toSeq)
+
+  /**
+   * (Scala-specific) Returns a new `DataFrame` that replaces null values in specified boolean
+   * columns. If a specified column is not a boolean column, it is ignored.
+   *
+   * @since 3.4.0
+   */
+  def fill(value: Boolean, cols: Seq[String]): DataFrame = {
+    buildFillDataFrame(Some(cols), GLiteral.newBuilder().setBoolean(value).build())
+  }
+
+  private def buildFillDataFrame(cols: Option[Seq[String]], value: GLiteral): DataFrame = {
+    sparkSession.newDataFrame { builder =>
+      val fillNaBuilder = builder.getFillNaBuilder.setInput(root)
+      fillNaBuilder.addValues(value)
+      cols.foreach(c => fillNaBuilder.addAllCols(c.asJava))
+    }
+  }
+
+  /**
+   * Returns a new `DataFrame` that replaces null values.
+   *
+   * The key of the map is the column name, and the value of the map is the replacement value. The
+   * value must be of the following type: `Integer`, `Long`, `Float`, `Double`, `String`,
+   * `Boolean`. Replacement values are cast to the column data type.
+   *
+   * For example, the following replaces null values in column "A" with string "unknown", and null
+   * values in column "B" with numeric value 1.0.
+   * {{{
+   *   import com.google.common.collect.ImmutableMap;
+   *   df.na.fill(ImmutableMap.of("A", "unknown", "B", 1.0));
+   * }}}
+   *
+   * @since 3.4.0
+   */
+  def fill(valueMap: java.util.Map[String, Any]): DataFrame = fillMap(valueMap.asScala.toSeq)
+
+  /**
+   * Returns a new `DataFrame` that replaces null values.
+   *
+   * The key of the map is the column name, and the value of the map is the replacement value. The
+   * value must be of the following type: `Integer`, `Long`, `Float`, `Double`, `String`,
+   * `Boolean`. Replacement values are cast to the column data type.
+   *
+   * For example, the following replaces null values in column "A" with string "unknown", and null
+   * values in column "B" with numeric value 1.0.
+   * {{{
+   *   import com.google.common.collect.ImmutableMap;
+   *   df.na.fill(ImmutableMap.of("A", "unknown", "B", 1.0));
+   * }}}
+   *
+   * @since 3.4.0
+   */
+  def fill(valueMap: Map[String, Any]): DataFrame = fillMap(valueMap.toSeq)
+
+  private def fillMap(values: Seq[(String, Any)]): DataFrame = {
+    sparkSession.newDataFrame { builder =>
+      val fillNaBuilder = builder.getFillNaBuilder.setInput(root)
+      values.map { case (colName, replaceValue) =>
+        fillNaBuilder.addCols(colName).addValues(functions.lit(replaceValue).expr.getLiteral)
+      }
+    }
+  }
+
+  /**
+   * Replaces values matching keys in `replacement` map with the corresponding values.
+   *
+   * {{{
+   *   import com.google.common.collect.ImmutableMap;
+   *
+   *   // Replaces all occurrences of 1.0 with 2.0 in column "height".
+   *   df.na.replace("height", ImmutableMap.of(1.0, 2.0));
+   *
+   *   // Replaces all occurrences of "UNKNOWN" with "unnamed" in column "name".
+   *   df.na.replace("name", ImmutableMap.of("UNKNOWN", "unnamed"));
+   *
+   *   // Replaces all occurrences of "UNKNOWN" with "unnamed" in all string columns.
+   *   df.na.replace("*", ImmutableMap.of("UNKNOWN", "unnamed"));
+   * }}}
+   *
+   * @param col
+   *   name of the column to apply the value replacement. If `col` is "*", replacement is applied
+   *   on all string, numeric or boolean columns.
+   * @param replacement
+   *   value replacement map. Key and value of `replacement` map must have the same type, and can
+   *   only be doubles, strings or booleans. The map value can have nulls.
+   * @since 3.4.0
+   */
+  def replace[T](col: String, replacement: java.util.Map[T, T]): DataFrame =
+    replace(col, replacement.asScala.toMap)
+
+  /**
+   * (Scala-specific) Replaces values matching keys in `replacement` map.
+   *
+   * {{{
+   *   // Replaces all occurrences of 1.0 with 2.0 in column "height".
+   *   df.na.replace("height", Map(1.0 -> 2.0));
+   *
+   *   // Replaces all occurrences of "UNKNOWN" with "unnamed" in column "name".
+   *   df.na.replace("name", Map("UNKNOWN" -> "unnamed"));
+   *
+   *   // Replaces all occurrences of "UNKNOWN" with "unnamed" in all string columns.
+   *   df.na.replace("*", Map("UNKNOWN" -> "unnamed"));
+   * }}}
+   *
+   * @param col
+   *   name of the column to apply the value replacement. If `col` is "*", replacement is applied
+   *   on all string, numeric or boolean columns.
+   * @param replacement
+   *   value replacement map. Key and value of `replacement` map must have the same type, and can
+   *   only be doubles, strings or booleans. The map value can have nulls.
+   * @since 3.4.0
+   */
+  def replace[T](col: String, replacement: Map[T, T]): DataFrame = {
+    val cols = if (col != "*") Some(Seq(col)) else None
+    buildReplaceDataFrame(cols, buildReplacement(replacement))
+  }
+
+  /**
+   * Replaces values matching keys in `replacement` map with the corresponding values.
+   *
+   * {{{
+   *   import com.google.common.collect.ImmutableMap;
+   *
+   *   // Replaces all occurrences of 1.0 with 2.0 in column "height" and "weight".
+   *   df.na.replace(new String[] {"height", "weight"}, ImmutableMap.of(1.0, 2.0));
+   *
+   *   // Replaces all occurrences of "UNKNOWN" with "unnamed" in column "firstname" and "lastname".
+   *   df.na.replace(new String[] {"firstname", "lastname"}, ImmutableMap.of("UNKNOWN", "unnamed"));
+   * }}}
+   *
+   * @param cols
+   *   list of columns to apply the value replacement. If `col` is "*", replacement is applied on
+   *   all string, numeric or boolean columns.
+   * @param replacement
+   *   value replacement map. Key and value of `replacement` map must have the same type, and can
+   *   only be doubles, strings or booleans. The map value can have nulls.
+   * @since 3.4.0
+   */
+  def replace[T](cols: Array[String], replacement: java.util.Map[T, T]): DataFrame = {
+    replace(cols.toSeq, replacement.asScala.toMap)
+  }
+
+  /**
+   * (Scala-specific) Replaces values matching keys in `replacement` map.
+   *
+   * {{{
+   *   // Replaces all occurrences of 1.0 with 2.0 in column "height" and "weight".
+   *   df.na.replace("height" :: "weight" :: Nil, Map(1.0 -> 2.0));
+   *
+   *   // Replaces all occurrences of "UNKNOWN" with "unnamed" in column "firstname" and "lastname".
+   *   df.na.replace("firstname" :: "lastname" :: Nil, Map("UNKNOWN" -> "unnamed"));
+   * }}}
+   *
+   * @param cols
+   *   list of columns to apply the value replacement. If `col` is "*", replacement is applied on
+   *   all string, numeric or boolean columns.
+   * @param replacement
+   *   value replacement map. Key and value of `replacement` map must have the same type, and can
+   *   only be doubles, strings or booleans. The map value can have nulls.
+   * @since 3.4.0
+   */
+  def replace[T](cols: Seq[String], replacement: Map[T, T]): DataFrame = {
+    buildReplaceDataFrame(Some(cols), buildReplacement(replacement))
+  }
+
+  private def buildReplaceDataFrame(
+      cols: Option[Seq[String]],
+      replacements: Iterable[NAReplace.Replacement]): DataFrame = {
+    sparkSession.newDataFrame { builder =>
+      val replaceBuilder = builder.getReplaceBuilder.setInput(root)
+      replaceBuilder.addAllReplacements(replacements.asJava)
+      cols.foreach(c => replaceBuilder.addAllCols(c.asJava))
+    }
+  }
+
+  private def buildReplacement[T](replacement: Map[T, T]): Iterable[NAReplace.Replacement] = {
+    // Convert the NumericType in replacement map to DoubleType,
+    // while leaving StringType, BooleanType and null untouched.
+    val replacementMap: Map[_, _] = replacement.map {
+      case (k, v: String) => (k, v)
+      case (k, v: Boolean) => (k, v)
+      case (k: String, null) => (k, null)
+      case (k: Boolean, null) => (k, null)
+      case (k, null) => (convertToDouble(k), null)
+      case (k, v) => (convertToDouble(k), convertToDouble(v))
+    }
+    replacementMap.map { case (oldValue, newValue) =>
+      Replacement
+        .newBuilder()
+        .setOldValue(functions.lit(oldValue).expr.getLiteral)
+        .setNewValue(functions.lit(newValue).expr.getLiteral)
+        .build()
+    }
+  }
+
+  private def convertToDouble(v: Any): Double = v match {
+    case v: Float => v.toDouble
+    case v: Double => v
+    case v: Long => v.toDouble
+    case v: Int => v.toDouble
+    case v =>
+      throw new IllegalArgumentException(s"Unsupported value type ${v.getClass.getName} ($v).")
+  }
+}
diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala
index 74dd58f4903..13dff3a874f 100644
--- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -534,6 +534,18 @@ class Dataset[T] private[sql] (
     }
   }
 
+  /**
+   * Returns a [[DataFrameNaFunctions]] for working with missing data.
+   * {{{
+   *   // Dropping rows containing any null values.
+   *   ds.na.drop()
+   * }}}
+   *
+   * @group untypedrel
+   * @since 3.4.0
+   */
+  def na: DataFrameNaFunctions = new DataFrameNaFunctions(sparkSession, plan.getRoot)
+
   /**
    * Returns a [[DataFrameStatFunctions]] for working statistic functions support.
    * {{{
diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/DataFrameNaFunctionSuite.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/DataFrameNaFunctionSuite.scala
new file mode 100644
index 00000000000..5049147678b
--- /dev/null
+++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/DataFrameNaFunctionSuite.scala
@@ -0,0 +1,402 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql.connect.client.util.QueryTest
+import org.apache.spark.sql.types.{StringType, StructType}
+
+class DataFrameNaFunctionSuite extends QueryTest {
+  private def createDF(): DataFrame = {
+    val sparkSession = spark
+    import sparkSession.implicits._
+    Seq[(String, java.lang.Integer, java.lang.Double)](
+      ("Bob", 16, 176.5),
+      ("Alice", null, 164.3),
+      ("David", 60, null),
+      ("Nina", 25, Double.NaN),
+      ("Amy", null, null),
+      (null, null, null)).toDF("name", "age", "height")
+  }
+
+  def createNaNDF(): DataFrame = {
+    val sparkSession = spark
+    import sparkSession.implicits._
+    Seq[(
+        java.lang.Integer,
+        java.lang.Long,
+        java.lang.Short,
+        java.lang.Byte,
+        java.lang.Float,
+        java.lang.Double)](
+      (1, 1L, 1.toShort, 1.toByte, 1.0f, 1.0),
+      (0, 0L, 0.toShort, 0.toByte, Float.NaN, Double.NaN)).toDF(
+      "int",
+      "long",
+      "short",
+      "byte",
+      "float",
+      "double")
+  }
+
+  def createDFWithNestedColumns: DataFrame = {
+    val schema = new StructType()
+      .add(
+        "c1",
+        new StructType()
+          .add("c1-1", StringType)
+          .add("c1-2", StringType))
+    val data = Seq(Row(Row(null, "a2")), Row(Row("b1", "b2")), Row(null))
+    spark.createDataFrame(data.asJava, schema)
+  }
+
+  test("drop") {
+    val input = createDF()
+    val rows = input.collect()
+
+    val result1 = input.na.drop("name" :: Nil).select("name")
+    val expected1 = Array(Row("Bob"), Row("Alice"), Row("David"), Row("Nina"), Row("Amy"))
+    checkAnswer(result1, expected1)
+
+    val result2 = input.na.drop("age" :: Nil).select("name")
+    val expected2 = Array(Row("Bob"), Row("David"), Row("Nina"))
+    checkAnswer(result2, expected2)
+
+    val result3 = input.na.drop("age" :: "height" :: Nil)
+    val expected3 = Array(rows(0))
+    checkAnswer(result3, expected3)
+
+    val result4 = input.na.drop()
+    checkAnswer(result4, expected3)
+
+    // dropna on a dataframe with no rows should return an empty dataframe.
+    val empty = input.filter("age > 100")
+    assert(empty.na.drop().count() === 0L)
+
+    // Make sure the columns are properly named.
+    assert(input.na.drop().columns.toSeq === input.columns.toSeq)
+  }
+
+  test("drop with how") {
+    val input = createDF()
+    val rows = input.collect()
+
+    checkAnswer(
+      input.na.drop("all").select("name"),
+      Row("Bob") :: Row("Alice") :: Row("David") :: Row("Nina") :: Row("Amy") :: Nil)
+
+    checkAnswer(input.na.drop("any"), rows(0) :: Nil)
+
+    checkAnswer(input.na.drop("any", Seq("age", "height")), rows(0) :: Nil)
+
+    checkAnswer(
+      input.na.drop("all", Seq("age", "height")).select("name"),
+      Row("Bob") :: Row("Alice") :: Row("David") :: Row("Nina") :: Nil)
+  }
+
+  test("drop with threshold") {
+    val input = createDF()
+    val rows = input.collect()
+
+    checkAnswer(input.na.drop(2, Seq("age", "height")), rows(0) :: Nil)
+
+    checkAnswer(input.na.drop(3, Seq("name", "age", "height")), rows(0))
+
+    // Make sure the columns are properly named.
+    assert(input.na.drop(2, Seq("age", "height")).columns.toSeq === input.columns.toSeq)
+  }
+
+  test("fill") {
+    val sparkSession = spark
+    import sparkSession.implicits._
+
+    val input = createDF()
+
+    val boolInput = Seq[(String, java.lang.Boolean)](
+      ("Bob", false),
+      ("Alice", null),
+      ("Mallory", true),
+      (null, null)).toDF("name", "spy")
+
+    val fillNumeric = input.na.fill(50.6)
+    checkAnswer(
+      fillNumeric,
+      Row("Bob", 16, 176.5) ::
+        Row("Alice", 50, 164.3) ::
+        Row("David", 60, 50.6) ::
+        Row("Nina", 25, 50.6) ::
+        Row("Amy", 50, 50.6) ::
+        Row(null, 50, 50.6) :: Nil)
+
+    // Make sure the columns are properly named.
+    assert(fillNumeric.columns.toSeq === input.columns.toSeq)
+
+    // string
+    checkAnswer(
+      input.na.fill("unknown").select("name"),
+      Row("Bob") :: Row("Alice") :: Row("David") ::
+        Row("Nina") :: Row("Amy") :: Row("unknown") :: Nil)
+    assert(input.na.fill("unknown").columns.toSeq === input.columns.toSeq)
+
+    // boolean
+    checkAnswer(
+      boolInput.na.fill(true).select("spy"),
+      Row(false) :: Row(true) :: Row(true) :: Row(true) :: Nil)
+    assert(boolInput.na.fill(true).columns.toSeq === boolInput.columns.toSeq)
+
+    // fill double with subset columns
+    checkAnswer(
+      input.na.fill(50.6, "age" :: Nil).select("name", "age"),
+      Row("Bob", 16) ::
+        Row("Alice", 50) ::
+        Row("David", 60) ::
+        Row("Nina", 25) ::
+        Row("Amy", 50) ::
+        Row(null, 50) :: Nil)
+
+    // fill boolean with subset columns
+    checkAnswer(
+      boolInput.na.fill(true, "spy" :: Nil).select("name", "spy"),
+      Row("Bob", false) ::
+        Row("Alice", true) ::
+        Row("Mallory", true) ::
+        Row(null, true) :: Nil)
+
+    // fill string with subset columns
+    checkAnswer(
+      Seq[(String, String)]((null, null)).toDF("col1", "col2").na.fill("test", "col1" :: Nil),
+      Row("test", null))
+
+    checkAnswer(
+      Seq[(Long, Long)]((1, 2), (-1, -2), (9123146099426677101L, 9123146560113991650L))
+        .toDF("a", "b")
+        .na
+        .fill(0),
+      Row(1, 2) :: Row(-1, -2) :: Row(9123146099426677101L, 9123146560113991650L) :: Nil)
+
+    checkAnswer(
+      Seq[(java.lang.Long, java.lang.Double)](
+        (null, 3.14),
+        (9123146099426677101L, null),
+        (9123146560113991650L, 1.6),
+        (null, null)).toDF("a", "b").na.fill(0.2),
+      Row(0, 3.14) :: Row(9123146099426677101L, 0.2) :: Row(9123146560113991650L, 1.6)
+        :: Row(0, 0.2) :: Nil)
+
+    checkAnswer(
+      Seq[(java.lang.Long, java.lang.Float)](
+        (null, 3.14f),
+        (9123146099426677101L, null),
+        (9123146560113991650L, 1.6f),
+        (null, null)).toDF("a", "b").na.fill(0.2),
+      Row(0, 3.14f) :: Row(9123146099426677101L, 0.2f) :: Row(9123146560113991650L, 1.6f)
+        :: Row(0, 0.2f) :: Nil)
+
+    checkAnswer(
+      Seq[(java.lang.Long, java.lang.Double)]((null, 1.23), (3L, null), (4L, 3.45))
+        .toDF("a", "b")
+        .na
+        .fill(2.34),
+      Row(2, 1.23) :: Row(3, 2.34) :: Row(4, 3.45) :: Nil)
+
+    checkAnswer(
+      Seq[(java.lang.Long, java.lang.Double)]((null, 1.23), (3L, null), (4L, 3.45))
+        .toDF("a", "b")
+        .na
+        .fill(5),
+      Row(5, 1.23) :: Row(3, 5.0) :: Row(4, 3.45) :: Nil)
+  }
+
+  test("fill with map") {
+    val sparkSession = spark
+    import sparkSession.implicits._
+
+    val df = Seq[(
+        String,
+        String,
+        java.lang.Integer,
+        java.lang.Long,
+        java.lang.Float,
+        java.lang.Double,
+        java.lang.Boolean)]((null, null, null, null, null, null, null))
+      .toDF(
+        "stringFieldA",
+        "stringFieldB",
+        "integerField",
+        "longField",
+        "floatField",
+        "doubleField",
+        "booleanField")
+
+    val fillMap = Map(
+      "stringFieldA" -> "test",
+      "integerField" -> 1,
+      "longField" -> 2L,
+      "floatField" -> 3.3f,
+      "doubleField" -> 4.4d,
+      "booleanField" -> false)
+
+    val expectedRow = Row("test", null, 1, 2L, 3.3f, 4.4d, false)
+    checkAnswer(df.na.fill(fillMap), expectedRow)
+    checkAnswer(df.na.fill(fillMap.asJava), expectedRow) // Test Java version
+
+    // Ensure replacement values are cast to the column data type.
+    checkAnswer(
+      df.na.fill(
+        Map("integerField" -> 1d, "longField" -> 2d, "floatField" -> 3d, "doubleField" -> 4d)),
+      Row(null, null, 1, 2L, 3f, 4d, null))
+
+    // Ensure column types do not change. Columns that have null values replaced
+    // will no longer be flagged as nullable, so do not compare schemas directly.
+    assert(
+      df.na.fill(fillMap).schema.fields.map(_.dataType) ===
+        df.schema.fields.map(_.dataType))
+  }
+
+  test("fill with col(*)") {
+    val df = createDF()
+    // If columns are specified with "*", they are ignored.
+    checkAnswer(df.na.fill("new name", Seq("*")), df.collect())
+  }
+
+  test("drop with col(*)") {
+    val df = createDF()
+    val ex = intercept[RuntimeException] {
+      df.na.drop("any", Seq("*")).collect()
+    }
+    assert(ex.getMessage.contains("UNRESOLVED_COLUMN.WITH_SUGGESTION"))
+  }
+
+  test("fill with nested columns") {
+    val df = createDFWithNestedColumns
+    checkAnswer(df.na.fill("a1", Seq("c1.c1-1")), df)
+  }
+
+  test("drop with nested columns") {
+    val df = createDFWithNestedColumns
+
+    // Rows whose specified nested columns contain null values are dropped.
+    assert(df.count == 3)
+    checkAnswer(df.na.drop("any", Seq("c1.c1-1")), Seq(Row(Row("b1", "b2"))))
+  }
+
+  test("replace") {
+    val input = createDF()
+
+    val result1 = input.na
+      .replace(
+        Seq("age", "height"),
+        Map(
+          16 -> 61,
+          60 -> 6,
+          164.3 -> 461.3 // Alice is really tall
+        ))
+      .collect()
+    assert(result1(0) === Row("Bob", 61, 176.5))
+    assert(result1(1) === Row("Alice", null, 461.3))
+    assert(result1(2) === Row("David", 6, null))
+    assert(result1(3).get(2).asInstanceOf[Double].isNaN)
+    assert(result1(4) === Row("Amy", null, null))
+    assert(result1(5) === Row(null, null, null))
+
+    // Replace only the age column
+    val result2 = input.na
+      .replace(
+        "age",
+        Map(
+          16 -> 61,
+          60 -> 6,
+          164.3 -> 461.3 // Alice is really tall
+        ))
+      .collect()
+    assert(result2(0) === Row("Bob", 61, 176.5))
+    assert(result2(1) === Row("Alice", null, 164.3))
+    assert(result2(2) === Row("David", 6, null))
+    assert(result2(3).get(2).asInstanceOf[Double].isNaN)
+    assert(result2(4) === Row("Amy", null, null))
+    assert(result2(5) === Row(null, null, null))
+  }
+
+  test("replace with null") {
+    val input = spark.sql(
+      "select name, height, married from (values " +
+        "('Bob', 176.5, true), " +
+        "('Alice', 164.3, false), " +
+        "('David', null, true))" +
+        "as t(name, height, married)")
+
+    // Replace String with String and null
+    val result1 = input.na.replace("name", Map("Bob" -> "Bravo", "Alice" -> null))
+
+    checkAnswer(
+      result1,
+      Row("Bravo", 176.5, true) ::
+        Row(null, 164.3, false) ::
+        Row("David", null, true) :: Nil)
+
+    // Replace Double with null
+    val result2 = input.na.replace("height", Map[Any, Any](164.3 -> null))
+    checkAnswer(
+      result2,
+      Row("Bob", 176.5, true) ::
+        Row("Alice", null, false) ::
+        Row("David", null, true) :: Nil)
+
+    // Replace Boolean with null
+    checkAnswer(
+      input.na.replace("*", Map[Any, Any](false -> null)),
+      Row("Bob", 176.5, true) ::
+        Row("Alice", 164.3, null) ::
+        Row("David", null, true) :: Nil)
+
+    // Replace String with null and then drop rows containing null
+    checkAnswer(
+      input.na.replace("name", Map("Bob" -> null)).na.drop("name" :: Nil).select("name"),
+      Row("Alice") :: Row("David") :: Nil)
+  }
+
+  test("replace nan with float") {
+    checkAnswer(
+      createNaNDF().na.replace("*", Map(Float.NaN -> 10.0f)),
+      Row(1, 1L, 1.toShort, 1.toByte, 1.0f, 1.0) ::
+        Row(0, 0L, 0.toShort, 0.toByte, 10.0f, 10.0) :: Nil)
+  }
+
+  test("replace nan with double") {
+    checkAnswer(
+      createNaNDF().na.replace("*", Map(Double.NaN -> 10.0)),
+      Row(1, 1L, 1.toShort, 1.toByte, 1.0f, 1.0) ::
+        Row(0, 0L, 0.toShort, 0.toByte, 10.0f, 10.0) :: Nil)
+  }
+
+  test("replace float with nan") {
+    checkAnswer(
+      createNaNDF().na.replace("*", Map(1.0f -> Float.NaN)),
+      Row(0, 0L, 0.toShort, 0.toByte, Float.NaN, Double.NaN) ::
+        Row(0, 0L, 0.toShort, 0.toByte, Float.NaN, Double.NaN) :: Nil)
+  }
+
+  test("replace double with nan") {
+    checkAnswer(
+      createNaNDF().na.replace("*", Map(1.0 -> Double.NaN)),
+      Row(0, 0L, 0.toShort, 0.toByte, Float.NaN, Double.NaN) ::
+        Row(0, 0L, 0.toShort, 0.toByte, Float.NaN, Double.NaN) :: Nil)
+  }
+
+}
diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/PlanGenerationTestSuite.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/PlanGenerationTestSuite.scala
index b59c783aec9..f5ffaf9b73a 100755
--- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/PlanGenerationTestSuite.scala
+++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/PlanGenerationTestSuite.scala
@@ -2058,4 +2058,16 @@ class PlanGenerationTestSuite
   test("sampleBy") {
     simple.stat.sampleBy("id", Map(0 -> 0.1, 1 -> 0.2), 0L)
   }
+
+  test("drop") {
+    simple.na.drop(5, Seq("id", "a"))
+  }
+
+  test("fill") {
+    simple.na.fill(8L, Seq("id"))
+  }
+
+  test("replace") {
+    simple.na.replace[Long]("id", Map(1L -> 8L))
+  }
 }
diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CheckConnectJvmClientCompatibility.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CheckConnectJvmClientCompatibility.scala
index b684721243a..868e7ae7b74 100644
--- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CheckConnectJvmClientCompatibility.scala
+++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CheckConnectJvmClientCompatibility.scala
@@ -109,6 +109,7 @@ object CheckConnectJvmClientCompatibility {
       IncludeByName("org.apache.spark.sql.ColumnName.*"),
       IncludeByName("org.apache.spark.sql.DataFrame.*"),
       IncludeByName("org.apache.spark.sql.DataFrameReader.*"),
+      IncludeByName("org.apache.spark.sql.DataFrameNaFunctions.*"),
       IncludeByName("org.apache.spark.sql.DataFrameStatFunctions.*"),
       IncludeByName("org.apache.spark.sql.DataFrameWriter.*"),
       IncludeByName("org.apache.spark.sql.DataFrameWriterV2.*"),
@@ -134,6 +135,9 @@ object CheckConnectJvmClientCompatibility {
       ProblemFilters.exclude[Problem]("org.apache.spark.sql.DataFrameReader.jdbc"),
       ProblemFilters.exclude[Problem]("org.apache.spark.sql.DataFrameWriter.jdbc"),
 
+      // DataFrameNaFunctions
+      ProblemFilters.exclude[Problem]("org.apache.spark.sql.DataFrameNaFunctions.this"),
+
       // DataFrameStatFunctions
       ProblemFilters.exclude[Problem]("org.apache.spark.sql.DataFrameStatFunctions.bloomFilter"),
       ProblemFilters.exclude[Problem]("org.apache.spark.sql.DataFrameStatFunctions.this"),
@@ -148,7 +152,6 @@ object CheckConnectJvmClientCompatibility {
       ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.queryExecution"),
       ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.encoder"),
       ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.sqlContext"),
-      ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.na"),
       ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.joinWith"),
       ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.select"),
       ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.selectUntyped"),
diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/util/QueryTest.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/util/QueryTest.scala
new file mode 100644
index 00000000000..1c3f49f897f
--- /dev/null
+++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/util/QueryTest.scala
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.client.util
+
+import java.util.TimeZone
+
+import org.scalatest.Assertions
+
+import org.apache.spark.sql.{DataFrame, Row}
+import org.apache.spark.sql.catalyst.util.sideBySide
+
+abstract class QueryTest extends RemoteSparkSession {
+
+  /**
+   * Runs the plan and makes sure the answer matches the expected result.
+   *
+   * @param df
+   *   the [[DataFrame]] to be executed
+   * @param expectedAnswer
+   *   the expected result in a [[Seq]] of [[Row]]s.
+   */
+  protected def checkAnswer(df: => DataFrame, expectedAnswer: Seq[Row]): Unit = {
+    QueryTest.checkAnswer(df, expectedAnswer)
+  }
+
+  protected def checkAnswer(df: => DataFrame, expectedAnswer: Row): Unit = {
+    checkAnswer(df, Seq(expectedAnswer))
+  }
+
+  protected def checkAnswer(df: => DataFrame, expectedAnswer: DataFrame): Unit = {
+    checkAnswer(df, expectedAnswer.collect())
+  }
+}
+
+object QueryTest extends Assertions {
+
+  /**
+   * Runs the plan and makes sure the answer matches the expected result.
+   *
+   * @param df
+   *   the DataFrame to be executed
+   * @param expectedAnswer
+   *   the expected result in a Seq of Rows.
+   */
+  def checkAnswer(df: DataFrame, expectedAnswer: Seq[Row], isSorted: Boolean = false): Unit = {
+    getErrorMessageInCheckAnswer(df, expectedAnswer, isSorted) match {
+      case Some(errorMessage) => fail(errorMessage)
+      case None =>
+    }
+  }
+
+  /**
+   * Runs the plan and makes sure the answer matches the expected result. If there was an exception
+   * during the execution or the contents of the DataFrame do not match the expected result, an
+   * error message will be returned. Otherwise, None will be returned.
+   *
+   * @param df
+   *   the DataFrame to be executed
+   * @param expectedAnswer
+   *   the expected result in a Seq of Rows.
+   */
+  def getErrorMessageInCheckAnswer(
+      df: DataFrame,
+      expectedAnswer: Seq[Row],
+      isSorted: Boolean = false): Option[String] = {
+    val sparkAnswer =
+      try df.collect().toSeq
+      catch {
+        case e: Exception =>
+          val errorMessage =
+            s"""
+             |Exception thrown while executing query:
+             |${df.analyze}
+             |== Exception ==
+             |$e
+             |${org.apache.spark.sql.catalyst.util.stackTraceToString(e)}
+        """.stripMargin
+          return Some(errorMessage)
+      }
+
+    sameRows(expectedAnswer, sparkAnswer, isSorted).map { results =>
+      s"""
+         |Results do not match for query:
+         |Timezone: ${TimeZone.getDefault}
+         |Timezone Env: ${sys.env.getOrElse("TZ", "")}
+         |
+         |${df.analyze}
+         |== Results ==
+         |$results
+     """.stripMargin
+    }
+  }
+
+  def prepareAnswer(answer: Seq[Row], isSorted: Boolean): Seq[Row] = {
+    // Converts data to types that we can do equality comparison using Scala collections.
+    // For BigDecimal type, the Scala type has a better definition of equality test (similar to
+    // Java's java.math.BigDecimal.compareTo).
+    // For binary arrays, we convert it to Seq to avoid calling java.util.Arrays.equals for
+    // equality test.
+    val converted: Seq[Row] = answer.map(prepareRow)
+    if (!isSorted) converted.sortBy(_.toString()) else converted
+  }
+
+  // We need to call prepareRow recursively to handle schemas with struct types.
+  def prepareRow(row: Row): Row = {
+    Row.fromSeq(row.toSeq.map {
+      case null => null
+      case bd: java.math.BigDecimal => BigDecimal(bd)
+      // Equality of WrappedArray differs for AnyVal and AnyRef in Scala 2.12.2+
+      case seq: Seq[_] =>
+        seq.map {
+          case b: java.lang.Byte => b.byteValue
+          case s: java.lang.Short => s.shortValue
+          case i: java.lang.Integer => i.intValue
+          case l: java.lang.Long => l.longValue
+          case f: java.lang.Float => f.floatValue
+          case d: java.lang.Double => d.doubleValue
+          case x => x
+        }
+      // Convert array to Seq for easy equality check.
+      case b: Array[_] => b.toSeq
+      case r: Row => prepareRow(r)
+      case o => o
+    })
+  }
+
+  private def genError(
+      expectedAnswer: Seq[Row],
+      sparkAnswer: Seq[Row],
+      isSorted: Boolean = false): String = {
+    val getRowType: Option[Row] => String = row =>
+      row
+        .map(row =>
+          if (row.schema == null) {
+            "struct<>"
+          } else {
+            s"${row.schema.catalogString}"
+          })
+        .getOrElse("struct<>")
+
+    s"""
+       |== Results ==
+       |${sideBySide(
+        s"== Correct Answer - ${expectedAnswer.size} ==" +:
+          getRowType(expectedAnswer.headOption) +:
+          prepareAnswer(expectedAnswer, isSorted).map(_.toString()),
+        s"== Spark Answer - ${sparkAnswer.size} ==" +:
+          getRowType(sparkAnswer.headOption) +:
+          prepareAnswer(sparkAnswer, isSorted).map(_.toString())).mkString("\n")}
+  """.stripMargin
+  }
+
+  def includesRows(expectedRows: Seq[Row], sparkAnswer: Seq[Row]): Option[String] = {
+    if (!prepareAnswer(expectedRows, true).toSet.subsetOf(
+        prepareAnswer(sparkAnswer, true).toSet)) {
+      return Some(genError(expectedRows, sparkAnswer, true))
+    }
+    None
+  }
+
+  def compare(obj1: Any, obj2: Any): Boolean = (obj1, obj2) match {
+    case (null, null) => true
+    case (null, _) => false
+    case (_, null) => false
+    case (a: Array[_], b: Array[_]) =>
+      a.length == b.length && a.zip(b).forall { case (l, r) => compare(l, r) }
+    case (a: Map[_, _], b: Map[_, _]) =>
+      a.size == b.size && a.keys.forall { aKey =>
+        b.keys.find(bKey => compare(aKey, bKey)).exists(bKey => compare(a(aKey), b(bKey)))
+      }
+    case (a: Iterable[_], b: Iterable[_]) =>
+      a.size == b.size && a.zip(b).forall { case (l, r) => compare(l, r) }
+    case (a: Product, b: Product) =>
+      compare(a.productIterator.toSeq, b.productIterator.toSeq)
+    case (a: Row, b: Row) =>
+      compare(a.toSeq, b.toSeq)
+    // 0.0 == -0.0, turn float/double to bits before comparison, to distinguish 0.0 and -0.0.
+    case (a: Double, b: Double) =>
+      java.lang.Double.doubleToRawLongBits(a) == java.lang.Double.doubleToRawLongBits(b)
+    case (a: Float, b: Float) =>
+      java.lang.Float.floatToRawIntBits(a) == java.lang.Float.floatToRawIntBits(b)
+    case (a, b) => a == b
+  }
+
+  def sameRows(
+      expectedAnswer: Seq[Row],
+      sparkAnswer: Seq[Row],
+      isSorted: Boolean = false): Option[String] = {
+    if (!compare(prepareAnswer(expectedAnswer, isSorted), prepareAnswer(sparkAnswer, isSorted))) {
+      return Some(genError(expectedAnswer, sparkAnswer, isSorted))
+    }
+    None
+  }
+}
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/drop.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/drop.explain
new file mode 100644
index 00000000000..85a15dfab8d
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/drop.explain
@@ -0,0 +1,2 @@
+Filter atleastnnonnulls(5, id#0L, a#0)
++- LocalRelation <empty>, [id#0L, a#0, b#0]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/fill.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/fill.explain
new file mode 100644
index 00000000000..12d9bff0e8a
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/fill.explain
@@ -0,0 +1,2 @@
+Project [coalesce(id#0L, cast(8 as bigint)) AS id#0L, a#0, b#0]
++- LocalRelation <empty>, [id#0L, a#0, b#0]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/replace.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/replace.explain
new file mode 100644
index 00000000000..ef3de21e881
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/replace.explain
@@ -0,0 +1,2 @@
+Project [CASE WHEN (cast(id#0L as double) = 1.0) THEN cast(8.0 as bigint) ELSE id#0L END AS id#0L, a#0, b#0]
++- LocalRelation <empty>, [id#0L, a#0, b#0]
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/drop.json b/connector/connect/common/src/test/resources/query-tests/queries/drop.json
new file mode 100644
index 00000000000..a5176b25b05
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/drop.json
@@ -0,0 +1,17 @@
+{
+  "common": {
+    "planId": "1"
+  },
+  "dropNa": {
+    "input": {
+      "common": {
+        "planId": "0"
+      },
+      "localRelation": {
+        "schema": "struct\u003cid:bigint,a:int,b:double\u003e"
+      }
+    },
+    "cols": ["id", "a"],
+    "minNonNulls": 5
+  }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/drop.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/drop.proto.bin
new file mode 100644
index 00000000000..9e18d02afbc
Binary files /dev/null and b/connector/connect/common/src/test/resources/query-tests/queries/drop.proto.bin differ
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/fill.json b/connector/connect/common/src/test/resources/query-tests/queries/fill.json
new file mode 100644
index 00000000000..8308af1f579
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/fill.json
@@ -0,0 +1,19 @@
+{
+  "common": {
+    "planId": "1"
+  },
+  "fillNa": {
+    "input": {
+      "common": {
+        "planId": "0"
+      },
+      "localRelation": {
+        "schema": "struct\u003cid:bigint,a:int,b:double\u003e"
+      }
+    },
+    "cols": ["id"],
+    "values": [{
+      "long": "8"
+    }]
+  }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/fill.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/fill.proto.bin
new file mode 100644
index 00000000000..b034c5e64a8
Binary files /dev/null and b/connector/connect/common/src/test/resources/query-tests/queries/fill.proto.bin differ
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/replace.json b/connector/connect/common/src/test/resources/query-tests/queries/replace.json
new file mode 100644
index 00000000000..d0e39d340c0
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/replace.json
@@ -0,0 +1,24 @@
+{
+  "common": {
+    "planId": "1"
+  },
+  "replace": {
+    "input": {
+      "common": {
+        "planId": "0"
+      },
+      "localRelation": {
+        "schema": "struct\u003cid:bigint,a:int,b:double\u003e"
+      }
+    },
+    "cols": ["id"],
+    "replacements": [{
+      "oldValue": {
+        "double": 1.0
+      },
+      "newValue": {
+        "double": 8.0
+      }
+    }]
+  }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/replace.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/replace.proto.bin
new file mode 100644
index 00000000000..d1868cee7bf
Binary files /dev/null and b/connector/connect/common/src/test/resources/query-tests/queries/replace.proto.bin differ


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org