You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by fe...@apache.org on 2017/04/02 18:59:31 UTC

spark git commit: [SPARK-20159][SPARKR][SQL] Support all catalog API in R

Repository: spark
Updated Branches:
  refs/heads/master 657cb9541 -> 93dbfe705


[SPARK-20159][SPARKR][SQL] Support all catalog API in R

## What changes were proposed in this pull request?

Add a set of catalog API in R

```
"currentDatabase",
"listColumns",
"listDatabases",
"listFunctions",
"listTables",
"recoverPartitions",
"refreshByPath",
"refreshTable",
"setCurrentDatabase",
```
https://github.com/apache/spark/pull/17483/files#diff-6929e6c5e59017ff954e110df20ed7ff

## How was this patch tested?

manual tests, unit tests

Author: Felix Cheung <fe...@hotmail.com>

Closes #17483 from felixcheung/rcatalog.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/93dbfe70
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/93dbfe70
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/93dbfe70

Branch: refs/heads/master
Commit: 93dbfe705f3e7410a7267e406332ffb3c3077829
Parents: 657cb95
Author: Felix Cheung <fe...@hotmail.com>
Authored: Sun Apr 2 11:59:27 2017 -0700
Committer: Felix Cheung <fe...@apache.org>
Committed: Sun Apr 2 11:59:27 2017 -0700

----------------------------------------------------------------------
 R/pkg/DESCRIPTION                         |   1 +
 R/pkg/NAMESPACE                           |   9 +
 R/pkg/R/SQLContext.R                      | 233 ------------
 R/pkg/R/catalog.R                         | 479 +++++++++++++++++++++++++
 R/pkg/R/utils.R                           |  18 +
 R/pkg/inst/tests/testthat/test_sparkSQL.R |  66 +++-
 6 files changed, 569 insertions(+), 237 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/93dbfe70/R/pkg/DESCRIPTION
----------------------------------------------------------------------
diff --git a/R/pkg/DESCRIPTION b/R/pkg/DESCRIPTION
index 2ea90f7..00dde64 100644
--- a/R/pkg/DESCRIPTION
+++ b/R/pkg/DESCRIPTION
@@ -32,6 +32,7 @@ Collate:
     'pairRDD.R'
     'DataFrame.R'
     'SQLContext.R'
+    'catalog.R'
     'WindowSpec.R'
     'backend.R'
     'broadcast.R'

http://git-wip-us.apache.org/repos/asf/spark/blob/93dbfe70/R/pkg/NAMESPACE
----------------------------------------------------------------------
diff --git a/R/pkg/NAMESPACE b/R/pkg/NAMESPACE
index 8be7875..c02046c 100644
--- a/R/pkg/NAMESPACE
+++ b/R/pkg/NAMESPACE
@@ -358,9 +358,14 @@ export("as.DataFrame",
        "clearCache",
        "createDataFrame",
        "createExternalTable",
+       "currentDatabase",
        "dropTempTable",
        "dropTempView",
        "jsonFile",
+       "listColumns",
+       "listDatabases",
+       "listFunctions",
+       "listTables",
        "loadDF",
        "parquetFile",
        "read.df",
@@ -370,7 +375,11 @@ export("as.DataFrame",
        "read.parquet",
        "read.stream",
        "read.text",
+       "recoverPartitions",
+       "refreshByPath",
+       "refreshTable",
        "setCheckpointDir",
+       "setCurrentDatabase",
        "spark.lapply",
        "spark.addFile",
        "spark.getSparkFilesRootDirectory",

http://git-wip-us.apache.org/repos/asf/spark/blob/93dbfe70/R/pkg/R/SQLContext.R
----------------------------------------------------------------------
diff --git a/R/pkg/R/SQLContext.R b/R/pkg/R/SQLContext.R
index b75fb01..a1edef7 100644
--- a/R/pkg/R/SQLContext.R
+++ b/R/pkg/R/SQLContext.R
@@ -569,200 +569,6 @@ tableToDF <- function(tableName) {
   dataFrame(sdf)
 }
 
-#' Tables
-#'
-#' Returns a SparkDataFrame containing names of tables in the given database.
-#'
-#' @param databaseName name of the database
-#' @return a SparkDataFrame
-#' @rdname tables
-#' @export
-#' @examples
-#'\dontrun{
-#' sparkR.session()
-#' tables("hive")
-#' }
-#' @name tables
-#' @method tables default
-#' @note tables since 1.4.0
-tables.default <- function(databaseName = NULL) {
-  sparkSession <- getSparkSession()
-  jdf <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", "getTables", sparkSession, databaseName)
-  dataFrame(jdf)
-}
-
-tables <- function(x, ...) {
-  dispatchFunc("tables(databaseName = NULL)", x, ...)
-}
-
-#' Table Names
-#'
-#' Returns the names of tables in the given database as an array.
-#'
-#' @param databaseName name of the database
-#' @return a list of table names
-#' @rdname tableNames
-#' @export
-#' @examples
-#'\dontrun{
-#' sparkR.session()
-#' tableNames("hive")
-#' }
-#' @name tableNames
-#' @method tableNames default
-#' @note tableNames since 1.4.0
-tableNames.default <- function(databaseName = NULL) {
-  sparkSession <- getSparkSession()
-  callJStatic("org.apache.spark.sql.api.r.SQLUtils",
-              "getTableNames",
-              sparkSession,
-              databaseName)
-}
-
-tableNames <- function(x, ...) {
-  dispatchFunc("tableNames(databaseName = NULL)", x, ...)
-}
-
-#' Cache Table
-#'
-#' Caches the specified table in-memory.
-#'
-#' @param tableName The name of the table being cached
-#' @return SparkDataFrame
-#' @rdname cacheTable
-#' @export
-#' @examples
-#'\dontrun{
-#' sparkR.session()
-#' path <- "path/to/file.json"
-#' df <- read.json(path)
-#' createOrReplaceTempView(df, "table")
-#' cacheTable("table")
-#' }
-#' @name cacheTable
-#' @method cacheTable default
-#' @note cacheTable since 1.4.0
-cacheTable.default <- function(tableName) {
-  sparkSession <- getSparkSession()
-  catalog <- callJMethod(sparkSession, "catalog")
-  invisible(callJMethod(catalog, "cacheTable", tableName))
-}
-
-cacheTable <- function(x, ...) {
-  dispatchFunc("cacheTable(tableName)", x, ...)
-}
-
-#' Uncache Table
-#'
-#' Removes the specified table from the in-memory cache.
-#'
-#' @param tableName The name of the table being uncached
-#' @return SparkDataFrame
-#' @rdname uncacheTable
-#' @export
-#' @examples
-#'\dontrun{
-#' sparkR.session()
-#' path <- "path/to/file.json"
-#' df <- read.json(path)
-#' createOrReplaceTempView(df, "table")
-#' uncacheTable("table")
-#' }
-#' @name uncacheTable
-#' @method uncacheTable default
-#' @note uncacheTable since 1.4.0
-uncacheTable.default <- function(tableName) {
-  sparkSession <- getSparkSession()
-  catalog <- callJMethod(sparkSession, "catalog")
-  invisible(callJMethod(catalog, "uncacheTable", tableName))
-}
-
-uncacheTable <- function(x, ...) {
-  dispatchFunc("uncacheTable(tableName)", x, ...)
-}
-
-#' Clear Cache
-#'
-#' Removes all cached tables from the in-memory cache.
-#'
-#' @rdname clearCache
-#' @export
-#' @examples
-#' \dontrun{
-#' clearCache()
-#' }
-#' @name clearCache
-#' @method clearCache default
-#' @note clearCache since 1.4.0
-clearCache.default <- function() {
-  sparkSession <- getSparkSession()
-  catalog <- callJMethod(sparkSession, "catalog")
-  invisible(callJMethod(catalog, "clearCache"))
-}
-
-clearCache <- function() {
-  dispatchFunc("clearCache()")
-}
-
-#' (Deprecated) Drop Temporary Table
-#'
-#' Drops the temporary table with the given table name in the catalog.
-#' If the table has been cached/persisted before, it's also unpersisted.
-#'
-#' @param tableName The name of the SparkSQL table to be dropped.
-#' @seealso \link{dropTempView}
-#' @rdname dropTempTable-deprecated
-#' @export
-#' @examples
-#' \dontrun{
-#' sparkR.session()
-#' df <- read.df(path, "parquet")
-#' createOrReplaceTempView(df, "table")
-#' dropTempTable("table")
-#' }
-#' @name dropTempTable
-#' @method dropTempTable default
-#' @note dropTempTable since 1.4.0
-dropTempTable.default <- function(tableName) {
-  if (class(tableName) != "character") {
-    stop("tableName must be a string.")
-  }
-  dropTempView(tableName)
-}
-
-dropTempTable <- function(x, ...) {
-  .Deprecated("dropTempView")
-  dispatchFunc("dropTempView(viewName)", x, ...)
-}
-
-#' Drops the temporary view with the given view name in the catalog.
-#'
-#' Drops the temporary view with the given view name in the catalog.
-#' If the view has been cached before, then it will also be uncached.
-#'
-#' @param viewName the name of the view to be dropped.
-#' @return TRUE if the view is dropped successfully, FALSE otherwise.
-#' @rdname dropTempView
-#' @name dropTempView
-#' @export
-#' @examples
-#' \dontrun{
-#' sparkR.session()
-#' df <- read.df(path, "parquet")
-#' createOrReplaceTempView(df, "table")
-#' dropTempView("table")
-#' }
-#' @note since 2.0.0
-
-dropTempView <- function(viewName) {
-  sparkSession <- getSparkSession()
-  if (class(viewName) != "character") {
-    stop("viewName must be a string.")
-  }
-  catalog <- callJMethod(sparkSession, "catalog")
-  callJMethod(catalog, "dropTempView", viewName)
-}
-
 #' Load a SparkDataFrame
 #'
 #' Returns the dataset in a data source as a SparkDataFrame
@@ -841,45 +647,6 @@ loadDF <- function(x = NULL, ...) {
   dispatchFunc("loadDF(path = NULL, source = NULL, schema = NULL, ...)", x, ...)
 }
 
-#' Create an external table
-#'
-#' Creates an external table based on the dataset in a data source,
-#' Returns a SparkDataFrame associated with the external table.
-#'
-#' The data source is specified by the \code{source} and a set of options(...).
-#' If \code{source} is not specified, the default data source configured by
-#' "spark.sql.sources.default" will be used.
-#'
-#' @param tableName a name of the table.
-#' @param path the path of files to load.
-#' @param source the name of external data source.
-#' @param ... additional argument(s) passed to the method.
-#' @return A SparkDataFrame.
-#' @rdname createExternalTable
-#' @export
-#' @examples
-#'\dontrun{
-#' sparkR.session()
-#' df <- createExternalTable("myjson", path="path/to/json", source="json")
-#' }
-#' @name createExternalTable
-#' @method createExternalTable default
-#' @note createExternalTable since 1.4.0
-createExternalTable.default <- function(tableName, path = NULL, source = NULL, ...) {
-  sparkSession <- getSparkSession()
-  options <- varargsToStrEnv(...)
-  if (!is.null(path)) {
-    options[["path"]] <- path
-  }
-  catalog <- callJMethod(sparkSession, "catalog")
-  sdf <- callJMethod(catalog, "createExternalTable", tableName, source, options)
-  dataFrame(sdf)
-}
-
-createExternalTable <- function(x, ...) {
-  dispatchFunc("createExternalTable(tableName, path = NULL, source = NULL, ...)", x, ...)
-}
-
 #' Create a SparkDataFrame representing the database table accessible via JDBC URL
 #'
 #' Additional JDBC database connection properties can be set (...)

http://git-wip-us.apache.org/repos/asf/spark/blob/93dbfe70/R/pkg/R/catalog.R
----------------------------------------------------------------------
diff --git a/R/pkg/R/catalog.R b/R/pkg/R/catalog.R
new file mode 100644
index 0000000..07a89f7
--- /dev/null
+++ b/R/pkg/R/catalog.R
@@ -0,0 +1,479 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# catalog.R: SparkSession catalog functions
+
+#' Create an external table
+#'
+#' Creates an external table based on the dataset in a data source,
+#' Returns a SparkDataFrame associated with the external table.
+#'
+#' The data source is specified by the \code{source} and a set of options(...).
+#' If \code{source} is not specified, the default data source configured by
+#' "spark.sql.sources.default" will be used.
+#'
+#' @param tableName a name of the table.
+#' @param path the path of files to load.
+#' @param source the name of external data source.
+#' @param schema the schema of the data for certain data source.
+#' @param ... additional argument(s) passed to the method.
+#' @return A SparkDataFrame.
+#' @rdname createExternalTable
+#' @export
+#' @examples
+#'\dontrun{
+#' sparkR.session()
+#' df <- createExternalTable("myjson", path="path/to/json", source="json", schema)
+#' }
+#' @name createExternalTable
+#' @method createExternalTable default
+#' @note createExternalTable since 1.4.0
+createExternalTable.default <- function(tableName, path = NULL, source = NULL, schema = NULL, ...) {
+  sparkSession <- getSparkSession()
+  options <- varargsToStrEnv(...)
+  if (!is.null(path)) {
+    options[["path"]] <- path
+  }
+  catalog <- callJMethod(sparkSession, "catalog")
+  if (is.null(schema)) {
+    sdf <- callJMethod(catalog, "createExternalTable", tableName, source, options)
+  } else {
+    sdf <- callJMethod(catalog, "createExternalTable", tableName, source, schema$jobj, options)
+  }
+  dataFrame(sdf)
+}
+
+createExternalTable <- function(x, ...) {
+  dispatchFunc("createExternalTable(tableName, path = NULL, source = NULL, ...)", x, ...)
+}
+
+#' Cache Table
+#'
+#' Caches the specified table in-memory.
+#'
+#' @param tableName The name of the table being cached
+#' @return SparkDataFrame
+#' @rdname cacheTable
+#' @export
+#' @examples
+#'\dontrun{
+#' sparkR.session()
+#' path <- "path/to/file.json"
+#' df <- read.json(path)
+#' createOrReplaceTempView(df, "table")
+#' cacheTable("table")
+#' }
+#' @name cacheTable
+#' @method cacheTable default
+#' @note cacheTable since 1.4.0
+cacheTable.default <- function(tableName) {
+  sparkSession <- getSparkSession()
+  catalog <- callJMethod(sparkSession, "catalog")
+  invisible(handledCallJMethod(catalog, "cacheTable", tableName))
+}
+
+cacheTable <- function(x, ...) {
+  dispatchFunc("cacheTable(tableName)", x, ...)
+}
+
+#' Uncache Table
+#'
+#' Removes the specified table from the in-memory cache.
+#'
+#' @param tableName The name of the table being uncached
+#' @return SparkDataFrame
+#' @rdname uncacheTable
+#' @export
+#' @examples
+#'\dontrun{
+#' sparkR.session()
+#' path <- "path/to/file.json"
+#' df <- read.json(path)
+#' createOrReplaceTempView(df, "table")
+#' uncacheTable("table")
+#' }
+#' @name uncacheTable
+#' @method uncacheTable default
+#' @note uncacheTable since 1.4.0
+uncacheTable.default <- function(tableName) {
+  sparkSession <- getSparkSession()
+  catalog <- callJMethod(sparkSession, "catalog")
+  invisible(handledCallJMethod(catalog, "uncacheTable", tableName))
+}
+
+uncacheTable <- function(x, ...) {
+  dispatchFunc("uncacheTable(tableName)", x, ...)
+}
+
+#' Clear Cache
+#'
+#' Removes all cached tables from the in-memory cache.
+#'
+#' @rdname clearCache
+#' @export
+#' @examples
+#' \dontrun{
+#' clearCache()
+#' }
+#' @name clearCache
+#' @method clearCache default
+#' @note clearCache since 1.4.0
+clearCache.default <- function() {
+  sparkSession <- getSparkSession()
+  catalog <- callJMethod(sparkSession, "catalog")
+  invisible(callJMethod(catalog, "clearCache"))
+}
+
+clearCache <- function() {
+  dispatchFunc("clearCache()")
+}
+
+#' (Deprecated) Drop Temporary Table
+#'
+#' Drops the temporary table with the given table name in the catalog.
+#' If the table has been cached/persisted before, it's also unpersisted.
+#'
+#' @param tableName The name of the SparkSQL table to be dropped.
+#' @seealso \link{dropTempView}
+#' @rdname dropTempTable-deprecated
+#' @export
+#' @examples
+#' \dontrun{
+#' sparkR.session()
+#' df <- read.df(path, "parquet")
+#' createOrReplaceTempView(df, "table")
+#' dropTempTable("table")
+#' }
+#' @name dropTempTable
+#' @method dropTempTable default
+#' @note dropTempTable since 1.4.0
+dropTempTable.default <- function(tableName) {
+  if (class(tableName) != "character") {
+    stop("tableName must be a string.")
+  }
+  dropTempView(tableName)
+}
+
+dropTempTable <- function(x, ...) {
+  .Deprecated("dropTempView")
+  dispatchFunc("dropTempView(viewName)", x, ...)
+}
+
+#' Drops the temporary view with the given view name in the catalog.
+#'
+#' Drops the temporary view with the given view name in the catalog.
+#' If the view has been cached before, then it will also be uncached.
+#'
+#' @param viewName the name of the view to be dropped.
+#' @return TRUE if the view is dropped successfully, FALSE otherwise.
+#' @rdname dropTempView
+#' @name dropTempView
+#' @export
+#' @examples
+#' \dontrun{
+#' sparkR.session()
+#' df <- read.df(path, "parquet")
+#' createOrReplaceTempView(df, "table")
+#' dropTempView("table")
+#' }
+#' @note since 2.0.0
+dropTempView <- function(viewName) {
+  sparkSession <- getSparkSession()
+  if (class(viewName) != "character") {
+    stop("viewName must be a string.")
+  }
+  catalog <- callJMethod(sparkSession, "catalog")
+  callJMethod(catalog, "dropTempView", viewName)
+}
+
+#' Tables
+#'
+#' Returns a SparkDataFrame containing names of tables in the given database.
+#'
+#' @param databaseName (optional) name of the database
+#' @return a SparkDataFrame
+#' @rdname tables
+#' @seealso \link{listTables}
+#' @export
+#' @examples
+#'\dontrun{
+#' sparkR.session()
+#' tables("hive")
+#' }
+#' @name tables
+#' @method tables default
+#' @note tables since 1.4.0
+tables.default <- function(databaseName = NULL) {
+  # rename column to match previous output schema
+  withColumnRenamed(listTables(databaseName), "name", "tableName")
+}
+
+tables <- function(x, ...) {
+  dispatchFunc("tables(databaseName = NULL)", x, ...)
+}
+
+#' Table Names
+#'
+#' Returns the names of tables in the given database as an array.
+#'
+#' @param databaseName (optional) name of the database
+#' @return a list of table names
+#' @rdname tableNames
+#' @export
+#' @examples
+#'\dontrun{
+#' sparkR.session()
+#' tableNames("hive")
+#' }
+#' @name tableNames
+#' @method tableNames default
+#' @note tableNames since 1.4.0
+tableNames.default <- function(databaseName = NULL) {
+  sparkSession <- getSparkSession()
+  callJStatic("org.apache.spark.sql.api.r.SQLUtils",
+              "getTableNames",
+              sparkSession,
+              databaseName)
+}
+
+tableNames <- function(x, ...) {
+  dispatchFunc("tableNames(databaseName = NULL)", x, ...)
+}
+
+#' Returns the current default database
+#'
+#' Returns the current default database.
+#'
+#' @return name of the current default database.
+#' @rdname currentDatabase
+#' @name currentDatabase
+#' @export
+#' @examples
+#' \dontrun{
+#' sparkR.session()
+#' currentDatabase()
+#' }
+#' @note since 2.2.0
+currentDatabase <- function() {
+  sparkSession <- getSparkSession()
+  catalog <- callJMethod(sparkSession, "catalog")
+  callJMethod(catalog, "currentDatabase")
+}
+
+#' Sets the current default database
+#'
+#' Sets the current default database.
+#'
+#' @param databaseName name of the database
+#' @rdname setCurrentDatabase
+#' @name setCurrentDatabase
+#' @export
+#' @examples
+#' \dontrun{
+#' sparkR.session()
+#' setCurrentDatabase("default")
+#' }
+#' @note since 2.2.0
+setCurrentDatabase <- function(databaseName) {
+  sparkSession <- getSparkSession()
+  if (class(databaseName) != "character") {
+    stop("databaseName must be a string.")
+  }
+  catalog <- callJMethod(sparkSession, "catalog")
+  invisible(handledCallJMethod(catalog, "setCurrentDatabase", databaseName))
+}
+
+#' Returns a list of databases available
+#'
+#' Returns a list of databases available.
+#'
+#' @return a SparkDataFrame of the list of databases.
+#' @rdname listDatabases
+#' @name listDatabases
+#' @export
+#' @examples
+#' \dontrun{
+#' sparkR.session()
+#' listDatabases()
+#' }
+#' @note since 2.2.0
+listDatabases <- function() {
+  sparkSession <- getSparkSession()
+  catalog <- callJMethod(sparkSession, "catalog")
+  dataFrame(callJMethod(callJMethod(catalog, "listDatabases"), "toDF"))
+}
+
+#' Returns a list of tables in the specified database
+#'
+#' Returns a list of tables in the specified database.
+#' This includes all temporary tables.
+#'
+#' @param databaseName (optional) name of the database
+#' @return a SparkDataFrame of the list of tables.
+#' @rdname listTables
+#' @name listTables
+#' @seealso \link{tables}
+#' @export
+#' @examples
+#' \dontrun{
+#' sparkR.session()
+#' listTables()
+#' listTables("default")
+#' }
+#' @note since 2.2.0
+listTables <- function(databaseName = NULL) {
+  sparkSession <- getSparkSession()
+  if (!is.null(databaseName) && class(databaseName) != "character") {
+    stop("databaseName must be a string.")
+  }
+  catalog <- callJMethod(sparkSession, "catalog")
+  jdst <- if (is.null(databaseName)) {
+    callJMethod(catalog, "listTables")
+  } else {
+    handledCallJMethod(catalog, "listTables", databaseName)
+  }
+  dataFrame(callJMethod(jdst, "toDF"))
+}
+
+#' Returns a list of columns for the given table in the specified database
+#'
+#' Returns a list of columns for the given table in the specified database.
+#'
+#' @param tableName a name of the table.
+#' @param databaseName (optional) name of the database
+#' @return a SparkDataFrame of the list of column descriptions.
+#' @rdname listColumns
+#' @name listColumns
+#' @export
+#' @examples
+#' \dontrun{
+#' sparkR.session()
+#' listColumns("mytable")
+#' }
+#' @note since 2.2.0
+listColumns <- function(tableName, databaseName = NULL) {
+  sparkSession <- getSparkSession()
+  if (!is.null(databaseName) && class(databaseName) != "character") {
+    stop("databaseName must be a string.")
+  }
+  catalog <- callJMethod(sparkSession, "catalog")
+  jdst <- if (is.null(databaseName)) {
+    handledCallJMethod(catalog, "listColumns", tableName)
+  } else {
+    handledCallJMethod(catalog, "listColumns", databaseName, tableName)
+  }
+  dataFrame(callJMethod(jdst, "toDF"))
+}
+
+#' Returns a list of functions registered in the specified database
+#'
+#' Returns a list of functions registered in the specified database.
+#' This includes all temporary functions.
+#'
+#' @param databaseName (optional) name of the database
+#' @return a SparkDataFrame of the list of function descriptions.
+#' @rdname listFunctions
+#' @name listFunctions
+#' @export
+#' @examples
+#' \dontrun{
+#' sparkR.session()
+#' listFunctions()
+#' }
+#' @note since 2.2.0
+listFunctions <- function(databaseName = NULL) {
+  sparkSession <- getSparkSession()
+  if (!is.null(databaseName) && class(databaseName) != "character") {
+    stop("databaseName must be a string.")
+  }
+  catalog <- callJMethod(sparkSession, "catalog")
+  jdst <- if (is.null(databaseName)) {
+    callJMethod(catalog, "listFunctions")
+  } else {
+    handledCallJMethod(catalog, "listFunctions", databaseName)
+  }
+  dataFrame(callJMethod(jdst, "toDF"))
+}
+
+#' Recover all the partitions in the directory of a table and update the catalog
+#'
+#' Recover all the partitions in the directory of a table and update the catalog. The name should
+#' reference a partitioned table, and not a temporary view.
+#'
+#' @param tableName a name of the table.
+#' @rdname recoverPartitions
+#' @name recoverPartitions
+#' @export
+#' @examples
+#' \dontrun{
+#' sparkR.session()
+#' recoverPartitions("myTable")
+#' }
+#' @note since 2.2.0
+recoverPartitions <- function(tableName) {
+  sparkSession <- getSparkSession()
+  catalog <- callJMethod(sparkSession, "catalog")
+  invisible(handledCallJMethod(catalog, "recoverPartitions", tableName))
+}
+
+#' Invalidate and refresh all the cached metadata of the given table
+#'
+#' Invalidate and refresh all the cached metadata of the given table. For performance reasons,
+#' Spark SQL or the external data source library it uses might cache certain metadata about a
+#' table, such as the location of blocks. When those change outside of Spark SQL, users should
+#' call this function to invalidate the cache.
+#'
+#' If this table is cached as an InMemoryRelation, drop the original cached version and make the
+#' new version cached lazily.
+#'
+#' @param tableName a name of the table.
+#' @rdname refreshTable
+#' @name refreshTable
+#' @export
+#' @examples
+#' \dontrun{
+#' sparkR.session()
+#' refreshTable("myTable")
+#' }
+#' @note since 2.2.0
+refreshTable <- function(tableName) {
+  sparkSession <- getSparkSession()
+  catalog <- callJMethod(sparkSession, "catalog")
+  invisible(handledCallJMethod(catalog, "refreshTable", tableName))
+}
+
+#' Invalidate and refresh all the cached data and metadata for SparkDataFrame containing path
+#'
+#' Invalidate and refresh all the cached data (and the associated metadata) for any SparkDataFrame
+#' that contains the given data source path. Path matching is by prefix, i.e. "/" would invalidate
+#' everything that is cached.
+#'
+#' @param path the path of the data source.
+#' @rdname refreshByPath
+#' @name refreshByPath
+#' @export
+#' @examples
+#' \dontrun{
+#' sparkR.session()
+#' refreshByPath("/path")
+#' }
+#' @note since 2.2.0
+refreshByPath <- function(path) {
+  sparkSession <- getSparkSession()
+  catalog <- callJMethod(sparkSession, "catalog")
+  invisible(handledCallJMethod(catalog, "refreshByPath", path))
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/93dbfe70/R/pkg/R/utils.R
----------------------------------------------------------------------
diff --git a/R/pkg/R/utils.R b/R/pkg/R/utils.R
index 810de99..fbc89e9 100644
--- a/R/pkg/R/utils.R
+++ b/R/pkg/R/utils.R
@@ -846,6 +846,24 @@ captureJVMException <- function(e, method) {
     # Extract the first message of JVM exception.
     first <- strsplit(msg[2], "\r?\n\tat")[[1]][1]
     stop(paste0(rmsg, "analysis error - ", first), call. = FALSE)
+  } else
+    if (any(grep("org.apache.spark.sql.catalyst.analysis.NoSuchDatabaseException: ", stacktrace))) {
+    msg <- strsplit(stacktrace, "org.apache.spark.sql.catalyst.analysis.NoSuchDatabaseException: ",
+                    fixed = TRUE)[[1]]
+    # Extract "Error in ..." message.
+    rmsg <- msg[1]
+    # Extract the first message of JVM exception.
+    first <- strsplit(msg[2], "\r?\n\tat")[[1]][1]
+    stop(paste0(rmsg, "no such database - ", first), call. = FALSE)
+  } else
+    if (any(grep("org.apache.spark.sql.catalyst.analysis.NoSuchTableException: ", stacktrace))) {
+    msg <- strsplit(stacktrace, "org.apache.spark.sql.catalyst.analysis.NoSuchTableException: ",
+                    fixed = TRUE)[[1]]
+    # Extract "Error in ..." message.
+    rmsg <- msg[1]
+    # Extract the first message of JVM exception.
+    first <- strsplit(msg[2], "\r?\n\tat")[[1]][1]
+    stop(paste0(rmsg, "no such table - ", first), call. = FALSE)
   } else {
     stop(stacktrace, call. = FALSE)
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/93dbfe70/R/pkg/inst/tests/testthat/test_sparkSQL.R
----------------------------------------------------------------------
diff --git a/R/pkg/inst/tests/testthat/test_sparkSQL.R b/R/pkg/inst/tests/testthat/test_sparkSQL.R
index 5acf871..ad06711 100644
--- a/R/pkg/inst/tests/testthat/test_sparkSQL.R
+++ b/R/pkg/inst/tests/testthat/test_sparkSQL.R
@@ -645,16 +645,20 @@ test_that("test tableNames and tables", {
   df <- read.json(jsonPath)
   createOrReplaceTempView(df, "table1")
   expect_equal(length(tableNames()), 1)
-  tables <- tables()
+  expect_equal(length(tableNames("default")), 1)
+  tables <- listTables()
   expect_equal(count(tables), 1)
+  expect_equal(count(tables()), count(tables))
+  expect_true("tableName" %in% colnames(tables()))
+  expect_true(all(c("tableName", "database", "isTemporary") %in% colnames(tables())))
 
   suppressWarnings(registerTempTable(df, "table2"))
-  tables <- tables()
+  tables <- listTables()
   expect_equal(count(tables), 2)
   suppressWarnings(dropTempTable("table1"))
   expect_true(dropTempView("table2"))
 
-  tables <- tables()
+  tables <- listTables()
   expect_equal(count(tables), 0)
 })
 
@@ -686,6 +690,9 @@ test_that("test cache, uncache and clearCache", {
   uncacheTable("table1")
   clearCache()
   expect_true(dropTempView("table1"))
+
+  expect_error(uncacheTable("foo"),
+      "Error in uncacheTable : no such table - Table or view 'foo' not found in database 'default'")
 })
 
 test_that("insertInto() on a registered table", {
@@ -2821,7 +2828,7 @@ test_that("createDataFrame sqlContext parameter backward compatibility", {
 
   # more tests for SPARK-16538
   createOrReplaceTempView(df, "table")
-  SparkR::tables()
+  SparkR::listTables()
   SparkR::sql("SELECT 1")
   suppressWarnings(SparkR::sql(sqlContext, "SELECT * FROM table"))
   suppressWarnings(SparkR::dropTempTable(sqlContext, "table"))
@@ -2977,6 +2984,57 @@ test_that("Collect on DataFrame when NAs exists at the top of a timestamp column
   expect_equal(class(ldf3$col3), c("POSIXct", "POSIXt"))
 })
 
+test_that("catalog APIs, currentDatabase, setCurrentDatabase, listDatabases", {
+  expect_equal(currentDatabase(), "default")
+  expect_error(setCurrentDatabase("default"), NA)
+  expect_error(setCurrentDatabase("foo"),
+               "Error in setCurrentDatabase : analysis error - Database 'foo' does not exist")
+  dbs <- collect(listDatabases())
+  expect_equal(names(dbs), c("name", "description", "locationUri"))
+  expect_equal(dbs[[1]], "default")
+})
+
+test_that("catalog APIs, listTables, listColumns, listFunctions", {
+  tb <- listTables()
+  count <- count(tables())
+  expect_equal(nrow(tb), count)
+  expect_equal(colnames(tb), c("name", "database", "description", "tableType", "isTemporary"))
+
+  createOrReplaceTempView(as.DataFrame(cars), "cars")
+
+  tb <- listTables()
+  expect_equal(nrow(tb), count + 1)
+  tbs <- collect(tb)
+  expect_true(nrow(tbs[tbs$name == "cars", ]) > 0)
+  expect_error(listTables("bar"),
+               "Error in listTables : no such database - Database 'bar' not found")
+
+  c <- listColumns("cars")
+  expect_equal(nrow(c), 2)
+  expect_equal(colnames(c),
+               c("name", "description", "dataType", "nullable", "isPartition", "isBucket"))
+  expect_equal(collect(c)[[1]][[1]], "speed")
+  expect_error(listColumns("foo", "default"),
+       "Error in listColumns : analysis error - Table 'foo' does not exist in database 'default'")
+
+  f <- listFunctions()
+  expect_true(nrow(f) >= 200) # 250
+  expect_equal(colnames(f),
+               c("name", "database", "description", "className", "isTemporary"))
+  expect_equal(take(orderBy(f, "className"), 1)$className,
+               "org.apache.spark.sql.catalyst.expressions.Abs")
+  expect_error(listFunctions("foo_db"),
+               "Error in listFunctions : analysis error - Database 'foo_db' does not exist")
+
+  # recoverPartitions does not work with tempory view
+  expect_error(recoverPartitions("cars"),
+               "no such table - Table or view 'cars' not found in database 'default'")
+  expect_error(refreshTable("cars"), NA)
+  expect_error(refreshByPath("/"), NA)
+
+  dropTempView("cars")
+})
+
 compare_list <- function(list1, list2) {
   # get testthat to show the diff by first making the 2 lists equal in length
   expect_equal(length(list1), length(list2))


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org