Posted to commits@spark.apache.org by sh...@apache.org on 2015/04/09 07:46:02 UTC
[6/7] spark git commit: [SPARK-5654] Integrate SparkR
http://git-wip-us.apache.org/repos/asf/spark/blob/2fe0a1aa/R/pkg/NAMESPACE
----------------------------------------------------------------------
diff --git a/R/pkg/NAMESPACE b/R/pkg/NAMESPACE
new file mode 100644
index 0000000..a354cdc
--- /dev/null
+++ b/R/pkg/NAMESPACE
@@ -0,0 +1,182 @@
+#exportPattern("^[[:alpha:]]+")
+exportClasses("RDD")
+exportClasses("Broadcast")
+exportMethods(
+ "aggregateByKey",
+ "aggregateRDD",
+ "cache",
+ "checkpoint",
+ "coalesce",
+ "cogroup",
+ "collect",
+ "collectAsMap",
+ "collectPartition",
+ "combineByKey",
+ "count",
+ "countByKey",
+ "countByValue",
+ "distinct",
+ "Filter",
+ "filterRDD",
+ "first",
+ "flatMap",
+ "flatMapValues",
+ "fold",
+ "foldByKey",
+ "foreach",
+ "foreachPartition",
+ "fullOuterJoin",
+ "glom",
+ "groupByKey",
+ "join",
+ "keyBy",
+ "keys",
+ "length",
+ "lapply",
+ "lapplyPartition",
+ "lapplyPartitionsWithIndex",
+ "leftOuterJoin",
+ "lookup",
+ "map",
+ "mapPartitions",
+ "mapPartitionsWithIndex",
+ "mapValues",
+ "maximum",
+ "minimum",
+ "numPartitions",
+ "partitionBy",
+ "persist",
+ "pipeRDD",
+ "reduce",
+ "reduceByKey",
+ "reduceByKeyLocally",
+ "repartition",
+ "rightOuterJoin",
+ "sampleRDD",
+ "saveAsTextFile",
+ "saveAsObjectFile",
+ "sortBy",
+ "sortByKey",
+ "sumRDD",
+ "take",
+ "takeOrdered",
+ "takeSample",
+ "top",
+ "unionRDD",
+ "unpersist",
+ "value",
+ "values",
+ "zipRDD",
+ "zipWithIndex",
+ "zipWithUniqueId"
+ )
+
+# S3 methods exported
+export(
+ "textFile",
+ "objectFile",
+ "parallelize",
+ "hashCode",
+ "includePackage",
+ "broadcast",
+ "setBroadcastValue",
+ "setCheckpointDir"
+ )
+export("sparkR.init")
+export("sparkR.stop")
+export("print.jobj")
+useDynLib(SparkR, stringHashCode)
+importFrom(methods, setGeneric, setMethod, setOldClass)
+
+# SparkRSQL
+
+exportClasses("DataFrame")
+
+exportMethods("columns",
+ "distinct",
+ "dtypes",
+ "explain",
+ "filter",
+ "groupBy",
+ "head",
+ "insertInto",
+ "intersect",
+ "isLocal",
+ "limit",
+ "orderBy",
+ "names",
+ "printSchema",
+ "registerTempTable",
+ "repartition",
+ "sampleDF",
+ "saveAsParquetFile",
+ "saveAsTable",
+ "saveDF",
+ "schema",
+ "select",
+ "selectExpr",
+ "show",
+ "showDF",
+ "sortDF",
+ "subtract",
+ "toJSON",
+ "toRDD",
+ "unionAll",
+ "where",
+ "withColumn",
+ "withColumnRenamed")
+
+exportClasses("Column")
+
+exportMethods("abs",
+ "alias",
+ "approxCountDistinct",
+ "asc",
+ "avg",
+ "cast",
+ "contains",
+ "countDistinct",
+ "desc",
+ "endsWith",
+ "getField",
+ "getItem",
+ "isNotNull",
+ "isNull",
+ "last",
+ "like",
+ "lower",
+ "max",
+ "mean",
+ "min",
+ "rlike",
+ "sqrt",
+ "startsWith",
+ "substr",
+ "sum",
+ "sumDistinct",
+ "upper")
+
+exportClasses("GroupedData")
+exportMethods("agg")
+
+export("sparkRSQL.init",
+ "sparkRHive.init")
+
+export("cacheTable",
+ "clearCache",
+ "createDataFrame",
+ "createExternalTable",
+ "dropTempTable",
+ "jsonFile",
+ "jsonRDD",
+ "loadDF",
+ "parquetFile",
+ "sql",
+ "table",
+ "tableNames",
+ "tables",
+ "toDF",
+ "uncacheTable")
+
+export("print.structType",
+ "print.structField")
http://git-wip-us.apache.org/repos/asf/spark/blob/2fe0a1aa/R/pkg/R/DataFrame.R
----------------------------------------------------------------------
diff --git a/R/pkg/R/DataFrame.R b/R/pkg/R/DataFrame.R
new file mode 100644
index 0000000..feafd56
--- /dev/null
+++ b/R/pkg/R/DataFrame.R
@@ -0,0 +1,1270 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# DataFrame.R - DataFrame class and methods implemented in S4 OO classes
+
+#' @include jobj.R SQLTypes.R RDD.R pairRDD.R column.R group.R
+NULL
+
+setOldClass("jobj")
+
+#' @title S4 class that represents a DataFrame
+#' @description DataFrames can be created using functions like
+#' \code{jsonFile}, \code{table} etc.
+#' @rdname DataFrame
+#' @seealso jsonFile, table
+#'
+#' @param env An R environment that stores bookkeeping states of the DataFrame
+#' @param sdf A Java object reference to the backing Scala DataFrame
+#' @export
+setClass("DataFrame",
+ slots = list(env = "environment",
+ sdf = "jobj"))
+
+setMethod("initialize", "DataFrame", function(.Object, sdf, isCached) {
+ .Object@env <- new.env()
+ .Object@env$isCached <- isCached
+
+ .Object@sdf <- sdf
+ .Object
+})
+
+#' @rdname DataFrame
+#' @export
+dataFrame <- function(sdf, isCached = FALSE) {
+ new("DataFrame", sdf, isCached)
+}
+
+############################ DataFrame Methods ##############################################
+
+#' Print Schema of a DataFrame
+#'
+#' Prints out the schema in tree format
+#'
+#' @param x A SparkSQL DataFrame
+#'
+#' @rdname printSchema
+#' @export
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' sqlCtx <- sparkRSQL.init(sc)
+#' path <- "path/to/file.json"
+#' df <- jsonFile(sqlCtx, path)
+#' printSchema(df)
+#'}
+setMethod("printSchema",
+ signature(x = "DataFrame"),
+ function(x) {
+ schemaString <- callJMethod(schema(x)$jobj, "treeString")
+ cat(schemaString)
+ })
+
+#' Get schema object
+#'
+#' Returns the schema of this DataFrame as a structType object.
+#'
+#' @param x A SparkSQL DataFrame
+#'
+#' @rdname schema
+#' @export
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' sqlCtx <- sparkRSQL.init(sc)
+#' path <- "path/to/file.json"
+#' df <- jsonFile(sqlCtx, path)
+#' dfSchema <- schema(df)
+#'}
+setMethod("schema",
+ signature(x = "DataFrame"),
+ function(x) {
+ structType(callJMethod(x@sdf, "schema"))
+ })
+
+#' Explain
+#'
+#' Print the logical and physical Catalyst plans to the console for debugging.
+#'
+#' @param x A SparkSQL DataFrame
+#' @param extended Logical. If extended is FALSE, explain() prints only the physical plan.
+#' @rdname explain
+#' @export
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' sqlCtx <- sparkRSQL.init(sc)
+#' path <- "path/to/file.json"
+#' df <- jsonFile(sqlCtx, path)
+#' explain(df, TRUE)
+#'}
+setMethod("explain",
+ signature(x = "DataFrame"),
+ function(x, extended = FALSE) {
+ queryExec <- callJMethod(x@sdf, "queryExecution")
+ if (extended) {
+ cat(callJMethod(queryExec, "toString"))
+ } else {
+ execPlan <- callJMethod(queryExec, "executedPlan")
+ cat(callJMethod(execPlan, "toString"))
+ }
+ })
+
+#' isLocal
+#'
+#' Returns TRUE if the `collect` and `take` methods can be run locally
+#' (without any Spark executors).
+#'
+#' @param x A SparkSQL DataFrame
+#'
+#' @rdname isLocal
+#' @export
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' sqlCtx <- sparkRSQL.init(sc)
+#' path <- "path/to/file.json"
+#' df <- jsonFile(sqlCtx, path)
+#' isLocal(df)
+#'}
+setMethod("isLocal",
+ signature(x = "DataFrame"),
+ function(x) {
+ callJMethod(x@sdf, "isLocal")
+ })
+
+#' ShowDF
+#'
+#' Print the first numRows rows of a DataFrame
+#'
+#' @param x A SparkSQL DataFrame
+#' @param numRows The number of rows to print. Defaults to 20.
+#'
+#' @rdname showDF
+#' @export
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' sqlCtx <- sparkRSQL.init(sc)
+#' path <- "path/to/file.json"
+#' df <- jsonFile(sqlCtx, path)
+#' showDF(df)
+#'}
+setMethod("showDF",
+ signature(x = "DataFrame"),
+ function(x, numRows = 20) {
+ cat(callJMethod(x@sdf, "showString", numToInt(numRows)), "\n")
+ })
+
+#' show
+#'
+#' Print the DataFrame column names and types
+#'
+#' @param x A SparkSQL DataFrame
+#'
+#' @rdname show
+#' @export
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' sqlCtx <- sparkRSQL.init(sc)
+#' path <- "path/to/file.json"
+#' df <- jsonFile(sqlCtx, path)
+#' show(df)
+#'}
+setMethod("show", "DataFrame",
+ function(object) {
+ cols <- lapply(dtypes(object), function(l) {
+ paste(l, collapse = ":")
+ })
+ s <- paste(cols, collapse = ", ")
+ cat(paste("DataFrame[", s, "]\n", sep = ""))
+ })
+
+#' DataTypes
+#'
+#' Return all column names and their data types as a list
+#'
+#' @param x A SparkSQL DataFrame
+#'
+#' @rdname dtypes
+#' @export
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' sqlCtx <- sparkRSQL.init(sc)
+#' path <- "path/to/file.json"
+#' df <- jsonFile(sqlCtx, path)
+#' dtypes(df)
+#'}
+setMethod("dtypes",
+ signature(x = "DataFrame"),
+ function(x) {
+ lapply(schema(x)$fields(), function(f) {
+ c(f$name(), f$dataType.simpleString())
+ })
+ })
+
+#' Column names
+#'
+#' Return all column names as a list
+#'
+#' @param x A SparkSQL DataFrame
+#'
+#' @rdname columns
+#' @export
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' sqlCtx <- sparkRSQL.init(sc)
+#' path <- "path/to/file.json"
+#' df <- jsonFile(sqlCtx, path)
+#' columns(df)
+#'}
+setMethod("columns",
+ signature(x = "DataFrame"),
+ function(x) {
+ sapply(schema(x)$fields(), function(f) {
+ f$name()
+ })
+ })
+
+#' @rdname columns
+#' @export
+setMethod("names",
+ signature(x = "DataFrame"),
+ function(x) {
+ columns(x)
+ })
+
+#' Register Temporary Table
+#'
+#' Registers a DataFrame as a Temporary Table in the SQLContext
+#'
+#' @param x A SparkSQL DataFrame
+#' @param tableName A character vector containing the name of the table
+#'
+#' @rdname registerTempTable
+#' @export
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' sqlCtx <- sparkRSQL.init(sc)
+#' path <- "path/to/file.json"
+#' df <- jsonFile(sqlCtx, path)
+#' registerTempTable(df, "json_df")
+#' new_df <- sql(sqlCtx, "SELECT * FROM json_df")
+#'}
+setMethod("registerTempTable",
+ signature(x = "DataFrame", tableName = "character"),
+ function(x, tableName) {
+ callJMethod(x@sdf, "registerTempTable", tableName)
+ })
+
+#' insertInto
+#'
+#' Insert the contents of a DataFrame into a table registered in the current SQL Context.
+#'
+#' @param x A SparkSQL DataFrame
+#' @param tableName A character vector containing the name of the table
+#' @param overwrite A logical argument indicating whether or not to overwrite
+#' the existing rows in the table.
+#'
+#' @rdname insertInto
+#' @export
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' sqlCtx <- sparkRSQL.init(sc)
+#' df <- loadDF(sqlCtx, path, "parquet")
+#' df2 <- loadDF(sqlCtx, path2, "parquet")
+#' registerTempTable(df, "table1")
+#' insertInto(df2, "table1", overwrite = TRUE)
+#'}
+setMethod("insertInto",
+ signature(x = "DataFrame", tableName = "character"),
+ function(x, tableName, overwrite = FALSE) {
+ callJMethod(x@sdf, "insertInto", tableName, overwrite)
+ })
+
+#' Cache
+#'
+#' Persist with the default storage level (MEMORY_ONLY).
+#'
+#' @param x A SparkSQL DataFrame
+#'
+#' @rdname cache-methods
+#' @export
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' sqlCtx <- sparkRSQL.init(sc)
+#' path <- "path/to/file.json"
+#' df <- jsonFile(sqlCtx, path)
+#' cache(df)
+#'}
+setMethod("cache",
+ signature(x = "DataFrame"),
+ function(x) {
+ cached <- callJMethod(x@sdf, "cache")
+ x@env$isCached <- TRUE
+ x
+ })
+
+#' Persist
+#'
+#' Persist this DataFrame with the specified storage level. For details of the
+#' supported storage levels, refer to
+#' http://spark.apache.org/docs/latest/programming-guide.html#rdd-persistence.
+#'
+#' @param x The DataFrame to persist
+#' @rdname persist
+#' @export
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' sqlCtx <- sparkRSQL.init(sc)
+#' path <- "path/to/file.json"
+#' df <- jsonFile(sqlCtx, path)
+#' persist(df, "MEMORY_AND_DISK")
+#'}
+setMethod("persist",
+ signature(x = "DataFrame", newLevel = "character"),
+ function(x, newLevel) {
+ callJMethod(x@sdf, "persist", getStorageLevel(newLevel))
+ x@env$isCached <- TRUE
+ x
+ })
+
+#' Unpersist
+#'
+#' Mark this DataFrame as non-persistent, and remove all blocks for it from memory and
+#' disk.
+#'
+#' @param x The DataFrame to unpersist
+#' @param blocking Whether to block until all blocks are deleted
+#' @rdname unpersist-methods
+#' @export
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' sqlCtx <- sparkRSQL.init(sc)
+#' path <- "path/to/file.json"
+#' df <- jsonFile(sqlCtx, path)
+#' persist(df, "MEMORY_AND_DISK")
+#' unpersist(df)
+#'}
+setMethod("unpersist",
+ signature(x = "DataFrame"),
+ function(x, blocking = TRUE) {
+ callJMethod(x@sdf, "unpersist", blocking)
+ x@env$isCached <- FALSE
+ x
+ })
+
+#' Repartition
+#'
+#' Return a new DataFrame that has exactly numPartitions partitions.
+#'
+#' @param x A SparkSQL DataFrame
+#' @param numPartitions The number of partitions to use.
+#' @rdname repartition
+#' @export
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' sqlCtx <- sparkRSQL.init(sc)
+#' path <- "path/to/file.json"
+#' df <- jsonFile(sqlCtx, path)
+#' newDF <- repartition(df, 2L)
+#'}
+setMethod("repartition",
+ signature(x = "DataFrame", numPartitions = "numeric"),
+ function(x, numPartitions) {
+ sdf <- callJMethod(x@sdf, "repartition", numToInt(numPartitions))
+ dataFrame(sdf)
+ })
+
+#' toJSON
+#'
+#' Convert the rows of a DataFrame into JSON objects and return an RDD where
+#' each element contains a JSON string.
+#'
+#' @param x A SparkSQL DataFrame
+#' @return A StringRRDD of JSON objects
+#' @rdname tojson
+#' @export
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' sqlCtx <- sparkRSQL.init(sc)
+#' path <- "path/to/file.json"
+#' df <- jsonFile(sqlCtx, path)
+#' newRDD <- toJSON(df)
+#'}
+setMethod("toJSON",
+ signature(x = "DataFrame"),
+ function(x) {
+ rdd <- callJMethod(x@sdf, "toJSON")
+ jrdd <- callJMethod(rdd, "toJavaRDD")
+ RDD(jrdd, serializedMode = "string")
+ })
+
+#' saveAsParquetFile
+#'
+#' Save the contents of a DataFrame as a Parquet file, preserving the schema. Files written out
+#' with this method can be read back in as a DataFrame using parquetFile().
+#'
+#' @param x A SparkSQL DataFrame
+#' @param path The directory where the file is saved
+#' @rdname saveAsParquetFile
+#' @export
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' sqlCtx <- sparkRSQL.init(sc)
+#' path <- "path/to/file.json"
+#' df <- jsonFile(sqlCtx, path)
+#' saveAsParquetFile(df, "/tmp/sparkr-tmp/")
+#'}
+setMethod("saveAsParquetFile",
+ signature(x = "DataFrame", path = "character"),
+ function(x, path) {
+ invisible(callJMethod(x@sdf, "saveAsParquetFile", path))
+ })
+
+#' Distinct
+#'
+#' Return a new DataFrame containing the distinct rows in this DataFrame.
+#'
+#' @param x A SparkSQL DataFrame
+#' @rdname distinct
+#' @export
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' sqlCtx <- sparkRSQL.init(sc)
+#' path <- "path/to/file.json"
+#' df <- jsonFile(sqlCtx, path)
+#' distinctDF <- distinct(df)
+#'}
+setMethod("distinct",
+ signature(x = "DataFrame"),
+ function(x) {
+ sdf <- callJMethod(x@sdf, "distinct")
+ dataFrame(sdf)
+ })
+
+#' SampleDF
+#'
+#' Return a sampled subset of this DataFrame using a random seed.
+#'
+#' @param x A SparkSQL DataFrame
+#' @param withReplacement Sampling with replacement or not
+#' @param fraction The (rough) sample target fraction
+#' @rdname sampleDF
+#' @export
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' sqlCtx <- sparkRSQL.init(sc)
+#' path <- "path/to/file.json"
+#' df <- jsonFile(sqlCtx, path)
+#' collect(sampleDF(df, FALSE, 0.5))
+#' collect(sampleDF(df, TRUE, 0.5))
+#'}
+setMethod("sampleDF",
+ # TODO : Figure out how to send integer as java.lang.Long to JVM so
+ # we can send seed as an argument through callJMethod
+ signature(x = "DataFrame", withReplacement = "logical",
+ fraction = "numeric"),
+ function(x, withReplacement, fraction) {
+ if (fraction < 0.0) stop(paste("Negative fraction value:", fraction))
+ sdf <- callJMethod(x@sdf, "sample", withReplacement, fraction)
+ dataFrame(sdf)
+ })
+
+#' Count
+#'
+#' Returns the number of rows in a DataFrame
+#'
+#' @param x A SparkSQL DataFrame
+#'
+#' @rdname count
+#' @export
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' sqlCtx <- sparkRSQL.init(sc)
+#' path <- "path/to/file.json"
+#' df <- jsonFile(sqlCtx, path)
+#' count(df)
+#' }
+setMethod("count",
+ signature(x = "DataFrame"),
+ function(x) {
+ callJMethod(x@sdf, "count")
+ })
+
+#' Collects all the elements of a Spark DataFrame and coerces them into an R data.frame.
+#'
+#' @param x A SparkSQL DataFrame
+#' @param stringsAsFactors (Optional) A logical indicating whether or not string columns
+#' should be converted to factors. FALSE by default.
+#'
+#' @rdname collect-methods
+#' @export
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' sqlCtx <- sparkRSQL.init(sc)
+#' path <- "path/to/file.json"
+#' df <- jsonFile(sqlCtx, path)
+#' collected <- collect(df)
+#' firstName <- collected[[1]]$name
+#' }
+setMethod("collect",
+ signature(x = "DataFrame"),
+ function(x, stringsAsFactors = FALSE) {
+ # listCols is a list of raw vectors, one per column
+ listCols <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", "dfToCols", x@sdf)
+ cols <- lapply(listCols, function(col) {
+ objRaw <- rawConnection(col)
+ numRows <- readInt(objRaw)
+ col <- readCol(objRaw, numRows)
+ close(objRaw)
+ col
+ })
+ names(cols) <- columns(x)
+ do.call(cbind.data.frame, list(cols, stringsAsFactors = stringsAsFactors))
+ })
+
+#' Limit
+#'
+#' Limit the resulting DataFrame to the number of rows specified.
+#'
+#' @param x A SparkSQL DataFrame
+#' @param num The number of rows to return
+#' @return A new DataFrame containing the number of rows specified.
+#'
+#' @rdname limit
+#' @export
+#' @examples
+#' \dontrun{
+#' sc <- sparkR.init()
+#' sqlCtx <- sparkRSQL.init(sc)
+#' path <- "path/to/file.json"
+#' df <- jsonFile(sqlCtx, path)
+#' limitedDF <- limit(df, 10)
+#' }
+setMethod("limit",
+ signature(x = "DataFrame", num = "numeric"),
+ function(x, num) {
+ res <- callJMethod(x@sdf, "limit", as.integer(num))
+ dataFrame(res)
+ })
+
+# Take the first NUM rows of a DataFrame and return the results as a data.frame
+
+#' @rdname take
+#' @export
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' sqlCtx <- sparkRSQL.init(sc)
+#' path <- "path/to/file.json"
+#' df <- jsonFile(sqlCtx, path)
+#' take(df, 2)
+#' }
+setMethod("take",
+ signature(x = "DataFrame", num = "numeric"),
+ function(x, num) {
+ limited <- limit(x, num)
+ collect(limited)
+ })
+
+#' Head
+#'
+#' Return the first NUM rows of a DataFrame as a data.frame. If NUM is omitted,
+#' head() returns the first 6 rows, in keeping with the current data.frame
+#' convention in R.
+#'
+#' @param x A SparkSQL DataFrame
+#' @param num The number of rows to return. Default is 6.
+#' @return A data.frame
+#'
+#' @rdname head
+#' @export
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' sqlCtx <- sparkRSQL.init(sc)
+#' path <- "path/to/file.json"
+#' df <- jsonFile(sqlCtx, path)
+#' head(df)
+#' }
+setMethod("head",
+ signature(x = "DataFrame"),
+ function(x, num = 6L) {
+ # Default num is 6L in keeping with R's data.frame convention
+ take(x, num)
+ })
+
+#' Return the first row of a DataFrame
+#'
+#' @param x A SparkSQL DataFrame
+#'
+#' @rdname first
+#' @export
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' sqlCtx <- sparkRSQL.init(sc)
+#' path <- "path/to/file.json"
+#' df <- jsonFile(sqlCtx, path)
+#' first(df)
+#' }
+setMethod("first",
+ signature(x = "DataFrame"),
+ function(x) {
+ take(x, 1)
+ })
+
+#' toRDD()
+#'
+#' Converts a Spark DataFrame to an RDD while preserving column names.
+#'
+#' @param x A Spark DataFrame
+#'
+#' @rdname DataFrame
+#' @export
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' sqlCtx <- sparkRSQL.init(sc)
+#' path <- "path/to/file.json"
+#' df <- jsonFile(sqlCtx, path)
+#' rdd <- toRDD(df)
+#' }
+setMethod("toRDD",
+ signature(x = "DataFrame"),
+ function(x) {
+ jrdd <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", "dfToRowRDD", x@sdf)
+ colNames <- callJMethod(x@sdf, "columns")
+ rdd <- RDD(jrdd, serializedMode = "row")
+ lapply(rdd, function(row) {
+ names(row) <- colNames
+ row
+ })
+ })
+
+#' GroupBy
+#'
+#' Groups the DataFrame using the specified columns, so we can run aggregation on them.
+#'
+#' @param x a DataFrame
+#' @return a GroupedData
+#' @seealso GroupedData
+#' @rdname DataFrame
+#' @export
+#' @examples
+#' \dontrun{
+#' # Compute the average for all numeric columns grouped by department.
+#' avg(groupBy(df, "department"))
+#'
+#' # Compute the max age and average salary, grouped by department and gender.
+#' agg(groupBy(df, "department", "gender"), salary = "avg", age = "max")
+#' }
+setMethod("groupBy",
+ signature(x = "DataFrame"),
+ function(x, ...) {
+ cols <- list(...)
+ if (length(cols) >= 1 && class(cols[[1]]) == "character") {
+ sgd <- callJMethod(x@sdf, "groupBy", cols[[1]], listToSeq(cols[-1]))
+ } else {
+ jcol <- lapply(cols, function(c) { c@jc })
+ sgd <- callJMethod(x@sdf, "groupBy", listToSeq(jcol))
+ }
+ groupedData(sgd)
+ })
+
+#' Agg
+#'
+#' Compute aggregates by specifying a list of columns
+#'
+#' @rdname DataFrame
+#' @export
+setMethod("agg",
+ signature(x = "DataFrame"),
+ function(x, ...) {
+ agg(groupBy(x), ...)
+ })
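Since the DataFrame method simply delegates to agg(groupBy(x), ...), a whole-table aggregate can be sketched as below, assuming the GroupedData agg in group.R accepts name = "function" pairs as in the groupBy example above; the "age" column is illustrative:

    # a single implicit group covering the whole DataFrame
    agg(df, age = "max")
    # equivalent long form
    agg(groupBy(df), age = "max")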
+
+
+############################## RDD Map Functions ##################################
+# All of the following functions mirror the existing RDD map functions, #
+# but allow for use with DataFrames by first converting to an RRDD before calling #
+# the requested map function. #
+###################################################################################
+
+#' @rdname lapply
+setMethod("lapply",
+ signature(X = "DataFrame", FUN = "function"),
+ function(X, FUN) {
+ rdd <- toRDD(X)
+ lapply(rdd, FUN)
+ })
+
+#' @rdname lapply
+setMethod("map",
+ signature(X = "DataFrame", FUN = "function"),
+ function(X, FUN) {
+ lapply(X, FUN)
+ })
+
+#' @rdname flatMap
+setMethod("flatMap",
+ signature(X = "DataFrame", FUN = "function"),
+ function(X, FUN) {
+ rdd <- toRDD(X)
+ flatMap(rdd, FUN)
+ })
+
+#' @rdname lapplyPartition
+setMethod("lapplyPartition",
+ signature(X = "DataFrame", FUN = "function"),
+ function(X, FUN) {
+ rdd <- toRDD(X)
+ lapplyPartition(rdd, FUN)
+ })
+
+#' @rdname lapplyPartition
+setMethod("mapPartitions",
+ signature(X = "DataFrame", FUN = "function"),
+ function(X, FUN) {
+ lapplyPartition(X, FUN)
+ })
+
+#' @rdname foreach
+setMethod("foreach",
+ signature(x = "DataFrame", func = "function"),
+ function(x, func) {
+ rdd <- toRDD(x)
+ foreach(rdd, func)
+ })
+
+#' @rdname foreach
+setMethod("foreachPartition",
+ signature(x = "DataFrame", func = "function"),
+ function(x, func) {
+ rdd <- toRDD(x)
+ foreachPartition(rdd, func)
+ })
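As the banner above explains, each of these methods converts the DataFrame to an RDD of named rows and reuses the corresponding RDD implementation. A usage sketch, with the "age" field assumed for illustration:

    # each row arrives as a named list, so fields can be read by column name
    ages <- lapply(df, function(row) { row$age })
    collect(ages)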
+
+
+############################## SELECT ##################################
+
+getColumn <- function(x, c) {
+ column(callJMethod(x@sdf, "col", c))
+}
+
+#' @rdname select
+setMethod("$", signature(x = "DataFrame"),
+ function(x, name) {
+ getColumn(x, name)
+ })
+
+setMethod("$<-", signature(x = "DataFrame"),
+ function(x, name, value) {
+ stopifnot(class(value) == "Column")
+ cols <- columns(x)
+ if (name %in% cols) {
+ cols <- lapply(cols, function(c) {
+ if (c == name) {
+ alias(value, name)
+ } else {
+ col(c)
+ }
+ })
+ nx <- select(x, cols)
+ } else {
+ nx <- withColumn(x, name, value)
+ }
+ x@sdf <- nx@sdf
+ x
+ })
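A sketch of how `$` and `$<-` behave (column names assumed for illustration, and column arithmetic assumes the operators column.R defines on Column): assigning to a new name falls through to withColumn, while assigning to an existing name rebuilds the column list with an alias:

    df$age2 <- df$age * 2             # new name: column added
    df$age <- cast(df$age, "string")  # existing name: column replaced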
+
+#' @rdname select
+setMethod("[[", signature(x = "DataFrame"),
+ function(x, i) {
+ if (is.numeric(i)) {
+ cols <- columns(x)
+ i <- cols[[i]]
+ }
+ getColumn(x, i)
+ })
+
+#' @rdname select
+setMethod("[", signature(x = "DataFrame", i = "missing"),
+ function(x, i, j, ...) {
+ if (is.numeric(j)) {
+ cols <- columns(x)
+ j <- cols[j]
+ }
+ if (length(j) > 1) {
+ j <- as.list(j)
+ }
+ select(x, j)
+ })
+
+#' Select
+#'
+#' Selects a set of columns with names or Column expressions.
+#' @param x A DataFrame
+#' @param col A list of columns or single Column or name
+#' @return A new DataFrame with selected columns
+#' @export
+#' @rdname select
+#' @examples
+#' \dontrun{
+#' select(df, "*")
+#' select(df, "col1", "col2")
+#' select(df, df$name, df$age + 1)
+#' select(df, c("col1", "col2"))
+#' select(df, list(df$name, df$age + 1))
+#' # Columns can also be selected using `[[` and `[`
+#' df[[2]] == df[["age"]]
+#' df[,2] == df[,"age"]
+#' # Similar to R data frames columns can also be selected using `$`
+#' df$age
+#' }
+setMethod("select", signature(x = "DataFrame", col = "character"),
+ function(x, col, ...) {
+ sdf <- callJMethod(x@sdf, "select", col, toSeq(...))
+ dataFrame(sdf)
+ })
+
+#' @rdname select
+#' @export
+setMethod("select", signature(x = "DataFrame", col = "Column"),
+ function(x, col, ...) {
+ jcols <- lapply(list(col, ...), function(c) {
+ c@jc
+ })
+ sdf <- callJMethod(x@sdf, "select", listToSeq(jcols))
+ dataFrame(sdf)
+ })
+
+#' @rdname select
+#' @export
+setMethod("select",
+ signature(x = "DataFrame", col = "list"),
+ function(x, col) {
+ cols <- lapply(col, function(c) {
+ if (class(c) == "Column") {
+ c@jc
+ } else {
+ col(c)@jc
+ }
+ })
+ sdf <- callJMethod(x@sdf, "select", listToSeq(cols))
+ dataFrame(sdf)
+ })
+
+#' SelectExpr
+#'
+#' Select from a DataFrame using a set of SQL expressions.
+#'
+#' @param x A DataFrame to be selected from.
+#' @param expr A string containing a SQL expression
+#' @param ... Additional expressions
+#' @return A DataFrame
+#' @rdname selectExpr
+#' @export
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' sqlCtx <- sparkRSQL.init(sc)
+#' path <- "path/to/file.json"
+#' df <- jsonFile(sqlCtx, path)
+#' selectExpr(df, "col1", "(col2 * 5) as newCol")
+#' }
+setMethod("selectExpr",
+ signature(x = "DataFrame", expr = "character"),
+ function(x, expr, ...) {
+ exprList <- list(expr, ...)
+ sdf <- callJMethod(x@sdf, "selectExpr", listToSeq(exprList))
+ dataFrame(sdf)
+ })
+
+#' WithColumn
+#'
+#' Return a new DataFrame with the specified column added.
+#'
+#' @param x A DataFrame
+#' @param colName A string containing the name of the new column.
+#' @param col A Column expression.
+#' @return A DataFrame with the new column added.
+#' @rdname withColumn
+#' @export
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' sqlCtx <- sparkRSQL.init(sc)
+#' path <- "path/to/file.json"
+#' df <- jsonFile(sqlCtx, path)
+#' newDF <- withColumn(df, "newCol", df$col1 * 5)
+#' }
+setMethod("withColumn",
+ signature(x = "DataFrame", colName = "character", col = "Column"),
+ function(x, colName, col) {
+ select(x, x$"*", alias(col, colName))
+ })
+
+#' WithColumnRenamed
+#'
+#' Rename an existing column in a DataFrame.
+#'
+#' @param x A DataFrame
+#' @param existingCol The name of the column you want to change.
+#' @param newCol The new column name.
+#' @return A DataFrame with the column name changed.
+#' @rdname withColumnRenamed
+#' @export
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' sqlCtx <- sparkRSQL.init(sc)
+#' path <- "path/to/file.json"
+#' df <- jsonFile(sqlCtx, path)
+#' newDF <- withColumnRenamed(df, "col1", "newCol1")
+#' }
+setMethod("withColumnRenamed",
+ signature(x = "DataFrame", existingCol = "character", newCol = "character"),
+ function(x, existingCol, newCol) {
+ cols <- lapply(columns(x), function(c) {
+ if (c == existingCol) {
+ alias(col(c), newCol)
+ } else {
+ col(c)
+ }
+ })
+ select(x, cols)
+ })
+
+setClassUnion("characterOrColumn", c("character", "Column"))
+
+#' SortDF
+#'
+#' Sort a DataFrame by the specified column(s).
+#'
+#' @param x A DataFrame to be sorted.
+#' @param col Either a Column object or character vector indicating the field to sort on
+#' @param ... Additional sorting fields
+#' @return A DataFrame where all elements are sorted.
+#' @rdname sortDF
+#' @export
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' sqlCtx <- sparkRSQL.init(sc)
+#' path <- "path/to/file.json"
+#' df <- jsonFile(sqlCtx, path)
+#' sortDF(df, df$col1)
+#' sortDF(df, "col1")
+#' sortDF(df, asc(df$col1), desc(abs(df$col2)))
+#' }
+setMethod("sortDF",
+ signature(x = "DataFrame", col = "characterOrColumn"),
+ function(x, col, ...) {
+ if (class(col) == "character") {
+ sdf <- callJMethod(x@sdf, "sort", col, toSeq(...))
+ } else if (class(col) == "Column") {
+ jcols <- lapply(list(col, ...), function(c) {
+ c@jc
+ })
+ sdf <- callJMethod(x@sdf, "sort", listToSeq(jcols))
+ }
+ dataFrame(sdf)
+ })
+
+#' @rdname sortDF
+#' @export
+setMethod("orderBy",
+ signature(x = "DataFrame", col = "characterOrColumn"),
+ function(x, col) {
+ sortDF(x, col)
+ })
+
+#' Filter
+#'
+#' Filter the rows of a DataFrame according to a given condition.
+#'
+#' @param x A DataFrame to be filtered.
+#' @param condition The condition to filter on. This may either be a Column expression
+#' or a string containing a SQL statement
+#' @return A DataFrame containing only the rows that meet the condition.
+#' @rdname filter
+#' @export
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' sqlCtx <- sparkRSQL.init(sc)
+#' path <- "path/to/file.json"
+#' df <- jsonFile(sqlCtx, path)
+#' filter(df, "col1 > 0")
+#' filter(df, df$col2 != "abcdefg")
+#' }
+setMethod("filter",
+ signature(x = "DataFrame", condition = "characterOrColumn"),
+ function(x, condition) {
+ if (class(condition) == "Column") {
+ condition <- condition@jc
+ }
+ sdf <- callJMethod(x@sdf, "filter", condition)
+ dataFrame(sdf)
+ })
+
+#' @rdname filter
+#' @export
+setMethod("where",
+ signature(x = "DataFrame", condition = "characterOrColumn"),
+ function(x, condition) {
+ filter(x, condition)
+ })
+
+#' Join
+#'
+#' Join two DataFrames based on the given join expression.
+#'
+#' @param x A Spark DataFrame
+#' @param y A Spark DataFrame
+#' @param joinExpr (Optional) The expression used to perform the join. joinExpr must be a
+#' Column expression. If joinExpr is omitted, join() will perform a Cartesian join
+#' @param joinType The type of join to perform. The following join types are available:
+#' 'inner', 'outer', 'left_outer', 'right_outer', 'semijoin'. The default joinType is "inner".
+#' @return A DataFrame containing the result of the join operation.
+#' @rdname join
+#' @export
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' sqlCtx <- sparkRSQL.init(sc)
+#' df1 <- jsonFile(sqlCtx, path)
+#' df2 <- jsonFile(sqlCtx, path2)
+#' join(df1, df2) # Performs a Cartesian
+#' join(df1, df2, df1$col1 == df2$col2) # Performs an inner join based on expression
+#' join(df1, df2, df1$col1 == df2$col2, "right_outer")
+#' }
+setMethod("join",
+ signature(x = "DataFrame", y = "DataFrame"),
+ function(x, y, joinExpr = NULL, joinType = NULL) {
+ if (is.null(joinExpr)) {
+ sdf <- callJMethod(x@sdf, "join", y@sdf)
+ } else {
+ if (class(joinExpr) != "Column") stop("joinExpr must be a Column")
+ if (is.null(joinType)) {
+ sdf <- callJMethod(x@sdf, "join", y@sdf, joinExpr@jc)
+ } else {
+ if (joinType %in% c("inner", "outer", "left_outer", "right_outer", "semijoin")) {
+ sdf <- callJMethod(x@sdf, "join", y@sdf, joinExpr@jc, joinType)
+ } else {
+ stop("joinType must be one of the following types: ",
+ "'inner', 'outer', 'left_outer', 'right_outer', 'semijoin'")
+ }
+ }
+ }
+ dataFrame(sdf)
+ })
+
+#' UnionAll
+#'
+#' Return a new DataFrame containing the union of rows in this DataFrame
+#' and another DataFrame. This is equivalent to `UNION ALL` in SQL.
+#'
+#' @param x A Spark DataFrame
+#' @param y A Spark DataFrame
+#' @return A DataFrame containing the result of the union.
+#' @rdname unionAll
+#' @export
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' sqlCtx <- sparkRSQL.init(sc)
+#' df1 <- jsonFile(sqlCtx, path)
+#' df2 <- jsonFile(sqlCtx, path2)
+#' unioned <- unionAll(df1, df2)
+#' }
+setMethod("unionAll",
+ signature(x = "DataFrame", y = "DataFrame"),
+ function(x, y) {
+ unioned <- callJMethod(x@sdf, "unionAll", y@sdf)
+ dataFrame(unioned)
+ })
+
+#' Intersect
+#'
+#' Return a new DataFrame containing rows only in both this DataFrame
+#' and another DataFrame. This is equivalent to `INTERSECT` in SQL.
+#'
+#' @param x A Spark DataFrame
+#' @param y A Spark DataFrame
+#' @return A DataFrame containing the result of the intersect.
+#' @rdname intersect
+#' @export
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' sqlCtx <- sparkRSQL.init(sc)
+#' df1 <- jsonFile(sqlCtx, path)
+#' df2 <- jsonFile(sqlCtx, path2)
+#' intersectDF <- intersect(df1, df2)
+#' }
+setMethod("intersect",
+ signature(x = "DataFrame", y = "DataFrame"),
+ function(x, y) {
+ intersected <- callJMethod(x@sdf, "intersect", y@sdf)
+ dataFrame(intersected)
+ })
+
+#' Subtract
+#'
+#' Return a new DataFrame containing rows in this DataFrame
+#' but not in another DataFrame. This is equivalent to `EXCEPT` in SQL.
+#'
+#' @param x A Spark DataFrame
+#' @param y A Spark DataFrame
+#' @return A DataFrame containing the result of the subtract operation.
+#' @rdname subtract
+#' @export
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' sqlCtx <- sparkRSQL.init(sc)
+#' df1 <- jsonFile(sqlCtx, path)
+#' df2 <- jsonFile(sqlCtx, path2)
+#' subtractDF <- subtract(df1, df2)
+#' }
+setMethod("subtract",
+ signature(x = "DataFrame", y = "DataFrame"),
+ function(x, y) {
+ subtracted <- callJMethod(x@sdf, "except", y@sdf)
+ dataFrame(subtracted)
+ })
+
+#' Save the contents of the DataFrame to a data source
+#'
+#' The data source is specified by the `source` and a set of options (...).
+#' If `source` is not specified, the default data source configured by
+#' spark.sql.sources.default will be used.
+#'
+#' Additionally, mode is used to specify the behavior of the save operation when
+#' data already exists in the data source. There are four modes:
+#' append: Contents of this DataFrame are expected to be appended to existing data.
+#' overwrite: Existing data is expected to be overwritten by the contents of
+#' this DataFrame.
+#' error: An exception is expected to be thrown.
+#' ignore: The save operation is expected to not save the contents of the DataFrame
+#' and to not change the existing data.
+#'
+#' @param df A SparkSQL DataFrame
+#' @param path The path where the contents of the DataFrame are saved
+#' @param source A name for external data source
+#' @param mode One of 'append', 'overwrite', 'error', 'ignore'
+#'
+#' @rdname saveAsTable
+#' @export
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' sqlCtx <- sparkRSQL.init(sc)
+#' path <- "path/to/file.json"
+#' df <- jsonFile(sqlCtx, path)
+#' saveAsTable(df, "myfile")
+#' }
+setMethod("saveDF",
+ signature(df = "DataFrame", path = 'character', source = 'character',
+ mode = 'character'),
+ function(df, path = NULL, source = NULL, mode = "append", ...) {
+ if (is.null(source)) {
+ sqlCtx <- get(".sparkRSQLsc", envir = .sparkREnv)
+ source <- callJMethod(sqlCtx, "getConf", "spark.sql.sources.default",
+ "org.apache.spark.sql.parquet")
+ }
+ allModes <- c("append", "overwrite", "error", "ignore")
+ if (!(mode %in% allModes)) {
+ stop('mode should be one of "append", "overwrite", "error", "ignore"')
+ }
+ jmode <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", "saveMode", mode)
+ options <- varargsToEnv(...)
+ if (!is.null(path)) {
+ options[['path']] <- path
+ }
+ callJMethod(df@sdf, "save", source, jmode, options)
+ })
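A usage sketch for saveDF; the output path is illustrative and the "parquet" short name assumes the data source is registered under that name:

    saveDF(df, path = "/tmp/people.parquet", source = "parquet", mode = "overwrite")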
+
+
+#' saveAsTable
+#'
+#' Save the contents of the DataFrame to a data source as a table
+#'
+#' The data source is specified by the `source` and a set of options (...).
+#' If `source` is not specified, the default data source configured by
+#' spark.sql.sources.default will be used.
+#'
+#' Additionally, mode is used to specify the behavior of the save operation when
+#' data already exists in the data source. There are four modes:
+#' append: Contents of this DataFrame are expected to be appended to existing data.
+#' overwrite: Existing data is expected to be overwritten by the contents of
+#' this DataFrame.
+#' error: An exception is expected to be thrown.
+#' ignore: The save operation is expected to not save the contents of the DataFrame
+#' and to not change the existing data.
+#'
+#' @param df A SparkSQL DataFrame
+#' @param tableName A name for the table
+#' @param source A name for external data source
+#' @param mode One of 'append', 'overwrite', 'error', 'ignore'
+#'
+#' @rdname saveAsTable
+#' @export
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' sqlCtx <- sparkRSQL.init(sc)
+#' path <- "path/to/file.json"
+#' df <- jsonFile(sqlCtx, path)
+#' saveAsTable(df, "myfile")
+#' }
+setMethod("saveAsTable",
+ signature(df = "DataFrame", tableName = 'character', source = 'character',
+ mode = 'character'),
+ function(df, tableName, source = NULL, mode = "append", ...) {
+ if (is.null(source)) {
+ sqlCtx <- get(".sparkRSQLsc", envir = .sparkREnv)
+ source <- callJMethod(sqlCtx, "getConf", "spark.sql.sources.default",
+ "org.apache.spark.sql.parquet")
+ }
+ allModes <- c("append", "overwrite", "error", "ignore")
+ if (!(mode %in% allModes)) {
+ stop('mode should be one of "append", "overwrite", "error", "ignore"')
+ }
+ jmode <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", "saveMode", mode)
+ options <- varargsToEnv(...)
+ callJMethod(df@sdf, "saveAsTable", tableName, source, jmode, options)
+ })
+