You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by sh...@apache.org on 2015/06/01 00:02:10 UTC

spark git commit: [SPARK-7227] [SPARKR] Support fillna / dropna in R DataFrame.

Repository: spark
Updated Branches:
  refs/heads/master 866652c90 -> 46576ab30


[SPARK-7227] [SPARKR] Support fillna / dropna in R DataFrame.

Author: Sun Rui <ru...@intel.com>

Closes #6183 from sun-rui/SPARK-7227 and squashes the following commits:

dd6f5b3 [Sun Rui] Rename readEnv() back to readMap(). Add alias na.omit() for dropna().
41cf725 [Sun Rui] [SPARK-7227][SPARKR] Support fillna / dropna in R DataFrame.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/46576ab3
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/46576ab3
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/46576ab3

Branch: refs/heads/master
Commit: 46576ab303e50c54c3bd464f8939953efe644574
Parents: 866652c
Author: Sun Rui <ru...@intel.com>
Authored: Sun May 31 15:01:21 2015 -0700
Committer: Shivaram Venkataraman <sh...@cs.berkeley.edu>
Committed: Sun May 31 15:01:59 2015 -0700

----------------------------------------------------------------------
 R/pkg/NAMESPACE                                 |   2 +
 R/pkg/R/DataFrame.R                             | 125 +++++++++++++++++++
 R/pkg/R/generics.R                              |  18 +++
 R/pkg/R/serialize.R                             |  10 +-
 R/pkg/inst/tests/test_sparkSQL.R                | 109 ++++++++++++++++
 .../scala/org/apache/spark/api/r/SerDe.scala    |   6 +-
 6 files changed, 267 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/46576ab3/R/pkg/NAMESPACE
----------------------------------------------------------------------
diff --git a/R/pkg/NAMESPACE b/R/pkg/NAMESPACE
index 411126a..f9447f6 100644
--- a/R/pkg/NAMESPACE
+++ b/R/pkg/NAMESPACE
@@ -19,9 +19,11 @@ exportMethods("arrange",
               "count",
               "describe",
               "distinct",
+              "dropna",
               "dtypes",
               "except",
               "explain",
+              "fillna",
               "filter",
               "first",
               "group_by",

http://git-wip-us.apache.org/repos/asf/spark/blob/46576ab3/R/pkg/R/DataFrame.R
----------------------------------------------------------------------
diff --git a/R/pkg/R/DataFrame.R b/R/pkg/R/DataFrame.R
index e79d324..0af5cb8 100644
--- a/R/pkg/R/DataFrame.R
+++ b/R/pkg/R/DataFrame.R
@@ -1429,3 +1429,128 @@ setMethod("describe",
             sdf <- callJMethod(x@sdf, "describe", listToSeq(colList))
             dataFrame(sdf)
           })
+
+#' dropna
+#'
+#' Returns a new DataFrame omitting rows with null values.
+#'
+#' @param x A SparkSQL DataFrame.
+#' @param how "any" or "all".
+#'            if "any", drop a row if it contains any nulls.
+#'            if "all", drop a row only if all its values are null.
+#'            if minNonNulls is specified, how is ignored.
+#' @param minNonNulls If specified, drop rows that have less than
+#'                    minNonNulls non-null values.
+#'                    This overwrites the how parameter.
+#' @param cols Optional list of column names to consider.
+#' @return A DataFrame
+#' 
+#' @rdname nafunctions
+#' @export
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' sqlCtx <- sparkRSQL.init(sc)
+#' path <- "path/to/file.json"
+#' df <- jsonFile(sqlCtx, path)
+#' dropna(df)
+#' }
+setMethod("dropna",
+          signature(x = "DataFrame"),
+          function(x, how = c("any", "all"), minNonNulls = NULL, cols = NULL) {
+            how <- match.arg(how)
+            if (is.null(cols)) {
+              cols <- columns(x)
+            }
+            if (is.null(minNonNulls)) {
+              minNonNulls <- if (how == "any") { length(cols) } else { 1 }
+            }
+            
+            naFunctions <- callJMethod(x@sdf, "na")
+            sdf <- callJMethod(naFunctions, "drop",
+                               as.integer(minNonNulls), listToSeq(as.list(cols)))
+            dataFrame(sdf)
+          })
+
+#' @aliases dropna
+#' @export
+setMethod("na.omit",
+          signature(x = "DataFrame"),
+          function(x, how = c("any", "all"), minNonNulls = NULL, cols = NULL) {
+            dropna(x, how, minNonNulls, cols)
+          })
+
+#' fillna
+#'
+#' Replace null values.
+#'
+#' @param x A SparkSQL DataFrame.
+#' @param value Value to replace null values with.
+#'              Should be an integer, numeric, character or named list.
+#'              If the value is a named list, then cols is ignored and
+#'              value must be a mapping from column name (character) to 
+#'              replacement value. The replacement value must be an
+#'              integer, numeric or character.
+#' @param cols optional list of column names to consider.
+#'             Columns specified in cols that do not have matching data
+#'             type are ignored. For example, if value is a character, and 
+#'             subset contains a non-character column, then the non-character
+#'             column is simply ignored.
+#' @return A DataFrame
+#' 
+#' @rdname nafunctions
+#' @export
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' sqlCtx <- sparkRSQL.init(sc)
+#' path <- "path/to/file.json"
+#' df <- jsonFile(sqlCtx, path)
+#' fillna(df, 1)
+#' fillna(df, list("age" = 20, "name" = "unknown"))
+#' }
+setMethod("fillna",
+          signature(x = "DataFrame"),
+          function(x, value, cols = NULL) {
+            if (!(class(value) %in% c("integer", "numeric", "character", "list"))) {
+              stop("value should be an integer, numeric, charactor or named list.")
+            }
+            
+            if (class(value) == "list") {
+              # Check column names in the named list
+              colNames <- names(value)
+              if (length(colNames) == 0 || !all(colNames != "")) {
+                stop("value should be an a named list with each name being a column name.")
+              }
+              
+              # Convert to the named list to an environment to be passed to JVM
+              valueMap <- new.env()
+              for (col in colNames) {
+                # Check each item in the named list is of valid type
+                v <- value[[col]]
+                if (!(class(v) %in% c("integer", "numeric", "character"))) {
+                  stop("Each item in value should be an integer, numeric or charactor.")
+                }
+                valueMap[[col]] <- v
+              }
+              
+              # When value is a named list, caller is expected not to pass in cols
+              if (!is.null(cols)) {
+                warning("When value is a named list, cols is ignored!")
+                cols <- NULL
+              }
+              
+              value <- valueMap
+            } else if (is.integer(value)) {
+              # Cast an integer to a numeric
+              value <- as.numeric(value)
+            }
+            
+            naFunctions <- callJMethod(x@sdf, "na")
+            sdf <- if (length(cols) == 0) {
+              callJMethod(naFunctions, "fill", value)
+            } else {
+              callJMethod(naFunctions, "fill", value, listToSeq(as.list(cols)))
+            }
+            dataFrame(sdf)
+          })

http://git-wip-us.apache.org/repos/asf/spark/blob/46576ab3/R/pkg/R/generics.R
----------------------------------------------------------------------
diff --git a/R/pkg/R/generics.R b/R/pkg/R/generics.R
index 1f4fc6a..12e0917 100644
--- a/R/pkg/R/generics.R
+++ b/R/pkg/R/generics.R
@@ -396,6 +396,20 @@ setGeneric("columns", function(x) {standardGeneric("columns") })
 #' @export
 setGeneric("describe", function(x, col, ...) { standardGeneric("describe") })
 
+#' @rdname nafunctions
+#' @export
+setGeneric("dropna",
+           function(x, how = c("any", "all"), minNonNulls = NULL, cols = NULL) { 
+             standardGeneric("dropna") 
+           })
+
+#' @rdname nafunctions
+#' @export
+setGeneric("na.omit",
+           function(x, how = c("any", "all"), minNonNulls = NULL, cols = NULL) { 
+             standardGeneric("na.omit") 
+           })
+
 #' @rdname schema
 #' @export
 setGeneric("dtypes", function(x) { standardGeneric("dtypes") })
@@ -408,6 +422,10 @@ setGeneric("explain", function(x, ...) { standardGeneric("explain") })
 #' @export
 setGeneric("except", function(x, y) { standardGeneric("except") })
 
+#' @rdname nafunctions
+#' @export
+setGeneric("fillna", function(x, value, cols = NULL) { standardGeneric("fillna") })
+
 #' @rdname filter
 #' @export
 setGeneric("filter", function(x, condition) { standardGeneric("filter") })

http://git-wip-us.apache.org/repos/asf/spark/blob/46576ab3/R/pkg/R/serialize.R
----------------------------------------------------------------------
diff --git a/R/pkg/R/serialize.R b/R/pkg/R/serialize.R
index c53d0a9..2081786 100644
--- a/R/pkg/R/serialize.R
+++ b/R/pkg/R/serialize.R
@@ -160,6 +160,14 @@ writeList <- function(con, arr) {
   }
 }
 
+# Used to pass arrays where the elements can be of different types
+writeGenericList <- function(con, list) {
+  writeInt(con, length(list))
+  for (elem in list) {
+    writeObject(con, elem)
+  }
+}
+  
 # Used to pass in hash maps required on Java side.
 writeEnv <- function(con, env) {
   len <- length(env)
@@ -168,7 +176,7 @@ writeEnv <- function(con, env) {
   if (len > 0) {
     writeList(con, as.list(ls(env)))
     vals <- lapply(ls(env), function(x) { env[[x]] })
-    writeList(con, as.list(vals))
+    writeGenericList(con, as.list(vals))
   }
 }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/46576ab3/R/pkg/inst/tests/test_sparkSQL.R
----------------------------------------------------------------------
diff --git a/R/pkg/inst/tests/test_sparkSQL.R b/R/pkg/inst/tests/test_sparkSQL.R
index 1857e63..d2d82e7 100644
--- a/R/pkg/inst/tests/test_sparkSQL.R
+++ b/R/pkg/inst/tests/test_sparkSQL.R
@@ -32,6 +32,15 @@ jsonPath <- tempfile(pattern="sparkr-test", fileext=".tmp")
 parquetPath <- tempfile(pattern="sparkr-test", fileext=".parquet")
 writeLines(mockLines, jsonPath)
 
+# For test nafunctions, like dropna(), fillna(),...
+mockLinesNa <- c("{\"name\":\"Bob\",\"age\":16,\"height\":176.5}",
+                 "{\"name\":\"Alice\",\"age\":null,\"height\":164.3}",
+                 "{\"name\":\"David\",\"age\":60,\"height\":null}",
+                 "{\"name\":\"Amy\",\"age\":null,\"height\":null}",
+                 "{\"name\":null,\"age\":null,\"height\":null}")
+jsonPathNa <- tempfile(pattern="sparkr-test", fileext=".tmp")
+writeLines(mockLinesNa, jsonPathNa)
+
 test_that("infer types", {
   expect_equal(infer_type(1L), "integer")
   expect_equal(infer_type(1.0), "double")
@@ -765,5 +774,105 @@ test_that("describe() on a DataFrame", {
   expect_equal(collect(stats)[5, "age"], "30")
 })
 
+test_that("dropna() on a DataFrame", {
+  df <- jsonFile(sqlContext, jsonPathNa)
+  rows <- collect(df)
+
+  # drop with columns
+  
+  expected <- rows[!is.na(rows$name),]
+  actual <- collect(dropna(df, cols = "name"))
+  expect_true(identical(expected, actual))
+
+  expected <- rows[!is.na(rows$age),]
+  actual <- collect(dropna(df, cols = "age"))
+  row.names(expected) <- row.names(actual)
+  # identical on two dataframes does not work here. Don't know why.
+  # use identical on all columns as a workaround.
+  expect_true(identical(expected$age, actual$age))
+  expect_true(identical(expected$height, actual$height))
+  expect_true(identical(expected$name, actual$name))
+  
+  expected <- rows[!is.na(rows$age) & !is.na(rows$height),]
+  actual <- collect(dropna(df, cols = c("age", "height")))
+  expect_true(identical(expected, actual))
+
+  expected <- rows[!is.na(rows$age) & !is.na(rows$height) & !is.na(rows$name),]
+  actual <- collect(dropna(df))
+  expect_true(identical(expected, actual))
+  
+  # drop with how
+
+  expected <- rows[!is.na(rows$age) & !is.na(rows$height) & !is.na(rows$name),]
+  actual <- collect(dropna(df))
+  expect_true(identical(expected, actual))
+
+  expected <- rows[!is.na(rows$age) | !is.na(rows$height) | !is.na(rows$name),]
+  actual <- collect(dropna(df, "all"))
+  expect_true(identical(expected, actual))
+  
+  expected <- rows[!is.na(rows$age) & !is.na(rows$height) & !is.na(rows$name),]
+  actual <- collect(dropna(df, "any"))
+  expect_true(identical(expected, actual))
+
+  expected <- rows[!is.na(rows$age) & !is.na(rows$height),]
+  actual <- collect(dropna(df, "any", cols = c("age", "height")))
+  expect_true(identical(expected, actual))
+
+  expected <- rows[!is.na(rows$age) | !is.na(rows$height),]
+  actual <- collect(dropna(df, "all", cols = c("age", "height")))
+  expect_true(identical(expected, actual))
+  
+  # drop with threshold
+  
+  expected <- rows[as.integer(!is.na(rows$age)) + as.integer(!is.na(rows$height)) >= 2,]
+  actual <- collect(dropna(df, minNonNulls = 2, cols = c("age", "height")))
+  expect_true(identical(expected, actual))  
+
+  expected <- rows[as.integer(!is.na(rows$age)) + 
+                   as.integer(!is.na(rows$height)) +
+                   as.integer(!is.na(rows$name)) >= 3,]
+  actual <- collect(dropna(df, minNonNulls = 3, cols = c("name", "age", "height")))
+  expect_true(identical(expected, actual))
+})
+
+test_that("fillna() on a DataFrame", {
+  df <- jsonFile(sqlContext, jsonPathNa)
+  rows <- collect(df)
+  
+  # fill with value
+  
+  expected <- rows
+  expected$age[is.na(expected$age)] <- 50
+  expected$height[is.na(expected$height)] <- 50.6
+  actual <- collect(fillna(df, 50.6))
+  expect_true(identical(expected, actual))
+
+  expected <- rows
+  expected$name[is.na(expected$name)] <- "unknown"
+  actual <- collect(fillna(df, "unknown"))
+  expect_true(identical(expected, actual))
+
+  expected <- rows
+  expected$age[is.na(expected$age)] <- 50
+  actual <- collect(fillna(df, 50.6, "age"))
+  expect_true(identical(expected, actual))
+
+  expected <- rows
+  expected$name[is.na(expected$name)] <- "unknown"
+  actual <- collect(fillna(df, "unknown", c("age", "name")))
+  expect_true(identical(expected, actual))
+  
+  # fill with named list
+
+  expected <- rows
+  expected$age[is.na(expected$age)] <- 50
+  expected$height[is.na(expected$height)] <- 50.6
+  expected$name[is.na(expected$name)] <- "unknown"
+  actual <- collect(fillna(df, list("age" = 50, "height" = 50.6, "name" = "unknown")))
+  expect_true(identical(expected, actual))  
+})
+
 unlink(parquetPath)
 unlink(jsonPath)
+unlink(jsonPathNa)

http://git-wip-us.apache.org/repos/asf/spark/blob/46576ab3/core/src/main/scala/org/apache/spark/api/r/SerDe.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/r/SerDe.scala b/core/src/main/scala/org/apache/spark/api/r/SerDe.scala
index 371dfe4..f8e3f1a 100644
--- a/core/src/main/scala/org/apache/spark/api/r/SerDe.scala
+++ b/core/src/main/scala/org/apache/spark/api/r/SerDe.scala
@@ -157,9 +157,11 @@ private[spark] object SerDe {
       val keysLen = readInt(in)
       val keys = (0 until keysLen).map(_ => readTypedObject(in, keysType))
 
-      val valuesType = readObjectType(in)
       val valuesLen = readInt(in)
-      val values = (0 until valuesLen).map(_ => readTypedObject(in, valuesType))
+      val values = (0 until valuesLen).map(_ => {
+        val valueType = readObjectType(in)
+        readTypedObject(in, valueType)
+      })
       mapAsJavaMap(keys.zip(values).toMap)
     } else {
       new java.util.HashMap[Object, Object]()


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org