Posted to commits@spark.apache.org by fe...@apache.org on 2017/09/10 17:24:57 UTC
[7/9] spark git commit: [SPARKR][BACKPORT-2.1] backporting package and test changes
http://git-wip-us.apache.org/repos/asf/spark/blob/ae4e8ae4/R/pkg/inst/tests/testthat/test_shuffle.R
----------------------------------------------------------------------
diff --git a/R/pkg/inst/tests/testthat/test_shuffle.R b/R/pkg/inst/tests/testthat/test_shuffle.R
deleted file mode 100644
index d38efab..0000000
--- a/R/pkg/inst/tests/testthat/test_shuffle.R
+++ /dev/null
@@ -1,224 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-context("partitionBy, groupByKey, reduceByKey etc.")
-
-# JavaSparkContext handle
-sparkSession <- sparkR.session(enableHiveSupport = FALSE)
-sc <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", "getJavaSparkContext", sparkSession)
-
-# Data
-intPairs <- list(list(1L, -1), list(2L, 100), list(2L, 1), list(1L, 200))
-intRdd <- parallelize(sc, intPairs, 2L)
-
-doublePairs <- list(list(1.5, -1), list(2.5, 100), list(2.5, 1), list(1.5, 200))
-doubleRdd <- parallelize(sc, doublePairs, 2L)
-
-numPairs <- list(list(1L, 100), list(2L, 200), list(4L, -1), list(3L, 1),
- list(3L, 0))
-numPairsRdd <- parallelize(sc, numPairs, length(numPairs))
-
-strList <- list("Dexter Morgan: Blood. Sometimes it sets my teeth on edge and ",
- "Dexter Morgan: Harry and Dorris Morgan did a wonderful job ")
-strListRDD <- parallelize(sc, strList, 4)
-
-test_that("groupByKey for integers", {
- grouped <- groupByKey(intRdd, 2L)
-
- actual <- collectRDD(grouped)
-
- expected <- list(list(2L, list(100, 1)), list(1L, list(-1, 200)))
- expect_equal(sortKeyValueList(actual), sortKeyValueList(expected))
-})
-
-test_that("groupByKey for doubles", {
- grouped <- groupByKey(doubleRdd, 2L)
-
- actual <- collectRDD(grouped)
-
- expected <- list(list(1.5, list(-1, 200)), list(2.5, list(100, 1)))
- expect_equal(sortKeyValueList(actual), sortKeyValueList(expected))
-})
-
-test_that("reduceByKey for ints", {
- reduced <- reduceByKey(intRdd, "+", 2L)
-
- actual <- collectRDD(reduced)
-
- expected <- list(list(2L, 101), list(1L, 199))
- expect_equal(sortKeyValueList(actual), sortKeyValueList(expected))
-})
-
-test_that("reduceByKey for doubles", {
- reduced <- reduceByKey(doubleRdd, "+", 2L)
- actual <- collectRDD(reduced)
-
- expected <- list(list(1.5, 199), list(2.5, 101))
- expect_equal(sortKeyValueList(actual), sortKeyValueList(expected))
-})
-
-test_that("combineByKey for ints", {
- reduced <- combineByKey(intRdd, function(x) { x }, "+", "+", 2L)
-
- actual <- collectRDD(reduced)
-
- expected <- list(list(2L, 101), list(1L, 199))
- expect_equal(sortKeyValueList(actual), sortKeyValueList(expected))
-})
-
-test_that("combineByKey for doubles", {
- reduced <- combineByKey(doubleRdd, function(x) { x }, "+", "+", 2L)
- actual <- collectRDD(reduced)
-
- expected <- list(list(1.5, 199), list(2.5, 101))
- expect_equal(sortKeyValueList(actual), sortKeyValueList(expected))
-})
-
-test_that("combineByKey for characters", {
- stringKeyRDD <- parallelize(sc,
- list(list("max", 1L), list("min", 2L),
- list("other", 3L), list("max", 4L)), 2L)
- reduced <- combineByKey(stringKeyRDD,
- function(x) { x }, "+", "+", 2L)
- actual <- collectRDD(reduced)
-
- expected <- list(list("max", 5L), list("min", 2L), list("other", 3L))
- expect_equal(sortKeyValueList(actual), sortKeyValueList(expected))
-})
-
-test_that("aggregateByKey", {
- # test aggregateByKey for int keys
- rdd <- parallelize(sc, list(list(1, 1), list(1, 2), list(2, 3), list(2, 4)))
-
- zeroValue <- list(0, 0)
- seqOp <- function(x, y) { list(x[[1]] + y, x[[2]] + 1) }
- combOp <- function(x, y) { list(x[[1]] + y[[1]], x[[2]] + y[[2]]) }
- aggregatedRDD <- aggregateByKey(rdd, zeroValue, seqOp, combOp, 2L)
-
- actual <- collectRDD(aggregatedRDD)
-
- expected <- list(list(1, list(3, 2)), list(2, list(7, 2)))
- expect_equal(sortKeyValueList(actual), sortKeyValueList(expected))
-
- # test aggregateByKey for string keys
- rdd <- parallelize(sc, list(list("a", 1), list("a", 2), list("b", 3), list("b", 4)))
-
- zeroValue <- list(0, 0)
- seqOp <- function(x, y) { list(x[[1]] + y, x[[2]] + 1) }
- combOp <- function(x, y) { list(x[[1]] + y[[1]], x[[2]] + y[[2]]) }
- aggregatedRDD <- aggregateByKey(rdd, zeroValue, seqOp, combOp, 2L)
-
- actual <- collectRDD(aggregatedRDD)
-
- expected <- list(list("a", list(3, 2)), list("b", list(7, 2)))
- expect_equal(sortKeyValueList(actual), sortKeyValueList(expected))
-})
-
-test_that("foldByKey", {
- # test foldByKey for int keys
- folded <- foldByKey(intRdd, 0, "+", 2L)
-
- actual <- collectRDD(folded)
-
- expected <- list(list(2L, 101), list(1L, 199))
- expect_equal(sortKeyValueList(actual), sortKeyValueList(expected))
-
- # test foldByKey for double keys
- folded <- foldByKey(doubleRdd, 0, "+", 2L)
-
- actual <- collectRDD(folded)
-
- expected <- list(list(1.5, 199), list(2.5, 101))
- expect_equal(sortKeyValueList(actual), sortKeyValueList(expected))
-
- # test foldByKey for string keys
- stringKeyPairs <- list(list("a", -1), list("b", 100), list("b", 1), list("a", 200))
-
- stringKeyRDD <- parallelize(sc, stringKeyPairs)
- folded <- foldByKey(stringKeyRDD, 0, "+", 2L)
-
- actual <- collectRDD(folded)
-
- expected <- list(list("b", 101), list("a", 199))
- expect_equal(sortKeyValueList(actual), sortKeyValueList(expected))
-
- # test foldByKey for empty pair RDD
- rdd <- parallelize(sc, list())
- folded <- foldByKey(rdd, 0, "+", 2L)
- actual <- collectRDD(folded)
- expected <- list()
- expect_equal(actual, expected)
-
- # test foldByKey for RDD with only 1 pair
- rdd <- parallelize(sc, list(list(1, 1)))
- folded <- foldByKey(rdd, 0, "+", 2L)
- actual <- collectRDD(folded)
- expected <- list(list(1, 1))
- expect_equal(actual, expected)
-})
-
-test_that("partitionBy() partitions data correctly", {
- # Partition by magnitude
- partitionByMagnitude <- function(key) { if (key >= 3) 1 else 0 }
-
- resultRDD <- partitionByRDD(numPairsRdd, 2L, partitionByMagnitude)
-
- expected_first <- list(list(1, 100), list(2, 200)) # keys less than 3
- expected_second <- list(list(4, -1), list(3, 1), list(3, 0)) # keys greater than or equal to 3
- actual_first <- collectPartition(resultRDD, 0L)
- actual_second <- collectPartition(resultRDD, 1L)
-
- expect_equal(sortKeyValueList(actual_first), sortKeyValueList(expected_first))
- expect_equal(sortKeyValueList(actual_second), sortKeyValueList(expected_second))
-})
-
-test_that("partitionBy works with dependencies", {
- kOne <- 1
- partitionByParity <- function(key) { if (key %% 2 == kOne) 7 else 4 }
-
- # Partition by parity
- resultRDD <- partitionByRDD(numPairsRdd, numPartitions = 2L, partitionByParity)
-
- # keys even; 2 %% 2 == 0
- expected_first <- list(list(2, 200), list(4, -1))
- # keys odd; 3 %% 2 == 1
- expected_second <- list(list(1, 100), list(3, 1), list(3, 0))
- actual_first <- collectPartition(resultRDD, 0L)
- actual_second <- collectPartition(resultRDD, 1L)
-
- expect_equal(sortKeyValueList(actual_first), sortKeyValueList(expected_first))
- expect_equal(sortKeyValueList(actual_second), sortKeyValueList(expected_second))
-})
-
-test_that("test partitionBy with string keys", {
- words <- flatMap(strListRDD, function(line) { strsplit(line, " ")[[1]] })
- wordCount <- lapply(words, function(word) { list(word, 1L) })
-
- resultRDD <- partitionByRDD(wordCount, 2L)
- expected_first <- list(list("Dexter", 1), list("Dexter", 1))
- expected_second <- list(list("and", 1), list("and", 1))
-
- actual_first <- Filter(function(item) { item[[1]] == "Dexter" },
- collectPartition(resultRDD, 0L))
- actual_second <- Filter(function(item) { item[[1]] == "and" },
- collectPartition(resultRDD, 1L))
-
- expect_equal(sortKeyValueList(actual_first), sortKeyValueList(expected_first))
- expect_equal(sortKeyValueList(actual_second), sortKeyValueList(expected_second))
-})
-
-sparkR.session.stop()
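
For context, the file deleted above covered SparkR's internal key-value RDD API. The following is a minimal standalone sketch of the pattern under test, not part of the commit; it assumes a local Spark install, and since the RDD functions are not exported in SparkR 2.x it reaches them via ":::":

library(SparkR)

# Start a local session and fetch the JavaSparkContext handle, as the tests did.
sparkSession <- sparkR.session(master = "local[2]", enableHiveSupport = FALSE)
sc <- SparkR:::callJStatic("org.apache.spark.sql.api.r.SQLUtils",
                           "getJavaSparkContext", sparkSession)

# Sum the values for each key, shuffling across 2 partitions.
pairs <- SparkR:::parallelize(sc, list(list(1L, -1), list(2L, 100),
                                       list(2L, 1), list(1L, 200)), 2L)
summed <- SparkR:::reduceByKey(pairs, "+", 2L)
SparkR:::collectRDD(summed)   # list(list(2L, 101), list(1L, 199)); order may vary

sparkR.session.stop()
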
http://git-wip-us.apache.org/repos/asf/spark/blob/ae4e8ae4/R/pkg/inst/tests/testthat/test_sparkR.R
----------------------------------------------------------------------
diff --git a/R/pkg/inst/tests/testthat/test_sparkR.R b/R/pkg/inst/tests/testthat/test_sparkR.R
deleted file mode 100644
index f73fc6b..0000000
--- a/R/pkg/inst/tests/testthat/test_sparkR.R
+++ /dev/null
@@ -1,46 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-context("functions in sparkR.R")
-
-test_that("sparkCheckInstall", {
- # "local, yarn-client, mesos-client" mode, SPARK_HOME was set correctly,
- # and the SparkR job was submitted by "spark-submit"
- sparkHome <- paste0(tempdir(), "/", "sparkHome")
- dir.create(sparkHome)
- master <- ""
- deployMode <- ""
- expect_true(is.null(sparkCheckInstall(sparkHome, master, deployMode)))
- unlink(sparkHome, recursive = TRUE)
-
- # "yarn-cluster, mesos-cluster" mode, SPARK_HOME was not set,
- # and the SparkR job was submitted by "spark-submit"
- sparkHome <- ""
- master <- ""
- deployMode <- ""
- expect_true(is.null(sparkCheckInstall(sparkHome, master, deployMode)))
-
- # "yarn-client, mesos-client" mode, SPARK_HOME was not set
- sparkHome <- ""
- master <- "yarn-client"
- deployMode <- ""
- expect_error(sparkCheckInstall(sparkHome, master, deployMode))
- sparkHome <- ""
- master <- ""
- deployMode <- "client"
- expect_error(sparkCheckInstall(sparkHome, master, deployMode))
-})
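
The deleted test above pins down a simple contract for the internal helper sparkCheckInstall(sparkHome, master, deployMode). A hedged sketch restating that contract, reached via ":::" since the helper is not exported, and assuming a non-interactive run (e.g. under spark-submit) as in the test:

library(SparkR)

# SPARK_HOME points at an existing directory: returns NULL, nothing to install.
sparkHome <- file.path(tempdir(), "sparkHome")
dir.create(sparkHome)
SparkR:::sparkCheckInstall(sparkHome, master = "", deployMode = "")
unlink(sparkHome, recursive = TRUE)

# No SPARK_HOME, cluster deploy mode: returns NULL, the driver runs remotely.
SparkR:::sparkCheckInstall("", master = "", deployMode = "")

# No SPARK_HOME in client mode: errors, a local Spark installation is required.
try(SparkR:::sparkCheckInstall("", master = "yarn-client", deployMode = ""))
try(SparkR:::sparkCheckInstall("", master = "", deployMode = "client"))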