You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by sh...@apache.org on 2016/02/02 19:53:27 UTC

spark git commit: [SPARK-12629][SPARKR] Fixes for DataFrame saveAsTable method

Repository: spark
Updated Branches:
  refs/heads/branch-1.6 9a3d1bd09 -> 53f518a6e


[SPARK-12629][SPARKR] Fixes for DataFrame saveAsTable method

I've tried to solve some of the issues mentioned in: https://issues.apache.org/jira/browse/SPARK-12629
Please let me know what you think.
Thanks!

Author: Narine Kokhlikyan <na...@gmail.com>

Closes #10580 from NarineK/sparkrSavaAsRable.

(cherry picked from commit 8a88e121283472c26e70563a4e04c109e9b183b3)
Signed-off-by: Shivaram Venkataraman <sh...@cs.berkeley.edu>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/53f518a6
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/53f518a6
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/53f518a6

Branch: refs/heads/branch-1.6
Commit: 53f518a6e2791cc4967793b6cc0d4a68d579cb33
Parents: 9a3d1bd
Author: Narine Kokhlikyan <na...@gmail.com>
Authored: Fri Jan 22 10:35:02 2016 -0800
Committer: Shivaram Venkataraman <sh...@cs.berkeley.edu>
Committed: Tue Feb 2 10:53:24 2016 -0800

----------------------------------------------------------------------
 R/pkg/R/DataFrame.R                       | 23 +++++++++++++++++------
 R/pkg/R/generics.R                        | 12 ++++++++++--
 R/pkg/inst/tests/testthat/test_sparkSQL.R | 15 ++++++++++++++-
 3 files changed, 41 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/53f518a6/R/pkg/R/DataFrame.R
----------------------------------------------------------------------
diff --git a/R/pkg/R/DataFrame.R b/R/pkg/R/DataFrame.R
index a6c6a1d..5b47f0b 100644
--- a/R/pkg/R/DataFrame.R
+++ b/R/pkg/R/DataFrame.R
@@ -1972,7 +1972,13 @@ setMethod("write.df",
           signature(df = "DataFrame", path = "character"),
           function(df, path, source = NULL, mode = "error", ...){
             if (is.null(source)) {
-              sqlContext <- get(".sparkRSQLsc", envir = .sparkREnv)
+              if (exists(".sparkRSQLsc", envir = .sparkREnv)) {
+                sqlContext <- get(".sparkRSQLsc", envir = .sparkREnv)
+              } else if (exists(".sparkRHivesc", envir = .sparkREnv)) {
+                sqlContext <- get(".sparkRHivesc", envir = .sparkREnv)
+              } else {
+                stop("sparkRHive or sparkRSQL context has to be specified")
+              }
               source <- callJMethod(sqlContext, "getConf", "spark.sql.sources.default",
                                     "org.apache.spark.sql.parquet")
             }
@@ -2033,13 +2039,18 @@ setMethod("saveDF",
 #' saveAsTable(df, "myfile")
 #' }
 setMethod("saveAsTable",
-          signature(df = "DataFrame", tableName = "character", source = "character",
-                    mode = "character"),
+          signature(df = "DataFrame", tableName = "character"),
           function(df, tableName, source = NULL, mode="error", ...){
             if (is.null(source)) {
-              sqlContext <- get(".sparkRSQLsc", envir = .sparkREnv)
-              source <- callJMethod(sqlContext, "getConf", "spark.sql.sources.default",
-                                    "org.apache.spark.sql.parquet")
+              if (exists(".sparkRSQLsc", envir = .sparkREnv)) {
+                sqlContext <- get(".sparkRSQLsc", envir = .sparkREnv)
+              } else if (exists(".sparkRHivesc", envir = .sparkREnv)) {
+                sqlContext <- get(".sparkRHivesc", envir = .sparkREnv)
+              } else {
+                stop("sparkRHive or sparkRSQL context has to be specified")
+              }
+               source <- callJMethod(sqlContext, "getConf", "spark.sql.sources.default",
+                                     "org.apache.spark.sql.parquet")
             }
             allModes <- c("append", "overwrite", "error", "ignore")
             # nolint start

http://git-wip-us.apache.org/repos/asf/spark/blob/53f518a6/R/pkg/R/generics.R
----------------------------------------------------------------------
diff --git a/R/pkg/R/generics.R b/R/pkg/R/generics.R
index 816bbd0..62a854e 100644
--- a/R/pkg/R/generics.R
+++ b/R/pkg/R/generics.R
@@ -528,7 +528,7 @@ setGeneric("sampleBy", function(x, col, fractions, seed) { standardGeneric("samp
 
 #' @rdname saveAsTable
 #' @export
-setGeneric("saveAsTable", function(df, tableName, source, mode, ...) {
+setGeneric("saveAsTable", function(df, tableName, source = NULL, mode = "error", ...) {
   standardGeneric("saveAsTable")
 })
 
@@ -541,7 +541,15 @@ setGeneric("transform", function(`_data`, ...) {standardGeneric("transform") })
 
 #' @rdname write.df
 #' @export
-setGeneric("saveDF", function(df, path, ...) { standardGeneric("saveDF") })
+setGeneric("write.df", function(df, path, source = NULL, mode = "error", ...) {
+  standardGeneric("write.df")
+})
+
+#' @rdname write.df
+#' @export
+setGeneric("saveDF", function(df, path, source = NULL, mode = "error", ...) {
+  standardGeneric("saveDF")
+})
 
 #' @rdname write.json
 #' @export

http://git-wip-us.apache.org/repos/asf/spark/blob/53f518a6/R/pkg/inst/tests/testthat/test_sparkSQL.R
----------------------------------------------------------------------
diff --git a/R/pkg/inst/tests/testthat/test_sparkSQL.R b/R/pkg/inst/tests/testthat/test_sparkSQL.R
index d6e498d..adef440 100644
--- a/R/pkg/inst/tests/testthat/test_sparkSQL.R
+++ b/R/pkg/inst/tests/testthat/test_sparkSQL.R
@@ -902,8 +902,21 @@ test_that("test HiveContext", {
   df3 <- sql(hiveCtx, "select * from json2")
   expect_is(df3, "DataFrame")
   expect_equal(count(df3), 3)
-
   unlink(jsonPath2)
+
+  hivetestDataPath <- tempfile(pattern="sparkr-test", fileext=".tmp")
+  invisible(saveAsTable(df, "hivetestbl", path = hivetestDataPath))
+  df4 <- sql(hiveCtx, "select * from hivetestbl")
+  expect_is(df4, "DataFrame")
+  expect_equal(count(df4), 3)
+  unlink(hivetestDataPath)
+
+  parquetDataPath <- tempfile(pattern="sparkr-test", fileext=".tmp")
+  invisible(saveAsTable(df, "parquetest", "parquet", mode="overwrite", path=parquetDataPath))
+  df5 <- sql(hiveCtx, "select * from parquetest")
+  expect_is(df5, "DataFrame")
+  expect_equal(count(df5), 3)
+  unlink(parquetDataPath)
 })
 
 test_that("column operators", {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org