Posted to reviews@spark.apache.org by viirya <gi...@git.apache.org> on 2017/10/02 15:28:16 UTC
[GitHub] spark pull request #17819: [SPARK-20542][ML][SQL] Add an API to Bucketizer t...
Github user viirya commented on a diff in the pull request:
https://github.com/apache/spark/pull/17819#discussion_r142172037
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala ---
@@ -2102,6 +2102,55 @@ class Dataset[T] private[sql](
   }
 
   /**
+   * Returns a new Dataset by adding columns or replacing the existing columns
+   * that have the same names.
+   */
+  private[spark] def withColumns(colNames: Seq[String], cols: Seq[Column]): DataFrame = {
+    require(colNames.size == cols.size,
+      s"The size of column names: ${colNames.size} isn't equal to " +
+        s"the size of columns: ${cols.size}")
+    require(colNames.distinct.size == colNames.size,
+      s"It is disallowed to use duplicate column names: $colNames")
+
+    val resolver = sparkSession.sessionState.analyzer.resolver
+    val output = queryExecution.analyzed.output
+
+    val columnMap = colNames.zip(cols).toMap
+
+    // Existing columns whose names match an entry in columnMap are replaced
+    // in place; all other existing columns pass through unchanged.
+    val replacedAndExistingColumns = output.map { field =>
+      columnMap.find { case (colName, _) =>
+        resolver(field.name, colName)
+      } match {
+        case Some((colName, col)) => col.as(colName)
+        case _ => Column(field)
+      }
+    }
+
+    // Entries in columnMap that match no existing column are appended.
+    val newColumns = columnMap.filter { case (colName, _) =>
+      !output.exists(f => resolver(f.name, colName))
+    }.map { case (colName, col) => col.as(colName) }
+
+    select(replacedAndExistingColumns ++ newColumns : _*)
+  }
+
+  /**
+   * Returns a new Dataset by adding columns with metadata.
+   */
+  private[spark] def withColumns(
--- End diff --
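For context, the first overload resolves each supplied name against the Dataset's current output: a name that matches an existing column replaces that column in place, while the rest are appended. A minimal usage sketch (hypothetical column names; since the method is private[spark], it is only callable from Spark-internal code):

    import org.apache.spark.sql.functions.col

    // Assuming a SparkSession `spark` is in scope.
    val df = spark.range(3).toDF("id")
    // "id" matches the existing column and is replaced; "twice" is appended.
    val out = df.withColumns(Seq("id", "twice"), Seq(col("id") + 1, col("id") * 2))
    // out.columns == Array("id", "twice")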
I will add a test for this later.
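A minimal sketch of what such a test could look like (illustrative names and values, assuming the checkAnswer helper from Spark's QueryTest harness, e.g. in DataFrameSuite):

    test("withColumns: replace existing columns and append new ones") {
      // Seq(...).toDF relies on the test session's implicits (testImplicits._).
      val df = Seq((1, 2)).toDF("a", "b")
      // "b" exists and is replaced; "c" is new and appended.
      val res = df.withColumns(Seq("b", "c"), Seq(col("a") + 10, col("a") * 2))
      checkAnswer(res, Row(1, 11, 2) :: Nil)
      assert(res.columns === Array("a", "b", "c"))

      // Duplicate target names should trip the require() up front.
      intercept[IllegalArgumentException] {
        df.withColumns(Seq("c", "c"), Seq(col("a"), col("a")))
      }
    }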
---