Posted to commits@spark.apache.org by sh...@apache.org on 2016/01/06 07:35:48 UTC

spark git commit: [SPARK-12393][SPARKR] Add read.text and write.text for SparkR

Repository: spark
Updated Branches:
  refs/heads/master b3ba1be3b -> d1fea4136


[SPARK-12393][SPARKR] Add read.text and write.text for SparkR

Add `read.text` and `write.text` for SparkR.
cc sun-rui felixcheung shivaram
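
A minimal usage sketch of the two new functions (assuming a running
SparkR shell and a hypothetical input file at /tmp/people.txt):

  sc <- sparkR.init()
  sqlContext <- sparkRSQL.init(sc)

  # Each line of the input becomes one row in a single string column
  # named "value".
  df <- read.text(sqlContext, "/tmp/people.txt")
  printSchema(df)                 # value: string

  # Each row is written back out as one line under the target directory.
  write.text(df, "/tmp/people-out")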

Author: Yanbo Liang <yb...@gmail.com>

Closes #10348 from yanboliang/spark-12393.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d1fea413
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d1fea413
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d1fea413

Branch: refs/heads/master
Commit: d1fea41363c175a67b97cb7b3fe89f9043708739
Parents: b3ba1be
Author: Yanbo Liang <yb...@gmail.com>
Authored: Wed Jan 6 12:05:41 2016 +0530
Committer: Shivaram Venkataraman <sh...@cs.berkeley.edu>
Committed: Wed Jan 6 12:05:41 2016 +0530

----------------------------------------------------------------------
 R/pkg/NAMESPACE                           |  4 +++-
 R/pkg/R/DataFrame.R                       | 28 ++++++++++++++++++++++++++
 R/pkg/R/SQLContext.R                      | 26 ++++++++++++++++++++++++
 R/pkg/R/generics.R                        |  4 ++++
 R/pkg/inst/tests/testthat/test_sparkSQL.R | 21 +++++++++++++++++++
 5 files changed, 82 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/d1fea413/R/pkg/NAMESPACE
----------------------------------------------------------------------
diff --git a/R/pkg/NAMESPACE b/R/pkg/NAMESPACE
index ccc01fe..beacc39 100644
--- a/R/pkg/NAMESPACE
+++ b/R/pkg/NAMESPACE
@@ -94,7 +94,8 @@ exportMethods("arrange",
               "withColumnRenamed",
               "write.df",
               "write.json",
-              "write.parquet")
+              "write.parquet",
+              "write.text")
 
 exportClasses("Column")
 
@@ -274,6 +275,7 @@ export("as.DataFrame",
        "parquetFile",
        "read.df",
        "read.parquet",
+       "read.text",
        "sql",
        "table",
        "tableNames",

http://git-wip-us.apache.org/repos/asf/spark/blob/d1fea413/R/pkg/R/DataFrame.R
----------------------------------------------------------------------
diff --git a/R/pkg/R/DataFrame.R b/R/pkg/R/DataFrame.R
index c126f9e..3bf5bc9 100644
--- a/R/pkg/R/DataFrame.R
+++ b/R/pkg/R/DataFrame.R
@@ -664,6 +664,34 @@ setMethod("saveAsParquetFile",
             write.parquet(x, path)
           })
 
+#' write.text
+#'
+#' Saves the content of the DataFrame in a text file at the specified path.
+#' The DataFrame must have only one column of string type with the name "value".
+#' Each row becomes a new line in the output file.
+#'
+#' @param x A SparkSQL DataFrame
+#' @param path The directory where the file is saved
+#'
+#' @family DataFrame functions
+#' @rdname write.text
+#' @name write.text
+#' @export
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' sqlContext <- sparkRSQL.init(sc)
+#' path <- "path/to/file.txt"
+#' df <- read.text(sqlContext, path)
+#' write.text(df, "/tmp/sparkr-tmp/")
+#'}
+setMethod("write.text",
+          signature(x = "DataFrame", path = "character"),
+          function(x, path) {
+            write <- callJMethod(x@sdf, "write")
+            invisible(callJMethod(write, "text", path))
+          })
+
 #' Distinct
 #'
 #' Return a new DataFrame containing the distinct rows in this DataFrame.
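
Note the precondition documented above: write.text expects a DataFrame
with exactly one string column named "value". A hedged sketch of
projecting an arbitrary DataFrame into that shape before writing (the
"name" column here is hypothetical):

  one_col <- selectExpr(df, "cast(name as string) as value")
  write.text(one_col, "/tmp/names-out")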

http://git-wip-us.apache.org/repos/asf/spark/blob/d1fea413/R/pkg/R/SQLContext.R
----------------------------------------------------------------------
diff --git a/R/pkg/R/SQLContext.R b/R/pkg/R/SQLContext.R
index ccc683d..99679b4 100644
--- a/R/pkg/R/SQLContext.R
+++ b/R/pkg/R/SQLContext.R
@@ -295,6 +295,32 @@ parquetFile <- function(sqlContext, ...) {
   read.parquet(sqlContext, unlist(list(...)))
 }
 
+#' Create a DataFrame from a text file.
+#'
+#' Loads a text file and returns a DataFrame with a single string column named "value".
+#' Each line in the text file is a new row in the resulting DataFrame.
+#'
+#' @param sqlContext SQLContext to use
+#' @param path Path of file to read. A vector of multiple paths is allowed.
+#' @return DataFrame
+#' @rdname read.text
+#' @name read.text
+#' @export
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' sqlContext <- sparkRSQL.init(sc)
+#' path <- "path/to/file.txt"
+#' df <- read.text(sqlContext, path)
+#' }
+read.text <- function(sqlContext, path) {
+  # Allow the user to have a more flexible definition of the text file path
+  paths <- as.list(suppressWarnings(normalizePath(path)))
+  read <- callJMethod(sqlContext, "read")
+  sdf <- callJMethod(read, "text", paths)
+  dataFrame(sdf)
+}
+
 #' SQL Query
 #'
 #' Executes a SQL query using Spark, returning the result as a DataFrame.
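
Two details of read.text above are worth calling out. The path argument
may be a vector, so several files can be unioned into one DataFrame, and
normalizePath is wrapped in suppressWarnings apparently because it warns
on paths that do not resolve locally (for example HDFS URIs) even though
Spark accepts them. A hedged sketch with hypothetical file names:

  df <- read.text(sqlContext, c("logs/part1.txt", "logs/part2.txt"))
  count(df)                       # total lines across both files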

http://git-wip-us.apache.org/repos/asf/spark/blob/d1fea413/R/pkg/R/generics.R
----------------------------------------------------------------------
diff --git a/R/pkg/R/generics.R b/R/pkg/R/generics.R
index 62be2dd..ba68617 100644
--- a/R/pkg/R/generics.R
+++ b/R/pkg/R/generics.R
@@ -549,6 +549,10 @@ setGeneric("write.parquet", function(x, path) { standardGeneric("write.parquet")
 #' @export
 setGeneric("saveAsParquetFile", function(x, path) { standardGeneric("saveAsParquetFile") })
 
+#' @rdname write.text
+#' @export
+setGeneric("write.text", function(x, path) { standardGeneric("write.text") })
+
 #' @rdname schema
 #' @export
 setGeneric("schema", function(x) { standardGeneric("schema") })

http://git-wip-us.apache.org/repos/asf/spark/blob/d1fea413/R/pkg/inst/tests/testthat/test_sparkSQL.R
----------------------------------------------------------------------
diff --git a/R/pkg/inst/tests/testthat/test_sparkSQL.R b/R/pkg/inst/tests/testthat/test_sparkSQL.R
index ebe8faa..eaf60be 100644
--- a/R/pkg/inst/tests/testthat/test_sparkSQL.R
+++ b/R/pkg/inst/tests/testthat/test_sparkSQL.R
@@ -1497,6 +1497,27 @@ test_that("read/write Parquet files", {
   unlink(parquetPath4)
 })
 
+test_that("read/write text files", {
+  # Test write.df and read.df
+  df <- read.df(sqlContext, jsonPath, "text")
+  expect_is(df, "DataFrame")
+  expect_equal(colnames(df), c("value"))
+  expect_equal(count(df), 3)
+  textPath <- tempfile(pattern = "textPath", fileext = ".txt")
+  write.df(df, textPath, "text", mode = "overwrite")
+
+  # Test write.text and read.text
+  textPath2 <- tempfile(pattern = "textPath2", fileext = ".txt")
+  write.text(df, textPath2)
+  df2 <- read.text(sqlContext, c(textPath, textPath2))
+  expect_is(df2, "DataFrame")
+  expect_equal(colnames(df2), c("value"))
+  expect_equal(count(df2), count(df) * 2)
+
+  unlink(textPath)
+  unlink(textPath2)
+})
+
 test_that("describe() and summarize() on a DataFrame", {
   df <- read.json(sqlContext, jsonPath)
   stats <- describe(df, "age")
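
As the new test above also exercises, the same text data source is
reachable through the generic read.df/write.df entry points by naming
the source. A hedged equivalence sketch (paths hypothetical):

  df <- read.df(sqlContext, "/tmp/in.txt", source = "text")
  write.df(df, "/tmp/out-dir", source = "text", mode = "overwrite")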

