You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by sh...@apache.org on 2016/01/06 07:35:48 UTC
spark git commit: [SPARK-12393][SPARKR] Add read.text and write.text
for SparkR
Repository: spark
Updated Branches:
refs/heads/master b3ba1be3b -> d1fea4136
[SPARK-12393][SPARKR] Add read.text and write.text for SparkR
Add `read.text` and `write.text` for SparkR.
cc sun-rui felixcheung shivaram
Author: Yanbo Liang <yb...@gmail.com>
Closes #10348 from yanboliang/spark-12393.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d1fea413
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d1fea413
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d1fea413
Branch: refs/heads/master
Commit: d1fea41363c175a67b97cb7b3fe89f9043708739
Parents: b3ba1be
Author: Yanbo Liang <yb...@gmail.com>
Authored: Wed Jan 6 12:05:41 2016 +0530
Committer: Shivaram Venkataraman <sh...@cs.berkeley.edu>
Committed: Wed Jan 6 12:05:41 2016 +0530
----------------------------------------------------------------------
R/pkg/NAMESPACE | 4 +++-
R/pkg/R/DataFrame.R | 28 ++++++++++++++++++++++++++
R/pkg/R/SQLContext.R | 26 ++++++++++++++++++++++++
R/pkg/R/generics.R | 4 ++++
R/pkg/inst/tests/testthat/test_sparkSQL.R | 21 +++++++++++++++++++
5 files changed, 82 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/d1fea413/R/pkg/NAMESPACE
----------------------------------------------------------------------
diff --git a/R/pkg/NAMESPACE b/R/pkg/NAMESPACE
index ccc01fe..beacc39 100644
--- a/R/pkg/NAMESPACE
+++ b/R/pkg/NAMESPACE
@@ -94,7 +94,8 @@ exportMethods("arrange",
"withColumnRenamed",
"write.df",
"write.json",
- "write.parquet")
+ "write.parquet",
+ "write.text")
exportClasses("Column")
@@ -274,6 +275,7 @@ export("as.DataFrame",
"parquetFile",
"read.df",
"read.parquet",
+ "read.text",
"sql",
"table",
"tableNames",
http://git-wip-us.apache.org/repos/asf/spark/blob/d1fea413/R/pkg/R/DataFrame.R
----------------------------------------------------------------------
diff --git a/R/pkg/R/DataFrame.R b/R/pkg/R/DataFrame.R
index c126f9e..3bf5bc9 100644
--- a/R/pkg/R/DataFrame.R
+++ b/R/pkg/R/DataFrame.R
@@ -664,6 +664,34 @@ setMethod("saveAsParquetFile",
write.parquet(x, path)
})
+#' write.text
+#'
+#' Saves the content of the DataFrame in a text file at the specified path.
+#' The DataFrame must have only one column of string type with the name "value".
+#' Each row becomes a new line in the output file.
+#'
+#' @param x A SparkSQL DataFrame
+#' @param path The directory where the file is saved
+#'
+#' @family DataFrame functions
+#' @rdname write.text
+#' @name write.text
+#' @export
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' sqlContext <- sparkRSQL.init(sc)
+#' path <- "path/to/file.txt"
+#' df <- read.text(sqlContext, path)
+#' write.text(df, "/tmp/sparkr-tmp/")
+#'}
+setMethod("write.text",
+ signature(x = "DataFrame", path = "character"),
+ function(x, path) {
+ write <- callJMethod(x@sdf, "write")
+ invisible(callJMethod(write, "text", path))
+ })
+
#' Distinct
#'
#' Return a new DataFrame containing the distinct rows in this DataFrame.
http://git-wip-us.apache.org/repos/asf/spark/blob/d1fea413/R/pkg/R/SQLContext.R
----------------------------------------------------------------------
diff --git a/R/pkg/R/SQLContext.R b/R/pkg/R/SQLContext.R
index ccc683d..99679b4 100644
--- a/R/pkg/R/SQLContext.R
+++ b/R/pkg/R/SQLContext.R
@@ -295,6 +295,32 @@ parquetFile <- function(sqlContext, ...) {
read.parquet(sqlContext, unlist(list(...)))
}
+#' Create a DataFrame from a text file.
+#'
+#' Loads a text file and returns a DataFrame with a single string column named "value".
+#' Each line in the text file is a new row in the resulting DataFrame.
+#'
+#' @param sqlContext SQLContext to use
+#' @param path Path of file to read. A vector of multiple paths is allowed.
+#' @return DataFrame
+#' @rdname read.text
+#' @name read.text
+#' @export
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' sqlContext <- sparkRSQL.init(sc)
+#' path <- "path/to/file.txt"
+#' df <- read.text(sqlContext, path)
+#' }
+read.text <- function(sqlContext, path) {
+ # Allow the user to have a more flexible definition of the text file path
+ paths <- as.list(suppressWarnings(normalizePath(path)))
+ read <- callJMethod(sqlContext, "read")
+ sdf <- callJMethod(read, "text", paths)
+ dataFrame(sdf)
+}
+
#' SQL Query
#'
#' Executes a SQL query using Spark, returning the result as a DataFrame.
http://git-wip-us.apache.org/repos/asf/spark/blob/d1fea413/R/pkg/R/generics.R
----------------------------------------------------------------------
diff --git a/R/pkg/R/generics.R b/R/pkg/R/generics.R
index 62be2dd..ba68617 100644
--- a/R/pkg/R/generics.R
+++ b/R/pkg/R/generics.R
@@ -549,6 +549,10 @@ setGeneric("write.parquet", function(x, path) { standardGeneric("write.parquet")
#' @export
setGeneric("saveAsParquetFile", function(x, path) { standardGeneric("saveAsParquetFile") })
+#' @rdname write.text
+#' @export
+setGeneric("write.text", function(x, path) { standardGeneric("write.text") })
+
#' @rdname schema
#' @export
setGeneric("schema", function(x) { standardGeneric("schema") })
http://git-wip-us.apache.org/repos/asf/spark/blob/d1fea413/R/pkg/inst/tests/testthat/test_sparkSQL.R
----------------------------------------------------------------------
diff --git a/R/pkg/inst/tests/testthat/test_sparkSQL.R b/R/pkg/inst/tests/testthat/test_sparkSQL.R
index ebe8faa..eaf60be 100644
--- a/R/pkg/inst/tests/testthat/test_sparkSQL.R
+++ b/R/pkg/inst/tests/testthat/test_sparkSQL.R
@@ -1497,6 +1497,27 @@ test_that("read/write Parquet files", {
unlink(parquetPath4)
})
+test_that("read/write text files", {
+ # Test write.df and read.df
+ df <- read.df(sqlContext, jsonPath, "text")
+ expect_is(df, "DataFrame")
+ expect_equal(colnames(df), c("value"))
+ expect_equal(count(df), 3)
+ textPath <- tempfile(pattern = "textPath", fileext = ".txt")
+ write.df(df, textPath, "text", mode="overwrite")
+
+ # Test write.text and read.text
+ textPath2 <- tempfile(pattern = "textPath2", fileext = ".txt")
+ write.text(df, textPath2)
+ df2 <- read.text(sqlContext, c(textPath, textPath2))
+ expect_is(df2, "DataFrame")
+ expect_equal(colnames(df2), c("value"))
+ expect_equal(count(df2), count(df) * 2)
+
+ unlink(textPath)
+ unlink(textPath2)
+})
+
test_that("describe() and summarize() on a DataFrame", {
df <- read.json(sqlContext, jsonPath)
stats <- describe(df, "age")
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org