You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by heuermh <gi...@git.apache.org> on 2018/08/10 17:15:33 UTC
[GitHub] spark pull request #14083: [SPARK-16406][SQL] Improve performance of Logical...
Github user heuermh commented on a diff in the pull request:
https://github.com/apache/spark/pull/14083#discussion_r209327944
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/package.scala ---
@@ -138,6 +140,88 @@ package object expressions {
def indexOf(exprId: ExprId): Int = {
  // Option(...) turns a null result of the lookup (no entry for exprId) into None,
  // which is then reported as the sentinel -1.
  val ordinal = Option(exprIdToOrdinal.get(exprId))
  ordinal.getOrElse(-1)
}
+
+ /**
+  * Removes duplicate attributes from every group of the given lookup map. The result is a
+  * strict, fully materialized map rather than a lazy view over `m`.
+  */
+ private def unique[T](m: Map[T, Seq[Attribute]]): Map[T, Seq[Attribute]] = {
+   m.map { case (key, group) => key -> group.distinct }
+ }
+
+ /** Lower-cased attribute name -> matching attributes, for case insensitive lookups. */
+ @transient private lazy val direct: Map[String, Seq[Attribute]] = {
+   val byLowerCaseName = attrs.groupBy(attr => attr.name.toLowerCase)
+   unique(byLowerCaseName)
+ }
+
+ /**
+  * Map to use for qualified case insensitive attribute lookups, keyed by the lower-cased
+  * (qualifier, name) pair. Attributes without a qualifier are left out.
+  *
+  * Declared `lazy`, like `direct`: a strict `@transient val` is skipped by serialization and
+  * is not re-initialized afterwards, so it would be null on a deserialized instance. Laziness
+  * also avoids building the map for instances that never perform a qualified lookup.
+  */
+ @transient private lazy val qualified: Map[(String, String), Seq[Attribute]] = {
+   // Only attributes that carry a qualifier can be found through a (qualifier, name) key,
+   // so `qualifier.get` below is safe after the filter.
+   val grouped = attrs.filter(_.qualifier.isDefined).groupBy { a =>
+     (a.qualifier.get.toLowerCase, a.name.toLowerCase)
+   }
+   unique(grouped)
+ }
+
+ /** Perform attribute resolution given a name and a resolver. */
+ def resolve(nameParts: Seq[String], resolver: Resolver): Option[NamedExpression] = {
+ // Collect matching attributes given a name and a lookup.
+ def collectMatches(name: String, candidates: Option[Seq[Attribute]]): Seq[Attribute] = {
+ candidates.toSeq.flatMap(_.collect {
+ case a if resolver(a.name, name) => a.withName(name)
+ })
+ }
+
+ // Find matches for the given name assuming that the 1st part is a qualifier (i.e. table name,
+ // alias, or subquery alias) and the 2nd part is the actual name. This returns a tuple of
+ // matched attributes and a list of parts that are to be resolved.
+ //
+ // For example, consider "a.b.c", where "a" is the table name, "b" is the column name,
+ // and "c" is the struct field name. In this case, the matched attribute will be "a.b",
+ // and the second element of the tuple will be List("c").
+ val matches = nameParts match {
+ case qualifier +: name +: nestedFields =>
+ val key = (qualifier.toLowerCase, name.toLowerCase)
+ val attributes = collectMatches(name, qualified.get(key)).filter { a =>
+ resolver(qualifier, a.qualifier.get)
+ }
+ (attributes, nestedFields)
+ case all =>
+ (Nil, all)
+ }
+
+ // If none of attributes match `table.column` pattern, we try to resolve it as a column.
+ val (candidates, nestedFields) = matches match {
+ case (Seq(), _) =>
+ val name = nameParts.head
+ val attributes = collectMatches(name, direct.get(name.toLowerCase))
+ (attributes, nameParts.tail)
+ case _ => matches
+ }
+
+ def name = UnresolvedAttribute(nameParts).name
+ candidates match {
+ case Seq(a) if nestedFields.nonEmpty =>
+ // One match, but we also need to extract the requested nested field.
+ // The foldLeft adds an ExtractValue for each remaining part of the identifier,
+ // and aliases the result with the last part of the name.
+ // For example, consider "a.b.c", where "a" is resolved to an existing attribute.
+ // Then this will add ExtractValue("c", ExtractValue("b", a)), and alias the final
+ // expression as "c".
+ val fieldExprs = nestedFields.foldLeft(a: Expression) { (e, name) =>
+ ExtractValue(e, Literal(name), resolver)
--- End diff --
Is there an issue for the follow-up?
---
---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org