You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2020/09/08 14:30:06 UTC

[GitHub] [spark] EnricoMi commented on a change in pull request #26969: [SPARK-30319][SQL] Add a stricter version of `as[T]`

EnricoMi commented on a change in pull request #26969:
URL: https://github.com/apache/spark/pull/26969#discussion_r484965678



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
##########
@@ -500,6 +501,67 @@ class Dataset[T] private[sql](
     select(newCols : _*)
   }
 
+  /**
+   * Returns a new Dataset where each record has been mapped on to the specified type.
+   *
+   * This is similar to `as[U]`, which is lazy in terms of the schema. With `as[U]`, the result
+   * `Dataset[U]` still has the schema of `T`, not `U`. Hence, it may contain additional columns or
+   * the columns may be in different order. The schema of `U` manifests only after operations that
+   * utilize the encoder, e.g. `map` or `collect`, but not `save`.
+   *
+   * This method returns a `Dataset[U]` that is strictly derived from `U`. This works for any case
+   * class with standard encoder. This is done through projection of `T`'s columns onto `U`'s
+   * schema, if possible. Otherwise, it throws an `AnalysisException`. Columns are matched by name
+   * and type, where column names' case sensitivity is determined by `spark.sql.caseSensitive`.
+   *
+   * Where `as[U]` supports types with inner classes that have extra fields or different field
+   * order, this is not possible through projection and hence not supported by this method.
+   * An example for an inner type is `Inner`:
+   * {{{
+   *   case class Inner(a: Int)
+   *   case class Outer(i: Inner)
+   * }}}
+   * In that case you should use `map(identity)`, if this is really needed.
+   *
+   * As for `as[U]`, if the schema of the Dataset does not match the desired `U` type, you can use
+   * `select` along with `alias` or `as` to rearrange or rename as required.
+   *
+   * @group basic
+   * @since 3.1.0
+   */
+  def toDS[U : Encoder]: Dataset[U] = {
+    // column names case-sensitivity is configurable
+    def considerCase(field: StructField): StructField = SQLConf.get.caseSensitiveAnalysis match {
+        case true => field
+        case false => field.copy(name = field.name.toLowerCase(Locale.ROOT))
+      }
+
+    // we can project this dataset[T] to Dataset[U] when it provides all of the Encoder[U]'s columns
+    val encoder = implicitly[Encoder[U]]
+    val projectedColumns = encoder.schema.fields
+    val availableColumns = this.schema.fields.map(considerCase).map(col => col.name -> col).toMap
+    val columnsMissing = projectedColumns.map(considerCase).exists(proj =>
+      ! availableColumns.get(proj.name).exists(avail => proj.dataType.acceptsType(avail.dataType))
+    )
+    if (columnsMissing) {
+      // give precedence to `as[U]`s Exception if that one would fail either
+      this.as[U]
+
+      // helper to pretty-print columns in exception message
+      def toString(columns: Iterable[StructField]): String =
+        s"(${ columns.map(c => s"${c.name}: ${c.dataType}").mkString(", ") })"
+
+      throw new AnalysisException(
+        s"Dataset type not supported by toDS[T]: ${encoder.clsTag}, use map(identity)\n" +
+          "Columns\n" +
+          s"\t${toString(availableColumns.values)} cannot be projected to\n" +
+          s"\t${toString(projectedColumns)}"
+      )
+    }
+
+    this.select(projectedColumns.map(c => col(c.name)): _*).as[U]

Review comment:
       And if we define the `toDS[U]` operation as applying the schema of `U` only and not any other semantics of the encoder. The encoder has an output schema, and this is the output schema of `toDS[U]`. I think that is a sensible scope and a common use case.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org