You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ru...@apache.org on 2022/07/11 06:23:39 UTC

[spark] branch master updated: [SPARK-39719][R] Implement databaseExists/getDatabase in SparkR support 3L namespace

This is an automated email from the ASF dual-hosted git repository.

ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new d5e9c5801cb [SPARK-39719][R] Implement databaseExists/getDatabase in SparkR support 3L namespace
d5e9c5801cb is described below

commit d5e9c5801cb1d0c8cb545b679261bd94b5ae0280
Author: Ruifeng Zheng <ru...@apache.org>
AuthorDate: Mon Jul 11 14:23:12 2022 +0800

    [SPARK-39719][R] Implement databaseExists/getDatabase in SparkR support 3L namespace
    
    ### What changes were proposed in this pull request?
    1, add `databaseExists`/`getDatabase`
    2, make sure `listTables` support 3L namespace
    3, modify sparkR-specific catalog method `tables` and `tableNames` to support 3L namespace
    
    ### Why are the changes needed?
    to support 3L namespace in SparkR
    
    ### Does this PR introduce _any_ user-facing change?
    Yes
    
    ### How was this patch tested?
    updated UT and manual check
    
    Closes #37132 from zhengruifeng/r_3L_dbname.
    
    Authored-by: Ruifeng Zheng <ru...@apache.org>
    Signed-off-by: Ruifeng Zheng <ru...@apache.org>
---
 R/pkg/NAMESPACE                                    |  2 +
 R/pkg/R/catalog.R                                  | 72 +++++++++++++++++++++-
 R/pkg/pkgdown/_pkgdown_template.yml                |  2 +
 R/pkg/tests/fulltests/test_sparkSQL.R              | 34 +++++++++-
 .../org/apache/spark/sql/api/r/SQLUtils.scala      |  2 +-
 5 files changed, 107 insertions(+), 5 deletions(-)

diff --git a/R/pkg/NAMESPACE b/R/pkg/NAMESPACE
index f5f60ecf134..3937791421a 100644
--- a/R/pkg/NAMESPACE
+++ b/R/pkg/NAMESPACE
@@ -476,8 +476,10 @@ export("as.DataFrame",
        "createTable",
        "currentCatalog",
        "currentDatabase",
+       "databaseExists",
        "dropTempTable",
        "dropTempView",
+       "getDatabase",
        "getTable",
        "listCatalogs",
        "listColumns",
diff --git a/R/pkg/R/catalog.R b/R/pkg/R/catalog.R
index 8237ac26b33..680415ea6cd 100644
--- a/R/pkg/R/catalog.R
+++ b/R/pkg/R/catalog.R
@@ -278,13 +278,14 @@ dropTempView <- function(viewName) {
 #' Returns a SparkDataFrame containing names of tables in the given database.
 #'
 #' @param databaseName (optional) name of the database
+#'                     The database name can be qualified with catalog name since 3.4.0.
 #' @return a SparkDataFrame
 #' @rdname tables
 #' @seealso \link{listTables}
 #' @examples
 #'\dontrun{
 #' sparkR.session()
-#' tables("hive")
+#' tables("spark_catalog.hive")
 #' }
 #' @name tables
 #' @note tables since 1.4.0
@@ -298,12 +299,13 @@ tables <- function(databaseName = NULL) {
 #' Returns the names of tables in the given database as an array.
 #'
 #' @param databaseName (optional) name of the database
+#'                     The database name can be qualified with catalog name since 3.4.0.
 #' @return a list of table names
 #' @rdname tableNames
 #' @examples
 #'\dontrun{
 #' sparkR.session()
-#' tableNames("hive")
+#' tableNames("spark_catalog.hive")
 #' }
 #' @name tableNames
 #' @note tableNames since 1.4.0
@@ -356,6 +358,28 @@ setCurrentDatabase <- function(databaseName) {
   invisible(handledCallJMethod(catalog, "setCurrentDatabase", databaseName))
 }
 
+#' Checks if the database with the specified name exists.
+#'
+#' Checks if the database with the specified name exists.
+#'
+#' @param databaseName name of the database, allowed to be qualified with catalog name
+#' @rdname databaseExists
+#' @name databaseExists
+#' @examples
+#' \dontrun{
+#' sparkR.session()
+#' databaseExists("spark_catalog.default")
+#' }
+#' @note since 3.4.0
+databaseExists <- function(databaseName) {
+  sparkSession <- getSparkSession()
+  if (class(databaseName) != "character") {
+    stop("databaseName must be a string.")
+  }
+  catalog <- callJMethod(sparkSession, "catalog")
+  callJMethod(catalog, "databaseExists", databaseName)
+}
+
 #' Returns a list of databases available
 #'
 #' Returns a list of databases available.
@@ -375,12 +399,54 @@ listDatabases <- function() {
   dataFrame(callJMethod(callJMethod(catalog, "listDatabases"), "toDF"))
 }
 
+#' Get the database with the specified name
+#'
+#' Get the database with the specified name
+#'
+#' @param databaseName name of the database, allowed to be qualified with catalog name
+#' @return A named list.
+#' @rdname getDatabase
+#' @name getDatabase
+#' @examples
+#' \dontrun{
+#' sparkR.session()
+#' db <- getDatabase("default")
+#' }
+#' @note since 3.4.0
+getDatabase <- function(databaseName) {
+  sparkSession <- getSparkSession()
+  if (class(databaseName) != "character") {
+    stop("databaseName must be a string.")
+  }
+  catalog <- callJMethod(sparkSession, "catalog")
+  jdb <- handledCallJMethod(catalog, "getDatabase", databaseName)
+
+  ret <- list(name = callJMethod(jdb, "name"))
+  jcata <- callJMethod(jdb, "catalog")
+  if (is.null(jcata)) {
+    ret$catalog <- NA
+  } else {
+    ret$catalog <- jcata
+  }
+
+  jdesc <- callJMethod(jdb, "description")
+  if (is.null(jdesc)) {
+    ret$description <- NA
+  } else {
+    ret$description <- jdesc
+  }
+
+  ret$locationUri <- callJMethod(jdb, "locationUri")
+  ret
+}
+
 #' Returns a list of tables or views in the specified database
 #'
 #' Returns a list of tables or views in the specified database.
 #' This includes all temporary views.
 #'
 #' @param databaseName (optional) name of the database
+#'                     The database name can be qualified with catalog name since 3.4.0.
 #' @return a SparkDataFrame of the list of tables.
 #' @rdname listTables
 #' @name listTables
@@ -389,7 +455,7 @@ listDatabases <- function() {
 #' \dontrun{
 #' sparkR.session()
 #' listTables()
-#' listTables("default")
+#' listTables("spark_catalog.default")
 #' }
 #' @note since 2.2.0
 listTables <- function(databaseName = NULL) {
diff --git a/R/pkg/pkgdown/_pkgdown_template.yml b/R/pkg/pkgdown/_pkgdown_template.yml
index a9107c1293e..df93f200ab2 100644
--- a/R/pkg/pkgdown/_pkgdown_template.yml
+++ b/R/pkg/pkgdown/_pkgdown_template.yml
@@ -263,8 +263,10 @@ reference:
 - contents:
   - currentCatalog
   - currentDatabase
+  - databaseExists
   - dropTempTable
   - dropTempView
+  - getDatabase
   - getTable
   - listCatalogs
   - listColumns
diff --git a/R/pkg/tests/fulltests/test_sparkSQL.R b/R/pkg/tests/fulltests/test_sparkSQL.R
index 29a6c2580e4..85eca6b510b 100644
--- a/R/pkg/tests/fulltests/test_sparkSQL.R
+++ b/R/pkg/tests/fulltests/test_sparkSQL.R
@@ -673,6 +673,22 @@ test_that("test tableNames and tables", {
 
   tables <- listTables()
   expect_equal(count(tables), count + 0)
+
+  count2 <- count(listTables())
+  schema <- structType(structField("name", "string"), structField("age", "integer"),
+                       structField("height", "float"))
+  createTable("people", source = "json", schema = schema)
+
+  expect_equal(length(tableNames()), count2 + 1)
+  expect_equal(length(tableNames("default")), count2 + 1)
+  expect_equal(length(tableNames("spark_catalog.default")), count2 + 1)
+
+  tables <- listTables()
+  expect_equal(count(tables), count2 + 1)
+  expect_equal(count(tables()), count(tables))
+  expect_equal(count(tables("default")), count2 + 1)
+  expect_equal(count(tables("spark_catalog.default")), count2 + 1)
+  sql("DROP TABLE IF EXISTS people")
 })
 
 test_that(
@@ -3422,6 +3438,8 @@ test_that("Method coltypes() to get and set R's data types of a DataFrame", {
                "Length of type vector should match the number of columns for SparkDataFrame")
   expect_error(coltypes(df) <- c("environment", "list"),
                "Only atomic type is supported for column types")
+
+  dropTempView("dfView")
 })
 
 test_that("Method str()", {
@@ -3461,6 +3479,8 @@ test_that("Method str()", {
 
   # Test utils:::str
   expect_equal(capture.output(utils:::str(iris)), capture.output(str(iris)))
+
+  dropTempView("irisView")
 })
 
 test_that("Histogram", {
@@ -4033,20 +4053,32 @@ test_that("catalog APIs, listCatalogs, setCurrentCatalog, currentCatalog", {
   catalogs <- collect(listCatalogs())
 })
 
-test_that("catalog APIs, currentDatabase, setCurrentDatabase, listDatabases", {
+test_that("catalog APIs, currentDatabase, setCurrentDatabase, listDatabases, getDatabase", {
   expect_equal(currentDatabase(), "default")
   expect_error(setCurrentDatabase("default"), NA)
   expect_error(setCurrentDatabase("zxwtyswklpf"),
                paste0("Error in setCurrentDatabase : no such database - Database ",
                "'zxwtyswklpf' not found"))
+
+  expect_true(databaseExists("default"))
+  expect_true(databaseExists("spark_catalog.default"))
+  expect_false(databaseExists("some_db"))
+  expect_false(databaseExists("spark_catalog.some_db"))
+
   dbs <- collect(listDatabases())
   expect_equal(names(dbs), c("name", "catalog", "description", "locationUri"))
   expect_equal(which(dbs[, 1] == "default"), 1)
+
+  db <- getDatabase("spark_catalog.default")
+  expect_equal(db$name, "default")
+  expect_equal(db$catalog, "spark_catalog")
 })
 
 test_that("catalog APIs, listTables, listColumns, listFunctions, getTable", {
   tb <- listTables()
   count <- count(tables())
+  expect_equal(nrow(listTables("default")), count)
+  expect_equal(nrow(listTables("spark_catalog.default")), count)
   expect_equal(nrow(tb), count)
   expect_equal(colnames(tb),
                c("name", "catalog", "namespace", "description", "tableType", "isTemporary"))
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala
index f58afcfa05d..f505f55c259 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala
@@ -217,7 +217,7 @@ private[sql] object SQLUtils extends Logging {
       case _ =>
         sparkSession.catalog.currentDatabase
     }
-    sparkSession.sessionState.catalog.listTables(db).map(_.table).toArray
+    sparkSession.catalog.listTables(db).collect().map(_.name)
   }
 
   def createArrayType(column: Column): ArrayType = {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org