Posted to user@spark.apache.org by Jeff Evans <je...@gmail.com> on 2019/10/29 21:37:27 UTC
Deleting columns within nested arrays/structs?
The starting point for the code is the various answers to this StackOverflow question:
<https://stackoverflow.com/questions/32727279/dropping-a-nested-column-from-spark-dataframe>
After fixing some of the issues there, I end up with the following:
import org.apache.spark.sql.{Column, DataFrame}
import org.apache.spark.sql.functions.{array, col, struct}
import org.apache.spark.sql.types.{ArrayType, DataType, StructType}

// returns a copy of df with the (dot-separated) nested column colName removed
def dropColumn(df: DataFrame, colName: String): DataFrame = {
  df.schema.fields
    .flatMap(f => {
      if (colName.startsWith(s"${f.name}.")) {
        dropSubColumn(col(f.name), f.dataType, f.name, colName) match {
          case Some(x) => Some((f.name, x))
          case None => None
        }
      } else {
        None
      }
    })
    .foldLeft(df) {
      case (df, (colName, column)) => df.withColumn(colName, column)
    }
}

// rebuilds the column rooted at fullColName, omitting the field at dropColName
def dropSubColumn(col: Column, colType: DataType, fullColName: String,
    dropColName: String): Option[Column] = {
  if (fullColName.equals(dropColName)) {
    None
  } else if (dropColName.startsWith(s"$fullColName.")) {
    colType match {
      case colType: StructType =>
        // rebuild the struct from the fields that survive the drop
        Some(struct(
          colType.fields
            .flatMap(f =>
              dropSubColumn(col.getField(f.name), f.dataType,
                  s"$fullColName.${f.name}", dropColName) match {
                case Some(x) => Some(x.alias(f.name))
                case None => None
              }): _*))
      case colType: ArrayType =>
        colType.elementType match {
          case innerType: StructType =>
            // rebuild the element struct and wrap it in array(...)
            Some(array(struct(innerType.fields
              .flatMap(f =>
                dropSubColumn(col.getField(f.name), f.dataType,
                    s"$fullColName.${f.name}", dropColName) match {
                  case Some(x) => Some(x.alias(f.name))
                  case None => None
                }): _*)))
        }
      case _ => Some(col)
    }
  } else {
    Some(col)
  }
}
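As a sanity check, dropping a field that lives only under structs seems to
behave as expected, since only the StructType branch of dropSubColumn is
exercised. A minimal check, assuming a spark-shell session (so spark and its
implicits are in scope); the one-field schema here is made up just for
illustration:

import spark.implicits._

// hypothetical struct-only input: dropping "a.b" should leave sibling "a.c" intact
val structOnly = spark.read.json(Seq("""{"a": {"b": 1, "c": "x"}}""").toDS())
dropColumn(structOnly, "a.b").printSchema()
// expect: "a" now contains only "c", with its type unchanged

The anomaly only shows up once an array is involved, as the example below
demonstrates.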
Now, when I try this out on some simple nested JSON, it seems to work, in the
sense that the non-removed column names still exist. However, the type of the
"surviving" sibling field (i.e. the one not removed) has become wrapped in an
extra level of array. I have spent a while stepping through the code and can't
quite understand why this is happening. Somehow, the GetArrayStructFields
class is involved (see the isolated check after the example below).
// read some nested JSON with structs/arrays
val json = """{
    |  "foo": "bar",
    |  "top": {
    |    "child1": 5,
    |    "child2": [{
    |      "child2First": "one",
    |      "child2Second": 2
    |    }]
    |  }
    |}""".stripMargin

val df = spark.read.option("multiLine", "true").json(Seq(json).toDS())

val resultDf = dropColumn(df, "top.child2.child2First")
resultDf.select("top.child2.child2Second").show()
/*
+------------+
|child2Second|
+------------+
| [[2]]|
+------------+
*/
// check the same from the original DataFrame
df.select("top.child2.child2Second").show()
/*
+------------+
|child2Second|
+------------+
| [2]|
+------------+
*/
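The extra level of nesting also shows up if the two schemas are printed
directly (same session; the comments are what I expect to see, paraphrased):

// compare the surviving field's type in both DataFrames
df.select("top.child2.child2Second").printSchema()
// expect: array<long> (one array level, from the field access through the array)
resultDf.select("top.child2.child2Second").printSchema()
// expect: array<array<long>> (an extra array level after the drop)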
// check the field type for "child2Second" in the result
resultDf.schema.fields(1).dataType.asInstanceOf[StructType]
  .fields(1).dataType.asInstanceOf[ArrayType]
  .elementType.asInstanceOf[StructType]
  .fields(0).dataType.typeName
// prints "array"
// check the same from the original DataFrame (where the field was at index 1)
df.schema.fields(1).dataType.asInstanceOf[StructType]
  .fields(1).dataType.asInstanceOf[ArrayType]
  .elementType.asInstanceOf[StructType]
  .fields(1).dataType.typeName
// prints "long"
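For what it's worth, the array-typed access can be reproduced without
dropColumn at all, which makes me suspect the getField calls. As far as I can
tell, extracting a struct field through an array column already yields an
array-typed Column (this is where GetArrayStructFields appears in the analyzed
plan), which would then be wrapped again by the array(...) call in
dropSubColumn. A minimal check against the same df:

// getField through an array-of-structs column maps the access over the elements
df.select(col("top.child2").getField("child2Second")).printSchema()
// expect: array<long>, not long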
Is the code above incorrect with regard to dropping nested fields (in this
case, a field within a struct, which is itself inside an array)? Or is there
some other consideration I'm missing? Any insight is appreciated.