You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ad...@apache.org on 2014/02/23 02:54:06 UTC
[1/2] Migrate Java code to Scala or move it to src/main/java
Repository: incubator-spark
Updated Branches:
refs/heads/master 1aa4f8af7 -> 29ac7ea52
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/29ac7ea5/core/src/test/scala/org/apache/spark/JavaAPISuite.java
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/JavaAPISuite.java b/core/src/test/scala/org/apache/spark/JavaAPISuite.java
deleted file mode 100644
index 20232e9..0000000
--- a/core/src/test/scala/org/apache/spark/JavaAPISuite.java
+++ /dev/null
@@ -1,981 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark;
-
-import java.io.File;
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.*;
-
-import scala.Tuple2;
-
-import com.google.common.base.Optional;
-import com.google.common.base.Charsets;
-import com.google.common.io.Files;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.compress.DefaultCodec;
-import org.apache.hadoop.mapred.SequenceFileInputFormat;
-import org.apache.hadoop.mapred.SequenceFileOutputFormat;
-import org.apache.hadoop.mapreduce.Job;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-import org.apache.spark.api.java.JavaDoubleRDD;
-import org.apache.spark.api.java.JavaPairRDD;
-import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.api.java.function.*;
-import org.apache.spark.partial.BoundedDouble;
-import org.apache.spark.partial.PartialResult;
-import org.apache.spark.storage.StorageLevel;
-import org.apache.spark.util.StatCounter;
-
-// The test suite itself is Serializable so that anonymous Function implementations can be
-// serialized, as an alternative to converting these anonymous classes to static inner classes;
-// see http://stackoverflow.com/questions/758570/.
-public class JavaAPISuite implements Serializable {
- private transient JavaSparkContext sc;
-
- // Builds a fresh JavaSparkContext with master "local" before each test.
- @Before
- public void setUp() {
-   final String master = "local";
-   final String appName = "JavaAPISuite";
-   sc = new JavaSparkContext(master, appName);
- }
-
- // Stops the context created in setUp() and clears the driver port property.
- // JUnit still runs @After methods when @Before threw, so sc may be null here; the
- // property is cleared in a finally block so a failing stop() cannot leak it into the
- // next test.
- @After
- public void tearDown() {
-   try {
-     if (sc != null) {
-       sc.stop();
-     }
-   } finally {
-     sc = null;
-     // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
-     System.clearProperty("spark.driver.port");
-   }
- }
-
- // Comparator that orders Integers from largest to smallest (reverse of natural order).
- // Serializable, like the enclosing suite.
- static class ReverseIntComparator implements Comparator<Integer>, Serializable {
-
-   @Override
-   public int compare(Integer a, Integer b) {
-     // Swapping the operands of the natural comparison reverses the ordering.
-     return b.compareTo(a);
-   }
- }
-
- // Checks JavaSparkContext.union for plain, double and pair RDDs.
- @Test
- public void sparkContextUnion() {
-   // Plain JavaRDDs: first through the varargs overload ...
-   List<String> words = Arrays.asList("Hello", "World");
-   JavaRDD<String> left = sc.parallelize(words);
-   JavaRDD<String> right = sc.parallelize(words);
-   JavaRDD<String> unionOfStrings = sc.union(left, right);
-   Assert.assertEquals(4, unionOfStrings.count());
-   // ... then through the overload that takes the remaining RDDs as a List.
-   List<JavaRDD<String>> rest = new ArrayList<JavaRDD<String>>();
-   rest.add(right);
-   unionOfStrings = sc.union(left, rest);
-   Assert.assertEquals(4, unionOfStrings.count());
-
-   // JavaDoubleRDDs
-   List<Double> values = Arrays.asList(1.0, 2.0);
-   JavaDoubleRDD firstDoubles = sc.parallelizeDoubles(values);
-   JavaDoubleRDD secondDoubles = sc.parallelizeDoubles(values);
-   JavaDoubleRDD unionOfDoubles = sc.union(firstDoubles, secondDoubles);
-   Assert.assertEquals(4, unionOfDoubles.count());
-
-   // JavaPairRDDs
-   List<Tuple2<Integer, Integer>> tuples = Arrays.asList(
-     new Tuple2<Integer, Integer>(1, 2),
-     new Tuple2<Integer, Integer>(3, 4));
-   JavaPairRDD<Integer, Integer> firstPairs = sc.parallelizePairs(tuples);
-   JavaPairRDD<Integer, Integer> secondPairs = sc.parallelizePairs(tuples);
-   JavaPairRDD<Integer, Integer> unionOfPairs = sc.union(firstPairs, secondPairs);
-   Assert.assertEquals(4, unionOfPairs.count());
- }
-
- // Sorts three pairs by key, once with the default ordering and once with
- // ReverseIntComparator plus ascending = false; both runs are expected to yield the same
- // key order (-1, 0, 3).
- @Test
- public void sortByKey() {
-   Tuple2<Integer, Integer> middle = new Tuple2<Integer, Integer>(0, 4);
-   Tuple2<Integer, Integer> largest = new Tuple2<Integer, Integer>(3, 2);
-   Tuple2<Integer, Integer> smallest = new Tuple2<Integer, Integer>(-1, 1);
-   List<Tuple2<Integer, Integer>> input = Arrays.asList(middle, largest, smallest);
-
-   JavaPairRDD<Integer, Integer> rdd = sc.parallelizePairs(input);
-
-   // Default comparator
-   JavaPairRDD<Integer, Integer> sorted = rdd.sortByKey();
-   Assert.assertEquals(smallest, sorted.first());
-   List<Tuple2<Integer, Integer>> collected = sorted.collect();
-   Assert.assertEquals(middle, collected.get(1));
-   Assert.assertEquals(largest, collected.get(2));
-
-   // Custom comparator
-   sorted = rdd.sortByKey(new ReverseIntComparator(), false);
-   Assert.assertEquals(smallest, sorted.first());
-   collected = sorted.collect();
-   Assert.assertEquals(middle, collected.get(1));
-   Assert.assertEquals(largest, collected.get(2));
- }
-
- static int foreachCalls = 0;
-
- // Runs foreach over a two-element RDD and checks the static counter was bumped twice.
- @Test
- public void foreach() {
-   foreachCalls = 0;
-   VoidFunction<String> countCall = new VoidFunction<String>() {
-     @Override
-     public void call(String ignored) {
-       foreachCalls++;
-     }
-   };
-   JavaRDD<String> greetings = sc.parallelize(Arrays.asList("Hello", "World"));
-   greetings.foreach(countCall);
-   Assert.assertEquals(2, foreachCalls);
- }
-
- @Test
- public void lookup() {
- JavaPairRDD<String, String> categories = sc.parallelizePairs(Arrays.asList(
- new Tuple2<String, String>("Apples", "Fruit"),
- new Tuple2<String, String>("Oranges", "Fruit"),
- new Tuple2<String, String>("Oranges", "Citrus")
- ));
- Assert.assertEquals(2, categories.lookup("Oranges").size());
- Assert.assertEquals(2, categories.groupByKey().lookup("Oranges").get(0).size());
- }
-
- // Groups seven integers by parity, with and without an explicit partition count.
- // The input holds two even values (2, 8) and five odd ones (1, 1, 3, 5, 13).
- @Test
- public void groupBy() {
-   JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 1, 2, 3, 5, 8, 13));
-   // The predicate is true for EVEN numbers; it was previously misnamed "isOdd".
-   Function<Integer, Boolean> isEven = new Function<Integer, Boolean>() {
-     @Override
-     public Boolean call(Integer x) {
-       return x % 2 == 0;
-     }
-   };
-   JavaPairRDD<Boolean, List<Integer>> evensAndOdds = rdd.groupBy(isEven);
-   Assert.assertEquals(2, evensAndOdds.count());
-   Assert.assertEquals(2, evensAndOdds.lookup(true).get(0).size()); // Evens
-   Assert.assertEquals(5, evensAndOdds.lookup(false).get(0).size()); // Odds
-
-   // Same grouping with the second argument set to 1.
-   evensAndOdds = rdd.groupBy(isEven, 1);
-   Assert.assertEquals(2, evensAndOdds.count());
-   Assert.assertEquals(2, evensAndOdds.lookup(true).get(0).size()); // Evens
-   Assert.assertEquals(5, evensAndOdds.lookup(false).get(0).size()); // Odds
- }
-
- // Cogroups categories with prices and checks both sides of the "Oranges" entry.
- @Test
- public void cogroup() {
-   List<Tuple2<String, String>> categoryEntries = Arrays.asList(
-     new Tuple2<String, String>("Apples", "Fruit"),
-     new Tuple2<String, String>("Oranges", "Fruit"),
-     new Tuple2<String, String>("Oranges", "Citrus"));
-   List<Tuple2<String, Integer>> priceEntries = Arrays.asList(
-     new Tuple2<String, Integer>("Oranges", 2),
-     new Tuple2<String, Integer>("Apples", 3));
-   JavaPairRDD<String, String> categories = sc.parallelizePairs(categoryEntries);
-   JavaPairRDD<String, Integer> prices = sc.parallelizePairs(priceEntries);
-
-   JavaPairRDD<String, Tuple2<List<String>, List<Integer>>> joinedByKey =
-     categories.cogroup(prices);
-   String orangeCategories = joinedByKey.lookup("Oranges").get(0)._1().toString();
-   Assert.assertEquals("[Fruit, Citrus]", orangeCategories);
-   String orangePrices = joinedByKey.lookup("Oranges").get(0)._2().toString();
-   Assert.assertEquals("[2]", orangePrices);
-
-   // The full result must also be collectable; its contents are not inspected.
-   joinedByKey.collect();
- }
-
- // Left-outer-joins two pair RDDs and checks the row count and the key of the first
- // row whose right-hand side is absent.
- @Test
- public void leftOuterJoin() {
-   List<Tuple2<Integer, Integer>> leftRows = Arrays.asList(
-     new Tuple2<Integer, Integer>(1, 1),
-     new Tuple2<Integer, Integer>(1, 2),
-     new Tuple2<Integer, Integer>(2, 1),
-     new Tuple2<Integer, Integer>(3, 1));
-   List<Tuple2<Integer, Character>> rightRows = Arrays.asList(
-     new Tuple2<Integer, Character>(1, 'x'),
-     new Tuple2<Integer, Character>(2, 'y'),
-     new Tuple2<Integer, Character>(2, 'z'),
-     new Tuple2<Integer, Character>(4, 'w'));
-   JavaPairRDD<Integer, Integer> rdd1 = sc.parallelizePairs(leftRows);
-   JavaPairRDD<Integer, Character> rdd2 = sc.parallelizePairs(rightRows);
-
-   List<Tuple2<Integer, Tuple2<Integer, Optional<Character>>>> allRows =
-     rdd1.leftOuterJoin(rdd2).collect();
-   Assert.assertEquals(5, allRows.size());
-
-   // Keeps only rows for which the right side produced no value.
-   Function<Tuple2<Integer, Tuple2<Integer, Optional<Character>>>, Boolean> rightSideAbsent =
-     new Function<Tuple2<Integer, Tuple2<Integer, Optional<Character>>>, Boolean>() {
-       @Override
-       public Boolean call(Tuple2<Integer, Tuple2<Integer, Optional<Character>>> row)
-           throws Exception {
-         Optional<Character> rightValue = row._2()._2();
-         return !rightValue.isPresent();
-       }
-     };
-   Tuple2<Integer, Tuple2<Integer, Optional<Character>>> unmatched =
-     rdd1.leftOuterJoin(rdd2).filter(rightSideAbsent).first();
-   Assert.assertEquals(3, unmatched._1().intValue());
- }
-
- // Sums the same RDD through fold (zero value 0) and reduce; both must give 33.
- @Test
- public void foldReduce() {
-   JavaRDD<Integer> numbers = sc.parallelize(Arrays.asList(1, 1, 2, 3, 5, 8, 13));
-   Function2<Integer, Integer, Integer> plus = new Function2<Integer, Integer, Integer>() {
-     @Override
-     public Integer call(Integer left, Integer right) {
-       return left + right;
-     }
-   };
-
-   int folded = numbers.fold(0, plus);
-   Assert.assertEquals(33, folded);
-
-   int reduced = numbers.reduce(plus);
-   Assert.assertEquals(33, reduced);
- }
-
- // Folds values per key with addition (zero value 0) and checks the sum for each key.
- @Test
- public void foldByKey() {
-   Function2<Integer, Integer, Integer> plus = new Function2<Integer, Integer, Integer>() {
-     @Override
-     public Integer call(Integer left, Integer right) {
-       return left + right;
-     }
-   };
-   List<Tuple2<Integer, Integer>> input = Arrays.asList(
-     new Tuple2<Integer, Integer>(2, 1),
-     new Tuple2<Integer, Integer>(2, 1),
-     new Tuple2<Integer, Integer>(1, 1),
-     new Tuple2<Integer, Integer>(3, 2),
-     new Tuple2<Integer, Integer>(3, 1));
-   JavaPairRDD<Integer, Integer> rdd = sc.parallelizePairs(input);
-
-   JavaPairRDD<Integer, Integer> sums = rdd.foldByKey(0, plus);
-   Assert.assertEquals(1, sums.lookup(1).get(0).intValue());
-   Assert.assertEquals(2, sums.lookup(2).get(0).intValue());
-   Assert.assertEquals(3, sums.lookup(3).get(0).intValue());
- }
-
- // Sums values per key three ways: reduceByKey + lookup, reduceByKey + collectAsMap,
- // and reduceByKeyLocally. Each must give 1, 2 and 3 for keys 1, 2 and 3.
- @Test
- public void reduceByKey() {
-   // One adder is shared by both reductions.
-   Function2<Integer, Integer, Integer> plus = new Function2<Integer, Integer, Integer>() {
-     @Override
-     public Integer call(Integer left, Integer right) {
-       return left + right;
-     }
-   };
-   List<Tuple2<Integer, Integer>> input = Arrays.asList(
-     new Tuple2<Integer, Integer>(2, 1),
-     new Tuple2<Integer, Integer>(2, 1),
-     new Tuple2<Integer, Integer>(1, 1),
-     new Tuple2<Integer, Integer>(3, 2),
-     new Tuple2<Integer, Integer>(3, 1));
-   JavaPairRDD<Integer, Integer> rdd = sc.parallelizePairs(input);
-
-   JavaPairRDD<Integer, Integer> counts = rdd.reduceByKey(plus);
-   Assert.assertEquals(1, counts.lookup(1).get(0).intValue());
-   Assert.assertEquals(2, counts.lookup(2).get(0).intValue());
-   Assert.assertEquals(3, counts.lookup(3).get(0).intValue());
-
-   Map<Integer, Integer> asMap = counts.collectAsMap();
-   Assert.assertEquals(1, asMap.get(1).intValue());
-   Assert.assertEquals(2, asMap.get(2).intValue());
-   Assert.assertEquals(3, asMap.get(3).intValue());
-
-   asMap = rdd.reduceByKeyLocally(plus);
-   Assert.assertEquals(1, asMap.get(1).intValue());
-   Assert.assertEquals(2, asMap.get(2).intValue());
-   Assert.assertEquals(3, asMap.get(3).intValue());
- }
-
- @Test
- public void approximateResults() {
- JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 1, 2, 3, 5, 8, 13));
- Map<Integer, Long> countsByValue = rdd.countByValue();
- Assert.assertEquals(2, countsByValue.get(1).longValue());
- Assert.assertEquals(1, countsByValue.get(13).longValue());
-
- PartialResult<Map<Integer, BoundedDouble>> approx = rdd.countByValueApprox(1);
- Map<Integer, BoundedDouble> finalValue = approx.getFinalValue();
- Assert.assertEquals(2.0, finalValue.get(1).mean(), 0.01);
- Assert.assertEquals(1.0, finalValue.get(13).mean(), 0.01);
- }
-
- // Checks first(), take(n) and takeSample(). The results of take and takeSample were
- // previously computed but never asserted.
- @Test
- public void take() {
-   JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 1, 2, 3, 5, 8, 13));
-   Assert.assertEquals(1, rdd.first().intValue());
-
-   // take(2) must return the two leading elements in order.
-   List<Integer> firstTwo = rdd.take(2);
-   Assert.assertEquals(Arrays.asList(1, 1), firstTwo);
-
-   // Sampling 2 of 7 elements without replacement must yield exactly 2 elements; the
-   // concrete values depend on the sampler, so only the size is checked.
-   List<Integer> sample = rdd.takeSample(false, 2, 42);
-   Assert.assertEquals(2, sample.size());
- }
-
- // Builds the cartesian product of a string RDD with a double RDD and checks its
- // first element.
- @Test
- public void cartesian() {
-   JavaRDD<String> words = sc.parallelize(Arrays.asList("Hello", "World"));
-   JavaDoubleRDD numbers = sc.parallelizeDoubles(Arrays.asList(1.0, 1.0, 2.0, 3.0, 5.0, 8.0));
-   JavaPairRDD<String, Double> product = words.cartesian(numbers);
-   Tuple2<String, Double> expectedFirst = new Tuple2<String, Double>("Hello", 1.0);
-   Assert.assertEquals(expectedFirst, product.first());
- }
-
- // Exercises JavaDoubleRDD: distinct, filter, union, cache, the summary statistics,
- // first and take. The input sums to 20 over six elements.
- @Test
- public void javaDoubleRDD() {
-   JavaDoubleRDD rdd = sc.parallelizeDoubles(Arrays.asList(1.0, 1.0, 2.0, 3.0, 5.0, 8.0));
-   JavaDoubleRDD distinct = rdd.distinct();
-   Assert.assertEquals(5, distinct.count());
-   JavaDoubleRDD filter = rdd.filter(new Function<Double, Boolean>() {
-     @Override
-     public Boolean call(Double x) {
-       return x > 2.0;
-     }
-   });
-   Assert.assertEquals(3, filter.count());
-   JavaDoubleRDD union = rdd.union(rdd);
-   Assert.assertEquals(12, union.count());
-   union = union.cache();
-   Assert.assertEquals(12, union.count());
-
-   Assert.assertEquals(20, rdd.sum(), 0.01);
-   StatCounter stats = rdd.stats();
-   Assert.assertEquals(20, stats.sum(), 0.01);
-   Assert.assertEquals(20/6.0, rdd.mean(), 0.01);
-   // The original asserted rdd.mean() twice; the second check now covers the StatCounter.
-   Assert.assertEquals(20/6.0, stats.mean(), 0.01);
-   Assert.assertEquals(6.22222, rdd.variance(), 0.01);
-   Assert.assertEquals(7.46667, rdd.sampleVariance(), 0.01);
-   Assert.assertEquals(2.49444, rdd.stdev(), 0.01);
-   Assert.assertEquals(2.73252, rdd.sampleStdev(), 0.01);
-
-   // first/take were previously called without checking their results.
-   Double first = rdd.first();
-   Assert.assertEquals(1.0, first.doubleValue(), 0.01);
-   List<Double> take = rdd.take(5);
-   Assert.assertEquals(5, take.size());
- }
-
- // Checks histogram() with a bucket count and with explicit bucket boundaries.
- @Test
- public void javaDoubleRDDHistoGram() {
-   double[] expectedBuckets = {1.0, 2.5, 4.0};
-   long[] expectedCounts = {2, 2};
-   JavaDoubleRDD values = sc.parallelizeDoubles(Arrays.asList(1.0, 2.0, 3.0, 4.0));
-
-   // Two generated buckets over [1.0, 4.0]
-   Tuple2<double[], long[]> generated = values.histogram(2);
-   Assert.assertArrayEquals(expectedBuckets, generated._1, 0.1);
-   Assert.assertArrayEquals(expectedCounts, generated._2);
-
-   // Caller-provided bucket boundaries
-   long[] countsForGivenBuckets = values.histogram(expectedBuckets);
-   Assert.assertArrayEquals(expectedCounts, countsForGivenBuckets);
- }
-
- // Maps one RDD through a DoubleFunction, a PairFunction and a plain Function,
- // caching and collecting each result. Results are not inspected.
- @Test
- public void map() {
-   JavaRDD<Integer> numbers = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));
-
-   DoubleFunction<Integer> toDouble = new DoubleFunction<Integer>() {
-     @Override
-     public Double call(Integer x) {
-       return 1.0 * x;
-     }
-   };
-   JavaDoubleRDD asDoubles = numbers.map(toDouble).cache();
-   asDoubles.collect();
-
-   PairFunction<Integer, Integer, Integer> toSelfPair =
-     new PairFunction<Integer, Integer, Integer>() {
-       @Override
-       public Tuple2<Integer, Integer> call(Integer x) {
-         return new Tuple2<Integer, Integer>(x, x);
-       }
-     };
-   JavaPairRDD<Integer, Integer> asPairs = numbers.map(toSelfPair).cache();
-   asPairs.collect();
-
-   Function<Integer, String> toText = new Function<Integer, String>() {
-     @Override
-     public String call(Integer x) {
-       return x.toString();
-     }
-   };
-   JavaRDD<String> asStrings = numbers.map(toText).cache();
-   asStrings.collect();
- }
-
- // Splits two sentences (11 words in total) with flatMap, producing a plain RDD, a pair
- // RDD and a double RDD, and checks the first element and the count of each.
- @Test
- public void flatMap() {
-   JavaRDD<String> rdd = sc.parallelize(Arrays.asList("Hello World!",
-     "The quick brown fox jumps over the lazy dog."));
-   JavaRDD<String> words = rdd.flatMap(new FlatMapFunction<String, String>() {
-     @Override
-     public Iterable<String> call(String x) {
-       return Arrays.asList(x.split(" "));
-     }
-   });
-   Assert.assertEquals("Hello", words.first());
-   Assert.assertEquals(11, words.count());
-
-   JavaPairRDD<String, String> pairs = rdd.flatMap(
-     new PairFlatMapFunction<String, String, String>() {
-
-       @Override
-       public Iterable<Tuple2<String, String>> call(String s) {
-         List<Tuple2<String, String>> pairs = new LinkedList<Tuple2<String, String>>();
-         for (String word : s.split(" ")) pairs.add(new Tuple2<String, String>(word, word));
-         return pairs;
-       }
-     }
-   );
-   Assert.assertEquals(new Tuple2<String, String>("Hello", "Hello"), pairs.first());
-   Assert.assertEquals(11, pairs.count());
-
-   JavaDoubleRDD doubles = rdd.flatMap(new DoubleFlatMapFunction<String>() {
-     @Override
-     public Iterable<Double> call(String s) {
-       List<Double> lengths = new LinkedList<Double>();
-       for (String word : s.split(" ")) lengths.add(word.length() * 1.0);
-       return lengths;
-     }
-   });
-   Assert.assertEquals(5.0, doubles.first().doubleValue(), 0.01);
-   // Count the double RDD itself; the original re-checked pairs.count() here.
-   Assert.assertEquals(11, doubles.count());
- }
-
- // Swaps key and value of a pair RDD through both flatMap and map; results are only
- // collected, not inspected.
- @Test
- public void mapsFromPairsToPairs() {
-   List<Tuple2<Integer, String>> input = Arrays.asList(
-     new Tuple2<Integer, String>(1, "a"),
-     new Tuple2<Integer, String>(2, "aa"),
-     new Tuple2<Integer, String>(3, "aaa"));
-   JavaPairRDD<Integer, String> pairRDD = sc.parallelizePairs(input);
-
-   // Regression test for SPARK-668:
-   PairFlatMapFunction<Tuple2<Integer, String>, String, Integer> swapIntoList =
-     new PairFlatMapFunction<Tuple2<Integer, String>, String, Integer>() {
-       @Override
-       public Iterable<Tuple2<String, Integer>> call(Tuple2<Integer, String> entry)
-           throws Exception {
-         return Collections.singletonList(entry.swap());
-       }
-     };
-   JavaPairRDD<String, Integer> swappedViaFlatMap = pairRDD.flatMap(swapIntoList);
-   swappedViaFlatMap.collect();
-
-   // There was never a bug here, but it's worth testing:
-   PairFunction<Tuple2<Integer, String>, String, Integer> swap =
-     new PairFunction<Tuple2<Integer, String>, String, Integer>() {
-       @Override
-       public Tuple2<String, Integer> call(Tuple2<Integer, String> entry) throws Exception {
-         return entry.swap();
-       }
-     };
-   pairRDD.map(swap).collect();
- }
-
- // Sums each of the two partitions of [1, 2, 3, 4] and expects "[3, 7]".
- @Test
- public void mapPartitions() {
-   FlatMapFunction<Iterator<Integer>, Integer> sumOfPartition =
-     new FlatMapFunction<Iterator<Integer>, Integer>() {
-       @Override
-       public Iterable<Integer> call(Iterator<Integer> elements) {
-         int total = 0;
-         while (elements.hasNext()) {
-           total += elements.next();
-         }
-         return Collections.singletonList(total);
-       }
-     };
-   JavaRDD<Integer> numbers = sc.parallelize(Arrays.asList(1, 2, 3, 4), 2);
-   JavaRDD<Integer> sums = numbers.mapPartitions(sumOfPartition);
-   Assert.assertEquals("[3, 7]", sums.collect().toString());
- }
-
- // Repartitions eight elements from 2 to 4 partitions and from 4 to 2, and checks that
- // the result has the requested number of partitions, none of them empty.
- @Test
- public void repartition() {
-   // Growing number of partitions (2 -> 4); the original comment said "Shrinking".
-   JavaRDD<Integer> in1 = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8), 2);
-   JavaRDD<Integer> repartitioned1 = in1.repartition(4);
-   List<List<Integer>> result1 = repartitioned1.glom().collect();
-   Assert.assertEquals(4, result1.size());
-   for (List<Integer> partition : result1) {
-     Assert.assertFalse(partition.isEmpty());
-   }
-
-   // Shrinking number of partitions (4 -> 2); the original comment said "Growing".
-   JavaRDD<Integer> in2 = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8), 4);
-   JavaRDD<Integer> repartitioned2 = in2.repartition(2);
-   List<List<Integer>> result2 = repartitioned2.glom().collect();
-   Assert.assertEquals(2, result2.size());
-   for (List<Integer> partition : result2) {
-     Assert.assertFalse(partition.isEmpty());
-   }
- }
-
- // Persists a double RDD, a pair RDD and a plain RDD with StorageLevel.DISK_ONLY and
- // checks that each still yields the expected data.
- @Test
- public void persist() {
-   JavaDoubleRDD doubles = sc.parallelizeDoubles(Arrays.asList(1.0, 1.0, 2.0, 3.0, 5.0, 8.0));
-   doubles = doubles.persist(StorageLevel.DISK_ONLY());
-   Assert.assertEquals(20, doubles.sum(), 0.1);
-
-   List<Tuple2<Integer, String>> entries = Arrays.asList(
-     new Tuple2<Integer, String>(1, "a"),
-     new Tuple2<Integer, String>(2, "aa"),
-     new Tuple2<Integer, String>(3, "aaa"));
-   JavaPairRDD<Integer, String> keyed = sc.parallelizePairs(entries);
-   keyed = keyed.persist(StorageLevel.DISK_ONLY());
-   Assert.assertEquals("a", keyed.first()._2());
-
-   JavaRDD<Integer> ints = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));
-   ints = ints.persist(StorageLevel.DISK_ONLY());
-   Assert.assertEquals(1, ints.first().intValue());
- }
-
- @Test
- public void iterator() {
- JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5), 2);
- TaskContext context = new TaskContext(0, 0, 0, false, false, null);
- Assert.assertEquals(1, rdd.iterator(rdd.splits().get(0), context).next().intValue());
- }
-
- @Test
- public void glom() {
- JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4), 2);
- Assert.assertEquals("[1, 2]", rdd.glom().first().toString());
- }
-
- // File input / output tests are largely adapted from FileSuite:
-
- // Saves an RDD as text, then verifies both the raw part file and the RDD read back
- // through textFile().
- @Test
- public void textFiles() throws IOException {
-   File scratchDir = Files.createTempDir();
-   String outputDir = new File(scratchDir, "output").getAbsolutePath();
-   sc.parallelize(Arrays.asList(1, 2, 3, 4)).saveAsTextFile(outputDir);
-
-   // Read the plain text file and check it's OK
-   File firstPart = new File(outputDir, "part-00000");
-   String rawText = Files.toString(firstPart, Charsets.UTF_8);
-   Assert.assertEquals("1\n2\n3\n4\n", rawText);
-
-   // Also try reading it in as a text file RDD
-   JavaRDD<String> reloaded = sc.textFile(outputDir);
-   List<String> expectedLines = Arrays.asList("1", "2", "3", "4");
-   Assert.assertEquals(expectedLines, reloaded.collect());
- }
-
- // Saves an RDD as text using DefaultCodec and reads it back through textFile().
- @Test
- public void textFilesCompressed() throws IOException {
-   File scratchDir = Files.createTempDir();
-   String outputDir = new File(scratchDir, "output").getAbsolutePath();
-   sc.parallelize(Arrays.asList(1, 2, 3, 4)).saveAsTextFile(outputDir, DefaultCodec.class);
-
-   // Try reading it in as a text file RDD
-   JavaRDD<String> reloaded = sc.textFile(outputDir);
-   List<String> expectedLines = Arrays.asList("1", "2", "3", "4");
-   Assert.assertEquals(expectedLines, reloaded.collect());
- }
-
- // Writes (Integer, String) pairs as an (IntWritable, Text) sequence file and reads
- // them back through sequenceFile(), converting to plain Java types again.
- @Test
- public void sequenceFile() {
-   File scratchDir = Files.createTempDir();
-   String outputDir = new File(scratchDir, "output").getAbsolutePath();
-   List<Tuple2<Integer, String>> pairs = Arrays.asList(
-     new Tuple2<Integer, String>(1, "a"),
-     new Tuple2<Integer, String>(2, "aa"),
-     new Tuple2<Integer, String>(3, "aaa"));
-   JavaPairRDD<Integer, String> rdd = sc.parallelizePairs(pairs);
-
-   // Wrap key and value in their Writable counterparts before saving.
-   PairFunction<Tuple2<Integer, String>, IntWritable, Text> toWritables =
-     new PairFunction<Tuple2<Integer, String>, IntWritable, Text>() {
-       @Override
-       public Tuple2<IntWritable, Text> call(Tuple2<Integer, String> entry) {
-         IntWritable key = new IntWritable(entry._1());
-         Text value = new Text(entry._2());
-         return new Tuple2<IntWritable, Text>(key, value);
-       }
-     };
-   rdd.map(toWritables)
-     .saveAsHadoopFile(outputDir, IntWritable.class, Text.class, SequenceFileOutputFormat.class);
-
-   // Read the sequence file back and unwrap the Writables.
-   PairFunction<Tuple2<IntWritable, Text>, Integer, String> fromWritables =
-     new PairFunction<Tuple2<IntWritable, Text>, Integer, String>() {
-       @Override
-       public Tuple2<Integer, String> call(Tuple2<IntWritable, Text> entry) {
-         return new Tuple2<Integer, String>(entry._1().get(), entry._2().toString());
-       }
-     };
-   JavaPairRDD<Integer, String> reloaded =
-     sc.sequenceFile(outputDir, IntWritable.class, Text.class).map(fromWritables);
-   Assert.assertEquals(pairs, reloaded.collect());
- }
-
- // Writes pairs with saveAsNewAPIHadoopFile (new-API SequenceFileOutputFormat) and
- // reads them back with sequenceFile(), comparing string renderings.
- @Test
- public void writeWithNewAPIHadoopFile() {
-   File scratchDir = Files.createTempDir();
-   String outputDir = new File(scratchDir, "output").getAbsolutePath();
-   List<Tuple2<Integer, String>> pairs = Arrays.asList(
-     new Tuple2<Integer, String>(1, "a"),
-     new Tuple2<Integer, String>(2, "aa"),
-     new Tuple2<Integer, String>(3, "aaa"));
-   JavaPairRDD<Integer, String> rdd = sc.parallelizePairs(pairs);
-
-   PairFunction<Tuple2<Integer, String>, IntWritable, Text> toWritables =
-     new PairFunction<Tuple2<Integer, String>, IntWritable, Text>() {
-       @Override
-       public Tuple2<IntWritable, Text> call(Tuple2<Integer, String> entry) {
-         IntWritable key = new IntWritable(entry._1());
-         Text value = new Text(entry._2());
-         return new Tuple2<IntWritable, Text>(key, value);
-       }
-     };
-   rdd.map(toWritables).saveAsNewAPIHadoopFile(outputDir, IntWritable.class, Text.class,
-     org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat.class);
-
-   JavaPairRDD<IntWritable, Text> output = sc.sequenceFile(outputDir, IntWritable.class,
-     Text.class);
-   // Render each record as text so it can be compared with the input's toString().
-   Function<Tuple2<IntWritable, Text>, String> render =
-     new Function<Tuple2<IntWritable, Text>, String>() {
-       @Override
-       public String call(Tuple2<IntWritable, Text> record) {
-         return record.toString();
-       }
-     };
-   String actual = output.map(render).collect().toString();
-   Assert.assertEquals(pairs.toString(), actual);
- }
-
- // Writes pairs with the old-API SequenceFileOutputFormat and reads them back through
- // newAPIHadoopFile() with the new-API SequenceFileInputFormat.
- @Test
- public void readWithNewAPIHadoopFile() throws IOException {
-   File scratchDir = Files.createTempDir();
-   String outputDir = new File(scratchDir, "output").getAbsolutePath();
-   List<Tuple2<Integer, String>> pairs = Arrays.asList(
-     new Tuple2<Integer, String>(1, "a"),
-     new Tuple2<Integer, String>(2, "aa"),
-     new Tuple2<Integer, String>(3, "aaa"));
-   JavaPairRDD<Integer, String> rdd = sc.parallelizePairs(pairs);
-
-   PairFunction<Tuple2<Integer, String>, IntWritable, Text> toWritables =
-     new PairFunction<Tuple2<Integer, String>, IntWritable, Text>() {
-       @Override
-       public Tuple2<IntWritable, Text> call(Tuple2<Integer, String> entry) {
-         IntWritable key = new IntWritable(entry._1());
-         Text value = new Text(entry._2());
-         return new Tuple2<IntWritable, Text>(key, value);
-       }
-     };
-   rdd.map(toWritables)
-     .saveAsHadoopFile(outputDir, IntWritable.class, Text.class, SequenceFileOutputFormat.class);
-
-   JavaPairRDD<IntWritable, Text> output = sc.newAPIHadoopFile(outputDir,
-     org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat.class, IntWritable.class,
-     Text.class, new Job().getConfiguration());
-   // Render each record as text so it can be compared with the input's toString().
-   Function<Tuple2<IntWritable, Text>, String> render =
-     new Function<Tuple2<IntWritable, Text>, String>() {
-       @Override
-       public String call(Tuple2<IntWritable, Text> record) {
-         return record.toString();
-       }
-     };
-   String actual = output.map(render).collect().toString();
-   Assert.assertEquals(pairs.toString(), actual);
- }
-
- // Round-trips an RDD of Integers through saveAsObjectFile / objectFile.
- @Test
- public void objectFilesOfInts() {
-   File scratchDir = Files.createTempDir();
-   String outputDir = new File(scratchDir, "output").getAbsolutePath();
-   List<Integer> values = Arrays.asList(1, 2, 3, 4);
-   sc.parallelize(Arrays.asList(1, 2, 3, 4)).saveAsObjectFile(outputDir);
-
-   // Try reading the output back as an object file
-   JavaRDD<Integer> reloaded = sc.objectFile(outputDir);
-   Assert.assertEquals(values, reloaded.collect());
- }
-
- // Round-trips an RDD of (Integer, String) tuples through saveAsObjectFile / objectFile.
- @Test
- public void objectFilesOfComplexTypes() {
-   File scratchDir = Files.createTempDir();
-   String outputDir = new File(scratchDir, "output").getAbsolutePath();
-   List<Tuple2<Integer, String>> pairs = Arrays.asList(
-     new Tuple2<Integer, String>(1, "a"),
-     new Tuple2<Integer, String>(2, "aa"),
-     new Tuple2<Integer, String>(3, "aaa"));
-   JavaPairRDD<Integer, String> original = sc.parallelizePairs(pairs);
-   original.saveAsObjectFile(outputDir);
-
-   // Try reading the output back as an object file
-   JavaRDD<Tuple2<Integer, String>> reloaded = sc.objectFile(outputDir);
-   Assert.assertEquals(pairs, reloaded.collect());
- }
-
- // Writes pairs as a sequence file with saveAsHadoopFile and reads them back through
- // hadoopFile() with SequenceFileInputFormat, comparing string renderings.
- @Test
- public void hadoopFile() {
-   File scratchDir = Files.createTempDir();
-   String outputDir = new File(scratchDir, "output").getAbsolutePath();
-   List<Tuple2<Integer, String>> pairs = Arrays.asList(
-     new Tuple2<Integer, String>(1, "a"),
-     new Tuple2<Integer, String>(2, "aa"),
-     new Tuple2<Integer, String>(3, "aaa"));
-   JavaPairRDD<Integer, String> rdd = sc.parallelizePairs(pairs);
-
-   PairFunction<Tuple2<Integer, String>, IntWritable, Text> toWritables =
-     new PairFunction<Tuple2<Integer, String>, IntWritable, Text>() {
-       @Override
-       public Tuple2<IntWritable, Text> call(Tuple2<Integer, String> entry) {
-         IntWritable key = new IntWritable(entry._1());
-         Text value = new Text(entry._2());
-         return new Tuple2<IntWritable, Text>(key, value);
-       }
-     };
-   rdd.map(toWritables)
-     .saveAsHadoopFile(outputDir, IntWritable.class, Text.class, SequenceFileOutputFormat.class);
-
-   JavaPairRDD<IntWritable, Text> output = sc.hadoopFile(outputDir,
-     SequenceFileInputFormat.class, IntWritable.class, Text.class);
-   // Render each record as text so it can be compared with the input's toString().
-   Function<Tuple2<IntWritable, Text>, String> render =
-     new Function<Tuple2<IntWritable, Text>, String>() {
-       @Override
-       public String call(Tuple2<IntWritable, Text> record) {
-         return record.toString();
-       }
-     };
-   String actual = output.map(render).collect().toString();
-   Assert.assertEquals(pairs.toString(), actual);
- }
-
- // Same round trip as the uncompressed hadoopFile test, but the sequence file is
- // written with DefaultCodec into an "output_compressed" directory.
- @Test
- public void hadoopFileCompressed() {
-   File scratchDir = Files.createTempDir();
-   String outputDir = new File(scratchDir, "output_compressed").getAbsolutePath();
-   List<Tuple2<Integer, String>> pairs = Arrays.asList(
-     new Tuple2<Integer, String>(1, "a"),
-     new Tuple2<Integer, String>(2, "aa"),
-     new Tuple2<Integer, String>(3, "aaa"));
-   JavaPairRDD<Integer, String> rdd = sc.parallelizePairs(pairs);
-
-   PairFunction<Tuple2<Integer, String>, IntWritable, Text> toWritables =
-     new PairFunction<Tuple2<Integer, String>, IntWritable, Text>() {
-       @Override
-       public Tuple2<IntWritable, Text> call(Tuple2<Integer, String> entry) {
-         IntWritable key = new IntWritable(entry._1());
-         Text value = new Text(entry._2());
-         return new Tuple2<IntWritable, Text>(key, value);
-       }
-     };
-   rdd.map(toWritables).saveAsHadoopFile(outputDir, IntWritable.class, Text.class,
-     SequenceFileOutputFormat.class, DefaultCodec.class);
-
-   JavaPairRDD<IntWritable, Text> output = sc.hadoopFile(outputDir,
-     SequenceFileInputFormat.class, IntWritable.class, Text.class);
-
-   // Render each record as text so it can be compared with the input's toString().
-   Function<Tuple2<IntWritable, Text>, String> render =
-     new Function<Tuple2<IntWritable, Text>, String>() {
-       @Override
-       public String call(Tuple2<IntWritable, Text> record) {
-         return record.toString();
-       }
-     };
-   String actual = output.map(render).collect().toString();
-   Assert.assertEquals(pairs.toString(), actual);
- }
-
- // Zips an RDD of five integers with its own double-valued mapping.
- // The original only called count() and discarded the result; the count is now asserted.
- @Test
- public void zip() {
-   JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));
-   JavaDoubleRDD doubles = rdd.map(new DoubleFunction<Integer>() {
-     @Override
-     public Double call(Integer x) {
-       return 1.0 * x;
-     }
-   });
-   JavaPairRDD<Integer, Double> zipped = rdd.zip(doubles);
-   // One output pair per input element.
-   Assert.assertEquals(5, zipped.count());
- }
-
- // Zips the partitions of a 6-element and a 4-element RDD (two partitions each) and
- // emits, per partition pair, the element counts of both sides.
- @Test
- public void zipPartitions() {
-   JavaRDD<Integer> ints = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6), 2);
-   JavaRDD<String> strings = sc.parallelize(Arrays.asList("1", "2", "3", "4"), 2);
-
-   FlatMapFunction2<Iterator<Integer>, Iterator<String>, Integer> countBoth =
-     new FlatMapFunction2<Iterator<Integer>, Iterator<String>, Integer>() {
-       @Override
-       public Iterable<Integer> call(Iterator<Integer> left, Iterator<String> right) {
-         // Drain each iterator, counting its elements.
-         int leftCount = 0;
-         for (; left.hasNext(); left.next()) {
-           leftCount++;
-         }
-         int rightCount = 0;
-         for (; right.hasNext(); right.next()) {
-           rightCount++;
-         }
-         return Arrays.asList(leftCount, rightCount);
-       }
-     };
-
-   JavaRDD<Integer> sizes = ints.zipPartitions(strings, countBoth);
-   Assert.assertEquals("[3, 2, 3, 2]", sizes.collect().toString());
- }
-
- // Adds the elements 1..5 (sum 15) to int, double and custom float accumulators that
- // each start at 10, expecting 25, then checks setValue().
- // @Override was missing on every anonymous-class method here, unlike the rest of the suite.
- @Test
- public void accumulators() {
-   JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));
-
-   final Accumulator<Integer> intAccum = sc.intAccumulator(10);
-   rdd.foreach(new VoidFunction<Integer>() {
-     @Override
-     public void call(Integer x) {
-       intAccum.add(x);
-     }
-   });
-   Assert.assertEquals((Integer) 25, intAccum.value());
-
-   final Accumulator<Double> doubleAccum = sc.doubleAccumulator(10.0);
-   rdd.foreach(new VoidFunction<Integer>() {
-     @Override
-     public void call(Integer x) {
-       doubleAccum.add((double) x);
-     }
-   });
-   Assert.assertEquals((Double) 25.0, doubleAccum.value());
-
-   // Try a custom accumulator type
-   AccumulatorParam<Float> floatAccumulatorParam = new AccumulatorParam<Float>() {
-     @Override
-     public Float addInPlace(Float r, Float t) {
-       return r + t;
-     }
-
-     @Override
-     public Float addAccumulator(Float r, Float t) {
-       return r + t;
-     }
-
-     @Override
-     public Float zero(Float initialValue) {
-       return 0.0f;
-     }
-   };
-
-   final Accumulator<Float> floatAccum = sc.accumulator((Float) 10.0f, floatAccumulatorParam);
-   rdd.foreach(new VoidFunction<Integer>() {
-     @Override
-     public void call(Integer x) {
-       floatAccum.add((float) x);
-     }
-   });
-   Assert.assertEquals((Float) 25.0f, floatAccum.value());
-
-   // Test the setValue method
-   floatAccum.setValue(5.0f);
-   Assert.assertEquals((Float) 5.0f, floatAccum.value());
- }
-
- // Keys each integer by its string form and checks the resulting pairs in order.
- @Test
- public void keyBy() {
-   Function<Integer, String> toText = new Function<Integer, String>() {
-     @Override
-     public String call(Integer value) throws Exception {
-       return value.toString();
-     }
-   };
-   JavaRDD<Integer> numbers = sc.parallelize(Arrays.asList(1, 2));
-   List<Tuple2<String, Integer>> keyed = numbers.keyBy(toText).collect();
-   Assert.assertEquals(new Tuple2<String, Integer>("1", 1), keyed.get(0));
-   Assert.assertEquals(new Tuple2<String, Integer>("2", 2), keyed.get(1));
- }
-
- @Test
- public void checkpointAndComputation() {
- File tempDir = Files.createTempDir();
- JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));
- sc.setCheckpointDir(tempDir.getAbsolutePath());
- Assert.assertEquals(false, rdd.isCheckpointed());
- rdd.checkpoint();
- rdd.count(); // Forces the DAG to cause a checkpoint
- Assert.assertEquals(true, rdd.isCheckpointed());
- Assert.assertEquals(Arrays.asList(1, 2, 3, 4, 5), rdd.collect());
- }
-
- @Test
- public void checkpointAndRestore() {
- File tempDir = Files.createTempDir();
- JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));
- sc.setCheckpointDir(tempDir.getAbsolutePath());
- Assert.assertEquals(false, rdd.isCheckpointed());
- rdd.checkpoint();
- rdd.count(); // Forces the DAG to cause a checkpoint
- Assert.assertEquals(true, rdd.isCheckpointed());
-
- Assert.assertTrue(rdd.getCheckpointFile().isPresent());
- JavaRDD<Integer> recovered = sc.checkpointFile(rdd.getCheckpointFile().get());
- Assert.assertEquals(Arrays.asList(1, 2, 3, 4, 5), recovered.collect());
- }
-
- // Maps integers to (i, i % 2) pairs, then maps that pair RDD again to swap key and
- // value, and checks the collected result.
- @Test
- public void mapOnPairRDD() {
-   PairFunction<Integer, Integer, Integer> withParity =
-     new PairFunction<Integer, Integer, Integer>() {
-       @Override
-       public Tuple2<Integer, Integer> call(Integer value) throws Exception {
-         return new Tuple2<Integer, Integer>(value, value % 2);
-       }
-     };
-   PairFunction<Tuple2<Integer, Integer>, Integer, Integer> flip =
-     new PairFunction<Tuple2<Integer, Integer>, Integer, Integer>() {
-       @Override
-       public Tuple2<Integer, Integer> call(Tuple2<Integer, Integer> entry) throws Exception {
-         return new Tuple2<Integer, Integer>(entry._2(), entry._1());
-       }
-     };
-
-   JavaRDD<Integer> numbers = sc.parallelize(Arrays.asList(1,2,3,4));
-   JavaPairRDD<Integer, Integer> valueToParity = numbers.map(withParity);
-   JavaPairRDD<Integer, Integer> parityToValue = valueToParity.map(flip);
-
-   List<Tuple2<Integer, Integer>> expected = Arrays.asList(
-     new Tuple2<Integer, Integer>(1, 1),
-     new Tuple2<Integer, Integer>(0, 2),
-     new Tuple2<Integer, Integer>(1, 3),
-     new Tuple2<Integer, Integer>(0, 4));
-   Assert.assertEquals(expected, parityToValue.collect());
- }
-
- @Test
- public void collectPartitions() {
- JavaRDD<Integer> rdd1 = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7), 3);
-
- JavaPairRDD<Integer, Integer> rdd2 = rdd1.map(new PairFunction<Integer, Integer, Integer>() {
- @Override
- public Tuple2<Integer, Integer> call(Integer i) throws Exception {
- return new Tuple2<Integer, Integer>(i, i % 2);
- }
- });
-
- List[] parts = rdd1.collectPartitions(new int[] {0});
- Assert.assertEquals(Arrays.asList(1, 2), parts[0]);
-
- parts = rdd1.collectPartitions(new int[] {1, 2});
- Assert.assertEquals(Arrays.asList(3, 4), parts[0]);
- Assert.assertEquals(Arrays.asList(5, 6, 7), parts[1]);
-
- Assert.assertEquals(Arrays.asList(new Tuple2<Integer, Integer>(1, 1),
- new Tuple2<Integer, Integer>(2, 0)),
- rdd2.collectPartitions(new int[] {0})[0]);
-
- parts = rdd2.collectPartitions(new int[] {1, 2});
- Assert.assertEquals(Arrays.asList(new Tuple2<Integer, Integer>(3, 1),
- new Tuple2<Integer, Integer>(4, 0)),
- parts[0]);
- Assert.assertEquals(Arrays.asList(new Tuple2<Integer, Integer>(5, 1),
- new Tuple2<Integer, Integer>(6, 0),
- new Tuple2<Integer, Integer>(7, 1)),
- parts[1]);
- }
-
- @Test
- public void countApproxDistinct() {
- List<Integer> arrayData = new ArrayList<Integer>();
- int size = 100;
- for (int i = 0; i < 100000; i++) {
- arrayData.add(i % size);
- }
- JavaRDD<Integer> simpleRdd = sc.parallelize(arrayData, 10);
- Assert.assertTrue(Math.abs((simpleRdd.countApproxDistinct(0.2) - size) / (size * 1.0)) < 0.2);
- Assert.assertTrue(Math.abs((simpleRdd.countApproxDistinct(0.05) - size) / (size * 1.0)) <= 0.05);
- Assert.assertTrue(Math.abs((simpleRdd.countApproxDistinct(0.01) - size) / (size * 1.0)) <= 0.01);
- }
-
- @Test
- public void countApproxDistinctByKey() {
- double relativeSD = 0.001;
-
- List<Tuple2<Integer, Integer>> arrayData = new ArrayList<Tuple2<Integer, Integer>>();
- for (int i = 10; i < 100; i++)
- for (int j = 0; j < i; j++)
- arrayData.add(new Tuple2<Integer, Integer>(i, j));
-
- JavaPairRDD<Integer, Integer> pairRdd = sc.parallelizePairs(arrayData);
- List<Tuple2<Integer, Object>> res = pairRdd.countApproxDistinctByKey(relativeSD).collect();
- for (Tuple2<Integer, Object> resItem : res) {
- double count = (double)resItem._1();
- Long resCount = (Long)resItem._2();
- Double error = Math.abs((resCount - count) / count);
- Assert.assertTrue(error < relativeSD);
- }
-
- }
-
- @Test
- public void collectAsMapWithIntArrayValues() {
- // Regression test for SPARK-1040
- JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(new Integer[] { 1 }));
- JavaPairRDD<Integer, int[]> pairRDD = rdd.map(new PairFunction<Integer, Integer, int[]>() {
- @Override
- public Tuple2<Integer, int[]> call(Integer x) throws Exception {
- return new Tuple2<Integer, int[]>(x, new int[] { x });
- }
- });
- pairRDD.collect(); // Works fine
- Map<Integer, int[]> map = pairRDD.collectAsMap(); // Used to crash with ClassCastException
- }
-}
[2/2] git commit: Migrate Java code to Scala or move it to src/main/java
Posted by ad...@apache.org.
Migrate Java code to Scala or move it to src/main/java
These classes can't be migrated:
  - StorageLevels: impossible to create static fields in Scala
  - JavaSparkContextVarargsWorkaround: incompatible varargs
  - JavaAPISuite: should test Java APIs in pure Java (for sanity)
Author: Punya Biswal <pb...@palantir.com>
Closes #605 from punya/move-java-sources and squashes the following commits:
25b00b2 [Punya Biswal] Remove redundant type param; reformat
853da46 [Punya Biswal] Use factory method rather than constructor
e5d53d9 [Punya Biswal] Migrate Java code to Scala or move it to src/main/java
Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/29ac7ea5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/29ac7ea5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/29ac7ea5
Branch: refs/heads/master
Commit: 29ac7ea52fbb0c6531e14305e2fb1ccba9678f7e
Parents: 1aa4f8a
Author: Punya Biswal <pb...@palantir.com>
Authored: Sat Feb 22 17:53:48 2014 -0800
Committer: Aaron Davidson <aa...@databricks.com>
Committed: Sat Feb 22 17:53:48 2014 -0800
----------------------------------------------------------------------
.../java/JavaSparkContextVarargsWorkaround.java | 63 ++
.../apache/spark/api/java/StorageLevels.java | 48 +
.../main/scala/org/apache/spark/SparkFiles.java | 42 -
.../scala/org/apache/spark/SparkFiles.scala | 39 +
.../java/JavaSparkContextVarargsWorkaround.java | 63 --
.../apache/spark/api/java/StorageLevels.java | 48 -
.../java/function/DoubleFlatMapFunction.java | 30 -
.../java/function/DoubleFlatMapFunction.scala | 30 +
.../spark/api/java/function/DoubleFunction.java | 30 -
.../api/java/function/DoubleFunction.scala | 29 +
.../spark/api/java/function/Function.java | 35 -
.../spark/api/java/function/Function.scala | 31 +
.../spark/api/java/function/Function2.java | 35 -
.../spark/api/java/function/Function2.scala | 29 +
.../spark/api/java/function/Function3.java | 35 -
.../spark/api/java/function/Function3.scala | 28 +
.../api/java/function/PairFlatMapFunction.java | 43 -
.../api/java/function/PairFlatMapFunction.scala | 36 +
.../spark/api/java/function/PairFunction.java | 41 -
.../spark/api/java/function/PairFunction.scala | 33 +
.../java/org/apache/spark/JavaAPISuite.java | 981 +++++++++++++++++++
.../scala/org/apache/spark/JavaAPISuite.java | 981 -------------------
22 files changed, 1347 insertions(+), 1383 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/29ac7ea5/core/src/main/java/org/apache/spark/api/java/JavaSparkContextVarargsWorkaround.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/api/java/JavaSparkContextVarargsWorkaround.java b/core/src/main/java/org/apache/spark/api/java/JavaSparkContextVarargsWorkaround.java
new file mode 100644
index 0000000..2090efd
--- /dev/null
+++ b/core/src/main/java/org/apache/spark/api/java/JavaSparkContextVarargsWorkaround.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.api.java;
+
+import java.util.ArrayList;
+import java.util.List;
+
+// See
+// http://scala-programming-language.1934581.n4.nabble.com/Workaround-for-implementing-java-varargs-in-2-7-2-final-tp1944767p1944772.html
+abstract class JavaSparkContextVarargsWorkaround {
+ public <T> JavaRDD<T> union(JavaRDD<T>... rdds) {
+ if (rdds.length == 0) {
+ throw new IllegalArgumentException("Union called on empty list");
+ }
+ ArrayList<JavaRDD<T>> rest = new ArrayList<JavaRDD<T>>(rdds.length - 1);
+ for (int i = 1; i < rdds.length; i++) {
+ rest.add(rdds[i]);
+ }
+ return union(rdds[0], rest);
+ }
+
+ public JavaDoubleRDD union(JavaDoubleRDD... rdds) {
+ if (rdds.length == 0) {
+ throw new IllegalArgumentException("Union called on empty list");
+ }
+ ArrayList<JavaDoubleRDD> rest = new ArrayList<JavaDoubleRDD>(rdds.length - 1);
+ for (int i = 1; i < rdds.length; i++) {
+ rest.add(rdds[i]);
+ }
+ return union(rdds[0], rest);
+ }
+
+ public <K, V> JavaPairRDD<K, V> union(JavaPairRDD<K, V>... rdds) {
+ if (rdds.length == 0) {
+ throw new IllegalArgumentException("Union called on empty list");
+ }
+ ArrayList<JavaPairRDD<K, V>> rest = new ArrayList<JavaPairRDD<K, V>>(rdds.length - 1);
+ for (int i = 1; i < rdds.length; i++) {
+ rest.add(rdds[i]);
+ }
+ return union(rdds[0], rest);
+ }
+
+ // These methods take separate "first" and "rest" elements to avoid having the same type erasure
+ abstract public <T> JavaRDD<T> union(JavaRDD<T> first, List<JavaRDD<T>> rest);
+ abstract public JavaDoubleRDD union(JavaDoubleRDD first, List<JavaDoubleRDD> rest);
+ abstract public <K, V> JavaPairRDD<K, V> union(JavaPairRDD<K, V> first, List<JavaPairRDD<K, V>> rest);
+}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/29ac7ea5/core/src/main/java/org/apache/spark/api/java/StorageLevels.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/api/java/StorageLevels.java b/core/src/main/java/org/apache/spark/api/java/StorageLevels.java
new file mode 100644
index 0000000..9f13b39
--- /dev/null
+++ b/core/src/main/java/org/apache/spark/api/java/StorageLevels.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.api.java;
+
+import org.apache.spark.storage.StorageLevel;
+
+/**
+ * Expose some commonly useful storage level constants.
+ */
+public class StorageLevels {
+ public static final StorageLevel NONE = create(false, false, false, 1);
+ public static final StorageLevel DISK_ONLY = create(true, false, false, 1);
+ public static final StorageLevel DISK_ONLY_2 = create(true, false, false, 2);
+ public static final StorageLevel MEMORY_ONLY = create(false, true, true, 1);
+ public static final StorageLevel MEMORY_ONLY_2 = create(false, true, true, 2);
+ public static final StorageLevel MEMORY_ONLY_SER = create(false, true, false, 1);
+ public static final StorageLevel MEMORY_ONLY_SER_2 = create(false, true, false, 2);
+ public static final StorageLevel MEMORY_AND_DISK = create(true, true, true, 1);
+ public static final StorageLevel MEMORY_AND_DISK_2 = create(true, true, true, 2);
+ public static final StorageLevel MEMORY_AND_DISK_SER = create(true, true, false, 1);
+ public static final StorageLevel MEMORY_AND_DISK_SER_2 = create(true, true, false, 2);
+
+ /**
+ * Create a new StorageLevel object.
+ * @param useDisk saved to disk, if true
+ * @param useMemory saved to memory, if true
+ * @param deserialized saved as deserialized objects, if true
+ * @param replication replication factor
+ */
+ public static StorageLevel create(boolean useDisk, boolean useMemory, boolean deserialized, int replication) {
+ return StorageLevel.apply(useDisk, useMemory, deserialized, replication);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/29ac7ea5/core/src/main/scala/org/apache/spark/SparkFiles.java
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkFiles.java b/core/src/main/scala/org/apache/spark/SparkFiles.java
deleted file mode 100644
index af9cf85..0000000
--- a/core/src/main/scala/org/apache/spark/SparkFiles.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark;
-
-import java.io.File;
-
-/**
- * Resolves paths to files added through `SparkContext.addFile()`.
- */
-public class SparkFiles {
-
- private SparkFiles() {}
-
- /**
- * Get the absolute path of a file added through `SparkContext.addFile()`.
- */
- public static String get(String filename) {
- return new File(getRootDirectory(), filename).getAbsolutePath();
- }
-
- /**
- * Get the root directory that contains files added through `SparkContext.addFile()`.
- */
- public static String getRootDirectory() {
- return SparkEnv.get().sparkFilesDir();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/29ac7ea5/core/src/main/scala/org/apache/spark/SparkFiles.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkFiles.scala b/core/src/main/scala/org/apache/spark/SparkFiles.scala
new file mode 100644
index 0000000..e85b89f
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/SparkFiles.scala
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+import java.io.File
+
+/**
+ * Resolves paths to files added through `SparkContext.addFile()`.
+ */
+object SparkFiles {
+
+ /**
+ * Get the absolute path of a file added through `SparkContext.addFile()`.
+ */
+ def get(filename: String): String =
+ new File(getRootDirectory(), filename).getAbsolutePath()
+
+ /**
+ * Get the root directory that contains files added through `SparkContext.addFile()`.
+ */
+ def getRootDirectory(): String =
+ SparkEnv.get.sparkFilesDir
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/29ac7ea5/core/src/main/scala/org/apache/spark/api/java/JavaSparkContextVarargsWorkaround.java
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContextVarargsWorkaround.java b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContextVarargsWorkaround.java
deleted file mode 100644
index 2090efd..0000000
--- a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContextVarargsWorkaround.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.api.java;
-
-import java.util.ArrayList;
-import java.util.List;
-
-// See
-// http://scala-programming-language.1934581.n4.nabble.com/Workaround-for-implementing-java-varargs-in-2-7-2-final-tp1944767p1944772.html
-abstract class JavaSparkContextVarargsWorkaround {
- public <T> JavaRDD<T> union(JavaRDD<T>... rdds) {
- if (rdds.length == 0) {
- throw new IllegalArgumentException("Union called on empty list");
- }
- ArrayList<JavaRDD<T>> rest = new ArrayList<JavaRDD<T>>(rdds.length - 1);
- for (int i = 1; i < rdds.length; i++) {
- rest.add(rdds[i]);
- }
- return union(rdds[0], rest);
- }
-
- public JavaDoubleRDD union(JavaDoubleRDD... rdds) {
- if (rdds.length == 0) {
- throw new IllegalArgumentException("Union called on empty list");
- }
- ArrayList<JavaDoubleRDD> rest = new ArrayList<JavaDoubleRDD>(rdds.length - 1);
- for (int i = 1; i < rdds.length; i++) {
- rest.add(rdds[i]);
- }
- return union(rdds[0], rest);
- }
-
- public <K, V> JavaPairRDD<K, V> union(JavaPairRDD<K, V>... rdds) {
- if (rdds.length == 0) {
- throw new IllegalArgumentException("Union called on empty list");
- }
- ArrayList<JavaPairRDD<K, V>> rest = new ArrayList<JavaPairRDD<K, V>>(rdds.length - 1);
- for (int i = 1; i < rdds.length; i++) {
- rest.add(rdds[i]);
- }
- return union(rdds[0], rest);
- }
-
- // These methods take separate "first" and "rest" elements to avoid having the same type erasure
- abstract public <T> JavaRDD<T> union(JavaRDD<T> first, List<JavaRDD<T>> rest);
- abstract public JavaDoubleRDD union(JavaDoubleRDD first, List<JavaDoubleRDD> rest);
- abstract public <K, V> JavaPairRDD<K, V> union(JavaPairRDD<K, V> first, List<JavaPairRDD<K, V>> rest);
-}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/29ac7ea5/core/src/main/scala/org/apache/spark/api/java/StorageLevels.java
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/java/StorageLevels.java b/core/src/main/scala/org/apache/spark/api/java/StorageLevels.java
deleted file mode 100644
index 0744269..0000000
--- a/core/src/main/scala/org/apache/spark/api/java/StorageLevels.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.api.java;
-
-import org.apache.spark.storage.StorageLevel;
-
-/**
- * Expose some commonly useful storage level constants.
- */
-public class StorageLevels {
- public static final StorageLevel NONE = new StorageLevel(false, false, false, 1);
- public static final StorageLevel DISK_ONLY = new StorageLevel(true, false, false, 1);
- public static final StorageLevel DISK_ONLY_2 = new StorageLevel(true, false, false, 2);
- public static final StorageLevel MEMORY_ONLY = new StorageLevel(false, true, true, 1);
- public static final StorageLevel MEMORY_ONLY_2 = new StorageLevel(false, true, true, 2);
- public static final StorageLevel MEMORY_ONLY_SER = new StorageLevel(false, true, false, 1);
- public static final StorageLevel MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, 2);
- public static final StorageLevel MEMORY_AND_DISK = new StorageLevel(true, true, true, 1);
- public static final StorageLevel MEMORY_AND_DISK_2 = new StorageLevel(true, true, true, 2);
- public static final StorageLevel MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, 1);
- public static final StorageLevel MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, 2);
-
- /**
- * Create a new StorageLevel object.
- * @param useDisk saved to disk, if true
- * @param useMemory saved to memory, if true
- * @param deserialized saved as deserialized objects, if true
- * @param replication replication factor
- */
- public static StorageLevel create(boolean useDisk, boolean useMemory, boolean deserialized, int replication) {
- return StorageLevel.apply(useDisk, useMemory, deserialized, replication);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/29ac7ea5/core/src/main/scala/org/apache/spark/api/java/function/DoubleFlatMapFunction.java
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/java/function/DoubleFlatMapFunction.java b/core/src/main/scala/org/apache/spark/api/java/function/DoubleFlatMapFunction.java
deleted file mode 100644
index 30e6a52..0000000
--- a/core/src/main/scala/org/apache/spark/api/java/function/DoubleFlatMapFunction.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.api.java.function;
-
-import java.io.Serializable;
-
-/**
- * A function that returns zero or more records of type Double from each input record.
- */
-// DoubleFlatMapFunction does not extend FlatMapFunction because flatMap is
-// overloaded for both FlatMapFunction and DoubleFlatMapFunction.
-public abstract class DoubleFlatMapFunction<T> extends WrappedFunction1<T, Iterable<Double>>
- implements Serializable {
- // Intentionally left blank
-}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/29ac7ea5/core/src/main/scala/org/apache/spark/api/java/function/DoubleFlatMapFunction.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/java/function/DoubleFlatMapFunction.scala b/core/src/main/scala/org/apache/spark/api/java/function/DoubleFlatMapFunction.scala
new file mode 100644
index 0000000..7500a89
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/api/java/function/DoubleFlatMapFunction.scala
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.api.java.function
+
+import java.lang.{Double => JDouble, Iterable => JIterable}
+
+/**
+ * A function that returns zero or more records of type Double from each input record.
+ */
+// DoubleFlatMapFunction does not extend FlatMapFunction because flatMap is
+// overloaded for both FlatMapFunction and DoubleFlatMapFunction.
+abstract class DoubleFlatMapFunction[T] extends WrappedFunction1[T, JIterable[JDouble]]
+ with Serializable {
+ // Intentionally left blank
+}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/29ac7ea5/core/src/main/scala/org/apache/spark/api/java/function/DoubleFunction.java
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/java/function/DoubleFunction.java b/core/src/main/scala/org/apache/spark/api/java/function/DoubleFunction.java
deleted file mode 100644
index 490da25..0000000
--- a/core/src/main/scala/org/apache/spark/api/java/function/DoubleFunction.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.api.java.function;
-
-import java.io.Serializable;
-
-/**
- * A function that returns Doubles, and can be used to construct DoubleRDDs.
- */
-// DoubleFunction does not extend Function because some UDF functions, like map,
-// are overloaded for both Function and DoubleFunction.
-public abstract class DoubleFunction<T> extends WrappedFunction1<T, Double>
- implements Serializable {
- // Intentionally left blank
-}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/29ac7ea5/core/src/main/scala/org/apache/spark/api/java/function/DoubleFunction.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/java/function/DoubleFunction.scala b/core/src/main/scala/org/apache/spark/api/java/function/DoubleFunction.scala
new file mode 100644
index 0000000..2cdf2e9
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/api/java/function/DoubleFunction.scala
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.api.java.function
+
+import java.lang.{Double => JDouble}
+
+/**
+ * A function that returns Doubles, and can be used to construct DoubleRDDs.
+ */
+// DoubleFunction does not extend Function because some UDF functions, like map,
+// are overloaded for both Function and DoubleFunction.
+abstract class DoubleFunction[T] extends WrappedFunction1[T, JDouble] with Serializable {
+ // Intentionally left blank
+}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/29ac7ea5/core/src/main/scala/org/apache/spark/api/java/function/Function.java
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/java/function/Function.java b/core/src/main/scala/org/apache/spark/api/java/function/Function.java
deleted file mode 100644
index e0fcd46..0000000
--- a/core/src/main/scala/org/apache/spark/api/java/function/Function.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.api.java.function;
-
-import java.io.Serializable;
-
-import scala.reflect.ClassTag;
-import scala.reflect.ClassTag$;
-
-/**
- * Base class for functions whose return types do not create special RDDs. PairFunction and
- * DoubleFunction are handled separately, to allow PairRDDs and DoubleRDDs to be constructed
- * when mapping RDDs of other types.
- */
-public abstract class Function<T, R> extends WrappedFunction1<T, R> implements Serializable {
- public ClassTag<R> returnType() {
- return ClassTag$.MODULE$.apply(Object.class);
- }
-}
-
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/29ac7ea5/core/src/main/scala/org/apache/spark/api/java/function/Function.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/java/function/Function.scala b/core/src/main/scala/org/apache/spark/api/java/function/Function.scala
new file mode 100644
index 0000000..a5e1701
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/api/java/function/Function.scala
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.api.java.function
+
+import scala.reflect.ClassTag
+import org.apache.spark.api.java.JavaSparkContext
+
+/**
+ * Base class for functions whose return types do not create special RDDs. PairFunction and
+ * DoubleFunction are handled separately, to allow PairRDDs and DoubleRDDs to be constructed
+ * when mapping RDDs of other types.
+ */
+abstract class Function[T, R] extends WrappedFunction1[T, R] with Serializable {
+ // Class tag reported for the result type R, obtained from JavaSparkContext.fakeClassTag.
+ // NOTE(review): presumably a placeholder tag, since Java callers cannot supply one -- confirm.
+ def returnType(): ClassTag[R] = JavaSparkContext.fakeClassTag
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/29ac7ea5/core/src/main/scala/org/apache/spark/api/java/function/Function2.java
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/java/function/Function2.java b/core/src/main/scala/org/apache/spark/api/java/function/Function2.java
deleted file mode 100644
index 16d7379..0000000
--- a/core/src/main/scala/org/apache/spark/api/java/function/Function2.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.api.java.function;
-
-import java.io.Serializable;
-
-import scala.reflect.ClassTag;
-import scala.reflect.ClassTag$;
-
-/**
- * A two-argument function that takes arguments of type T1 and T2 and returns an R.
- */
-public abstract class Function2<T1, T2, R> extends WrappedFunction2<T1, T2, R>
- implements Serializable {
-
- public ClassTag<R> returnType() {
- return (ClassTag<R>) ClassTag$.MODULE$.apply(Object.class);
- }
-}
-
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/29ac7ea5/core/src/main/scala/org/apache/spark/api/java/function/Function2.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/java/function/Function2.scala b/core/src/main/scala/org/apache/spark/api/java/function/Function2.scala
new file mode 100644
index 0000000..fa3616c
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/api/java/function/Function2.scala
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.api.java.function
+
+import scala.reflect.ClassTag
+import org.apache.spark.api.java.JavaSparkContext
+
+/**
+ * A two-argument function that takes arguments of type T1 and T2 and returns an R.
+ */
+abstract class Function2[T1, T2, R] extends WrappedFunction2[T1, T2, R] with Serializable {
+ // Class tag reported for the result type R, obtained from JavaSparkContext.fakeClassTag.
+ // NOTE(review): presumably a placeholder tag, since Java callers cannot supply one -- confirm.
+ def returnType(): ClassTag[R] = JavaSparkContext.fakeClassTag
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/29ac7ea5/core/src/main/scala/org/apache/spark/api/java/function/Function3.java
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/java/function/Function3.java b/core/src/main/scala/org/apache/spark/api/java/function/Function3.java
deleted file mode 100644
index 096eb71..0000000
--- a/core/src/main/scala/org/apache/spark/api/java/function/Function3.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.api.java.function;
-
-import java.io.Serializable;
-
-import scala.reflect.ClassTag;
-import scala.reflect.ClassTag$;
-
-/**
- * A three-argument function that takes arguments of type T1, T2 and T3 and returns an R.
- */
-public abstract class Function3<T1, T2, T3, R> extends WrappedFunction3<T1, T2, T3, R>
- implements Serializable {
-
- public ClassTag<R> returnType() {
- return (ClassTag<R>) ClassTag$.MODULE$.apply(Object.class);
- }
-}
-
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/29ac7ea5/core/src/main/scala/org/apache/spark/api/java/function/Function3.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/java/function/Function3.scala b/core/src/main/scala/org/apache/spark/api/java/function/Function3.scala
new file mode 100644
index 0000000..4515289
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/api/java/function/Function3.scala
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.api.java.function
+
+import org.apache.spark.api.java.JavaSparkContext
+import scala.reflect.ClassTag
+
+/**
+ * A three-argument function that takes arguments of type T1, T2 and T3 and returns an R.
+ */
+abstract class Function3[T1, T2, T3, R] extends WrappedFunction3[T1, T2, T3, R] with Serializable {
+ // Class tag reported for the result type R, obtained from JavaSparkContext.fakeClassTag.
+ // NOTE(review): presumably a placeholder tag, since Java callers cannot supply one -- confirm.
+ def returnType(): ClassTag[R] = JavaSparkContext.fakeClassTag
+}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/29ac7ea5/core/src/main/scala/org/apache/spark/api/java/function/PairFlatMapFunction.java
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/java/function/PairFlatMapFunction.java b/core/src/main/scala/org/apache/spark/api/java/function/PairFlatMapFunction.java
deleted file mode 100644
index c72b98c..0000000
--- a/core/src/main/scala/org/apache/spark/api/java/function/PairFlatMapFunction.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.api.java.function;
-
-import java.io.Serializable;
-
-import scala.Tuple2;
-import scala.reflect.ClassTag;
-import scala.reflect.ClassTag$;
-
-/**
- * A function that returns zero or more key-value pair records from each input record. The
- * key-value pairs are represented as scala.Tuple2 objects.
- */
-// PairFlatMapFunction does not extend FlatMapFunction because flatMap is
-// overloaded for both FlatMapFunction and PairFlatMapFunction.
-public abstract class PairFlatMapFunction<T, K, V>
- extends WrappedFunction1<T, Iterable<Tuple2<K, V>>>
- implements Serializable {
-
- public ClassTag<K> keyType() {
- return (ClassTag<K>) ClassTag$.MODULE$.apply(Object.class);
- }
-
- public ClassTag<V> valueType() {
- return (ClassTag<V>) ClassTag$.MODULE$.apply(Object.class);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/29ac7ea5/core/src/main/scala/org/apache/spark/api/java/function/PairFlatMapFunction.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/java/function/PairFlatMapFunction.scala b/core/src/main/scala/org/apache/spark/api/java/function/PairFlatMapFunction.scala
new file mode 100644
index 0000000..8467bbb
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/api/java/function/PairFlatMapFunction.scala
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.api.java.function
+
+import java.lang.{Iterable => JIterable}
+import org.apache.spark.api.java.JavaSparkContext
+import scala.reflect.ClassTag
+
+/**
+ * A function that returns zero or more key-value pair records from each input record. The
+ * key-value pairs are represented as scala.Tuple2 objects.
+ */
+// PairFlatMapFunction does not extend FlatMapFunction because flatMap is
+// overloaded for both FlatMapFunction and PairFlatMapFunction.
+abstract class PairFlatMapFunction[T, K, V] extends WrappedFunction1[T, JIterable[(K, V)]]
+ with Serializable {
+
+ // Class tag reported for the key type K, obtained from JavaSparkContext.fakeClassTag.
+ // NOTE(review): presumably a placeholder tag, since Java callers cannot supply one -- confirm.
+ def keyType(): ClassTag[K] = JavaSparkContext.fakeClassTag
+
+ // Class tag reported for the value type V, obtained the same way as the key tag.
+ def valueType(): ClassTag[V] = JavaSparkContext.fakeClassTag
+}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/29ac7ea5/core/src/main/scala/org/apache/spark/api/java/function/PairFunction.java
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/java/function/PairFunction.java b/core/src/main/scala/org/apache/spark/api/java/function/PairFunction.java
deleted file mode 100644
index 84b9136..0000000
--- a/core/src/main/scala/org/apache/spark/api/java/function/PairFunction.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.api.java.function;
-
-import java.io.Serializable;
-
-import scala.Tuple2;
-import scala.reflect.ClassTag;
-import scala.reflect.ClassTag$;
-
-/**
- * A function that returns key-value pairs (Tuple2<K, V>), and can be used to construct PairRDDs.
- */
-// PairFunction does not extend Function because some UDF functions, like map,
-// are overloaded for both Function and PairFunction.
-public abstract class PairFunction<T, K, V> extends WrappedFunction1<T, Tuple2<K, V>>
- implements Serializable {
-
- public ClassTag<K> keyType() {
- return (ClassTag<K>) ClassTag$.MODULE$.apply(Object.class);
- }
-
- public ClassTag<V> valueType() {
- return (ClassTag<V>) ClassTag$.MODULE$.apply(Object.class);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/29ac7ea5/core/src/main/scala/org/apache/spark/api/java/function/PairFunction.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/java/function/PairFunction.scala b/core/src/main/scala/org/apache/spark/api/java/function/PairFunction.scala
new file mode 100644
index 0000000..d0ba0b6
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/api/java/function/PairFunction.scala
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.api.java.function
+
+import scala.reflect.ClassTag
+import org.apache.spark.api.java.JavaSparkContext
+
+/**
+ * A function that returns key-value pairs (Tuple2<K, V>), and can be used to construct PairRDDs.
+ */
+// PairFunction does not extend Function because some UDF functions, like map,
+// are overloaded for both Function and PairFunction.
+abstract class PairFunction[T, K, V] extends WrappedFunction1[T, (K, V)] with Serializable {
+
+ // Class tag reported for the key type K, obtained from JavaSparkContext.fakeClassTag.
+ // NOTE(review): presumably a placeholder tag, since Java callers cannot supply one -- confirm.
+ def keyType(): ClassTag[K] = JavaSparkContext.fakeClassTag
+
+ // Class tag reported for the value type V, obtained the same way as the key tag.
+ def valueType(): ClassTag[V] = JavaSparkContext.fakeClassTag
+}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/29ac7ea5/core/src/test/java/org/apache/spark/JavaAPISuite.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/spark/JavaAPISuite.java b/core/src/test/java/org/apache/spark/JavaAPISuite.java
new file mode 100644
index 0000000..20232e9
--- /dev/null
+++ b/core/src/test/java/org/apache/spark/JavaAPISuite.java
@@ -0,0 +1,981 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.*;
+
+import scala.Tuple2;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Charsets;
+import com.google.common.io.Files;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.compress.DefaultCodec;
+import org.apache.hadoop.mapred.SequenceFileInputFormat;
+import org.apache.hadoop.mapred.SequenceFileOutputFormat;
+import org.apache.hadoop.mapreduce.Job;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.spark.api.java.JavaDoubleRDD;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.function.*;
+import org.apache.spark.partial.BoundedDouble;
+import org.apache.spark.partial.PartialResult;
+import org.apache.spark.storage.StorageLevel;
+import org.apache.spark.util.StatCounter;
+
+// The test suite itself is Serializable so that anonymous Function implementations can be
+// serialized, as an alternative to converting these anonymous classes to static inner classes;
+// see http://stackoverflow.com/questions/758570/.
+public class JavaAPISuite implements Serializable {
+ private transient JavaSparkContext sc;
+
+ @Before
+ public void setUp() {
+ sc = new JavaSparkContext("local", "JavaAPISuite");
+ }
+
+ @After
+ public void tearDown() {
+ sc.stop();
+ sc = null;
+ // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
+ System.clearProperty("spark.driver.port");
+ }
+
+  /** Comparator that orders integers from largest to smallest. */
+  static class ReverseIntComparator implements Comparator<Integer>, Serializable {
+
+    @Override
+    public int compare(Integer a, Integer b) {
+      // Swapping the operands of the natural comparison reverses the order:
+      // the result is -1 when a > b, 1 when a < b and 0 when they are equal.
+      return b.compareTo(a);
+    }
+  }
+
+ @Test
+ public void sparkContextUnion() {
+ // Union of non-specialized JavaRDDs
+ List<String> strings = Arrays.asList("Hello", "World");
+ JavaRDD<String> s1 = sc.parallelize(strings);
+ JavaRDD<String> s2 = sc.parallelize(strings);
+ // Varargs
+ JavaRDD<String> sUnion = sc.union(s1, s2);
+ Assert.assertEquals(4, sUnion.count());
+ // List
+ List<JavaRDD<String>> list = new ArrayList<JavaRDD<String>>();
+ list.add(s2);
+ sUnion = sc.union(s1, list);
+ Assert.assertEquals(4, sUnion.count());
+
+ // Union of JavaDoubleRDDs
+ List<Double> doubles = Arrays.asList(1.0, 2.0);
+ JavaDoubleRDD d1 = sc.parallelizeDoubles(doubles);
+ JavaDoubleRDD d2 = sc.parallelizeDoubles(doubles);
+ JavaDoubleRDD dUnion = sc.union(d1, d2);
+ Assert.assertEquals(4, dUnion.count());
+
+ // Union of JavaPairRDDs
+ List<Tuple2<Integer, Integer>> pairs = new ArrayList<Tuple2<Integer, Integer>>();
+ pairs.add(new Tuple2<Integer, Integer>(1, 2));
+ pairs.add(new Tuple2<Integer, Integer>(3, 4));
+ JavaPairRDD<Integer, Integer> p1 = sc.parallelizePairs(pairs);
+ JavaPairRDD<Integer, Integer> p2 = sc.parallelizePairs(pairs);
+ JavaPairRDD<Integer, Integer> pUnion = sc.union(p1, p2);
+ Assert.assertEquals(4, pUnion.count());
+ }
+
+ @Test
+ public void sortByKey() {
+ List<Tuple2<Integer, Integer>> pairs = new ArrayList<Tuple2<Integer, Integer>>();
+ pairs.add(new Tuple2<Integer, Integer>(0, 4));
+ pairs.add(new Tuple2<Integer, Integer>(3, 2));
+ pairs.add(new Tuple2<Integer, Integer>(-1, 1));
+
+ JavaPairRDD<Integer, Integer> rdd = sc.parallelizePairs(pairs);
+
+ // Default comparator
+ JavaPairRDD<Integer, Integer> sortedRDD = rdd.sortByKey();
+ Assert.assertEquals(new Tuple2<Integer, Integer>(-1, 1), sortedRDD.first());
+ List<Tuple2<Integer, Integer>> sortedPairs = sortedRDD.collect();
+ Assert.assertEquals(new Tuple2<Integer, Integer>(0, 4), sortedPairs.get(1));
+ Assert.assertEquals(new Tuple2<Integer, Integer>(3, 2), sortedPairs.get(2));
+
+ // Custom comparator
+ sortedRDD = rdd.sortByKey(new ReverseIntComparator(), false);
+ Assert.assertEquals(new Tuple2<Integer, Integer>(-1, 1), sortedRDD.first());
+ sortedPairs = sortedRDD.collect();
+ Assert.assertEquals(new Tuple2<Integer, Integer>(0, 4), sortedPairs.get(1));
+ Assert.assertEquals(new Tuple2<Integer, Integer>(3, 2), sortedPairs.get(2));
+ }
+
+ static int foreachCalls = 0;
+
+ @Test
+ public void foreach() {
+ foreachCalls = 0;
+ JavaRDD<String> rdd = sc.parallelize(Arrays.asList("Hello", "World"));
+ rdd.foreach(new VoidFunction<String>() {
+ @Override
+ public void call(String s) {
+ foreachCalls++;
+ }
+ });
+ Assert.assertEquals(2, foreachCalls);
+ }
+
+ @Test
+ public void lookup() {
+ JavaPairRDD<String, String> categories = sc.parallelizePairs(Arrays.asList(
+ new Tuple2<String, String>("Apples", "Fruit"),
+ new Tuple2<String, String>("Oranges", "Fruit"),
+ new Tuple2<String, String>("Oranges", "Citrus")
+ ));
+ Assert.assertEquals(2, categories.lookup("Oranges").size());
+ Assert.assertEquals(2, categories.groupByKey().lookup("Oranges").get(0).size());
+ }
+
+  /**
+   * Groups a small list of integers by parity and checks the size of each group, first with
+   * the one-argument groupBy and then with a second argument of 1 (presumably the number of
+   * partitions -- confirm against JavaRDD.groupBy).
+   */
+  @Test
+  public void groupBy() {
+    JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 1, 2, 3, 5, 8, 13));
+    // The predicate is true for EVEN numbers, so the `true` group holds the evens (2 and 8).
+    Function<Integer, Boolean> isEven = new Function<Integer, Boolean>() {
+      @Override
+      public Boolean call(Integer x) {
+        return x % 2 == 0;
+      }
+    };
+    JavaPairRDD<Boolean, List<Integer>> oddsAndEvens = rdd.groupBy(isEven);
+    Assert.assertEquals(2, oddsAndEvens.count());
+    Assert.assertEquals(2, oddsAndEvens.lookup(true).get(0).size());  // Evens
+    Assert.assertEquals(5, oddsAndEvens.lookup(false).get(0).size()); // Odds
+
+    oddsAndEvens = rdd.groupBy(isEven, 1);
+    Assert.assertEquals(2, oddsAndEvens.count());
+    Assert.assertEquals(2, oddsAndEvens.lookup(true).get(0).size());  // Evens
+    Assert.assertEquals(5, oddsAndEvens.lookup(false).get(0).size()); // Odds
+  }
+
+ @Test
+ public void cogroup() {
+ JavaPairRDD<String, String> categories = sc.parallelizePairs(Arrays.asList(
+ new Tuple2<String, String>("Apples", "Fruit"),
+ new Tuple2<String, String>("Oranges", "Fruit"),
+ new Tuple2<String, String>("Oranges", "Citrus")
+ ));
+ JavaPairRDD<String, Integer> prices = sc.parallelizePairs(Arrays.asList(
+ new Tuple2<String, Integer>("Oranges", 2),
+ new Tuple2<String, Integer>("Apples", 3)
+ ));
+ JavaPairRDD<String, Tuple2<List<String>, List<Integer>>> cogrouped = categories.cogroup(prices);
+ Assert.assertEquals("[Fruit, Citrus]", cogrouped.lookup("Oranges").get(0)._1().toString());
+ Assert.assertEquals("[2]", cogrouped.lookup("Oranges").get(0)._2().toString());
+
+ cogrouped.collect();
+ }
+
+ @Test
+ public void leftOuterJoin() {
+ JavaPairRDD<Integer, Integer> rdd1 = sc.parallelizePairs(Arrays.asList(
+ new Tuple2<Integer, Integer>(1, 1),
+ new Tuple2<Integer, Integer>(1, 2),
+ new Tuple2<Integer, Integer>(2, 1),
+ new Tuple2<Integer, Integer>(3, 1)
+ ));
+ JavaPairRDD<Integer, Character> rdd2 = sc.parallelizePairs(Arrays.asList(
+ new Tuple2<Integer, Character>(1, 'x'),
+ new Tuple2<Integer, Character>(2, 'y'),
+ new Tuple2<Integer, Character>(2, 'z'),
+ new Tuple2<Integer, Character>(4, 'w')
+ ));
+ List<Tuple2<Integer,Tuple2<Integer,Optional<Character>>>> joined =
+ rdd1.leftOuterJoin(rdd2).collect();
+ Assert.assertEquals(5, joined.size());
+ Tuple2<Integer,Tuple2<Integer,Optional<Character>>> firstUnmatched =
+ rdd1.leftOuterJoin(rdd2).filter(
+ new Function<Tuple2<Integer, Tuple2<Integer, Optional<Character>>>, Boolean>() {
+ @Override
+ public Boolean call(Tuple2<Integer, Tuple2<Integer, Optional<Character>>> tup)
+ throws Exception {
+ return !tup._2()._2().isPresent();
+ }
+ }).first();
+ Assert.assertEquals(3, firstUnmatched._1().intValue());
+ }
+
+ @Test
+ public void foldReduce() {
+ JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 1, 2, 3, 5, 8, 13));
+ Function2<Integer, Integer, Integer> add = new Function2<Integer, Integer, Integer>() {
+ @Override
+ public Integer call(Integer a, Integer b) {
+ return a + b;
+ }
+ };
+
+ int sum = rdd.fold(0, add);
+ Assert.assertEquals(33, sum);
+
+ sum = rdd.reduce(add);
+ Assert.assertEquals(33, sum);
+ }
+
+ @Test
+ public void foldByKey() {
+ List<Tuple2<Integer, Integer>> pairs = Arrays.asList(
+ new Tuple2<Integer, Integer>(2, 1),
+ new Tuple2<Integer, Integer>(2, 1),
+ new Tuple2<Integer, Integer>(1, 1),
+ new Tuple2<Integer, Integer>(3, 2),
+ new Tuple2<Integer, Integer>(3, 1)
+ );
+ JavaPairRDD<Integer, Integer> rdd = sc.parallelizePairs(pairs);
+ JavaPairRDD<Integer, Integer> sums = rdd.foldByKey(0,
+ new Function2<Integer, Integer, Integer>() {
+ @Override
+ public Integer call(Integer a, Integer b) {
+ return a + b;
+ }
+ });
+ Assert.assertEquals(1, sums.lookup(1).get(0).intValue());
+ Assert.assertEquals(2, sums.lookup(2).get(0).intValue());
+ Assert.assertEquals(3, sums.lookup(3).get(0).intValue());
+ }
+
+ @Test
+ public void reduceByKey() {
+ List<Tuple2<Integer, Integer>> pairs = Arrays.asList(
+ new Tuple2<Integer, Integer>(2, 1),
+ new Tuple2<Integer, Integer>(2, 1),
+ new Tuple2<Integer, Integer>(1, 1),
+ new Tuple2<Integer, Integer>(3, 2),
+ new Tuple2<Integer, Integer>(3, 1)
+ );
+ JavaPairRDD<Integer, Integer> rdd = sc.parallelizePairs(pairs);
+ JavaPairRDD<Integer, Integer> counts = rdd.reduceByKey(
+ new Function2<Integer, Integer, Integer>() {
+ @Override
+ public Integer call(Integer a, Integer b) {
+ return a + b;
+ }
+ });
+ Assert.assertEquals(1, counts.lookup(1).get(0).intValue());
+ Assert.assertEquals(2, counts.lookup(2).get(0).intValue());
+ Assert.assertEquals(3, counts.lookup(3).get(0).intValue());
+
+ Map<Integer, Integer> localCounts = counts.collectAsMap();
+ Assert.assertEquals(1, localCounts.get(1).intValue());
+ Assert.assertEquals(2, localCounts.get(2).intValue());
+ Assert.assertEquals(3, localCounts.get(3).intValue());
+
+ localCounts = rdd.reduceByKeyLocally(new Function2<Integer, Integer,
+ Integer>() {
+ @Override
+ public Integer call(Integer a, Integer b) {
+ return a + b;
+ }
+ });
+ Assert.assertEquals(1, localCounts.get(1).intValue());
+ Assert.assertEquals(2, localCounts.get(2).intValue());
+ Assert.assertEquals(3, localCounts.get(3).intValue());
+ }
+
+ @Test
+ public void approximateResults() {
+ JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 1, 2, 3, 5, 8, 13));
+ Map<Integer, Long> countsByValue = rdd.countByValue();
+ Assert.assertEquals(2, countsByValue.get(1).longValue());
+ Assert.assertEquals(1, countsByValue.get(13).longValue());
+
+ PartialResult<Map<Integer, BoundedDouble>> approx = rdd.countByValueApprox(1);
+ Map<Integer, BoundedDouble> finalValue = approx.getFinalValue();
+ Assert.assertEquals(2.0, finalValue.get(1).mean(), 0.01);
+ Assert.assertEquals(1.0, finalValue.get(13).mean(), 0.01);
+ }
+
+  /** Checks first(), take() and takeSample() on a small RDD of integers. */
+  @Test
+  public void take() {
+    JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 1, 2, 3, 5, 8, 13));
+    Assert.assertEquals(1, rdd.first().intValue());
+    // Verify the results rather than only exercising the calls.
+    List<Integer> firstTwo = rdd.take(2);
+    Assert.assertEquals(Arrays.asList(1, 1), firstTwo);
+    // Sampling without replacement: only the sample size is asserted, not its contents.
+    List<Integer> sample = rdd.takeSample(false, 2, 42);
+    Assert.assertEquals(2, sample.size());
+  }
+
+ @Test
+ public void cartesian() {
+ JavaDoubleRDD doubleRDD = sc.parallelizeDoubles(Arrays.asList(1.0, 1.0, 2.0, 3.0, 5.0, 8.0));
+ JavaRDD<String> stringRDD = sc.parallelize(Arrays.asList("Hello", "World"));
+ JavaPairRDD<String, Double> cartesian = stringRDD.cartesian(doubleRDD);
+ Assert.assertEquals(new Tuple2<String, Double>("Hello", 1.0), cartesian.first());
+ }
+
+  /**
+   * Exercises JavaDoubleRDD: distinct, filter, union, cache, and the summary statistics
+   * exposed both directly on the RDD and through the StatCounter returned by stats().
+   */
+  @Test
+  public void javaDoubleRDD() {
+    JavaDoubleRDD rdd = sc.parallelizeDoubles(Arrays.asList(1.0, 1.0, 2.0, 3.0, 5.0, 8.0));
+    JavaDoubleRDD distinct = rdd.distinct();
+    Assert.assertEquals(5, distinct.count());
+    JavaDoubleRDD filter = rdd.filter(new Function<Double, Boolean>() {
+      @Override
+      public Boolean call(Double x) {
+        return x > 2.0;
+      }
+    });
+    Assert.assertEquals(3, filter.count());
+    JavaDoubleRDD union = rdd.union(rdd);
+    Assert.assertEquals(12, union.count());
+    union = union.cache();
+    Assert.assertEquals(12, union.count());
+
+    Assert.assertEquals(20, rdd.sum(), 0.01);
+    StatCounter stats = rdd.stats();
+    Assert.assertEquals(20, stats.sum(), 0.01);
+    Assert.assertEquals(20/6.0, rdd.mean(), 0.01);
+    // Check the StatCounter's mean as well, mirroring the rdd.sum()/stats.sum() pair above.
+    Assert.assertEquals(20/6.0, stats.mean(), 0.01);
+    Assert.assertEquals(6.22222, rdd.variance(), 0.01);
+    Assert.assertEquals(7.46667, rdd.sampleVariance(), 0.01);
+    Assert.assertEquals(2.49444, rdd.stdev(), 0.01);
+    Assert.assertEquals(2.73252, rdd.sampleStdev(), 0.01);
+
+    // Assert on first() and take() instead of discarding their results.
+    Assert.assertEquals(1.0, rdd.first().doubleValue(), 0.01);
+    List<Double> take = rdd.take(5);
+    Assert.assertEquals(5, take.size());
+  }
+
+ @Test
+ public void javaDoubleRDDHistoGram() {
+ JavaDoubleRDD rdd = sc.parallelizeDoubles(Arrays.asList(1.0, 2.0, 3.0, 4.0));
+ // Test using generated buckets
+ Tuple2<double[], long[]> results = rdd.histogram(2);
+ double[] expected_buckets = {1.0, 2.5, 4.0};
+ long[] expected_counts = {2, 2};
+ Assert.assertArrayEquals(expected_buckets, results._1, 0.1);
+ Assert.assertArrayEquals(expected_counts, results._2);
+ // Test with provided buckets
+ long[] histogram = rdd.histogram(expected_buckets);
+ Assert.assertArrayEquals(expected_counts, histogram);
+ }
+
+ @Test
+ public void map() {
+ JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));
+ JavaDoubleRDD doubles = rdd.map(new DoubleFunction<Integer>() {
+ @Override
+ public Double call(Integer x) {
+ return 1.0 * x;
+ }
+ }).cache();
+ doubles.collect();
+ JavaPairRDD<Integer, Integer> pairs = rdd.map(new PairFunction<Integer, Integer, Integer>() {
+ @Override
+ public Tuple2<Integer, Integer> call(Integer x) {
+ return new Tuple2<Integer, Integer>(x, x);
+ }
+ }).cache();
+ pairs.collect();
+ JavaRDD<String> strings = rdd.map(new Function<Integer, String>() {
+ @Override
+ public String call(Integer x) {
+ return x.toString();
+ }
+ }).cache();
+ strings.collect();
+ }
+
+  /**
+   * Exercises the three flatMap overloads (plain, pair and double) on two sentences that
+   * split into 11 space-separated words in total.
+   */
+  @Test
+  public void flatMap() {
+    JavaRDD<String> rdd = sc.parallelize(Arrays.asList("Hello World!",
+      "The quick brown fox jumps over the lazy dog."));
+    JavaRDD<String> words = rdd.flatMap(new FlatMapFunction<String, String>() {
+      @Override
+      public Iterable<String> call(String x) {
+        return Arrays.asList(x.split(" "));
+      }
+    });
+    Assert.assertEquals("Hello", words.first());
+    Assert.assertEquals(11, words.count());
+
+    JavaPairRDD<String, String> pairs = rdd.flatMap(
+      new PairFlatMapFunction<String, String, String>() {
+
+        @Override
+        public Iterable<Tuple2<String, String>> call(String s) {
+          List<Tuple2<String, String>> pairs = new LinkedList<Tuple2<String, String>>();
+          for (String word : s.split(" ")) pairs.add(new Tuple2<String, String>(word, word));
+          return pairs;
+        }
+      }
+    );
+    Assert.assertEquals(new Tuple2<String, String>("Hello", "Hello"), pairs.first());
+    Assert.assertEquals(11, pairs.count());
+
+    JavaDoubleRDD doubles = rdd.flatMap(new DoubleFlatMapFunction<String>() {
+      @Override
+      public Iterable<Double> call(String s) {
+        List<Double> lengths = new LinkedList<Double>();
+        for (String word : s.split(" ")) lengths.add(word.length() * 1.0);
+        return lengths;
+      }
+    });
+    // "Hello" is the first word, so the first length is 5.
+    Assert.assertEquals(5.0, doubles.first().doubleValue(), 0.01);
+    // Count the double RDD itself; the pair RDD was already counted above.
+    Assert.assertEquals(11, doubles.count());
+  }
+
+ /** Swaps key and value of a pair RDD through both flatMap() and map(), collecting each result. */
+ @Test
+ public void mapsFromPairsToPairs() {
+   List<Tuple2<Integer, String>> input = Arrays.asList(
+     new Tuple2<Integer, String>(1, "a"),
+     new Tuple2<Integer, String>(2, "aa"),
+     new Tuple2<Integer, String>(3, "aaa")
+   );
+   JavaPairRDD<Integer, String> source = sc.parallelizePairs(input);
+
+   // Regression test for SPARK-668:
+   PairFlatMapFunction<Tuple2<Integer, String>, String, Integer> swapViaFlatMap =
+     new PairFlatMapFunction<Tuple2<Integer, String>, String, Integer>() {
+       @Override
+       public Iterable<Tuple2<String, Integer>> call(Tuple2<Integer, String> kv) throws Exception {
+         return Collections.singletonList(kv.swap());
+       }
+     };
+   JavaPairRDD<String, Integer> flipped = source.flatMap(swapViaFlatMap);
+   flipped.collect();
+
+   // There was never a bug here, but it's worth testing:
+   PairFunction<Tuple2<Integer, String>, String, Integer> swapViaMap =
+     new PairFunction<Tuple2<Integer, String>, String, Integer>() {
+       @Override
+       public Tuple2<String, Integer> call(Tuple2<Integer, String> kv) throws Exception {
+         return kv.swap();
+       }
+     };
+   source.map(swapViaMap).collect();
+ }
+
+ /** Sums each of two partitions with mapPartitions() and expects the sums 3 and 7. */
+ @Test
+ public void mapPartitions() {
+   JavaRDD<Integer> numbers = sc.parallelize(Arrays.asList(1, 2, 3, 4), 2);
+
+   FlatMapFunction<Iterator<Integer>, Integer> sumPartition =
+     new FlatMapFunction<Iterator<Integer>, Integer>() {
+       @Override
+       public Iterable<Integer> call(Iterator<Integer> elements) {
+         int total = 0;
+         for (; elements.hasNext(); ) {
+           total += elements.next();
+         }
+         // One output element per partition.
+         return Collections.singletonList(total);
+       }
+     };
+
+   List<Integer> sums = numbers.mapPartitions(sumPartition).collect();
+   Assert.assertEquals("[3, 7]", sums.toString());
+ }
+
+ /**
+  * Checks that repartition() produces the requested number of partitions and that every
+  * resulting partition holds at least one element, both when increasing and when decreasing
+  * the partition count.
+  */
+ @Test
+ public void repartition() {
+   // Growing number of partitions (2 -> 4)
+   JavaRDD<Integer> in1 = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8), 2);
+   JavaRDD<Integer> repartitioned1 = in1.repartition(4);
+   List<List<Integer>> result1 = repartitioned1.glom().collect();
+   Assert.assertEquals(4, result1.size());
+   for (List<Integer> l : result1) {
+     Assert.assertTrue(l.size() > 0);
+   }
+
+   // Shrinking number of partitions (4 -> 2)
+   JavaRDD<Integer> in2 = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8), 4);
+   JavaRDD<Integer> repartitioned2 = in2.repartition(2);
+   List<List<Integer>> result2 = repartitioned2.glom().collect();
+   Assert.assertEquals(2, result2.size());
+   for (List<Integer> l : result2) {
+     Assert.assertTrue(l.size() > 0);
+   }
+ }
+
+ /** Persists a double RDD, a pair RDD and a plain RDD with DISK_ONLY and reads each one back. */
+ @Test
+ public void persist() {
+   // JavaDoubleRDD
+   JavaDoubleRDD persistedDoubles = sc
+     .parallelizeDoubles(Arrays.asList(1.0, 1.0, 2.0, 3.0, 5.0, 8.0))
+     .persist(StorageLevel.DISK_ONLY());
+   Assert.assertEquals(20, persistedDoubles.sum(), 0.1);
+
+   // JavaPairRDD
+   List<Tuple2<Integer, String>> tuples = Arrays.asList(
+     new Tuple2<Integer, String>(1, "a"),
+     new Tuple2<Integer, String>(2, "aa"),
+     new Tuple2<Integer, String>(3, "aaa")
+   );
+   JavaPairRDD<Integer, String> persistedPairs =
+     sc.parallelizePairs(tuples).persist(StorageLevel.DISK_ONLY());
+   Assert.assertEquals("a", persistedPairs.first()._2());
+
+   // JavaRDD
+   JavaRDD<Integer> persistedInts =
+     sc.parallelize(Arrays.asList(1, 2, 3, 4, 5)).persist(StorageLevel.DISK_ONLY());
+   Assert.assertEquals(1, persistedInts.first().intValue());
+ }
+
+ /** Iterates over the first split with a hand-built TaskContext and expects 1 as first element. */
+ @Test
+ public void iterator() {
+   JavaRDD<Integer> numbers = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5), 2);
+   TaskContext taskContext = new TaskContext(0, 0, 0, false, false, null);
+   Iterator<Integer> firstSplit = numbers.iterator(numbers.splits().get(0), taskContext);
+   Assert.assertEquals(1, firstSplit.next().intValue());
+ }
+
+ /** With four elements in two partitions, the first glom()-ed partition should print as [1, 2]. */
+ @Test
+ public void glom() {
+   JavaRDD<Integer> numbers = sc.parallelize(Arrays.asList(1, 2, 3, 4), 2);
+   List<Integer> firstPartition = numbers.glom().first();
+   Assert.assertEquals("[1, 2]", firstPartition.toString());
+ }
+
+ // File input / output tests are largely adapted from FileSuite:
+
+ /**
+  * Saves integers with saveAsTextFile(), then checks the raw content of part-00000 and the
+  * lines returned by textFile() for the same directory.
+  */
+ @Test
+ public void textFiles() throws IOException {
+   String outputDir = new File(Files.createTempDir(), "output").getAbsolutePath();
+   sc.parallelize(Arrays.asList(1, 2, 3, 4)).saveAsTextFile(outputDir);
+
+   // Read the plain text file and check it's OK
+   String rawContent = Files.toString(new File(outputDir, "part-00000"), Charsets.UTF_8);
+   Assert.assertEquals("1\n2\n3\n4\n", rawContent);
+
+   // Also try reading it in as a text file RDD
+   List<String> linesRead = sc.textFile(outputDir).collect();
+   Assert.assertEquals(Arrays.asList("1", "2", "3", "4"), linesRead);
+ }
+
+ /** Saves integers as text compressed with DefaultCodec and reads them back via textFile(). */
+ @Test
+ public void textFilesCompressed() throws IOException {
+   String outputDir = new File(Files.createTempDir(), "output").getAbsolutePath();
+   sc.parallelize(Arrays.asList(1, 2, 3, 4)).saveAsTextFile(outputDir, DefaultCodec.class);
+
+   // Try reading it in as a text file RDD
+   List<String> linesRead = sc.textFile(outputDir).collect();
+   Assert.assertEquals(Arrays.asList("1", "2", "3", "4"), linesRead);
+ }
+
+ /**
+  * Writes (Integer, String) pairs as a SequenceFile of (IntWritable, Text) and checks that
+  * sequenceFile() reads back the same pairs after converting them to plain Java types.
+  */
+ @Test
+ public void sequenceFile() {
+   File tempDir = Files.createTempDir();
+   String outputDir = new File(tempDir, "output").getAbsolutePath();
+   List<Tuple2<Integer, String>> pairs = Arrays.asList(
+     new Tuple2<Integer, String>(1, "a"),
+     new Tuple2<Integer, String>(2, "aa"),
+     new Tuple2<Integer, String>(3, "aaa")
+   );
+   JavaPairRDD<Integer, String> rdd = sc.parallelizePairs(pairs);
+
+   // Wrap keys and values in Writable types before saving.
+   rdd.map(new PairFunction<Tuple2<Integer, String>, IntWritable, Text>() {
+     @Override
+     public Tuple2<IntWritable, Text> call(Tuple2<Integer, String> pair) {
+       return new Tuple2<IntWritable, Text>(new IntWritable(pair._1()), new Text(pair._2()));
+     }
+   }).saveAsHadoopFile(outputDir, IntWritable.class, Text.class, SequenceFileOutputFormat.class);
+
+   // Try reading the output back as a sequence file
+   JavaPairRDD<Integer, String> readRDD = sc.sequenceFile(outputDir, IntWritable.class,
+     Text.class).map(new PairFunction<Tuple2<IntWritable, Text>, Integer, String>() {
+       @Override
+       public Tuple2<Integer, String> call(Tuple2<IntWritable, Text> pair) {
+         return new Tuple2<Integer, String>(pair._1().get(), pair._2().toString());
+       }
+     });
+   Assert.assertEquals(pairs, readRDD.collect());
+ }
+
+ /**
+  * Saves pairs with saveAsNewAPIHadoopFile() using the mapreduce SequenceFileOutputFormat,
+  * reads them back with sequenceFile(), and compares the string forms.
+  */
+ @Test
+ public void writeWithNewAPIHadoopFile() {
+   String outputDir = new File(Files.createTempDir(), "output").getAbsolutePath();
+   List<Tuple2<Integer, String>> expected = Arrays.asList(
+     new Tuple2<Integer, String>(1, "a"),
+     new Tuple2<Integer, String>(2, "aa"),
+     new Tuple2<Integer, String>(3, "aaa")
+   );
+   JavaPairRDD<Integer, String> source = sc.parallelizePairs(expected);
+
+   PairFunction<Tuple2<Integer, String>, IntWritable, Text> toWritables =
+     new PairFunction<Tuple2<Integer, String>, IntWritable, Text>() {
+       @Override
+       public Tuple2<IntWritable, Text> call(Tuple2<Integer, String> kv) {
+         return new Tuple2<IntWritable, Text>(new IntWritable(kv._1()), new Text(kv._2()));
+       }
+     };
+   source.map(toWritables).saveAsNewAPIHadoopFile(outputDir, IntWritable.class, Text.class,
+     org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat.class);
+
+   JavaPairRDD<IntWritable, Text> readBack = sc.sequenceFile(outputDir, IntWritable.class,
+     Text.class);
+   // Each record is turned into a String before collecting.
+   Function<Tuple2<IntWritable, Text>, String> describe =
+     new Function<Tuple2<IntWritable, Text>, String>() {
+       @Override
+       public String call(Tuple2<IntWritable, Text> record) {
+         return record.toString();
+       }
+     };
+   List<String> rendered = readBack.map(describe).collect();
+   Assert.assertEquals(expected.toString(), rendered.toString());
+ }
+
+ /**
+  * Saves pairs with saveAsHadoopFile() as a SequenceFile, reads them back through
+  * newAPIHadoopFile() with the mapreduce SequenceFileInputFormat, and compares the string forms.
+  */
+ @Test
+ public void readWithNewAPIHadoopFile() throws IOException {
+   String outputDir = new File(Files.createTempDir(), "output").getAbsolutePath();
+   List<Tuple2<Integer, String>> expected = Arrays.asList(
+     new Tuple2<Integer, String>(1, "a"),
+     new Tuple2<Integer, String>(2, "aa"),
+     new Tuple2<Integer, String>(3, "aaa")
+   );
+   JavaPairRDD<Integer, String> source = sc.parallelizePairs(expected);
+
+   PairFunction<Tuple2<Integer, String>, IntWritable, Text> toWritables =
+     new PairFunction<Tuple2<Integer, String>, IntWritable, Text>() {
+       @Override
+       public Tuple2<IntWritable, Text> call(Tuple2<Integer, String> kv) {
+         return new Tuple2<IntWritable, Text>(new IntWritable(kv._1()), new Text(kv._2()));
+       }
+     };
+   source.map(toWritables)
+     .saveAsHadoopFile(outputDir, IntWritable.class, Text.class, SequenceFileOutputFormat.class);
+
+   JavaPairRDD<IntWritable, Text> readBack = sc.newAPIHadoopFile(outputDir,
+     org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat.class, IntWritable.class,
+     Text.class, new Job().getConfiguration());
+   // Each record is turned into a String before collecting.
+   Function<Tuple2<IntWritable, Text>, String> describe =
+     new Function<Tuple2<IntWritable, Text>, String>() {
+       @Override
+       public String call(Tuple2<IntWritable, Text> record) {
+         return record.toString();
+       }
+     };
+   List<String> rendered = readBack.map(describe).collect();
+   Assert.assertEquals(expected.toString(), rendered.toString());
+ }
+
+ /** Round-trips a list of integers through saveAsObjectFile() and objectFile(). */
+ @Test
+ public void objectFilesOfInts() {
+   String outputDir = new File(Files.createTempDir(), "output").getAbsolutePath();
+   sc.parallelize(Arrays.asList(1, 2, 3, 4)).saveAsObjectFile(outputDir);
+
+   // Try reading the output back as an object file
+   JavaRDD<Integer> restored = sc.objectFile(outputDir);
+   List<Integer> expected = Arrays.asList(1, 2, 3, 4);
+   Assert.assertEquals(expected, restored.collect());
+ }
+
+ /** Round-trips (Integer, String) tuples through saveAsObjectFile() and objectFile(). */
+ @Test
+ public void objectFilesOfComplexTypes() {
+   String outputDir = new File(Files.createTempDir(), "output").getAbsolutePath();
+   List<Tuple2<Integer, String>> expected = Arrays.asList(
+     new Tuple2<Integer, String>(1, "a"),
+     new Tuple2<Integer, String>(2, "aa"),
+     new Tuple2<Integer, String>(3, "aaa")
+   );
+   sc.parallelizePairs(expected).saveAsObjectFile(outputDir);
+
+   // Try reading the output back as an object file
+   JavaRDD<Tuple2<Integer, String>> restored = sc.objectFile(outputDir);
+   Assert.assertEquals(expected, restored.collect());
+ }
+
+ /**
+  * Saves pairs as a SequenceFile with saveAsHadoopFile(), reads them back with hadoopFile(),
+  * and compares the string forms.
+  */
+ @Test
+ public void hadoopFile() {
+   String outputDir = new File(Files.createTempDir(), "output").getAbsolutePath();
+   List<Tuple2<Integer, String>> expected = Arrays.asList(
+     new Tuple2<Integer, String>(1, "a"),
+     new Tuple2<Integer, String>(2, "aa"),
+     new Tuple2<Integer, String>(3, "aaa")
+   );
+   JavaPairRDD<Integer, String> source = sc.parallelizePairs(expected);
+
+   PairFunction<Tuple2<Integer, String>, IntWritable, Text> toWritables =
+     new PairFunction<Tuple2<Integer, String>, IntWritable, Text>() {
+       @Override
+       public Tuple2<IntWritable, Text> call(Tuple2<Integer, String> kv) {
+         return new Tuple2<IntWritable, Text>(new IntWritable(kv._1()), new Text(kv._2()));
+       }
+     };
+   source.map(toWritables)
+     .saveAsHadoopFile(outputDir, IntWritable.class, Text.class, SequenceFileOutputFormat.class);
+
+   JavaPairRDD<IntWritable, Text> readBack = sc.hadoopFile(outputDir,
+     SequenceFileInputFormat.class, IntWritable.class, Text.class);
+   // Each record is turned into a String before collecting.
+   Function<Tuple2<IntWritable, Text>, String> describe =
+     new Function<Tuple2<IntWritable, Text>, String>() {
+       @Override
+       public String call(Tuple2<IntWritable, Text> record) {
+         return record.toString();
+       }
+     };
+   List<String> rendered = readBack.map(describe).collect();
+   Assert.assertEquals(expected.toString(), rendered.toString());
+ }
+
+ /**
+  * Same round trip as hadoopFile(), but the SequenceFile is written with DefaultCodec
+  * compression into an "output_compressed" directory.
+  */
+ @Test
+ public void hadoopFileCompressed() {
+   String outputDir = new File(Files.createTempDir(), "output_compressed").getAbsolutePath();
+   List<Tuple2<Integer, String>> expected = Arrays.asList(
+     new Tuple2<Integer, String>(1, "a"),
+     new Tuple2<Integer, String>(2, "aa"),
+     new Tuple2<Integer, String>(3, "aaa")
+   );
+   JavaPairRDD<Integer, String> source = sc.parallelizePairs(expected);
+
+   PairFunction<Tuple2<Integer, String>, IntWritable, Text> toWritables =
+     new PairFunction<Tuple2<Integer, String>, IntWritable, Text>() {
+       @Override
+       public Tuple2<IntWritable, Text> call(Tuple2<Integer, String> kv) {
+         return new Tuple2<IntWritable, Text>(new IntWritable(kv._1()), new Text(kv._2()));
+       }
+     };
+   source.map(toWritables).saveAsHadoopFile(outputDir, IntWritable.class, Text.class,
+     SequenceFileOutputFormat.class, DefaultCodec.class);
+
+   JavaPairRDD<IntWritable, Text> readBack = sc.hadoopFile(outputDir,
+     SequenceFileInputFormat.class, IntWritable.class, Text.class);
+   // Each record is turned into a String before collecting.
+   Function<Tuple2<IntWritable, Text>, String> describe =
+     new Function<Tuple2<IntWritable, Text>, String>() {
+       @Override
+       public String call(Tuple2<IntWritable, Text> record) {
+         return record.toString();
+       }
+     };
+   List<String> rendered = readBack.map(describe).collect();
+   Assert.assertEquals(expected.toString(), rendered.toString());
+ }
+
+ /** Zips an integer RDD with a double RDD derived from it and forces evaluation with count(). */
+ @Test
+ public void zip() {
+   JavaRDD<Integer> numbers = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));
+   DoubleFunction<Integer> toDouble = new DoubleFunction<Integer>() {
+     @Override
+     public Double call(Integer n) {
+       return 1.0 * n;
+     }
+   };
+   JavaDoubleRDD asDoubles = numbers.map(toDouble);
+   JavaPairRDD<Integer, Double> paired = numbers.zip(asDoubles);
+   paired.count();
+ }
+
+ /**
+  * Zips the partitions of a 6-element and a 4-element RDD (two partitions each) and emits,
+  * per partition pair, the number of elements seen on each side.
+  */
+ @Test
+ public void zipPartitions() {
+   JavaRDD<Integer> ints = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6), 2);
+   JavaRDD<String> strings = sc.parallelize(Arrays.asList("1", "2", "3", "4"), 2);
+
+   FlatMapFunction2<Iterator<Integer>, Iterator<String>, Integer> countBoth =
+     new FlatMapFunction2<Iterator<Integer>, Iterator<String>, Integer>() {
+       @Override
+       public Iterable<Integer> call(Iterator<Integer> left, Iterator<String> right) {
+         // Drain the left iterator first, then the right one, counting as we go.
+         int leftCount = 0;
+         for (; left.hasNext(); left.next()) {
+           leftCount++;
+         }
+         int rightCount = 0;
+         for (; right.hasNext(); right.next()) {
+           rightCount++;
+         }
+         return Arrays.asList(leftCount, rightCount);
+       }
+     };
+
+   List<Integer> sizes = ints.zipPartitions(strings, countBoth).collect();
+   Assert.assertEquals("[3, 2, 3, 2]", sizes.toString());
+ }
+
+ /**
+  * Adds the elements 1..5 to an int, a double and a custom float accumulator, each starting
+  * at 10, and expects 25 in every case. Finally checks that setValue() replaces the value.
+  */
+ @Test
+ public void accumulators() {
+   JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));
+
+   final Accumulator<Integer> intAccum = sc.intAccumulator(10);
+   rdd.foreach(new VoidFunction<Integer>() {
+     @Override
+     public void call(Integer x) {
+       intAccum.add(x);
+     }
+   });
+   Assert.assertEquals((Integer) 25, intAccum.value());
+
+   final Accumulator<Double> doubleAccum = sc.doubleAccumulator(10.0);
+   rdd.foreach(new VoidFunction<Integer>() {
+     @Override
+     public void call(Integer x) {
+       doubleAccum.add((double) x);
+     }
+   });
+   Assert.assertEquals((Double) 25.0, doubleAccum.value());
+
+   // Try a custom accumulator type
+   AccumulatorParam<Float> floatAccumulatorParam = new AccumulatorParam<Float>() {
+     @Override
+     public Float addInPlace(Float r, Float t) {
+       return r + t;
+     }
+
+     @Override
+     public Float addAccumulator(Float r, Float t) {
+       return r + t;
+     }
+
+     @Override
+     public Float zero(Float initialValue) {
+       return 0.0f;
+     }
+   };
+
+   final Accumulator<Float> floatAccum = sc.accumulator((Float) 10.0f, floatAccumulatorParam);
+   rdd.foreach(new VoidFunction<Integer>() {
+     @Override
+     public void call(Integer x) {
+       floatAccum.add((float) x);
+     }
+   });
+   Assert.assertEquals((Float) 25.0f, floatAccum.value());
+
+   // Test the setValue method
+   floatAccum.setValue(5.0f);
+   Assert.assertEquals((Float) 5.0f, floatAccum.value());
+ }
+
+ /** Keys each integer by its string form and checks the resulting (key, value) tuples. */
+ @Test
+ public void keyBy() {
+   JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2));
+   List<Tuple2<String, Integer>> s = rdd.keyBy(new Function<Integer, String>() {
+     @Override
+     public String call(Integer t) throws Exception {
+       return t.toString();
+     }
+   }).collect();
+   Assert.assertEquals(new Tuple2<String, Integer>("1", 1), s.get(0));
+   Assert.assertEquals(new Tuple2<String, Integer>("2", 2), s.get(1));
+ }
+
+ /**
+  * Marks an RDD for checkpointing, runs a job, and checks that the RDD then reports itself
+  * as checkpointed and still collects to the original elements.
+  */
+ @Test
+ public void checkpointAndComputation() {
+   File checkpointDir = Files.createTempDir();
+   JavaRDD<Integer> numbers = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));
+   sc.setCheckpointDir(checkpointDir.getAbsolutePath());
+   Assert.assertFalse(numbers.isCheckpointed());
+
+   numbers.checkpoint();
+   numbers.count(); // Forces the DAG to cause a checkpoint
+   Assert.assertTrue(numbers.isCheckpointed());
+   Assert.assertEquals(Arrays.asList(1, 2, 3, 4, 5), numbers.collect());
+ }
+
+ /**
+  * Checkpoints an RDD, then rebuilds an RDD from the reported checkpoint file and checks
+  * that it collects to the original elements.
+  */
+ @Test
+ public void checkpointAndRestore() {
+   File checkpointDir = Files.createTempDir();
+   JavaRDD<Integer> numbers = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));
+   sc.setCheckpointDir(checkpointDir.getAbsolutePath());
+   Assert.assertFalse(numbers.isCheckpointed());
+
+   numbers.checkpoint();
+   numbers.count(); // Forces the DAG to cause a checkpoint
+   Assert.assertTrue(numbers.isCheckpointed());
+
+   // The checkpoint file must be known before it can be reloaded.
+   Assert.assertTrue(numbers.getCheckpointFile().isPresent());
+   JavaRDD<Integer> restored = sc.checkpointFile(numbers.getCheckpointFile().get());
+   Assert.assertEquals(Arrays.asList(1, 2, 3, 4, 5), restored.collect());
+ }
+
+ /** Builds (i, i % 2) pairs, swaps each pair with a second map(), and checks the result. */
+ @Test
+ public void mapOnPairRDD() {
+   JavaRDD<Integer> numbers = sc.parallelize(Arrays.asList(1,2,3,4));
+
+   PairFunction<Integer, Integer, Integer> withParity =
+     new PairFunction<Integer, Integer, Integer>() {
+       @Override
+       public Tuple2<Integer, Integer> call(Integer n) throws Exception {
+         return new Tuple2<Integer, Integer>(n, n % 2);
+       }
+     };
+   JavaPairRDD<Integer, Integer> numberToParity = numbers.map(withParity);
+
+   PairFunction<Tuple2<Integer, Integer>, Integer, Integer> flip =
+     new PairFunction<Tuple2<Integer, Integer>, Integer, Integer>() {
+       @Override
+       public Tuple2<Integer, Integer> call(Tuple2<Integer, Integer> kv) throws Exception {
+         return new Tuple2<Integer, Integer>(kv._2(), kv._1());
+       }
+     };
+   JavaPairRDD<Integer, Integer> parityToNumber = numberToParity.map(flip);
+
+   List<Tuple2<Integer, Integer>> expected = Arrays.asList(
+     new Tuple2<Integer, Integer>(1, 1),
+     new Tuple2<Integer, Integer>(0, 2),
+     new Tuple2<Integer, Integer>(1, 3),
+     new Tuple2<Integer, Integer>(0, 4));
+   Assert.assertEquals(expected, parityToNumber.collect());
+ }
+
+ /**
+  * Collects selected partitions of a 7-element, 3-partition RDD and of the pair RDD derived
+  * from it, expecting the split [1, 2] / [3, 4] / [5, 6, 7] in both cases.
+  */
+ @Test
+ public void collectPartitions() {
+   JavaRDD<Integer> numbers = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7), 3);
+
+   PairFunction<Integer, Integer, Integer> withParity =
+     new PairFunction<Integer, Integer, Integer>() {
+       @Override
+       public Tuple2<Integer, Integer> call(Integer n) throws Exception {
+         return new Tuple2<Integer, Integer>(n, n % 2);
+       }
+     };
+   JavaPairRDD<Integer, Integer> numberToParity = numbers.map(withParity);
+
+   // Plain RDD: first partition only, then the remaining two.
+   List[] collected = numbers.collectPartitions(new int[] {0});
+   Assert.assertEquals(Arrays.asList(1, 2), collected[0]);
+
+   collected = numbers.collectPartitions(new int[] {1, 2});
+   Assert.assertEquals(Arrays.asList(3, 4), collected[0]);
+   Assert.assertEquals(Arrays.asList(5, 6, 7), collected[1]);
+
+   // Pair RDD: same partition selection, elements are (i, i % 2).
+   Assert.assertEquals(
+     Arrays.asList(
+       new Tuple2<Integer, Integer>(1, 1),
+       new Tuple2<Integer, Integer>(2, 0)),
+     numberToParity.collectPartitions(new int[] {0})[0]);
+
+   collected = numberToParity.collectPartitions(new int[] {1, 2});
+   Assert.assertEquals(
+     Arrays.asList(
+       new Tuple2<Integer, Integer>(3, 1),
+       new Tuple2<Integer, Integer>(4, 0)),
+     collected[0]);
+   Assert.assertEquals(
+     Arrays.asList(
+       new Tuple2<Integer, Integer>(5, 1),
+       new Tuple2<Integer, Integer>(6, 0),
+       new Tuple2<Integer, Integer>(7, 1)),
+     collected[1]);
+ }
+
+ /**
+  * Feeds 100000 values drawn from 100 distinct integers to countApproxDistinct() and checks
+  * that the relative error stays within each requested accuracy.
+  */
+ @Test
+ public void countApproxDistinct() {
+   int size = 100;
+   List<Integer> values = new ArrayList<Integer>();
+   for (int i = 0; i < 100000; i++) {
+     values.add(i % size);
+   }
+   JavaRDD<Integer> rdd = sc.parallelize(values, 10);
+
+   // The first bound is strict, the other two are inclusive.
+   double errorAt20Percent = Math.abs((rdd.countApproxDistinct(0.2) - size) / (double) size);
+   Assert.assertTrue(errorAt20Percent < 0.2);
+
+   double errorAt5Percent = Math.abs((rdd.countApproxDistinct(0.05) - size) / (double) size);
+   Assert.assertTrue(errorAt5Percent <= 0.05);
+
+   double errorAt1Percent = Math.abs((rdd.countApproxDistinct(0.01) - size) / (double) size);
+   Assert.assertTrue(errorAt1Percent <= 0.01);
+ }
+
+ /**
+  * For every key i in [10, 100) the input holds exactly i distinct values, so the estimate
+  * returned by countApproxDistinctByKey() for key i is compared against i itself.
+  */
+ @Test
+ public void countApproxDistinctByKey() {
+   double relativeSD = 0.001;
+
+   List<Tuple2<Integer, Integer>> input = new ArrayList<Tuple2<Integer, Integer>>();
+   for (int key = 10; key < 100; key++) {
+     for (int value = 0; value < key; value++) {
+       input.add(new Tuple2<Integer, Integer>(key, value));
+     }
+   }
+
+   JavaPairRDD<Integer, Integer> pairRdd = sc.parallelizePairs(input);
+   List<Tuple2<Integer, Object>> estimates =
+     pairRdd.countApproxDistinctByKey(relativeSD).collect();
+   for (Tuple2<Integer, Object> entry : estimates) {
+     double exact = entry._1().doubleValue();
+     long estimated = (Long) entry._2();
+     double relativeError = Math.abs((estimated - exact) / exact);
+     Assert.assertTrue(relativeError < relativeSD);
+   }
+ }
+
+ /** Calls collect() and collectAsMap() on a pair RDD whose values are int[] arrays. */
+ @Test
+ public void collectAsMapWithIntArrayValues() {
+   // Regression test for SPARK-1040
+   JavaRDD<Integer> single = sc.parallelize(Arrays.asList(1));
+   PairFunction<Integer, Integer, int[]> toArrayPair =
+     new PairFunction<Integer, Integer, int[]>() {
+       @Override
+       public Tuple2<Integer, int[]> call(Integer n) throws Exception {
+         return new Tuple2<Integer, int[]>(n, new int[] { n });
+       }
+     };
+   JavaPairRDD<Integer, int[]> withArrays = single.map(toArrayPair);
+
+   withArrays.collect(); // Works fine
+   Map<Integer, int[]> asMap = withArrays.collectAsMap(); // Used to crash with ClassCastException
+ }
+}