Posted to commits@spark.apache.org by sh...@apache.org on 2016/04/27 00:34:34 UTC

spark git commit: [SPARK-13734][SPARKR] Added histogram function

Repository: spark
Updated Branches:
  refs/heads/master 75879ac3c -> 0c99c23b7


[SPARK-13734][SPARKR] Added histogram function

## What changes were proposed in this pull request?

Added method histogram() to compute the histogram of a Column

Usage:

```
## Create a SparkDataFrame from the Iris dataset
irisDF <- createDataFrame(sqlContext, iris)

## Compute histogram statistics for the Sepal_Length column
histStats <- histogram(irisDF, "Sepal_Length", nbins = 12)
```
![histogram](https://cloud.githubusercontent.com/assets/13985649/13588486/e1e751c6-e484-11e5-85db-2fc2115c4bb2.png)
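
The plot above was rendered from the statistics that histogram() returns; a minimal sketch using ggplot2, mirroring the example in the function's documentation:

```
## Render the histogram statistics with ggplot2
require(ggplot2)
ggplot(histStats, aes(x = centroids, y = counts)) +
  geom_bar(stat = "identity") +
  xlab("Sepal_Length") + ylab("Frequency")
```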

Note: Usage will change once SPARK-9325 is resolved, so that histogram() takes only a Column as a parameter, as opposed to a SparkDataFrame and a column name.

## How was this patch tested?

All unit tests pass. I added unit tests for the different scenarios.

Author: Oscar D. Lara Yejas <od...@oscars-mbp.usca.ibm.com>
Author: Oscar D. Lara Yejas <od...@oscars-mbp.attlocal.net>

Closes #11569 from olarayej/SPARK-13734.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0c99c23b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0c99c23b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0c99c23b

Branch: refs/heads/master
Commit: 0c99c23b7d9f0c3538cd2b062d551411712a2bcc
Parents: 75879ac
Author: Oscar D. Lara Yejas <od...@oscars-mbp.usca.ibm.com>
Authored: Tue Apr 26 15:34:30 2016 -0700
Committer: Shivaram Venkataraman <sh...@cs.berkeley.edu>
Committed: Tue Apr 26 15:34:30 2016 -0700

----------------------------------------------------------------------
 R/pkg/NAMESPACE                           |   1 +
 R/pkg/R/DataFrame.R                       | 120 +++++++++++++++++++++++++
 R/pkg/R/generics.R                        |   4 +
 R/pkg/inst/tests/testthat/test_sparkSQL.R |  45 ++++++++++
 4 files changed, 170 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/0c99c23b/R/pkg/NAMESPACE
----------------------------------------------------------------------
diff --git a/R/pkg/NAMESPACE b/R/pkg/NAMESPACE
index c0a63d6..ea31bae 100644
--- a/R/pkg/NAMESPACE
+++ b/R/pkg/NAMESPACE
@@ -173,6 +173,7 @@ exportMethods("%in%",
               "getItem",
               "greatest",
               "hex",
+              "histogram",
               "hour",
               "hypot",
               "ifelse",

http://git-wip-us.apache.org/repos/asf/spark/blob/0c99c23b/R/pkg/R/DataFrame.R
----------------------------------------------------------------------
diff --git a/R/pkg/R/DataFrame.R b/R/pkg/R/DataFrame.R
index 890d15d..36aedfa 100644
--- a/R/pkg/R/DataFrame.R
+++ b/R/pkg/R/DataFrame.R
@@ -2469,6 +2469,126 @@ setMethod("drop",
             base::drop(x)
           })
 
+#' This function computes a histogram for a given SparkR Column.
+#'
+#' @name histogram
+#' @title Histogram
+#' @param df the SparkDataFrame containing the Column to build the histogram from.
+#' @param col the column to build the histogram from: either a column name (character) or a Column.
+#' @param nbins the number of bins (optional). Default value is 10.
+#' @return a data.frame with the histogram statistics, i.e., counts and centroids.
+#' @rdname histogram
+#' @family SparkDataFrame functions
+#' @export
+#' @examples 
+#' \dontrun{
+#' 
+#' # Create a SparkDataFrame from the Iris dataset
+#' irisDF <- createDataFrame(sqlContext, iris)
+#' 
+#' # Compute histogram statistics
+#' histStats <- histogram(irisDF, irisDF$Sepal_Length, nbins = 12)
+#'
+#' # Once SparkR has computed the histogram statistics, the histogram can be
+#' # rendered using the ggplot2 library:
+#'
+#' require(ggplot2)
+#' plot <- ggplot(histStats, aes(x = centroids, y = counts)) +
+#'         geom_bar(stat = "identity") +
+#'         xlab("Sepal_Length") + ylab("Frequency")   
+#' } 
+setMethod("histogram",
+          signature(df = "SparkDataFrame", col = "characterOrColumn"),
+          function(df, col, nbins = 10) {
+            # Validate nbins
+            if (nbins < 2) {
+              stop("The number of bins must be a positive integer number greater than 1.")
+            }
+
+            # Round nbins down to the nearest integer
+            nbins <- floor(nbins)
+
+            # Validate col
+            if (is.null(col)) {
+              stop("col must be specified.")
+            }
+
+            colname <- col
+            x <- if (class(col) == "character") {
+              if (!colname %in% names(df)) {
+                stop("Specified colname does not belong to the given SparkDataFrame.")
+              }
+
+              # Filter NA values in the target column and remove all other columns
+              df <- na.omit(df[, colname])
+              getColumn(df, colname)
+
+            } else if (class(col) == "Column") {
+
+              # The given column needs to be appended to the SparkDataFrame so that we can
+              # use method describe() to compute statistics in one single pass. The new
+              # column must have a name that doesn't already exist in the dataset.
+              # To ensure this, we generate a random column name with more characters than the
+              # longest colname in the dataset, but no more than 100 (think of a UUID).
+              # This column name will never be visible to the user, so the name is irrelevant.
+              # Limiting the colname length to 100 makes debugging easier, and the probability
+              # of collision it introduces is negligible: assuming the user has 1 million
+              # columns AND all of them have names 100 characters long (which is very unlikely),
+              # AND they run 1 billion histograms, the probability of collision will roughly be
+              # 1 in 4.4 x 10 ^ 96
+              colname <- paste(base::sample(c(letters, LETTERS),
+                                            size = min(max(nchar(colnames(df))) + 1, 100),
+                                            replace = TRUE),
+                               collapse = "")
+
+              # Append the given column to the dataset. This is to support Columns that
+              # don't belong to the SparkDataFrame but are rather expressions
+              df <- withColumn(df, colname, col)
+
+              # Filter NA values in the target column. Cannot remove all other columns
+              # since given Column may be an expression on one or more existing columns
+              df <- na.omit(df)
+
+              col
+            }
+
+            # Compute summary statistics of the target column in a single pass
+            stats <- collect(describe(df[, colname]))
+            min <- as.numeric(stats[4, 2])  # row 4 of describe() output is "min"
+            max <- as.numeric(stats[5, 2])  # row 5 of describe() output is "max"
+
+            # Normalize the data
+            xnorm <- (x - min) / (max - min)
+
+            # Truncate the normalized data to 4 decimal places to avoid floating-point issues
+            xnorm <- cast(xnorm * 10000, "integer") / 10000.0
+
+            # The normalized data spans [0, 1], so each normalized bin has width 1 / nbins
+            normBinSize <- 1 / nbins
+            binsize <- (max - min) / nbins
+            approxBins <- xnorm / normBinSize
+
+            # Move values equal to a bin's upper bound down one bin (bins are right-closed, as in hist())
+            bins <- cast(approxBins -
+                           ifelse(approxBins == cast(approxBins, "integer") & x != min, 1, 0),
+                         "integer")
+
+            df$bins <- bins
+            histStats <- collect(count(groupBy(df, "bins")))
+            names(histStats) <- c("bins", "counts")
+
+            # Fill in bins that have zero counts
+            y <- data.frame("bins" = seq(0, nbins - 1))
+            histStats <- merge(histStats, y, all.x = TRUE, all.y = TRUE)
+            histStats[is.na(histStats$counts), 2] <- 0
+
+            # Compute centroids
+            histStats$centroids <- histStats$bins * binsize + min + binsize / 2
+
+            # Return the statistics
+            return(histStats)
+          })
+
 #' Saves the content of the SparkDataFrame to an external database table via JDBC
 #'
 #' Additional JDBC database connection properties can be set (...)

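For readers following the binning logic in the new method, here is a minimal plain-R sketch of the same rule; localBins is a hypothetical helper, not part of the commit, and it skips the 4-decimal truncation the committed code applies to dodge floating-point noise:

```
# Sketch of the bin-assignment rule (hypothetical helper, plain R, no Spark):
# normalize to [0, 1], scale by nbins, floor, and move values that land exactly
# on a bin's upper bound down one bin, matching hist()'s right-closed intervals.
localBins <- function(x, nbins = 10) {
  xnorm <- (x - min(x)) / (max(x) - min(x))
  approxBins <- xnorm * nbins
  floor(approxBins - ifelse(approxBins == floor(approxBins) & x != min(x), 1, 0))
}

# Reproduces the zero-count test case below: bin 0 gets 4 values, bin 9 gets 1
table(factor(localBins(c(1, 2, 3, 4, 100)), levels = 0:9))
```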
http://git-wip-us.apache.org/repos/asf/spark/blob/0c99c23b/R/pkg/R/generics.R
----------------------------------------------------------------------
diff --git a/R/pkg/R/generics.R b/R/pkg/R/generics.R
index f654d83..6290711 100644
--- a/R/pkg/R/generics.R
+++ b/R/pkg/R/generics.R
@@ -106,6 +106,10 @@ setGeneric("getJRDD", function(rdd, ...) { standardGeneric("getJRDD") })
 # @export
 setGeneric("glom", function(x) { standardGeneric("glom") })
 
+# @rdname histogram
+# @export
+setGeneric("histogram", function(df, col, nbins=10) { standardGeneric("histogram") })
+
 # @rdname keyBy
 # @export
 setGeneric("keyBy", function(x, func) { standardGeneric("keyBy") })

http://git-wip-us.apache.org/repos/asf/spark/blob/0c99c23b/R/pkg/inst/tests/testthat/test_sparkSQL.R
----------------------------------------------------------------------
diff --git a/R/pkg/inst/tests/testthat/test_sparkSQL.R b/R/pkg/inst/tests/testthat/test_sparkSQL.R
index 9244c56..3360680 100644
--- a/R/pkg/inst/tests/testthat/test_sparkSQL.R
+++ b/R/pkg/inst/tests/testthat/test_sparkSQL.R
@@ -1972,6 +1972,51 @@ test_that("Method str()", {
   expect_equal(capture.output(utils:::str(iris)), capture.output(str(iris)))
 })
 
+test_that("Histogram", {
+
+  # Basic histogram test with colname
+  expect_equal(
+    all(histogram(irisDF, "Petal_Width", 8) ==
+          data.frame(bins = seq(0, 7),
+                     counts = c(48, 2, 7, 21, 24, 19, 15, 14),
+                     centroids = seq(0, 7) * 0.3 + 0.25)),
+    TRUE)
+
+  # Basic histogram test with Column
+  expect_equal(
+    all(histogram(irisDF, irisDF$Petal_Width, 8) ==
+          data.frame(bins = seq(0, 7),
+                     counts = c(48, 2, 7, 21, 24, 19, 15, 14),
+                     centroids = seq(0, 7) * 0.3 + 0.25)),
+    TRUE)
+
+  # Basic histogram test with derived column
+  expect_equal(
+    all(round(histogram(irisDF, irisDF$Petal_Width + 1, 8), 2) ==
+          data.frame(bins = seq(0, 7),
+                     counts = c(48, 2, 7, 21, 24, 19, 15, 14),
+                     centroids = seq(0, 7) * 0.3 + 1.25)),
+    TRUE)
+
+  # Missing nbins
+  expect_equal(length(histogram(irisDF, "Petal_Width")$counts), 10)
+
+  # Wrong colname
+  expect_error(histogram(irisDF, "xxx"),
+               "Specified colname does not belong to the given SparkDataFrame.")
+
+  # Invalid nbins
+  expect_error(histogram(irisDF, "Petal_Width", nbins = 0),
+               "The number of bins must be a positive integer number greater than 1.")
+
+  # Test against R's hist
+  expect_equal(all(hist(iris$Sepal.Width)$counts ==
+                   histogram(irisDF, "Sepal_Width", 12)$counts), TRUE)
+
+  # Test when there are zero counts
+  df <- as.DataFrame(sqlContext, data.frame(x = c(1, 2, 3, 4, 100)))
+  expect_equal(histogram(df, "x")$counts, c(4, 0, 0, 0, 0, 0, 0, 0, 0, 1))
+})
 unlink(parquetPath)
 unlink(jsonPath)
 unlink(jsonPathNa)

