You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by rx...@apache.org on 2015/05/13 21:47:50 UTC

spark git commit: [SPARK-7551][DataFrame] support backticks for DataFrame attribute resolution

Repository: spark
Updated Branches:
  refs/heads/master 7ff16e8ab -> 213a6f30f


[SPARK-7551][DataFrame] support backticks for DataFrame attribute resolution

Author: Wenchen Fan <cl...@outlook.com>

Closes #6074 from cloud-fan/7551 and squashes the following commits:

e6f579e [Wenchen Fan] allow space
2b86699 [Wenchen Fan] handle blank
e218d99 [Wenchen Fan] address comments
54c4209 [Wenchen Fan] fix 7551


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/213a6f30
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/213a6f30
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/213a6f30

Branch: refs/heads/master
Commit: 213a6f30fee4a1c416ea76b678c71877fd36ef18
Parents: 7ff16e8
Author: Wenchen Fan <cl...@outlook.com>
Authored: Wed May 13 12:47:48 2015 -0700
Committer: Reynold Xin <rx...@databricks.com>
Committed: Wed May 13 12:47:48 2015 -0700

----------------------------------------------------------------------
 .../catalyst/plans/logical/LogicalPlan.scala    | 55 +++++++++++++++++++-
 .../scala/org/apache/spark/sql/DataFrame.scala  |  4 +-
 .../org/apache/spark/sql/DataFrameSuite.scala   | 27 ++++++++++
 3 files changed, 82 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/213a6f30/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
index dbb12d5..dba6965 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
@@ -105,7 +105,7 @@ abstract class LogicalPlan extends QueryPlan[LogicalPlan] with Logging {
   }
 
   /**
-   * Optionally resolves the given string to a [[NamedExpression]] using the input from all child
+   * Optionally resolves the given strings to a [[NamedExpression]] using the input from all child
    * nodes of this LogicalPlan. The attribute is expressed as
    * a string in the following form: `[scope].AttributeName.[nested].[fields]...`.
    */
@@ -116,7 +116,7 @@ abstract class LogicalPlan extends QueryPlan[LogicalPlan] with Logging {
     resolve(nameParts, children.flatMap(_.output), resolver, throwErrors)
 
   /**
-   * Optionally resolves the given string to a [[NamedExpression]] based on the output of this
+   * Optionally resolves the given strings to a [[NamedExpression]] based on the output of this
    * LogicalPlan. The attribute is expressed as string in the following form:
    * `[scope].AttributeName.[nested].[fields]...`.
    */
@@ -127,6 +127,57 @@ abstract class LogicalPlan extends QueryPlan[LogicalPlan] with Logging {
     resolve(nameParts, output, resolver, throwErrors)
 
   /**
+   * Resolves a possibly backtick-quoted attribute name. The name is split into name parts
+   * on dots, except inside backticks, so `ab.cd`.`efg` yields the parts "ab.cd" and "efg";
+   * the parts are then handed to `resolve` together with the given resolver.
+   */
+  def resolveQuoted(name: String, resolver: Resolver): Option[NamedExpression] = {
+    val nameParts = parseAttributeName(name)
+    // NOTE(review): the literal `true` presumably switches on error throwing - confirm.
+    resolve(nameParts, resolver, true)
+  }
+
+  /**
+   * Splits an attribute name on dots into name parts, keeping each backtick-quoted section
+   * as a single part. Backticks must be paired and must enclose a complete name part, so
+   * `ab..c`e.f is rejected; there is no escape character, so a part cannot contain a backtick.
+   * Throws an AnalysisException for an empty name part or any other malformed name.
+   */
+  private def parseAttributeName(name: String): Seq[String] = {
+    def e = new AnalysisException(s"syntax error in attribute name: $name") // built on throw
+    val nameParts = scala.collection.mutable.ArrayBuffer.empty[String]
+    val tmp = scala.collection.mutable.ArrayBuffer.empty[Char]
+    var inBacktick = false
+    var i = 0
+    while (i < name.length) {
+      val char = name(i)
+      if (inBacktick) {
+        if (char == '`') {
+          inBacktick = false
+          if (i + 1 < name.length && name(i + 1) != '.') throw e
+        } else {
+          tmp += char
+        }
+      } else {
+        if (char == '`') {
+          if (tmp.nonEmpty) throw e // a quote may only open at the start of a name part
+          inBacktick = true
+        } else if (char == '.') {
+          if (tmp.isEmpty) throw e // empty name part before this dot
+          nameParts += tmp.mkString
+          tmp.clear()
+        } else {
+          tmp += char
+        }
+      }
+      i += 1
+    }
+    if (tmp.isEmpty || inBacktick) throw e // empty last part or unterminated backtick
+    nameParts += tmp.mkString
+    nameParts.toSeq
+  }
+
+  /**
    * Resolve the given `name` string against the given attribute, returning either 0 or 1 match.
    *
    * This assumes `name` has multiple parts, where the 1st part is a qualifier

http://git-wip-us.apache.org/repos/asf/spark/blob/213a6f30/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
index c820a67..4fd5105 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
@@ -160,7 +160,7 @@ class DataFrame private[sql](
   }
 
   protected[sql] def resolve(colName: String): NamedExpression = {
-    queryExecution.analyzed.resolve(colName.split("\\."), sqlContext.analyzer.resolver).getOrElse {
+    queryExecution.analyzed.resolveQuoted(colName, sqlContext.analyzer.resolver).getOrElse {
       throw new AnalysisException(
         s"""Cannot resolve column name "$colName" among (${schema.fieldNames.mkString(", ")})""")
     }
@@ -168,7 +168,7 @@ class DataFrame private[sql](
 
   protected[sql] def numericColumns: Seq[Expression] = {
     schema.fields.filter(_.dataType.isInstanceOf[NumericType]).map { n =>
-      queryExecution.analyzed.resolve(n.name.split("\\."), sqlContext.analyzer.resolver).get
+      queryExecution.analyzed.resolveQuoted(n.name, sqlContext.analyzer.resolver).get
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/213a6f30/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
index 52aa1f6..1d5f6b3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -459,6 +459,33 @@ class DataFrameSuite extends QueryTest {
     assert(complexData.filter(complexData("m")(complexData("s")("value")) === 1).count() == 1)
   }
 
+  test("SPARK-7551: support backticks for DataFrame attribute resolution") {
+    // Name parts containing dots are addressed by quoting them with backticks.
+    val dotted = TestSQLContext.jsonRDD(TestSQLContext.sparkContext.makeRDD(
+      """{"a.b": {"c": {"d..e": {"f": 1}}}}""" :: Nil))
+    val dottedLeaf = dotted.select(dotted("`a.b`.c.`d..e`.`f`"))
+    checkAnswer(dottedLeaf, Row(1))
+
+    // Name parts containing spaces resolve both quoted and unquoted.
+    val spaced = TestSQLContext.jsonRDD(TestSQLContext.sparkContext.makeRDD(
+      """{"a  b": {"c": {"d  e": {"f": 1}}}}""" :: Nil))
+    val spacedLeaf = spaced.select(spaced("`a  b`.c.d  e.f"))
+    checkAnswer(spacedLeaf, Row(1))
+
+    // Each malformed name below must fail with the syntax-error message.
+    def assertSyntaxError(attempt: => Unit): Unit = {
+      val failure = intercept[org.apache.spark.sql.AnalysisException](attempt)
+      assert(failure.getMessage.contains("syntax error in attribute name:"))
+    }
+
+    val malformedNames = Seq(
+      "`abc.`c`",    // closing backtick not followed by a dot
+      "`abc`..d",    // empty name part between two dots
+      "`a`.b.",      // trailing dot
+      "`a.b`.c.`d")  // unterminated backtick
+    malformedNames.foreach(badName => assertSyntaxError(dotted(badName)))
+  }
+
   test("SPARK-7324 dropDuplicates") {
     val testData = TestSQLContext.sparkContext.parallelize(
       (2, 1, 2) :: (1, 1, 1) ::


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org