Posted to commits@spark.apache.org by ru...@apache.org on 2022/07/11 06:23:39 UTC
[spark] branch master updated: [SPARK-39719][R] Implement databaseExists/getDatabase in SparkR to support 3L namespace
This is an automated email from the ASF dual-hosted git repository.
ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new d5e9c5801cb [SPARK-39719][R] Implement databaseExists/getDatabase in SparkR to support 3L namespace
d5e9c5801cb is described below
commit d5e9c5801cb1d0c8cb545b679261bd94b5ae0280
Author: Ruifeng Zheng <ru...@apache.org>
AuthorDate: Mon Jul 11 14:23:12 2022 +0800
[SPARK-39719][R] Implement databaseExists/getDatabase in SparkR to support 3L namespace
### What changes were proposed in this pull request?
1. Add `databaseExists`/`getDatabase`.
2. Make sure `listTables` supports the 3L namespace.
3. Modify the SparkR-specific catalog methods `tables` and `tableNames` to support the 3L namespace.
### Why are the changes needed?
To support the 3L (three-layer: catalog.database.table) namespace in SparkR.
### Does this PR introduce _any_ user-facing change?
Yes
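For example, a minimal sketch (assumes a running SparkR session; `spark_catalog` is the built-in session catalog):

    sparkR.session()

    # new APIs; database names may be qualified with the catalog
    databaseExists("spark_catalog.default")    # TRUE
    db <- getDatabase("spark_catalog.default")
    db$name                                    # "default"

    # existing APIs now accept the qualified form too
    listTables("spark_catalog.default")
    tableNames("spark_catalog.default")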
### How was this patch tested?
Updated unit tests and a manual check.
Closes #37132 from zhengruifeng/r_3L_dbname.
Authored-by: Ruifeng Zheng <ru...@apache.org>
Signed-off-by: Ruifeng Zheng <ru...@apache.org>
---
R/pkg/NAMESPACE | 2 +
R/pkg/R/catalog.R | 72 +++++++++++++++++++++-
R/pkg/pkgdown/_pkgdown_template.yml | 2 +
R/pkg/tests/fulltests/test_sparkSQL.R | 34 +++++++++-
.../org/apache/spark/sql/api/r/SQLUtils.scala | 2 +-
5 files changed, 107 insertions(+), 5 deletions(-)
diff --git a/R/pkg/NAMESPACE b/R/pkg/NAMESPACE
index f5f60ecf134..3937791421a 100644
--- a/R/pkg/NAMESPACE
+++ b/R/pkg/NAMESPACE
@@ -476,8 +476,10 @@ export("as.DataFrame",
"createTable",
"currentCatalog",
"currentDatabase",
+ "databaseExists",
"dropTempTable",
"dropTempView",
+ "getDatabase",
"getTable",
"listCatalogs",
"listColumns",
diff --git a/R/pkg/R/catalog.R b/R/pkg/R/catalog.R
index 8237ac26b33..680415ea6cd 100644
--- a/R/pkg/R/catalog.R
+++ b/R/pkg/R/catalog.R
@@ -278,13 +278,14 @@ dropTempView <- function(viewName) {
#' Returns a SparkDataFrame containing names of tables in the given database.
#'
#' @param databaseName (optional) name of the database
+#' The database name can be qualified with the catalog name since 3.4.0.
#' @return a SparkDataFrame
#' @rdname tables
#' @seealso \link{listTables}
#' @examples
#'\dontrun{
#' sparkR.session()
-#' tables("hive")
+#' tables("spark_catalog.hive")
#' }
#' @name tables
#' @note tables since 1.4.0
@@ -298,12 +299,13 @@ tables <- function(databaseName = NULL) {
#' Returns the names of tables in the given database as an array.
#'
#' @param databaseName (optional) name of the database
+#' The database name can be qualified with the catalog name since 3.4.0.
#' @return a list of table names
#' @rdname tableNames
#' @examples
#'\dontrun{
#' sparkR.session()
-#' tableNames("hive")
+#' tableNames("spark_catalog.hive")
#' }
#' @name tableNames
#' @note tableNames since 1.4.0
@@ -356,6 +358,28 @@ setCurrentDatabase <- function(databaseName) {
invisible(handledCallJMethod(catalog, "setCurrentDatabase", databaseName))
}
+#' Checks if the database with the specified name exists.
+#'
+#' Checks if the database with the specified name exists.
+#'
+#' @param databaseName name of the database, allowed to be qualified with the catalog name
+#' @rdname databaseExists
+#' @name databaseExists
+#' @examples
+#' \dontrun{
+#' sparkR.session()
+#' databaseExists("spark_catalog.default")
+#' }
+#' @note since 3.4.0
+databaseExists <- function(databaseName) {
+ sparkSession <- getSparkSession()
+ if (class(databaseName) != "character") {
+ stop("databaseName must be a string.")
+ }
+ catalog <- callJMethod(sparkSession, "catalog")
+ callJMethod(catalog, "databaseExists", databaseName)
+}
+
#' Returns a list of databases available
#'
#' Returns a list of databases available.
@@ -375,12 +399,54 @@ listDatabases <- function() {
dataFrame(callJMethod(callJMethod(catalog, "listDatabases"), "toDF"))
}
+#' Get the database with the specified name
+#'
+#' Get the database with the specified name
+#'
+#' @param databaseName name of the database, allowed to be qualified with the catalog name
+#' @return A named list.
+#' @rdname getDatabase
+#' @name getDatabase
+#' @examples
+#' \dontrun{
+#' sparkR.session()
+#' db <- getDatabase("default")
+#' }
+#' @note since 3.4.0
+getDatabase <- function(databaseName) {
+ sparkSession <- getSparkSession()
+ if (class(databaseName) != "character") {
+ stop("databaseName must be a string.")
+ }
+ catalog <- callJMethod(sparkSession, "catalog")
+ jdb <- handledCallJMethod(catalog, "getDatabase", databaseName)
+
+ ret <- list(name = callJMethod(jdb, "name"))
+ jcata <- callJMethod(jdb, "catalog")
+ if (is.null(jcata)) {
+ ret$catalog <- NA
+ } else {
+ ret$catalog <- jcata
+ }
+
+ jdesc <- callJMethod(jdb, "description")
+ if (is.null(jdesc)) {
+ ret$description <- NA
+ } else {
+ ret$description <- jdesc
+ }
+
+ ret$locationUri <- callJMethod(jdb, "locationUri")
+ ret
+}
+
#' Returns a list of tables or views in the specified database
#'
#' Returns a list of tables or views in the specified database.
#' This includes all temporary views.
#'
#' @param databaseName (optional) name of the database
+#' The database name can be qualified with the catalog name since 3.4.0.
#' @return a SparkDataFrame of the list of tables.
#' @rdname listTables
#' @name listTables
@@ -389,7 +455,7 @@ listDatabases <- function() {
#' \dontrun{
#' sparkR.session()
#' listTables()
-#' listTables("default")
+#' listTables("spark_catalog.default")
#' }
#' @note since 2.2.0
listTables <- function(databaseName = NULL) {
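For reference, the named list returned by the new `getDatabase` looks like the following sketch (the description and location values are hypothetical; `NA` appears where the JVM side returns null):

    db <- getDatabase("default")
    # db$name         "default"
    # db$catalog      "spark_catalog" (NA when null)
    # db$description  e.g. "default database" (NA when null)
    # db$locationUri  e.g. "file:/path/to/spark-warehouse"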
diff --git a/R/pkg/pkgdown/_pkgdown_template.yml b/R/pkg/pkgdown/_pkgdown_template.yml
index a9107c1293e..df93f200ab2 100644
--- a/R/pkg/pkgdown/_pkgdown_template.yml
+++ b/R/pkg/pkgdown/_pkgdown_template.yml
@@ -263,8 +263,10 @@ reference:
- contents:
- currentCatalog
- currentDatabase
+ - databaseExists
- dropTempTable
- dropTempView
+ - getDatabase
- getTable
- listCatalogs
- listColumns
diff --git a/R/pkg/tests/fulltests/test_sparkSQL.R b/R/pkg/tests/fulltests/test_sparkSQL.R
index 29a6c2580e4..85eca6b510b 100644
--- a/R/pkg/tests/fulltests/test_sparkSQL.R
+++ b/R/pkg/tests/fulltests/test_sparkSQL.R
@@ -673,6 +673,22 @@ test_that("test tableNames and tables", {
tables <- listTables()
expect_equal(count(tables), count + 0)
+
+ count2 <- count(listTables())
+ schema <- structType(structField("name", "string"), structField("age", "integer"),
+ structField("height", "float"))
+ createTable("people", source = "json", schema = schema)
+
+ expect_equal(length(tableNames()), count2 + 1)
+ expect_equal(length(tableNames("default")), count2 + 1)
+ expect_equal(length(tableNames("spark_catalog.default")), count2 + 1)
+
+ tables <- listTables()
+ expect_equal(count(tables), count2 + 1)
+ expect_equal(count(tables()), count(tables))
+ expect_equal(count(tables("default")), count2 + 1)
+ expect_equal(count(tables("spark_catalog.default")), count2 + 1)
+ sql("DROP TABLE IF EXISTS people")
})
test_that(
@@ -3422,6 +3438,8 @@ test_that("Method coltypes() to get and set R's data types of a DataFrame", {
"Length of type vector should match the number of columns for SparkDataFrame")
expect_error(coltypes(df) <- c("environment", "list"),
"Only atomic type is supported for column types")
+
+ dropTempView("dfView")
})
test_that("Method str()", {
@@ -3461,6 +3479,8 @@ test_that("Method str()", {
# Test utils:::str
expect_equal(capture.output(utils:::str(iris)), capture.output(str(iris)))
+
+ dropTempView("irisView")
})
test_that("Histogram", {
@@ -4033,20 +4053,32 @@ test_that("catalog APIs, listCatalogs, setCurrentCatalog, currentCatalog", {
catalogs <- collect(listCatalogs())
})
-test_that("catalog APIs, currentDatabase, setCurrentDatabase, listDatabases", {
+test_that("catalog APIs, currentDatabase, setCurrentDatabase, listDatabases, getDatabase", {
expect_equal(currentDatabase(), "default")
expect_error(setCurrentDatabase("default"), NA)
expect_error(setCurrentDatabase("zxwtyswklpf"),
paste0("Error in setCurrentDatabase : no such database - Database ",
"'zxwtyswklpf' not found"))
+
+ expect_true(databaseExists("default"))
+ expect_true(databaseExists("spark_catalog.default"))
+ expect_false(databaseExists("some_db"))
+ expect_false(databaseExists("spark_catalog.some_db"))
+
dbs <- collect(listDatabases())
expect_equal(names(dbs), c("name", "catalog", "description", "locationUri"))
expect_equal(which(dbs[, 1] == "default"), 1)
+
+ db <- getDatabase("spark_catalog.default")
+ expect_equal(db$name, "default")
+ expect_equal(db$catalog, "spark_catalog")
})
test_that("catalog APIs, listTables, listColumns, listFunctions, getTable", {
tb <- listTables()
count <- count(tables())
+ expect_equal(nrow(listTables("default")), count)
+ expect_equal(nrow(listTables("spark_catalog.default")), count)
expect_equal(nrow(tb), count)
expect_equal(colnames(tb),
c("name", "catalog", "namespace", "description", "tableType", "isTemporary"))
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala
index f58afcfa05d..f505f55c259 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala
@@ -217,7 +217,7 @@ private[sql] object SQLUtils extends Logging {
case _ =>
sparkSession.catalog.currentDatabase
}
- sparkSession.sessionState.catalog.listTables(db).map(_.table).toArray
+ sparkSession.catalog.listTables(db).collect().map(_.name)
}
def createArrayType(column: Column): ArrayType = {
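On the Scala side, the `SQLUtils` helper behind SparkR's `tableNames` now goes through the public `Catalog.listTables` API rather than the internal session catalog, so database names qualified with a catalog resolve consistently. A sketch of the user-visible effect (assumes an active SparkR session; the `people` table is hypothetical):

    schema <- structType(structField("name", "string"))
    createTable("people", source = "json", schema = schema)
    tableNames("spark_catalog.default")   # now includes "people"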