You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by viirya <gi...@git.apache.org> on 2017/10/02 15:28:16 UTC

[GitHub] spark pull request #17819: [SPARK-20542][ML][SQL] Add an API to Bucketizer t...

Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17819#discussion_r142172037
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala ---
    @@ -2102,6 +2102,55 @@ class Dataset[T] private[sql](
       }
     
       /**
    +   * Returns a new Dataset by adding columns or replacing the existing columns that has
    +   * the same names.
    +   */
    +  private[spark] def withColumns(colNames: Seq[String], cols: Seq[Column]): DataFrame = {
    +    require(colNames.size == cols.size,
    +      s"The size of column names: ${colNames.size} isn't equal to " +
    +        s"the size of columns: ${cols.size}")
    +    require(colNames.distinct.size == colNames.size,
    +      s"It is disallowed to use duplicate column names: $colNames")
    +
    +    val resolver = sparkSession.sessionState.analyzer.resolver
    +    val output = queryExecution.analyzed.output
    +
    +    val columnMap = colNames.zip(cols).toMap
    +
    +    val replacedAndExistingColumns = output.map { field =>
    +      val dupColumn = columnMap.find { case (colName, col) =>
    +        resolver(field.name, colName)
    +      }
    +      if (dupColumn.isDefined) {
    +        val colName = dupColumn.get._1
    +        val col = dupColumn.get._2
    +        col.as(colName)
    +      } else {
    +        Column(field)
    +      }
    +    }
    +
    +    val newColumns = columnMap.filter { case (colName, col) =>
    +      !output.exists(f => resolver(f.name, colName))
    +    }.map { case (colName, col) => col.as(colName) }
    +
    +    select(replacedAndExistingColumns ++ newColumns : _*)
    +  }
    +
    +  /**
    +   * Returns a new Dataset by adding columns with metadata.
    +   */
    +  private[spark] def withColumns(
    --- End diff --
    
    I will to add a test for this later.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org