You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@spark.apache.org by Jeff Evans <je...@gmail.com> on 2019/10/29 21:37:27 UTC

Deleting columns within nested arrays/structs?

The starting point for the code is the various answer to this
<https://stackoverflow.com/questions/32727279/dropping-a-nested-column-from-spark-dataframe>
StackOverflow question.  Fixing some of the issues there, I end up with the
following:

  def dropColumn(df: DataFrame, colName: String): DataFrame = {
    df.schema.fields
        .flatMap(f => {
          if (colName.startsWith(s"${f.name}.")) {
            dropSubColumn(col(f.name), f.dataType, f.name, colName) match {
              case Some(x) => Some((f.name, x))
              case None => None
            }
          } else {
            None
          }
        })
        .foldLeft(df) {
          case (df, (colName, column)) => df.withColumn(colName, column)
        }
  }

  def dropSubColumn(col: Column, colType: DataType, fullColName: String,
dropColName: String): Option[Column] = {
    if (fullColName.equals(dropColName)) {
      None
    } else if (dropColName.startsWith(s"$fullColName.")) {
      colType match {
        case colType: StructType =>
          Some(struct(
            colType.fields
                .flatMap(f =>
                  dropSubColumn(col.getField(f.name), f.dataType,
s"$fullColName.${f.name}", dropColName) match {
                    case Some(x) => Some(x.alias(f.name))
                    case None => None
                  })
                : _*))
        case colType: ArrayType =>
          colType.elementType match {
            case innerType: StructType =>
              Some(array(struct(innerType.fields
                  .flatMap(f =>
                    dropSubColumn(col.getField(f.name), f.dataType,
s"$fullColName.${f.name}", dropColName) match {
                      case Some(x) => Some(x.alias(f.name))
                      case None => None
                    })
                  : _*)))
          }

        case _ => Some(col)
      }
    } else {
      Some(col)
    }
  }

Now, when I try this out on a simple nested JSON, it seems to work, in the
sense that non-removed column names still exist.  However, the type of the
"surviving" sibling field (i.e. the one not removed) has become wrapped in
an array type.  I have spent a while stepping through the code and can't
quite understand why this is happening.  Somehow, the GetArrayStructFields
class is involved.

// read some nested JSON with structs/arrays

val json = """{
  "foo": "bar",
  "top": {
    "child1": 5,
    "child2": [{
      "child2First": "one",
      "child2Second": 2
    }]
  }
}""".stripMargin

val df = spark.read.option("multiLine", "true").json(Seq(json).toDS())

val resultDf = dropColumn(df, "top.child2.child2First")

resultDf.select("top.child2.child2Second")
/*
+------------+
|child2Second|
+------------+
|       [[2]]|
+------------+
*/

// check the same from the original DataFrame

df.select("top.child2.child2Second")
/*
+------------+
|child2Second|
+------------+
|         [2]|
+------------+
*/

// check the field type for "child2Second"
resultDf.schema.fields(1).dataType.asInstanceOf[StructType].fields(1).dataType.asInstanceOf[ArrayType].elementType.asInstanceOf[StructType].fields(0).dataType.typeName
// prints array

// check the same from the original DataFrame (when it was index 1)
df.schema.fields(1).dataType.asInstanceOf[StructType].fields(1).dataType.asInstanceOf[ArrayType].elementType.asInstanceOf[StructType].fields(1).dataType.typeName
// prints long

Is the code above incorrect, with regards to dropping nested fields (in
this case, a field within a struct, which itself is in an array)?  Or is
there some other consideration I'm missing?  Any insight is appreciated.