Posted to commits@spark.apache.org by rx...@apache.org on 2015/01/29 04:10:37 UTC

[2/2] spark git commit: [SPARK-5445][SQL] Made DataFrame dsl usable in Java

[SPARK-5445][SQL] Made DataFrame dsl usable in Java

Also removed the implicit conversion for literals, since it is pretty scary for API design. Instead, created a new lit method for creating literals. This doesn't break anything from a compatibility perspective, because Literal was added only two days ago.
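
For illustration, a minimal Scala sketch of the explicit-literal style this patch introduces (assuming a DataFrame named df with columns "name" and "age"; the example itself is not part of the patch):

    import org.apache.spark.sql.api.scala.dsl._

    // Literal Columns are now created explicitly with lit instead of relying on
    // an implicit conversion from Scala values:
    df.select(df("name"), lit(1).as("one"))
    df.filter(df("age") > lit(21))   // equivalent to df("age") > 21 via the literal overloads

From Java, the same literal can be created through the static method org.apache.spark.sql.api.java.dsl.lit added in this patch.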

Author: Reynold Xin <rx...@databricks.com>

Closes #4241 from rxin/df-docupdate and squashes the following commits:

c0f4810 [Reynold Xin] Fix Python merge conflict.
094c7d7 [Reynold Xin] Minor style fix. Reset Python tests.
3c89f4a [Reynold Xin] Package.
dfe6962 [Reynold Xin] Updated Python aggregate.
5dd4265 [Reynold Xin] Made dsl Java callable.
14b3c27 [Reynold Xin] Fix literal expression for symbols.
68b31cb [Reynold Xin] Literal.
4cfeb78 [Reynold Xin] [SPARK-5097][SQL] Address DataFrame code review feedback.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5b9760de
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5b9760de
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5b9760de

Branch: refs/heads/master
Commit: 5b9760de8dd2dab7cf9a4f5c65869e4ed296a938
Parents: 4ee79c7
Author: Reynold Xin <rx...@databricks.com>
Authored: Wed Jan 28 19:10:32 2015 -0800
Committer: Reynold Xin <rx...@databricks.com>
Committed: Wed Jan 28 19:10:32 2015 -0800

----------------------------------------------------------------------
 .../apache/spark/examples/sql/RDDRelation.scala |   3 +-
 .../scala/org/apache/spark/ml/Transformer.scala |   2 +-
 .../ml/classification/LogisticRegression.scala  |   2 +-
 .../spark/ml/feature/StandardScaler.scala       |   2 +-
 .../apache/spark/ml/recommendation/ALS.scala    |   2 +-
 python/pyspark/sql.py                           |  38 +-
 .../scala/org/apache/spark/sql/Column.scala     | 111 ++--
 .../scala/org/apache/spark/sql/DataFrame.scala  |  99 ++--
 .../scala/org/apache/spark/sql/Literal.scala    |  98 ----
 .../scala/org/apache/spark/sql/SQLContext.scala |  16 +-
 .../main/scala/org/apache/spark/sql/api.scala   |  11 +-
 .../org/apache/spark/sql/api/java/dsl.java      |  85 ++++
 .../spark/sql/api/scala/dsl/package.scala       | 502 +++++++++++++++++++
 .../org/apache/spark/sql/dsl/package.scala      | 496 ------------------
 .../org/apache/spark/sql/CachedTableSuite.scala |   2 +-
 .../spark/sql/ColumnExpressionSuite.scala       |  10 +-
 .../org/apache/spark/sql/DataFrameSuite.scala   |  12 +-
 .../scala/org/apache/spark/sql/JoinSuite.scala  |  26 +-
 .../org/apache/spark/sql/SQLQuerySuite.scala    |   2 +-
 .../scala/org/apache/spark/sql/TestData.scala   |   2 +-
 .../scala/org/apache/spark/sql/UDFSuite.scala   |   2 +-
 .../apache/spark/sql/UserDefinedTypeSuite.scala |   2 +-
 .../columnar/InMemoryColumnarQuerySuite.scala   |   2 +-
 .../spark/sql/execution/PlannerSuite.scala      |   2 +-
 .../org/apache/spark/sql/json/JsonSuite.scala   |   9 +-
 .../spark/sql/parquet/ParquetIOSuite.scala      |   2 +-
 .../org/apache/spark/sql/hive/package-info.java |   2 +-
 .../sql/hive/execution/HiveQuerySuite.scala     |   2 +-
 .../sql/hive/execution/HiveTableScanSuite.scala |   2 +-
 29 files changed, 786 insertions(+), 760 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/5b9760de/examples/src/main/scala/org/apache/spark/examples/sql/RDDRelation.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/examples/sql/RDDRelation.scala b/examples/src/main/scala/org/apache/spark/examples/sql/RDDRelation.scala
index a5d7f26..e9f4788 100644
--- a/examples/src/main/scala/org/apache/spark/examples/sql/RDDRelation.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/sql/RDDRelation.scala
@@ -19,8 +19,7 @@ package org.apache.spark.examples.sql
 
 import org.apache.spark.{SparkConf, SparkContext}
 import org.apache.spark.sql.SQLContext
-import org.apache.spark.sql.dsl._
-import org.apache.spark.sql.dsl.literals._
+import org.apache.spark.sql.api.scala.dsl._
 
 // One method for defining the schema of an RDD is to make a case class with the desired column
 // names and types.

http://git-wip-us.apache.org/repos/asf/spark/blob/5b9760de/mllib/src/main/scala/org/apache/spark/ml/Transformer.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/ml/Transformer.scala b/mllib/src/main/scala/org/apache/spark/ml/Transformer.scala
index b233bff..29cd981 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/Transformer.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/Transformer.scala
@@ -24,7 +24,7 @@ import org.apache.spark.annotation.AlphaComponent
 import org.apache.spark.ml.param._
 import org.apache.spark.sql.DataFrame
 import org.apache.spark.sql._
-import org.apache.spark.sql.dsl._
+import org.apache.spark.sql.api.scala.dsl._
 import org.apache.spark.sql.types._
 
 /**

http://git-wip-us.apache.org/repos/asf/spark/blob/5b9760de/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala
index eeb6301..101f6c8 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala
@@ -24,7 +24,7 @@ import org.apache.spark.mllib.classification.LogisticRegressionWithLBFGS
 import org.apache.spark.mllib.linalg.{BLAS, Vector, VectorUDT}
 import org.apache.spark.mllib.regression.LabeledPoint
 import org.apache.spark.sql._
-import org.apache.spark.sql.dsl._
+import org.apache.spark.sql.api.scala.dsl._
 import org.apache.spark.sql.catalyst.dsl._
 import org.apache.spark.sql.types.{DoubleType, StructField, StructType}
 import org.apache.spark.storage.StorageLevel

http://git-wip-us.apache.org/repos/asf/spark/blob/5b9760de/mllib/src/main/scala/org/apache/spark/ml/feature/StandardScaler.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/StandardScaler.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/StandardScaler.scala
index e7bdb07..c456beb 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/feature/StandardScaler.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/feature/StandardScaler.scala
@@ -23,7 +23,7 @@ import org.apache.spark.ml.param._
 import org.apache.spark.mllib.feature
 import org.apache.spark.mllib.linalg.{Vector, VectorUDT}
 import org.apache.spark.sql._
-import org.apache.spark.sql.dsl._
+import org.apache.spark.sql.api.scala.dsl._
 import org.apache.spark.sql.catalyst.dsl._
 import org.apache.spark.sql.types.{StructField, StructType}
 

http://git-wip-us.apache.org/repos/asf/spark/blob/5b9760de/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala b/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
index f0bea5f..738b184 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
@@ -30,7 +30,7 @@ import org.apache.spark.ml.{Estimator, Model}
 import org.apache.spark.ml.param._
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{Column, DataFrame}
-import org.apache.spark.sql.dsl._
+import org.apache.spark.sql.api.scala.dsl._
 import org.apache.spark.sql.types.{DoubleType, FloatType, IntegerType, StructField, StructType}
 import org.apache.spark.util.Utils
 import org.apache.spark.util.collection.{OpenHashMap, OpenHashSet, SortDataFormat, Sorter}

http://git-wip-us.apache.org/repos/asf/spark/blob/5b9760de/python/pyspark/sql.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql.py b/python/pyspark/sql.py
index c3a6938..fdd8034 100644
--- a/python/pyspark/sql.py
+++ b/python/pyspark/sql.py
@@ -931,7 +931,7 @@ def _parse_schema_abstract(s):
 
 def _infer_schema_type(obj, dataType):
     """
-    Fill the dataType with types infered from obj
+    Fill the dataType with types inferred from obj
 
     >>> schema = _parse_schema_abstract("a b c d")
     >>> row = (1, 1.0, "str", datetime.date(2014, 10, 10))
@@ -2140,7 +2140,7 @@ class DataFrame(object):
             return Column(self._jdf.apply(name))
         raise AttributeError
 
-    def As(self, name):
+    def alias(self, name):
         """ Alias the current DataFrame """
         return DataFrame(getattr(self._jdf, "as")(name), self.sql_ctx)
 
@@ -2216,7 +2216,7 @@ class DataFrame(object):
         """
         return DataFrame(self._jdf.intersect(other._jdf), self.sql_ctx)
 
-    def Except(self, other):
+    def subtract(self, other):
         """ Return a new [[DataFrame]] containing rows in this frame
         but not in another frame.
 
@@ -2234,7 +2234,7 @@ class DataFrame(object):
 
     def addColumn(self, colName, col):
         """ Return a new [[DataFrame]] by adding a column. """
-        return self.select('*', col.As(colName))
+        return self.select('*', col.alias(colName))
 
     def removeColumn(self, colName):
         raise NotImplemented
@@ -2342,7 +2342,7 @@ SCALA_METHOD_MAPPINGS = {
 
 def _create_column_from_literal(literal):
     sc = SparkContext._active_spark_context
-    return sc._jvm.Literal.apply(literal)
+    return sc._jvm.org.apache.spark.sql.api.java.dsl.lit(literal)
 
 
 def _create_column_from_name(name):
@@ -2371,13 +2371,20 @@ def _unary_op(name):
     return _
 
 
-def _bin_op(name):
-    """ Create a method for given binary operator """
+def _bin_op(name, pass_literal_through=False):
+    """ Create a method for given binary operator
+
+    Keyword arguments:
+    pass_literal_through -- whether to pass literal value directly through to the JVM.
+    """
     def _(self, other):
         if isinstance(other, Column):
             jc = other._jc
         else:
-            jc = _create_column_from_literal(other)
+            if pass_literal_through:
+                jc = other
+            else:
+                jc = _create_column_from_literal(other)
         return Column(getattr(self._jc, _scalaMethod(name))(jc), self._jdf, self.sql_ctx)
     return _
 
@@ -2458,10 +2465,10 @@ class Column(DataFrame):
     # __getattr__ = _bin_op("getField")
 
     # string methods
-    rlike = _bin_op("rlike")
-    like = _bin_op("like")
-    startswith = _bin_op("startsWith")
-    endswith = _bin_op("endsWith")
+    rlike = _bin_op("rlike", pass_literal_through=True)
+    like = _bin_op("like", pass_literal_through=True)
+    startswith = _bin_op("startsWith", pass_literal_through=True)
+    endswith = _bin_op("endsWith", pass_literal_through=True)
     upper = _unary_op("upper")
     lower = _unary_op("lower")
 
@@ -2487,7 +2494,7 @@ class Column(DataFrame):
     isNotNull = _unary_op("isNotNull")
 
     # `as` is keyword
-    def As(self, alias):
+    def alias(self, alias):
         return Column(getattr(self._jsc, "as")(alias), self._jdf, self.sql_ctx)
 
     def cast(self, dataType):
@@ -2501,15 +2508,14 @@ class Column(DataFrame):
 
 
 def _aggregate_func(name):
-    """ Creat a function for aggregator by name"""
+    """ Create a function for aggregator by name"""
     def _(col):
         sc = SparkContext._active_spark_context
         if isinstance(col, Column):
             jcol = col._jc
         else:
             jcol = _create_column_from_name(col)
-        # FIXME: can not access dsl.min/max ...
-        jc = getattr(sc._jvm.org.apache.spark.sql.dsl(), name)(jcol)
+        jc = getattr(sc._jvm.org.apache.spark.sql.api.java.dsl, name)(jcol)
         return Column(jc)
     return staticmethod(_)
 

http://git-wip-us.apache.org/repos/asf/spark/blob/5b9760de/sql/core/src/main/scala/org/apache/spark/sql/Column.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Column.scala b/sql/core/src/main/scala/org/apache/spark/sql/Column.scala
index 7f20cf8..7f9a91a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Column.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Column.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql
 
 import scala.language.implicitConversions
 
+import org.apache.spark.sql.api.scala.dsl.lit
 import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, Star}
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.{Literal => LiteralExpr}
@@ -55,11 +56,11 @@ class Column(
     val expr: Expression)
   extends DataFrame(sqlContext, plan) with ExpressionApi {
 
-  /** Turn a Catalyst expression into a `Column`. */
+  /** Turns a Catalyst expression into a `Column`. */
   protected[sql] def this(expr: Expression) = this(None, None, expr)
 
   /**
-   * Create a new `Column` expression based on a column or attribute name.
+   * Creates a new `Column` expression based on a column or attribute name.
    * The resolution of this is the same as SQL. For example:
    *
    * - "colName" becomes an expression selecting the column named "colName".
@@ -108,7 +109,7 @@ class Column(
   override def unary_~ : Column = BitwiseNot(expr)
 
   /**
-   * Invert a boolean expression, i.e. NOT.
+   * Inversion of boolean expression, i.e. NOT.
    * {{
    *   // Select rows that are not active (isActive === false)
    *   df.select( !df("isActive") )
@@ -135,7 +136,7 @@ class Column(
    *   df.select( df("colA".equalTo("Zaharia") )
    * }}}
    */
-  override def === (literal: Any): Column = this === Literal.anyToLiteral(literal)
+  override def === (literal: Any): Column = this === lit(literal)
 
   /**
    * Equality test with an expression.
@@ -175,7 +176,7 @@ class Column(
    *   df.select( !(df("colA") === 15) )
    * }}}
    */
-  override def !== (literal: Any): Column = this !== Literal.anyToLiteral(literal)
+  override def !== (literal: Any): Column = this !== lit(literal)
 
   /**
    * Greater than an expression.
@@ -193,7 +194,7 @@ class Column(
    *   people.select( people("age") > 21 )
    * }}}
    */
-  override def > (literal: Any): Column = this > Literal.anyToLiteral(literal)
+  override def > (literal: Any): Column = this > lit(literal)
 
   /**
    * Less than an expression.
@@ -211,7 +212,7 @@ class Column(
    *   people.select( people("age") < 21 )
    * }}}
    */
-  override def < (literal: Any): Column = this < Literal.anyToLiteral(literal)
+  override def < (literal: Any): Column = this < lit(literal)
 
   /**
    * Less than or equal to an expression.
@@ -229,7 +230,7 @@ class Column(
    *   people.select( people("age") <= 21 )
    * }}}
    */
-  override def <= (literal: Any): Column = this <= Literal.anyToLiteral(literal)
+  override def <= (literal: Any): Column = this <= lit(literal)
 
   /**
    * Greater than or equal to an expression.
@@ -247,20 +248,20 @@ class Column(
    *   people.select( people("age") >= 21 )
    * }}}
    */
-  override def >= (literal: Any): Column = this >= Literal.anyToLiteral(literal)
+  override def >= (literal: Any): Column = this >= lit(literal)
 
   /**
    * Equality test with an expression that is safe for null values.
    */
   override def <=> (other: Column): Column = other match {
-    case null => EqualNullSafe(expr, Literal.anyToLiteral(null).expr)
+    case null => EqualNullSafe(expr, lit(null).expr)
     case _ => EqualNullSafe(expr, other.expr)
   }
 
   /**
    * Equality test with a literal value that is safe for null values.
    */
-  override def <=> (literal: Any): Column = this <=> Literal.anyToLiteral(literal)
+  override def <=> (literal: Any): Column = this <=> lit(literal)
 
   /**
    * True if the current expression is null.
@@ -288,7 +289,7 @@ class Column(
    *   people.select( people("inSchool") || true )
    * }}}
    */
-  override def || (literal: Boolean): Column = this || Literal.anyToLiteral(literal)
+  override def || (literal: Boolean): Column = this || lit(literal)
 
   /**
    * Boolean AND with an expression.
@@ -306,7 +307,7 @@ class Column(
    *   people.select( people("inSchool") && true )
    * }}}
    */
-  override def && (literal: Boolean): Column = this && Literal.anyToLiteral(literal)
+  override def && (literal: Boolean): Column = this && lit(literal)
 
   /**
    * Bitwise AND with an expression.
@@ -316,7 +317,7 @@ class Column(
   /**
    * Bitwise AND with a literal value.
    */
-  override def & (literal: Any): Column = this & Literal.anyToLiteral(literal)
+  override def & (literal: Any): Column = this & lit(literal)
 
   /**
    * Bitwise OR with an expression.
@@ -326,7 +327,7 @@ class Column(
   /**
    * Bitwise OR with a literal value.
    */
-  override def | (literal: Any): Column = this | Literal.anyToLiteral(literal)
+  override def | (literal: Any): Column = this | lit(literal)
 
   /**
    * Bitwise XOR with an expression.
@@ -336,7 +337,7 @@ class Column(
   /**
    * Bitwise XOR with a literal value.
    */
-  override def ^ (literal: Any): Column = this ^ Literal.anyToLiteral(literal)
+  override def ^ (literal: Any): Column = this ^ lit(literal)
 
   /**
    * Sum of this expression and another expression.
@@ -354,10 +355,10 @@ class Column(
    *   people.select( people("height") + 10 )
    * }}}
    */
-  override def + (literal: Any): Column = this + Literal.anyToLiteral(literal)
+  override def + (literal: Any): Column = this + lit(literal)
 
   /**
-   * Subtraction. Substract the other expression from this expression.
+   * Subtraction. Subtract the other expression from this expression.
    * {{{
    *   // The following selects the difference between people's height and their weight.
    *   people.select( people("height") - people("weight") )
@@ -366,16 +367,16 @@ class Column(
   override def - (other: Column): Column = Subtract(expr, other.expr)
 
   /**
-   * Subtraction. Substract a literal value from this expression.
+   * Subtraction. Subtract a literal value from this expression.
    * {{{
-   *   // The following selects a person's height and substract it by 10.
+   *   // The following selects a person's height and subtract it by 10.
    *   people.select( people("height") - 10 )
    * }}}
    */
-  override def - (literal: Any): Column = this - Literal.anyToLiteral(literal)
+  override def - (literal: Any): Column = this - lit(literal)
 
   /**
-   * Multiply this expression and another expression.
+   * Multiplication of this expression and another expression.
    * {{{
    *   // The following multiplies a person's height by their weight.
    *   people.select( people("height") * people("weight") )
@@ -384,16 +385,16 @@ class Column(
   override def * (other: Column): Column = Multiply(expr, other.expr)
 
   /**
-   * Multiply this expression and a literal value.
+   * Multiplication this expression and a literal value.
    * {{{
    *   // The following multiplies a person's height by 10.
    *   people.select( people("height") * 10 )
    * }}}
    */
-  override def * (literal: Any): Column = this * Literal.anyToLiteral(literal)
+  override def * (literal: Any): Column = this * lit(literal)
 
   /**
-   * Divide this expression by another expression.
+   * Division this expression by another expression.
    * {{{
    *   // The following divides a person's height by their weight.
    *   people.select( people("height") / people("weight") )
@@ -402,13 +403,13 @@ class Column(
   override def / (other: Column): Column = Divide(expr, other.expr)
 
   /**
-   * Divide this expression by a literal value.
+   * Division this expression by a literal value.
    * {{{
    *   // The following divides a person's height by 10.
    *   people.select( people("height") / 10 )
    * }}}
    */
-  override def / (literal: Any): Column = this / Literal.anyToLiteral(literal)
+  override def / (literal: Any): Column = this / lit(literal)
 
   /**
    * Modulo (a.k.a. remainder) expression.
@@ -418,7 +419,7 @@ class Column(
   /**
    * Modulo (a.k.a. remainder) expression.
    */
-  override def % (literal: Any): Column = this % Literal.anyToLiteral(literal)
+  override def % (literal: Any): Column = this % lit(literal)
 
 
   /**
@@ -428,43 +429,67 @@ class Column(
   @scala.annotation.varargs
   override def in(list: Column*): Column = In(expr, list.map(_.expr))
 
-  override def like(other: Column): Column = Like(expr, other.expr)
-
-  override def like(literal: String): Column = this.like(Literal.anyToLiteral(literal))
-
-  override def rlike(other: Column): Column = RLike(expr, other.expr)
-
-  override def rlike(literal: String): Column = this.rlike(Literal.anyToLiteral(literal))
+  override def like(literal: String): Column = Like(expr, lit(literal).expr)
 
+  override def rlike(literal: String): Column = RLike(expr, lit(literal).expr)
 
+  /**
+   * An expression that gets an
+   * @param ordinal
+   * @return
+   */
   override def getItem(ordinal: Int): Column = GetItem(expr, LiteralExpr(ordinal))
 
-  override def getItem(ordinal: Column): Column = GetItem(expr, ordinal.expr)
-
+  /**
+   * An expression that gets a field by name in a [[StructField]].
+   */
   override def getField(fieldName: String): Column = GetField(expr, fieldName)
 
-
+  /**
+   * An expression that returns a substring.
+   * @param startPos expression for the starting position.
+   * @param len expression for the length of the substring.
+   */
   override def substr(startPos: Column, len: Column): Column =
     Substring(expr, startPos.expr, len.expr)
 
-  override def substr(startPos: Int, len: Int): Column =
-    this.substr(Literal.anyToLiteral(startPos), Literal.anyToLiteral(len))
+  /**
+   * An expression that returns a substring.
+   * @param startPos starting position.
+   * @param len length of the substring.
+   */
+  override def substr(startPos: Int, len: Int): Column = this.substr(lit(startPos), lit(len))
 
   override def contains(other: Column): Column = Contains(expr, other.expr)
 
-  override def contains(literal: Any): Column = this.contains(Literal.anyToLiteral(literal))
+  override def contains(literal: Any): Column = this.contains(lit(literal))
 
 
   override def startsWith(other: Column): Column = StartsWith(expr, other.expr)
 
-  override def startsWith(literal: String): Column = this.startsWith(Literal.anyToLiteral(literal))
+  override def startsWith(literal: String): Column = this.startsWith(lit(literal))
 
   override def endsWith(other: Column): Column = EndsWith(expr, other.expr)
 
-  override def endsWith(literal: String): Column = this.endsWith(Literal.anyToLiteral(literal))
+  override def endsWith(literal: String): Column = this.endsWith(lit(literal))
 
+  /**
+   * Gives the column an alias.
+   * {{{
+   *   // Renames colA to colB in select output.
+   *   df.select($"colA".as("colB"))
+   * }}}
+   */
   override def as(alias: String): Column = Alias(expr, alias)()
 
+  /**
+   * Casts the column to a different data type.
+   * {{{
+   *   // Casts colA to IntegerType.
+   *   import org.apache.spark.sql.types.IntegerType
+   *   df.select(df("colA").as(IntegerType))
+   * }}}
+   */
   override def cast(to: DataType): Column = Cast(expr, to)
 
   override def desc: Column = SortOrder(expr, Descending)

http://git-wip-us.apache.org/repos/asf/spark/blob/5b9760de/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
index ff59cbf..ceb5f86 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
@@ -17,24 +17,22 @@
 
 package org.apache.spark.sql
 
+import java.util.{List => JList}
+
 import scala.language.implicitConversions
 import scala.reflect.ClassTag
 import scala.collection.JavaConversions._
 
-import java.util.{ArrayList, List => JList}
-
 import com.fasterxml.jackson.core.JsonFactory
-import net.razorvine.pickle.Pickler
 
 import org.apache.spark.annotation.Experimental
-import org.apache.spark.rdd.RDD
 import org.apache.spark.api.java.JavaRDD
 import org.apache.spark.api.python.SerDeUtil
+import org.apache.spark.rdd.RDD
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.sql.catalyst.ScalaReflection
 import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.expressions.{Literal => LiteralExpr}
 import org.apache.spark.sql.catalyst.plans.{JoinType, Inner}
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.execution.{LogicalRDD, EvaluatePython}
@@ -53,7 +51,8 @@ import org.apache.spark.util.Utils
  * }}}
  *
  * Once created, it can be manipulated using the various domain-specific-language (DSL) functions
- * defined in: [[DataFrame]] (this class), [[Column]], and [[dsl]] for Scala DSL.
+ * defined in: [[DataFrame]] (this class), [[Column]], [[api.scala.dsl]] for Scala DSL, and
+ * [[api.java.dsl]] for Java DSL.
  *
  * To select a column from the data frame, use the apply method:
  * {{{
@@ -110,14 +109,14 @@ class DataFrame protected[sql](
     new DataFrame(sqlContext, logicalPlan, true)
   }
 
-  /** Return the list of numeric columns, useful for doing aggregation. */
+  /** Returns the list of numeric columns, useful for doing aggregation. */
   protected[sql] def numericColumns: Seq[Expression] = {
     schema.fields.filter(_.dataType.isInstanceOf[NumericType]).map { n =>
       logicalPlan.resolve(n.name, sqlContext.analyzer.resolver).get
     }
   }
 
-  /** Resolve a column name into a Catalyst [[NamedExpression]]. */
+  /** Resolves a column name into a Catalyst [[NamedExpression]]. */
   protected[sql] def resolve(colName: String): NamedExpression = {
     logicalPlan.resolve(colName, sqlContext.analyzer.resolver).getOrElse(
       throw new RuntimeException(s"""Cannot resolve column name "$colName""""))
@@ -128,22 +127,22 @@ class DataFrame protected[sql](
   def toSchemaRDD: DataFrame = this
 
   /**
-   * Return the object itself. Used to force an implicit conversion from RDD to DataFrame in Scala.
+   * Returns the object itself. Used to force an implicit conversion from RDD to DataFrame in Scala.
    */
   def toDataFrame: DataFrame = this
 
-  /** Return the schema of this [[DataFrame]]. */
+  /** Returns the schema of this [[DataFrame]]. */
   override def schema: StructType = queryExecution.analyzed.schema
 
-  /** Return all column names and their data types as an array. */
+  /** Returns all column names and their data types as an array. */
   override def dtypes: Array[(String, String)] = schema.fields.map { field =>
     (field.name, field.dataType.toString)
   }
 
-  /** Return all column names as an array. */
+  /** Returns all column names as an array. */
   override def columns: Array[String] = schema.fields.map(_.name)
 
-  /** Print the schema to the console in a nice tree format. */
+  /** Prints the schema to the console in a nice tree format. */
   override def printSchema(): Unit = println(schema.treeString)
 
   /**
@@ -187,7 +186,7 @@ class DataFrame protected[sql](
   }
 
   /**
-   * Return a new [[DataFrame]] sorted by the specified column, in ascending column.
+   * Returns a new [[DataFrame]] sorted by the specified column, in ascending column.
    * {{{
    *   // The following 3 are equivalent
    *   df.sort("sortcol")
@@ -200,7 +199,7 @@ class DataFrame protected[sql](
   }
 
   /**
-   * Return a new [[DataFrame]] sorted by the given expressions. For example:
+   * Returns a new [[DataFrame]] sorted by the given expressions. For example:
    * {{{
    *   df.sort($"col1", $"col2".desc)
    * }}}
@@ -219,7 +218,7 @@ class DataFrame protected[sql](
   }
 
   /**
-   * Return a new [[DataFrame]] sorted by the given expressions.
+   * Returns a new [[DataFrame]] sorted by the given expressions.
    * This is an alias of the `sort` function.
    */
   @scala.annotation.varargs
@@ -228,7 +227,7 @@ class DataFrame protected[sql](
   }
 
   /**
-   * Selecting a single column and return it as a [[Column]].
+   * Selects a single column and return it as a [[Column]].
    */
   override def apply(colName: String): Column = colName match {
     case "*" =>
@@ -239,7 +238,7 @@ class DataFrame protected[sql](
   }
 
   /**
-   * Selecting a set of expressions, wrapped in a Product.
+   * Selects a set of expressions, wrapped in a Product.
    * {{{
    *   // The following two are equivalent:
    *   df.apply(($"colA", $"colB" + 1))
@@ -250,17 +249,17 @@ class DataFrame protected[sql](
     require(projection.productArity >= 1)
     select(projection.productIterator.map {
       case c: Column => c
-      case o: Any => new Column(Some(sqlContext), None, LiteralExpr(o))
+      case o: Any => new Column(Some(sqlContext), None, Literal(o))
     }.toSeq :_*)
   }
 
   /**
-   * Alias the current [[DataFrame]].
+   * Returns a new [[DataFrame]] with an alias set.
    */
   override def as(name: String): DataFrame = Subquery(name, logicalPlan)
 
   /**
-   * Selecting a set of expressions.
+   * Selects a set of expressions.
    * {{{
    *   df.select($"colA", $"colB" + 1)
    * }}}
@@ -277,7 +276,7 @@ class DataFrame protected[sql](
   }
 
   /**
-   * Selecting a set of columns. This is a variant of `select` that can only select
+   * Selects a set of columns. This is a variant of `select` that can only select
    * existing columns using column names (i.e. cannot construct expressions).
    *
    * {{{
@@ -292,7 +291,7 @@ class DataFrame protected[sql](
   }
 
   /**
-   * Filtering rows using the given condition.
+   * Filters rows using the given condition.
    * {{{
    *   // The following are equivalent:
    *   peopleDf.filter($"age" > 15)
@@ -305,7 +304,7 @@ class DataFrame protected[sql](
   }
 
   /**
-   * Filtering rows using the given condition. This is an alias for `filter`.
+   * Filters rows using the given condition. This is an alias for `filter`.
    * {{{
    *   // The following are equivalent:
    *   peopleDf.filter($"age" > 15)
@@ -316,7 +315,7 @@ class DataFrame protected[sql](
   override def where(condition: Column): DataFrame = filter(condition)
 
   /**
-   * Filtering rows using the given condition. This is a shorthand meant for Scala.
+   * Filters rows using the given condition. This is a shorthand meant for Scala.
    * {{{
    *   // The following are equivalent:
    *   peopleDf.filter($"age" > 15)
@@ -327,7 +326,7 @@ class DataFrame protected[sql](
   override def apply(condition: Column): DataFrame = filter(condition)
 
   /**
-   * Group the [[DataFrame]] using the specified columns, so we can run aggregation on them.
+   * Groups the [[DataFrame]] using the specified columns, so we can run aggregation on them.
    * See [[GroupedDataFrame]] for all the available aggregate functions.
    *
    * {{{
@@ -347,7 +346,7 @@ class DataFrame protected[sql](
   }
 
   /**
-   * Group the [[DataFrame]] using the specified columns, so we can run aggregation on them.
+   * Groups the [[DataFrame]] using the specified columns, so we can run aggregation on them.
    * See [[GroupedDataFrame]] for all the available aggregate functions.
    *
    * This is a variant of groupBy that can only group by existing columns using column names
@@ -371,7 +370,7 @@ class DataFrame protected[sql](
   }
 
   /**
-   * Aggregate on the entire [[DataFrame]] without groups.
+   * Aggregates on the entire [[DataFrame]] without groups.
    * {{
    *   // df.agg(...) is a shorthand for df.groupBy().agg(...)
    *   df.agg(Map("age" -> "max", "salary" -> "avg"))
@@ -381,7 +380,7 @@ class DataFrame protected[sql](
   override def agg(exprs: Map[String, String]): DataFrame = groupBy().agg(exprs)
 
   /**
-   * Aggregate on the entire [[DataFrame]] without groups.
+   * Aggregates on the entire [[DataFrame]] without groups.
    * {{
    *   // df.agg(...) is a shorthand for df.groupBy().agg(...)
    *   df.agg(max($"age"), avg($"salary"))
@@ -392,31 +391,31 @@ class DataFrame protected[sql](
   override def agg(expr: Column, exprs: Column*): DataFrame = groupBy().agg(expr, exprs :_*)
 
   /**
-   * Return a new [[DataFrame]] by taking the first `n` rows. The difference between this function
+   * Returns a new [[DataFrame]] by taking the first `n` rows. The difference between this function
    * and `head` is that `head` returns an array while `limit` returns a new [[DataFrame]].
    */
-  override def limit(n: Int): DataFrame = Limit(LiteralExpr(n), logicalPlan)
+  override def limit(n: Int): DataFrame = Limit(Literal(n), logicalPlan)
 
   /**
-   * Return a new [[DataFrame]] containing union of rows in this frame and another frame.
+   * Returns a new [[DataFrame]] containing union of rows in this frame and another frame.
    * This is equivalent to `UNION ALL` in SQL.
    */
   override def unionAll(other: DataFrame): DataFrame = Union(logicalPlan, other.logicalPlan)
 
   /**
-   * Return a new [[DataFrame]] containing rows only in both this frame and another frame.
+   * Returns a new [[DataFrame]] containing rows only in both this frame and another frame.
    * This is equivalent to `INTERSECT` in SQL.
    */
   override def intersect(other: DataFrame): DataFrame = Intersect(logicalPlan, other.logicalPlan)
 
   /**
-   * Return a new [[DataFrame]] containing rows in this frame but not in another frame.
+   * Returns a new [[DataFrame]] containing rows in this frame but not in another frame.
    * This is equivalent to `EXCEPT` in SQL.
    */
   override def except(other: DataFrame): DataFrame = Except(logicalPlan, other.logicalPlan)
 
   /**
-   * Return a new [[DataFrame]] by sampling a fraction of rows.
+   * Returns a new [[DataFrame]] by sampling a fraction of rows.
    *
    * @param withReplacement Sample with replacement or not.
    * @param fraction Fraction of rows to generate.
@@ -427,7 +426,7 @@ class DataFrame protected[sql](
   }
 
   /**
-   * Return a new [[DataFrame]] by sampling a fraction of rows, using a random seed.
+   * Returns a new [[DataFrame]] by sampling a fraction of rows, using a random seed.
    *
    * @param withReplacement Sample with replacement or not.
    * @param fraction Fraction of rows to generate.
@@ -439,57 +438,63 @@ class DataFrame protected[sql](
   /////////////////////////////////////////////////////////////////////////////
 
   /**
-   * Return a new [[DataFrame]] by adding a column.
+   * Returns a new [[DataFrame]] by adding a column.
    */
   override def addColumn(colName: String, col: Column): DataFrame = {
     select(Column("*"), col.as(colName))
   }
 
   /**
-   * Return the first `n` rows.
+   * Returns the first `n` rows.
    */
   override def head(n: Int): Array[Row] = limit(n).collect()
 
   /**
-   * Return the first row.
+   * Returns the first row.
    */
   override def head(): Row = head(1).head
 
   /**
-   * Return the first row. Alias for head().
+   * Returns the first row. Alias for head().
    */
   override def first(): Row = head()
 
+  /**
+   * Returns a new RDD by applying a function to all rows of this DataFrame.
+   */
   override def map[R: ClassTag](f: Row => R): RDD[R] = {
     rdd.map(f)
   }
 
+  /**
+   * Returns a new RDD by applying a function to each partition of this DataFrame.
+   */
   override def mapPartitions[R: ClassTag](f: Iterator[Row] => Iterator[R]): RDD[R] = {
     rdd.mapPartitions(f)
   }
 
   /**
-   * Return the first `n` rows in the [[DataFrame]].
+   * Returns the first `n` rows in the [[DataFrame]].
    */
   override def take(n: Int): Array[Row] = head(n)
 
   /**
-   * Return an array that contains all of [[Row]]s in this [[DataFrame]].
+   * Returns an array that contains all of [[Row]]s in this [[DataFrame]].
    */
   override def collect(): Array[Row] = rdd.collect()
 
   /**
-   * Return a Java list that contains all of [[Row]]s in this [[DataFrame]].
+   * Returns a Java list that contains all of [[Row]]s in this [[DataFrame]].
    */
   override def collectAsList(): java.util.List[Row] = java.util.Arrays.asList(rdd.collect() :_*)
 
   /**
-   * Return the number of rows in the [[DataFrame]].
+   * Returns the number of rows in the [[DataFrame]].
    */
   override def count(): Long = groupBy().count().rdd.collect().head.getLong(0)
 
   /**
-   * Return a new [[DataFrame]] that has exactly `numPartitions` partitions.
+   * Returns a new [[DataFrame]] that has exactly `numPartitions` partitions.
    */
   override def repartition(numPartitions: Int): DataFrame = {
     sqlContext.applySchema(rdd.repartition(numPartitions), schema)
@@ -546,7 +551,7 @@ class DataFrame protected[sql](
    * Creates a table from the the contents of this DataFrame.  This will fail if the table already
    * exists.
    *
-   * Note that this currently only works with DataFrame that are created from a HiveContext as
+   * Note that this currently only works with DataFrames that are created from a HiveContext as
    * there is no notion of a persisted catalog in a standard SQL context.  Instead you can write
    * an RDD out to a parquet file, and then register that file as a table.  This "table" can then
    * be the target of an `insertInto`.
@@ -568,7 +573,7 @@ class DataFrame protected[sql](
   }
 
   /**
-   * Return the content of the [[DataFrame]] as a RDD of JSON strings.
+   * Returns the content of the [[DataFrame]] as a RDD of JSON strings.
    */
   override def toJSON: RDD[String] = {
     val rowSchema = this.schema

http://git-wip-us.apache.org/repos/asf/spark/blob/5b9760de/sql/core/src/main/scala/org/apache/spark/sql/Literal.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Literal.scala b/sql/core/src/main/scala/org/apache/spark/sql/Literal.scala
deleted file mode 100644
index 08cd4d0..0000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/Literal.scala
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql
-
-import org.apache.spark.sql.catalyst.expressions.{Literal => LiteralExpr}
-import org.apache.spark.sql.types._
-
-object Literal {
-
-  /** Return a new boolean literal. */
-  def apply(literal: Boolean): Column = new Column(LiteralExpr(literal))
-
-  /** Return a new byte literal. */
-  def apply(literal: Byte): Column = new Column(LiteralExpr(literal))
-
-  /** Return a new short literal. */
-  def apply(literal: Short): Column = new Column(LiteralExpr(literal))
-
-  /** Return a new int literal. */
-  def apply(literal: Int): Column = new Column(LiteralExpr(literal))
-
-  /** Return a new long literal. */
-  def apply(literal: Long): Column = new Column(LiteralExpr(literal))
-
-  /** Return a new float literal. */
-  def apply(literal: Float): Column = new Column(LiteralExpr(literal))
-
-  /** Return a new double literal. */
-  def apply(literal: Double): Column = new Column(LiteralExpr(literal))
-
-  /** Return a new string literal. */
-  def apply(literal: String): Column = new Column(LiteralExpr(literal))
-
-  /** Return a new decimal literal. */
-  def apply(literal: BigDecimal): Column = new Column(LiteralExpr(literal))
-
-  /** Return a new decimal literal. */
-  def apply(literal: java.math.BigDecimal): Column = new Column(LiteralExpr(literal))
-
-  /** Return a new timestamp literal. */
-  def apply(literal: java.sql.Timestamp): Column = new Column(LiteralExpr(literal))
-
-  /** Return a new date literal. */
-  def apply(literal: java.sql.Date): Column = new Column(LiteralExpr(literal))
-
-  /** Return a new binary (byte array) literal. */
-  def apply(literal: Array[Byte]): Column = new Column(LiteralExpr(literal))
-
-  /** Return a new null literal. */
-  def apply(literal: Null): Column = new Column(LiteralExpr(null))
-
-  /**
-   * Return a Column expression representing the literal value. Throws an exception if the
-   * data type is not supported by SparkSQL.
-   */
-  protected[sql] def anyToLiteral(literal: Any): Column = {
-    // If the literal is a symbol, convert it into a Column.
-    if (literal.isInstanceOf[Symbol]) {
-      return dsl.symbolToColumn(literal.asInstanceOf[Symbol])
-    }
-
-    val literalExpr = literal match {
-      case v: Int => LiteralExpr(v, IntegerType)
-      case v: Long => LiteralExpr(v, LongType)
-      case v: Double => LiteralExpr(v, DoubleType)
-      case v: Float => LiteralExpr(v, FloatType)
-      case v: Byte => LiteralExpr(v, ByteType)
-      case v: Short => LiteralExpr(v, ShortType)
-      case v: String => LiteralExpr(v, StringType)
-      case v: Boolean => LiteralExpr(v, BooleanType)
-      case v: BigDecimal => LiteralExpr(Decimal(v), DecimalType.Unlimited)
-      case v: java.math.BigDecimal => LiteralExpr(Decimal(v), DecimalType.Unlimited)
-      case v: Decimal => LiteralExpr(v, DecimalType.Unlimited)
-      case v: java.sql.Timestamp => LiteralExpr(v, TimestampType)
-      case v: java.sql.Date => LiteralExpr(v, DateType)
-      case v: Array[Byte] => LiteralExpr(v, BinaryType)
-      case null => LiteralExpr(null, NullType)
-      case _ =>
-        throw new RuntimeException("Unsupported literal type " + literal.getClass + " " + literal)
-    }
-    new Column(literalExpr)
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/5b9760de/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index d56d405..f87fde4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -135,19 +135,19 @@ class SQLContext(@transient val sparkContext: SparkContext)
    * The following example registers a UDF in Java:
    * {{{
    *   sqlContext.udf().register("myUDF",
-   *     new UDF2<Integer, String, String>() {
-   *       @Override
-   *       public String call(Integer arg1, String arg2) {
-   *         return arg2 + arg1;
-   *       }
-   *     }, DataTypes.StringType);
+   *       new UDF2<Integer, String, String>() {
+   *           @Override
+   *           public String call(Integer arg1, String arg2) {
+   *               return arg2 + arg1;
+   *           }
+   *      }, DataTypes.StringType);
    * }}}
    *
    * Or, to use Java 8 lambda syntax:
    * {{{
    *   sqlContext.udf().register("myUDF",
-   *     (Integer arg1, String arg2) -> arg2 + arg1),
-   *     DataTypes.StringType);
+   *       (Integer arg1, String arg2) -> arg2 + arg1),
+   *       DataTypes.StringType);
    * }}}
    */
   val udf: UDFRegistration = new UDFRegistration(this)

http://git-wip-us.apache.org/repos/asf/spark/blob/5b9760de/sql/core/src/main/scala/org/apache/spark/sql/api.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/api.scala b/sql/core/src/main/scala/org/apache/spark/sql/api.scala
index 073d41e..5eeaf17 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/api.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/api.scala
@@ -30,7 +30,7 @@ import org.apache.spark.storage.StorageLevel
  * An internal interface defining the RDD-like methods for [[DataFrame]].
  * Please use [[DataFrame]] directly, and do NOT use this.
  */
-trait RDDApi[T] {
+private[sql] trait RDDApi[T] {
 
   def cache(): this.type = persist()
 
@@ -64,7 +64,7 @@ trait RDDApi[T] {
  * An internal interface defining data frame related methods in [[DataFrame]].
  * Please use [[DataFrame]] directly, and do NOT use this.
  */
-trait DataFrameSpecificApi {
+private[sql] trait DataFrameSpecificApi {
 
   def schema: StructType
 
@@ -181,7 +181,7 @@ trait DataFrameSpecificApi {
  * An internal interface defining expression APIs for [[DataFrame]].
  * Please use [[DataFrame]] and [[Column]] directly, and do NOT use this.
  */
-trait ExpressionApi {
+private[sql] trait ExpressionApi {
 
   def isComputable: Boolean
 
@@ -231,9 +231,7 @@ trait ExpressionApi {
   @scala.annotation.varargs
   def in(list: Column*): Column
 
-  def like(other: Column): Column
   def like(other: String): Column
-  def rlike(other: Column): Column
   def rlike(other: String): Column
 
   def contains(other: Column): Column
@@ -249,7 +247,6 @@ trait ExpressionApi {
   def isNull: Column
   def isNotNull: Column
 
-  def getItem(ordinal: Column): Column
   def getItem(ordinal: Int): Column
   def getField(fieldName: String): Column
 
@@ -266,7 +263,7 @@ trait ExpressionApi {
  * An internal interface defining aggregation APIs for [[DataFrame]].
  * Please use [[DataFrame]] and [[GroupedDataFrame]] directly, and do NOT use this.
  */
-trait GroupedDataFrameApi {
+private[sql] trait GroupedDataFrameApi {
 
   def agg(exprs: Map[String, String]): DataFrame
 

http://git-wip-us.apache.org/repos/asf/spark/blob/5b9760de/sql/core/src/main/scala/org/apache/spark/sql/api/java/dsl.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/api/java/dsl.java b/sql/core/src/main/scala/org/apache/spark/sql/api/java/dsl.java
new file mode 100644
index 0000000..74d7649
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/api/java/dsl.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.api.java;
+
+import org.apache.spark.sql.Column;
+import org.apache.spark.sql.DataFrame;
+import org.apache.spark.sql.api.scala.dsl.package$;
+
+
+/**
+ * Java version of the domain-specific functions available for {@link DataFrame}.
+ *
+ * The Scala version is at {@link org.apache.spark.sql.api.scala.dsl}.
+ */
+public class dsl {
+  // NOTE: Update also the Scala version when we update this version.
+
+  private static package$ scalaDsl = package$.MODULE$;
+
+  /**
+   * Creates a column of literal value.
+   */
+  public static Column lit(Object literalValue) {
+    return scalaDsl.lit(literalValue);
+  }
+
+  public static Column sum(Column e) {
+    return scalaDsl.sum(e);
+  }
+
+  public static Column sumDistinct(Column e) {
+    return scalaDsl.sumDistinct(e);
+  }
+
+  public static Column avg(Column e) {
+    return scalaDsl.avg(e);
+  }
+
+  public static Column first(Column e) {
+    return scalaDsl.first(e);
+  }
+
+  public static Column last(Column e) {
+    return scalaDsl.last(e);
+  }
+
+  public static Column min(Column e) {
+    return scalaDsl.min(e);
+  }
+
+  public static Column max(Column e) {
+    return scalaDsl.max(e);
+  }
+
+  public static Column upper(Column e) {
+    return scalaDsl.upper(e);
+  }
+
+  public static Column lower(Column e) {
+    return scalaDsl.lower(e);
+  }
+
+  public static Column sqrt(Column e) {
+    return scalaDsl.sqrt(e);
+  }
+
+  public static Column abs(Column e) {
+    return scalaDsl.abs(e);
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/5b9760de/sql/core/src/main/scala/org/apache/spark/sql/api/scala/dsl/package.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/api/scala/dsl/package.scala b/sql/core/src/main/scala/org/apache/spark/sql/api/scala/dsl/package.scala
new file mode 100644
index 0000000..9f2d142
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/api/scala/dsl/package.scala
@@ -0,0 +1,502 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.api.scala
+
+import scala.language.implicitConversions
+import scala.reflect.runtime.universe.{TypeTag, typeTag}
+
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.ScalaReflection
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.types._
+
+
+/**
+ * Scala version of the domain specific functions available for [[DataFrame]].
+ *
+ * The Java-version is at [[api.java.dsl]].
+ */
+package object dsl {
+  // NOTE: Update also the Java version when we update this version.
+
+  /** An implicit conversion that turns a Scala `Symbol` into a [[Column]]. */
+  implicit def symbolToColumn(s: Symbol): ColumnName = new ColumnName(s.name)
+
+  /** Converts $"col name" into an [[Column]]. */
+  implicit class StringToColumn(val sc: StringContext) extends AnyVal {
+    def $(args: Any*): ColumnName = {
+      new ColumnName(sc.s(args :_*))
+    }
+  }
+
+  private[this] implicit def toColumn(expr: Expression): Column = new Column(expr)
+
+  /**
+   * Creates a [[Column]] of literal value.
+   */
+  def lit(literal: Any): Column = {
+    if (literal.isInstanceOf[Symbol]) {
+      return new ColumnName(literal.asInstanceOf[Symbol].name)
+    }
+
+    val literalExpr = literal match {
+      case v: Boolean => Literal(v, BooleanType)
+      case v: Byte => Literal(v, ByteType)
+      case v: Short => Literal(v, ShortType)
+      case v: Int => Literal(v, IntegerType)
+      case v: Long => Literal(v, LongType)
+      case v: Float => Literal(v, FloatType)
+      case v: Double => Literal(v, DoubleType)
+      case v: String => Literal(v, StringType)
+      case v: BigDecimal => Literal(Decimal(v), DecimalType.Unlimited)
+      case v: java.math.BigDecimal => Literal(Decimal(v), DecimalType.Unlimited)
+      case v: Decimal => Literal(v, DecimalType.Unlimited)
+      case v: java.sql.Timestamp => Literal(v, TimestampType)
+      case v: java.sql.Date => Literal(v, DateType)
+      case v: Array[Byte] => Literal(v, BinaryType)
+      case null => Literal(null, NullType)
+      case _ =>
+        throw new RuntimeException("Unsupported literal type " + literal.getClass + " " + literal)
+    }
+    new Column(literalExpr)
+  }
+
+  def sum(e: Column): Column = Sum(e.expr)
+  def sumDistinct(e: Column): Column = SumDistinct(e.expr)
+  def count(e: Column): Column = Count(e.expr)
+
+  def countDistinct(expr: Column, exprs: Column*): Column =
+    CountDistinct((expr +: exprs).map(_.expr))
+
+  def avg(e: Column): Column = Average(e.expr)
+  def first(e: Column): Column = First(e.expr)
+  def last(e: Column): Column = Last(e.expr)
+  def min(e: Column): Column = Min(e.expr)
+  def max(e: Column): Column = Max(e.expr)
+
+  def upper(e: Column): Column = Upper(e.expr)
+  def lower(e: Column): Column = Lower(e.expr)
+  def sqrt(e: Column): Column = Sqrt(e.expr)
+  def abs(e: Column): Column = Abs(e.expr)
+
+
+  // scalastyle:off
+
+  /* Use the following code to generate:
+  (0 to 22).map { x =>
+    val types = (1 to x).foldRight("RT")((i, s) => {s"A$i, $s"})
+    val typeTags = (1 to x).map(i => s"A$i: TypeTag").foldLeft("RT: TypeTag")(_ + ", " + _)
+    val args = (1 to x).map(i => s"arg$i: Column").mkString(", ")
+    val argsInUdf = (1 to x).map(i => s"arg$i.expr").mkString(", ")
+    println(s"""
+    /**
+     * Call a Scala function of ${x} arguments as user-defined function (UDF), and automatically
+     * infer the data types based on the function's signature.
+     */
+    def callUDF[$typeTags](f: Function$x[$types]${if (args.length > 0) ", " + args else ""}): Column = {
+      ScalaUdf(f, ScalaReflection.schemaFor(typeTag[RT]).dataType, Seq($argsInUdf))
+    }""")
+  }
+
+  (0 to 22).map { x =>
+    val args = (1 to x).map(i => s"arg$i: Column").mkString(", ")
+    val fTypes = Seq.fill(x + 1)("_").mkString(", ")
+    val argsInUdf = (1 to x).map(i => s"arg$i.expr").mkString(", ")
+    println(s"""
+    /**
+     * Call a Scala function of ${x} arguments as user-defined function (UDF). This requires
+     * you to specify the return data type.
+     */
+    def callUDF(f: Function$x[$fTypes], returnType: DataType${if (args.length > 0) ", " + args else ""}): Column = {
+      ScalaUdf(f, returnType, Seq($argsInUdf))
+    }""")
+  }
+  }
+  */
+  /**
+   * Call a Scala function of 0 arguments as user-defined function (UDF), and automatically
+   * infer the data types based on the function's signature.
+   */
+  def callUDF[RT: TypeTag](f: Function0[RT]): Column = {
+    ScalaUdf(f, ScalaReflection.schemaFor(typeTag[RT]).dataType, Seq())
+  }
+
+  /**
+   * Call a Scala function of 1 arguments as user-defined function (UDF), and automatically
+   * infer the data types based on the function's signature.
+   */
+  def callUDF[RT: TypeTag, A1: TypeTag](f: Function1[A1, RT], arg1: Column): Column = {
+    ScalaUdf(f, ScalaReflection.schemaFor(typeTag[RT]).dataType, Seq(arg1.expr))
+  }
+
+  /**
+   * Call a Scala function of 2 arguments as user-defined function (UDF), and automatically
+   * infer the data types based on the function's signature.
+   */
+  def callUDF[RT: TypeTag, A1: TypeTag, A2: TypeTag](f: Function2[A1, A2, RT], arg1: Column, arg2: Column): Column = {
+    ScalaUdf(f, ScalaReflection.schemaFor(typeTag[RT]).dataType, Seq(arg1.expr, arg2.expr))
+  }
+
+  /**
+   * Call a Scala function of 3 arguments as user-defined function (UDF), and automatically
+   * infer the data types based on the function's signature.
+   */
+  def callUDF[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag](f: Function3[A1, A2, A3, RT], arg1: Column, arg2: Column, arg3: Column): Column = {
+    ScalaUdf(f, ScalaReflection.schemaFor(typeTag[RT]).dataType, Seq(arg1.expr, arg2.expr, arg3.expr))
+  }
+
+  /**
+   * Call a Scala function of 4 arguments as user-defined function (UDF), and automatically
+   * infer the data types based on the function's signature.
+   */
+  def callUDF[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: TypeTag](f: Function4[A1, A2, A3, A4, RT], arg1: Column, arg2: Column, arg3: Column, arg4: Column): Column = {
+    ScalaUdf(f, ScalaReflection.schemaFor(typeTag[RT]).dataType, Seq(arg1.expr, arg2.expr, arg3.expr, arg4.expr))
+  }
+
+  /**
+   * Call a Scala function of 5 arguments as user-defined function (UDF), and automatically
+   * infer the data types based on the function's signature.
+   */
+  def callUDF[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: TypeTag, A5: TypeTag](f: Function5[A1, A2, A3, A4, A5, RT], arg1: Column, arg2: Column, arg3: Column, arg4: Column, arg5: Column): Column = {
+    ScalaUdf(f, ScalaReflection.schemaFor(typeTag[RT]).dataType, Seq(arg1.expr, arg2.expr, arg3.expr, arg4.expr, arg5.expr))
+  }
+
+  /**
+   * Call a Scala function of 6 arguments as user-defined function (UDF), and automatically
+   * infer the data types based on the function's signature.
+   */
+  def callUDF[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: TypeTag, A5: TypeTag, A6: TypeTag](f: Function6[A1, A2, A3, A4, A5, A6, RT], arg1: Column, arg2: Column, arg3: Column, arg4: Column, arg5: Column, arg6: Column): Column = {
+    ScalaUdf(f, ScalaReflection.schemaFor(typeTag[RT]).dataType, Seq(arg1.expr, arg2.expr, arg3.expr, arg4.expr, arg5.expr, arg6.expr))
+  }
+
+  /**
+   * Call a Scala function of 7 arguments as user-defined function (UDF), and automatically
+   * infer the data types based on the function's signature.
+   */
+  def callUDF[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: TypeTag, A5: TypeTag, A6: TypeTag, A7: TypeTag](f: Function7[A1, A2, A3, A4, A5, A6, A7, RT], arg1: Column, arg2: Column, arg3: Column, arg4: Column, arg5: Column, arg6: Column, arg7: Column): Column = {
+    ScalaUdf(f, ScalaReflection.schemaFor(typeTag[RT]).dataType, Seq(arg1.expr, arg2.expr, arg3.expr, arg4.expr, arg5.expr, arg6.expr, arg7.expr))
+  }
+
+  /**
+   * Call a Scala function of 8 arguments as user-defined function (UDF), and automatically
+   * infer the data types based on the function's signature.
+   */
+  def callUDF[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: TypeTag, A5: TypeTag, A6: TypeTag, A7: TypeTag, A8: TypeTag](f: Function8[A1, A2, A3, A4, A5, A6, A7, A8, RT], arg1: Column, arg2: Column, arg3: Column, arg4: Column, arg5: Column, arg6: Column, arg7: Column, arg8: Column): Column = {
+    ScalaUdf(f, ScalaReflection.schemaFor(typeTag[RT]).dataType, Seq(arg1.expr, arg2.expr, arg3.expr, arg4.expr, arg5.expr, arg6.expr, arg7.expr, arg8.expr))
+  }
+
+  /**
+   * Call a Scala function of 9 arguments as user-defined function (UDF), and automatically
+   * infer the data types based on the function's signature.
+   */
+  def callUDF[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: TypeTag, A5: TypeTag, A6: TypeTag, A7: TypeTag, A8: TypeTag, A9: TypeTag](f: Function9[A1, A2, A3, A4, A5, A6, A7, A8, A9, RT], arg1: Column, arg2: Column, arg3: Column, arg4: Column, arg5: Column, arg6: Column, arg7: Column, arg8: Column, arg9: Column): Column = {
+    ScalaUdf(f, ScalaReflection.schemaFor(typeTag[RT]).dataType, Seq(arg1.expr, arg2.expr, arg3.expr, arg4.expr, arg5.expr, arg6.expr, arg7.expr, arg8.expr, arg9.expr))
+  }
+
+  /**
+   * Call a Scala function of 10 arguments as user-defined function (UDF), and automatically
+   * infer the data types based on the function's signature.
+   */
+  def callUDF[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: TypeTag, A5: TypeTag, A6: TypeTag, A7: TypeTag, A8: TypeTag, A9: TypeTag, A10: TypeTag](f: Function10[A1, A2, A3, A4, A5, A6, A7, A8, A9, A10, RT], arg1: Column, arg2: Column, arg3: Column, arg4: Column, arg5: Column, arg6: Column, arg7: Column, arg8: Column, arg9: Column, arg10: Column): Column = {
+    ScalaUdf(f, ScalaReflection.schemaFor(typeTag[RT]).dataType, Seq(arg1.expr, arg2.expr, arg3.expr, arg4.expr, arg5.expr, arg6.expr, arg7.expr, arg8.expr, arg9.expr, arg10.expr))
+  }
+
+  /**
+   * Call a Scala function of 11 arguments as user-defined function (UDF), and automatically
+   * infer the data types based on the function's signature.
+   */
+  def callUDF[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: TypeTag, A5: TypeTag, A6: TypeTag, A7: TypeTag, A8: TypeTag, A9: TypeTag, A10: TypeTag, A11: TypeTag](f: Function11[A1, A2, A3, A4, A5, A6, A7, A8, A9, A10, A11, RT], arg1: Column, arg2: Column, arg3: Column, arg4: Column, arg5: Column, arg6: Column, arg7: Column, arg8: Column, arg9: Column, arg10: Column, arg11: Column): Column = {
+    ScalaUdf(f, ScalaReflection.schemaFor(typeTag[RT]).dataType, Seq(arg1.expr, arg2.expr, arg3.expr, arg4.expr, arg5.expr, arg6.expr, arg7.expr, arg8.expr, arg9.expr, arg10.expr, arg11.expr))
+  }
+
+  /**
+   * Call a Scala function of 12 arguments as user-defined function (UDF), and automatically
+   * infer the data types based on the function's signature.
+   */
+  def callUDF[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: TypeTag, A5: TypeTag, A6: TypeTag, A7: TypeTag, A8: TypeTag, A9: TypeTag, A10: TypeTag, A11: TypeTag, A12: TypeTag](f: Function12[A1, A2, A3, A4, A5, A6, A7, A8, A9, A10, A11, A12, RT], arg1: Column, arg2: Column, arg3: Column, arg4: Column, arg5: Column, arg6: Column, arg7: Column, arg8: Column, arg9: Column, arg10: Column, arg11: Column, arg12: Column): Column = {
+    ScalaUdf(f, ScalaReflection.schemaFor(typeTag[RT]).dataType, Seq(arg1.expr, arg2.expr, arg3.expr, arg4.expr, arg5.expr, arg6.expr, arg7.expr, arg8.expr, arg9.expr, arg10.expr, arg11.expr, arg12.expr))
+  }
+
+  /**
+   * Call a Scala function of 13 arguments as user-defined function (UDF), and automatically
+   * infer the data types based on the function's signature.
+   */
+  def callUDF[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: TypeTag, A5: TypeTag, A6: TypeTag, A7: TypeTag, A8: TypeTag, A9: TypeTag, A10: TypeTag, A11: TypeTag, A12: TypeTag, A13: TypeTag](f: Function13[A1, A2, A3, A4, A5, A6, A7, A8, A9, A10, A11, A12, A13, RT], arg1: Column, arg2: Column, arg3: Column, arg4: Column, arg5: Column, arg6: Column, arg7: Column, arg8: Column, arg9: Column, arg10: Column, arg11: Column, arg12: Column, arg13: Column): Column = {
+    ScalaUdf(f, ScalaReflection.schemaFor(typeTag[RT]).dataType, Seq(arg1.expr, arg2.expr, arg3.expr, arg4.expr, arg5.expr, arg6.expr, arg7.expr, arg8.expr, arg9.expr, arg10.expr, arg11.expr, arg12.expr, arg13.expr))
+  }
+
+  /**
+   * Call a Scala function of 14 arguments as user-defined function (UDF), and automatically
+   * infer the data types based on the function's signature.
+   */
+  def callUDF[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: TypeTag, A5: TypeTag, A6: TypeTag, A7: TypeTag, A8: TypeTag, A9: TypeTag, A10: TypeTag, A11: TypeTag, A12: TypeTag, A13: TypeTag, A14: TypeTag](f: Function14[A1, A2, A3, A4, A5, A6, A7, A8, A9, A10, A11, A12, A13, A14, RT], arg1: Column, arg2: Column, arg3: Column, arg4: Column, arg5: Column, arg6: Column, arg7: Column, arg8: Column, arg9: Column, arg10: Column, arg11: Column, arg12: Column, arg13: Column, arg14: Column): Column = {
+    ScalaUdf(f, ScalaReflection.schemaFor(typeTag[RT]).dataType, Seq(arg1.expr, arg2.expr, arg3.expr, arg4.expr, arg5.expr, arg6.expr, arg7.expr, arg8.expr, arg9.expr, arg10.expr, arg11.expr, arg12.expr, arg13.expr, arg14.expr))
+  }
+
+  /**
+   * Call a Scala function of 15 arguments as user-defined function (UDF), and automatically
+   * infer the data types based on the function's signature.
+   */
+  def callUDF[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: TypeTag, A5: TypeTag, A6: TypeTag, A7: TypeTag, A8: TypeTag, A9: TypeTag, A10: TypeTag, A11: TypeTag, A12: TypeTag, A13: TypeTag, A14: TypeTag, A15: TypeTag](f: Function15[A1, A2, A3, A4, A5, A6, A7, A8, A9, A10, A11, A12, A13, A14, A15, RT], arg1: Column, arg2: Column, arg3: Column, arg4: Column, arg5: Column, arg6: Column, arg7: Column, arg8: Column, arg9: Column, arg10: Column, arg11: Column, arg12: Column, arg13: Column, arg14: Column, arg15: Column): Column = {
+    ScalaUdf(f, ScalaReflection.schemaFor(typeTag[RT]).dataType, Seq(arg1.expr, arg2.expr, arg3.expr, arg4.expr, arg5.expr, arg6.expr, arg7.expr, arg8.expr, arg9.expr, arg10.expr, arg11.expr, arg12.expr, arg13.expr, arg14.expr, arg15.expr))
+  }
+
+  /**
+   * Call a Scala function of 16 arguments as user-defined function (UDF), and automatically
+   * infer the data types based on the function's signature.
+   */
+  def callUDF[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: TypeTag, A5: TypeTag, A6: TypeTag, A7: TypeTag, A8: TypeTag, A9: TypeTag, A10: TypeTag, A11: TypeTag, A12: TypeTag, A13: TypeTag, A14: TypeTag, A15: TypeTag, A16: TypeTag](f: Function16[A1, A2, A3, A4, A5, A6, A7, A8, A9, A10, A11, A12, A13, A14, A15, A16, RT], arg1: Column, arg2: Column, arg3: Column, arg4: Column, arg5: Column, arg6: Column, arg7: Column, arg8: Column, arg9: Column, arg10: Column, arg11: Column, arg12: Column, arg13: Column, arg14: Column, arg15: Column, arg16: Column): Column = {
+    ScalaUdf(f, ScalaReflection.schemaFor(typeTag[RT]).dataType, Seq(arg1.expr, arg2.expr, arg3.expr, arg4.expr, arg5.expr, arg6.expr, arg7.expr, arg8.expr, arg9.expr, arg10.expr, arg11.expr, arg12.expr, arg13.expr, arg14.expr, arg15.expr, arg16.expr))
+  }
+
+  /**
+   * Call a Scala function of 17 arguments as user-defined function (UDF), and automatically
+   * infer the data types based on the function's signature.
+   */
+  def callUDF[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: TypeTag, A5: TypeTag, A6: TypeTag, A7: TypeTag, A8: TypeTag, A9: TypeTag, A10: TypeTag, A11: TypeTag, A12: TypeTag, A13: TypeTag, A14: TypeTag, A15: TypeTag, A16: TypeTag, A17: TypeTag](f: Function17[A1, A2, A3, A4, A5, A6, A7, A8, A9, A10, A11, A12, A13, A14, A15, A16, A17, RT], arg1: Column, arg2: Column, arg3: Column, arg4: Column, arg5: Column, arg6: Column, arg7: Column, arg8: Column, arg9: Column, arg10: Column, arg11: Column, arg12: Column, arg13: Column, arg14: Column, arg15: Column, arg16: Column, arg17: Column): Column = {
+    ScalaUdf(f, ScalaReflection.schemaFor(typeTag[RT]).dataType, Seq(arg1.expr, arg2.expr, arg3.expr, arg4.expr, arg5.expr, arg6.expr, arg7.expr, arg8.expr, arg9.expr, arg10.expr, arg11.expr, arg12.expr, arg13.expr, arg14.expr, arg15.expr, arg16.expr, arg17.expr))
+  }
+
+  /**
+   * Call a Scala function of 18 arguments as user-defined function (UDF), and automatically
+   * infer the data types based on the function's signature.
+   */
+  def callUDF[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: TypeTag, A5: TypeTag, A6: TypeTag, A7: TypeTag, A8: TypeTag, A9: TypeTag, A10: TypeTag, A11: TypeTag, A12: TypeTag, A13: TypeTag, A14: TypeTag, A15: TypeTag, A16: TypeTag, A17: TypeTag, A18: TypeTag](f: Function18[A1, A2, A3, A4, A5, A6, A7, A8, A9, A10, A11, A12, A13, A14, A15, A16, A17, A18, RT], arg1: Column, arg2: Column, arg3: Column, arg4: Column, arg5: Column, arg6: Column, arg7: Column, arg8: Column, arg9: Column, arg10: Column, arg11: Column, arg12: Column, arg13: Column, arg14: Column, arg15: Column, arg16: Column, arg17: Column, arg18: Column): Column = {
+    ScalaUdf(f, ScalaReflection.schemaFor(typeTag[RT]).dataType, Seq(arg1.expr, arg2.expr, arg3.expr, arg4.expr, arg5.expr, arg6.expr, arg7.expr, arg8.expr, arg9.expr, arg10.expr, arg11.expr, arg12.expr, arg13.expr, arg14.expr, arg15.expr, arg16.expr, arg17.expr, arg18.expr))
+  }
+
+  /**
+   * Call a Scala function of 19 arguments as user-defined function (UDF), and automatically
+   * infer the data types based on the function's signature.
+   */
+  def callUDF[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: TypeTag, A5: TypeTag, A6: TypeTag, A7: TypeTag, A8: TypeTag, A9: TypeTag, A10: TypeTag, A11: TypeTag, A12: TypeTag, A13: TypeTag, A14: TypeTag, A15: TypeTag, A16: TypeTag, A17: TypeTag, A18: TypeTag, A19: TypeTag](f: Function19[A1, A2, A3, A4, A5, A6, A7, A8, A9, A10, A11, A12, A13, A14, A15, A16, A17, A18, A19, RT], arg1: Column, arg2: Column, arg3: Column, arg4: Column, arg5: Column, arg6: Column, arg7: Column, arg8: Column, arg9: Column, arg10: Column, arg11: Column, arg12: Column, arg13: Column, arg14: Column, arg15: Column, arg16: Column, arg17: Column, arg18: Column, arg19: Column): Column = {
+    ScalaUdf(f, ScalaReflection.schemaFor(typeTag[RT]).dataType, Seq(arg1.expr, arg2.expr, arg3.expr, arg4.expr, arg5.expr, arg6.expr, arg7.expr, arg8.expr, arg9.expr, arg10.expr, arg11.expr, arg12.expr, arg13.expr, arg14.expr, arg15.expr, arg16.expr, arg17.expr, arg18.expr, arg19.expr))
+  }
+
+  /**
+   * Call a Scala function of 20 arguments as user-defined function (UDF), and automatically
+   * infer the data types based on the function's signature.
+   */
+  def callUDF[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: TypeTag, A5: TypeTag, A6: TypeTag, A7: TypeTag, A8: TypeTag, A9: TypeTag, A10: TypeTag, A11: TypeTag, A12: TypeTag, A13: TypeTag, A14: TypeTag, A15: TypeTag, A16: TypeTag, A17: TypeTag, A18: TypeTag, A19: TypeTag, A20: TypeTag](f: Function20[A1, A2, A3, A4, A5, A6, A7, A8, A9, A10, A11, A12, A13, A14, A15, A16, A17, A18, A19, A20, RT], arg1: Column, arg2: Column, arg3: Column, arg4: Column, arg5: Column, arg6: Column, arg7: Column, arg8: Column, arg9: Column, arg10: Column, arg11: Column, arg12: Column, arg13: Column, arg14: Column, arg15: Column, arg16: Column, arg17: Column, arg18: Column, arg19: Column, arg20: Column): Column = {
+    ScalaUdf(f, ScalaReflection.schemaFor(typeTag[RT]).dataType, Seq(arg1.expr, arg2.expr, arg3.expr, arg4.expr, arg5.expr, arg6.expr, arg7.expr, arg8.expr, arg9.expr, arg10.expr, arg11.expr, arg12.expr, arg13.expr, arg14.expr, arg15.expr, arg16.expr, arg17.expr, arg18.expr, arg19.expr, arg20.expr))
+  }
+
+  /**
+   * Call a Scala function of 21 arguments as user-defined function (UDF), and automatically
+   * infer the data types based on the function's signature.
+   */
+  def callUDF[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: TypeTag, A5: TypeTag, A6: TypeTag, A7: TypeTag, A8: TypeTag, A9: TypeTag, A10: TypeTag, A11: TypeTag, A12: TypeTag, A13: TypeTag, A14: TypeTag, A15: TypeTag, A16: TypeTag, A17: TypeTag, A18: TypeTag, A19: TypeTag, A20: TypeTag, A21: TypeTag](f: Function21[A1, A2, A3, A4, A5, A6, A7, A8, A9, A10, A11, A12, A13, A14, A15, A16, A17, A18, A19, A20, A21, RT], arg1: Column, arg2: Column, arg3: Column, arg4: Column, arg5: Column, arg6: Column, arg7: Column, arg8: Column, arg9: Column, arg10: Column, arg11: Column, arg12: Column, arg13: Column, arg14: Column, arg15: Column, arg16: Column, arg17: Column, arg18: Column, arg19: Column, arg20: Column, arg21: Column): Column = {
+    ScalaUdf(f, ScalaReflection.schemaFor(typeTag[RT]).dataType, Seq(arg1.expr, arg2.expr, arg3.expr, arg4.expr, arg5.expr, arg6.expr, arg7.expr, arg8.expr, arg9.expr, arg10.expr, arg11.expr, arg12.expr, arg13.expr, arg14.expr, arg15.expr, arg16.expr, arg17.expr, arg18.expr, arg19.expr, arg20.expr, arg21.expr))
+  }
+
+  /**
+   * Call a Scala function of 22 arguments as user-defined function (UDF), and automatically
+   * infer the data types based on the function's signature.
+   */
+  def callUDF[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: TypeTag, A5: TypeTag, A6: TypeTag, A7: TypeTag, A8: TypeTag, A9: TypeTag, A10: TypeTag, A11: TypeTag, A12: TypeTag, A13: TypeTag, A14: TypeTag, A15: TypeTag, A16: TypeTag, A17: TypeTag, A18: TypeTag, A19: TypeTag, A20: TypeTag, A21: TypeTag, A22: TypeTag](f: Function22[A1, A2, A3, A4, A5, A6, A7, A8, A9, A10, A11, A12, A13, A14, A15, A16, A17, A18, A19, A20, A21, A22, RT], arg1: Column, arg2: Column, arg3: Column, arg4: Column, arg5: Column, arg6: Column, arg7: Column, arg8: Column, arg9: Column, arg10: Column, arg11: Column, arg12: Column, arg13: Column, arg14: Column, arg15: Column, arg16: Column, arg17: Column, arg18: Column, arg19: Column, arg20: Column, arg21: Column, arg22: Column): Column = {
+    ScalaUdf(f, ScalaReflection.schemaFor(typeTag[RT]).dataType, Seq(arg1.expr, arg2.expr, arg3.expr, arg4.expr, arg5.expr, arg6.expr, arg7.expr, arg8.expr, arg9.expr, arg10.expr, arg11.expr, arg12.expr, arg13.expr, arg14.expr, arg15.expr, arg16.expr, arg17.expr, arg18.expr, arg19.expr, arg20.expr, arg21.expr, arg22.expr))
+  }
+
+  //////////////////////////////////////////////////////////////////////////////////////////////////
+
+  /**
+   * Call a Scala function of 0 arguments as user-defined function (UDF). This requires
+   * you to specify the return data type.
+   */
+  def callUDF(f: Function0[_], returnType: DataType): Column = {
+    ScalaUdf(f, returnType, Seq())
+  }
+
+  /**
+   * Call a Scala function of 1 argument as user-defined function (UDF). This requires
+   * you to specify the return data type.
+   */
+  def callUDF(f: Function1[_, _], returnType: DataType, arg1: Column): Column = {
+    ScalaUdf(f, returnType, Seq(arg1.expr))
+  }
+
+  /**
+   * Call a Scala function of 2 arguments as user-defined function (UDF). This requires
+   * you to specify the return data type.
+   */
+  def callUDF(f: Function2[_, _, _], returnType: DataType, arg1: Column, arg2: Column): Column = {
+    ScalaUdf(f, returnType, Seq(arg1.expr, arg2.expr))
+  }
+
+  /**
+   * Call a Scala function of 3 arguments as user-defined function (UDF). This requires
+   * you to specify the return data type.
+   */
+  def callUDF(f: Function3[_, _, _, _], returnType: DataType, arg1: Column, arg2: Column, arg3: Column): Column = {
+    ScalaUdf(f, returnType, Seq(arg1.expr, arg2.expr, arg3.expr))
+  }
+
+  /**
+   * Call a Scala function of 4 arguments as user-defined function (UDF). This requires
+   * you to specify the return data type.
+   */
+  def callUDF(f: Function4[_, _, _, _, _], returnType: DataType, arg1: Column, arg2: Column, arg3: Column, arg4: Column): Column = {
+    ScalaUdf(f, returnType, Seq(arg1.expr, arg2.expr, arg3.expr, arg4.expr))
+  }
+
+  /**
+   * Call a Scala function of 5 arguments as user-defined function (UDF). This requires
+   * you to specify the return data type.
+   */
+  def callUDF(f: Function5[_, _, _, _, _, _], returnType: DataType, arg1: Column, arg2: Column, arg3: Column, arg4: Column, arg5: Column): Column = {
+    ScalaUdf(f, returnType, Seq(arg1.expr, arg2.expr, arg3.expr, arg4.expr, arg5.expr))
+  }
+
+  /**
+   * Call a Scala function of 6 arguments as user-defined function (UDF). This requires
+   * you to specify the return data type.
+   */
+  def callUDF(f: Function6[_, _, _, _, _, _, _], returnType: DataType, arg1: Column, arg2: Column, arg3: Column, arg4: Column, arg5: Column, arg6: Column): Column = {
+    ScalaUdf(f, returnType, Seq(arg1.expr, arg2.expr, arg3.expr, arg4.expr, arg5.expr, arg6.expr))
+  }
+
+  /**
+   * Call a Scala function of 7 arguments as user-defined function (UDF). This requires
+   * you to specify the return data type.
+   */
+  def callUDF(f: Function7[_, _, _, _, _, _, _, _], returnType: DataType, arg1: Column, arg2: Column, arg3: Column, arg4: Column, arg5: Column, arg6: Column, arg7: Column): Column = {
+    ScalaUdf(f, returnType, Seq(arg1.expr, arg2.expr, arg3.expr, arg4.expr, arg5.expr, arg6.expr, arg7.expr))
+  }
+
+  /**
+   * Call a Scala function of 8 arguments as user-defined function (UDF). This requires
+   * you to specify the return data type.
+   */
+  def callUDF(f: Function8[_, _, _, _, _, _, _, _, _], returnType: DataType, arg1: Column, arg2: Column, arg3: Column, arg4: Column, arg5: Column, arg6: Column, arg7: Column, arg8: Column): Column = {
+    ScalaUdf(f, returnType, Seq(arg1.expr, arg2.expr, arg3.expr, arg4.expr, arg5.expr, arg6.expr, arg7.expr, arg8.expr))
+  }
+
+  /**
+   * Call a Scala function of 9 arguments as user-defined function (UDF). This requires
+   * you to specify the return data type.
+   */
+  def callUDF(f: Function9[_, _, _, _, _, _, _, _, _, _], returnType: DataType, arg1: Column, arg2: Column, arg3: Column, arg4: Column, arg5: Column, arg6: Column, arg7: Column, arg8: Column, arg9: Column): Column = {
+    ScalaUdf(f, returnType, Seq(arg1.expr, arg2.expr, arg3.expr, arg4.expr, arg5.expr, arg6.expr, arg7.expr, arg8.expr, arg9.expr))
+  }
+
+  /**
+   * Call a Scala function of 10 arguments as user-defined function (UDF). This requires
+   * you to specify the return data type.
+   */
+  def callUDF(f: Function10[_, _, _, _, _, _, _, _, _, _, _], returnType: DataType, arg1: Column, arg2: Column, arg3: Column, arg4: Column, arg5: Column, arg6: Column, arg7: Column, arg8: Column, arg9: Column, arg10: Column): Column = {
+    ScalaUdf(f, returnType, Seq(arg1.expr, arg2.expr, arg3.expr, arg4.expr, arg5.expr, arg6.expr, arg7.expr, arg8.expr, arg9.expr, arg10.expr))
+  }
+
+  /**
+   * Call a Scala function of 11 arguments as user-defined function (UDF). This requires
+   * you to specify the return data type.
+   */
+  def callUDF(f: Function11[_, _, _, _, _, _, _, _, _, _, _, _], returnType: DataType, arg1: Column, arg2: Column, arg3: Column, arg4: Column, arg5: Column, arg6: Column, arg7: Column, arg8: Column, arg9: Column, arg10: Column, arg11: Column): Column = {
+    ScalaUdf(f, returnType, Seq(arg1.expr, arg2.expr, arg3.expr, arg4.expr, arg5.expr, arg6.expr, arg7.expr, arg8.expr, arg9.expr, arg10.expr, arg11.expr))
+  }
+
+  /**
+   * Call a Scala function of 12 arguments as user-defined function (UDF). This requires
+   * you to specify the return data type.
+   */
+  def callUDF(f: Function12[_, _, _, _, _, _, _, _, _, _, _, _, _], returnType: DataType, arg1: Column, arg2: Column, arg3: Column, arg4: Column, arg5: Column, arg6: Column, arg7: Column, arg8: Column, arg9: Column, arg10: Column, arg11: Column, arg12: Column): Column = {
+    ScalaUdf(f, returnType, Seq(arg1.expr, arg2.expr, arg3.expr, arg4.expr, arg5.expr, arg6.expr, arg7.expr, arg8.expr, arg9.expr, arg10.expr, arg11.expr, arg12.expr))
+  }
+
+  /**
+   * Call a Scala function of 13 arguments as user-defined function (UDF). This requires
+   * you to specify the return data type.
+   */
+  def callUDF(f: Function13[_, _, _, _, _, _, _, _, _, _, _, _, _, _], returnType: DataType, arg1: Column, arg2: Column, arg3: Column, arg4: Column, arg5: Column, arg6: Column, arg7: Column, arg8: Column, arg9: Column, arg10: Column, arg11: Column, arg12: Column, arg13: Column): Column = {
+    ScalaUdf(f, returnType, Seq(arg1.expr, arg2.expr, arg3.expr, arg4.expr, arg5.expr, arg6.expr, arg7.expr, arg8.expr, arg9.expr, arg10.expr, arg11.expr, arg12.expr, arg13.expr))
+  }
+
+  /**
+   * Call a Scala function of 14 arguments as user-defined function (UDF). This requires
+   * you to specify the return data type.
+   */
+  def callUDF(f: Function14[_, _, _, _, _, _, _, _, _, _, _, _, _, _, _], returnType: DataType, arg1: Column, arg2: Column, arg3: Column, arg4: Column, arg5: Column, arg6: Column, arg7: Column, arg8: Column, arg9: Column, arg10: Column, arg11: Column, arg12: Column, arg13: Column, arg14: Column): Column = {
+    ScalaUdf(f, returnType, Seq(arg1.expr, arg2.expr, arg3.expr, arg4.expr, arg5.expr, arg6.expr, arg7.expr, arg8.expr, arg9.expr, arg10.expr, arg11.expr, arg12.expr, arg13.expr, arg14.expr))
+  }
+
+  /**
+   * Call a Scala function of 15 arguments as user-defined function (UDF). This requires
+   * you to specify the return data type.
+   */
+  def callUDF(f: Function15[_, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _], returnType: DataType, arg1: Column, arg2: Column, arg3: Column, arg4: Column, arg5: Column, arg6: Column, arg7: Column, arg8: Column, arg9: Column, arg10: Column, arg11: Column, arg12: Column, arg13: Column, arg14: Column, arg15: Column): Column = {
+    ScalaUdf(f, returnType, Seq(arg1.expr, arg2.expr, arg3.expr, arg4.expr, arg5.expr, arg6.expr, arg7.expr, arg8.expr, arg9.expr, arg10.expr, arg11.expr, arg12.expr, arg13.expr, arg14.expr, arg15.expr))
+  }
+
+  /**
+   * Call a Scala function of 16 arguments as user-defined function (UDF). This requires
+   * you to specify the return data type.
+   */
+  def callUDF(f: Function16[_, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _], returnType: DataType, arg1: Column, arg2: Column, arg3: Column, arg4: Column, arg5: Column, arg6: Column, arg7: Column, arg8: Column, arg9: Column, arg10: Column, arg11: Column, arg12: Column, arg13: Column, arg14: Column, arg15: Column, arg16: Column): Column = {
+    ScalaUdf(f, returnType, Seq(arg1.expr, arg2.expr, arg3.expr, arg4.expr, arg5.expr, arg6.expr, arg7.expr, arg8.expr, arg9.expr, arg10.expr, arg11.expr, arg12.expr, arg13.expr, arg14.expr, arg15.expr, arg16.expr))
+  }
+
+  /**
+   * Call a Scala function of 17 arguments as user-defined function (UDF). This requires
+   * you to specify the return data type.
+   */
+  def callUDF(f: Function17[_, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _], returnType: DataType, arg1: Column, arg2: Column, arg3: Column, arg4: Column, arg5: Column, arg6: Column, arg7: Column, arg8: Column, arg9: Column, arg10: Column, arg11: Column, arg12: Column, arg13: Column, arg14: Column, arg15: Column, arg16: Column, arg17: Column): Column = {
+    ScalaUdf(f, returnType, Seq(arg1.expr, arg2.expr, arg3.expr, arg4.expr, arg5.expr, arg6.expr, arg7.expr, arg8.expr, arg9.expr, arg10.expr, arg11.expr, arg12.expr, arg13.expr, arg14.expr, arg15.expr, arg16.expr, arg17.expr))
+  }
+
+  /**
+   * Call a Scala function of 18 arguments as user-defined function (UDF). This requires
+   * you to specify the return data type.
+   */
+  def callUDF(f: Function18[_, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _], returnType: DataType, arg1: Column, arg2: Column, arg3: Column, arg4: Column, arg5: Column, arg6: Column, arg7: Column, arg8: Column, arg9: Column, arg10: Column, arg11: Column, arg12: Column, arg13: Column, arg14: Column, arg15: Column, arg16: Column, arg17: Column, arg18: Column): Column = {
+    ScalaUdf(f, returnType, Seq(arg1.expr, arg2.expr, arg3.expr, arg4.expr, arg5.expr, arg6.expr, arg7.expr, arg8.expr, arg9.expr, arg10.expr, arg11.expr, arg12.expr, arg13.expr, arg14.expr, arg15.expr, arg16.expr, arg17.expr, arg18.expr))
+  }
+
+  /**
+   * Call a Scala function of 19 arguments as user-defined function (UDF). This requires
+   * you to specify the return data type.
+   */
+  def callUDF(f: Function19[_, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _], returnType: DataType, arg1: Column, arg2: Column, arg3: Column, arg4: Column, arg5: Column, arg6: Column, arg7: Column, arg8: Column, arg9: Column, arg10: Column, arg11: Column, arg12: Column, arg13: Column, arg14: Column, arg15: Column, arg16: Column, arg17: Column, arg18: Column, arg19: Column): Column = {
+    ScalaUdf(f, returnType, Seq(arg1.expr, arg2.expr, arg3.expr, arg4.expr, arg5.expr, arg6.expr, arg7.expr, arg8.expr, arg9.expr, arg10.expr, arg11.expr, arg12.expr, arg13.expr, arg14.expr, arg15.expr, arg16.expr, arg17.expr, arg18.expr, arg19.expr))
+  }
+
+  /**
+   * Call a Scala function of 20 arguments as user-defined function (UDF). This requires
+   * you to specify the return data type.
+   */
+  def callUDF(f: Function20[_, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _], returnType: DataType, arg1: Column, arg2: Column, arg3: Column, arg4: Column, arg5: Column, arg6: Column, arg7: Column, arg8: Column, arg9: Column, arg10: Column, arg11: Column, arg12: Column, arg13: Column, arg14: Column, arg15: Column, arg16: Column, arg17: Column, arg18: Column, arg19: Column, arg20: Column): Column = {
+    ScalaUdf(f, returnType, Seq(arg1.expr, arg2.expr, arg3.expr, arg4.expr, arg5.expr, arg6.expr, arg7.expr, arg8.expr, arg9.expr, arg10.expr, arg11.expr, arg12.expr, arg13.expr, arg14.expr, arg15.expr, arg16.expr, arg17.expr, arg18.expr, arg19.expr, arg20.expr))
+  }
+
+  /**
+   * Call a Scala function of 21 arguments as user-defined function (UDF). This requires
+   * you to specify the return data type.
+   */
+  def callUDF(f: Function21[_, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _], returnType: DataType, arg1: Column, arg2: Column, arg3: Column, arg4: Column, arg5: Column, arg6: Column, arg7: Column, arg8: Column, arg9: Column, arg10: Column, arg11: Column, arg12: Column, arg13: Column, arg14: Column, arg15: Column, arg16: Column, arg17: Column, arg18: Column, arg19: Column, arg20: Column, arg21: Column): Column = {
+    ScalaUdf(f, returnType, Seq(arg1.expr, arg2.expr, arg3.expr, arg4.expr, arg5.expr, arg6.expr, arg7.expr, arg8.expr, arg9.expr, arg10.expr, arg11.expr, arg12.expr, arg13.expr, arg14.expr, arg15.expr, arg16.expr, arg17.expr, arg18.expr, arg19.expr, arg20.expr, arg21.expr))
+  }
+
+  /**
+   * Call a Scala function of 22 arguments as user-defined function (UDF). This requires
+   * you to specify the return data type.
+   */
+  def callUDF(f: Function22[_, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _], returnType: DataType, arg1: Column, arg2: Column, arg3: Column, arg4: Column, arg5: Column, arg6: Column, arg7: Column, arg8: Column, arg9: Column, arg10: Column, arg11: Column, arg12: Column, arg13: Column, arg14: Column, arg15: Column, arg16: Column, arg17: Column, arg18: Column, arg19: Column, arg20: Column, arg21: Column, arg22: Column): Column = {
+    ScalaUdf(f, returnType, Seq(arg1.expr, arg2.expr, arg3.expr, arg4.expr, arg5.expr, arg6.expr, arg7.expr, arg8.expr, arg9.expr, arg10.expr, arg11.expr, arg12.expr, arg13.expr, arg14.expr, arg15.expr, arg16.expr, arg17.expr, arg18.expr, arg19.expr, arg20.expr, arg21.expr, arg22.expr))
+  }
+
+  // scalastyle:on
+}
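The new DSL surface above is easiest to read from a call site. Below is a minimal, illustrative sketch (not part of the patch) of the type-inferring callUDF overloads together with the lower/abs column functions. It assumes the DSL package object is imported as org.apache.spark.sql.api.scala.dsl._ in this revision, that a SQLContext is in scope, and that a hypothetical "people" table with a string column "name" and an integer column "age" has been registered; the table and column names are examples only.

    import org.apache.spark.sql.{Column, DataFrame, SQLContext}
    import org.apache.spark.sql.api.scala.dsl._

    def typedUdfExample(sqlContext: SQLContext): DataFrame = {
      // Hypothetical input: a registered table with columns "name" and "age".
      val people = sqlContext.table("people")

      // Typed overload: the return type (String) and the argument type are
      // inferred from the function literal's signature via TypeTags.
      val shout: Column = callUDF((s: String) => s.toUpperCase, people("name"))

      // Plain column functions defined at the top of the package object.
      people.select(shout, lower(people("name")), abs(people("age")))
    }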


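For completeness, a similar sketch (again illustrative only) of the second family of overloads, which takes an explicit return DataType instead of inferring it from TypeTags; the import of IntegerType assumes the data type classes live under org.apache.spark.sql.types in this revision.

    import org.apache.spark.sql.{DataFrame, SQLContext}
    import org.apache.spark.sql.api.scala.dsl._
    import org.apache.spark.sql.types.IntegerType

    def explicitTypeUdfExample(sqlContext: SQLContext): DataFrame = {
      val people = sqlContext.table("people")

      // Untyped overload: no TypeTags are required, so the return data type
      // (IntegerType here) must be passed explicitly.
      val plusOne = callUDF((i: Int) => i + 1, IntegerType, people("age"))

      people.select(people("name"), plusOne)
    }

Supplying the DataType by hand is the price of skipping TypeTags, which keeps these overloads usable where Scala type tags are unavailable.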
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org