You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by sr...@apache.org on 2017/02/16 12:32:58 UTC
[7/8] spark git commit: [SPARK-19550][BUILD][CORE][WIP] Remove Java 7
support
http://git-wip-us.apache.org/repos/asf/spark/blob/0e240549/core/src/test/java/org/apache/spark/JavaAPISuite.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/spark/JavaAPISuite.java b/core/src/test/java/org/apache/spark/JavaAPISuite.java
deleted file mode 100644
index 7bebe06..0000000
--- a/core/src/test/java/org/apache/spark/JavaAPISuite.java
+++ /dev/null
@@ -1,1836 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark;
-
-import java.io.*;
-import java.nio.channels.FileChannel;
-import java.nio.ByteBuffer;
-import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.*;
-
-import scala.Tuple2;
-import scala.Tuple3;
-import scala.Tuple4;
-import scala.collection.JavaConverters;
-
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Iterators;
-import com.google.common.collect.Lists;
-import com.google.common.base.Throwables;
-import com.google.common.io.Files;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.compress.DefaultCodec;
-import org.apache.hadoop.mapred.SequenceFileInputFormat;
-import org.apache.hadoop.mapred.SequenceFileOutputFormat;
-import org.apache.hadoop.mapreduce.Job;
-import org.junit.After;
-import static org.junit.Assert.*;
-import org.junit.Before;
-import org.junit.Test;
-
-import org.apache.spark.api.java.JavaDoubleRDD;
-import org.apache.spark.api.java.JavaFutureAction;
-import org.apache.spark.api.java.JavaPairRDD;
-import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.api.java.Optional;
-import org.apache.spark.api.java.function.*;
-import org.apache.spark.input.PortableDataStream;
-import org.apache.spark.partial.BoundedDouble;
-import org.apache.spark.partial.PartialResult;
-import org.apache.spark.rdd.RDD;
-import org.apache.spark.serializer.KryoSerializer;
-import org.apache.spark.storage.StorageLevel;
-import org.apache.spark.util.LongAccumulator;
-import org.apache.spark.util.StatCounter;
-
-// The test suite itself is Serializable so that anonymous Function implementations can be
-// serialized, as an alternative to converting these anonymous classes to static inner classes;
-// see http://stackoverflow.com/questions/758570/.
-public class JavaAPISuite implements Serializable {
- private transient JavaSparkContext sc;
- private transient File tempDir;
-
- // Runs before every test: starts a JavaSparkContext with master "local" and app name
- // "JavaAPISuite", and creates a temporary directory that is marked for deletion on JVM exit.
- @Before
- public void setUp() {
- sc = new JavaSparkContext("local", "JavaAPISuite");
- tempDir = Files.createTempDir();
- // deleteOnExit is registered on the directory itself
- tempDir.deleteOnExit();
- }
-
- // Runs after every test: stops the Spark context created in setUp and clears the reference.
- @After
- public void tearDown() {
- sc.stop();
- sc = null;
- }
-
- // Exercises JavaSparkContext.union for plain, double and pair RDDs; each union of two
- // two-element RDDs must contain four elements.
- @SuppressWarnings("unchecked")
- @Test
- public void sparkContextUnion() {
-   // Plain JavaRDDs: varargs overload first, then the (first, List) overload
-   List<String> words = Arrays.asList("Hello", "World");
-   JavaRDD<String> left = sc.parallelize(words);
-   JavaRDD<String> right = sc.parallelize(words);
-   JavaRDD<String> merged = sc.union(left, right);
-   assertEquals(4, merged.count());
-   List<JavaRDD<String>> rest = new ArrayList<>();
-   rest.add(right);
-   merged = sc.union(left, rest);
-   assertEquals(4, merged.count());
-
-   // JavaDoubleRDDs
-   List<Double> values = Arrays.asList(1.0, 2.0);
-   JavaDoubleRDD leftDoubles = sc.parallelizeDoubles(values);
-   JavaDoubleRDD rightDoubles = sc.parallelizeDoubles(values);
-   JavaDoubleRDD mergedDoubles = sc.union(leftDoubles, rightDoubles);
-   assertEquals(4, mergedDoubles.count());
-
-   // JavaPairRDDs
-   List<Tuple2<Integer, Integer>> tuples = new ArrayList<>();
-   tuples.add(new Tuple2<>(1, 2));
-   tuples.add(new Tuple2<>(3, 4));
-   JavaPairRDD<Integer, Integer> leftPairs = sc.parallelizePairs(tuples);
-   JavaPairRDD<Integer, Integer> rightPairs = sc.parallelizePairs(tuples);
-   JavaPairRDD<Integer, Integer> mergedPairs = sc.union(leftPairs, rightPairs);
-   assertEquals(4, mergedPairs.count());
- }
-
- // Checks intersection() on plain, empty, double and pair RDDs.
- @SuppressWarnings("unchecked")
- @Test
- public void intersection() {
-   JavaRDD<Integer> left = sc.parallelize(Arrays.asList(1, 10, 2, 3, 4, 5));
-   JavaRDD<Integer> right = sc.parallelize(Arrays.asList(1, 6, 2, 3, 7, 8));
-
-   // 1, 2 and 3 are the only shared values
-   JavaRDD<Integer> common = left.intersection(right);
-   assertEquals(3, common.count());
-
-   // intersecting with an empty RDD yields nothing
-   JavaRDD<Integer> nothing = sc.emptyRDD();
-   JavaRDD<Integer> commonWithNothing = nothing.intersection(right);
-   assertEquals(0, commonWithNothing.count());
-
-   List<Double> values = Arrays.asList(1.0, 2.0);
-   JavaDoubleRDD leftDoubles = sc.parallelizeDoubles(values);
-   JavaDoubleRDD rightDoubles = sc.parallelizeDoubles(values);
-   JavaDoubleRDD commonDoubles = leftDoubles.intersection(rightDoubles);
-   assertEquals(2, commonDoubles.count());
-
-   List<Tuple2<Integer, Integer>> tuples = new ArrayList<>();
-   tuples.add(new Tuple2<>(1, 2));
-   tuples.add(new Tuple2<>(3, 4));
-   JavaPairRDD<Integer, Integer> leftPairs = sc.parallelizePairs(tuples);
-   JavaPairRDD<Integer, Integer> rightPairs = sc.parallelizePairs(tuples);
-   JavaPairRDD<Integer, Integer> commonPairs = leftPairs.intersection(rightPairs);
-   assertEquals(2, commonPairs.count());
- }
-
- // Checks that seeded sampling, with and without replacement, yields two elements out of ten.
- @Test
- public void sample() {
-   JavaRDD<Integer> numbers = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10));
-   // the seeds here are "magic" to make this work out nicely
-   JavaRDD<Integer> withReplacement = numbers.sample(true, 0.2, 8);
-   assertEquals(2, withReplacement.count());
-   JavaRDD<Integer> withoutReplacement = numbers.sample(false, 0.2, 2);
-   assertEquals(2, withoutReplacement.count());
- }
-
- // Splits 1000 integers with weights {0.4, 0.6, 1.0} and a fixed seed, then checks that the
- // three resulting RDD sizes fall inside loose ranges around 200, 300 and 500.
- @Test
- public void randomSplit() {
-   List<Integer> ints = new ArrayList<>(1000);
-   for (int i = 0; i < 1000; i++) {
-     ints.add(i);
-   }
-   JavaRDD<Integer> rdd = sc.parallelize(ints);
-   JavaRDD<Integer>[] splits = rdd.randomSplit(new double[] { 0.4, 0.6, 1.0 }, 31);
-   // the splits aren't perfect -- not enough data for them to be -- just check they're about right
-   assertEquals(3, splits.length);
-   long s0 = splits[0].count();
-   long s1 = splits[1].count();
-   long s2 = splits[2].count();
-   assertTrue(s0 + " not within expected range", s0 > 150 && s0 < 250);
-   // The upper bound must be checked against s1 (it was checked against s0 before).
-   assertTrue(s1 + " not within expected range", s1 > 250 && s1 < 350);
-   assertTrue(s2 + " not within expected range", s2 > 430 && s2 < 570);
- }
-
- // Sorts three pairs by key with the default comparator and with an explicit comparator.
- @Test
- public void sortByKey() {
-   List<Tuple2<Integer, Integer>> input = new ArrayList<>();
-   input.add(new Tuple2<>(0, 4));
-   input.add(new Tuple2<>(3, 2));
-   input.add(new Tuple2<>(-1, 1));
-   JavaPairRDD<Integer, Integer> unsorted = sc.parallelizePairs(input);
-
-   // Default comparator
-   JavaPairRDD<Integer, Integer> byKey = unsorted.sortByKey();
-   assertEquals(new Tuple2<>(-1, 1), byKey.first());
-   List<Tuple2<Integer, Integer>> collected = byKey.collect();
-   assertEquals(new Tuple2<>(0, 4), collected.get(1));
-   assertEquals(new Tuple2<>(3, 2), collected.get(2));
-
-   // Custom comparator: a reversed comparator with ascending == false is expected to
-   // produce the same order as above
-   Comparator<Integer> reversed = Collections.<Integer>reverseOrder();
-   byKey = unsorted.sortByKey(reversed, false);
-   assertEquals(new Tuple2<>(-1, 1), byKey.first());
-   collected = byKey.collect();
-   assertEquals(new Tuple2<>(0, 4), collected.get(1));
-   assertEquals(new Tuple2<>(3, 2), collected.get(2));
- }
-
- // Repartitions six pairs with a two-way modulo partitioner and checks that the partitioner is
- // retained and that every partition holds its keys in sorted order.
- @SuppressWarnings("unchecked")
- @Test
- public void repartitionAndSortWithinPartitions() {
-   List<Tuple2<Integer, Integer>> pairs = new ArrayList<>();
-   pairs.add(new Tuple2<>(0, 5));
-   pairs.add(new Tuple2<>(3, 8));
-   pairs.add(new Tuple2<>(2, 6));
-   pairs.add(new Tuple2<>(0, 8));
-   pairs.add(new Tuple2<>(3, 8));
-   pairs.add(new Tuple2<>(1, 3));
-
-   JavaPairRDD<Integer, Integer> rdd = sc.parallelizePairs(pairs);
-
-   // Even keys go to partition 0, odd keys to partition 1 (all keys here are non-negative).
-   Partitioner partitioner = new Partitioner() {
-     @Override
-     public int numPartitions() {
-       return 2;
-     }
-     @Override
-     public int getPartition(Object key) {
-       return (Integer) key % 2;
-     }
-   };
-
-   JavaPairRDD<Integer, Integer> repartitioned =
-     rdd.repartitionAndSortWithinPartitions(partitioner);
-   assertTrue(repartitioned.partitioner().isPresent());
-   // JUnit's assertEquals takes (expected, actual); keep that order so failure messages read
-   // correctly.
-   assertEquals(partitioner, repartitioned.partitioner().get());
-   List<List<Tuple2<Integer, Integer>>> partitions = repartitioned.glom().collect();
-   assertEquals(
-     Arrays.asList(new Tuple2<>(0, 5), new Tuple2<>(0, 8), new Tuple2<>(2, 6)),
-     partitions.get(0));
-   assertEquals(
-     Arrays.asList(new Tuple2<>(1, 3), new Tuple2<>(3, 8), new Tuple2<>(3, 8)),
-     partitions.get(1));
- }
-
- @Test
- public void emptyRDD() {
- JavaRDD<String> rdd = sc.emptyRDD();
- assertEquals("Empty RDD shouldn't have any values", 0, rdd.count());
- }
-
- // Sorts an RDD of tuples with sortBy, first by the tuple's first element and then by its
- // second, in ascending order with two partitions.
- @Test
- public void sortBy() {
-   List<Tuple2<Integer, Integer>> input = new ArrayList<>();
-   input.add(new Tuple2<>(0, 4));
-   input.add(new Tuple2<>(3, 2));
-   input.add(new Tuple2<>(-1, 1));
-   JavaRDD<Tuple2<Integer, Integer>> unsorted = sc.parallelize(input);
-
-   // compare on first value
-   Function<Tuple2<Integer, Integer>, Integer> firstElement =
-     new Function<Tuple2<Integer, Integer>, Integer>() {
-       @Override
-       public Integer call(Tuple2<Integer, Integer> t) {
-         return t._1();
-       }
-     };
-   JavaRDD<Tuple2<Integer, Integer>> ordered = unsorted.sortBy(firstElement, true, 2);
-   assertEquals(new Tuple2<>(-1, 1), ordered.first());
-   List<Tuple2<Integer, Integer>> collected = ordered.collect();
-   assertEquals(new Tuple2<>(0, 4), collected.get(1));
-   assertEquals(new Tuple2<>(3, 2), collected.get(2));
-
-   // compare on second value
-   Function<Tuple2<Integer, Integer>, Integer> secondElement =
-     new Function<Tuple2<Integer, Integer>, Integer>() {
-       @Override
-       public Integer call(Tuple2<Integer, Integer> t) {
-         return t._2();
-       }
-     };
-   ordered = unsorted.sortBy(secondElement, true, 2);
-   assertEquals(new Tuple2<>(-1, 1), ordered.first());
-   collected = ordered.collect();
-   assertEquals(new Tuple2<>(3, 2), collected.get(1));
-   assertEquals(new Tuple2<>(0, 4), collected.get(2));
- }
-
- // Counts the elements visited by foreach() through a LongAccumulator.
- @Test
- public void foreach() {
-   final LongAccumulator visited = sc.sc().longAccumulator();
-   VoidFunction<String> countOne = new VoidFunction<String>() {
-     @Override
-     public void call(String s) {
-       visited.add(1);
-     }
-   };
-   JavaRDD<String> words = sc.parallelize(Arrays.asList("Hello", "World"));
-   words.foreach(countOne);
-   assertEquals(2, visited.value().intValue());
- }
-
- // Counts the elements seen by foreachPartition() through a LongAccumulator.
- @Test
- public void foreachPartition() {
-   final LongAccumulator visited = sc.sc().longAccumulator();
-   VoidFunction<Iterator<String>> countAll = new VoidFunction<Iterator<String>>() {
-     @Override
-     public void call(Iterator<String> partition) {
-       // consume each element and record one hit for it
-       for (; partition.hasNext(); partition.next()) {
-         visited.add(1);
-       }
-     }
-   };
-   JavaRDD<String> words = sc.parallelize(Arrays.asList("Hello", "World"));
-   words.foreachPartition(countAll);
-   assertEquals(2, visited.value().intValue());
- }
-
- // toLocalIterator() must hand back the RDD's elements in their original order.
- @Test
- public void toLocalIterator() {
-   List<Integer> expected = Arrays.asList(1, 2, 3, 4);
-   Iterator<Integer> local = sc.parallelize(expected).toLocalIterator();
-   List<Integer> actual = Lists.newArrayList(local);
-   assertEquals(expected, actual);
- }
-
- // zipWithUniqueId() must assign four distinct ids to four elements.
- @Test
- public void zipWithUniqueId() {
-   JavaRDD<Integer> numbers = sc.parallelize(Arrays.asList(1, 2, 3, 4));
-   JavaPairRDD<Integer, Long> withIds = numbers.zipWithUniqueId();
-   List<Long> ids = withIds.values().collect();
-   // duplicates would collapse in the set and shrink its size
-   assertEquals(4, new HashSet<>(ids).size());
- }
-
- // zipWithIndex() must number four elements 0 through 3 in order.
- @Test
- public void zipWithIndex() {
-   JavaRDD<Integer> numbers = sc.parallelize(Arrays.asList(1, 2, 3, 4));
-   JavaPairRDD<Integer, Long> withIndex = numbers.zipWithIndex();
-   List<Long> actual = withIndex.values().collect();
-   assertEquals(Arrays.asList(0L, 1L, 2L, 3L), actual);
- }
-
- // lookup() on a pair RDD, directly and after groupByKey(), must find both "Oranges" values.
- @SuppressWarnings("unchecked")
- @Test
- public void lookup() {
-   List<Tuple2<String, String>> data = Arrays.asList(
-     new Tuple2<>("Apples", "Fruit"),
-     new Tuple2<>("Oranges", "Fruit"),
-     new Tuple2<>("Oranges", "Citrus")
-   );
-   JavaPairRDD<String, String> categories = sc.parallelizePairs(data);
-   List<String> direct = categories.lookup("Oranges");
-   assertEquals(2, direct.size());
-   Iterable<String> grouped = categories.groupByKey().lookup("Oranges").get(0);
-   assertEquals(2, Iterables.size(grouped));
- }
-
- // Groups integers by parity, with and without an explicit partition count, and checks the
- // sizes of the two groups.
- @Test
- public void groupBy() {
-   JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 1, 2, 3, 5, 8, 13));
-   // The predicate returns true for EVEN numbers, so the "true" group holds the evens.
-   // (It was previously named isOdd, which contradicted both its body and the assertions.)
-   Function<Integer, Boolean> isEven = new Function<Integer, Boolean>() {
-     @Override
-     public Boolean call(Integer x) {
-       return x % 2 == 0;
-     }
-   };
-   JavaPairRDD<Boolean, Iterable<Integer>> oddsAndEvens = rdd.groupBy(isEven);
-   assertEquals(2, oddsAndEvens.count());
-   assertEquals(2, Iterables.size(oddsAndEvens.lookup(true).get(0))); // Evens
-   assertEquals(5, Iterables.size(oddsAndEvens.lookup(false).get(0))); // Odds
-
-   // Same grouping with a single output partition
-   oddsAndEvens = rdd.groupBy(isEven, 1);
-   assertEquals(2, oddsAndEvens.count());
-   assertEquals(2, Iterables.size(oddsAndEvens.lookup(true).get(0))); // Evens
-   assertEquals(5, Iterables.size(oddsAndEvens.lookup(false).get(0))); // Odds
- }
-
- // Groups a pair RDD (each number zipped with itself) by whether both components are even.
- @Test
- public void groupByOnPairRDD() {
-   // Regression test for SPARK-4459
-   JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 1, 2, 3, 5, 8, 13));
-   // Returns true only when BOTH tuple components are even; previously misnamed areOdd.
-   Function<Tuple2<Integer, Integer>, Boolean> areEven =
-     new Function<Tuple2<Integer, Integer>, Boolean>() {
-       @Override
-       public Boolean call(Tuple2<Integer, Integer> x) {
-         return (x._1() % 2 == 0) && (x._2() % 2 == 0);
-       }
-     };
-   JavaPairRDD<Integer, Integer> pairRDD = rdd.zip(rdd);
-   JavaPairRDD<Boolean, Iterable<Tuple2<Integer, Integer>>> oddsAndEvens =
-     pairRDD.groupBy(areEven);
-   assertEquals(2, oddsAndEvens.count());
-   assertEquals(2, Iterables.size(oddsAndEvens.lookup(true).get(0))); // Evens
-   assertEquals(5, Iterables.size(oddsAndEvens.lookup(false).get(0))); // Odds
-
-   // Same grouping with a single output partition
-   oddsAndEvens = pairRDD.groupBy(areEven, 1);
-   assertEquals(2, oddsAndEvens.count());
-   assertEquals(2, Iterables.size(oddsAndEvens.lookup(true).get(0))); // Evens
-   assertEquals(5, Iterables.size(oddsAndEvens.lookup(false).get(0))); // Odds
- }
-
- @SuppressWarnings("unchecked")
- @Test
- public void keyByOnPairRDD() {
- // Regression test for SPARK-4459
- JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 1, 2, 3, 5, 8, 13));
- Function<Tuple2<Integer, Integer>, String> sumToString =
- new Function<Tuple2<Integer, Integer>, String>() {
- @Override
- public String call(Tuple2<Integer, Integer> x) {
- return String.valueOf(x._1() + x._2());
- }
- };
- JavaPairRDD<Integer, Integer> pairRDD = rdd.zip(rdd);
- JavaPairRDD<String, Tuple2<Integer, Integer>> keyed = pairRDD.keyBy(sumToString);
- assertEquals(7, keyed.count());
- assertEquals(1, (long) keyed.lookup("2").get(0)._1());
- }
-
- // Cogroups categories with prices and checks both sides of the "Oranges" group.
- @SuppressWarnings("unchecked")
- @Test
- public void cogroup() {
-   List<Tuple2<String, String>> categoryData = Arrays.asList(
-     new Tuple2<>("Apples", "Fruit"),
-     new Tuple2<>("Oranges", "Fruit"),
-     new Tuple2<>("Oranges", "Citrus")
-   );
-   List<Tuple2<String, Integer>> priceData = Arrays.asList(
-     new Tuple2<>("Oranges", 2),
-     new Tuple2<>("Apples", 3)
-   );
-   JavaPairRDD<String, String> categories = sc.parallelizePairs(categoryData);
-   JavaPairRDD<String, Integer> prices = sc.parallelizePairs(priceData);
-
-   JavaPairRDD<String, Tuple2<Iterable<String>, Iterable<Integer>>> grouped =
-     categories.cogroup(prices);
-   assertEquals("[Fruit, Citrus]", Iterables.toString(grouped.lookup("Oranges").get(0)._1()));
-   assertEquals("[2]", Iterables.toString(grouped.lookup("Oranges").get(0)._2()));
-
-   grouped.collect();
- }
-
- // Cogroups three pair RDDs and checks one value list from each position of the result.
- @SuppressWarnings("unchecked")
- @Test
- public void cogroup3() {
-   List<Tuple2<String, String>> categoryData = Arrays.asList(
-     new Tuple2<>("Apples", "Fruit"),
-     new Tuple2<>("Oranges", "Fruit"),
-     new Tuple2<>("Oranges", "Citrus")
-   );
-   List<Tuple2<String, Integer>> priceData = Arrays.asList(
-     new Tuple2<>("Oranges", 2),
-     new Tuple2<>("Apples", 3)
-   );
-   List<Tuple2<String, Integer>> quantityData = Arrays.asList(
-     new Tuple2<>("Oranges", 21),
-     new Tuple2<>("Apples", 42)
-   );
-   JavaPairRDD<String, String> categories = sc.parallelizePairs(categoryData);
-   JavaPairRDD<String, Integer> prices = sc.parallelizePairs(priceData);
-   JavaPairRDD<String, Integer> quantities = sc.parallelizePairs(quantityData);
-
-   JavaPairRDD<String, Tuple3<Iterable<String>, Iterable<Integer>, Iterable<Integer>>> grouped =
-     categories.cogroup(prices, quantities);
-   assertEquals("[Fruit, Citrus]", Iterables.toString(grouped.lookup("Oranges").get(0)._1()));
-   assertEquals("[2]", Iterables.toString(grouped.lookup("Oranges").get(0)._2()));
-   assertEquals("[42]", Iterables.toString(grouped.lookup("Apples").get(0)._3()));
-
-   grouped.collect();
- }
-
- // Cogroups four pair RDDs and checks one value list from each position of the result.
- @SuppressWarnings("unchecked")
- @Test
- public void cogroup4() {
-   List<Tuple2<String, String>> categoryData = Arrays.asList(
-     new Tuple2<>("Apples", "Fruit"),
-     new Tuple2<>("Oranges", "Fruit"),
-     new Tuple2<>("Oranges", "Citrus")
-   );
-   List<Tuple2<String, Integer>> priceData = Arrays.asList(
-     new Tuple2<>("Oranges", 2),
-     new Tuple2<>("Apples", 3)
-   );
-   List<Tuple2<String, Integer>> quantityData = Arrays.asList(
-     new Tuple2<>("Oranges", 21),
-     new Tuple2<>("Apples", 42)
-   );
-   List<Tuple2<String, String>> countryData = Arrays.asList(
-     new Tuple2<>("Oranges", "BR"),
-     new Tuple2<>("Apples", "US")
-   );
-   JavaPairRDD<String, String> categories = sc.parallelizePairs(categoryData);
-   JavaPairRDD<String, Integer> prices = sc.parallelizePairs(priceData);
-   JavaPairRDD<String, Integer> quantities = sc.parallelizePairs(quantityData);
-   JavaPairRDD<String, String> countries = sc.parallelizePairs(countryData);
-
-   JavaPairRDD<String, Tuple4<Iterable<String>, Iterable<Integer>, Iterable<Integer>,
-     Iterable<String>>> grouped = categories.cogroup(prices, quantities, countries);
-   assertEquals("[Fruit, Citrus]", Iterables.toString(grouped.lookup("Oranges").get(0)._1()));
-   assertEquals("[2]", Iterables.toString(grouped.lookup("Oranges").get(0)._2()));
-   assertEquals("[42]", Iterables.toString(grouped.lookup("Apples").get(0)._3()));
-   assertEquals("[BR]", Iterables.toString(grouped.lookup("Oranges").get(0)._4()));
-
-   grouped.collect();
- }
-
- // Left-outer-joins two pair RDDs, checks the row count, and checks that the first row whose
- // right side is absent has key 3.
- @SuppressWarnings("unchecked")
- @Test
- public void leftOuterJoin() {
-   List<Tuple2<Integer, Integer>> leftData = Arrays.asList(
-     new Tuple2<>(1, 1),
-     new Tuple2<>(1, 2),
-     new Tuple2<>(2, 1),
-     new Tuple2<>(3, 1)
-   );
-   List<Tuple2<Integer, Character>> rightData = Arrays.asList(
-     new Tuple2<>(1, 'x'),
-     new Tuple2<>(2, 'y'),
-     new Tuple2<>(2, 'z'),
-     new Tuple2<>(4, 'w')
-   );
-   JavaPairRDD<Integer, Integer> rdd1 = sc.parallelizePairs(leftData);
-   JavaPairRDD<Integer, Character> rdd2 = sc.parallelizePairs(rightData);
-
-   List<Tuple2<Integer,Tuple2<Integer,Optional<Character>>>> joined =
-     rdd1.leftOuterJoin(rdd2).collect();
-   assertEquals(5, joined.size());
-
-   // keeps only rows with no match on the right-hand side
-   Function<Tuple2<Integer, Tuple2<Integer, Optional<Character>>>, Boolean> rightMissing =
-     new Function<Tuple2<Integer, Tuple2<Integer, Optional<Character>>>, Boolean>() {
-       @Override
-       public Boolean call(Tuple2<Integer, Tuple2<Integer, Optional<Character>>> row) {
-         Optional<Character> right = row._2()._2();
-         return !right.isPresent();
-       }
-     };
-   Tuple2<Integer,Tuple2<Integer,Optional<Character>>> firstUnmatched =
-     rdd1.leftOuterJoin(rdd2).filter(rightMissing).first();
-   assertEquals(3, firstUnmatched._1().intValue());
- }
-
- // fold() with a zero of 0 and reduce() must both sum the RDD to 33.
- @Test
- public void foldReduce() {
-   Function2<Integer, Integer, Integer> plus = new Function2<Integer, Integer, Integer>() {
-     @Override
-     public Integer call(Integer left, Integer right) {
-       return left + right;
-     }
-   };
-   JavaRDD<Integer> numbers = sc.parallelize(Arrays.asList(1, 1, 2, 3, 5, 8, 13));
-
-   int folded = numbers.fold(0, plus);
-   assertEquals(33, folded);
-
-   int reduced = numbers.reduce(plus);
-   assertEquals(33, reduced);
- }
-
- // treeReduce() must produce the same sum (-5) for every depth from 1 to 10.
- @Test
- public void treeReduce() {
-   Function2<Integer, Integer, Integer> plus = new Function2<Integer, Integer, Integer>() {
-     @Override
-     public Integer call(Integer left, Integer right) {
-       return left + right;
-     }
-   };
-   JavaRDD<Integer> numbers =
-     sc.parallelize(Arrays.asList(-5, -4, -3, -2, -1, 1, 2, 3, 4), 10);
-   int depth = 1;
-   while (depth <= 10) {
-     int total = numbers.treeReduce(plus, depth);
-     assertEquals(-5, total);
-     depth++;
-   }
- }
-
- // treeAggregate() with a zero of 0 must produce the same sum (-5) for depths 1 to 10.
- @Test
- public void treeAggregate() {
-   Function2<Integer, Integer, Integer> plus = new Function2<Integer, Integer, Integer>() {
-     @Override
-     public Integer call(Integer left, Integer right) {
-       return left + right;
-     }
-   };
-   JavaRDD<Integer> numbers =
-     sc.parallelize(Arrays.asList(-5, -4, -3, -2, -1, 1, 2, 3, 4), 10);
-   int depth = 1;
-   while (depth <= 10) {
-     // the same function serves as both the per-element and the combining step
-     int total = numbers.treeAggregate(0, plus, plus, depth);
-     assertEquals(-5, total);
-     depth++;
-   }
- }
-
- // Aggregates the values of each key into a set and checks the three resulting sets.
- @SuppressWarnings("unchecked")
- @Test
- public void aggregateByKey() {
-   List<Tuple2<Integer, Integer>> data = Arrays.asList(
-     new Tuple2<>(1, 1),
-     new Tuple2<>(1, 1),
-     new Tuple2<>(3, 2),
-     new Tuple2<>(5, 1),
-     new Tuple2<>(5, 3));
-   JavaPairRDD<Integer, Integer> pairs = sc.parallelizePairs(data, 2);
-
-   // adds one value to the running set
-   Function2<Set<Integer>, Integer, Set<Integer>> addValue =
-     new Function2<Set<Integer>, Integer, Set<Integer>>() {
-       @Override
-       public Set<Integer> call(Set<Integer> acc, Integer value) {
-         acc.add(value);
-         return acc;
-       }
-     };
-   // merges two partial sets
-   Function2<Set<Integer>, Set<Integer>, Set<Integer>> mergeSets =
-     new Function2<Set<Integer>, Set<Integer>, Set<Integer>>() {
-       @Override
-       public Set<Integer> call(Set<Integer> acc, Set<Integer> other) {
-         acc.addAll(other);
-         return acc;
-       }
-     };
-
-   Map<Integer, Set<Integer>> sets =
-     pairs.aggregateByKey(new HashSet<Integer>(), addValue, mergeSets).collectAsMap();
-   assertEquals(3, sets.size());
-   assertEquals(new HashSet<>(Arrays.asList(1)), sets.get(1));
-   assertEquals(new HashSet<>(Arrays.asList(2)), sets.get(3));
-   assertEquals(new HashSet<>(Arrays.asList(1, 3)), sets.get(5));
- }
-
- // foldByKey() with a zero of 0 must sum the values of each key.
- @SuppressWarnings("unchecked")
- @Test
- public void foldByKey() {
-   Function2<Integer, Integer, Integer> plus = new Function2<Integer, Integer, Integer>() {
-     @Override
-     public Integer call(Integer left, Integer right) {
-       return left + right;
-     }
-   };
-   List<Tuple2<Integer, Integer>> data = Arrays.asList(
-     new Tuple2<>(2, 1),
-     new Tuple2<>(2, 1),
-     new Tuple2<>(1, 1),
-     new Tuple2<>(3, 2),
-     new Tuple2<>(3, 1)
-   );
-   JavaPairRDD<Integer, Integer> totals = sc.parallelizePairs(data).foldByKey(0, plus);
-   assertEquals(1, totals.lookup(1).get(0).intValue());
-   assertEquals(2, totals.lookup(2).get(0).intValue());
-   assertEquals(3, totals.lookup(3).get(0).intValue());
- }
-
- // Sums values per key with reduceByKey() and reduceByKeyLocally(), checking the result
- // through lookup(), collectAsMap() and the locally reduced map.
- @SuppressWarnings("unchecked")
- @Test
- public void reduceByKey() {
-   Function2<Integer, Integer, Integer> plus = new Function2<Integer, Integer, Integer>() {
-     @Override
-     public Integer call(Integer left, Integer right) {
-       return left + right;
-     }
-   };
-   List<Tuple2<Integer, Integer>> data = Arrays.asList(
-     new Tuple2<>(2, 1),
-     new Tuple2<>(2, 1),
-     new Tuple2<>(1, 1),
-     new Tuple2<>(3, 2),
-     new Tuple2<>(3, 1)
-   );
-   JavaPairRDD<Integer, Integer> source = sc.parallelizePairs(data);
-
-   // distributed reduce, checked key by key
-   JavaPairRDD<Integer, Integer> totals = source.reduceByKey(plus);
-   assertEquals(1, totals.lookup(1).get(0).intValue());
-   assertEquals(2, totals.lookup(2).get(0).intValue());
-   assertEquals(3, totals.lookup(3).get(0).intValue());
-
-   // same result collected as a map
-   Map<Integer, Integer> asMap = totals.collectAsMap();
-   assertEquals(1, asMap.get(1).intValue());
-   assertEquals(2, asMap.get(2).intValue());
-   assertEquals(3, asMap.get(3).intValue());
-
-   // reduceByKeyLocally returns the map directly
-   asMap = source.reduceByKeyLocally(plus);
-   assertEquals(1, asMap.get(1).intValue());
-   assertEquals(2, asMap.get(2).intValue());
-   assertEquals(3, asMap.get(3).intValue());
- }
-
- // Compares countByValue() with the final value of countByValueApprox() for two keys.
- @Test
- public void approximateResults() {
-   JavaRDD<Integer> numbers = sc.parallelize(Arrays.asList(1, 1, 2, 3, 5, 8, 13));
-
-   Map<Integer, Long> exact = numbers.countByValue();
-   assertEquals(2, exact.get(1).longValue());
-   assertEquals(1, exact.get(13).longValue());
-
-   PartialResult<Map<Integer, BoundedDouble>> partial = numbers.countByValueApprox(1);
-   Map<Integer, BoundedDouble> estimated = partial.getFinalValue();
-   assertEquals(2.0, estimated.get(1).mean(), 0.01);
-   assertEquals(1.0, estimated.get(13).mean(), 0.01);
- }
-
- // Checks first(), take() and takeSample(). The results of take() and takeSample() were
- // previously discarded; they are now asserted so the test can detect wrong results.
- @Test
- public void take() {
-   JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 1, 2, 3, 5, 8, 13));
-   assertEquals(1, rdd.first().intValue());
-   // take(n) returns the first n elements of the RDD
-   assertEquals(Arrays.asList(1, 1), rdd.take(2));
-   // sampling 2 of 7 elements without replacement must return exactly 2 elements
-   assertEquals(2, rdd.takeSample(false, 2, 42).size());
- }
-
- @Test
- public void isEmpty() {
- assertTrue(sc.emptyRDD().isEmpty());
- assertTrue(sc.parallelize(new ArrayList<Integer>()).isEmpty());
- assertFalse(sc.parallelize(Arrays.asList(1)).isEmpty());
- assertTrue(sc.parallelize(Arrays.asList(1, 2, 3), 3).filter(
- new Function<Integer,Boolean>() {
- @Override
- public Boolean call(Integer i) {
- return i < 0;
- }
- }).isEmpty());
- assertFalse(sc.parallelize(Arrays.asList(1, 2, 3)).filter(
- new Function<Integer, Boolean>() {
- @Override
- public Boolean call(Integer i) {
- return i > 1;
- }
- }).isEmpty());
- }
-
- // The first element of strings x doubles must pair the first string with the first double.
- @Test
- public void cartesian() {
-   JavaRDD<String> words = sc.parallelize(Arrays.asList("Hello", "World"));
-   JavaDoubleRDD numbers = sc.parallelizeDoubles(Arrays.asList(1.0, 1.0, 2.0, 3.0, 5.0, 8.0));
-   JavaPairRDD<String, Double> product = words.cartesian(numbers);
-   Tuple2<String, Double> head = product.first();
-   assertEquals(new Tuple2<>("Hello", 1.0), head);
- }
-
- // Covers distinct/filter/union/cache on a JavaDoubleRDD and the summary statistics it exposes
- // directly and through StatCounter.
- @Test
- public void javaDoubleRDD() {
-   JavaDoubleRDD rdd = sc.parallelizeDoubles(Arrays.asList(1.0, 1.0, 2.0, 3.0, 5.0, 8.0));
-   JavaDoubleRDD distinct = rdd.distinct();
-   assertEquals(5, distinct.count());
-   JavaDoubleRDD filter = rdd.filter(new Function<Double, Boolean>() {
-     @Override
-     public Boolean call(Double x) {
-       return x > 2.0;
-     }
-   });
-   assertEquals(3, filter.count());
-   JavaDoubleRDD union = rdd.union(rdd);
-   assertEquals(12, union.count());
-   // the count must be unchanged after caching
-   union = union.cache();
-   assertEquals(12, union.count());
-
-   assertEquals(20, rdd.sum(), 0.01);
-   StatCounter stats = rdd.stats();
-   assertEquals(20, stats.sum(), 0.01);
-   assertEquals(20/6.0, rdd.mean(), 0.01);
-   // The second mean check now goes through the StatCounter instead of repeating rdd.mean().
-   assertEquals(20/6.0, stats.mean(), 0.01);
-   assertEquals(6.22222, rdd.variance(), 0.01);
-   assertEquals(rdd.variance(), rdd.popVariance(), 1e-14);
-   assertEquals(7.46667, rdd.sampleVariance(), 0.01);
-   assertEquals(2.49444, rdd.stdev(), 0.01);
-   assertEquals(rdd.stdev(), rdd.popStdev(), 1e-14);
-   assertEquals(2.73252, rdd.sampleStdev(), 0.01);
-
-   // smoke calls: only checked for not throwing
-   rdd.first();
-   rdd.take(5);
- }
-
- // histogram() with generated buckets, with caller-provided buckets, and on an empty RDD.
- @Test
- public void javaDoubleRDDHistoGram() {
-   double[] wantedBuckets = {1.0, 2.5, 4.0};
-   long[] wantedCounts = {2, 2};
-   JavaDoubleRDD values = sc.parallelizeDoubles(Arrays.asList(1.0, 2.0, 3.0, 4.0));
-
-   // Test using generated buckets
-   Tuple2<double[], long[]> generated = values.histogram(2);
-   assertArrayEquals(wantedBuckets, generated._1(), 0.1);
-   assertArrayEquals(wantedCounts, generated._2());
-
-   // Test with provided buckets
-   long[] provided = values.histogram(wantedBuckets);
-   assertArrayEquals(wantedCounts, provided);
-
-   // SPARK-5744
-   JavaDoubleRDD noValues = sc.parallelizeDoubles(new ArrayList<Double>(0), 1);
-   assertArrayEquals(new long[] {0}, noValues.histogram(new double[]{0.0, 1.0}));
- }
-
- // Serializable comparator that orders Doubles by their natural ordering.
- private static class DoubleComparator implements Comparator<Double>, Serializable {
-   @Override
-   public int compare(Double o1, Double o2) {
-     // same ordering as Double.compareTo; either argument being null raises NPE
-     return Double.compare(o1, o2);
-   }
- }
-
- // max() with an explicit comparator must return the largest element.
- @Test
- public void max() {
-   JavaDoubleRDD values = sc.parallelizeDoubles(Arrays.asList(1.0, 2.0, 3.0, 4.0));
-   Comparator<Double> ordering = new DoubleComparator();
-   double largest = values.max(ordering);
-   assertEquals(4.0, largest, 0.001);
- }
-
- // min() with an explicit comparator must return the smallest element.
- @Test
- public void min() {
-   JavaDoubleRDD values = sc.parallelizeDoubles(Arrays.asList(1.0, 2.0, 3.0, 4.0));
-   Comparator<Double> ordering = new DoubleComparator();
-   double smallest = values.min(ordering);
-   assertEquals(1.0, smallest, 0.001);
- }
-
- // max() without a comparator must return the largest element exactly.
- @Test
- public void naturalMax() {
-   JavaDoubleRDD values = sc.parallelizeDoubles(Arrays.asList(1.0, 2.0, 3.0, 4.0));
-   double largest = values.max();
-   assertEquals(4.0, largest, 0.0);
- }
-
- // min() without a comparator must return the smallest element exactly.
- @Test
- public void naturalMin() {
-   JavaDoubleRDD values = sc.parallelizeDoubles(Arrays.asList(1.0, 2.0, 3.0, 4.0));
-   double smallest = values.min();
-   assertEquals(1.0, smallest, 0.0);
- }
-
- // takeOrdered(2) must return the two smallest elements, with or without a comparator.
- @Test
- public void takeOrdered() {
-   List<Double> smallestTwo = Arrays.asList(1.0, 2.0);
-   JavaDoubleRDD values = sc.parallelizeDoubles(Arrays.asList(1.0, 2.0, 3.0, 4.0));
-   assertEquals(smallestTwo, values.takeOrdered(2, new DoubleComparator()));
-   assertEquals(smallestTwo, values.takeOrdered(2));
- }
-
- // top(2) must return the two largest elements in descending order.
- @Test
- public void top() {
-   JavaRDD<Integer> numbers = sc.parallelize(Arrays.asList(1, 2, 3, 4));
-   List<Integer> largestTwo = numbers.top(2);
-   assertEquals(Arrays.asList(4, 3), largestTwo);
- }
-
- private static class AddInts implements Function2<Integer, Integer, Integer> {
- @Override
- public Integer call(Integer a, Integer b) {
- return a + b;
- }
- }
-
- @Test
- public void reduce() {
- JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4));
- int sum = rdd.reduce(new AddInts());
- assertEquals(10, sum);
- }
-
- // reduce() on a JavaDoubleRDD with an addition function must yield 10.0.
- @Test
- public void reduceOnJavaDoubleRDD() {
-   Function2<Double, Double, Double> plus = new Function2<Double, Double, Double>() {
-     @Override
-     public Double call(Double v1, Double v2) {
-       return v1 + v2;
-     }
-   };
-   JavaDoubleRDD values = sc.parallelizeDoubles(Arrays.asList(1.0, 2.0, 3.0, 4.0));
-   double total = values.reduce(plus);
-   assertEquals(10.0, total, 0.001);
- }
-
- @Test
- public void fold() {
- JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4));
- int sum = rdd.fold(0, new AddInts());
- assertEquals(10, sum);
- }
-
- @Test
- public void aggregate() {
- JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4));
- int sum = rdd.aggregate(0, new AddInts(), new AddInts());
- assertEquals(10, sum);
- }
-
- // Smoke test: mapToDouble, mapToPair and map each produce a cached RDD that can be collected.
- // No result values are asserted.
- @Test
- public void map() {
-   JavaRDD<Integer> numbers = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));
-
-   DoubleFunction<Integer> toDouble = new DoubleFunction<Integer>() {
-     @Override
-     public double call(Integer x) {
-       return x.doubleValue();
-     }
-   };
-   JavaDoubleRDD asDoubles = numbers.mapToDouble(toDouble).cache();
-   asDoubles.collect();
-
-   PairFunction<Integer, Integer, Integer> toSelfPair =
-     new PairFunction<Integer, Integer, Integer>() {
-       @Override
-       public Tuple2<Integer, Integer> call(Integer x) {
-         return new Tuple2<>(x, x);
-       }
-     };
-   JavaPairRDD<Integer, Integer> asPairs = numbers.mapToPair(toSelfPair).cache();
-   asPairs.collect();
-
-   Function<Integer, String> toText = new Function<Integer, String>() {
-     @Override
-     public String call(Integer x) {
-       return x.toString();
-     }
-   };
-   JavaRDD<String> asStrings = numbers.map(toText).cache();
-   asStrings.collect();
- }
-
- // Splits two sentences into words with flatMap, flatMapToPair and flatMapToDouble and checks
- // the first element and the element count (11 words) of each result.
- @Test
- public void flatMap() {
-   JavaRDD<String> rdd = sc.parallelize(Arrays.asList("Hello World!",
-     "The quick brown fox jumps over the lazy dog."));
-   JavaRDD<String> words = rdd.flatMap(new FlatMapFunction<String, String>() {
-     @Override
-     public Iterator<String> call(String x) {
-       return Arrays.asList(x.split(" ")).iterator();
-     }
-   });
-   assertEquals("Hello", words.first());
-   assertEquals(11, words.count());
-
-   // each word paired with itself
-   JavaPairRDD<String, String> pairsRDD = rdd.flatMapToPair(
-     new PairFlatMapFunction<String, String, String>() {
-       @Override
-       public Iterator<Tuple2<String, String>> call(String s) {
-         List<Tuple2<String, String>> pairs = new LinkedList<>();
-         for (String word : s.split(" ")) {
-           pairs.add(new Tuple2<>(word, word));
-         }
-         return pairs.iterator();
-       }
-     }
-   );
-   assertEquals(new Tuple2<>("Hello", "Hello"), pairsRDD.first());
-   assertEquals(11, pairsRDD.count());
-
-   // each word mapped to its length
-   JavaDoubleRDD doubles = rdd.flatMapToDouble(new DoubleFlatMapFunction<String>() {
-     @Override
-     public Iterator<Double> call(String s) {
-       List<Double> lengths = new LinkedList<>();
-       for (String word : s.split(" ")) {
-         lengths.add((double) word.length());
-       }
-       return lengths.iterator();
-     }
-   });
-   assertEquals(5.0, doubles.first(), 0.01);
-   // Count the double RDD itself; this line used to re-check pairsRDD by mistake.
-   assertEquals(11, doubles.count());
- }
-
- // Smoke test: swapping key and value through flatMapToPair and mapToPair must be collectable.
- @SuppressWarnings("unchecked")
- @Test
- public void mapsFromPairsToPairs() {
-   List<Tuple2<Integer, String>> data = Arrays.asList(
-     new Tuple2<>(1, "a"),
-     new Tuple2<>(2, "aa"),
-     new Tuple2<>(3, "aaa")
-   );
-   JavaPairRDD<Integer, String> source = sc.parallelizePairs(data);
-
-   // Regression test for SPARK-668:
-   PairFlatMapFunction<Tuple2<Integer, String>, String, Integer> swapToIterator =
-     new PairFlatMapFunction<Tuple2<Integer, String>, String, Integer>() {
-       @Override
-       public Iterator<Tuple2<String, Integer>> call(Tuple2<Integer, String> item) {
-         return Collections.singletonList(item.swap()).iterator();
-       }
-     };
-   JavaPairRDD<String, Integer> swapped = source.flatMapToPair(swapToIterator);
-   swapped.collect();
-
-   // There was never a bug here, but it's worth testing:
-   PairFunction<Tuple2<Integer, String>, String, Integer> swap =
-     new PairFunction<Tuple2<Integer, String>, String, Integer>() {
-       @Override
-       public Tuple2<String, Integer> call(Tuple2<Integer, String> item) {
-         return item.swap();
-       }
-     };
-   source.mapToPair(swap).collect();
- }
-
- @Test
- public void mapPartitions() {
- JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4), 2);
- JavaRDD<Integer> partitionSums = rdd.mapPartitions(
- new FlatMapFunction<Iterator<Integer>, Integer>() {
- @Override
- public Iterator<Integer> call(Iterator<Integer> iter) {
- int sum = 0;
- while (iter.hasNext()) {
- sum += iter.next();
- }
- return Collections.singletonList(sum).iterator();
- }
- });
- assertEquals("[3, 7]", partitionSums.collect().toString());
- }
-
-
- @Test
- public void mapPartitionsWithIndex() {
- JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4), 2);
- JavaRDD<Integer> partitionSums = rdd.mapPartitionsWithIndex(
- new Function2<Integer, Iterator<Integer>, Iterator<Integer>>() {
- @Override
- public Iterator<Integer> call(Integer index, Iterator<Integer> iter) {
- int sum = 0;
- while (iter.hasNext()) {
- sum += iter.next();
- }
- return Collections.singletonList(sum).iterator();
- }
- }, false);
- assertEquals("[3, 7]", partitionSums.collect().toString());
- }
-
- @Test
- public void getNumPartitions(){
- JavaRDD<Integer> rdd1 = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8), 3);
- JavaDoubleRDD rdd2 = sc.parallelizeDoubles(Arrays.asList(1.0, 2.0, 3.0, 4.0), 2);
- JavaPairRDD<String, Integer> rdd3 = sc.parallelizePairs(Arrays.asList(
- new Tuple2<>("a", 1),
- new Tuple2<>("aa", 2),
- new Tuple2<>("aaa", 3)
- ), 2);
- assertEquals(3, rdd1.getNumPartitions());
- assertEquals(2, rdd2.getNumPartitions());
- assertEquals(2, rdd3.getNumPartitions());
- }
-
- @Test
- public void repartition() {
- // Shrinking number of partitions
- JavaRDD<Integer> in1 = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8), 2);
- JavaRDD<Integer> repartitioned1 = in1.repartition(4);
- List<List<Integer>> result1 = repartitioned1.glom().collect();
- assertEquals(4, result1.size());
- for (List<Integer> l : result1) {
- assertFalse(l.isEmpty());
- }
-
- // Growing number of partitions
- JavaRDD<Integer> in2 = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8), 4);
- JavaRDD<Integer> repartitioned2 = in2.repartition(2);
- List<List<Integer>> result2 = repartitioned2.glom().collect();
- assertEquals(2, result2.size());
- for (List<Integer> l: result2) {
- assertFalse(l.isEmpty());
- }
- }
-
- @SuppressWarnings("unchecked")
- @Test
- public void persist() {
- JavaDoubleRDD doubleRDD = sc.parallelizeDoubles(Arrays.asList(1.0, 1.0, 2.0, 3.0, 5.0, 8.0));
- doubleRDD = doubleRDD.persist(StorageLevel.DISK_ONLY());
- assertEquals(20, doubleRDD.sum(), 0.1);
-
- List<Tuple2<Integer, String>> pairs = Arrays.asList(
- new Tuple2<>(1, "a"),
- new Tuple2<>(2, "aa"),
- new Tuple2<>(3, "aaa")
- );
- JavaPairRDD<Integer, String> pairRDD = sc.parallelizePairs(pairs);
- pairRDD = pairRDD.persist(StorageLevel.DISK_ONLY());
- assertEquals("a", pairRDD.first()._2());
-
- JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));
- rdd = rdd.persist(StorageLevel.DISK_ONLY());
- assertEquals(1, rdd.first().intValue());
- }
-
- @Test
- public void iterator() {
- JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5), 2);
- TaskContext context = TaskContext$.MODULE$.empty();
- assertEquals(1, rdd.iterator(rdd.partitions().get(0), context).next().intValue());
- }
-
- @Test
- public void glom() {
- JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4), 2);
- assertEquals("[1, 2]", rdd.glom().first().toString());
- }
-
- // File input / output tests are largely adapted from FileSuite:
-
- @Test
- public void textFiles() throws IOException {
- String outputDir = new File(tempDir, "output").getAbsolutePath();
- JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4));
- rdd.saveAsTextFile(outputDir);
- // Read the plain text file and check it's OK
- File outputFile = new File(outputDir, "part-00000");
- String content = Files.toString(outputFile, StandardCharsets.UTF_8);
- assertEquals("1\n2\n3\n4\n", content);
- // Also try reading it in as a text file RDD
- List<String> expected = Arrays.asList("1", "2", "3", "4");
- JavaRDD<String> readRDD = sc.textFile(outputDir);
- assertEquals(expected, readRDD.collect());
- }
-
- @Test
- public void wholeTextFiles() throws Exception {
- byte[] content1 = "spark is easy to use.\n".getBytes(StandardCharsets.UTF_8);
- byte[] content2 = "spark is also easy to use.\n".getBytes(StandardCharsets.UTF_8);
-
- String tempDirName = tempDir.getAbsolutePath();
- String path1 = new Path(tempDirName, "part-00000").toUri().getPath();
- String path2 = new Path(tempDirName, "part-00001").toUri().getPath();
-
- Files.write(content1, new File(path1));
- Files.write(content2, new File(path2));
-
- Map<String, String> container = new HashMap<>();
- container.put(path1, new Text(content1).toString());
- container.put(path2, new Text(content2).toString());
-
- JavaPairRDD<String, String> readRDD = sc.wholeTextFiles(tempDirName, 3);
- List<Tuple2<String, String>> result = readRDD.collect();
-
- for (Tuple2<String, String> res : result) {
- // Note that the paths from `wholeTextFiles` are in URI format on Windows,
- // for example, file:/C:/a/b/c.
- assertEquals(res._2(), container.get(new Path(res._1()).toUri().getPath()));
- }
- }
-
- @Test
- public void textFilesCompressed() throws IOException {
- String outputDir = new File(tempDir, "output").getAbsolutePath();
- JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4));
- rdd.saveAsTextFile(outputDir, DefaultCodec.class);
-
- // Try reading it in as a text file RDD
- List<String> expected = Arrays.asList("1", "2", "3", "4");
- JavaRDD<String> readRDD = sc.textFile(outputDir);
- assertEquals(expected, readRDD.collect());
- }
-
- @SuppressWarnings("unchecked")
- @Test
- public void sequenceFile() {
- String outputDir = new File(tempDir, "output").getAbsolutePath();
- List<Tuple2<Integer, String>> pairs = Arrays.asList(
- new Tuple2<>(1, "a"),
- new Tuple2<>(2, "aa"),
- new Tuple2<>(3, "aaa")
- );
- JavaPairRDD<Integer, String> rdd = sc.parallelizePairs(pairs);
-
- rdd.mapToPair(new PairFunction<Tuple2<Integer, String>, IntWritable, Text>() {
- @Override
- public Tuple2<IntWritable, Text> call(Tuple2<Integer, String> pair) {
- return new Tuple2<>(new IntWritable(pair._1()), new Text(pair._2()));
- }
- }).saveAsHadoopFile(outputDir, IntWritable.class, Text.class, SequenceFileOutputFormat.class);
-
- // Try reading the output back as an object file
- JavaPairRDD<Integer, String> readRDD = sc.sequenceFile(outputDir, IntWritable.class,
- Text.class).mapToPair(new PairFunction<Tuple2<IntWritable, Text>, Integer, String>() {
- @Override
- public Tuple2<Integer, String> call(Tuple2<IntWritable, Text> pair) {
- return new Tuple2<>(pair._1().get(), pair._2().toString());
- }
- });
- assertEquals(pairs, readRDD.collect());
- }
-
- @Test
- public void binaryFiles() throws Exception {
- // Reusing the wholeText files example
- byte[] content1 = "spark is easy to use.\n".getBytes(StandardCharsets.UTF_8);
-
- String tempDirName = tempDir.getAbsolutePath();
- File file1 = new File(tempDirName + "/part-00000");
-
- FileOutputStream fos1 = new FileOutputStream(file1);
-
- FileChannel channel1 = fos1.getChannel();
- ByteBuffer bbuf = ByteBuffer.wrap(content1);
- channel1.write(bbuf);
- channel1.close();
- JavaPairRDD<String, PortableDataStream> readRDD = sc.binaryFiles(tempDirName, 3);
- List<Tuple2<String, PortableDataStream>> result = readRDD.collect();
- for (Tuple2<String, PortableDataStream> res : result) {
- assertArrayEquals(content1, res._2().toArray());
- }
- }
-
- @Test
- public void binaryFilesCaching() throws Exception {
- // Reusing the wholeText files example
- byte[] content1 = "spark is easy to use.\n".getBytes(StandardCharsets.UTF_8);
-
- String tempDirName = tempDir.getAbsolutePath();
- File file1 = new File(tempDirName + "/part-00000");
-
- FileOutputStream fos1 = new FileOutputStream(file1);
-
- FileChannel channel1 = fos1.getChannel();
- ByteBuffer bbuf = ByteBuffer.wrap(content1);
- channel1.write(bbuf);
- channel1.close();
-
- JavaPairRDD<String, PortableDataStream> readRDD = sc.binaryFiles(tempDirName).cache();
- readRDD.foreach(new VoidFunction<Tuple2<String,PortableDataStream>>() {
- @Override
- public void call(Tuple2<String, PortableDataStream> pair) {
- pair._2().toArray(); // force the file to read
- }
- });
-
- List<Tuple2<String, PortableDataStream>> result = readRDD.collect();
- for (Tuple2<String, PortableDataStream> res : result) {
- assertArrayEquals(content1, res._2().toArray());
- }
- }
-
- @Test
- public void binaryRecords() throws Exception {
- // Reusing the wholeText files example
- byte[] content1 = "spark isn't always easy to use.\n".getBytes(StandardCharsets.UTF_8);
- int numOfCopies = 10;
- String tempDirName = tempDir.getAbsolutePath();
- File file1 = new File(tempDirName + "/part-00000");
-
- FileOutputStream fos1 = new FileOutputStream(file1);
-
- FileChannel channel1 = fos1.getChannel();
-
- for (int i = 0; i < numOfCopies; i++) {
- ByteBuffer bbuf = ByteBuffer.wrap(content1);
- channel1.write(bbuf);
- }
- channel1.close();
-
- JavaRDD<byte[]> readRDD = sc.binaryRecords(tempDirName, content1.length);
- assertEquals(numOfCopies,readRDD.count());
- List<byte[]> result = readRDD.collect();
- for (byte[] res : result) {
- assertArrayEquals(content1, res);
- }
- }
-
- @SuppressWarnings("unchecked")
- @Test
- public void writeWithNewAPIHadoopFile() {
- String outputDir = new File(tempDir, "output").getAbsolutePath();
- List<Tuple2<Integer, String>> pairs = Arrays.asList(
- new Tuple2<>(1, "a"),
- new Tuple2<>(2, "aa"),
- new Tuple2<>(3, "aaa")
- );
- JavaPairRDD<Integer, String> rdd = sc.parallelizePairs(pairs);
-
- rdd.mapToPair(new PairFunction<Tuple2<Integer, String>, IntWritable, Text>() {
- @Override
- public Tuple2<IntWritable, Text> call(Tuple2<Integer, String> pair) {
- return new Tuple2<>(new IntWritable(pair._1()), new Text(pair._2()));
- }
- }).saveAsNewAPIHadoopFile(
- outputDir, IntWritable.class, Text.class,
- org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat.class);
-
- JavaPairRDD<IntWritable, Text> output =
- sc.sequenceFile(outputDir, IntWritable.class, Text.class);
- assertEquals(pairs.toString(), output.map(new Function<Tuple2<IntWritable, Text>, String>() {
- @Override
- public String call(Tuple2<IntWritable, Text> x) {
- return x.toString();
- }
- }).collect().toString());
- }
-
- @SuppressWarnings("unchecked")
- @Test
- public void readWithNewAPIHadoopFile() throws IOException {
- String outputDir = new File(tempDir, "output").getAbsolutePath();
- List<Tuple2<Integer, String>> pairs = Arrays.asList(
- new Tuple2<>(1, "a"),
- new Tuple2<>(2, "aa"),
- new Tuple2<>(3, "aaa")
- );
- JavaPairRDD<Integer, String> rdd = sc.parallelizePairs(pairs);
-
- rdd.mapToPair(new PairFunction<Tuple2<Integer, String>, IntWritable, Text>() {
- @Override
- public Tuple2<IntWritable, Text> call(Tuple2<Integer, String> pair) {
- return new Tuple2<>(new IntWritable(pair._1()), new Text(pair._2()));
- }
- }).saveAsHadoopFile(outputDir, IntWritable.class, Text.class, SequenceFileOutputFormat.class);
-
- JavaPairRDD<IntWritable, Text> output = sc.newAPIHadoopFile(outputDir,
- org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat.class,
- IntWritable.class, Text.class, Job.getInstance().getConfiguration());
- assertEquals(pairs.toString(), output.map(new Function<Tuple2<IntWritable, Text>, String>() {
- @Override
- public String call(Tuple2<IntWritable, Text> x) {
- return x.toString();
- }
- }).collect().toString());
- }
-
- @Test
- public void objectFilesOfInts() {
- String outputDir = new File(tempDir, "output").getAbsolutePath();
- JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4));
- rdd.saveAsObjectFile(outputDir);
- // Try reading the output back as an object file
- List<Integer> expected = Arrays.asList(1, 2, 3, 4);
- JavaRDD<Integer> readRDD = sc.objectFile(outputDir);
- assertEquals(expected, readRDD.collect());
- }
-
- @SuppressWarnings("unchecked")
- @Test
- public void objectFilesOfComplexTypes() {
- String outputDir = new File(tempDir, "output").getAbsolutePath();
- List<Tuple2<Integer, String>> pairs = Arrays.asList(
- new Tuple2<>(1, "a"),
- new Tuple2<>(2, "aa"),
- new Tuple2<>(3, "aaa")
- );
- JavaPairRDD<Integer, String> rdd = sc.parallelizePairs(pairs);
- rdd.saveAsObjectFile(outputDir);
- // Try reading the output back as an object file
- JavaRDD<Tuple2<Integer, String>> readRDD = sc.objectFile(outputDir);
- assertEquals(pairs, readRDD.collect());
- }
-
- @SuppressWarnings("unchecked")
- @Test
- public void hadoopFile() {
- String outputDir = new File(tempDir, "output").getAbsolutePath();
- List<Tuple2<Integer, String>> pairs = Arrays.asList(
- new Tuple2<>(1, "a"),
- new Tuple2<>(2, "aa"),
- new Tuple2<>(3, "aaa")
- );
- JavaPairRDD<Integer, String> rdd = sc.parallelizePairs(pairs);
-
- rdd.mapToPair(new PairFunction<Tuple2<Integer, String>, IntWritable, Text>() {
- @Override
- public Tuple2<IntWritable, Text> call(Tuple2<Integer, String> pair) {
- return new Tuple2<>(new IntWritable(pair._1()), new Text(pair._2()));
- }
- }).saveAsHadoopFile(outputDir, IntWritable.class, Text.class, SequenceFileOutputFormat.class);
-
- JavaPairRDD<IntWritable, Text> output = sc.hadoopFile(outputDir,
- SequenceFileInputFormat.class, IntWritable.class, Text.class);
- assertEquals(pairs.toString(), output.map(new Function<Tuple2<IntWritable, Text>, String>() {
- @Override
- public String call(Tuple2<IntWritable, Text> x) {
- return x.toString();
- }
- }).collect().toString());
- }
-
- @SuppressWarnings("unchecked")
- @Test
- public void hadoopFileCompressed() {
- String outputDir = new File(tempDir, "output_compressed").getAbsolutePath();
- List<Tuple2<Integer, String>> pairs = Arrays.asList(
- new Tuple2<>(1, "a"),
- new Tuple2<>(2, "aa"),
- new Tuple2<>(3, "aaa")
- );
- JavaPairRDD<Integer, String> rdd = sc.parallelizePairs(pairs);
-
- rdd.mapToPair(new PairFunction<Tuple2<Integer, String>, IntWritable, Text>() {
- @Override
- public Tuple2<IntWritable, Text> call(Tuple2<Integer, String> pair) {
- return new Tuple2<>(new IntWritable(pair._1()), new Text(pair._2()));
- }
- }).saveAsHadoopFile(outputDir, IntWritable.class, Text.class, SequenceFileOutputFormat.class,
- DefaultCodec.class);
-
- JavaPairRDD<IntWritable, Text> output = sc.hadoopFile(outputDir,
- SequenceFileInputFormat.class, IntWritable.class, Text.class);
-
- assertEquals(pairs.toString(), output.map(new Function<Tuple2<IntWritable, Text>, String>() {
- @Override
- public String call(Tuple2<IntWritable, Text> x) {
- return x.toString();
- }
- }).collect().toString());
- }
-
- @Test
- public void zip() {
- JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));
- JavaDoubleRDD doubles = rdd.mapToDouble(new DoubleFunction<Integer>() {
- @Override
- public double call(Integer x) {
- return x.doubleValue();
- }
- });
- JavaPairRDD<Integer, Double> zipped = rdd.zip(doubles);
- zipped.count();
- }
-
- @Test
- public void zipPartitions() {
- JavaRDD<Integer> rdd1 = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6), 2);
- JavaRDD<String> rdd2 = sc.parallelize(Arrays.asList("1", "2", "3", "4"), 2);
- FlatMapFunction2<Iterator<Integer>, Iterator<String>, Integer> sizesFn =
- new FlatMapFunction2<Iterator<Integer>, Iterator<String>, Integer>() {
- @Override
- public Iterator<Integer> call(Iterator<Integer> i, Iterator<String> s) {
- return Arrays.asList(Iterators.size(i), Iterators.size(s)).iterator();
- }
- };
-
- JavaRDD<Integer> sizes = rdd1.zipPartitions(rdd2, sizesFn);
- assertEquals("[3, 2, 3, 2]", sizes.collect().toString());
- }
-
- @SuppressWarnings("deprecation")
- @Test
- public void accumulators() {
- JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));
-
- final Accumulator<Integer> intAccum = sc.intAccumulator(10);
- rdd.foreach(new VoidFunction<Integer>() {
- @Override
- public void call(Integer x) {
- intAccum.add(x);
- }
- });
- assertEquals((Integer) 25, intAccum.value());
-
- final Accumulator<Double> doubleAccum = sc.doubleAccumulator(10.0);
- rdd.foreach(new VoidFunction<Integer>() {
- @Override
- public void call(Integer x) {
- doubleAccum.add((double) x);
- }
- });
- assertEquals((Double) 25.0, doubleAccum.value());
-
- // Try a custom accumulator type
- AccumulatorParam<Float> floatAccumulatorParam = new AccumulatorParam<Float>() {
- @Override
- public Float addInPlace(Float r, Float t) {
- return r + t;
- }
-
- @Override
- public Float addAccumulator(Float r, Float t) {
- return r + t;
- }
-
- @Override
- public Float zero(Float initialValue) {
- return 0.0f;
- }
- };
-
- final Accumulator<Float> floatAccum = sc.accumulator(10.0f, floatAccumulatorParam);
- rdd.foreach(new VoidFunction<Integer>() {
- @Override
- public void call(Integer x) {
- floatAccum.add((float) x);
- }
- });
- assertEquals((Float) 25.0f, floatAccum.value());
-
- // Test the setValue method
- floatAccum.setValue(5.0f);
- assertEquals((Float) 5.0f, floatAccum.value());
- }
-
- @Test
- public void keyBy() {
- JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2));
- List<Tuple2<String, Integer>> s = rdd.keyBy(new Function<Integer, String>() {
- @Override
- public String call(Integer t) {
- return t.toString();
- }
- }).collect();
- assertEquals(new Tuple2<>("1", 1), s.get(0));
- assertEquals(new Tuple2<>("2", 2), s.get(1));
- }
-
- @Test
- public void checkpointAndComputation() {
- JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));
- sc.setCheckpointDir(tempDir.getAbsolutePath());
- assertFalse(rdd.isCheckpointed());
- rdd.checkpoint();
- rdd.count(); // Forces the DAG to cause a checkpoint
- assertTrue(rdd.isCheckpointed());
- assertEquals(Arrays.asList(1, 2, 3, 4, 5), rdd.collect());
- }
-
- @Test
- public void checkpointAndRestore() {
- JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));
- sc.setCheckpointDir(tempDir.getAbsolutePath());
- assertFalse(rdd.isCheckpointed());
- rdd.checkpoint();
- rdd.count(); // Forces the DAG to cause a checkpoint
- assertTrue(rdd.isCheckpointed());
-
- assertTrue(rdd.getCheckpointFile().isPresent());
- JavaRDD<Integer> recovered = sc.checkpointFile(rdd.getCheckpointFile().get());
- assertEquals(Arrays.asList(1, 2, 3, 4, 5), recovered.collect());
- }
-
- @Test
- public void combineByKey() {
- JavaRDD<Integer> originalRDD = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6));
- Function<Integer, Integer> keyFunction = new Function<Integer, Integer>() {
- @Override
- public Integer call(Integer v1) {
- return v1 % 3;
- }
- };
- Function<Integer, Integer> createCombinerFunction = new Function<Integer, Integer>() {
- @Override
- public Integer call(Integer v1) {
- return v1;
- }
- };
-
- Function2<Integer, Integer, Integer> mergeValueFunction =
- new Function2<Integer, Integer, Integer>() {
- @Override
- public Integer call(Integer v1, Integer v2) {
- return v1 + v2;
- }
- };
-
- JavaPairRDD<Integer, Integer> combinedRDD = originalRDD.keyBy(keyFunction)
- .combineByKey(createCombinerFunction, mergeValueFunction, mergeValueFunction);
- Map<Integer, Integer> results = combinedRDD.collectAsMap();
- ImmutableMap<Integer, Integer> expected = ImmutableMap.of(0, 9, 1, 5, 2, 7);
- assertEquals(expected, results);
-
- Partitioner defaultPartitioner = Partitioner.defaultPartitioner(
- combinedRDD.rdd(),
- JavaConverters.collectionAsScalaIterableConverter(
- Collections.<RDD<?>>emptyList()).asScala().toSeq());
- combinedRDD = originalRDD.keyBy(keyFunction)
- .combineByKey(
- createCombinerFunction,
- mergeValueFunction,
- mergeValueFunction,
- defaultPartitioner,
- false,
- new KryoSerializer(new SparkConf()));
- results = combinedRDD.collectAsMap();
- assertEquals(expected, results);
- }
-
- @SuppressWarnings("unchecked")
- @Test
- public void mapOnPairRDD() {
- JavaRDD<Integer> rdd1 = sc.parallelize(Arrays.asList(1,2,3,4));
- JavaPairRDD<Integer, Integer> rdd2 = rdd1.mapToPair(
- new PairFunction<Integer, Integer, Integer>() {
- @Override
- public Tuple2<Integer, Integer> call(Integer i) {
- return new Tuple2<>(i, i % 2);
- }
- });
- JavaPairRDD<Integer, Integer> rdd3 = rdd2.mapToPair(
- new PairFunction<Tuple2<Integer, Integer>, Integer, Integer>() {
- @Override
- public Tuple2<Integer, Integer> call(Tuple2<Integer, Integer> in) {
- return new Tuple2<>(in._2(), in._1());
- }
- });
- assertEquals(Arrays.asList(
- new Tuple2<>(1, 1),
- new Tuple2<>(0, 2),
- new Tuple2<>(1, 3),
- new Tuple2<>(0, 4)), rdd3.collect());
-
- }
-
- @SuppressWarnings("unchecked")
- @Test
- public void collectPartitions() {
- JavaRDD<Integer> rdd1 = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7), 3);
-
- JavaPairRDD<Integer, Integer> rdd2 = rdd1.mapToPair(
- new PairFunction<Integer, Integer, Integer>() {
- @Override
- public Tuple2<Integer, Integer> call(Integer i) {
- return new Tuple2<>(i, i % 2);
- }
- });
-
- List<Integer>[] parts = rdd1.collectPartitions(new int[] {0});
- assertEquals(Arrays.asList(1, 2), parts[0]);
-
- parts = rdd1.collectPartitions(new int[] {1, 2});
- assertEquals(Arrays.asList(3, 4), parts[0]);
- assertEquals(Arrays.asList(5, 6, 7), parts[1]);
-
- assertEquals(Arrays.asList(new Tuple2<>(1, 1),
- new Tuple2<>(2, 0)),
- rdd2.collectPartitions(new int[] {0})[0]);
-
- List<Tuple2<Integer,Integer>>[] parts2 = rdd2.collectPartitions(new int[] {1, 2});
- assertEquals(Arrays.asList(new Tuple2<>(3, 1), new Tuple2<>(4, 0)), parts2[0]);
- assertEquals(Arrays.asList(new Tuple2<>(5, 1),
- new Tuple2<>(6, 0),
- new Tuple2<>(7, 1)),
- parts2[1]);
- }
-
- @Test
- public void countApproxDistinct() {
- List<Integer> arrayData = new ArrayList<>();
- int size = 100;
- for (int i = 0; i < 100000; i++) {
- arrayData.add(i % size);
- }
- JavaRDD<Integer> simpleRdd = sc.parallelize(arrayData, 10);
- assertTrue(Math.abs((simpleRdd.countApproxDistinct(0.05) - size) / (size * 1.0)) <= 0.1);
- }
-
- @Test
- public void countApproxDistinctByKey() {
- List<Tuple2<Integer, Integer>> arrayData = new ArrayList<>();
- for (int i = 10; i < 100; i++) {
- for (int j = 0; j < i; j++) {
- arrayData.add(new Tuple2<>(i, j));
- }
- }
- double relativeSD = 0.001;
- JavaPairRDD<Integer, Integer> pairRdd = sc.parallelizePairs(arrayData);
- List<Tuple2<Integer, Long>> res = pairRdd.countApproxDistinctByKey(relativeSD, 8).collect();
- for (Tuple2<Integer, Long> resItem : res) {
- double count = resItem._1();
- long resCount = resItem._2();
- double error = Math.abs((resCount - count) / count);
- assertTrue(error < 0.1);
- }
-
- }
-
- @Test
- public void collectAsMapWithIntArrayValues() {
- // Regression test for SPARK-1040
- JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1));
- JavaPairRDD<Integer, int[]> pairRDD = rdd.mapToPair(
- new PairFunction<Integer, Integer, int[]>() {
- @Override
- public Tuple2<Integer, int[]> call(Integer x) {
- return new Tuple2<>(x, new int[]{x});
- }
- });
- pairRDD.collect(); // Works fine
- pairRDD.collectAsMap(); // Used to crash with ClassCastException
- }
-
- @SuppressWarnings("unchecked")
- @Test
- public void collectAsMapAndSerialize() throws Exception {
- JavaPairRDD<String,Integer> rdd =
- sc.parallelizePairs(Arrays.asList(new Tuple2<>("foo", 1)));
- Map<String,Integer> map = rdd.collectAsMap();
- ByteArrayOutputStream bytes = new ByteArrayOutputStream();
- new ObjectOutputStream(bytes).writeObject(map);
- Map<String,Integer> deserializedMap = (Map<String,Integer>)
- new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray())).readObject();
- assertEquals(1, deserializedMap.get("foo").intValue());
- }
-
- @Test
- @SuppressWarnings("unchecked")
- public void sampleByKey() {
- JavaRDD<Integer> rdd1 = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8), 3);
- JavaPairRDD<Integer, Integer> rdd2 = rdd1.mapToPair(
- new PairFunction<Integer, Integer, Integer>() {
- @Override
- public Tuple2<Integer, Integer> call(Integer i) {
- return new Tuple2<>(i % 2, 1);
- }
- });
- Map<Integer, Double> fractions = new HashMap<>();
- fractions.put(0, 0.5);
- fractions.put(1, 1.0);
- JavaPairRDD<Integer, Integer> wr = rdd2.sampleByKey(true, fractions, 1L);
- Map<Integer, Long> wrCounts = wr.countByKey();
- assertEquals(2, wrCounts.size());
- assertTrue(wrCounts.get(0) > 0);
- assertTrue(wrCounts.get(1) > 0);
- JavaPairRDD<Integer, Integer> wor = rdd2.sampleByKey(false, fractions, 1L);
- Map<Integer, Long> worCounts = wor.countByKey();
- assertEquals(2, worCounts.size());
- assertTrue(worCounts.get(0) > 0);
- assertTrue(worCounts.get(1) > 0);
- }
-
- @Test
- @SuppressWarnings("unchecked")
- public void sampleByKeyExact() {
- JavaRDD<Integer> rdd1 = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8), 3);
- JavaPairRDD<Integer, Integer> rdd2 = rdd1.mapToPair(
- new PairFunction<Integer, Integer, Integer>() {
- @Override
- public Tuple2<Integer, Integer> call(Integer i) {
- return new Tuple2<>(i % 2, 1);
- }
- });
- Map<Integer, Double> fractions = new HashMap<>();
- fractions.put(0, 0.5);
- fractions.put(1, 1.0);
- JavaPairRDD<Integer, Integer> wrExact = rdd2.sampleByKeyExact(true, fractions, 1L);
- Map<Integer, Long> wrExactCounts = wrExact.countByKey();
- assertEquals(2, wrExactCounts.size());
- assertTrue(wrExactCounts.get(0) == 2);
- assertTrue(wrExactCounts.get(1) == 4);
- JavaPairRDD<Integer, Integer> worExact = rdd2.sampleByKeyExact(false, fractions, 1L);
- Map<Integer, Long> worExactCounts = worExact.countByKey();
- assertEquals(2, worExactCounts.size());
- assertTrue(worExactCounts.get(0) == 2);
- assertTrue(worExactCounts.get(1) == 4);
- }
-
- private static class SomeCustomClass implements Serializable {
- SomeCustomClass() {
- // Intentionally left blank
- }
- }
-
- @Test
- public void collectUnderlyingScalaRDD() {
- List<SomeCustomClass> data = new ArrayList<>();
- for (int i = 0; i < 100; i++) {
- data.add(new SomeCustomClass());
- }
- JavaRDD<SomeCustomClass> rdd = sc.parallelize(data);
- SomeCustomClass[] collected =
- (SomeCustomClass[]) rdd.rdd().retag(SomeCustomClass.class).collect();
- assertEquals(data.size(), collected.length);
- }
-
- private static final class BuggyMapFunction<T> implements Function<T, T> {
-
- @Override
- public T call(T x) {
- throw new IllegalStateException("Custom exception!");
- }
- }
-
- @Test
- public void collectAsync() throws Exception {
- List<Integer> data = Arrays.asList(1, 2, 3, 4, 5);
- JavaRDD<Integer> rdd = sc.parallelize(data, 1);
- JavaFutureAction<List<Integer>> future = rdd.collectAsync();
- List<Integer> result = future.get();
- assertEquals(data, result);
- assertFalse(future.isCancelled());
- assertTrue(future.isDone());
- assertEquals(1, future.jobIds().size());
- }
-
- @Test
- public void takeAsync() throws Exception {
- List<Integer> data = Arrays.asList(1, 2, 3, 4, 5);
- JavaRDD<Integer> rdd = sc.parallelize(data, 1);
- JavaFutureAction<List<Integer>> future = rdd.takeAsync(1);
- List<Integer> result = future.get();
- assertEquals(1, result.size());
- assertEquals((Integer) 1, result.get(0));
- assertFalse(future.isCancelled());
- assertTrue(future.isDone());
- assertEquals(1, future.jobIds().size());
- }
-
- @Test
- public void foreachAsync() throws Exception {
- List<Integer> data = Arrays.asList(1, 2, 3, 4, 5);
- JavaRDD<Integer> rdd = sc.parallelize(data, 1);
- JavaFutureAction<Void> future = rdd.foreachAsync(
- new VoidFunction<Integer>() {
- @Override
- public void call(Integer integer) {
- // intentionally left blank.
- }
- }
- );
- future.get();
- assertFalse(future.isCancelled());
- assertTrue(future.isDone());
- assertEquals(1, future.jobIds().size());
- }
-
- @Test
- public void countAsync() throws Exception {
- List<Integer> data = Arrays.asList(1, 2, 3, 4, 5);
- JavaRDD<Integer> rdd = sc.parallelize(data, 1);
- JavaFutureAction<Long> future = rdd.countAsync();
- long count = future.get();
- assertEquals(data.size(), count);
- assertFalse(future.isCancelled());
- assertTrue(future.isDone());
- assertEquals(1, future.jobIds().size());
- }
-
- @Test
- public void testAsyncActionCancellation() throws Exception {
- List<Integer> data = Arrays.asList(1, 2, 3, 4, 5);
- JavaRDD<Integer> rdd = sc.parallelize(data, 1);
- JavaFutureAction<Void> future = rdd.foreachAsync(new VoidFunction<Integer>() {
- @Override
- public void call(Integer integer) throws InterruptedException {
- Thread.sleep(10000); // To ensure that the job won't finish before it's cancelled.
- }
- });
- future.cancel(true);
- assertTrue(future.isCancelled());
- assertTrue(future.isDone());
- try {
- future.get(2000, TimeUnit.MILLISECONDS);
- fail("Expected future.get() for cancelled job to throw CancellationException");
- } catch (CancellationException ignored) {
- // pass
- }
- }
-
- @Test
- public void testAsyncActionErrorWrapping() throws Exception {
- List<Integer> data = Arrays.asList(1, 2, 3, 4, 5);
- JavaRDD<Integer> rdd = sc.parallelize(data, 1);
- JavaFutureAction<Long> future = rdd.map(new BuggyMapFunction<Integer>()).countAsync();
- try {
- future.get(2, TimeUnit.SECONDS);
- fail("Expected future.get() for failed job to throw ExcecutionException");
- } catch (ExecutionException ee) {
- assertTrue(Throwables.getStackTraceAsString(ee).contains("Custom exception!"));
- }
- assertTrue(future.isDone());
- }
-
- static class Class1 {}
- static class Class2 {}
-
- @Test
- public void testRegisterKryoClasses() {
- SparkConf conf = new SparkConf();
- conf.registerKryoClasses(new Class<?>[]{ Class1.class, Class2.class });
- assertEquals(
- Class1.class.getName() + "," + Class2.class.getName(),
- conf.get("spark.kryo.classesToRegister"));
- }
-
- @Test
- public void testGetPersistentRDDs() {
- java.util.Map<Integer, JavaRDD<?>> cachedRddsMap = sc.getPersistentRDDs();
- assertTrue(cachedRddsMap.isEmpty());
- JavaRDD<String> rdd1 = sc.parallelize(Arrays.asList("a", "b")).setName("RDD1").cache();
- JavaRDD<String> rdd2 = sc.parallelize(Arrays.asList("c", "d")).setName("RDD2").cache();
- cachedRddsMap = sc.getPersistentRDDs();
- assertEquals(2, cachedRddsMap.size());
- assertEquals("RDD1", cachedRddsMap.get(0).name());
- assertEquals("RDD2", cachedRddsMap.get(1).name());
- }
-
-}
http://git-wip-us.apache.org/repos/asf/spark/blob/0e240549/core/src/test/java/test/org/apache/spark/Java8RDDAPISuite.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/test/org/apache/spark/Java8RDDAPISuite.java b/core/src/test/java/test/org/apache/spark/Java8RDDAPISuite.java
new file mode 100644
index 0000000..e22ad89
--- /dev/null
+++ b/core/src/test/java/test/org/apache/spark/Java8RDDAPISuite.java
@@ -0,0 +1,356 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package test.org.apache.spark;
+
+import java.io.File;
+import java.io.Serializable;
+import java.util.*;
+
+import scala.Tuple2;
+
+import com.google.common.collect.Iterables;
+import com.google.common.io.Files;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.SequenceFileOutputFormat;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.spark.api.java.JavaDoubleRDD;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.Optional;
+import org.apache.spark.api.java.function.*;
+import org.apache.spark.util.Utils;
+
+/**
+ * Most of these tests replicate org.apache.spark.JavaAPISuite using Java 8
+ * lambda syntax.
+ */
+public class Java8RDDAPISuite implements Serializable {
+ private static int foreachCalls = 0;
+ private transient JavaSparkContext sc;
+
+ @Before
+ public void setUp() {
+   this.sc = new JavaSparkContext("local", "JavaAPISuite"); // new context before each test
+ }
+
+ @After
+ public void tearDown() {
+   this.sc.stop(); // shut down the context built in setUp()
+   this.sc = null;
+ }
+
+ @Test
+ public void foreachWithAnonymousClass() {
+   foreachCalls = 0;
+   VoidFunction<String> countCall = new VoidFunction<String>() {
+     @Override
+     public void call(String ignored) {
+       foreachCalls++;
+     }
+   };
+   sc.parallelize(Arrays.asList("Hello", "World")).foreach(countCall);
+   Assert.assertEquals(2, foreachCalls); // one call per element
+ }
+
+ @Test
+ public void foreach() {
+   foreachCalls = 0;
+   List<String> greetings = Arrays.asList("Hello", "World");
+   sc.parallelize(greetings).foreach(word -> foreachCalls++);
+   Assert.assertEquals(2, foreachCalls); // one call per element
+ }
+
+ @Test
+ public void groupBy() {
+   JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 1, 2, 3, 5, 8, 13));
+   Function<Integer, Boolean> isEven = x -> x % 2 == 0; // true for 2 and 8 only
+   JavaPairRDD<Boolean, Iterable<Integer>> oddsAndEvens = rdd.groupBy(isEven);
+   Assert.assertEquals(2, oddsAndEvens.count());
+   Assert.assertEquals(2, Iterables.size(oddsAndEvens.lookup(true).get(0))); // Evens
+   Assert.assertEquals(5, Iterables.size(oddsAndEvens.lookup(false).get(0))); // Odds
+   // Same grouping through the overload that also takes a partition count.
+   oddsAndEvens = rdd.groupBy(isEven, 1);
+   Assert.assertEquals(2, oddsAndEvens.count());
+   Assert.assertEquals(2, Iterables.size(oddsAndEvens.lookup(true).get(0))); // Evens
+   Assert.assertEquals(5, Iterables.size(oddsAndEvens.lookup(false).get(0))); // Odds
+ }
+
+ @Test
+ public void leftOuterJoin() {
+   // Left side: key 3 has no counterpart on the right.
+   List<Tuple2<Integer, Integer>> leftPairs = Arrays.asList(
+     new Tuple2<>(1, 1), new Tuple2<>(1, 2), new Tuple2<>(2, 1), new Tuple2<>(3, 1));
+   // Right side: key 4 has no counterpart on the left.
+   List<Tuple2<Integer, Character>> rightPairs = Arrays.asList(
+     new Tuple2<>(1, 'x'), new Tuple2<>(2, 'y'), new Tuple2<>(2, 'z'), new Tuple2<>(4, 'w'));
+   JavaPairRDD<Integer, Integer> left = sc.parallelizePairs(leftPairs);
+   JavaPairRDD<Integer, Character> right = sc.parallelizePairs(rightPairs);
+
+   // Key 1 yields two rows, key 2 yields two rows, unmatched key 3 yields one.
+   List<Tuple2<Integer, Tuple2<Integer, Optional<Character>>>> joined =
+     left.leftOuterJoin(right).collect();
+   Assert.assertEquals(5, joined.size());
+
+   // The only row without a right-hand value must belong to key 3.
+   Tuple2<Integer, Tuple2<Integer, Optional<Character>>> firstUnmatched =
+     left.leftOuterJoin(right).filter(row -> !row._2()._2().isPresent()).first();
+   Assert.assertEquals(3, firstUnmatched._1().intValue());
+ }
+
+ @Test
+ public void foldReduce() {
+   // 1 + 1 + 2 + 3 + 5 + 8 + 13 == 33, whichever aggregation is used.
+   final int expectedTotal = 33;
+   Function2<Integer, Integer, Integer> plus = (left, right) -> left + right;
+   JavaRDD<Integer> numbers = sc.parallelize(Arrays.asList(1, 1, 2, 3, 5, 8, 13));
+   int folded = numbers.fold(0, plus);
+   Assert.assertEquals(expectedTotal, folded);
+   int reduced = numbers.reduce(plus);
+   Assert.assertEquals(expectedTotal, reduced);
+ }
+
+ @Test
+ public void foldByKey() {
+   // Values per key: 1 -> {1}, 2 -> {1, 1}, 3 -> {2, 1}.
+   List<Tuple2<Integer, Integer>> keyedValues = Arrays.asList(
+     new Tuple2<>(2, 1), new Tuple2<>(2, 1), new Tuple2<>(1, 1),
+     new Tuple2<>(3, 2), new Tuple2<>(3, 1));
+   Function2<Integer, Integer, Integer> plus = (a, b) -> a + b;
+   JavaPairRDD<Integer, Integer> sums = sc.parallelizePairs(keyedValues).foldByKey(0, plus);
+
+   // With this data each key's total equals the key itself.
+   for (int key = 1; key <= 3; key++) {
+     List<Integer> totals = sums.lookup(key);
+     Assert.assertEquals(key, totals.get(0).intValue());
+   }
+ }
+
+ @Test
+ public void reduceByKey() {
+   // Values per key: 1 -> {1}, 2 -> {1, 1}, 3 -> {2, 1}; every sum equals its key.
+   List<Tuple2<Integer, Integer>> keyedValues = Arrays.asList(
+     new Tuple2<>(2, 1), new Tuple2<>(2, 1), new Tuple2<>(1, 1),
+     new Tuple2<>(3, 2), new Tuple2<>(3, 1));
+   JavaPairRDD<Integer, Integer> input = sc.parallelizePairs(keyedValues);
+   // Result kept as a pair RDD and probed one key at a time.
+   JavaPairRDD<Integer, Integer> reduced = input.reduceByKey((a, b) -> a + b);
+   Assert.assertEquals(1, reduced.lookup(1).get(0).intValue());
+   Assert.assertEquals(2, reduced.lookup(2).get(0).intValue());
+   Assert.assertEquals(3, reduced.lookup(3).get(0).intValue());
+
+   // The same result materialised as a java.util.Map.
+   Map<Integer, Integer> collected = reduced.collectAsMap();
+   Assert.assertEquals(1, collected.get(1).intValue());
+   Assert.assertEquals(2, collected.get(2).intValue());
+   Assert.assertEquals(3, collected.get(3).intValue());
+
+   // Reduction variant that returns the map directly from the input pairs.
+   Map<Integer, Integer> reducedLocally = input.reduceByKeyLocally((a, b) -> a + b);
+   Assert.assertEquals(1, reducedLocally.get(1).intValue());
+   Assert.assertEquals(2, reducedLocally.get(2).intValue());
+   Assert.assertEquals(3, reducedLocally.get(3).intValue());
+ }
+
+ @Test
+ public void map() {
+   // No assertions: each mapping is simply built from a lambda or method reference and run.
+   JavaRDD<Integer> numbers = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));
+   JavaDoubleRDD asDoubles = numbers.mapToDouble(n -> 1.0 * n).cache();
+   asDoubles.collect();
+   JavaPairRDD<Integer, Integer> asPairs = numbers.mapToPair(n -> new Tuple2<>(n, n)).cache();
+   asPairs.collect();
+   JavaRDD<String> asStrings = numbers.map(Object::toString).cache();
+   asStrings.collect();
+ }
+
+ @Test
+ public void flatMap() {
+   JavaRDD<String> rdd = sc.parallelize(Arrays.asList("Hello World!",
+     "The quick brown fox jumps over the lazy dog."));
+   JavaRDD<String> words = rdd.flatMap(x -> Arrays.asList(x.split(" ")).iterator());
+   // "Hello World!" splits into 2 words and the second sentence into 9, i.e. 11 in total.
+   Assert.assertEquals("Hello", words.first());
+   Assert.assertEquals(11, words.count());
+
+   JavaPairRDD<String, String> pairs = rdd.flatMapToPair(s -> {
+     List<Tuple2<String, String>> pairs2 = new LinkedList<>();
+     for (String word : s.split(" ")) {
+       pairs2.add(new Tuple2<>(word, word));
+     }
+     return pairs2.iterator();
+   });
+
+   Assert.assertEquals(new Tuple2<>("Hello", "Hello"), pairs.first());
+   Assert.assertEquals(11, pairs.count());
+   // One double per word: its length, so "Hello" contributes 5.0 first.
+   JavaDoubleRDD doubles = rdd.flatMapToDouble(s -> {
+     List<Double> lengths = new LinkedList<>();
+     for (String word : s.split(" ")) {
+       lengths.add((double) word.length());
+     }
+     return lengths.iterator();
+   });
+
+   Assert.assertEquals(5.0, doubles.first(), 0.01);
+   Assert.assertEquals(11, doubles.count()); // count the double RDD itself, not `pairs` again
+ }
+
+ @Test
+ public void mapsFromPairsToPairs() {
+   List<Tuple2<Integer, String>> numbered = Arrays.asList(
+     new Tuple2<>(1, "a"),
+     new Tuple2<>(2, "aa"),
+     new Tuple2<>(3, "aaa"));
+   JavaPairRDD<Integer, String> source = sc.parallelizePairs(numbered);
+
+   // Regression coverage for SPARK-668: swap every pair through flatMapToPair.
+   JavaPairRDD<String, Integer> flipped =
+     source.flatMapToPair(pair -> Collections.singletonList(pair.swap()).iterator());
+   flipped.collect();
+
+   // There was never a bug in the plain map() path, but it is worth
+   // exercising as well.
+   source.map(Tuple2::swap).collect();
+ }
+
+ @Test
+ public void mapPartitions() {
+   // The asserted sums 3 and 7 correspond to partitions holding {1, 2} and {3, 4}.
+   JavaRDD<Integer> numbers = sc.parallelize(Arrays.asList(1, 2, 3, 4), 2);
+   JavaRDD<Integer> perPartitionTotals = numbers.mapPartitions(elements -> {
+     int total = 0;
+     while (elements.hasNext()) {
+       total += elements.next();
+     }
+     return Collections.singletonList(total).iterator();
+   });
+   Assert.assertEquals("[3, 7]", perPartitionTotals.collect().toString());
+ }
+
+ @Test
+ public void sequenceFile() {
+   File tempDir = Files.createTempDir();
+   tempDir.deleteOnExit();
+   try {
+     String outputDir = new File(tempDir, "output").getAbsolutePath();
+     List<Tuple2<Integer, String>> pairs = Arrays.asList(
+       new Tuple2<>(1, "a"), new Tuple2<>(2, "aa"), new Tuple2<>(3, "aaa"));
+     JavaPairRDD<Integer, String> rdd = sc.parallelizePairs(pairs);
+     rdd.mapToPair(pair -> new Tuple2<>(new IntWritable(pair._1()), new Text(pair._2())))
+       .saveAsHadoopFile(
+         outputDir, IntWritable.class, Text.class, SequenceFileOutputFormat.class);
+     // Read the output back as a sequence file; it must round-trip the input pairs.
+     JavaPairRDD<Integer, String> readRDD =
+       sc.sequenceFile(outputDir, IntWritable.class, Text.class)
+         .mapToPair(pair -> new Tuple2<>(pair._1().get(), pair._2().toString()));
+     Assert.assertEquals(pairs, readRDD.collect());
+   } finally {
+     Utils.deleteRecursively(tempDir); // runs even when a step above throws
+   }
+ }
+
+ @Test
+ public void zip() {
+   // No assertions: zipping with a lambda-derived double RDD only has to run.
+   JavaRDD<Integer> numbers = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));
+   JavaPairRDD<Integer, Double> zipped = numbers.zip(numbers.mapToDouble(n -> 1.0 * n));
+   zipped.count();
+ }
+
+ @Test
+ public void zipPartitions() {
+   JavaRDD<Integer> numbers = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6), 2);
+   JavaRDD<String> labels = sc.parallelize(Arrays.asList("1", "2", "3", "4"), 2);
+   // For each pair of corresponding partitions, emit the element count of both sides.
+   FlatMapFunction2<Iterator<Integer>, Iterator<String>, Integer> countBoth =
+     (Iterator<Integer> ints, Iterator<String> strings) -> {
+       int intCount = 0;
+       for (; ints.hasNext(); ints.next()) {
+         intCount++;
+       }
+       int stringCount = 0;
+       for (; strings.hasNext(); strings.next()) {
+         stringCount++;
+       }
+       return Arrays.asList(intCount, stringCount).iterator();
+     };
+   // Six integers and four strings over two partitions each: (3, 2) twice.
+   JavaRDD<Integer> sizes = numbers.zipPartitions(labels, countBoth);
+   Assert.assertEquals("[3, 2, 3, 2]", sizes.collect().toString());
+ }
+
+ @Test
+ public void keyBy() {
+   JavaRDD<Integer> numbers = sc.parallelize(Arrays.asList(1, 2)); // keyed by toString()
+   List<Tuple2<String, Integer>> keyed = numbers.keyBy(Object::toString).collect();
+   Assert.assertEquals(new Tuple2<>("1", 1), keyed.get(0));
+   Assert.assertEquals(new Tuple2<>("2", 2), keyed.get(1));
+ }
+
+ @Test
+ public void mapOnPairRDD() {
+   JavaRDD<Integer> numbers = sc.parallelize(Arrays.asList(1, 2, 3, 4));
+   // Pair each number with its parity, then flip every pair to (parity, number).
+   JavaPairRDD<Integer, Integer> parityByNumber =
+     numbers.mapToPair(n -> new Tuple2<>(n, n % 2));
+   JavaPairRDD<Integer, Integer> numberByParity =
+     parityByNumber.mapToPair(pair -> new Tuple2<>(pair._2(), pair._1()));
+
+   List<Tuple2<Integer, Integer>> expected = Arrays.asList(
+     new Tuple2<>(1, 1), new Tuple2<>(0, 2), new Tuple2<>(1, 3), new Tuple2<>(0, 4));
+   Assert.assertEquals(expected, numberByParity.collect());
+ }
+
+ @Test
+ public void collectPartitions() {
+   // Seven elements over three partitions; the assertions expect {1,2}, {3,4}, {5,6,7}.
+   JavaRDD<Integer> numbers = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7), 3);
+   List<Integer>[] first = numbers.collectPartitions(new int[]{0});
+   Assert.assertEquals(Arrays.asList(1, 2), first[0]);
+
+   List<Integer>[] rest = numbers.collectPartitions(new int[]{1, 2});
+   Assert.assertEquals(Arrays.asList(3, 4), rest[0]);
+   Assert.assertEquals(Arrays.asList(5, 6, 7), rest[1]);
+
+   // The same partitions once every element is paired with its parity.
+   JavaPairRDD<Integer, Integer> withParity = numbers.mapToPair(n -> new Tuple2<>(n, n % 2));
+   List<Tuple2<Integer, Integer>>[] pairedFirst = withParity.collectPartitions(new int[]{0});
+   Assert.assertEquals(Arrays.asList(new Tuple2<>(1, 1), new Tuple2<>(2, 0)), pairedFirst[0]);
+   List<Tuple2<Integer, Integer>>[] pairedRest = withParity.collectPartitions(new int[]{1, 2});
+   Assert.assertEquals(Arrays.asList(new Tuple2<>(3, 1), new Tuple2<>(4, 0)), pairedRest[0]);
+   Assert.assertEquals(
+     Arrays.asList(new Tuple2<>(5, 1), new Tuple2<>(6, 0), new Tuple2<>(7, 1)),
+     pairedRest[1]);
+ }
+
+ @Test
+ public void collectAsMapWithIntArrayValues() {
+   // SPARK-1040 regression coverage: pair values that are primitive int arrays.
+   JavaRDD<Integer> single = sc.parallelize(Arrays.asList(1));
+   JavaPairRDD<Integer, int[]> withArrayValue =
+     single.mapToPair(n -> new Tuple2<>(n, new int[]{n}));
+   withArrayValue.collect(); // was never a problem
+   withArrayValue.collectAsMap(); // once failed with a ClassCastException
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org