You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ma...@apache.org on 2015/02/17 19:21:29 UTC

[2/2] spark git commit: [SPARK-5166][SPARK-5247][SPARK-5258][SQL] API Cleanup / Documentation

[SPARK-5166][SPARK-5247][SPARK-5258][SQL] API Cleanup / Documentation

Author: Michael Armbrust <mi...@databricks.com>

Closes #4642 from marmbrus/docs and squashes the following commits:

d291c34 [Michael Armbrust] python tests
9be66e3 [Michael Armbrust] comments
d56afc2 [Michael Armbrust] fix style
f004747 [Michael Armbrust] fix build
c4a907b [Michael Armbrust] fix tests
42e2b73 [Michael Armbrust] [SQL] Documentation / API Clean-up.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c74b07fa
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c74b07fa
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c74b07fa

Branch: refs/heads/master
Commit: c74b07fa94a8da50437d952ae05cf6ac70fbb93e
Parents: c76da36
Author: Michael Armbrust <mi...@databricks.com>
Authored: Tue Feb 17 10:21:17 2015 -0800
Committer: Michael Armbrust <mi...@databricks.com>
Committed: Tue Feb 17 10:21:17 2015 -0800

----------------------------------------------------------------------
 project/SparkBuild.scala                        |  12 +-
 python/pyspark/sql/context.py                   |  28 +--
 .../org/apache/spark/sql/jdbc/JDBCUtils.java    |  59 ------
 .../scala/org/apache/spark/sql/DataFrame.scala  | 153 +++++++++++++-
 .../org/apache/spark/sql/DataFrameImpl.scala    |  33 ++-
 .../apache/spark/sql/ExperimentalMethods.scala  |   5 +
 .../apache/spark/sql/IncomputableColumn.scala   |   4 +
 .../scala/org/apache/spark/sql/SQLContext.scala | 200 +++++++++++++++----
 .../apache/spark/sql/UserDefinedFunction.scala  |   3 +-
 .../org/apache/spark/sql/api/package.scala      |  23 +++
 .../apache/spark/sql/execution/commands.scala   |   2 +-
 .../spark/sql/execution/debug/package.scala     |  10 +-
 .../spark/sql/jdbc/JavaJDBCTrampoline.scala     |  30 ---
 .../scala/org/apache/spark/sql/jdbc/jdbc.scala  |  74 ++-----
 .../sql/parquet/ParquetTableOperations.scala    |   4 +-
 .../apache/spark/sql/parquet/ParquetTest.scala  |   4 +-
 .../apache/spark/sql/parquet/newParquet.scala   |   6 +-
 .../spark/sql/parquet/timestamp/NanoTime.scala  |   2 +-
 .../org/apache/spark/sql/sources/ddl.scala      |   4 +-
 .../org/apache/spark/sql/jdbc/JavaJDBCTest.java | 102 ----------
 .../org/apache/spark/sql/jdbc/JDBCSuite.scala   |   7 +-
 .../apache/spark/sql/jdbc/JDBCWriteSuite.scala  |  20 +-
 .../spark/sql/jdbc/MySQLIntegration.scala       |  14 +-
 .../spark/sql/jdbc/PostgresIntegration.scala    |   4 +-
 .../execution/DescribeHiveTableCommand.scala    |   4 +-
 .../spark/sql/hive/execution/commands.scala     |   8 +
 .../spark/sql/hive/execution/package.scala      |  25 +++
 .../org/apache/spark/sql/hive/package.scala     |  10 +
 .../sql/hive/parquet/FakeParquetSerDe.scala     |  56 ------
 .../org/apache/spark/sql/hive/Shim12.scala      |   9 +-
 .../org/apache/spark/sql/hive/Shim13.scala      |   9 +-
 31 files changed, 501 insertions(+), 423 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/c74b07fa/project/SparkBuild.scala
----------------------------------------------------------------------
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index 8fb1239..e4b1b96 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -361,9 +361,16 @@ object Unidoc {
     publish := {},
 
     unidocProjectFilter in(ScalaUnidoc, unidoc) :=
-      inAnyProject -- inProjects(OldDeps.project, repl, examples, tools, catalyst, streamingFlumeSink, yarn),
+      inAnyProject -- inProjects(OldDeps.project, repl, examples, tools, streamingFlumeSink, yarn),
     unidocProjectFilter in(JavaUnidoc, unidoc) :=
-      inAnyProject -- inProjects(OldDeps.project, repl, bagel, examples, tools, catalyst, streamingFlumeSink, yarn),
+      inAnyProject -- inProjects(OldDeps.project, repl, bagel, examples, tools, streamingFlumeSink, yarn),
+
+    // Skip actual catalyst, but include the subproject.
+    // Catalyst is not public API and contains quasiquotes which break scaladoc.
+    unidocAllSources in (ScalaUnidoc, unidoc) := {
+      (unidocAllSources in (ScalaUnidoc, unidoc)).value
+        .map(_.filterNot(_.getCanonicalPath.contains("sql/catalyst")))
+    },
 
     // Skip class names containing $ and some internal packages in Javadocs
     unidocAllSources in (JavaUnidoc, unidoc) := {
@@ -376,6 +383,7 @@ object Unidoc {
         .map(_.filterNot(_.getCanonicalPath.contains("executor")))
         .map(_.filterNot(_.getCanonicalPath.contains("python")))
         .map(_.filterNot(_.getCanonicalPath.contains("collection")))
+        .map(_.filterNot(_.getCanonicalPath.contains("sql/catalyst")))
     },
 
     // Javadoc options: create a window title, and group key packages on index page

http://git-wip-us.apache.org/repos/asf/spark/blob/c74b07fa/python/pyspark/sql/context.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/context.py b/python/pyspark/sql/context.py
index dd2cd5e..2e2309f 100644
--- a/python/pyspark/sql/context.py
+++ b/python/pyspark/sql/context.py
@@ -252,7 +252,7 @@ class SQLContext(object):
         >>> schema = StructType([StructField("field1", IntegerType(), False),
         ...     StructField("field2", StringType(), False)])
         >>> df = sqlCtx.applySchema(rdd2, schema)
-        >>> sqlCtx.registerRDDAsTable(df, "table1")
+        >>> sqlCtx.registerDataFrameAsTable(df, "table1")
         >>> df2 = sqlCtx.sql("SELECT * from table1")
         >>> df2.collect()
         [Row(field1=1, field2=u'row1'),..., Row(field1=3, field2=u'row3')]
@@ -405,17 +405,17 @@ class SQLContext(object):
 
         return self.applySchema(data, schema)
 
-    def registerRDDAsTable(self, rdd, tableName):
+    def registerDataFrameAsTable(self, rdd, tableName):
         """Registers the given RDD as a temporary table in the catalog.
 
         Temporary tables exist only during the lifetime of this instance of
         SQLContext.
 
-        >>> sqlCtx.registerRDDAsTable(df, "table1")
+        >>> sqlCtx.registerDataFrameAsTable(df, "table1")
         """
         if (rdd.__class__ is DataFrame):
             df = rdd._jdf
-            self._ssql_ctx.registerRDDAsTable(df, tableName)
+            self._ssql_ctx.registerDataFrameAsTable(df, tableName)
         else:
             raise ValueError("Can only register DataFrame as table")
 
@@ -456,7 +456,7 @@ class SQLContext(object):
         ...   print>>ofn, json
         >>> ofn.close()
         >>> df1 = sqlCtx.jsonFile(jsonFile)
-        >>> sqlCtx.registerRDDAsTable(df1, "table1")
+        >>> sqlCtx.registerDataFrameAsTable(df1, "table1")
         >>> df2 = sqlCtx.sql(
         ...   "SELECT field1 AS f1, field2 as f2, field3 as f3, "
         ...   "field6 as f4 from table1")
@@ -467,7 +467,7 @@ class SQLContext(object):
         Row(f1=None, f2=u'row3', f3=Row(field4=33, field5=[]), f4=None)
 
         >>> df3 = sqlCtx.jsonFile(jsonFile, df1.schema)
-        >>> sqlCtx.registerRDDAsTable(df3, "table2")
+        >>> sqlCtx.registerDataFrameAsTable(df3, "table2")
         >>> df4 = sqlCtx.sql(
         ...   "SELECT field1 AS f1, field2 as f2, field3 as f3, "
         ...   "field6 as f4 from table2")
@@ -485,7 +485,7 @@ class SQLContext(object):
         ...             StructField("field5",
         ...                 ArrayType(IntegerType(), False), True)]), False)])
         >>> df5 = sqlCtx.jsonFile(jsonFile, schema)
-        >>> sqlCtx.registerRDDAsTable(df5, "table3")
+        >>> sqlCtx.registerDataFrameAsTable(df5, "table3")
         >>> df6 = sqlCtx.sql(
         ...   "SELECT field2 AS f1, field3.field5 as f2, "
         ...   "field3.field5[0] as f3 from table3")
@@ -509,7 +509,7 @@ class SQLContext(object):
         determine the schema.
 
         >>> df1 = sqlCtx.jsonRDD(json)
-        >>> sqlCtx.registerRDDAsTable(df1, "table1")
+        >>> sqlCtx.registerDataFrameAsTable(df1, "table1")
         >>> df2 = sqlCtx.sql(
         ...   "SELECT field1 AS f1, field2 as f2, field3 as f3, "
         ...   "field6 as f4 from table1")
@@ -520,7 +520,7 @@ class SQLContext(object):
         Row(f1=None, f2=u'row3', f3=Row(field4=33, field5=[]), f4=None)
 
         >>> df3 = sqlCtx.jsonRDD(json, df1.schema)
-        >>> sqlCtx.registerRDDAsTable(df3, "table2")
+        >>> sqlCtx.registerDataFrameAsTable(df3, "table2")
         >>> df4 = sqlCtx.sql(
         ...   "SELECT field1 AS f1, field2 as f2, field3 as f3, "
         ...   "field6 as f4 from table2")
@@ -538,7 +538,7 @@ class SQLContext(object):
         ...             StructField("field5",
         ...                 ArrayType(IntegerType(), False), True)]), False)])
         >>> df5 = sqlCtx.jsonRDD(json, schema)
-        >>> sqlCtx.registerRDDAsTable(df5, "table3")
+        >>> sqlCtx.registerDataFrameAsTable(df5, "table3")
         >>> df6 = sqlCtx.sql(
         ...   "SELECT field2 AS f1, field3.field5 as f2, "
         ...   "field3.field5[0] as f3 from table3")
@@ -628,7 +628,7 @@ class SQLContext(object):
     def sql(self, sqlQuery):
         """Return a L{DataFrame} representing the result of the given query.
 
-        >>> sqlCtx.registerRDDAsTable(df, "table1")
+        >>> sqlCtx.registerDataFrameAsTable(df, "table1")
         >>> df2 = sqlCtx.sql("SELECT field1 AS f1, field2 as f2 from table1")
         >>> df2.collect()
         [Row(f1=1, f2=u'row1'), Row(f1=2, f2=u'row2'), Row(f1=3, f2=u'row3')]
@@ -638,7 +638,7 @@ class SQLContext(object):
     def table(self, tableName):
         """Returns the specified table as a L{DataFrame}.
 
-        >>> sqlCtx.registerRDDAsTable(df, "table1")
+        >>> sqlCtx.registerDataFrameAsTable(df, "table1")
         >>> df2 = sqlCtx.table("table1")
         >>> sorted(df.collect()) == sorted(df2.collect())
         True
@@ -653,7 +653,7 @@ class SQLContext(object):
         The returned DataFrame has two columns, tableName and isTemporary
         (a column with BooleanType indicating if a table is a temporary one or not).
 
-        >>> sqlCtx.registerRDDAsTable(df, "table1")
+        >>> sqlCtx.registerDataFrameAsTable(df, "table1")
         >>> df2 = sqlCtx.tables()
         >>> df2.filter("tableName = 'table1'").first()
         Row(tableName=u'table1', isTemporary=True)
@@ -668,7 +668,7 @@ class SQLContext(object):
 
         If `dbName` is not specified, the current database will be used.
 
-        >>> sqlCtx.registerRDDAsTable(df, "table1")
+        >>> sqlCtx.registerDataFrameAsTable(df, "table1")
         >>> "table1" in sqlCtx.tableNames()
         True
         >>> "table1" in sqlCtx.tableNames("db")

http://git-wip-us.apache.org/repos/asf/spark/blob/c74b07fa/sql/core/src/main/java/org/apache/spark/sql/jdbc/JDBCUtils.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/jdbc/JDBCUtils.java b/sql/core/src/main/java/org/apache/spark/sql/jdbc/JDBCUtils.java
deleted file mode 100644
index aa441b2..0000000
--- a/sql/core/src/main/java/org/apache/spark/sql/jdbc/JDBCUtils.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.jdbc;
-
-import org.apache.spark.Partition;
-import org.apache.spark.sql.SQLContext;
-import org.apache.spark.sql.DataFrame;
-
-public class JDBCUtils {
-  /**
-   * Construct a DataFrame representing the JDBC table at the database
-   * specified by url with table name table.
-   */
-  public static DataFrame jdbcRDD(SQLContext sql, String url, String table) {
-    Partition[] parts = new Partition[1];
-    parts[0] = new JDBCPartition(null, 0);
-    return sql.baseRelationToDataFrame(
-        new JDBCRelation(url, table, parts, sql));
-  }
-
-  /**
-   * Construct a DataFrame representing the JDBC table at the database
-   * specified by url with table name table partitioned by parts.
-   * Here, parts is an array of expressions suitable for insertion into a WHERE
-   * clause; each one defines one partition.
-   */
-  public static DataFrame jdbcRDD(SQLContext sql, String url, String table, String[] parts) {
-    Partition[] partitions = new Partition[parts.length];
-    for (int i = 0; i < parts.length; i++)
-      partitions[i] = new JDBCPartition(parts[i], i);
-    return sql.baseRelationToDataFrame(
-        new JDBCRelation(url, table, partitions, sql));
-  }
-
-  private static JavaJDBCTrampoline trampoline = new JavaJDBCTrampoline();
-
-  public static void createJDBCTable(DataFrame rdd, String url, String table, boolean allowExisting) {
-    trampoline.createJDBCTable(rdd, url, table, allowExisting);
-  }
-
-  public static void insertIntoJDBC(DataFrame rdd, String url, String table, boolean overwrite) {
-    trampoline.insertIntoJDBC(rdd, url, table, overwrite);
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/c74b07fa/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
index c0c3cb4..fa5fe84 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
@@ -17,6 +17,9 @@
 
 package org.apache.spark.sql
 
+import java.sql.DriverManager
+
+
 import scala.collection.JavaConversions._
 import scala.reflect.ClassTag
 import scala.reflect.runtime.universe.TypeTag
@@ -27,6 +30,7 @@ import org.apache.spark.api.java.JavaRDD
 import org.apache.spark.rdd.RDD
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.jdbc.JDBCWriteDetails
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.Utils
 
@@ -77,6 +81,12 @@ private[sql] object DataFrame {
  *     .groupBy(department("name"), "gender")
  *     .agg(avg(people("salary")), max(people("age")))
  * }}}
+ *
+ * @groupname basic Basic DataFrame functions
+ * @groupname dfops Language Integrated Queries
+ * @groupname rdd RDD Operations
+ * @groupname output Output Operations
+ * @groupname action Actions
  */
 // TODO: Improve documentation.
 @Experimental
@@ -102,7 +112,8 @@ trait DataFrame extends RDDApi[Row] with Serializable {
   def toSchemaRDD: DataFrame = this
 
   /**
-   * Returns the object itself. Used to force an implicit conversion from RDD to DataFrame in Scala.
+   * Returns the object itself.
+   * @group basic
    */
   // This is declared with parentheses to prevent the Scala compiler from treating
   // `rdd.toDF("1")` as invoking this toDF and then apply on the returned DataFrame.
@@ -116,31 +127,51 @@ trait DataFrame extends RDDApi[Row] with Serializable {
    *   rdd.toDF  // this implicit conversion creates a DataFrame with column name _1 and _2
    *   rdd.toDF("id", "name")  // this creates a DataFrame with column name "id" and "name"
    * }}}
+   * @group basic
    */
   @scala.annotation.varargs
   def toDF(colNames: String*): DataFrame
 
-  /** Returns the schema of this [[DataFrame]]. */
+  /**
+   * Returns the schema of this [[DataFrame]].
+   * @group basic
+   */
   def schema: StructType
 
-  /** Returns all column names and their data types as an array. */
+  /**
+   * Returns all column names and their data types as an array.
+   * @group basic
+   */
   def dtypes: Array[(String, String)]
 
-  /** Returns all column names as an array. */
+  /**
+   * Returns all column names as an array.
+   * @group basic
+   */
   def columns: Array[String] = schema.fields.map(_.name)
 
-  /** Prints the schema to the console in a nice tree format. */
+  /**
+   * Prints the schema to the console in a nice tree format.
+   * @group basic
+   */
   def printSchema(): Unit
 
-  /** Prints the plans (logical and physical) to the console for debugging purpose. */
+  /**
+   * Prints the plans (logical and physical) to the console for debugging purpose.
+   * @group basic
+   */
   def explain(extended: Boolean): Unit
 
-  /** Only prints the physical plan to the console for debugging purpose. */
+  /**
+   * Only prints the physical plan to the console for debugging purpose.
+   * @group basic
+   */
   def explain(): Unit = explain(extended = false)
 
   /**
    * Returns true if the `collect` and `take` methods can be run locally
    * (without any Spark executors).
+   * @group basic
    */
   def isLocal: Boolean
 
@@ -154,6 +185,7 @@ trait DataFrame extends RDDApi[Row] with Serializable {
    *   1983  03    0.410516        0.442194
    *   1984  04    0.450090        0.483521
    * }}}
+   * @group basic
    */
   def show(): Unit
 
@@ -163,6 +195,7 @@ trait DataFrame extends RDDApi[Row] with Serializable {
    * Note that cartesian joins are very expensive without an extra filter that can be pushed down.
    *
    * @param right Right side of the join operation.
+   * @group dfops
    */
   def join(right: DataFrame): DataFrame
 
@@ -174,6 +207,7 @@ trait DataFrame extends RDDApi[Row] with Serializable {
    *   df1.join(df2, $"df1Key" === $"df2Key")
    *   df1.join(df2).where($"df1Key" === $"df2Key")
    * }}}
+   * @group dfops
    */
   def join(right: DataFrame, joinExprs: Column): DataFrame
 
@@ -194,6 +228,7 @@ trait DataFrame extends RDDApi[Row] with Serializable {
    * @param right Right side of the join.
    * @param joinExprs Join expression.
    * @param joinType One of: `inner`, `outer`, `left_outer`, `right_outer`, `semijoin`.
+   * @group dfops
    */
   def join(right: DataFrame, joinExprs: Column, joinType: String): DataFrame
 
@@ -205,6 +240,7 @@ trait DataFrame extends RDDApi[Row] with Serializable {
    *   df.sort($"sortcol")
    *   df.sort($"sortcol".asc)
    * }}}
+   * @group dfops
    */
   @scala.annotation.varargs
   def sort(sortCol: String, sortCols: String*): DataFrame
@@ -214,6 +250,7 @@ trait DataFrame extends RDDApi[Row] with Serializable {
    * {{{
    *   df.sort($"col1", $"col2".desc)
    * }}}
+   * @group dfops
    */
   @scala.annotation.varargs
   def sort(sortExprs: Column*): DataFrame
@@ -221,6 +258,7 @@ trait DataFrame extends RDDApi[Row] with Serializable {
   /**
    * Returns a new [[DataFrame]] sorted by the given expressions.
    * This is an alias of the `sort` function.
+   * @group dfops
    */
   @scala.annotation.varargs
   def orderBy(sortCol: String, sortCols: String*): DataFrame
@@ -228,27 +266,32 @@ trait DataFrame extends RDDApi[Row] with Serializable {
   /**
    * Returns a new [[DataFrame]] sorted by the given expressions.
    * This is an alias of the `sort` function.
+   * @group dfops
    */
   @scala.annotation.varargs
   def orderBy(sortExprs: Column*): DataFrame
 
   /**
    * Selects column based on the column name and return it as a [[Column]].
+   * @group dfops
    */
   def apply(colName: String): Column = col(colName)
 
   /**
    * Selects column based on the column name and return it as a [[Column]].
+   * @group dfops
    */
   def col(colName: String): Column
 
   /**
    * Returns a new [[DataFrame]] with an alias set.
+   * @group dfops
    */
   def as(alias: String): DataFrame
 
   /**
    * (Scala-specific) Returns a new [[DataFrame]] with an alias set.
+   * @group dfops
    */
   def as(alias: Symbol): DataFrame
 
@@ -257,6 +300,7 @@ trait DataFrame extends RDDApi[Row] with Serializable {
    * {{{
    *   df.select($"colA", $"colB" + 1)
    * }}}
+   * @group dfops
    */
   @scala.annotation.varargs
   def select(cols: Column*): DataFrame
@@ -270,6 +314,7 @@ trait DataFrame extends RDDApi[Row] with Serializable {
    *   df.select("colA", "colB")
    *   df.select($"colA", $"colB")
    * }}}
+   * @group dfops
    */
   @scala.annotation.varargs
   def select(col: String, cols: String*): DataFrame
@@ -281,6 +326,7 @@ trait DataFrame extends RDDApi[Row] with Serializable {
    * {{{
    *   df.selectExpr("colA", "colB as newName", "abs(colC)")
    * }}}
+   * @group dfops
    */
   @scala.annotation.varargs
   def selectExpr(exprs: String*): DataFrame
@@ -293,6 +339,7 @@ trait DataFrame extends RDDApi[Row] with Serializable {
    *   peopleDf.where($"age" > 15)
    *   peopleDf($"age" > 15)
    * }}}
+   * @group dfops
    */
   def filter(condition: Column): DataFrame
 
@@ -301,6 +348,7 @@ trait DataFrame extends RDDApi[Row] with Serializable {
    * {{{
    *   peopleDf.filter("age > 15")
    * }}}
+   * @group dfops
    */
   def filter(conditionExpr: String): DataFrame
 
@@ -312,6 +360,7 @@ trait DataFrame extends RDDApi[Row] with Serializable {
    *   peopleDf.where($"age" > 15)
    *   peopleDf($"age" > 15)
    * }}}
+   * @group dfops
    */
   def where(condition: Column): DataFrame
 
@@ -329,6 +378,7 @@ trait DataFrame extends RDDApi[Row] with Serializable {
    *     "age" -> "max"
    *   ))
    * }}}
+   * @group dfops
    */
   @scala.annotation.varargs
   def groupBy(cols: Column*): GroupedData
@@ -350,6 +400,7 @@ trait DataFrame extends RDDApi[Row] with Serializable {
    *     "age" -> "max"
    *   ))
    * }}}
+   * @group dfops
    */
   @scala.annotation.varargs
   def groupBy(col1: String, cols: String*): GroupedData
@@ -366,6 +417,7 @@ trait DataFrame extends RDDApi[Row] with Serializable {
    *     "expense" -> "sum"
    *   )
    * }}}
+   * @group dfops
    */
   def agg(aggExpr: (String, String), aggExprs: (String, String)*): DataFrame = {
     groupBy().agg(aggExpr, aggExprs :_*)
@@ -378,6 +430,7 @@ trait DataFrame extends RDDApi[Row] with Serializable {
    *   df.agg(Map("age" -> "max", "salary" -> "avg"))
    *   df.groupBy().agg(Map("age" -> "max", "salary" -> "avg"))
    * }}}
+   * @group dfops
    */
   def agg(exprs: Map[String, String]): DataFrame = groupBy().agg(exprs)
 
@@ -388,6 +441,7 @@ trait DataFrame extends RDDApi[Row] with Serializable {
    *   df.agg(Map("age" -> "max", "salary" -> "avg"))
    *   df.groupBy().agg(Map("age" -> "max", "salary" -> "avg"))
    * }}}
+   * @group dfops
    */
   def agg(exprs: java.util.Map[String, String]): DataFrame = groupBy().agg(exprs)
 
@@ -398,6 +452,7 @@ trait DataFrame extends RDDApi[Row] with Serializable {
    *   df.agg(max($"age"), avg($"salary"))
    *   df.groupBy().agg(max($"age"), avg($"salary"))
    * }}}
+   * @group dfops
    */
   @scala.annotation.varargs
   def agg(expr: Column, exprs: Column*): DataFrame = groupBy().agg(expr, exprs :_*)
@@ -405,24 +460,28 @@ trait DataFrame extends RDDApi[Row] with Serializable {
   /**
    * Returns a new [[DataFrame]] by taking the first `n` rows. The difference between this function
    * and `head` is that `head` returns an array while `limit` returns a new [[DataFrame]].
+   * @group dfops
    */
   def limit(n: Int): DataFrame
 
   /**
    * Returns a new [[DataFrame]] containing union of rows in this frame and another frame.
    * This is equivalent to `UNION ALL` in SQL.
+   * @group dfops
    */
   def unionAll(other: DataFrame): DataFrame
 
   /**
    * Returns a new [[DataFrame]] containing rows only in both this frame and another frame.
    * This is equivalent to `INTERSECT` in SQL.
+   * @group dfops
    */
   def intersect(other: DataFrame): DataFrame
 
   /**
    * Returns a new [[DataFrame]] containing rows in this frame but not in another frame.
    * This is equivalent to `EXCEPT` in SQL.
+   * @group dfops
    */
   def except(other: DataFrame): DataFrame
 
@@ -432,6 +491,7 @@ trait DataFrame extends RDDApi[Row] with Serializable {
    * @param withReplacement Sample with replacement or not.
    * @param fraction Fraction of rows to generate.
    * @param seed Seed for sampling.
+   * @group dfops
    */
   def sample(withReplacement: Boolean, fraction: Double, seed: Long): DataFrame
 
@@ -440,6 +500,7 @@ trait DataFrame extends RDDApi[Row] with Serializable {
    *
    * @param withReplacement Sample with replacement or not.
    * @param fraction Fraction of rows to generate.
+   * @group dfops
    */
   def sample(withReplacement: Boolean, fraction: Double): DataFrame = {
     sample(withReplacement, fraction, Utils.random.nextLong)
@@ -464,6 +525,7 @@ trait DataFrame extends RDDApi[Row] with Serializable {
    *
    *   val bookCountPerWord = allWords.groupBy("word").agg(countDistinct("title"))
    * }}}
+   * @group dfops
    */
   def explode[A <: Product : TypeTag](input: Column*)(f: Row => TraversableOnce[A]): DataFrame
 
@@ -476,6 +538,7 @@ trait DataFrame extends RDDApi[Row] with Serializable {
    * {{{
    *   df.explode("words", "word")(words: String => words.split(" "))
    * }}}
+   * @group dfops
    */
   def explode[A, B : TypeTag](
       inputColumn: String,
@@ -486,11 +549,13 @@ trait DataFrame extends RDDApi[Row] with Serializable {
 
   /**
    * Returns a new [[DataFrame]] by adding a column.
+   * @group dfops
    */
   def withColumn(colName: String, col: Column): DataFrame
 
   /**
    * Returns a new [[DataFrame]] with a column renamed.
+   * @group dfops
    */
   def withColumnRenamed(existingName: String, newName: String): DataFrame
 
@@ -511,62 +576,84 @@ trait DataFrame extends RDDApi[Row] with Serializable {
 
   /**
    * Returns a new RDD by applying a function to all rows of this DataFrame.
+   * @group rdd
    */
   override def map[R: ClassTag](f: Row => R): RDD[R]
 
   /**
    * Returns a new RDD by first applying a function to all rows of this [[DataFrame]],
    * and then flattening the results.
+   * @group rdd
    */
   override def flatMap[R: ClassTag](f: Row => TraversableOnce[R]): RDD[R]
 
   /**
    * Returns a new RDD by applying a function to each partition of this DataFrame.
+   * @group rdd
    */
   override def mapPartitions[R: ClassTag](f: Iterator[Row] => Iterator[R]): RDD[R]
 
   /**
    * Applies a function `f` to all rows.
+   * @group rdd
    */
   override def foreach(f: Row => Unit): Unit
 
   /**
    * Applies a function f to each partition of this [[DataFrame]].
+   * @group rdd
    */
   override def foreachPartition(f: Iterator[Row] => Unit): Unit
 
   /**
    * Returns the first `n` rows in the [[DataFrame]].
+   * @group action
    */
   override def take(n: Int): Array[Row]
 
   /**
    * Returns an array that contains all of [[Row]]s in this [[DataFrame]].
+   * @group action
    */
   override def collect(): Array[Row]
 
   /**
    * Returns a Java list that contains all of [[Row]]s in this [[DataFrame]].
+   * @group action
    */
   override def collectAsList(): java.util.List[Row]
 
   /**
    * Returns the number of rows in the [[DataFrame]].
+   * @group action
    */
   override def count(): Long
 
   /**
    * Returns a new [[DataFrame]] that has exactly `numPartitions` partitions.
+   * @group rdd
    */
   override def repartition(numPartitions: Int): DataFrame
 
-  /** Returns a new [[DataFrame]] that contains only the unique rows from this [[DataFrame]]. */
+  /**
+   * Returns a new [[DataFrame]] that contains only the unique rows from this [[DataFrame]].
+   * @group dfops
+   */
   override def distinct: DataFrame
 
+  /**
+   * @group basic
+   */
   override def persist(): this.type
 
+  /**
+   * @group basic
+   */
   override def persist(newLevel: StorageLevel): this.type
 
+  /**
+   * @group basic
+   */
   override def unpersist(blocking: Boolean): this.type
 
   /////////////////////////////////////////////////////////////////////////////
@@ -575,16 +662,19 @@ trait DataFrame extends RDDApi[Row] with Serializable {
 
   /**
    * Returns the content of the [[DataFrame]] as an [[RDD]] of [[Row]]s.
+   * @group rdd
    */
   def rdd: RDD[Row]
 
   /**
    * Returns the content of the [[DataFrame]] as a [[JavaRDD]] of [[Row]]s.
+   * @group rdd
    */
   def toJavaRDD: JavaRDD[Row] = rdd.toJavaRDD()
 
   /**
    * Returns the content of the [[DataFrame]] as a [[JavaRDD]] of [[Row]]s.
+   * @group rdd
    */
   def javaRDD: JavaRDD[Row] = toJavaRDD
 
@@ -592,7 +682,7 @@ trait DataFrame extends RDDApi[Row] with Serializable {
    * Registers this RDD as a temporary table using the given name.  The lifetime of this temporary
    * table is tied to the [[SQLContext]] that was used to create this DataFrame.
    *
-   * @group schema
+   * @group basic
    */
   def registerTempTable(tableName: String): Unit
 
@@ -600,6 +690,7 @@ trait DataFrame extends RDDApi[Row] with Serializable {
    * Saves the contents of this [[DataFrame]] as a parquet file, preserving the schema.
    * Files that are written out using this method can be read back in as a [[DataFrame]]
    * using the `parquetFile` function in [[SQLContext]].
+   * @group output
    */
   def saveAsParquetFile(path: String): Unit
 
@@ -613,6 +704,7 @@ trait DataFrame extends RDDApi[Row] with Serializable {
    * there is no notion of a persisted catalog in a standard SQL context.  Instead you can write
    * an RDD out to a parquet file, and then register that file as a table.  This "table" can then
    * be the target of an `insertInto`.
+   * @group output
    */
   @Experimental
   def saveAsTable(tableName: String): Unit = {
@@ -628,6 +720,7 @@ trait DataFrame extends RDDApi[Row] with Serializable {
    * there is no notion of a persisted catalog in a standard SQL context.  Instead you can write
    * an RDD out to a parquet file, and then register that file as a table.  This "table" can then
    * be the target of an `insertInto`.
+   * @group output
    */
   @Experimental
   def saveAsTable(tableName: String, mode: SaveMode): Unit = {
@@ -651,6 +744,7 @@ trait DataFrame extends RDDApi[Row] with Serializable {
    * there is no notion of a persisted catalog in a standard SQL context.  Instead you can write
    * an RDD out to a parquet file, and then register that file as a table.  This "table" can then
    * be the target of an `insertInto`.
+   * @group output
    */
   @Experimental
   def saveAsTable(
@@ -668,6 +762,7 @@ trait DataFrame extends RDDApi[Row] with Serializable {
    * there is no notion of a persisted catalog in a standard SQL context.  Instead you can write
    * an RDD out to a parquet file, and then register that file as a table.  This "table" can then
    * be the target of an `insertInto`.
+   * @group output
    */
   @Experimental
   def saveAsTable(
@@ -686,6 +781,7 @@ trait DataFrame extends RDDApi[Row] with Serializable {
    * there is no notion of a persisted catalog in a standard SQL context.  Instead you can write
    * an RDD out to a parquet file, and then register that file as a table.  This "table" can then
    * be the target of an `insertInto`.
+   * @group output
    */
   @Experimental
   def saveAsTable(
@@ -706,6 +802,7 @@ trait DataFrame extends RDDApi[Row] with Serializable {
    * there is no notion of a persisted catalog in a standard SQL context.  Instead you can write
    * an RDD out to a parquet file, and then register that file as a table.  This "table" can then
    * be the target of an `insertInto`.
+   * @group output
    */
   @Experimental
   def saveAsTable(
@@ -719,6 +816,7 @@ trait DataFrame extends RDDApi[Row] with Serializable {
    * Saves the contents of this DataFrame to the given path,
    * using the default data source configured by spark.sql.sources.default and
    * [[SaveMode.ErrorIfExists]] as the save mode.
+   * @group output
    */
   @Experimental
   def save(path: String): Unit = {
@@ -729,6 +827,7 @@ trait DataFrame extends RDDApi[Row] with Serializable {
    * :: Experimental ::
    * Saves the contents of this DataFrame to the given path and [[SaveMode]] specified by mode,
    * using the default data source configured by spark.sql.sources.default.
+   * @group output
    */
   @Experimental
   def save(path: String, mode: SaveMode): Unit = {
@@ -740,6 +839,7 @@ trait DataFrame extends RDDApi[Row] with Serializable {
    * :: Experimental ::
    * Saves the contents of this DataFrame to the given path based on the given data source,
    * using [[SaveMode.ErrorIfExists]] as the save mode.
+   * @group output
    */
   @Experimental
   def save(path: String, source: String): Unit = {
@@ -750,6 +850,7 @@ trait DataFrame extends RDDApi[Row] with Serializable {
    * :: Experimental ::
    * Saves the contents of this DataFrame to the given path based on the given data source and
    * [[SaveMode]] specified by mode.
+   * @group output
    */
   @Experimental
   def save(path: String, source: String, mode: SaveMode): Unit = {
@@ -760,6 +861,7 @@ trait DataFrame extends RDDApi[Row] with Serializable {
    * :: Experimental ::
    * Saves the contents of this DataFrame based on the given data source,
    * [[SaveMode]] specified by mode, and a set of options.
+   * @group output
    */
   @Experimental
   def save(
@@ -774,6 +876,7 @@ trait DataFrame extends RDDApi[Row] with Serializable {
    * (Scala-specific)
    * Saves the contents of this DataFrame based on the given data source,
    * [[SaveMode]] specified by mode, and a set of options
+   * @group output
    */
   @Experimental
   def save(
@@ -784,6 +887,7 @@ trait DataFrame extends RDDApi[Row] with Serializable {
   /**
    * :: Experimental ::
    * Adds the rows from this RDD to the specified table, optionally overwriting the existing data.
+   * @group output
    */
   @Experimental
   def insertInto(tableName: String, overwrite: Boolean): Unit
@@ -792,16 +896,47 @@ trait DataFrame extends RDDApi[Row] with Serializable {
    * :: Experimental ::
    * Adds the rows from this RDD to the specified table.
    * Throws an exception if the table already exists.
+   * @group output
    */
   @Experimental
   def insertInto(tableName: String): Unit = insertInto(tableName, overwrite = false)
 
   /**
    * Returns the content of the [[DataFrame]] as a RDD of JSON strings.
+   * @group rdd
    */
   def toJSON: RDD[String]
 
   ////////////////////////////////////////////////////////////////////////////
+  // JDBC Write Support
+  ////////////////////////////////////////////////////////////////////////////
+
+  /**
+   * Save this RDD to a JDBC database at `url` under the table name `table`.
+   * This will run a `CREATE TABLE` and a bunch of `INSERT INTO` statements.
+   * If you pass `true` for `allowExisting`, it will drop any table with the
+   * given name; if you pass `false`, it will throw if the table already
+   * exists.
+   * @group output
+   */
+  def createJDBCTable(url: String, table: String, allowExisting: Boolean): Unit
+
+  /**
+   * Save this RDD to a JDBC database at `url` under the table name `table`.
+   * Assumes the table already exists and has a compatible schema.  If you
+   * pass `true` for `overwrite`, it will `TRUNCATE` the table before
+   * performing the `INSERT`s.
+   *
+   * The table must already exist on the database.  It must have a schema
+   * that is compatible with the schema of this RDD; inserting the rows of
+   * the RDD in order via the simple statement
+   * `INSERT INTO table VALUES (?, ?, ..., ?)` should not fail.
+   * @group output
+   */
+  def insertIntoJDBC(url: String, table: String, overwrite: Boolean): Unit
+
+
+  ////////////////////////////////////////////////////////////////////////////
   // for Python API
   ////////////////////////////////////////////////////////////////////////////
 

http://git-wip-us.apache.org/repos/asf/spark/blob/c74b07fa/sql/core/src/main/scala/org/apache/spark/sql/DataFrameImpl.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameImpl.scala
index 848ea2e..25bc9d9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameImpl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameImpl.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.sql
 
 import java.io.CharArrayWriter
+import java.sql.DriverManager
 
 import scala.language.implicitConversions
 import scala.reflect.ClassTag
@@ -36,6 +37,7 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.{JoinType, Inner}
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.execution.{ExplainCommand, LogicalRDD, EvaluatePython}
+import org.apache.spark.sql.jdbc.JDBCWriteDetails
 import org.apache.spark.sql.json.JsonRDD
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types.{NumericType, StructType}
@@ -375,7 +377,7 @@ private[sql] class DataFrameImpl protected[sql](
   }
 
   override def registerTempTable(tableName: String): Unit = {
-    sqlContext.registerRDDAsTable(this, tableName)
+    sqlContext.registerDataFrameAsTable(this, tableName)
   }
 
   override def saveAsParquetFile(path: String): Unit = {
@@ -441,6 +443,35 @@ private[sql] class DataFrameImpl protected[sql](
     }
   }
 
+  def createJDBCTable(url: String, table: String, allowExisting: Boolean): Unit = {
+    val conn = DriverManager.getConnection(url)
+    try {
+      if (allowExisting) {
+        val sql = s"DROP TABLE IF EXISTS $table"
+        conn.prepareStatement(sql).executeUpdate()
+      }
+      val schema = JDBCWriteDetails.schemaString(this, url)
+      val sql = s"CREATE TABLE $table ($schema)"
+      conn.prepareStatement(sql).executeUpdate()
+    } finally {
+      conn.close()
+    }
+    JDBCWriteDetails.saveTable(this, url, table)
+  }
+
+  def insertIntoJDBC(url: String, table: String, overwrite: Boolean): Unit = {
+    if (overwrite) {
+      val conn = DriverManager.getConnection(url)
+      try {
+        val sql = s"TRUNCATE TABLE $table"
+        conn.prepareStatement(sql).executeUpdate()
+      } finally {
+        conn.close()
+      }
+    }
+    JDBCWriteDetails.saveTable(this, url, table)
+  }
+
   ////////////////////////////////////////////////////////////////////////////
   // for Python API
   ////////////////////////////////////////////////////////////////////////////

http://git-wip-us.apache.org/repos/asf/spark/blob/c74b07fa/sql/core/src/main/scala/org/apache/spark/sql/ExperimentalMethods.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/ExperimentalMethods.scala b/sql/core/src/main/scala/org/apache/spark/sql/ExperimentalMethods.scala
index f0e6a8f..d5d7e35 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/ExperimentalMethods.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/ExperimentalMethods.scala
@@ -20,8 +20,13 @@ package org.apache.spark.sql
 import org.apache.spark.annotation.Experimental
 
 /**
+ * :: Experimental ::
  * Holder for experimental methods for the bravest. We make NO guarantee about the stability
  * regarding binary compatibility and source compatibility of methods here.
+ *
+ * {{{
+ *   sqlContext.experimental.extraStrategies += ...
+ * }}}
  */
 @Experimental
 class ExperimentalMethods protected[sql](sqlContext: SQLContext) {

http://git-wip-us.apache.org/repos/asf/spark/blob/c74b07fa/sql/core/src/main/scala/org/apache/spark/sql/IncomputableColumn.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/IncomputableColumn.scala b/sql/core/src/main/scala/org/apache/spark/sql/IncomputableColumn.scala
index fc37cfa..b48b682 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/IncomputableColumn.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/IncomputableColumn.scala
@@ -173,6 +173,10 @@ private[sql] class IncomputableColumn(protected[sql] val expr: Expression) exten
 
   override def insertInto(tableName: String, overwrite: Boolean): Unit = err()
 
+  def createJDBCTable(url: String, table: String, allowExisting: Boolean): Unit = err()
+
+  def insertIntoJDBC(url: String, table: String, overwrite: Boolean): Unit = err()
+
   override def toJSON: RDD[String] = err()
 
   protected[sql] override def javaToPython: JavaRDD[Array[Byte]] = err()

http://git-wip-us.apache.org/repos/asf/spark/blob/c74b07fa/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index 0aae094..31afa0e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -43,11 +43,16 @@ import org.apache.spark.util.Utils
 import org.apache.spark.{Partition, SparkContext}
 
 /**
- * The entry point for running relational queries using Spark.  Allows the creation of [[DataFrame]]
- * objects and the execution of SQL queries.
+ * The entry point for working with structured data (rows and columns) in Spark.  Allows the
+ * creation of [[DataFrame]] objects as well as the execution of SQL queries.
  *
- * @groupname ddl_ops Catalog DDL functions
- * @groupname userf Spark SQL Functions
+ * @groupname basic Basic Operations
+ * @groupname ddl_ops Persistent Catalog DDL
+ * @groupname cachemgmt Cached Table Management
+ * @groupname genericdata Generic Data Sources
+ * @groupname specificdata Specific Data Sources
+ * @groupname config Configuration
+ * @groupname dataframes Custom DataFrame Creation
  * @groupname Ungrouped Support functions for language integrated queries.
  */
 class SQLContext(@transient val sparkContext: SparkContext)
@@ -61,24 +66,40 @@ class SQLContext(@transient val sparkContext: SparkContext)
   // Note that this is a lazy val so we can override the default value in subclasses.
   protected[sql] lazy val conf: SQLConf = new SQLConf
 
-  /** Set Spark SQL configuration properties. */
+  /**
+   * Set Spark SQL configuration properties.
+   *
+   * @group config
+   */
   def setConf(props: Properties): Unit = conf.setConf(props)
 
-  /** Set the given Spark SQL configuration property. */
+  /**
+   * Set the given Spark SQL configuration property.
+   *
+   * @group config
+   */
   def setConf(key: String, value: String): Unit = conf.setConf(key, value)
 
-  /** Return the value of Spark SQL configuration property for the given key. */
+  /**
+   * Return the value of Spark SQL configuration property for the given key.
+   *
+   * @group config
+   */
   def getConf(key: String): String = conf.getConf(key)
 
   /**
    * Return the value of Spark SQL configuration property for the given key. If the key is not set
    * yet, return `defaultValue`.
+   *
+   * @group config
    */
   def getConf(key: String, defaultValue: String): String = conf.getConf(key, defaultValue)
 
   /**
    * Return all the configuration properties that have been set (i.e. not the default).
    * This creates a new copy of the config properties in the form of a Map.
+   *
+   * @group config
    */
   def getAllConfs: immutable.Map[String, String] = conf.getAllConfs
 
@@ -128,7 +149,9 @@ class SQLContext(@transient val sparkContext: SparkContext)
   /**
    * :: Experimental ::
    * A collection of methods that are considered experimental, but can be used to hook into
-   * the query planner for advanced functionalities.
+   * the query planner for advanced functionality.
+   *
+   * @group basic
    */
   @Experimental
   @transient
@@ -137,6 +160,8 @@ class SQLContext(@transient val sparkContext: SparkContext)
   /**
    * :: Experimental ::
    * Returns a [[DataFrame]] with no rows or columns.
+   *
+   * @group basic
    */
   @Experimental
   @transient
@@ -167,17 +192,28 @@ class SQLContext(@transient val sparkContext: SparkContext)
    *       (Integer arg1, String arg2) -> arg2 + arg1),
    *       DataTypes.StringType);
    * }}}
+   *
+   * @group basic
    */
   @transient
   val udf: UDFRegistration = new UDFRegistration(this)
 
-  /** Returns true if the table is currently cached in-memory. */
+  /**
+   * Returns true if the table is currently cached in-memory.
+   * @group cachemgmt
+   */
   def isCached(tableName: String): Boolean = cacheManager.isCached(tableName)
 
-  /** Caches the specified table in-memory. */
+  /**
+   * Caches the specified table in-memory.
+   * @group cachemgmt
+   */
   def cacheTable(tableName: String): Unit = cacheManager.cacheTable(tableName)
 
-  /** Removes the specified table from the in-memory cache. */
+  /**
+   * Removes the specified table from the in-memory cache.
+   * @group cachemgmt
+   */
   def uncacheTable(tableName: String): Unit = cacheManager.uncacheTable(tableName)
 
   // scalastyle:off
@@ -186,6 +222,13 @@ class SQLContext(@transient val sparkContext: SparkContext)
    * :: Experimental ::
    * (Scala-specific) Implicit methods available in Scala for converting
    * common Scala objects into [[DataFrame]]s.
+   *
+   * {{{
+   *   val sqlContext = new SQLContext(sc)
+   *   import sqlContext.implicits._
+   * }}}
+   *
+   * @group basic
    */
   @Experimental
   object implicits extends Serializable {
@@ -260,7 +303,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
    * :: Experimental ::
    * Creates a DataFrame from an RDD of case classes.
    *
-   * @group userf
+   * @group dataframes
    */
   @Experimental
   def createDataFrame[A <: Product : TypeTag](rdd: RDD[A]): DataFrame = {
@@ -274,6 +317,8 @@ class SQLContext(@transient val sparkContext: SparkContext)
   /**
    * :: Experimental ::
    * Creates a DataFrame from a local Seq of Product.
+   *
+   * @group dataframes
    */
   @Experimental
   def createDataFrame[A <: Product : TypeTag](data: Seq[A]): DataFrame = {
@@ -285,6 +330,8 @@ class SQLContext(@transient val sparkContext: SparkContext)
 
   /**
    * Convert a [[BaseRelation]] created for external data sources into a [[DataFrame]].
+   *
+   * @group dataframes
    */
   def baseRelationToDataFrame(baseRelation: BaseRelation): DataFrame = {
     DataFrame(this, LogicalRelation(baseRelation))
@@ -318,6 +365,8 @@ class SQLContext(@transient val sparkContext: SparkContext)
    *  dataFrame.registerTempTable("people")
    *  sqlContext.sql("select name from people").collect.foreach(println)
    * }}}
+   *
+   * @group dataframes
    */
   @DeveloperApi
   def createDataFrame(rowRDD: RDD[Row], schema: StructType): DataFrame = {
@@ -332,6 +381,8 @@ class SQLContext(@transient val sparkContext: SparkContext)
    * Creates a [[DataFrame]] from an [[JavaRDD]] containing [[Row]]s using the given schema.
    * It is important to make sure that the structure of every [[Row]] of the provided RDD matches
    * the provided schema. Otherwise, there will be runtime exception.
+   *
+   * @group dataframes
    */
   @DeveloperApi
   def createDataFrame(rowRDD: JavaRDD[Row], schema: StructType): DataFrame = {
@@ -346,6 +397,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
    * @param rowRDD an JavaRDD of Row
    * @param columns names for each column
    * @return DataFrame
+   * @group dataframes
    */
   def createDataFrame(rowRDD: JavaRDD[Row], columns: java.util.List[String]): DataFrame = {
     createDataFrame(rowRDD.rdd, columns.toSeq)
@@ -356,6 +408,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
    *
    * WARNING: Since there is no guaranteed ordering for fields in a Java Bean,
    *          SELECT * queries will return the columns in an undefined order.
+   * @group dataframes
    */
   def createDataFrame(rdd: RDD[_], beanClass: Class[_]): DataFrame = {
     val attributeSeq = getSchema(beanClass)
@@ -383,6 +436,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
    *
    * WARNING: Since there is no guaranteed ordering for fields in a Java Bean,
    *          SELECT * queries will return the columns in an undefined order.
+   * @group dataframes
    */
   def createDataFrame(rdd: JavaRDD[_], beanClass: Class[_]): DataFrame = {
     createDataFrame(rdd.rdd, beanClass)
@@ -416,8 +470,6 @@ class SQLContext(@transient val sparkContext: SparkContext)
    *  dataFrame.registerTempTable("people")
    *  sqlContext.sql("select name from people").collect.foreach(println)
    * }}}
-   *
-   * @group userf
    */
   @deprecated("use createDataFrame", "1.3.0")
   def applySchema(rowRDD: RDD[Row], schema: StructType): DataFrame = {
@@ -455,7 +507,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
    * Loads a Parquet file, returning the result as a [[DataFrame]]. This function returns an empty
    * [[DataFrame]] if no paths are passed in.
    *
-   * @group userf
+   * @group specificdata
    */
   @scala.annotation.varargs
   def parquetFile(paths: String*): DataFrame = {
@@ -473,7 +525,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
    * Loads a JSON file (one object per line), returning the result as a [[DataFrame]].
    * It goes through the entire dataset once to determine the schema.
    *
-   * @group userf
+   * @group specificdata
    */
   def jsonFile(path: String): DataFrame = jsonFile(path, 1.0)
 
@@ -482,7 +534,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
    * Loads a JSON file (one object per line) and applies the given schema,
    * returning the result as a [[DataFrame]].
    *
-   * @group userf
+   * @group specificdata
    */
   @Experimental
   def jsonFile(path: String, schema: StructType): DataFrame = {
@@ -492,6 +544,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
 
   /**
    * :: Experimental ::
+   * @group specificdata
    */
   @Experimental
   def jsonFile(path: String, samplingRatio: Double): DataFrame = {
@@ -504,10 +557,18 @@ class SQLContext(@transient val sparkContext: SparkContext)
    * [[DataFrame]].
    * It goes through the entire dataset once to determine the schema.
    *
-   * @group userf
+   * @group specificdata
    */
   def jsonRDD(json: RDD[String]): DataFrame = jsonRDD(json, 1.0)
 
+
+  /**
+   * Loads an RDD[String] storing JSON objects (one object per record), returning the result as a
+   * [[DataFrame]].
+   * It goes through the entire dataset once to determine the schema.
+   *
+   * @group specificdata
+   */
   def jsonRDD(json: JavaRDD[String]): DataFrame = jsonRDD(json.rdd, 1.0)
 
   /**
@@ -515,7 +576,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
    * Loads an RDD[String] storing JSON objects (one object per record) and applies the given schema,
    * returning the result as a [[DataFrame]].
    *
-   * @group userf
+   * @group specificdata
    */
   @Experimental
   def jsonRDD(json: RDD[String], schema: StructType): DataFrame = {
@@ -528,6 +589,13 @@ class SQLContext(@transient val sparkContext: SparkContext)
     createDataFrame(rowRDD, appliedSchema)
   }
 
+  /**
+   * :: Experimental ::
+   * Loads a JavaRDD[String] storing JSON objects (one object per record) and applies the given
+   * schema, returning the result as a [[DataFrame]].
+   *
+   * @group specificdata
+   */
   @Experimental
   def jsonRDD(json: JavaRDD[String], schema: StructType): DataFrame = {
     jsonRDD(json.rdd, schema)
@@ -535,6 +603,10 @@ class SQLContext(@transient val sparkContext: SparkContext)
 
   /**
    * :: Experimental ::
+   * Loads an RDD[String] storing JSON objects (one object per record) inferring the
+   * schema, returning the result as a [[DataFrame]].
+   *
+   * @group specificdata
    */
   @Experimental
   def jsonRDD(json: RDD[String], samplingRatio: Double): DataFrame = {
@@ -546,6 +618,13 @@ class SQLContext(@transient val sparkContext: SparkContext)
     createDataFrame(rowRDD, appliedSchema)
   }
 
+  /**
+   * :: Experimental ::
+   * Loads a JavaRDD[String] storing JSON objects (one object per record) inferring the
+   * schema, returning the result as a [[DataFrame]].
+   *
+   * @group specificdata
+   */
   @Experimental
   def jsonRDD(json: JavaRDD[String], samplingRatio: Double): DataFrame = {
     jsonRDD(json.rdd, samplingRatio);
@@ -555,6 +634,8 @@ class SQLContext(@transient val sparkContext: SparkContext)
    * :: Experimental ::
    * Returns the dataset stored at path as a DataFrame,
    * using the default data source configured by spark.sql.sources.default.
+   *
+   * @group genericdata
    */
   @Experimental
   def load(path: String): DataFrame = {
@@ -565,6 +646,8 @@ class SQLContext(@transient val sparkContext: SparkContext)
   /**
    * :: Experimental ::
    * Returns the dataset stored at path as a DataFrame, using the given data source.
+   *
+   * @group genericdata
    */
   @Experimental
   def load(path: String, source: String): DataFrame = {
@@ -575,6 +658,8 @@ class SQLContext(@transient val sparkContext: SparkContext)
    * :: Experimental ::
    * (Java-specific) Returns the dataset specified by the given data source and
    * a set of options as a DataFrame.
+   *
+   * @group genericdata
    */
   @Experimental
   def load(source: String, options: java.util.Map[String, String]): DataFrame = {
@@ -585,6 +670,8 @@ class SQLContext(@transient val sparkContext: SparkContext)
    * :: Experimental ::
    * (Scala-specific) Returns the dataset specified by the given data source and
    * a set of options as a DataFrame.
+   *
+   * @group genericdata
    */
   @Experimental
   def load(source: String, options: Map[String, String]): DataFrame = {
@@ -596,6 +683,8 @@ class SQLContext(@transient val sparkContext: SparkContext)
    * :: Experimental ::
    * (Java-specific) Returns the dataset specified by the given data source and
    * a set of options as a DataFrame, using the given schema as the schema of the DataFrame.
+   *
+   * @group genericdata
    */
   @Experimental
   def load(
@@ -609,6 +698,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
    * :: Experimental ::
    * (Scala-specific) Returns the dataset specified by the given data source and
    * a set of options as a DataFrame, using the given schema as the schema of the DataFrame.
+   * @group genericdata
    */
   @Experimental
   def load(
@@ -733,54 +823,70 @@ class SQLContext(@transient val sparkContext: SparkContext)
 
   /**
    * :: Experimental ::
-   * Construct an RDD representing the database table accessible via JDBC URL
+   * Construct a [[DataFrame]] representing the database table accessible via JDBC URL
    * url named table.
+   *
+   * @group specificdata
    */
   @Experimental
-  def jdbcRDD(url: String, table: String): DataFrame = {
-    jdbcRDD(url, table, null.asInstanceOf[JDBCPartitioningInfo])
+  def jdbc(url: String, table: String): DataFrame = {
+    jdbc(url, table, JDBCRelation.columnPartition(null))
   }
 
   /**
    * :: Experimental ::
-   * Construct an RDD representing the database table accessible via JDBC URL
-   * url named table.  The PartitioningInfo parameter
-   * gives the name of a column of integral type, a number of partitions, and
-   * advisory minimum and maximum values for the column.  The RDD is
-   * partitioned according to said column.
+   * Construct a [[DataFrame]] representing the database table accessible via JDBC URL
+   * url named table.  Partitions of the table will be retrieved in parallel based on the parameters
+   * passed to this function.
+   *
+   * @param columnName the name of a column of integral type that will be used for partitioning.
+   * @param lowerBound the minimum value of `columnName` to retrieve
+   * @param upperBound the maximum value of `columnName` to retrieve
+   * @param numPartitions the number of partitions.  the range `lowerBound`-`upperBound` will be
+   *                      split evenly into this many partitions
+   *
+   * @group specificdata
    */
   @Experimental
-  def jdbcRDD(url: String, table: String, partitioning: JDBCPartitioningInfo):
-      DataFrame = {
+  def jdbc(
+      url: String,
+      table: String,
+      columnName: String,
+      lowerBound: Long,
+      upperBound: Long,
+      numPartitions: Int): DataFrame = {
+    val partitioning = JDBCPartitioningInfo(columnName, lowerBound, upperBound, numPartitions)
     val parts = JDBCRelation.columnPartition(partitioning)
-    jdbcRDD(url, table, parts)
+    jdbc(url, table, parts)
   }
 
   /**
    * :: Experimental ::
-   * Construct an RDD representing the database table accessible via JDBC URL
+   * Construct a [[DataFrame]] representing the database table accessible via JDBC URL
    * url named table.  The theParts parameter gives a list expressions
    * suitable for inclusion in WHERE clauses; each one defines one partition
-   * of the RDD.
+   * of the [[DataFrame]].
+   *
+   * @group specificdata
    */
   @Experimental
-  def jdbcRDD(url: String, table: String, theParts: Array[String]): DataFrame = {
+  def jdbc(url: String, table: String, theParts: Array[String]): DataFrame = {
     val parts: Array[Partition] = theParts.zipWithIndex.map { case (part, i) =>
       JDBCPartition(part, i) : Partition
     }
-    jdbcRDD(url, table, parts)
+    jdbc(url, table, parts)
   }
 
-  private def jdbcRDD(url: String, table: String, parts: Array[Partition]): DataFrame = {
+  private def jdbc(url: String, table: String, parts: Array[Partition]): DataFrame = {
     val relation = JDBCRelation(url, table, parts)(this)
     baseRelationToDataFrame(relation)
   }
 
   /**
-   * Registers the given RDD as a temporary table in the catalog. Temporary tables exist only
-   * during the lifetime of this instance of SQLContext.
+   * Registers the given [[DataFrame]] as a temporary table in the catalog. Temporary tables exist
+   * only during the lifetime of this instance of SQLContext.
    */
-  private[sql] def registerRDDAsTable(rdd: DataFrame, tableName: String): Unit = {
+  private[sql] def registerDataFrameAsTable(rdd: DataFrame, tableName: String): Unit = {
     catalog.registerTable(Seq(tableName), rdd.logicalPlan)
   }
 
@@ -790,7 +896,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
    *
    * @param tableName the name of the table to be unregistered.
    *
-   * @group ddl_ops
+   * @group basic
    */
   def dropTempTable(tableName: String): Unit = {
     cacheManager.tryUncacheQuery(table(tableName))
@@ -801,7 +907,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
    * Executes a SQL query using Spark, returning the result as a [[DataFrame]]. The dialect that is
    * used for SQL parsing can be configured with 'spark.sql.dialect'.
    *
-   * @group userf
+   * @group basic
    */
   def sql(sqlText: String): DataFrame = {
     if (conf.dialect == "sql") {
@@ -811,7 +917,11 @@ class SQLContext(@transient val sparkContext: SparkContext)
     }
   }
 
-  /** Returns the specified table as a [[DataFrame]]. */
+  /**
+   * Returns the specified table as a [[DataFrame]].
+   *
+   * @group ddl_ops
+   */
   def table(tableName: String): DataFrame =
     DataFrame(this, catalog.lookupRelation(Seq(tableName)))
 
@@ -819,6 +929,8 @@ class SQLContext(@transient val sparkContext: SparkContext)
    * Returns a [[DataFrame]] containing names of existing tables in the current database.
    * The returned DataFrame has two columns, tableName and isTemporary (a Boolean
    * indicating if a table is a temporary one or not).
+   *
+   * @group ddl_ops
    */
   def tables(): DataFrame = {
     DataFrame(this, ShowTablesCommand(None))
@@ -828,6 +940,8 @@ class SQLContext(@transient val sparkContext: SparkContext)
    * Returns a [[DataFrame]] containing names of existing tables in the given database.
    * The returned DataFrame has two columns, tableName and isTemporary (a Boolean
    * indicating if a table is a temporary one or not).
+   *
+   * @group ddl_ops
    */
   def tables(databaseName: String): DataFrame = {
     DataFrame(this, ShowTablesCommand(Some(databaseName)))
@@ -835,6 +949,8 @@ class SQLContext(@transient val sparkContext: SparkContext)
 
   /**
    * Returns the names of tables in the current database as an array.
+   *
+   * @group ddl_ops
    */
   def tableNames(): Array[String] = {
     catalog.getTables(None).map {
@@ -844,6 +960,8 @@ class SQLContext(@transient val sparkContext: SparkContext)
 
   /**
    * Returns the names of tables in the given database as an array.
+   *
+   * @group ddl_ops
    */
   def tableNames(databaseName: String): Array[String] = {
     catalog.getTables(Some(databaseName)).map {

http://git-wip-us.apache.org/repos/asf/spark/blob/c74b07fa/sql/core/src/main/scala/org/apache/spark/sql/UserDefinedFunction.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/UserDefinedFunction.scala b/sql/core/src/main/scala/org/apache/spark/sql/UserDefinedFunction.scala
index ee94a5f..295db53 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/UserDefinedFunction.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/UserDefinedFunction.scala
@@ -37,7 +37,7 @@ import org.apache.spark.sql.types.DataType
  *   df.select( predict(df("score")) )
  * }}}
  */
-case class UserDefinedFunction(f: AnyRef, dataType: DataType) {
+case class UserDefinedFunction protected[sql] (f: AnyRef, dataType: DataType) {
 
   def apply(exprs: Column*): Column = {
     Column(ScalaUdf(f, dataType, exprs.map(_.expr)))
@@ -58,6 +58,7 @@ private[sql] case class UserDefinedPythonFunction(
     accumulator: Accumulator[JList[Array[Byte]]],
     dataType: DataType) {
 
+  /** Returns a [[Column]] that will evaluate to calling this UDF with the given input. */
   def apply(exprs: Column*): Column = {
     val udf = PythonUDF(name, command, envVars, pythonIncludes, pythonExec, broadcastVars,
       accumulator, dataType, exprs.map(_.expr))

http://git-wip-us.apache.org/repos/asf/spark/blob/c74b07fa/sql/core/src/main/scala/org/apache/spark/sql/api/package.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/api/package.scala b/sql/core/src/main/scala/org/apache/spark/sql/api/package.scala
new file mode 100644
index 0000000..cbbd005
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/api/package.scala
@@ -0,0 +1,23 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements.  See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License.  You may obtain a copy of the License at
+*
+*    http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.spark.sql
+
+/**
+ * Contains API classes that are specific to a single language (i.e. Java).
+ */
+package object api

http://git-wip-us.apache.org/repos/asf/spark/blob/c74b07fa/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
index c6cd6eb..7c92e9f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
@@ -144,7 +144,7 @@ case class CacheTableCommand(
 
   override def run(sqlContext: SQLContext) = {
     plan.foreach { logicalPlan =>
-      sqlContext.registerRDDAsTable(DataFrame(sqlContext, logicalPlan), tableName)
+      sqlContext.registerDataFrameAsTable(DataFrame(sqlContext, logicalPlan), tableName)
     }
     sqlContext.cacheTable(tableName)
 

http://git-wip-us.apache.org/repos/asf/spark/blob/c74b07fa/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala
index acef49a..73162b2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala
@@ -32,7 +32,9 @@ import org.apache.spark.sql.types._
  *
  * Usage:
  * {{{
- *   sql("SELECT key FROM src").debug
+ *   import org.apache.spark.sql.execution.debug._
+ *   sql("SELECT key FROM src").debug()
+ *   dataFrame.typeCheck()
  * }}}
  */
 package object debug {
@@ -144,11 +146,9 @@ package object debug {
   }
 
   /**
-   * :: DeveloperApi ::
    * Helper functions for checking that runtime types match a given schema.
    */
-  @DeveloperApi
-  object TypeCheck {
+  private[sql] object TypeCheck {
     def typeCheck(data: Any, schema: DataType): Unit = (data, schema) match {
       case (null, _) =>
 
@@ -174,10 +174,8 @@ package object debug {
   }
 
   /**
-   * :: DeveloperApi ::
    * Augments [[DataFrame]]s with debug methods.
    */
-  @DeveloperApi
   private[sql] case class TypeCheck(child: SparkPlan) extends SparkPlan {
     import TypeCheck._
 

http://git-wip-us.apache.org/repos/asf/spark/blob/c74b07fa/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JavaJDBCTrampoline.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JavaJDBCTrampoline.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JavaJDBCTrampoline.scala
deleted file mode 100644
index 86bb67e..0000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JavaJDBCTrampoline.scala
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.jdbc
-
-import org.apache.spark.sql.DataFrame
-
-private[jdbc] class JavaJDBCTrampoline {
-  def createJDBCTable(rdd: DataFrame, url: String, table: String, allowExisting: Boolean) {
-    rdd.createJDBCTable(url, table, allowExisting);
-  }
-
-  def insertIntoJDBC(rdd: DataFrame, url: String, table: String, overwrite: Boolean) {
-    rdd.insertIntoJDBC(url, table, overwrite);
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/c74b07fa/sql/core/src/main/scala/org/apache/spark/sql/jdbc/jdbc.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/jdbc.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/jdbc.scala
index 34a83f0..34f864f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/jdbc.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/jdbc.scala
@@ -26,11 +26,11 @@ import org.apache.spark.sql.jdbc.{JDBCPartitioningInfo, JDBCRelation, JDBCPartit
 import org.apache.spark.sql.types._
 
 package object jdbc {
-  object JDBCWriteDetails extends Logging {
+  private[sql] object JDBCWriteDetails extends Logging {
     /**
      * Returns a PreparedStatement that inserts a row into table via conn.
      */
-    private def insertStatement(conn: Connection, table: String, rddSchema: StructType):
+    def insertStatement(conn: Connection, table: String, rddSchema: StructType):
         PreparedStatement = {
       val sql = new StringBuilder(s"INSERT INTO $table VALUES (")
       var fieldsLeft = rddSchema.fields.length
@@ -56,7 +56,7 @@ package object jdbc {
      * non-Serializable.  Instead, we explicitly close over all variables that
      * are used.
      */
-    private[jdbc] def savePartition(url: String, table: String, iterator: Iterator[Row],
+    def savePartition(url: String, table: String, iterator: Iterator[Row],
         rddSchema: StructType, nullTypes: Array[Int]): Iterator[Byte] = {
       val conn = DriverManager.getConnection(url)
       var committed = false
@@ -117,19 +117,14 @@ package object jdbc {
       }
       Array[Byte]().iterator
     }
-  }
 
-  /**
-   * Make it so that you can call createJDBCTable and insertIntoJDBC on a DataFrame.
-   */
-  implicit class JDBCDataFrame(rdd: DataFrame) {
     /**
      * Compute the schema string for this RDD.
      */
-    private def schemaString(url: String): String = {
+    def schemaString(df: DataFrame, url: String): String = {
       val sb = new StringBuilder()
       val quirks = DriverQuirks.get(url)
-      rdd.schema.fields foreach { field => {
+      df.schema.fields foreach { field => {
         val name = field.name
         var typ: String = quirks.getJDBCType(field.dataType)._1
         if (typ == null) typ = field.dataType match {
@@ -156,9 +151,9 @@ package object jdbc {
     /**
      * Saves the RDD to the database in a single transaction.
      */
-    private def saveTable(url: String, table: String) {
+    def saveTable(df: DataFrame, url: String, table: String) {
       val quirks = DriverQuirks.get(url)
-      var nullTypes: Array[Int] = rdd.schema.fields.map(field => {
+      var nullTypes: Array[Int] = df.schema.fields.map(field => {
         var nullType: Option[Int] = quirks.getJDBCType(field.dataType)._2
         if (nullType.isEmpty) {
           field.dataType match {
@@ -175,61 +170,16 @@ package object jdbc {
             case DateType => java.sql.Types.DATE
             case DecimalType.Unlimited => java.sql.Types.DECIMAL
             case _ => throw new IllegalArgumentException(
-                s"Can't translate null value for field $field")
+              s"Can't translate null value for field $field")
           }
         } else nullType.get
       }).toArray
 
-      val rddSchema = rdd.schema
-      rdd.mapPartitions(iterator => JDBCWriteDetails.savePartition(
-          url, table, iterator, rddSchema, nullTypes)).collect()
-    }
-
-    /**
-     * Save this RDD to a JDBC database at `url` under the table name `table`.
-     * This will run a `CREATE TABLE` and a bunch of `INSERT INTO` statements.
-     * If you pass `true` for `allowExisting`, it will drop any table with the
-     * given name; if you pass `false`, it will throw if the table already
-     * exists.
-     */
-    def createJDBCTable(url: String, table: String, allowExisting: Boolean) {
-      val conn = DriverManager.getConnection(url)
-      try {
-        if (allowExisting) {
-          val sql = s"DROP TABLE IF EXISTS $table"
-          conn.prepareStatement(sql).executeUpdate()
-        }
-        val schema = schemaString(url)
-        val sql = s"CREATE TABLE $table ($schema)"
-        conn.prepareStatement(sql).executeUpdate()
-      } finally {
-        conn.close()
+      val rddSchema = df.schema
+      df.foreachPartition { iterator =>
+        JDBCWriteDetails.savePartition(url, table, iterator, rddSchema, nullTypes)
       }
-      saveTable(url, table)
     }
 
-    /**
-     * Save this RDD to a JDBC database at `url` under the table name `table`.
-     * Assumes the table already exists and has a compatible schema.  If you
-     * pass `true` for `overwrite`, it will `TRUNCATE` the table before
-     * performing the `INSERT`s.
-     *
-     * The table must already exist on the database.  It must have a schema
-     * that is compatible with the schema of this RDD; inserting the rows of
-     * the RDD in order via the simple statement
-     * `INSERT INTO table VALUES (?, ?, ..., ?)` should not fail.
-     */
-    def insertIntoJDBC(url: String, table: String, overwrite: Boolean) {
-      if (overwrite) {
-        val conn = DriverManager.getConnection(url)
-        try {
-          val sql = s"TRUNCATE TABLE $table"
-          conn.prepareStatement(sql).executeUpdate()
-        } finally {
-          conn.close()
-        }
-      }
-      saveTable(url, table)
-    }
-  } // implicit class JDBCDataFrame
+  }
 } // package object jdbc

http://git-wip-us.apache.org/repos/asf/spark/blob/c74b07fa/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
index 7dd8bea..6596645 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
@@ -55,7 +55,7 @@ import org.apache.spark.{Logging, SerializableWritable, TaskContext}
  * Parquet table scan operator. Imports the file that backs the given
  * [[org.apache.spark.sql.parquet.ParquetRelation]] as a ``RDD[Row]``.
  */
-case class ParquetTableScan(
+private[sql] case class ParquetTableScan(
     attributes: Seq[Attribute],
     relation: ParquetRelation,
     columnPruningPred: Seq[Expression])
@@ -210,7 +210,7 @@ case class ParquetTableScan(
  * (only detected via filename pattern so will not catch all cases).
  */
 @DeveloperApi
-case class InsertIntoParquetTable(
+private[sql] case class InsertIntoParquetTable(
     relation: ParquetRelation,
     child: SparkPlan,
     overwrite: Boolean = false)

http://git-wip-us.apache.org/repos/asf/spark/blob/c74b07fa/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTest.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTest.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTest.scala
index d0856df..052728c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTest.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTest.scala
@@ -34,7 +34,7 @@ import org.apache.spark.util.Utils
  * convenient to use tuples rather than special case classes when writing test cases/suites.
  * Especially, `Tuple1.apply` can be used to easily wrap a single type/value.
  */
-trait ParquetTest {
+private[sql] trait ParquetTest {
   val sqlContext: SQLContext
 
   import sqlContext.implicits.{localSeqToDataFrameHolder, rddToDataFrameHolder}
@@ -121,7 +121,7 @@ trait ParquetTest {
       (data: Seq[T], tableName: String)
       (f: => Unit): Unit = {
     withParquetRDD(data) { rdd =>
-      sqlContext.registerRDDAsTable(rdd, tableName)
+      sqlContext.registerDataFrameAsTable(rdd, tableName)
       withTempTable(tableName)(f)
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/c74b07fa/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
index 9bb34e2..95bea92 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
@@ -72,7 +72,7 @@ import org.apache.spark.{Logging, Partition => SparkPartition, SerializableWrita
  *    null or empty string. This is similar to the `hive.exec.default.partition.name` configuration
  *    in Hive.
  */
-class DefaultSource
+private[sql] class DefaultSource
     extends RelationProvider
     with SchemaRelationProvider
     with CreatableRelationProvider {
@@ -147,7 +147,7 @@ private[sql] case class PartitionSpec(partitionColumns: StructType, partitions:
  *    discovery.
  */
 @DeveloperApi
-case class ParquetRelation2(
+private[sql] case class ParquetRelation2(
     paths: Seq[String],
     parameters: Map[String, String],
     maybeSchema: Option[StructType] = None,
@@ -600,7 +600,7 @@ case class ParquetRelation2(
   }
 }
 
-object ParquetRelation2 {
+private[sql] object ParquetRelation2 {
   // Whether we should merge schemas collected from all Parquet part-files.
   val MERGE_SCHEMA = "mergeSchema"
 

http://git-wip-us.apache.org/repos/asf/spark/blob/c74b07fa/sql/core/src/main/scala/org/apache/spark/sql/parquet/timestamp/NanoTime.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/timestamp/NanoTime.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/timestamp/NanoTime.scala
index 8871616..e244752 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/timestamp/NanoTime.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/timestamp/NanoTime.scala
@@ -53,7 +53,7 @@ private[parquet] class NanoTime extends Serializable {
     "NanoTime{julianDay=" + julianDay + ", timeOfDayNanos=" + timeOfDayNanos + "}"
 }
 
-object NanoTime {
+private[sql] object NanoTime {
   def fromBinary(bytes: Binary): NanoTime = {
     Preconditions.checkArgument(bytes.length() == 12, "Must be 12 bytes")
     val buf = bytes.toByteBuffer

http://git-wip-us.apache.org/repos/asf/spark/blob/c74b07fa/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala
index dd8b3d2..5020689 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala
@@ -374,7 +374,7 @@ private[sql] case class CreateTempTableUsing(
 
   def run(sqlContext: SQLContext) = {
     val resolved = ResolvedDataSource(sqlContext, userSpecifiedSchema, provider, options)
-    sqlContext.registerRDDAsTable(
+    sqlContext.registerDataFrameAsTable(
       DataFrame(sqlContext, LogicalRelation(resolved.relation)), tableName)
     Seq.empty
   }
@@ -390,7 +390,7 @@ private[sql] case class CreateTempTableUsingAsSelect(
   def run(sqlContext: SQLContext) = {
     val df = DataFrame(sqlContext, query)
     val resolved = ResolvedDataSource(sqlContext, provider, mode, options, df)
-    sqlContext.registerRDDAsTable(
+    sqlContext.registerDataFrameAsTable(
       DataFrame(sqlContext, LogicalRelation(resolved.relation)), tableName)
 
     Seq.empty

http://git-wip-us.apache.org/repos/asf/spark/blob/c74b07fa/sql/core/src/test/java/org/apache/spark/sql/jdbc/JavaJDBCTest.java
----------------------------------------------------------------------
diff --git a/sql/core/src/test/java/org/apache/spark/sql/jdbc/JavaJDBCTest.java b/sql/core/src/test/java/org/apache/spark/sql/jdbc/JavaJDBCTest.java
deleted file mode 100644
index 80bd74f..0000000
--- a/sql/core/src/test/java/org/apache/spark/sql/jdbc/JavaJDBCTest.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.jdbc;
-
-import org.junit.*;
-import static org.junit.Assert.*;
-import java.sql.Connection;
-import java.sql.DriverManager;
-
-import org.apache.spark.SparkEnv;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.sql.SQLContext;
-import org.apache.spark.sql.DataFrame;
-import org.apache.spark.sql.Row;
-import org.apache.spark.sql.api.java.*;
-import org.apache.spark.sql.test.TestSQLContext$;
-
-public class JavaJDBCTest {
-  static String url = "jdbc:h2:mem:testdb1";
-
-  static Connection conn = null;
-
-  // This variable will always be null if TestSQLContext is intact when running
-  // these tests.  Some Java tests do not play nicely with others, however;
-  // they create a SparkContext of their own at startup and stop it at exit.
-  // This renders TestSQLContext inoperable, meaning we have to do the same
-  // thing.  If this variable is nonnull, that means we allocated a
-  // SparkContext of our own and that we need to stop it at teardown.
-  static JavaSparkContext localSparkContext = null;
-
-  static SQLContext sql = TestSQLContext$.MODULE$;
-
-  @Before
-  public void beforeTest() throws Exception {
-    if (SparkEnv.get() == null) { // A previous test destroyed TestSQLContext.
-      localSparkContext = new JavaSparkContext("local", "JavaAPISuite");
-      sql = new SQLContext(localSparkContext);
-    }
-    Class.forName("org.h2.Driver");
-    conn = DriverManager.getConnection(url);
-    conn.prepareStatement("create schema test").executeUpdate();
-    conn.prepareStatement("create table test.people (name TEXT(32) NOT NULL, theid INTEGER NOT NULL)").executeUpdate();
-    conn.prepareStatement("insert into test.people values ('fred', 1)").executeUpdate();
-    conn.prepareStatement("insert into test.people values ('mary', 2)").executeUpdate();
-    conn.prepareStatement("insert into test.people values ('joe', 3)").executeUpdate();
-    conn.commit();
-  }
-
-  @After
-  public void afterTest() throws Exception {
-    if (localSparkContext != null) {
-      localSparkContext.stop();
-      localSparkContext = null;
-    }
-    try {
-      conn.close();
-    } finally {
-      conn = null;
-    }
-  }
-
-  @Test
-  public void basicTest() throws Exception {
-    DataFrame rdd = JDBCUtils.jdbcRDD(sql, url, "TEST.PEOPLE");
-    Row[] rows = rdd.collect();
-    assertEquals(rows.length, 3);
-  }
-
-  @Test
-  public void partitioningTest() throws Exception {
-    String[] parts = new String[2];
-    parts[0] = "THEID < 2";
-    parts[1] = "THEID = 2"; // Deliberately forget about one of them.
-    DataFrame rdd = JDBCUtils.jdbcRDD(sql, url, "TEST.PEOPLE", parts);
-    Row[] rows = rdd.collect();
-    assertEquals(rows.length, 2);
-  }
-
-  @Test
-  public void writeTest() throws Exception {
-    DataFrame rdd = JDBCUtils.jdbcRDD(sql, url, "TEST.PEOPLE");
-    JDBCUtils.createJDBCTable(rdd, url, "TEST.PEOPLECOPY", false);
-    DataFrame rdd2 = JDBCUtils.jdbcRDD(sql, url, "TEST.PEOPLECOPY");
-    Row[] rows = rdd2.collect();
-    assertEquals(rows.length, 3);
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/c74b07fa/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
index d25c139..07db672 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
@@ -164,17 +164,16 @@ class JDBCSuite extends FunSuite with BeforeAndAfter {
   }
 
   test("Basic API") {
-    assert(TestSQLContext.jdbcRDD(url, "TEST.PEOPLE").collect.size == 3)
+    assert(TestSQLContext.jdbc(url, "TEST.PEOPLE").collect.size == 3)
   }
 
   test("Partitioning via JDBCPartitioningInfo API") {
-    val parts = JDBCPartitioningInfo("THEID", 0, 4, 3)
-    assert(TestSQLContext.jdbcRDD(url, "TEST.PEOPLE", parts).collect.size == 3)
+    assert(TestSQLContext.jdbc(url, "TEST.PEOPLE", "THEID", 0, 4, 3).collect.size == 3)
   }
 
   test("Partitioning via list-of-where-clauses API") {
     val parts = Array[String]("THEID < 2", "THEID >= 2")
-    assert(TestSQLContext.jdbcRDD(url, "TEST.PEOPLE", parts).collect.size == 3)
+    assert(TestSQLContext.jdbc(url, "TEST.PEOPLE", parts).collect.size == 3)
   }
 
   test("H2 integral types") {

http://git-wip-us.apache.org/repos/asf/spark/blob/c74b07fa/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala
index 21e7093..ad2fbc3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala
@@ -57,8 +57,8 @@ class JDBCWriteSuite extends FunSuite with BeforeAndAfter {
     val srdd = TestSQLContext.createDataFrame(sc.parallelize(arr2x2), schema2)
 
     srdd.createJDBCTable(url, "TEST.BASICCREATETEST", false)
-    assert(2 == TestSQLContext.jdbcRDD(url, "TEST.BASICCREATETEST").count)
-    assert(2 == TestSQLContext.jdbcRDD(url, "TEST.BASICCREATETEST").collect()(0).length)
+    assert(2 == TestSQLContext.jdbc(url, "TEST.BASICCREATETEST").count)
+    assert(2 == TestSQLContext.jdbc(url, "TEST.BASICCREATETEST").collect()(0).length)
   }
 
   test("CREATE with overwrite") {
@@ -66,12 +66,12 @@ class JDBCWriteSuite extends FunSuite with BeforeAndAfter {
     val srdd2 = TestSQLContext.createDataFrame(sc.parallelize(arr1x2), schema2)
 
     srdd.createJDBCTable(url, "TEST.DROPTEST", false)
-    assert(2 == TestSQLContext.jdbcRDD(url, "TEST.DROPTEST").count)
-    assert(3 == TestSQLContext.jdbcRDD(url, "TEST.DROPTEST").collect()(0).length)
+    assert(2 == TestSQLContext.jdbc(url, "TEST.DROPTEST").count)
+    assert(3 == TestSQLContext.jdbc(url, "TEST.DROPTEST").collect()(0).length)
 
     srdd2.createJDBCTable(url, "TEST.DROPTEST", true)
-    assert(1 == TestSQLContext.jdbcRDD(url, "TEST.DROPTEST").count)
-    assert(2 == TestSQLContext.jdbcRDD(url, "TEST.DROPTEST").collect()(0).length)
+    assert(1 == TestSQLContext.jdbc(url, "TEST.DROPTEST").count)
+    assert(2 == TestSQLContext.jdbc(url, "TEST.DROPTEST").collect()(0).length)
   }
 
   test("CREATE then INSERT to append") {
@@ -80,8 +80,8 @@ class JDBCWriteSuite extends FunSuite with BeforeAndAfter {
 
     srdd.createJDBCTable(url, "TEST.APPENDTEST", false)
     srdd2.insertIntoJDBC(url, "TEST.APPENDTEST", false)
-    assert(3 == TestSQLContext.jdbcRDD(url, "TEST.APPENDTEST").count)
-    assert(2 == TestSQLContext.jdbcRDD(url, "TEST.APPENDTEST").collect()(0).length)
+    assert(3 == TestSQLContext.jdbc(url, "TEST.APPENDTEST").count)
+    assert(2 == TestSQLContext.jdbc(url, "TEST.APPENDTEST").collect()(0).length)
   }
 
   test("CREATE then INSERT to truncate") {
@@ -90,8 +90,8 @@ class JDBCWriteSuite extends FunSuite with BeforeAndAfter {
 
     srdd.createJDBCTable(url, "TEST.TRUNCATETEST", false)
     srdd2.insertIntoJDBC(url, "TEST.TRUNCATETEST", true)
-    assert(1 == TestSQLContext.jdbcRDD(url, "TEST.TRUNCATETEST").count)
-    assert(2 == TestSQLContext.jdbcRDD(url, "TEST.TRUNCATETEST").collect()(0).length)
+    assert(1 == TestSQLContext.jdbc(url, "TEST.TRUNCATETEST").count)
+    assert(2 == TestSQLContext.jdbc(url, "TEST.TRUNCATETEST").collect()(0).length)
   }
 
   test("Incompatible INSERT to append") {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org