You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Amandeep Sharma (Jira)" <ji...@apache.org> on 2021/02/10 10:42:01 UTC
[jira] [Updated] (SPARK-34417)
org.apache.spark.sql.DataFrameNaFunctions.fillMap(values: Seq[(String,
Any)]) fails for column name having a dot
[ https://issues.apache.org/jira/browse/SPARK-34417?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Amandeep Sharma updated SPARK-34417:
------------------------------------
Description:
Code to reproduce the issue:
{code:java}
import org.apache.spark.sql.SparkSession
object ColumnNameWithDot {
  /**
   * Reproduces SPARK-34417: `na.fill` with a map keyed by a backtick-quoted
   * column name that contains a dot fails instead of filling the column.
   *
   * @param args unused
   */
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("Simple Application")
      .config("spark.master", "local").getOrCreate()
    try {
      spark.sparkContext.setLogLevel("OFF")
      import spark.implicits._
      val df = Seq(("abc", 23), ("def", 44), (null, 0)).toDF("ColWith.Dot", "Col")
      df.na.fill(Map("`ColWith.Dot`" -> "na"))
        .show()
    } finally {
      // The repro is expected to throw; always release the local Spark session.
      spark.stop()
    }
  }
}
{code}
*Analysis*
*------------------------------PART-I-----------------------------------*
I debugged the Spark code. The failure is caused by a bug in the spark-catalyst code at
[https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/package.scala#L266%23L268]
According to its code comments, the function in question tries the following interpretations in order until a match is found:
* Consider pattern dbName.tableName.columnName
* Consider tableName.columnName
* Consider everything as columnName
However, in the third step the implementation uses only the first name part for resolution; it should instead join all parts with a dot (.).
*------------------------------PART-II-----------------------------------*
If the column name is not quoted with backticks, it fails at [https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/DataFrameNaFunctions.scala#L400]
If it is quoted, the condition at [https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/DataFrameNaFunctions.scala#L413] evaluates to false because *k* holds the backtick-quoted name whereas *f.name* is unquoted. It then fails at [https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/DataFrameNaFunctions.scala#L422]
Both failures have the root cause described in PART-I.
*Solution*
Make changes in [https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/package.scala#L266%23L268] as below:
{color:#ff0000}-val name = nameParts.head{color}
{color:#00875a}+ val name = nameParts.mkString(".") // join all parts using .{color}
val attributes = collectMatches(name, direct.get(name.toLowerCase(Locale.ROOT)))
{color:#ff0000}- (attributes, nameParts.tail){color}
{color:#00875a}+ (attributes, Seq.empty){color}
*{color:#172b4d}Workaround{color}*
{color:#172b4d}We can make a change in [https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/DataFrameNaFunctions.scala#L396]{color}
{color:#172b4d}While resolving the input columns, build a new map from each resolved column name to its replacement value, as shown below.{color}
{color:#172b4d}The idea is to use the resolved name instead of the input name when filling null values.{color}
{code:java}
/**
 * Replaces null values column by column, using the replacement value mapped to
 * each column name in `values`.
 *
 * Each key is resolved against `df` first, and its replacement is recorded under
 * the resolved column name. This lets backtick-quoted keys such as "`ColWith.Dot`"
 * match the unquoted names carried by `df.schema.fields`.
 *
 * @param values pairs of (column name, replacement value); when two keys resolve
 *               to the same column, the later pair wins.
 * @return a DataFrame with one projected column per `df.schema.fields` entry,
 *         in schema order.
 * @throws UnsupportedOperationException if a key resolves to something other than
 *         a plain top-level column (e.g. a nested struct field).
 * @throws IllegalArgumentException if a replacement value is not a Double, Float,
 *         Integer, Long, Boolean or String.
 */
private def fillMap(values: Seq[(String, Any)]): DataFrame = {
  // Error handling
  val resolved: Map[String, Any] = values.map { case (colName, replaceValue) =>
    // Check column name exists: df.resolve is relied on to fail for unknown names.
    val resolvedName = df.resolve(colName) match {
      case attr: org.apache.spark.sql.catalyst.expressions.Attribute => attr.name
      // Only top-level columns can be matched against df.schema.fields below.
      // Anything else (e.g. a nested field) would be matched by its own name and
      // could fill an unrelated top-level column, so reject it explicitly.
      case _ => throw new UnsupportedOperationException(
        s"Nested field $colName is not supported.")
    }
    // Check data type
    replaceValue match {
      case _: jl.Double | _: jl.Float | _: jl.Integer | _: jl.Long | _: jl.Boolean | _: String =>
        // This is good
      case _ =>
        // Guard against a null value so the intended error is raised, not an NPE.
        val typeName = if (replaceValue == null) "null" else replaceValue.getClass.getName
        throw new IllegalArgumentException(
          s"Unsupported value type $typeName ($replaceValue).")
    }
    resolvedName -> replaceValue
  }.toMap
  val columnEquals = df.sparkSession.sessionState.analyzer.resolver
  val projections = df.schema.fields.map { f =>
    resolved.find { case (k, _) => columnEquals(k, f.name) }.map { case (_, v) =>
      // Exhaustive in practice: every value was type-checked above.
      v match {
        case v: jl.Float => fillCol[Float](f, v)
        case v: jl.Double => fillCol[Double](f, v)
        case v: jl.Long => fillCol[Long](f, v)
        case v: jl.Integer => fillCol[Integer](f, v)
        case v: jl.Boolean => fillCol[Boolean](f, v.booleanValue())
        case v: String => fillCol[String](f, v)
      }
    }.getOrElse {
      // Quote the name (escaping embedded backticks) so an untouched column whose
      // name contains a dot is not parsed as a struct-field access.
      df.col("`" + f.name.replace("`", "``") + "`")
    }
  }
  df.select(projections : _*)
}
{code}
was:
Code to reproduce the issue:
{code:java}
import org.apache.spark.sql.SparkSession
object ColumnNameWithDot {
  /**
   * Reproduces SPARK-34417: `na.fill` with a map keyed by a backtick-quoted
   * column name that contains a dot fails instead of filling the column.
   *
   * @param args unused
   */
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("Simple Application")
      .config("spark.master", "local").getOrCreate()
    try {
      spark.sparkContext.setLogLevel("OFF")
      import spark.implicits._
      val df = Seq(("abc", 23), ("def", 44), (null, 0)).toDF("ColWith.Dot", "Col")
      df.na.fill(Map("`ColWith.Dot`" -> "na"))
        .show()
    } finally {
      // The repro is expected to throw; always release the local Spark session.
      spark.stop()
    }
  }
}
{code}
*Analysis*
*------------------------------PART-I-----------------------------------*
I debugged the Spark code. The failure is caused by a bug in the spark-catalyst code at
[https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/package.scala#L266%23L268]
According to its code comments, the function in question tries the following interpretations in order until a match is found:
* Consider pattern dbName.tableName.columnName
* Consider tableName.columnName
* Consider everything as columnName
However, in the third step the implementation uses only the first name part for resolution; it should instead join all parts with a dot (.).
*------------------------------PART-II-----------------------------------*
If the column name is not quoted with backticks, it fails at [https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/DataFrameNaFunctions.scala#L400]
If it is quoted, the condition at [https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/DataFrameNaFunctions.scala#L413] evaluates to false because *k* holds the backtick-quoted name whereas *f.name* is unquoted. It then fails at [https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/DataFrameNaFunctions.scala#L422]
Both failures have the root cause described in PART-I.
*Solution*
Make changes in [https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/package.scala#L266%23L268] as below:
{color:#FF0000}-val name = nameParts.head{color}
{color:#00875a}+ val name = nameParts.mkString(".") // join all parts using .{color}
val attributes = collectMatches(name, direct.get(name.toLowerCase(Locale.ROOT)))
{color:#FF0000}- (attributes, nameParts.tail){color}
{color:#00875a}+ (attributes, Seq.empty){color}
*{color:#172b4d}Workaround{color}*
{color:#172b4d}We can make a change in [https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/DataFrameNaFunctions.scala#L396]{color}
{color:#172b4d}While resolving the input columns, build a new map from each resolved column name to its replacement value, as shown below.{color}
{color:#172b4d}The idea is to use the resolved name instead of the input name when filling null values.{color}
{code:java}
/**
 * Replaces null values column by column, using the replacement value mapped to
 * each column name in `values`.
 *
 * Each key is resolved against `df` first, and its replacement is recorded under
 * the resolved column name. This lets backtick-quoted keys such as "`ColWith.Dot`"
 * match the unquoted names carried by `df.schema.fields`.
 *
 * @param values pairs of (column name, replacement value); when two keys resolve
 *               to the same column, the later pair wins.
 * @return a DataFrame with one projected column per `df.schema.fields` entry,
 *         in schema order.
 * @throws UnsupportedOperationException if a key resolves to something other than
 *         a plain top-level column (e.g. a nested struct field).
 * @throws IllegalArgumentException if a replacement value is not a Double, Float,
 *         Integer, Long, Boolean or String.
 */
private def fillMap(values: Seq[(String, Any)]): DataFrame = {
  // Error handling
  val resolved: Map[String, Any] = values.map { case (colName, replaceValue) =>
    // Check column name exists: df.resolve is relied on to fail for unknown names.
    val resolvedName = df.resolve(colName) match {
      case attr: org.apache.spark.sql.catalyst.expressions.Attribute => attr.name
      // Only top-level columns can be matched against df.schema.fields below.
      // Anything else (e.g. a nested field) would be matched by its own name and
      // could fill an unrelated top-level column, so reject it explicitly.
      case _ => throw new UnsupportedOperationException(
        s"Nested field $colName is not supported.")
    }
    // Check data type
    replaceValue match {
      case _: jl.Double | _: jl.Float | _: jl.Integer | _: jl.Long | _: jl.Boolean | _: String =>
        // This is good
      case _ =>
        // Guard against a null value so the intended error is raised, not an NPE.
        val typeName = if (replaceValue == null) "null" else replaceValue.getClass.getName
        throw new IllegalArgumentException(
          s"Unsupported value type $typeName ($replaceValue).")
    }
    resolvedName -> replaceValue
  }.toMap
  val columnEquals = df.sparkSession.sessionState.analyzer.resolver
  val projections = df.schema.fields.map { f =>
    resolved.find { case (k, _) => columnEquals(k, f.name) }.map { case (_, v) =>
      // Exhaustive in practice: every value was type-checked above.
      v match {
        case v: jl.Float => fillCol[Float](f, v)
        case v: jl.Double => fillCol[Double](f, v)
        case v: jl.Long => fillCol[Long](f, v)
        case v: jl.Integer => fillCol[Integer](f, v)
        case v: jl.Boolean => fillCol[Boolean](f, v.booleanValue())
        case v: String => fillCol[String](f, v)
      }
    }.getOrElse {
      // Quote the name (escaping embedded backticks) so an untouched column whose
      // name contains a dot is not parsed as a struct-field access.
      df.col("`" + f.name.replace("`", "``") + "`")
    }
  }
  df.select(projections : _*)
}
{code}
> org.apache.spark.sql.DataFrameNaFunctions.fillMap(values: Seq[(String, Any)]) fails for column name having a dot
> ----------------------------------------------------------------------------------------------------------------
>
> Key: SPARK-34417
> URL: https://issues.apache.org/jira/browse/SPARK-34417
> Project: Spark
> Issue Type: Bug
> Components: Spark Core
> Affects Versions: 3.0.1
> Environment: Spark version - 3.0.1
> OS - macOS 10.15.7
> Reporter: Amandeep Sharma
> Priority: Major
>
> Code to reproduce the issue:
> {code:java}
> import org.apache.spark.sql.SparkSession
> object ColumnNameWithDot {
>   /**
>    * Reproduces SPARK-34417: `na.fill` with a map keyed by a backtick-quoted
>    * column name that contains a dot fails instead of filling the column.
>    *
>    * @param args unused
>    */
>   def main(args: Array[String]): Unit = {
>     val spark = SparkSession.builder.appName("Simple Application")
>       .config("spark.master", "local").getOrCreate()
>     try {
>       spark.sparkContext.setLogLevel("OFF")
>       import spark.implicits._
>       val df = Seq(("abc", 23), ("def", 44), (null, 0)).toDF("ColWith.Dot", "Col")
>       df.na.fill(Map("`ColWith.Dot`" -> "na"))
>         .show()
>     } finally {
>       // The repro is expected to throw; always release the local Spark session.
>       spark.stop()
>     }
>   }
> }
> {code}
> *Analysis*
> *------------------------------PART-I-----------------------------------*
> I debugged the Spark code. The failure is caused by a bug in the spark-catalyst code at
> [https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/package.scala#L266%23L268]
> According to its code comments, the function in question tries the following interpretations in order until a match is found:
> * Consider pattern dbName.tableName.columnName
> * Consider tableName.columnName
> * Consider everything as columnName
> However, in the third step the implementation uses only the first name part for resolution; it should instead join all parts with a dot (.).
> *------------------------------PART-II-----------------------------------*
> If the column name is not quoted with backticks, it fails at [https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/DataFrameNaFunctions.scala#L400]
> If it is quoted, the condition at [https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/DataFrameNaFunctions.scala#L413] evaluates to false because *k* holds the backtick-quoted name whereas *f.name* is unquoted. It then fails at [https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/DataFrameNaFunctions.scala#L422]
> Both failures have the root cause described in PART-I.
> *Solution*
>
> Make changes in [https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/package.scala#L266%23L268] as below:
> {color:#ff0000}-val name = nameParts.head{color}
> {color:#00875a}+ val name = nameParts.mkString(".") // join all parts using .{color}
> val attributes = collectMatches(name, direct.get(name.toLowerCase(Locale.ROOT)))
> {color:#ff0000}- (attributes, nameParts.tail){color}
> {color:#00875a}+ (attributes, Seq.empty){color}
> *{color:#172b4d}Workaround{color}*
> {color:#172b4d}We can make a change in [https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/DataFrameNaFunctions.scala#L396]{color}
> {color:#172b4d}While resolving the input columns, build a new map from each resolved column name to its replacement value, as shown below.{color}
> {color:#172b4d}The idea is to use the resolved name instead of the input name when filling null values.{color}
> {code:java}
> /**
>  * Replaces null values column by column, using the replacement value mapped to
>  * each column name in `values`.
>  *
>  * Each key is resolved against `df` first, and its replacement is recorded under
>  * the resolved column name. This lets backtick-quoted keys such as "`ColWith.Dot`"
>  * match the unquoted names carried by `df.schema.fields`.
>  *
>  * @param values pairs of (column name, replacement value); when two keys resolve
>  *               to the same column, the later pair wins.
>  * @return a DataFrame with one projected column per `df.schema.fields` entry,
>  *         in schema order.
>  * @throws UnsupportedOperationException if a key resolves to something other than
>  *         a plain top-level column (e.g. a nested struct field).
>  * @throws IllegalArgumentException if a replacement value is not a Double, Float,
>  *         Integer, Long, Boolean or String.
>  */
> private def fillMap(values: Seq[(String, Any)]): DataFrame = {
>   // Error handling
>   val resolved: Map[String, Any] = values.map { case (colName, replaceValue) =>
>     // Check column name exists: df.resolve is relied on to fail for unknown names.
>     val resolvedName = df.resolve(colName) match {
>       case attr: org.apache.spark.sql.catalyst.expressions.Attribute => attr.name
>       // Only top-level columns can be matched against df.schema.fields below.
>       // Anything else (e.g. a nested field) would be matched by its own name and
>       // could fill an unrelated top-level column, so reject it explicitly.
>       case _ => throw new UnsupportedOperationException(
>         s"Nested field $colName is not supported.")
>     }
>     // Check data type
>     replaceValue match {
>       case _: jl.Double | _: jl.Float | _: jl.Integer | _: jl.Long | _: jl.Boolean | _: String =>
>         // This is good
>       case _ =>
>         // Guard against a null value so the intended error is raised, not an NPE.
>         val typeName = if (replaceValue == null) "null" else replaceValue.getClass.getName
>         throw new IllegalArgumentException(
>           s"Unsupported value type $typeName ($replaceValue).")
>     }
>     resolvedName -> replaceValue
>   }.toMap
>   val columnEquals = df.sparkSession.sessionState.analyzer.resolver
>   val projections = df.schema.fields.map { f =>
>     resolved.find { case (k, _) => columnEquals(k, f.name) }.map { case (_, v) =>
>       // Exhaustive in practice: every value was type-checked above.
>       v match {
>         case v: jl.Float => fillCol[Float](f, v)
>         case v: jl.Double => fillCol[Double](f, v)
>         case v: jl.Long => fillCol[Long](f, v)
>         case v: jl.Integer => fillCol[Integer](f, v)
>         case v: jl.Boolean => fillCol[Boolean](f, v.booleanValue())
>         case v: String => fillCol[String](f, v)
>       }
>     }.getOrElse {
>       // Quote the name (escaping embedded backticks) so an untouched column whose
>       // name contains a dot is not parsed as a struct-field access.
>       df.col("`" + f.name.replace("`", "``") + "`")
>     }
>   }
>   df.select(projections : _*)
> }
> {code}
--
This message was sent by Atlassian Jira
(v8.3.4#803005)
---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org