Posted to commits@spark.apache.org by ad...@apache.org on 2014/02/23 02:54:06 UTC

[1/2] Migrate Java code to Scala or move it to src/main/java

Repository: incubator-spark
Updated Branches:
  refs/heads/master 1aa4f8af7 -> 29ac7ea52


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/29ac7ea5/core/src/test/scala/org/apache/spark/JavaAPISuite.java
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/JavaAPISuite.java b/core/src/test/scala/org/apache/spark/JavaAPISuite.java
deleted file mode 100644
index 20232e9..0000000
--- a/core/src/test/scala/org/apache/spark/JavaAPISuite.java
+++ /dev/null
@@ -1,981 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark;
-
-import java.io.File;
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.*;
-
-import scala.Tuple2;
-
-import com.google.common.base.Optional;
-import com.google.common.base.Charsets;
-import com.google.common.io.Files;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.compress.DefaultCodec;
-import org.apache.hadoop.mapred.SequenceFileInputFormat;
-import org.apache.hadoop.mapred.SequenceFileOutputFormat;
-import org.apache.hadoop.mapreduce.Job;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-import org.apache.spark.api.java.JavaDoubleRDD;
-import org.apache.spark.api.java.JavaPairRDD;
-import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.api.java.function.*;
-import org.apache.spark.partial.BoundedDouble;
-import org.apache.spark.partial.PartialResult;
-import org.apache.spark.storage.StorageLevel;
-import org.apache.spark.util.StatCounter;
-
-// The test suite itself is Serializable so that anonymous Function implementations can be
-// serialized, as an alternative to converting these anonymous classes to static inner classes;
-// see http://stackoverflow.com/questions/758570/.
-public class JavaAPISuite implements Serializable {
-  private transient JavaSparkContext sc;
-
-  @Before
-  public void setUp() {
-    sc = new JavaSparkContext("local", "JavaAPISuite");
-  }
-
-  @After
-  public void tearDown() {
-    sc.stop();
-    sc = null;
-    // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
-    System.clearProperty("spark.driver.port");
-  }
-
-  static class ReverseIntComparator implements Comparator<Integer>, Serializable {
-
-    @Override
-    public int compare(Integer a, Integer b) {
-      if (a > b) return -1;
-      else if (a < b) return 1;
-      else return 0;
-    }
-  };
-
-  @Test
-  public void sparkContextUnion() {
-    // Union of non-specialized JavaRDDs
-    List<String> strings = Arrays.asList("Hello", "World");
-    JavaRDD<String> s1 = sc.parallelize(strings);
-    JavaRDD<String> s2 = sc.parallelize(strings);
-    // Varargs
-    JavaRDD<String> sUnion = sc.union(s1, s2);
-    Assert.assertEquals(4, sUnion.count());
-    // List
-    List<JavaRDD<String>> list = new ArrayList<JavaRDD<String>>();
-    list.add(s2);
-    sUnion = sc.union(s1, list);
-    Assert.assertEquals(4, sUnion.count());
-
-    // Union of JavaDoubleRDDs
-    List<Double> doubles = Arrays.asList(1.0, 2.0);
-    JavaDoubleRDD d1 = sc.parallelizeDoubles(doubles);
-    JavaDoubleRDD d2 = sc.parallelizeDoubles(doubles);
-    JavaDoubleRDD dUnion = sc.union(d1, d2);
-    Assert.assertEquals(4, dUnion.count());
-
-    // Union of JavaPairRDDs
-    List<Tuple2<Integer, Integer>> pairs = new ArrayList<Tuple2<Integer, Integer>>();
-    pairs.add(new Tuple2<Integer, Integer>(1, 2));
-    pairs.add(new Tuple2<Integer, Integer>(3, 4));
-    JavaPairRDD<Integer, Integer> p1 = sc.parallelizePairs(pairs);
-    JavaPairRDD<Integer, Integer> p2 = sc.parallelizePairs(pairs);
-    JavaPairRDD<Integer, Integer> pUnion = sc.union(p1, p2);
-    Assert.assertEquals(4, pUnion.count());
-  }
-
-  @Test
-  public void sortByKey() {
-    List<Tuple2<Integer, Integer>> pairs = new ArrayList<Tuple2<Integer, Integer>>();
-    pairs.add(new Tuple2<Integer, Integer>(0, 4));
-    pairs.add(new Tuple2<Integer, Integer>(3, 2));
-    pairs.add(new Tuple2<Integer, Integer>(-1, 1));
-
-    JavaPairRDD<Integer, Integer> rdd = sc.parallelizePairs(pairs);
-
-    // Default comparator
-    JavaPairRDD<Integer, Integer> sortedRDD = rdd.sortByKey();
-    Assert.assertEquals(new Tuple2<Integer, Integer>(-1, 1), sortedRDD.first());
-    List<Tuple2<Integer, Integer>> sortedPairs = sortedRDD.collect();
-    Assert.assertEquals(new Tuple2<Integer, Integer>(0, 4), sortedPairs.get(1));
-    Assert.assertEquals(new Tuple2<Integer, Integer>(3, 2), sortedPairs.get(2));
-
-    // Custom comparator
-    sortedRDD = rdd.sortByKey(new ReverseIntComparator(), false);
-    Assert.assertEquals(new Tuple2<Integer, Integer>(-1, 1), sortedRDD.first());
-    sortedPairs = sortedRDD.collect();
-    Assert.assertEquals(new Tuple2<Integer, Integer>(0, 4), sortedPairs.get(1));
-    Assert.assertEquals(new Tuple2<Integer, Integer>(3, 2), sortedPairs.get(2));
-  }
-
-  static int foreachCalls = 0;
-
-  @Test
-  public void foreach() {
-    foreachCalls = 0;
-    JavaRDD<String> rdd = sc.parallelize(Arrays.asList("Hello", "World"));
-    rdd.foreach(new VoidFunction<String>() {
-      @Override
-      public void call(String s) {
-        foreachCalls++;
-      }
-    });
-    Assert.assertEquals(2, foreachCalls);
-  }
-
-  @Test
-  public void lookup() {
-    JavaPairRDD<String, String> categories = sc.parallelizePairs(Arrays.asList(
-      new Tuple2<String, String>("Apples", "Fruit"),
-      new Tuple2<String, String>("Oranges", "Fruit"),
-      new Tuple2<String, String>("Oranges", "Citrus")
-      ));
-    Assert.assertEquals(2, categories.lookup("Oranges").size());
-    Assert.assertEquals(2, categories.groupByKey().lookup("Oranges").get(0).size());
-  }
-
-  @Test
-  public void groupBy() {
-    JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 1, 2, 3, 5, 8, 13));
-    Function<Integer, Boolean> isOdd = new Function<Integer, Boolean>() {
-      @Override
-      public Boolean call(Integer x) {
-        return x % 2 == 0;
-      }
-    };
-    JavaPairRDD<Boolean, List<Integer>> oddsAndEvens = rdd.groupBy(isOdd);
-    Assert.assertEquals(2, oddsAndEvens.count());
-    Assert.assertEquals(2, oddsAndEvens.lookup(true).get(0).size());  // Evens
-    Assert.assertEquals(5, oddsAndEvens.lookup(false).get(0).size()); // Odds
-
-    oddsAndEvens = rdd.groupBy(isOdd, 1);
-    Assert.assertEquals(2, oddsAndEvens.count());
-    Assert.assertEquals(2, oddsAndEvens.lookup(true).get(0).size());  // Evens
-    Assert.assertEquals(5, oddsAndEvens.lookup(false).get(0).size()); // Odds
-  }
-
-  @Test
-  public void cogroup() {
-    JavaPairRDD<String, String> categories = sc.parallelizePairs(Arrays.asList(
-      new Tuple2<String, String>("Apples", "Fruit"),
-      new Tuple2<String, String>("Oranges", "Fruit"),
-      new Tuple2<String, String>("Oranges", "Citrus")
-      ));
-    JavaPairRDD<String, Integer> prices = sc.parallelizePairs(Arrays.asList(
-      new Tuple2<String, Integer>("Oranges", 2),
-      new Tuple2<String, Integer>("Apples", 3)
-    ));
-    JavaPairRDD<String, Tuple2<List<String>, List<Integer>>> cogrouped = categories.cogroup(prices);
-    Assert.assertEquals("[Fruit, Citrus]", cogrouped.lookup("Oranges").get(0)._1().toString());
-    Assert.assertEquals("[2]", cogrouped.lookup("Oranges").get(0)._2().toString());
-
-    cogrouped.collect();
-  }
-
-  @Test
-  public void leftOuterJoin() {
-    JavaPairRDD<Integer, Integer> rdd1 = sc.parallelizePairs(Arrays.asList(
-      new Tuple2<Integer, Integer>(1, 1),
-      new Tuple2<Integer, Integer>(1, 2),
-      new Tuple2<Integer, Integer>(2, 1),
-      new Tuple2<Integer, Integer>(3, 1)
-      ));
-    JavaPairRDD<Integer, Character> rdd2 = sc.parallelizePairs(Arrays.asList(
-      new Tuple2<Integer, Character>(1, 'x'),
-      new Tuple2<Integer, Character>(2, 'y'),
-      new Tuple2<Integer, Character>(2, 'z'),
-      new Tuple2<Integer, Character>(4, 'w')
-    ));
-    List<Tuple2<Integer,Tuple2<Integer,Optional<Character>>>> joined =
-      rdd1.leftOuterJoin(rdd2).collect();
-    Assert.assertEquals(5, joined.size());
-    Tuple2<Integer,Tuple2<Integer,Optional<Character>>> firstUnmatched =
-      rdd1.leftOuterJoin(rdd2).filter(
-        new Function<Tuple2<Integer, Tuple2<Integer, Optional<Character>>>, Boolean>() {
-          @Override
-          public Boolean call(Tuple2<Integer, Tuple2<Integer, Optional<Character>>> tup)
-            throws Exception {
-            return !tup._2()._2().isPresent();
-          }
-      }).first();
-    Assert.assertEquals(3, firstUnmatched._1().intValue());
-  }
-
-  @Test
-  public void foldReduce() {
-    JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 1, 2, 3, 5, 8, 13));
-    Function2<Integer, Integer, Integer> add = new Function2<Integer, Integer, Integer>() {
-      @Override
-      public Integer call(Integer a, Integer b) {
-        return a + b;
-      }
-    };
-
-    int sum = rdd.fold(0, add);
-    Assert.assertEquals(33, sum);
-
-    sum = rdd.reduce(add);
-    Assert.assertEquals(33, sum);
-  }
-
-  @Test
-  public void foldByKey() {
-    List<Tuple2<Integer, Integer>> pairs = Arrays.asList(
-      new Tuple2<Integer, Integer>(2, 1),
-      new Tuple2<Integer, Integer>(2, 1),
-      new Tuple2<Integer, Integer>(1, 1),
-      new Tuple2<Integer, Integer>(3, 2),
-      new Tuple2<Integer, Integer>(3, 1)
-    );
-    JavaPairRDD<Integer, Integer> rdd = sc.parallelizePairs(pairs);
-    JavaPairRDD<Integer, Integer> sums = rdd.foldByKey(0,
-      new Function2<Integer, Integer, Integer>() {
-        @Override
-        public Integer call(Integer a, Integer b) {
-          return a + b;
-        }
-    });
-    Assert.assertEquals(1, sums.lookup(1).get(0).intValue());
-    Assert.assertEquals(2, sums.lookup(2).get(0).intValue());
-    Assert.assertEquals(3, sums.lookup(3).get(0).intValue());
-  }
-
-  @Test
-  public void reduceByKey() {
-    List<Tuple2<Integer, Integer>> pairs = Arrays.asList(
-      new Tuple2<Integer, Integer>(2, 1),
-      new Tuple2<Integer, Integer>(2, 1),
-      new Tuple2<Integer, Integer>(1, 1),
-      new Tuple2<Integer, Integer>(3, 2),
-      new Tuple2<Integer, Integer>(3, 1)
-    );
-    JavaPairRDD<Integer, Integer> rdd = sc.parallelizePairs(pairs);
-    JavaPairRDD<Integer, Integer> counts = rdd.reduceByKey(
-      new Function2<Integer, Integer, Integer>() {
-        @Override
-        public Integer call(Integer a, Integer b) {
-         return a + b;
-        }
-    });
-    Assert.assertEquals(1, counts.lookup(1).get(0).intValue());
-    Assert.assertEquals(2, counts.lookup(2).get(0).intValue());
-    Assert.assertEquals(3, counts.lookup(3).get(0).intValue());
-
-    Map<Integer, Integer> localCounts = counts.collectAsMap();
-    Assert.assertEquals(1, localCounts.get(1).intValue());
-    Assert.assertEquals(2, localCounts.get(2).intValue());
-    Assert.assertEquals(3, localCounts.get(3).intValue());
-
-   localCounts = rdd.reduceByKeyLocally(new Function2<Integer, Integer,
-      Integer>() {
-      @Override
-      public Integer call(Integer a, Integer b) {
-        return a + b;
-      }
-    });
-    Assert.assertEquals(1, localCounts.get(1).intValue());
-    Assert.assertEquals(2, localCounts.get(2).intValue());
-    Assert.assertEquals(3, localCounts.get(3).intValue());
-  }
-
-  @Test
-  public void approximateResults() {
-    JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 1, 2, 3, 5, 8, 13));
-    Map<Integer, Long> countsByValue = rdd.countByValue();
-    Assert.assertEquals(2, countsByValue.get(1).longValue());
-    Assert.assertEquals(1, countsByValue.get(13).longValue());
-
-    PartialResult<Map<Integer, BoundedDouble>> approx = rdd.countByValueApprox(1);
-    Map<Integer, BoundedDouble> finalValue = approx.getFinalValue();
-    Assert.assertEquals(2.0, finalValue.get(1).mean(), 0.01);
-    Assert.assertEquals(1.0, finalValue.get(13).mean(), 0.01);
-  }
-
-  @Test
-  public void take() {
-    JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 1, 2, 3, 5, 8, 13));
-    Assert.assertEquals(1, rdd.first().intValue());
-    List<Integer> firstTwo = rdd.take(2);
-    List<Integer> sample = rdd.takeSample(false, 2, 42);
-  }
-
-  @Test
-  public void cartesian() {
-    JavaDoubleRDD doubleRDD = sc.parallelizeDoubles(Arrays.asList(1.0, 1.0, 2.0, 3.0, 5.0, 8.0));
-    JavaRDD<String> stringRDD = sc.parallelize(Arrays.asList("Hello", "World"));
-    JavaPairRDD<String, Double> cartesian = stringRDD.cartesian(doubleRDD);
-    Assert.assertEquals(new Tuple2<String, Double>("Hello", 1.0), cartesian.first());
-  }
-
-  @Test
-  public void javaDoubleRDD() {
-    JavaDoubleRDD rdd = sc.parallelizeDoubles(Arrays.asList(1.0, 1.0, 2.0, 3.0, 5.0, 8.0));
-    JavaDoubleRDD distinct = rdd.distinct();
-    Assert.assertEquals(5, distinct.count());
-    JavaDoubleRDD filter = rdd.filter(new Function<Double, Boolean>() {
-      @Override
-      public Boolean call(Double x) {
-        return x > 2.0;
-      }
-    });
-    Assert.assertEquals(3, filter.count());
-    JavaDoubleRDD union = rdd.union(rdd);
-    Assert.assertEquals(12, union.count());
-    union = union.cache();
-    Assert.assertEquals(12, union.count());
-
-    Assert.assertEquals(20, rdd.sum(), 0.01);
-    StatCounter stats = rdd.stats();
-    Assert.assertEquals(20, stats.sum(), 0.01);
-    Assert.assertEquals(20/6.0, rdd.mean(), 0.01);
-    Assert.assertEquals(20/6.0, rdd.mean(), 0.01);
-    Assert.assertEquals(6.22222, rdd.variance(), 0.01);
-    Assert.assertEquals(7.46667, rdd.sampleVariance(), 0.01);
-    Assert.assertEquals(2.49444, rdd.stdev(), 0.01);
-    Assert.assertEquals(2.73252, rdd.sampleStdev(), 0.01);
-
-    Double first = rdd.first();
-    List<Double> take = rdd.take(5);
-  }
-
-  @Test
-  public void javaDoubleRDDHistoGram() {
-   JavaDoubleRDD rdd = sc.parallelizeDoubles(Arrays.asList(1.0, 2.0, 3.0, 4.0));
-   // Test using generated buckets
-   Tuple2<double[], long[]> results = rdd.histogram(2);
-   double[] expected_buckets = {1.0, 2.5, 4.0};
-   long[] expected_counts = {2, 2};
-   Assert.assertArrayEquals(expected_buckets, results._1, 0.1);
-   Assert.assertArrayEquals(expected_counts, results._2);
-   // Test with provided buckets
-   long[] histogram = rdd.histogram(expected_buckets);
-   Assert.assertArrayEquals(expected_counts, histogram);
-  }
-
-  @Test
-  public void map() {
-    JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));
-    JavaDoubleRDD doubles = rdd.map(new DoubleFunction<Integer>() {
-      @Override
-      public Double call(Integer x) {
-        return 1.0 * x;
-      }
-    }).cache();
-    doubles.collect();
-    JavaPairRDD<Integer, Integer> pairs = rdd.map(new PairFunction<Integer, Integer, Integer>() {
-      @Override
-      public Tuple2<Integer, Integer> call(Integer x) {
-        return new Tuple2<Integer, Integer>(x, x);
-      }
-    }).cache();
-    pairs.collect();
-    JavaRDD<String> strings = rdd.map(new Function<Integer, String>() {
-      @Override
-      public String call(Integer x) {
-        return x.toString();
-      }
-    }).cache();
-    strings.collect();
-  }
-
-  @Test
-  public void flatMap() {
-    JavaRDD<String> rdd = sc.parallelize(Arrays.asList("Hello World!",
-      "The quick brown fox jumps over the lazy dog."));
-    JavaRDD<String> words = rdd.flatMap(new FlatMapFunction<String, String>() {
-      @Override
-      public Iterable<String> call(String x) {
-        return Arrays.asList(x.split(" "));
-      }
-    });
-    Assert.assertEquals("Hello", words.first());
-    Assert.assertEquals(11, words.count());
-
-    JavaPairRDD<String, String> pairs = rdd.flatMap(
-      new PairFlatMapFunction<String, String, String>() {
-
-        @Override
-        public Iterable<Tuple2<String, String>> call(String s) {
-          List<Tuple2<String, String>> pairs = new LinkedList<Tuple2<String, String>>();
-          for (String word : s.split(" ")) pairs.add(new Tuple2<String, String>(word, word));
-          return pairs;
-        }
-      }
-    );
-    Assert.assertEquals(new Tuple2<String, String>("Hello", "Hello"), pairs.first());
-    Assert.assertEquals(11, pairs.count());
-
-    JavaDoubleRDD doubles = rdd.flatMap(new DoubleFlatMapFunction<String>() {
-      @Override
-      public Iterable<Double> call(String s) {
-        List<Double> lengths = new LinkedList<Double>();
-        for (String word : s.split(" ")) lengths.add(word.length() * 1.0);
-        return lengths;
-      }
-    });
-    Double x = doubles.first();
-    Assert.assertEquals(5.0, doubles.first().doubleValue(), 0.01);
-    Assert.assertEquals(11, pairs.count());
-  }
-
-  @Test
-  public void mapsFromPairsToPairs() {
-      List<Tuple2<Integer, String>> pairs = Arrays.asList(
-              new Tuple2<Integer, String>(1, "a"),
-              new Tuple2<Integer, String>(2, "aa"),
-              new Tuple2<Integer, String>(3, "aaa")
-      );
-      JavaPairRDD<Integer, String> pairRDD = sc.parallelizePairs(pairs);
-
-      // Regression test for SPARK-668:
-      JavaPairRDD<String, Integer> swapped = pairRDD.flatMap(
-          new PairFlatMapFunction<Tuple2<Integer, String>, String, Integer>() {
-          @Override
-          public Iterable<Tuple2<String, Integer>> call(Tuple2<Integer, String> item) throws Exception {
-              return Collections.singletonList(item.swap());
-          }
-      });
-      swapped.collect();
-
-      // There was never a bug here, but it's worth testing:
-      pairRDD.map(new PairFunction<Tuple2<Integer, String>, String, Integer>() {
-          @Override
-          public Tuple2<String, Integer> call(Tuple2<Integer, String> item) throws Exception {
-              return item.swap();
-          }
-      }).collect();
-  }
-
-  @Test
-  public void mapPartitions() {
-    JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4), 2);
-    JavaRDD<Integer> partitionSums = rdd.mapPartitions(
-      new FlatMapFunction<Iterator<Integer>, Integer>() {
-        @Override
-        public Iterable<Integer> call(Iterator<Integer> iter) {
-          int sum = 0;
-          while (iter.hasNext()) {
-            sum += iter.next();
-          }
-          return Collections.singletonList(sum);
-        }
-    });
-    Assert.assertEquals("[3, 7]", partitionSums.collect().toString());
-  }
-
-  @Test
-  public void repartition() {
-    // Shrinking number of partitions
-    JavaRDD<Integer> in1 = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8), 2);
-    JavaRDD<Integer> repartitioned1 = in1.repartition(4);
-    List<List<Integer>> result1 = repartitioned1.glom().collect();
-    Assert.assertEquals(4, result1.size());
-    for (List<Integer> l: result1) {
-      Assert.assertTrue(l.size() > 0);
-    }
-
-    // Growing number of partitions
-    JavaRDD<Integer> in2 = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8), 4);
-    JavaRDD<Integer> repartitioned2 = in2.repartition(2);
-    List<List<Integer>> result2 = repartitioned2.glom().collect();
-    Assert.assertEquals(2, result2.size());
-    for (List<Integer> l: result2) {
-      Assert.assertTrue(l.size() > 0);
-    }
-  }
-
-  @Test
-  public void persist() {
-    JavaDoubleRDD doubleRDD = sc.parallelizeDoubles(Arrays.asList(1.0, 1.0, 2.0, 3.0, 5.0, 8.0));
-    doubleRDD = doubleRDD.persist(StorageLevel.DISK_ONLY());
-    Assert.assertEquals(20, doubleRDD.sum(), 0.1);
-
-    List<Tuple2<Integer, String>> pairs = Arrays.asList(
-      new Tuple2<Integer, String>(1, "a"),
-      new Tuple2<Integer, String>(2, "aa"),
-      new Tuple2<Integer, String>(3, "aaa")
-    );
-    JavaPairRDD<Integer, String> pairRDD = sc.parallelizePairs(pairs);
-    pairRDD = pairRDD.persist(StorageLevel.DISK_ONLY());
-    Assert.assertEquals("a", pairRDD.first()._2());
-
-    JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));
-    rdd = rdd.persist(StorageLevel.DISK_ONLY());
-    Assert.assertEquals(1, rdd.first().intValue());
-  }
-
-  @Test
-  public void iterator() {
-    JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5), 2);
-    TaskContext context = new TaskContext(0, 0, 0, false, false, null);
-    Assert.assertEquals(1, rdd.iterator(rdd.splits().get(0), context).next().intValue());
-  }
-
-  @Test
-  public void glom() {
-    JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4), 2);
-    Assert.assertEquals("[1, 2]", rdd.glom().first().toString());
-  }
-
-  // File input / output tests are largely adapted from FileSuite:
-
-  @Test
-  public void textFiles() throws IOException {
-    File tempDir = Files.createTempDir();
-    String outputDir = new File(tempDir, "output").getAbsolutePath();
-    JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4));
-    rdd.saveAsTextFile(outputDir);
-    // Read the plain text file and check it's OK
-    File outputFile = new File(outputDir, "part-00000");
-    String content = Files.toString(outputFile, Charsets.UTF_8);
-    Assert.assertEquals("1\n2\n3\n4\n", content);
-    // Also try reading it in as a text file RDD
-    List<String> expected = Arrays.asList("1", "2", "3", "4");
-    JavaRDD<String> readRDD = sc.textFile(outputDir);
-    Assert.assertEquals(expected, readRDD.collect());
-  }
-
-  @Test
-  public void textFilesCompressed() throws IOException {
-    File tempDir = Files.createTempDir();
-    String outputDir = new File(tempDir, "output").getAbsolutePath();
-    JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4));
-    rdd.saveAsTextFile(outputDir, DefaultCodec.class);
-
-    // Try reading it in as a text file RDD
-    List<String> expected = Arrays.asList("1", "2", "3", "4");
-    JavaRDD<String> readRDD = sc.textFile(outputDir);
-    Assert.assertEquals(expected, readRDD.collect());
-  }
-
-  @Test
-  public void sequenceFile() {
-    File tempDir = Files.createTempDir();
-    String outputDir = new File(tempDir, "output").getAbsolutePath();
-    List<Tuple2<Integer, String>> pairs = Arrays.asList(
-      new Tuple2<Integer, String>(1, "a"),
-      new Tuple2<Integer, String>(2, "aa"),
-      new Tuple2<Integer, String>(3, "aaa")
-    );
-    JavaPairRDD<Integer, String> rdd = sc.parallelizePairs(pairs);
-
-    rdd.map(new PairFunction<Tuple2<Integer, String>, IntWritable, Text>() {
-      @Override
-      public Tuple2<IntWritable, Text> call(Tuple2<Integer, String> pair) {
-        return new Tuple2<IntWritable, Text>(new IntWritable(pair._1()), new Text(pair._2()));
-      }
-    }).saveAsHadoopFile(outputDir, IntWritable.class, Text.class, SequenceFileOutputFormat.class);
-
-    // Try reading the output back as an object file
-    JavaPairRDD<Integer, String> readRDD = sc.sequenceFile(outputDir, IntWritable.class,
-      Text.class).map(new PairFunction<Tuple2<IntWritable, Text>, Integer, String>() {
-      @Override
-      public Tuple2<Integer, String> call(Tuple2<IntWritable, Text> pair) {
-        return new Tuple2<Integer, String>(pair._1().get(), pair._2().toString());
-      }
-    });
-    Assert.assertEquals(pairs, readRDD.collect());
-  }
-
-  @Test
-  public void writeWithNewAPIHadoopFile() {
-    File tempDir = Files.createTempDir();
-    String outputDir = new File(tempDir, "output").getAbsolutePath();
-    List<Tuple2<Integer, String>> pairs = Arrays.asList(
-      new Tuple2<Integer, String>(1, "a"),
-      new Tuple2<Integer, String>(2, "aa"),
-      new Tuple2<Integer, String>(3, "aaa")
-    );
-    JavaPairRDD<Integer, String> rdd = sc.parallelizePairs(pairs);
-
-    rdd.map(new PairFunction<Tuple2<Integer, String>, IntWritable, Text>() {
-      @Override
-      public Tuple2<IntWritable, Text> call(Tuple2<Integer, String> pair) {
-        return new Tuple2<IntWritable, Text>(new IntWritable(pair._1()), new Text(pair._2()));
-      }
-    }).saveAsNewAPIHadoopFile(outputDir, IntWritable.class, Text.class,
-      org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat.class);
-
-    JavaPairRDD<IntWritable, Text> output = sc.sequenceFile(outputDir, IntWritable.class,
-      Text.class);
-    Assert.assertEquals(pairs.toString(), output.map(new Function<Tuple2<IntWritable, Text>,
-      String>() {
-      @Override
-      public String call(Tuple2<IntWritable, Text> x) {
-        return x.toString();
-      }
-    }).collect().toString());
-  }
-
-  @Test
-  public void readWithNewAPIHadoopFile() throws IOException {
-    File tempDir = Files.createTempDir();
-    String outputDir = new File(tempDir, "output").getAbsolutePath();
-    List<Tuple2<Integer, String>> pairs = Arrays.asList(
-      new Tuple2<Integer, String>(1, "a"),
-      new Tuple2<Integer, String>(2, "aa"),
-      new Tuple2<Integer, String>(3, "aaa")
-    );
-    JavaPairRDD<Integer, String> rdd = sc.parallelizePairs(pairs);
-
-    rdd.map(new PairFunction<Tuple2<Integer, String>, IntWritable, Text>() {
-      @Override
-      public Tuple2<IntWritable, Text> call(Tuple2<Integer, String> pair) {
-        return new Tuple2<IntWritable, Text>(new IntWritable(pair._1()), new Text(pair._2()));
-      }
-    }).saveAsHadoopFile(outputDir, IntWritable.class, Text.class, SequenceFileOutputFormat.class);
-
-    JavaPairRDD<IntWritable, Text> output = sc.newAPIHadoopFile(outputDir,
-      org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat.class, IntWritable.class,
-      Text.class, new Job().getConfiguration());
-    Assert.assertEquals(pairs.toString(), output.map(new Function<Tuple2<IntWritable, Text>,
-      String>() {
-      @Override
-      public String call(Tuple2<IntWritable, Text> x) {
-        return x.toString();
-      }
-    }).collect().toString());
-  }
-
-  @Test
-  public void objectFilesOfInts() {
-    File tempDir = Files.createTempDir();
-    String outputDir = new File(tempDir, "output").getAbsolutePath();
-    JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4));
-    rdd.saveAsObjectFile(outputDir);
-    // Try reading the output back as an object file
-    List<Integer> expected = Arrays.asList(1, 2, 3, 4);
-    JavaRDD<Integer> readRDD = sc.objectFile(outputDir);
-    Assert.assertEquals(expected, readRDD.collect());
-  }
-
-  @Test
-  public void objectFilesOfComplexTypes() {
-    File tempDir = Files.createTempDir();
-    String outputDir = new File(tempDir, "output").getAbsolutePath();
-    List<Tuple2<Integer, String>> pairs = Arrays.asList(
-      new Tuple2<Integer, String>(1, "a"),
-      new Tuple2<Integer, String>(2, "aa"),
-      new Tuple2<Integer, String>(3, "aaa")
-    );
-    JavaPairRDD<Integer, String> rdd = sc.parallelizePairs(pairs);
-    rdd.saveAsObjectFile(outputDir);
-    // Try reading the output back as an object file
-    JavaRDD<Tuple2<Integer, String>> readRDD = sc.objectFile(outputDir);
-    Assert.assertEquals(pairs, readRDD.collect());
-  }
-
-  @Test
-  public void hadoopFile() {
-    File tempDir = Files.createTempDir();
-    String outputDir = new File(tempDir, "output").getAbsolutePath();
-    List<Tuple2<Integer, String>> pairs = Arrays.asList(
-      new Tuple2<Integer, String>(1, "a"),
-      new Tuple2<Integer, String>(2, "aa"),
-      new Tuple2<Integer, String>(3, "aaa")
-    );
-    JavaPairRDD<Integer, String> rdd = sc.parallelizePairs(pairs);
-
-    rdd.map(new PairFunction<Tuple2<Integer, String>, IntWritable, Text>() {
-      @Override
-      public Tuple2<IntWritable, Text> call(Tuple2<Integer, String> pair) {
-        return new Tuple2<IntWritable, Text>(new IntWritable(pair._1()), new Text(pair._2()));
-      }
-    }).saveAsHadoopFile(outputDir, IntWritable.class, Text.class, SequenceFileOutputFormat.class);
-
-    JavaPairRDD<IntWritable, Text> output = sc.hadoopFile(outputDir,
-      SequenceFileInputFormat.class, IntWritable.class, Text.class);
-    Assert.assertEquals(pairs.toString(), output.map(new Function<Tuple2<IntWritable, Text>,
-      String>() {
-      @Override
-      public String call(Tuple2<IntWritable, Text> x) {
-        return x.toString();
-      }
-    }).collect().toString());
-  }
-
-  @Test
-  public void hadoopFileCompressed() {
-    File tempDir = Files.createTempDir();
-    String outputDir = new File(tempDir, "output_compressed").getAbsolutePath();
-    List<Tuple2<Integer, String>> pairs = Arrays.asList(
-        new Tuple2<Integer, String>(1, "a"),
-        new Tuple2<Integer, String>(2, "aa"),
-        new Tuple2<Integer, String>(3, "aaa")
-    );
-    JavaPairRDD<Integer, String> rdd = sc.parallelizePairs(pairs);
-
-    rdd.map(new PairFunction<Tuple2<Integer, String>, IntWritable, Text>() {
-      @Override
-      public Tuple2<IntWritable, Text> call(Tuple2<Integer, String> pair) {
-        return new Tuple2<IntWritable, Text>(new IntWritable(pair._1()), new Text(pair._2()));
-      }
-    }).saveAsHadoopFile(outputDir, IntWritable.class, Text.class, SequenceFileOutputFormat.class,
-        DefaultCodec.class);
-
-    JavaPairRDD<IntWritable, Text> output = sc.hadoopFile(outputDir,
-        SequenceFileInputFormat.class, IntWritable.class, Text.class);
-
-    Assert.assertEquals(pairs.toString(), output.map(new Function<Tuple2<IntWritable, Text>,
-        String>() {
-      @Override
-      public String call(Tuple2<IntWritable, Text> x) {
-        return x.toString();
-      }
-    }).collect().toString());
-  }
-
-  @Test
-  public void zip() {
-    JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));
-    JavaDoubleRDD doubles = rdd.map(new DoubleFunction<Integer>() {
-      @Override
-      public Double call(Integer x) {
-        return 1.0 * x;
-      }
-    });
-    JavaPairRDD<Integer, Double> zipped = rdd.zip(doubles);
-    zipped.count();
-  }
-
-  @Test
-  public void zipPartitions() {
-    JavaRDD<Integer> rdd1 = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6), 2);
-    JavaRDD<String> rdd2 = sc.parallelize(Arrays.asList("1", "2", "3", "4"), 2);
-    FlatMapFunction2<Iterator<Integer>, Iterator<String>, Integer> sizesFn =
-      new FlatMapFunction2<Iterator<Integer>, Iterator<String>, Integer>() {
-        @Override
-        public Iterable<Integer> call(Iterator<Integer> i, Iterator<String> s) {
-          int sizeI = 0;
-          int sizeS = 0;
-          while (i.hasNext()) {
-            sizeI += 1;
-            i.next();
-          }
-          while (s.hasNext()) {
-            sizeS += 1;
-            s.next();
-          }
-          return Arrays.asList(sizeI, sizeS);
-        }
-      };
-
-    JavaRDD<Integer> sizes = rdd1.zipPartitions(rdd2, sizesFn);
-    Assert.assertEquals("[3, 2, 3, 2]", sizes.collect().toString());
-  }
-
-  @Test
-  public void accumulators() {
-    JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));
-
-    final Accumulator<Integer> intAccum = sc.intAccumulator(10);
-    rdd.foreach(new VoidFunction<Integer>() {
-      public void call(Integer x) {
-        intAccum.add(x);
-      }
-    });
-    Assert.assertEquals((Integer) 25, intAccum.value());
-
-    final Accumulator<Double> doubleAccum = sc.doubleAccumulator(10.0);
-    rdd.foreach(new VoidFunction<Integer>() {
-      public void call(Integer x) {
-        doubleAccum.add((double) x);
-      }
-    });
-    Assert.assertEquals((Double) 25.0, doubleAccum.value());
-
-    // Try a custom accumulator type
-    AccumulatorParam<Float> floatAccumulatorParam = new AccumulatorParam<Float>() {
-      public Float addInPlace(Float r, Float t) {
-        return r + t;
-      }
-
-      public Float addAccumulator(Float r, Float t) {
-        return r + t;
-      }
-
-      public Float zero(Float initialValue) {
-        return 0.0f;
-      }
-    };
-
-    final Accumulator<Float> floatAccum = sc.accumulator((Float) 10.0f, floatAccumulatorParam);
-    rdd.foreach(new VoidFunction<Integer>() {
-      public void call(Integer x) {
-        floatAccum.add((float) x);
-      }
-    });
-    Assert.assertEquals((Float) 25.0f, floatAccum.value());
-
-    // Test the setValue method
-    floatAccum.setValue(5.0f);
-    Assert.assertEquals((Float) 5.0f, floatAccum.value());
-  }
-
-  @Test
-  public void keyBy() {
-    JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2));
-    List<Tuple2<String, Integer>> s = rdd.keyBy(new Function<Integer, String>() {
-      public String call(Integer t) throws Exception {
-        return t.toString();
-      }
-    }).collect();
-    Assert.assertEquals(new Tuple2<String, Integer>("1", 1), s.get(0));
-    Assert.assertEquals(new Tuple2<String, Integer>("2", 2), s.get(1));
-  }
-
-  @Test
-  public void checkpointAndComputation() {
-    File tempDir = Files.createTempDir();
-    JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));
-    sc.setCheckpointDir(tempDir.getAbsolutePath());
-    Assert.assertEquals(false, rdd.isCheckpointed());
-    rdd.checkpoint();
-    rdd.count(); // Forces the DAG to cause a checkpoint
-    Assert.assertEquals(true, rdd.isCheckpointed());
-    Assert.assertEquals(Arrays.asList(1, 2, 3, 4, 5), rdd.collect());
-  }
-
-  @Test
-  public void checkpointAndRestore() {
-    File tempDir = Files.createTempDir();
-    JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));
-    sc.setCheckpointDir(tempDir.getAbsolutePath());
-    Assert.assertEquals(false, rdd.isCheckpointed());
-    rdd.checkpoint();
-    rdd.count(); // Forces the DAG to cause a checkpoint
-    Assert.assertEquals(true, rdd.isCheckpointed());
-
-    Assert.assertTrue(rdd.getCheckpointFile().isPresent());
-    JavaRDD<Integer> recovered = sc.checkpointFile(rdd.getCheckpointFile().get());
-    Assert.assertEquals(Arrays.asList(1, 2, 3, 4, 5), recovered.collect());
-  }
-
-  @Test
-  public void mapOnPairRDD() {
-    JavaRDD<Integer> rdd1 = sc.parallelize(Arrays.asList(1,2,3,4));
-    JavaPairRDD<Integer, Integer> rdd2 = rdd1.map(new PairFunction<Integer, Integer, Integer>() {
-      @Override
-      public Tuple2<Integer, Integer> call(Integer i) throws Exception {
-        return new Tuple2<Integer, Integer>(i, i % 2);
-      }
-    });
-    JavaPairRDD<Integer, Integer> rdd3 = rdd2.map(
-        new PairFunction<Tuple2<Integer, Integer>, Integer, Integer>() {
-      @Override
-      public Tuple2<Integer, Integer> call(Tuple2<Integer, Integer> in) throws Exception {
-        return new Tuple2<Integer, Integer>(in._2(), in._1());
-      }
-    });
-    Assert.assertEquals(Arrays.asList(
-        new Tuple2<Integer, Integer>(1, 1),
-        new Tuple2<Integer, Integer>(0, 2),
-        new Tuple2<Integer, Integer>(1, 3),
-        new Tuple2<Integer, Integer>(0, 4)), rdd3.collect());
-
-  }
-
-  @Test
-  public void collectPartitions() {
-    JavaRDD<Integer> rdd1 = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7), 3);
-
-    JavaPairRDD<Integer, Integer> rdd2 = rdd1.map(new PairFunction<Integer, Integer, Integer>() {
-      @Override
-      public Tuple2<Integer, Integer> call(Integer i) throws Exception {
-        return new Tuple2<Integer, Integer>(i, i % 2);
-      }
-    });
-
-    List[] parts = rdd1.collectPartitions(new int[] {0});
-    Assert.assertEquals(Arrays.asList(1, 2), parts[0]);
-
-    parts = rdd1.collectPartitions(new int[] {1, 2});
-    Assert.assertEquals(Arrays.asList(3, 4), parts[0]);
-    Assert.assertEquals(Arrays.asList(5, 6, 7), parts[1]);
-
-    Assert.assertEquals(Arrays.asList(new Tuple2<Integer, Integer>(1, 1),
-                                      new Tuple2<Integer, Integer>(2, 0)),
-                        rdd2.collectPartitions(new int[] {0})[0]);
-
-    parts = rdd2.collectPartitions(new int[] {1, 2});
-    Assert.assertEquals(Arrays.asList(new Tuple2<Integer, Integer>(3, 1),
-                                      new Tuple2<Integer, Integer>(4, 0)),
-                        parts[0]);
-    Assert.assertEquals(Arrays.asList(new Tuple2<Integer, Integer>(5, 1),
-                                      new Tuple2<Integer, Integer>(6, 0),
-                                      new Tuple2<Integer, Integer>(7, 1)),
-                        parts[1]);
-  }
-
-  @Test
-  public void countApproxDistinct() {
-    List<Integer> arrayData = new ArrayList<Integer>();
-    int size = 100;
-    for (int i = 0; i < 100000; i++) {
-      arrayData.add(i % size);
-    }
-    JavaRDD<Integer> simpleRdd = sc.parallelize(arrayData, 10);
-    Assert.assertTrue(Math.abs((simpleRdd.countApproxDistinct(0.2) - size) / (size * 1.0)) < 0.2);
-    Assert.assertTrue(Math.abs((simpleRdd.countApproxDistinct(0.05) - size) / (size * 1.0)) <= 0.05);
-    Assert.assertTrue(Math.abs((simpleRdd.countApproxDistinct(0.01) - size) / (size * 1.0)) <= 0.01);
-  }
-
-  @Test
-  public void countApproxDistinctByKey() {
-    double relativeSD = 0.001;
-
-    List<Tuple2<Integer, Integer>> arrayData = new ArrayList<Tuple2<Integer, Integer>>();
-    for (int i = 10; i < 100; i++)
-      for (int j = 0; j < i; j++)
-        arrayData.add(new Tuple2<Integer, Integer>(i, j));
-
-    JavaPairRDD<Integer, Integer> pairRdd = sc.parallelizePairs(arrayData);
-    List<Tuple2<Integer, Object>> res =  pairRdd.countApproxDistinctByKey(relativeSD).collect();
-    for (Tuple2<Integer, Object> resItem : res) {
-      double count = (double)resItem._1();
-      Long resCount = (Long)resItem._2();
-      Double error = Math.abs((resCount - count) / count);
-      Assert.assertTrue(error < relativeSD);
-    }
-
-  }
-
-  @Test
-  public void collectAsMapWithIntArrayValues() {
-    // Regression test for SPARK-1040
-    JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(new Integer[] { 1 }));
-    JavaPairRDD<Integer, int[]> pairRDD = rdd.map(new PairFunction<Integer, Integer, int[]>() {
-      @Override
-      public Tuple2<Integer, int[]> call(Integer x) throws Exception {
-        return new Tuple2<Integer, int[]>(x, new int[] { x });
-      }
-    });
-    pairRDD.collect();  // Works fine
-    Map<Integer, int[]> map = pairRDD.collectAsMap();  // Used to crash with ClassCastException
-  }
-}


[2/2] git commit: Migrate Java code to Scala or move it to src/main/java

Posted by ad...@apache.org.
Migrate Java code to Scala or move it to src/main/java

These classes can't be migrated:
  StorageLevels: impossible to create static fields in Scala
  JavaSparkContextVarargsWorkaround: incompatible varargs
  JavaAPISuite: should test Java APIs in pure Java (for sanity)
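
For context, a minimal sketch (not part of the commit) of the first two
limitations; "StorageLevelsSketch" is a hypothetical name, and the comments
describe what Java callers would see:

  import org.apache.spark.storage.StorageLevel

  object StorageLevelsSketch {
    // A val in a Scala object surfaces to Java only as the static method
    // StorageLevelsSketch.MEMORY_ONLY(); Scala cannot emit a true
    // public static final field, so existing field accesses like
    // StorageLevels.MEMORY_ONLY would stop compiling in user code.
    val MEMORY_ONLY = StorageLevel.apply(false, true, true, 1)

    // Scala varargs compile by default to a single Seq parameter,
    // i.e. union(scala.collection.Seq), not to a Java varargs array,
    // so Java callers could no longer write sc.union(rdd1, rdd2, rdd3).
    def union(names: String*): Int = names.length
  }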

Author: Punya Biswal <pb...@palantir.com>

Closes #605 from punya/move-java-sources and squashes the following commits:

25b00b2 [Punya Biswal] Remove redundant type param; reformat
853da46 [Punya Biswal] Use factory method rather than constructor
e5d53d9 [Punya Biswal] Migrate Java code to Scala or move it to src/main/java


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/29ac7ea5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/29ac7ea5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/29ac7ea5

Branch: refs/heads/master
Commit: 29ac7ea52fbb0c6531e14305e2fb1ccba9678f7e
Parents: 1aa4f8a
Author: Punya Biswal <pb...@palantir.com>
Authored: Sat Feb 22 17:53:48 2014 -0800
Committer: Aaron Davidson <aa...@databricks.com>
Committed: Sat Feb 22 17:53:48 2014 -0800

----------------------------------------------------------------------
 .../java/JavaSparkContextVarargsWorkaround.java |  63 ++
 .../apache/spark/api/java/StorageLevels.java    |  48 +
 .../main/scala/org/apache/spark/SparkFiles.java |  42 -
 .../scala/org/apache/spark/SparkFiles.scala     |  39 +
 .../java/JavaSparkContextVarargsWorkaround.java |  63 --
 .../apache/spark/api/java/StorageLevels.java    |  48 -
 .../java/function/DoubleFlatMapFunction.java    |  30 -
 .../java/function/DoubleFlatMapFunction.scala   |  30 +
 .../spark/api/java/function/DoubleFunction.java |  30 -
 .../api/java/function/DoubleFunction.scala      |  29 +
 .../spark/api/java/function/Function.java       |  35 -
 .../spark/api/java/function/Function.scala      |  31 +
 .../spark/api/java/function/Function2.java      |  35 -
 .../spark/api/java/function/Function2.scala     |  29 +
 .../spark/api/java/function/Function3.java      |  35 -
 .../spark/api/java/function/Function3.scala     |  28 +
 .../api/java/function/PairFlatMapFunction.java  |  43 -
 .../api/java/function/PairFlatMapFunction.scala |  36 +
 .../spark/api/java/function/PairFunction.java   |  41 -
 .../spark/api/java/function/PairFunction.scala  |  33 +
 .../java/org/apache/spark/JavaAPISuite.java     | 981 +++++++++++++++++++
 .../scala/org/apache/spark/JavaAPISuite.java    | 981 -------------------
 22 files changed, 1347 insertions(+), 1383 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/29ac7ea5/core/src/main/java/org/apache/spark/api/java/JavaSparkContextVarargsWorkaround.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/api/java/JavaSparkContextVarargsWorkaround.java b/core/src/main/java/org/apache/spark/api/java/JavaSparkContextVarargsWorkaround.java
new file mode 100644
index 0000000..2090efd
--- /dev/null
+++ b/core/src/main/java/org/apache/spark/api/java/JavaSparkContextVarargsWorkaround.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.api.java;
+
+import java.util.ArrayList;
+import java.util.List;
+
+// See
+// http://scala-programming-language.1934581.n4.nabble.com/Workaround-for-implementing-java-varargs-in-2-7-2-final-tp1944767p1944772.html
+abstract class JavaSparkContextVarargsWorkaround {
+  public <T> JavaRDD<T> union(JavaRDD<T>... rdds) {
+    if (rdds.length == 0) {
+      throw new IllegalArgumentException("Union called on empty list");
+    }
+    ArrayList<JavaRDD<T>> rest = new ArrayList<JavaRDD<T>>(rdds.length - 1);
+    for (int i = 1; i < rdds.length; i++) {
+      rest.add(rdds[i]);
+    }
+    return union(rdds[0], rest);
+  }
+
+  public JavaDoubleRDD union(JavaDoubleRDD... rdds) {
+    if (rdds.length == 0) {
+      throw new IllegalArgumentException("Union called on empty list");
+    }
+    ArrayList<JavaDoubleRDD> rest = new ArrayList<JavaDoubleRDD>(rdds.length - 1);
+    for (int i = 1; i < rdds.length; i++) {
+      rest.add(rdds[i]);
+    }
+    return union(rdds[0], rest);
+  }
+
+  public <K, V> JavaPairRDD<K, V> union(JavaPairRDD<K, V>... rdds) {
+    if (rdds.length == 0) {
+      throw new IllegalArgumentException("Union called on empty list");
+    }
+    ArrayList<JavaPairRDD<K, V>> rest = new ArrayList<JavaPairRDD<K, V>>(rdds.length - 1);
+    for (int i = 1; i < rdds.length; i++) {
+      rest.add(rdds[i]);
+    }
+    return union(rdds[0], rest);
+  }
+
+  // These methods take separate "first" and "rest" elements to avoid having the same type erasure
+  abstract public <T> JavaRDD<T> union(JavaRDD<T> first, List<JavaRDD<T>> rest);
+  abstract public JavaDoubleRDD union(JavaDoubleRDD first, List<JavaDoubleRDD> rest);
+  abstract public <K, V> JavaPairRDD<K, V> union(JavaPairRDD<K, V> first, List<JavaPairRDD<K, V>> rest);
+}
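
For context on the type-erasure comment above, a hedged sketch in Scala
(trait name hypothetical) of why the abstract overloads take a separate
typed "first" parameter:

  import java.util.List
  import org.apache.spark.api.java.{JavaDoubleRDD, JavaRDD}

  trait UnionSignaturesSketch {
    // List-only overloads would erase to the identical JVM descriptor
    // union(List) and could not coexist:
    //   def union[T](rdds: List[JavaRDD[T]]): JavaRDD[T]
    //   def union(rdds: List[JavaDoubleRDD]): JavaDoubleRDD
    // A typed first argument keeps the erased signatures distinct:
    def union[T](first: JavaRDD[T], rest: List[JavaRDD[T]]): JavaRDD[T]
    def union(first: JavaDoubleRDD, rest: List[JavaDoubleRDD]): JavaDoubleRDD
  }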

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/29ac7ea5/core/src/main/java/org/apache/spark/api/java/StorageLevels.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/api/java/StorageLevels.java b/core/src/main/java/org/apache/spark/api/java/StorageLevels.java
new file mode 100644
index 0000000..9f13b39
--- /dev/null
+++ b/core/src/main/java/org/apache/spark/api/java/StorageLevels.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.api.java;
+
+import org.apache.spark.storage.StorageLevel;
+
+/**
+ * Expose some commonly useful storage level constants.
+ */
+public class StorageLevels {
+  public static final StorageLevel NONE = create(false, false, false, 1);
+  public static final StorageLevel DISK_ONLY = create(true, false, false, 1);
+  public static final StorageLevel DISK_ONLY_2 = create(true, false, false, 2);
+  public static final StorageLevel MEMORY_ONLY = create(false, true, true, 1);
+  public static final StorageLevel MEMORY_ONLY_2 = create(false, true, true, 2);
+  public static final StorageLevel MEMORY_ONLY_SER = create(false, true, false, 1);
+  public static final StorageLevel MEMORY_ONLY_SER_2 = create(false, true, false, 2);
+  public static final StorageLevel MEMORY_AND_DISK = create(true, true, true, 1);
+  public static final StorageLevel MEMORY_AND_DISK_2 = create(true, true, true, 2);
+  public static final StorageLevel MEMORY_AND_DISK_SER = create(true, true, false, 1);
+  public static final StorageLevel MEMORY_AND_DISK_SER_2 = create(true, true, false, 2);
+
+  /**
+   * Create a new StorageLevel object.
+   * @param useDisk saved to disk, if true
+   * @param useMemory saved to memory, if true
+   * @param deserialized saved as deserialized objects, if true
+   * @param replication replication factor
+   */
+  public static StorageLevel create(boolean useDisk, boolean useMemory, boolean deserialized, int replication) {
+    return StorageLevel.apply(useDisk, useMemory, deserialized, replication);
+  }
+}
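
A brief usage sketch (not part of the commit) of the factory above, from
Scala; these arguments match the MEMORY_AND_DISK_SER_2 constant it defines:

  import org.apache.spark.api.java.StorageLevels
  import org.apache.spark.storage.StorageLevel

  // useDisk = true, useMemory = true, deserialized = false, replication = 2
  val level: StorageLevel = StorageLevels.create(true, true, false, 2)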

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/29ac7ea5/core/src/main/scala/org/apache/spark/SparkFiles.java
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkFiles.java b/core/src/main/scala/org/apache/spark/SparkFiles.java
deleted file mode 100644
index af9cf85..0000000
--- a/core/src/main/scala/org/apache/spark/SparkFiles.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark;
-
-import java.io.File;
-
-/**
- * Resolves paths to files added through `SparkContext.addFile()`.
- */
-public class SparkFiles {
-
-  private SparkFiles() {}
-
-  /**
-   * Get the absolute path of a file added through `SparkContext.addFile()`.
-   */
-  public static String get(String filename) {
-    return new File(getRootDirectory(), filename).getAbsolutePath();
-  }
-
-  /**
-   * Get the root directory that contains files added through `SparkContext.addFile()`.
-   */
-  public static String getRootDirectory() {
-    return SparkEnv.get().sparkFilesDir();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/29ac7ea5/core/src/main/scala/org/apache/spark/SparkFiles.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkFiles.scala b/core/src/main/scala/org/apache/spark/SparkFiles.scala
new file mode 100644
index 0000000..e85b89f
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/SparkFiles.scala
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+import java.io.File
+
+/**
+ * Resolves paths to files added through `SparkContext.addFile()`.
+ */
+object SparkFiles {
+
+  /**
+   * Get the absolute path of a file added through `SparkContext.addFile()`.
+   */
+  def get(filename: String): String =
+    new File(getRootDirectory(), filename).getAbsolutePath()
+
+  /**
+   * Get the root directory that contains files added through `SparkContext.addFile()`.
+   */
+  def getRootDirectory(): String =
+    SparkEnv.get.sparkFilesDir
+
+}
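
For context, this rewrite relies on Scala objects emitting static forwarder
methods, so existing Java call sites keep compiling unchanged; a hedged
sketch (the file name is illustrative only):

  // Resolves the same way against the old Java class or the new Scala
  // object, since both expose static get()/getRootDirectory():
  val path: String = org.apache.spark.SparkFiles.get("data.txt")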

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/29ac7ea5/core/src/main/scala/org/apache/spark/api/java/JavaSparkContextVarargsWorkaround.java
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContextVarargsWorkaround.java b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContextVarargsWorkaround.java
deleted file mode 100644
index 2090efd..0000000
--- a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContextVarargsWorkaround.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.api.java;
-
-import java.util.ArrayList;
-import java.util.List;
-
-// See
-// http://scala-programming-language.1934581.n4.nabble.com/Workaround-for-implementing-java-varargs-in-2-7-2-final-tp1944767p1944772.html
-abstract class JavaSparkContextVarargsWorkaround {
-  public <T> JavaRDD<T> union(JavaRDD<T>... rdds) {
-    if (rdds.length == 0) {
-      throw new IllegalArgumentException("Union called on empty list");
-    }
-    ArrayList<JavaRDD<T>> rest = new ArrayList<JavaRDD<T>>(rdds.length - 1);
-    for (int i = 1; i < rdds.length; i++) {
-      rest.add(rdds[i]);
-    }
-    return union(rdds[0], rest);
-  }
-
-  public JavaDoubleRDD union(JavaDoubleRDD... rdds) {
-    if (rdds.length == 0) {
-      throw new IllegalArgumentException("Union called on empty list");
-    }
-    ArrayList<JavaDoubleRDD> rest = new ArrayList<JavaDoubleRDD>(rdds.length - 1);
-    for (int i = 1; i < rdds.length; i++) {
-      rest.add(rdds[i]);
-    }
-    return union(rdds[0], rest);
-  }
-
-  public <K, V> JavaPairRDD<K, V> union(JavaPairRDD<K, V>... rdds) {
-    if (rdds.length == 0) {
-      throw new IllegalArgumentException("Union called on empty list");
-    }
-    ArrayList<JavaPairRDD<K, V>> rest = new ArrayList<JavaPairRDD<K, V>>(rdds.length - 1);
-    for (int i = 1; i < rdds.length; i++) {
-      rest.add(rdds[i]);
-    }
-    return union(rdds[0], rest);
-  }
-
-  // These methods take separate "first" and "rest" elements to avoid having the same type erasure
-  abstract public <T> JavaRDD<T> union(JavaRDD<T> first, List<JavaRDD<T>> rest);
-  abstract public JavaDoubleRDD union(JavaDoubleRDD first, List<JavaDoubleRDD> rest);
-  abstract public <K, V> JavaPairRDD<K, V> union(JavaPairRDD<K, V> first, List<JavaPairRDD<K, V>> rest);
-}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/29ac7ea5/core/src/main/scala/org/apache/spark/api/java/StorageLevels.java
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/java/StorageLevels.java b/core/src/main/scala/org/apache/spark/api/java/StorageLevels.java
deleted file mode 100644
index 0744269..0000000
--- a/core/src/main/scala/org/apache/spark/api/java/StorageLevels.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.api.java;
-
-import org.apache.spark.storage.StorageLevel;
-
-/**
- * Expose some commonly useful storage level constants.
- */
-public class StorageLevels {
-  public static final StorageLevel NONE = new StorageLevel(false, false, false, 1);
-  public static final StorageLevel DISK_ONLY = new StorageLevel(true, false, false, 1);
-  public static final StorageLevel DISK_ONLY_2 = new StorageLevel(true, false, false, 2);
-  public static final StorageLevel MEMORY_ONLY = new StorageLevel(false, true, true, 1);
-  public static final StorageLevel MEMORY_ONLY_2 = new StorageLevel(false, true, true, 2);
-  public static final StorageLevel MEMORY_ONLY_SER = new StorageLevel(false, true, false, 1);
-  public static final StorageLevel MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, 2);
-  public static final StorageLevel MEMORY_AND_DISK = new StorageLevel(true, true, true, 1);
-  public static final StorageLevel MEMORY_AND_DISK_2 = new StorageLevel(true, true, true, 2);
-  public static final StorageLevel MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, 1);
-  public static final StorageLevel MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, 2);
-
-  /**
-   * Create a new StorageLevel object.
-   * @param useDisk saved to disk, if true
-   * @param useMemory saved to memory, if true
-   * @param deserialized saved as deserialized objects, if true
-   * @param replication replication factor
-   */
-  public static StorageLevel create(boolean useDisk, boolean useMemory, boolean deserialized, int replication) {
-    return StorageLevel.apply(useDisk, useMemory, deserialized, replication);
-  }
-}
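
Java-side usage is unchanged by the move; a short sketch against the API shown
above (the local JavaSparkContext setup mirrors the test suite later in this
patch, and the names are illustrative):

    import java.util.Arrays;

    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.StorageLevels;
    import org.apache.spark.storage.StorageLevel;

    public class StorageLevelsDemo {
      public static void main(String[] args) {
        JavaSparkContext sc = new JavaSparkContext("local", "StorageLevelsDemo");
        JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4));

        // The constants avoid calling into the Scala companion object from Java.
        rdd.persist(StorageLevels.MEMORY_AND_DISK);

        // create() mirrors StorageLevel.apply:
        // (useDisk, useMemory, deserialized, replication).
        StorageLevel twoDiskCopies = StorageLevels.create(true, false, false, 2);
        System.out.println(twoDiskCopies);

        sc.stop();
      }
    }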

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/29ac7ea5/core/src/main/scala/org/apache/spark/api/java/function/DoubleFlatMapFunction.java
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/java/function/DoubleFlatMapFunction.java b/core/src/main/scala/org/apache/spark/api/java/function/DoubleFlatMapFunction.java
deleted file mode 100644
index 30e6a52..0000000
--- a/core/src/main/scala/org/apache/spark/api/java/function/DoubleFlatMapFunction.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.api.java.function;
-
-import java.io.Serializable;
-
-/**
- * A function that returns zero or more records of type Double from each input record.
- */
-// DoubleFlatMapFunction does not extend FlatMapFunction because flatMap is
-// overloaded for both FlatMapFunction and DoubleFlatMapFunction.
-public abstract class DoubleFlatMapFunction<T> extends WrappedFunction1<T, Iterable<Double>>
-  implements Serializable {
-    // Intentionally left blank
-}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/29ac7ea5/core/src/main/scala/org/apache/spark/api/java/function/DoubleFlatMapFunction.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/java/function/DoubleFlatMapFunction.scala b/core/src/main/scala/org/apache/spark/api/java/function/DoubleFlatMapFunction.scala
new file mode 100644
index 0000000..7500a89
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/api/java/function/DoubleFlatMapFunction.scala
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.api.java.function
+
+import java.lang.{Double => JDouble, Iterable => JIterable}
+
+/**
+ * A function that returns zero or more records of type Double from each input record.
+ */
+// DoubleFlatMapFunction does not extend FlatMapFunction because flatMap is
+// overloaded for both FlatMapFunction and DoubleFlatMapFunction.
+abstract class DoubleFlatMapFunction[T] extends WrappedFunction1[T, JIterable[JDouble]]
+  with Serializable {
+  // Intentionally left blank
+}
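
For Java callers the port is source-compatible: JIterable[JDouble] above is
just java.lang.Iterable<java.lang.Double>, the same type the deleted Java
class declared, so anonymous subclasses keep compiling unchanged. A sketch
(mirroring the flatMap test later in this patch; names are illustrative):

    import java.util.Arrays;
    import java.util.LinkedList;
    import java.util.List;

    import org.apache.spark.api.java.JavaDoubleRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.DoubleFlatMapFunction;

    public class DoubleFlatMapDemo {
      public static void main(String[] args) {
        JavaSparkContext sc = new JavaSparkContext("local", "DoubleFlatMapDemo");
        JavaRDD<String> lines = sc.parallelize(Arrays.asList("Hello World"));
        JavaDoubleRDD lengths = lines.flatMap(new DoubleFlatMapFunction<String>() {
          @Override
          public Iterable<Double> call(String s) {
            List<Double> out = new LinkedList<Double>();
            for (String word : s.split(" ")) out.add(word.length() * 1.0);
            return out;
          }
        });
        System.out.println(lengths.collect());  // [5.0, 5.0]
        sc.stop();
      }
    }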

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/29ac7ea5/core/src/main/scala/org/apache/spark/api/java/function/DoubleFunction.java
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/java/function/DoubleFunction.java b/core/src/main/scala/org/apache/spark/api/java/function/DoubleFunction.java
deleted file mode 100644
index 490da25..0000000
--- a/core/src/main/scala/org/apache/spark/api/java/function/DoubleFunction.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.api.java.function;
-
-import java.io.Serializable;
-
-/**
- * A function that returns Doubles, and can be used to construct DoubleRDDs.
- */
-// DoubleFunction does not extend Function because some UDF functions, like map,
-// are overloaded for both Function and DoubleFunction.
-public abstract class DoubleFunction<T> extends WrappedFunction1<T, Double>
-  implements Serializable {
-    // Intentionally left blank
-}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/29ac7ea5/core/src/main/scala/org/apache/spark/api/java/function/DoubleFunction.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/java/function/DoubleFunction.scala b/core/src/main/scala/org/apache/spark/api/java/function/DoubleFunction.scala
new file mode 100644
index 0000000..2cdf2e9
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/api/java/function/DoubleFunction.scala
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.api.java.function
+
+import java.lang.{Double => JDouble}
+
+/**
+ * A function that returns Doubles, and can be used to construct DoubleRDDs.
+ */
+// DoubleFunction does not extend Function because some UDF functions, like map,
+// are overloaded for both Function and DoubleFunction.
+abstract class DoubleFunction[T] extends WrappedFunction1[T, JDouble] with Serializable {
+  // Intentionally left blank
+}
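
The same compatibility argument applies here: JDouble is java.lang.Double, so
existing Java overrides of call() are unaffected. Continuing the sketch above
(same sc and imports, plus DoubleFunction):

    JavaDoubleRDD doubled = sc.parallelize(Arrays.asList(1, 2, 3))
        .map(new DoubleFunction<Integer>() {
          @Override
          public Double call(Integer x) {
            return x * 2.0;
          }
        });
    System.out.println(doubled.sum());  // 12.0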

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/29ac7ea5/core/src/main/scala/org/apache/spark/api/java/function/Function.java
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/java/function/Function.java b/core/src/main/scala/org/apache/spark/api/java/function/Function.java
deleted file mode 100644
index e0fcd46..0000000
--- a/core/src/main/scala/org/apache/spark/api/java/function/Function.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.api.java.function;
-
-import java.io.Serializable;
-
-import scala.reflect.ClassTag;
-import scala.reflect.ClassTag$;
-
-/**
- * Base class for functions whose return types do not create special RDDs. PairFunction and
- * DoubleFunction are handled separately, to allow PairRDDs and DoubleRDDs to be constructed
- * when mapping RDDs of other types.
- */
-public abstract class Function<T, R> extends WrappedFunction1<T, R> implements Serializable {
-  public ClassTag<R> returnType() {
-    return ClassTag$.MODULE$.apply(Object.class);
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/29ac7ea5/core/src/main/scala/org/apache/spark/api/java/function/Function.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/java/function/Function.scala b/core/src/main/scala/org/apache/spark/api/java/function/Function.scala
new file mode 100644
index 0000000..a5e1701
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/api/java/function/Function.scala
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.api.java.function
+
+import scala.reflect.ClassTag
+import org.apache.spark.api.java.JavaSparkContext
+
+/**
+ * Base class for functions whose return types do not create special RDDs. PairFunction and
+ * DoubleFunction are handled separately, to allow PairRDDs and DoubleRDDs to be constructed
+ * when mapping RDDs of other types.
+ */
+abstract class Function[T, R] extends WrappedFunction1[T, R] with Serializable {
+  def returnType(): ClassTag[R] = JavaSparkContext.fakeClassTag
+}
+
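
The deleted Java classes each manufactured an Object-based ClassTag inline;
the Scala ports delegate that to JavaSparkContext.fakeClassTag, whose body is
not shown in this hunk. For reference, the inline pattern being replaced
(grounded in the deleted Function.java above):

    import scala.reflect.ClassTag;
    import scala.reflect.ClassTag$;

    public class FakeTagDemo {
      // Tag Object and cast, as the deleted Function.java and friends did.
      @SuppressWarnings("unchecked")
      static <R> ClassTag<R> fakeTag() {
        return (ClassTag<R>) ClassTag$.MODULE$.apply(Object.class);
      }

      public static void main(String[] args) {
        ClassTag<String> tag = FakeTagDemo.<String>fakeTag();
        System.out.println(tag);
      }
    }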

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/29ac7ea5/core/src/main/scala/org/apache/spark/api/java/function/Function2.java
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/java/function/Function2.java b/core/src/main/scala/org/apache/spark/api/java/function/Function2.java
deleted file mode 100644
index 16d7379..0000000
--- a/core/src/main/scala/org/apache/spark/api/java/function/Function2.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.api.java.function;
-
-import java.io.Serializable;
-
-import scala.reflect.ClassTag;
-import scala.reflect.ClassTag$;
-
-/**
- * A two-argument function that takes arguments of type T1 and T2 and returns an R.
- */
-public abstract class Function2<T1, T2, R> extends WrappedFunction2<T1, T2, R>
-  implements Serializable {
-
-  public ClassTag<R> returnType() {
-    return (ClassTag<R>) ClassTag$.MODULE$.apply(Object.class);
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/29ac7ea5/core/src/main/scala/org/apache/spark/api/java/function/Function2.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/java/function/Function2.scala b/core/src/main/scala/org/apache/spark/api/java/function/Function2.scala
new file mode 100644
index 0000000..fa3616c
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/api/java/function/Function2.scala
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.api.java.function
+
+import scala.reflect.ClassTag
+import org.apache.spark.api.java.JavaSparkContext
+
+/**
+ * A two-argument function that takes arguments of type T1 and T2 and returns an R.
+ */
+abstract class Function2[T1, T2, R] extends WrappedFunction2[T1, T2, R] with Serializable {
+  def returnType(): ClassTag[R] = JavaSparkContext.fakeClassTag
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/29ac7ea5/core/src/main/scala/org/apache/spark/api/java/function/Function3.java
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/java/function/Function3.java b/core/src/main/scala/org/apache/spark/api/java/function/Function3.java
deleted file mode 100644
index 096eb71..0000000
--- a/core/src/main/scala/org/apache/spark/api/java/function/Function3.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.api.java.function;
-
-import java.io.Serializable;
-
-import scala.reflect.ClassTag;
-import scala.reflect.ClassTag$;
-
-/**
- * A three-argument function that takes arguments of type T1, T2 and T3 and returns an R.
- */
-public abstract class Function3<T1, T2, T3, R> extends WrappedFunction3<T1, T2, T3, R>
-        implements Serializable {
-
-    public ClassTag<R> returnType() {
-        return (ClassTag<R>) ClassTag$.MODULE$.apply(Object.class);
-    }
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/29ac7ea5/core/src/main/scala/org/apache/spark/api/java/function/Function3.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/java/function/Function3.scala b/core/src/main/scala/org/apache/spark/api/java/function/Function3.scala
new file mode 100644
index 0000000..4515289
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/api/java/function/Function3.scala
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.api.java.function
+
+import org.apache.spark.api.java.JavaSparkContext
+import scala.reflect.ClassTag
+
+/**
+ * A three-argument function that takes arguments of type T1, T2 and T3 and returns an R.
+ */
+abstract class Function3[T1, T2, T3, R] extends WrappedFunction3[T1, T2, T3, R] with Serializable {
+  def returnType(): ClassTag[R] = JavaSparkContext.fakeClassTag
+}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/29ac7ea5/core/src/main/scala/org/apache/spark/api/java/function/PairFlatMapFunction.java
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/java/function/PairFlatMapFunction.java b/core/src/main/scala/org/apache/spark/api/java/function/PairFlatMapFunction.java
deleted file mode 100644
index c72b98c..0000000
--- a/core/src/main/scala/org/apache/spark/api/java/function/PairFlatMapFunction.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.api.java.function;
-
-import java.io.Serializable;
-
-import scala.Tuple2;
-import scala.reflect.ClassTag;
-import scala.reflect.ClassTag$;
-
-/**
- * A function that returns zero or more key-value pair records from each input record. The
- * key-value pairs are represented as scala.Tuple2 objects.
- */
-// PairFlatMapFunction does not extend FlatMapFunction because flatMap is
-// overloaded for both FlatMapFunction and PairFlatMapFunction.
-public abstract class PairFlatMapFunction<T, K, V>
-  extends WrappedFunction1<T, Iterable<Tuple2<K, V>>>
-  implements Serializable {
-
-  public ClassTag<K> keyType() {
-    return (ClassTag<K>) ClassTag$.MODULE$.apply(Object.class);
-  }
-
-  public ClassTag<V> valueType() {
-    return (ClassTag<V>) ClassTag$.MODULE$.apply(Object.class);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/29ac7ea5/core/src/main/scala/org/apache/spark/api/java/function/PairFlatMapFunction.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/java/function/PairFlatMapFunction.scala b/core/src/main/scala/org/apache/spark/api/java/function/PairFlatMapFunction.scala
new file mode 100644
index 0000000..8467bbb
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/api/java/function/PairFlatMapFunction.scala
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.api.java.function
+
+import java.lang.{Iterable => JIterable}
+import org.apache.spark.api.java.JavaSparkContext
+import scala.reflect.ClassTag
+
+/**
+ * A function that returns zero or more key-value pair records from each input record. The
+ * key-value pairs are represented as scala.Tuple2 objects.
+ */
+// PairFlatMapFunction does not extend FlatMapFunction because flatMap is
+// overloaded for both FlatMapFunction and PairFlatMapFunction.
+abstract class PairFlatMapFunction[T, K, V] extends WrappedFunction1[T, JIterable[(K, V)]]
+  with Serializable {
+
+  def keyType(): ClassTag[K] = JavaSparkContext.fakeClassTag
+
+  def valueType(): ClassTag[V] = JavaSparkContext.fakeClassTag
+}
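
Here too the Scala signature lines up with the old Java one: the (K, V)
element type is scala.Tuple2[K, V], exactly the Tuple2<K, V> the deleted
class returned. A fragment reusing lines from the DoubleFlatMapDemo sketch
above (add imports for scala.Tuple2, JavaPairRDD and PairFlatMapFunction):

    JavaPairRDD<String, Integer> wordLengths = lines.flatMap(
        new PairFlatMapFunction<String, String, Integer>() {
          @Override
          public Iterable<Tuple2<String, Integer>> call(String s) {
            List<Tuple2<String, Integer>> out =
                new LinkedList<Tuple2<String, Integer>>();
            for (String word : s.split(" ")) {
              out.add(new Tuple2<String, Integer>(word, word.length()));
            }
            return out;
          }
        });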

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/29ac7ea5/core/src/main/scala/org/apache/spark/api/java/function/PairFunction.java
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/java/function/PairFunction.java b/core/src/main/scala/org/apache/spark/api/java/function/PairFunction.java
deleted file mode 100644
index 84b9136..0000000
--- a/core/src/main/scala/org/apache/spark/api/java/function/PairFunction.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.api.java.function;
-
-import java.io.Serializable;
-
-import scala.Tuple2;
-import scala.reflect.ClassTag;
-import scala.reflect.ClassTag$;
-
-/**
- * A function that returns key-value pairs (Tuple2<K, V>), and can be used to construct PairRDDs.
- */
-// PairFunction does not extend Function because some UDF functions, like map,
-// are overloaded for both Function and PairFunction.
-public abstract class PairFunction<T, K, V> extends WrappedFunction1<T, Tuple2<K, V>>
-  implements Serializable {
-
-  public ClassTag<K> keyType() {
-    return (ClassTag<K>) ClassTag$.MODULE$.apply(Object.class);
-  }
-
-  public ClassTag<V> valueType() {
-    return (ClassTag<V>) ClassTag$.MODULE$.apply(Object.class);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/29ac7ea5/core/src/main/scala/org/apache/spark/api/java/function/PairFunction.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/java/function/PairFunction.scala b/core/src/main/scala/org/apache/spark/api/java/function/PairFunction.scala
new file mode 100644
index 0000000..d0ba0b6
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/api/java/function/PairFunction.scala
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.api.java.function
+
+import scala.reflect.ClassTag
+import org.apache.spark.api.java.JavaSparkContext
+
+/**
+ * A function that returns key-value pairs (Tuple2<K, V>), and can be used to construct PairRDDs.
+ */
+// PairFunction does not extend Function because some UDF functions, like map,
+// are overloaded for both Function and PairFunction.
+abstract class PairFunction[T, K, V] extends WrappedFunction1[T, (K, V)] with Serializable {
+
+  def keyType(): ClassTag[K] = JavaSparkContext.fakeClassTag
+
+  def valueType(): ClassTag[V] = JavaSparkContext.fakeClassTag
+}
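
And the single-pair variant: map with a PairFunction is what yields a
JavaPairRDD in this era of the Java API, as the test suite below exercises.
Another fragment against the same lines RDD (PairFunction import assumed):

    JavaPairRDD<String, Integer> withLengths = lines.map(
        new PairFunction<String, String, Integer>() {
          @Override
          public Tuple2<String, Integer> call(String s) {
            return new Tuple2<String, Integer>(s, s.length());
          }
        });
    System.out.println(withLengths.collect());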

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/29ac7ea5/core/src/test/java/org/apache/spark/JavaAPISuite.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/spark/JavaAPISuite.java b/core/src/test/java/org/apache/spark/JavaAPISuite.java
new file mode 100644
index 0000000..20232e9
--- /dev/null
+++ b/core/src/test/java/org/apache/spark/JavaAPISuite.java
@@ -0,0 +1,981 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.*;
+
+import scala.Tuple2;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Charsets;
+import com.google.common.io.Files;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.compress.DefaultCodec;
+import org.apache.hadoop.mapred.SequenceFileInputFormat;
+import org.apache.hadoop.mapred.SequenceFileOutputFormat;
+import org.apache.hadoop.mapreduce.Job;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.spark.api.java.JavaDoubleRDD;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.function.*;
+import org.apache.spark.partial.BoundedDouble;
+import org.apache.spark.partial.PartialResult;
+import org.apache.spark.storage.StorageLevel;
+import org.apache.spark.util.StatCounter;
+
+// The test suite itself is Serializable so that anonymous Function implementations can be
+// serialized, as an alternative to converting these anonymous classes to static inner classes;
+// see http://stackoverflow.com/questions/758570/.
+public class JavaAPISuite implements Serializable {
+  private transient JavaSparkContext sc;
+
+  @Before
+  public void setUp() {
+    sc = new JavaSparkContext("local", "JavaAPISuite");
+  }
+
+  @After
+  public void tearDown() {
+    sc.stop();
+    sc = null;
+    // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
+    System.clearProperty("spark.driver.port");
+  }
+
+  static class ReverseIntComparator implements Comparator<Integer>, Serializable {
+
+    @Override
+    public int compare(Integer a, Integer b) {
+      if (a > b) return -1;
+      else if (a < b) return 1;
+      else return 0;
+    }
+  }
+
+  @Test
+  public void sparkContextUnion() {
+    // Union of non-specialized JavaRDDs
+    List<String> strings = Arrays.asList("Hello", "World");
+    JavaRDD<String> s1 = sc.parallelize(strings);
+    JavaRDD<String> s2 = sc.parallelize(strings);
+    // Varargs
+    JavaRDD<String> sUnion = sc.union(s1, s2);
+    Assert.assertEquals(4, sUnion.count());
+    // List
+    List<JavaRDD<String>> list = new ArrayList<JavaRDD<String>>();
+    list.add(s2);
+    sUnion = sc.union(s1, list);
+    Assert.assertEquals(4, sUnion.count());
+
+    // Union of JavaDoubleRDDs
+    List<Double> doubles = Arrays.asList(1.0, 2.0);
+    JavaDoubleRDD d1 = sc.parallelizeDoubles(doubles);
+    JavaDoubleRDD d2 = sc.parallelizeDoubles(doubles);
+    JavaDoubleRDD dUnion = sc.union(d1, d2);
+    Assert.assertEquals(4, dUnion.count());
+
+    // Union of JavaPairRDDs
+    List<Tuple2<Integer, Integer>> pairs = new ArrayList<Tuple2<Integer, Integer>>();
+    pairs.add(new Tuple2<Integer, Integer>(1, 2));
+    pairs.add(new Tuple2<Integer, Integer>(3, 4));
+    JavaPairRDD<Integer, Integer> p1 = sc.parallelizePairs(pairs);
+    JavaPairRDD<Integer, Integer> p2 = sc.parallelizePairs(pairs);
+    JavaPairRDD<Integer, Integer> pUnion = sc.union(p1, p2);
+    Assert.assertEquals(4, pUnion.count());
+  }
+
+  @Test
+  public void sortByKey() {
+    List<Tuple2<Integer, Integer>> pairs = new ArrayList<Tuple2<Integer, Integer>>();
+    pairs.add(new Tuple2<Integer, Integer>(0, 4));
+    pairs.add(new Tuple2<Integer, Integer>(3, 2));
+    pairs.add(new Tuple2<Integer, Integer>(-1, 1));
+
+    JavaPairRDD<Integer, Integer> rdd = sc.parallelizePairs(pairs);
+
+    // Default comparator
+    JavaPairRDD<Integer, Integer> sortedRDD = rdd.sortByKey();
+    Assert.assertEquals(new Tuple2<Integer, Integer>(-1, 1), sortedRDD.first());
+    List<Tuple2<Integer, Integer>> sortedPairs = sortedRDD.collect();
+    Assert.assertEquals(new Tuple2<Integer, Integer>(0, 4), sortedPairs.get(1));
+    Assert.assertEquals(new Tuple2<Integer, Integer>(3, 2), sortedPairs.get(2));
+
+    // Custom comparator
+    sortedRDD = rdd.sortByKey(new ReverseIntComparator(), false);
+    Assert.assertEquals(new Tuple2<Integer, Integer>(-1, 1), sortedRDD.first());
+    sortedPairs = sortedRDD.collect();
+    Assert.assertEquals(new Tuple2<Integer, Integer>(0, 4), sortedPairs.get(1));
+    Assert.assertEquals(new Tuple2<Integer, Integer>(3, 2), sortedPairs.get(2));
+  }
+
+  static int foreachCalls = 0;
+
+  @Test
+  public void foreach() {
+    foreachCalls = 0;
+    JavaRDD<String> rdd = sc.parallelize(Arrays.asList("Hello", "World"));
+    rdd.foreach(new VoidFunction<String>() {
+      @Override
+      public void call(String s) {
+        foreachCalls++;
+      }
+    });
+    Assert.assertEquals(2, foreachCalls);
+  }
+
+  @Test
+  public void lookup() {
+    JavaPairRDD<String, String> categories = sc.parallelizePairs(Arrays.asList(
+      new Tuple2<String, String>("Apples", "Fruit"),
+      new Tuple2<String, String>("Oranges", "Fruit"),
+      new Tuple2<String, String>("Oranges", "Citrus")
+      ));
+    Assert.assertEquals(2, categories.lookup("Oranges").size());
+    Assert.assertEquals(2, categories.groupByKey().lookup("Oranges").get(0).size());
+  }
+
+  @Test
+  public void groupBy() {
+    JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 1, 2, 3, 5, 8, 13));
+    Function<Integer, Boolean> isEven = new Function<Integer, Boolean>() {
+      @Override
+      public Boolean call(Integer x) {
+        return x % 2 == 0;
+      }
+    };
+    JavaPairRDD<Boolean, List<Integer>> oddsAndEvens = rdd.groupBy(isEven);
+    Assert.assertEquals(2, oddsAndEvens.count());
+    Assert.assertEquals(2, oddsAndEvens.lookup(true).get(0).size());  // Evens
+    Assert.assertEquals(5, oddsAndEvens.lookup(false).get(0).size()); // Odds
+
+    oddsAndEvens = rdd.groupBy(isEven, 1);
+    Assert.assertEquals(2, oddsAndEvens.count());
+    Assert.assertEquals(2, oddsAndEvens.lookup(true).get(0).size());  // Evens
+    Assert.assertEquals(5, oddsAndEvens.lookup(false).get(0).size()); // Odds
+  }
+
+  @Test
+  public void cogroup() {
+    JavaPairRDD<String, String> categories = sc.parallelizePairs(Arrays.asList(
+      new Tuple2<String, String>("Apples", "Fruit"),
+      new Tuple2<String, String>("Oranges", "Fruit"),
+      new Tuple2<String, String>("Oranges", "Citrus")
+      ));
+    JavaPairRDD<String, Integer> prices = sc.parallelizePairs(Arrays.asList(
+      new Tuple2<String, Integer>("Oranges", 2),
+      new Tuple2<String, Integer>("Apples", 3)
+    ));
+    JavaPairRDD<String, Tuple2<List<String>, List<Integer>>> cogrouped = categories.cogroup(prices);
+    Assert.assertEquals("[Fruit, Citrus]", cogrouped.lookup("Oranges").get(0)._1().toString());
+    Assert.assertEquals("[2]", cogrouped.lookup("Oranges").get(0)._2().toString());
+
+    cogrouped.collect();
+  }
+
+  @Test
+  public void leftOuterJoin() {
+    JavaPairRDD<Integer, Integer> rdd1 = sc.parallelizePairs(Arrays.asList(
+      new Tuple2<Integer, Integer>(1, 1),
+      new Tuple2<Integer, Integer>(1, 2),
+      new Tuple2<Integer, Integer>(2, 1),
+      new Tuple2<Integer, Integer>(3, 1)
+      ));
+    JavaPairRDD<Integer, Character> rdd2 = sc.parallelizePairs(Arrays.asList(
+      new Tuple2<Integer, Character>(1, 'x'),
+      new Tuple2<Integer, Character>(2, 'y'),
+      new Tuple2<Integer, Character>(2, 'z'),
+      new Tuple2<Integer, Character>(4, 'w')
+    ));
+    List<Tuple2<Integer,Tuple2<Integer,Optional<Character>>>> joined =
+      rdd1.leftOuterJoin(rdd2).collect();
+    Assert.assertEquals(5, joined.size());
+    Tuple2<Integer,Tuple2<Integer,Optional<Character>>> firstUnmatched =
+      rdd1.leftOuterJoin(rdd2).filter(
+        new Function<Tuple2<Integer, Tuple2<Integer, Optional<Character>>>, Boolean>() {
+          @Override
+          public Boolean call(Tuple2<Integer, Tuple2<Integer, Optional<Character>>> tup)
+            throws Exception {
+            return !tup._2()._2().isPresent();
+          }
+      }).first();
+    Assert.assertEquals(3, firstUnmatched._1().intValue());
+  }
+
+  @Test
+  public void foldReduce() {
+    JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 1, 2, 3, 5, 8, 13));
+    Function2<Integer, Integer, Integer> add = new Function2<Integer, Integer, Integer>() {
+      @Override
+      public Integer call(Integer a, Integer b) {
+        return a + b;
+      }
+    };
+
+    int sum = rdd.fold(0, add);
+    Assert.assertEquals(33, sum);
+
+    sum = rdd.reduce(add);
+    Assert.assertEquals(33, sum);
+  }
+
+  @Test
+  public void foldByKey() {
+    List<Tuple2<Integer, Integer>> pairs = Arrays.asList(
+      new Tuple2<Integer, Integer>(2, 1),
+      new Tuple2<Integer, Integer>(2, 1),
+      new Tuple2<Integer, Integer>(1, 1),
+      new Tuple2<Integer, Integer>(3, 2),
+      new Tuple2<Integer, Integer>(3, 1)
+    );
+    JavaPairRDD<Integer, Integer> rdd = sc.parallelizePairs(pairs);
+    JavaPairRDD<Integer, Integer> sums = rdd.foldByKey(0,
+      new Function2<Integer, Integer, Integer>() {
+        @Override
+        public Integer call(Integer a, Integer b) {
+          return a + b;
+        }
+    });
+    Assert.assertEquals(1, sums.lookup(1).get(0).intValue());
+    Assert.assertEquals(2, sums.lookup(2).get(0).intValue());
+    Assert.assertEquals(3, sums.lookup(3).get(0).intValue());
+  }
+
+  @Test
+  public void reduceByKey() {
+    List<Tuple2<Integer, Integer>> pairs = Arrays.asList(
+      new Tuple2<Integer, Integer>(2, 1),
+      new Tuple2<Integer, Integer>(2, 1),
+      new Tuple2<Integer, Integer>(1, 1),
+      new Tuple2<Integer, Integer>(3, 2),
+      new Tuple2<Integer, Integer>(3, 1)
+    );
+    JavaPairRDD<Integer, Integer> rdd = sc.parallelizePairs(pairs);
+    JavaPairRDD<Integer, Integer> counts = rdd.reduceByKey(
+      new Function2<Integer, Integer, Integer>() {
+        @Override
+        public Integer call(Integer a, Integer b) {
+          return a + b;
+        }
+    });
+    Assert.assertEquals(1, counts.lookup(1).get(0).intValue());
+    Assert.assertEquals(2, counts.lookup(2).get(0).intValue());
+    Assert.assertEquals(3, counts.lookup(3).get(0).intValue());
+
+    Map<Integer, Integer> localCounts = counts.collectAsMap();
+    Assert.assertEquals(1, localCounts.get(1).intValue());
+    Assert.assertEquals(2, localCounts.get(2).intValue());
+    Assert.assertEquals(3, localCounts.get(3).intValue());
+
+    localCounts = rdd.reduceByKeyLocally(new Function2<Integer, Integer, Integer>() {
+      @Override
+      public Integer call(Integer a, Integer b) {
+        return a + b;
+      }
+    });
+    Assert.assertEquals(1, localCounts.get(1).intValue());
+    Assert.assertEquals(2, localCounts.get(2).intValue());
+    Assert.assertEquals(3, localCounts.get(3).intValue());
+  }
+
+  @Test
+  public void approximateResults() {
+    JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 1, 2, 3, 5, 8, 13));
+    Map<Integer, Long> countsByValue = rdd.countByValue();
+    Assert.assertEquals(2, countsByValue.get(1).longValue());
+    Assert.assertEquals(1, countsByValue.get(13).longValue());
+
+    PartialResult<Map<Integer, BoundedDouble>> approx = rdd.countByValueApprox(1);
+    Map<Integer, BoundedDouble> finalValue = approx.getFinalValue();
+    Assert.assertEquals(2.0, finalValue.get(1).mean(), 0.01);
+    Assert.assertEquals(1.0, finalValue.get(13).mean(), 0.01);
+  }
+
+  @Test
+  public void take() {
+    JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 1, 2, 3, 5, 8, 13));
+    Assert.assertEquals(1, rdd.first().intValue());
+    // Smoke test: these calls should simply run without throwing
+    List<Integer> firstTwo = rdd.take(2);
+    List<Integer> sample = rdd.takeSample(false, 2, 42);
+  }
+
+  @Test
+  public void cartesian() {
+    JavaDoubleRDD doubleRDD = sc.parallelizeDoubles(Arrays.asList(1.0, 1.0, 2.0, 3.0, 5.0, 8.0));
+    JavaRDD<String> stringRDD = sc.parallelize(Arrays.asList("Hello", "World"));
+    JavaPairRDD<String, Double> cartesian = stringRDD.cartesian(doubleRDD);
+    Assert.assertEquals(new Tuple2<String, Double>("Hello", 1.0), cartesian.first());
+  }
+
+  @Test
+  public void javaDoubleRDD() {
+    JavaDoubleRDD rdd = sc.parallelizeDoubles(Arrays.asList(1.0, 1.0, 2.0, 3.0, 5.0, 8.0));
+    JavaDoubleRDD distinct = rdd.distinct();
+    Assert.assertEquals(5, distinct.count());
+    JavaDoubleRDD filter = rdd.filter(new Function<Double, Boolean>() {
+      @Override
+      public Boolean call(Double x) {
+        return x > 2.0;
+      }
+    });
+    Assert.assertEquals(3, filter.count());
+    JavaDoubleRDD union = rdd.union(rdd);
+    Assert.assertEquals(12, union.count());
+    union = union.cache();
+    Assert.assertEquals(12, union.count());
+
+    Assert.assertEquals(20, rdd.sum(), 0.01);
+    StatCounter stats = rdd.stats();
+    Assert.assertEquals(20, stats.sum(), 0.01);
+    Assert.assertEquals(20/6.0, rdd.mean(), 0.01);
+    Assert.assertEquals(6.22222, rdd.variance(), 0.01);
+    Assert.assertEquals(7.46667, rdd.sampleVariance(), 0.01);
+    Assert.assertEquals(2.49444, rdd.stdev(), 0.01);
+    Assert.assertEquals(2.73252, rdd.sampleStdev(), 0.01);
+
+    // Smoke test: these calls should simply run without throwing
+    Double first = rdd.first();
+    List<Double> take = rdd.take(5);
+  }
+
+  @Test
+  public void javaDoubleRDDHistogram() {
+    JavaDoubleRDD rdd = sc.parallelizeDoubles(Arrays.asList(1.0, 2.0, 3.0, 4.0));
+    // Test using generated buckets
+    Tuple2<double[], long[]> results = rdd.histogram(2);
+    double[] expectedBuckets = {1.0, 2.5, 4.0};
+    long[] expectedCounts = {2, 2};
+    Assert.assertArrayEquals(expectedBuckets, results._1, 0.1);
+    Assert.assertArrayEquals(expectedCounts, results._2);
+    // Test with provided buckets
+    long[] histogram = rdd.histogram(expectedBuckets);
+    Assert.assertArrayEquals(expectedCounts, histogram);
+  }
+
+  @Test
+  public void map() {
+    JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));
+    JavaDoubleRDD doubles = rdd.map(new DoubleFunction<Integer>() {
+      @Override
+      public Double call(Integer x) {
+        return 1.0 * x;
+      }
+    }).cache();
+    doubles.collect();
+    JavaPairRDD<Integer, Integer> pairs = rdd.map(new PairFunction<Integer, Integer, Integer>() {
+      @Override
+      public Tuple2<Integer, Integer> call(Integer x) {
+        return new Tuple2<Integer, Integer>(x, x);
+      }
+    }).cache();
+    pairs.collect();
+    JavaRDD<String> strings = rdd.map(new Function<Integer, String>() {
+      @Override
+      public String call(Integer x) {
+        return x.toString();
+      }
+    }).cache();
+    strings.collect();
+  }
+
+  @Test
+  public void flatMap() {
+    JavaRDD<String> rdd = sc.parallelize(Arrays.asList("Hello World!",
+      "The quick brown fox jumps over the lazy dog."));
+    JavaRDD<String> words = rdd.flatMap(new FlatMapFunction<String, String>() {
+      @Override
+      public Iterable<String> call(String x) {
+        return Arrays.asList(x.split(" "));
+      }
+    });
+    Assert.assertEquals("Hello", words.first());
+    Assert.assertEquals(11, words.count());
+
+    JavaPairRDD<String, String> pairs = rdd.flatMap(
+      new PairFlatMapFunction<String, String, String>() {
+
+        @Override
+        public Iterable<Tuple2<String, String>> call(String s) {
+          List<Tuple2<String, String>> pairs = new LinkedList<Tuple2<String, String>>();
+          for (String word : s.split(" ")) pairs.add(new Tuple2<String, String>(word, word));
+          return pairs;
+        }
+      }
+    );
+    Assert.assertEquals(new Tuple2<String, String>("Hello", "Hello"), pairs.first());
+    Assert.assertEquals(11, pairs.count());
+
+    JavaDoubleRDD doubles = rdd.flatMap(new DoubleFlatMapFunction<String>() {
+      @Override
+      public Iterable<Double> call(String s) {
+        List<Double> lengths = new LinkedList<Double>();
+        for (String word : s.split(" ")) lengths.add(word.length() * 1.0);
+        return lengths;
+      }
+    });
+    // One length per word, 11 words in total
+    Assert.assertEquals(5.0, doubles.first().doubleValue(), 0.01);
+    Assert.assertEquals(11, doubles.count());
+  }
+
+  @Test
+  public void mapsFromPairsToPairs() {
+    List<Tuple2<Integer, String>> pairs = Arrays.asList(
+      new Tuple2<Integer, String>(1, "a"),
+      new Tuple2<Integer, String>(2, "aa"),
+      new Tuple2<Integer, String>(3, "aaa")
+    );
+    JavaPairRDD<Integer, String> pairRDD = sc.parallelizePairs(pairs);
+
+    // Regression test for SPARK-668:
+    JavaPairRDD<String, Integer> swapped = pairRDD.flatMap(
+      new PairFlatMapFunction<Tuple2<Integer, String>, String, Integer>() {
+        @Override
+        public Iterable<Tuple2<String, Integer>> call(Tuple2<Integer, String> item) throws Exception {
+          return Collections.singletonList(item.swap());
+        }
+      });
+    swapped.collect();
+
+    // There was never a bug here, but it's worth testing:
+    pairRDD.map(new PairFunction<Tuple2<Integer, String>, String, Integer>() {
+      @Override
+      public Tuple2<String, Integer> call(Tuple2<Integer, String> item) throws Exception {
+        return item.swap();
+      }
+    }).collect();
+  }
+
+  @Test
+  public void mapPartitions() {
+    JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4), 2);
+    JavaRDD<Integer> partitionSums = rdd.mapPartitions(
+      new FlatMapFunction<Iterator<Integer>, Integer>() {
+        @Override
+        public Iterable<Integer> call(Iterator<Integer> iter) {
+          int sum = 0;
+          while (iter.hasNext()) {
+            sum += iter.next();
+          }
+          return Collections.singletonList(sum);
+        }
+    });
+    Assert.assertEquals("[3, 7]", partitionSums.collect().toString());
+  }
+
+  @Test
+  public void repartition() {
+    // Shrinking number of partitions
+    JavaRDD<Integer> in1 = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8), 2);
+    JavaRDD<Integer> repartitioned1 = in1.repartition(4);
+    List<List<Integer>> result1 = repartitioned1.glom().collect();
+    Assert.assertEquals(4, result1.size());
+    for (List<Integer> l: result1) {
+      Assert.assertTrue(l.size() > 0);
+    }
+
+    // Growing number of partitions
+    JavaRDD<Integer> in2 = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8), 4);
+    JavaRDD<Integer> repartitioned2 = in2.repartition(2);
+    List<List<Integer>> result2 = repartitioned2.glom().collect();
+    Assert.assertEquals(2, result2.size());
+    for (List<Integer> l: result2) {
+      Assert.assertTrue(l.size() > 0);
+    }
+  }
+
+  @Test
+  public void persist() {
+    JavaDoubleRDD doubleRDD = sc.parallelizeDoubles(Arrays.asList(1.0, 1.0, 2.0, 3.0, 5.0, 8.0));
+    doubleRDD = doubleRDD.persist(StorageLevel.DISK_ONLY());
+    Assert.assertEquals(20, doubleRDD.sum(), 0.1);
+
+    List<Tuple2<Integer, String>> pairs = Arrays.asList(
+      new Tuple2<Integer, String>(1, "a"),
+      new Tuple2<Integer, String>(2, "aa"),
+      new Tuple2<Integer, String>(3, "aaa")
+    );
+    JavaPairRDD<Integer, String> pairRDD = sc.parallelizePairs(pairs);
+    pairRDD = pairRDD.persist(StorageLevel.DISK_ONLY());
+    Assert.assertEquals("a", pairRDD.first()._2());
+
+    JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));
+    rdd = rdd.persist(StorageLevel.DISK_ONLY());
+    Assert.assertEquals(1, rdd.first().intValue());
+  }
+
+  @Test
+  public void iterator() {
+    JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5), 2);
+    TaskContext context = new TaskContext(0, 0, 0, false, false, null);
+    Assert.assertEquals(1, rdd.iterator(rdd.splits().get(0), context).next().intValue());
+  }
+
+  @Test
+  public void glom() {
+    JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4), 2);
+    Assert.assertEquals("[1, 2]", rdd.glom().first().toString());
+  }
+
+  // File input / output tests are largely adapted from FileSuite:
+
+  @Test
+  public void textFiles() throws IOException {
+    File tempDir = Files.createTempDir();
+    String outputDir = new File(tempDir, "output").getAbsolutePath();
+    JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4));
+    rdd.saveAsTextFile(outputDir);
+    // Read the plain text file and check it's OK
+    File outputFile = new File(outputDir, "part-00000");
+    String content = Files.toString(outputFile, Charsets.UTF_8);
+    Assert.assertEquals("1\n2\n3\n4\n", content);
+    // Also try reading it in as a text file RDD
+    List<String> expected = Arrays.asList("1", "2", "3", "4");
+    JavaRDD<String> readRDD = sc.textFile(outputDir);
+    Assert.assertEquals(expected, readRDD.collect());
+  }
+
+  @Test
+  public void textFilesCompressed() throws IOException {
+    File tempDir = Files.createTempDir();
+    String outputDir = new File(tempDir, "output").getAbsolutePath();
+    JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4));
+    rdd.saveAsTextFile(outputDir, DefaultCodec.class);
+
+    // Try reading it in as a text file RDD
+    List<String> expected = Arrays.asList("1", "2", "3", "4");
+    JavaRDD<String> readRDD = sc.textFile(outputDir);
+    Assert.assertEquals(expected, readRDD.collect());
+  }
+
+  @Test
+  public void sequenceFile() {
+    File tempDir = Files.createTempDir();
+    String outputDir = new File(tempDir, "output").getAbsolutePath();
+    List<Tuple2<Integer, String>> pairs = Arrays.asList(
+      new Tuple2<Integer, String>(1, "a"),
+      new Tuple2<Integer, String>(2, "aa"),
+      new Tuple2<Integer, String>(3, "aaa")
+    );
+    JavaPairRDD<Integer, String> rdd = sc.parallelizePairs(pairs);
+
+    rdd.map(new PairFunction<Tuple2<Integer, String>, IntWritable, Text>() {
+      @Override
+      public Tuple2<IntWritable, Text> call(Tuple2<Integer, String> pair) {
+        return new Tuple2<IntWritable, Text>(new IntWritable(pair._1()), new Text(pair._2()));
+      }
+    }).saveAsHadoopFile(outputDir, IntWritable.class, Text.class, SequenceFileOutputFormat.class);
+
+    // Try reading the output back as an object file
+    JavaPairRDD<Integer, String> readRDD = sc.sequenceFile(outputDir, IntWritable.class,
+      Text.class).map(new PairFunction<Tuple2<IntWritable, Text>, Integer, String>() {
+      @Override
+      public Tuple2<Integer, String> call(Tuple2<IntWritable, Text> pair) {
+        return new Tuple2<Integer, String>(pair._1().get(), pair._2().toString());
+      }
+    });
+    Assert.assertEquals(pairs, readRDD.collect());
+  }
+
+  @Test
+  public void writeWithNewAPIHadoopFile() {
+    File tempDir = Files.createTempDir();
+    String outputDir = new File(tempDir, "output").getAbsolutePath();
+    List<Tuple2<Integer, String>> pairs = Arrays.asList(
+      new Tuple2<Integer, String>(1, "a"),
+      new Tuple2<Integer, String>(2, "aa"),
+      new Tuple2<Integer, String>(3, "aaa")
+    );
+    JavaPairRDD<Integer, String> rdd = sc.parallelizePairs(pairs);
+
+    rdd.map(new PairFunction<Tuple2<Integer, String>, IntWritable, Text>() {
+      @Override
+      public Tuple2<IntWritable, Text> call(Tuple2<Integer, String> pair) {
+        return new Tuple2<IntWritable, Text>(new IntWritable(pair._1()), new Text(pair._2()));
+      }
+    }).saveAsNewAPIHadoopFile(outputDir, IntWritable.class, Text.class,
+      org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat.class);
+
+    JavaPairRDD<IntWritable, Text> output = sc.sequenceFile(outputDir, IntWritable.class,
+      Text.class);
+    Assert.assertEquals(pairs.toString(), output.map(new Function<Tuple2<IntWritable, Text>,
+      String>() {
+      @Override
+      public String call(Tuple2<IntWritable, Text> x) {
+        return x.toString();
+      }
+    }).collect().toString());
+  }
+
+  @Test
+  public void readWithNewAPIHadoopFile() throws IOException {
+    File tempDir = Files.createTempDir();
+    String outputDir = new File(tempDir, "output").getAbsolutePath();
+    List<Tuple2<Integer, String>> pairs = Arrays.asList(
+      new Tuple2<Integer, String>(1, "a"),
+      new Tuple2<Integer, String>(2, "aa"),
+      new Tuple2<Integer, String>(3, "aaa")
+    );
+    JavaPairRDD<Integer, String> rdd = sc.parallelizePairs(pairs);
+
+    rdd.map(new PairFunction<Tuple2<Integer, String>, IntWritable, Text>() {
+      @Override
+      public Tuple2<IntWritable, Text> call(Tuple2<Integer, String> pair) {
+        return new Tuple2<IntWritable, Text>(new IntWritable(pair._1()), new Text(pair._2()));
+      }
+    }).saveAsHadoopFile(outputDir, IntWritable.class, Text.class, SequenceFileOutputFormat.class);
+
+    JavaPairRDD<IntWritable, Text> output = sc.newAPIHadoopFile(outputDir,
+      org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat.class, IntWritable.class,
+      Text.class, new Job().getConfiguration());
+    Assert.assertEquals(pairs.toString(), output.map(new Function<Tuple2<IntWritable, Text>,
+      String>() {
+      @Override
+      public String call(Tuple2<IntWritable, Text> x) {
+        return x.toString();
+      }
+    }).collect().toString());
+  }
+
+  @Test
+  public void objectFilesOfInts() {
+    File tempDir = Files.createTempDir();
+    String outputDir = new File(tempDir, "output").getAbsolutePath();
+    JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4));
+    rdd.saveAsObjectFile(outputDir);
+    // Try reading the output back as an object file
+    List<Integer> expected = Arrays.asList(1, 2, 3, 4);
+    JavaRDD<Integer> readRDD = sc.objectFile(outputDir);
+    Assert.assertEquals(expected, readRDD.collect());
+  }
+
+  @Test
+  public void objectFilesOfComplexTypes() {
+    File tempDir = Files.createTempDir();
+    String outputDir = new File(tempDir, "output").getAbsolutePath();
+    List<Tuple2<Integer, String>> pairs = Arrays.asList(
+      new Tuple2<Integer, String>(1, "a"),
+      new Tuple2<Integer, String>(2, "aa"),
+      new Tuple2<Integer, String>(3, "aaa")
+    );
+    JavaPairRDD<Integer, String> rdd = sc.parallelizePairs(pairs);
+    rdd.saveAsObjectFile(outputDir);
+    // Try reading the output back as an object file
+    JavaRDD<Tuple2<Integer, String>> readRDD = sc.objectFile(outputDir);
+    Assert.assertEquals(pairs, readRDD.collect());
+  }
+
+  @Test
+  public void hadoopFile() {
+    File tempDir = Files.createTempDir();
+    String outputDir = new File(tempDir, "output").getAbsolutePath();
+    List<Tuple2<Integer, String>> pairs = Arrays.asList(
+      new Tuple2<Integer, String>(1, "a"),
+      new Tuple2<Integer, String>(2, "aa"),
+      new Tuple2<Integer, String>(3, "aaa")
+    );
+    JavaPairRDD<Integer, String> rdd = sc.parallelizePairs(pairs);
+
+    rdd.map(new PairFunction<Tuple2<Integer, String>, IntWritable, Text>() {
+      @Override
+      public Tuple2<IntWritable, Text> call(Tuple2<Integer, String> pair) {
+        return new Tuple2<IntWritable, Text>(new IntWritable(pair._1()), new Text(pair._2()));
+      }
+    }).saveAsHadoopFile(outputDir, IntWritable.class, Text.class, SequenceFileOutputFormat.class);
+
+    JavaPairRDD<IntWritable, Text> output = sc.hadoopFile(outputDir,
+      SequenceFileInputFormat.class, IntWritable.class, Text.class);
+    Assert.assertEquals(pairs.toString(), output.map(new Function<Tuple2<IntWritable, Text>,
+      String>() {
+      @Override
+      public String call(Tuple2<IntWritable, Text> x) {
+        return x.toString();
+      }
+    }).collect().toString());
+  }
+
+  @Test
+  public void hadoopFileCompressed() {
+    File tempDir = Files.createTempDir();
+    String outputDir = new File(tempDir, "output_compressed").getAbsolutePath();
+    List<Tuple2<Integer, String>> pairs = Arrays.asList(
+        new Tuple2<Integer, String>(1, "a"),
+        new Tuple2<Integer, String>(2, "aa"),
+        new Tuple2<Integer, String>(3, "aaa")
+    );
+    JavaPairRDD<Integer, String> rdd = sc.parallelizePairs(pairs);
+
+    rdd.map(new PairFunction<Tuple2<Integer, String>, IntWritable, Text>() {
+      @Override
+      public Tuple2<IntWritable, Text> call(Tuple2<Integer, String> pair) {
+        return new Tuple2<IntWritable, Text>(new IntWritable(pair._1()), new Text(pair._2()));
+      }
+    }).saveAsHadoopFile(outputDir, IntWritable.class, Text.class, SequenceFileOutputFormat.class,
+        DefaultCodec.class);
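+    // SequenceFiles record their compression codec in the file header, so the read side below
+    // needs no extra codec configuration.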
+
+    JavaPairRDD<IntWritable, Text> output = sc.hadoopFile(outputDir,
+        SequenceFileInputFormat.class, IntWritable.class, Text.class);
+
+    Assert.assertEquals(pairs.toString(),
+      output.map(new Function<Tuple2<IntWritable, Text>, String>() {
+        @Override
+        public String call(Tuple2<IntWritable, Text> x) {
+          return x.toString();
+        }
+      }).collect().toString());
+  }
+
+  @Test
+  public void zip() {
+    JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));
+    JavaDoubleRDD doubles = rdd.map(new DoubleFunction<Integer>() {
+      @Override
+      public Double call(Integer x) {
+        return 1.0 * x;
+      }
+    });
+    JavaPairRDD<Integer, Double> zipped = rdd.zip(doubles);
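+    // zip pairs the i-th elements of the two RDDs; both must have the same number of partitions
+    // and elements per partition. count() just forces evaluation.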
+    zipped.count();
+  }
+
+  @Test
+  public void zipPartitions() {
+    JavaRDD<Integer> rdd1 = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6), 2);
+    JavaRDD<String> rdd2 = sc.parallelize(Arrays.asList("1", "2", "3", "4"), 2);
+    FlatMapFunction2<Iterator<Integer>, Iterator<String>, Integer> sizesFn =
+      new FlatMapFunction2<Iterator<Integer>, Iterator<String>, Integer>() {
+        @Override
+        public Iterable<Integer> call(Iterator<Integer> i, Iterator<String> s) {
+          int sizeI = 0;
+          int sizeS = 0;
+          while (i.hasNext()) {
+            sizeI += 1;
+            i.next();
+          }
+          while (s.hasNext()) {
+            sizeS += 1;
+            s.next();
+          }
+          return Arrays.asList(sizeI, sizeS);
+        }
+      };
+
+    JavaRDD<Integer> sizes = rdd1.zipPartitions(rdd2, sizesFn);
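+    // Each RDD has two partitions: rdd1 contributes 3 elements per partition and rdd2
+    // contributes 2, hence [3, 2, 3, 2].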
+    Assert.assertEquals("[3, 2, 3, 2]", sizes.collect().toString());
+  }
+
+  @Test
+  public void accumulators() {
+    JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));
+
+    final Accumulator<Integer> intAccum = sc.intAccumulator(10);
+    rdd.foreach(new VoidFunction<Integer>() {
+      @Override
+      public void call(Integer x) {
+        intAccum.add(x);
+      }
+    });
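+    // Initial value 10 plus the sum 1 + 2 + 3 + 4 + 5 = 15 gives 25.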
+    Assert.assertEquals((Integer) 25, intAccum.value());
+
+    final Accumulator<Double> doubleAccum = sc.doubleAccumulator(10.0);
+    rdd.foreach(new VoidFunction<Integer>() {
+      @Override
+      public void call(Integer x) {
+        doubleAccum.add((double) x);
+      }
+    });
+    Assert.assertEquals((Double) 25.0, doubleAccum.value());
+
+    // Try a custom accumulator type
+    AccumulatorParam<Float> floatAccumulatorParam = new AccumulatorParam<Float>() {
+      @Override
+      public Float addInPlace(Float r, Float t) {
+        return r + t;
+      }
+
+      @Override
+      public Float addAccumulator(Float r, Float t) {
+        return r + t;
+      }
+
+      @Override
+      public Float zero(Float initialValue) {
+        return 0.0f;
+      }
+    };
+
+    final Accumulator<Float> floatAccum = sc.accumulator((Float) 10.0f, floatAccumulatorParam);
+    rdd.foreach(new VoidFunction<Integer>() {
+      @Override
+      public void call(Integer x) {
+        floatAccum.add((float) x);
+      }
+    });
+    Assert.assertEquals((Float) 25.0f, floatAccum.value());
+
+    // Test the setValue method
+    floatAccum.setValue(5.0f);
+    Assert.assertEquals((Float) 5.0f, floatAccum.value());
+  }
+
+  @Test
+  public void keyBy() {
+    JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2));
+    List<Tuple2<String, Integer>> s = rdd.keyBy(new Function<Integer, String>() {
+      @Override
+      public String call(Integer t) throws Exception {
+        return t.toString();
+      }
+    }).collect();
+    Assert.assertEquals(new Tuple2<String, Integer>("1", 1), s.get(0));
+    Assert.assertEquals(new Tuple2<String, Integer>("2", 2), s.get(1));
+  }
+
+  @Test
+  public void checkpointAndComputation() {
+    File tempDir = Files.createTempDir();
+    JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));
+    sc.setCheckpointDir(tempDir.getAbsolutePath());
+    Assert.assertFalse(rdd.isCheckpointed());
+    rdd.checkpoint();
+    rdd.count(); // Forces evaluation, which materializes the checkpoint
+    Assert.assertTrue(rdd.isCheckpointed());
+    Assert.assertEquals(Arrays.asList(1, 2, 3, 4, 5), rdd.collect());
+  }
+
+  @Test
+  public void checkpointAndRestore() {
+    File tempDir = Files.createTempDir();
+    JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));
+    sc.setCheckpointDir(tempDir.getAbsolutePath());
+    Assert.assertFalse(rdd.isCheckpointed());
+    rdd.checkpoint();
+    rdd.count(); // Forces evaluation, which materializes the checkpoint
+    Assert.assertTrue(rdd.isCheckpointed());
+
+    Assert.assertTrue(rdd.getCheckpointFile().isPresent());
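+    // checkpointFile() rebuilds an RDD from the checkpoint data, independent of the
+    // original lineage.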
+    JavaRDD<Integer> recovered = sc.checkpointFile(rdd.getCheckpointFile().get());
+    Assert.assertEquals(Arrays.asList(1, 2, 3, 4, 5), recovered.collect());
+  }
+
+  @Test
+  public void mapOnPairRDD() {
+    JavaRDD<Integer> rdd1 = sc.parallelize(Arrays.asList(1, 2, 3, 4));
+    JavaPairRDD<Integer, Integer> rdd2 = rdd1.map(new PairFunction<Integer, Integer, Integer>() {
+      @Override
+      public Tuple2<Integer, Integer> call(Integer i) throws Exception {
+        return new Tuple2<Integer, Integer>(i, i % 2);
+      }
+    });
+    JavaPairRDD<Integer, Integer> rdd3 = rdd2.map(
+        new PairFunction<Tuple2<Integer, Integer>, Integer, Integer>() {
+      @Override
+      public Tuple2<Integer, Integer> call(Tuple2<Integer, Integer> in) throws Exception {
+        return new Tuple2<Integer, Integer>(in._2(), in._1());
+      }
+    });
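+    // The second map swaps each (i, i % 2) pair to (i % 2, i).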
+    Assert.assertEquals(Arrays.asList(
+        new Tuple2<Integer, Integer>(1, 1),
+        new Tuple2<Integer, Integer>(0, 2),
+        new Tuple2<Integer, Integer>(1, 3),
+        new Tuple2<Integer, Integer>(0, 4)), rdd3.collect());
+  }
+
+  @Test
+  public void collectPartitions() {
+    JavaRDD<Integer> rdd1 = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7), 3);
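+    // Seven elements in 3 partitions split as [1, 2], [3, 4] and [5, 6, 7].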
+
+    JavaPairRDD<Integer, Integer> rdd2 = rdd1.map(new PairFunction<Integer, Integer, Integer>() {
+      @Override
+      public Tuple2<Integer, Integer> call(Integer i) throws Exception {
+        return new Tuple2<Integer, Integer>(i, i % 2);
+      }
+    });
+
+    // Raw List[] so the same variable can later hold partitions of rdd2's tuple type.
+    List[] parts = rdd1.collectPartitions(new int[] {0});
+    Assert.assertEquals(Arrays.asList(1, 2), parts[0]);
+
+    parts = rdd1.collectPartitions(new int[] {1, 2});
+    Assert.assertEquals(Arrays.asList(3, 4), parts[0]);
+    Assert.assertEquals(Arrays.asList(5, 6, 7), parts[1]);
+
+    Assert.assertEquals(Arrays.asList(new Tuple2<Integer, Integer>(1, 1),
+                                      new Tuple2<Integer, Integer>(2, 0)),
+                        rdd2.collectPartitions(new int[] {0})[0]);
+
+    parts = rdd2.collectPartitions(new int[] {1, 2});
+    Assert.assertEquals(Arrays.asList(new Tuple2<Integer, Integer>(3, 1),
+                                      new Tuple2<Integer, Integer>(4, 0)),
+                        parts[0]);
+    Assert.assertEquals(Arrays.asList(new Tuple2<Integer, Integer>(5, 1),
+                                      new Tuple2<Integer, Integer>(6, 0),
+                                      new Tuple2<Integer, Integer>(7, 1)),
+                        parts[1]);
+  }
+
+  @Test
+  public void countApproxDistinct() {
+    List<Integer> arrayData = new ArrayList<Integer>();
+    int size = 100;
+    for (int i = 0; i < 100000; i++) {
+      arrayData.add(i % size);
+    }
+    JavaRDD<Integer> simpleRdd = sc.parallelize(arrayData, 10);
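+    // 100,000 elements but only 100 distinct values; each estimate should land within the
+    // requested relative error.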
+    Assert.assertTrue(Math.abs((simpleRdd.countApproxDistinct(0.2) - size) / (size * 1.0)) < 0.2);
+    Assert.assertTrue(Math.abs((simpleRdd.countApproxDistinct(0.05) - size) / (size * 1.0)) <= 0.05);
+    Assert.assertTrue(Math.abs((simpleRdd.countApproxDistinct(0.01) - size) / (size * 1.0)) <= 0.01);
+  }
+
+  @Test
+  public void countApproxDistinctByKey() {
+    double relativeSD = 0.001;
+
+    List<Tuple2<Integer, Integer>> arrayData = new ArrayList<Tuple2<Integer, Integer>>();
+    for (int i = 10; i < 100; i++) {
+      for (int j = 0; j < i; j++) {
+        arrayData.add(new Tuple2<Integer, Integer>(i, j));
+      }
+    }
+
+    JavaPairRDD<Integer, Integer> pairRdd = sc.parallelizePairs(arrayData);
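+    // Key i is paired with values 0..i-1, so its exact distinct count is i.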
+    List<Tuple2<Integer, Object>> res = pairRdd.countApproxDistinctByKey(relativeSD).collect();
+    for (Tuple2<Integer, Object> resItem : res) {
+      double exact = resItem._1();
+      long estimate = (Long) resItem._2();
+      double error = Math.abs((estimate - exact) / exact);
+      Assert.assertTrue(error < relativeSD);
+    }
+  }
+
+  @Test
+  public void collectAsMapWithIntArrayValues() {
+    // Regression test for SPARK-1040
+    JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1));
+    JavaPairRDD<Integer, int[]> pairRDD = rdd.map(new PairFunction<Integer, Integer, int[]>() {
+      @Override
+      public Tuple2<Integer, int[]> call(Integer x) throws Exception {
+        return new Tuple2<Integer, int[]>(x, new int[] { x });
+      }
+    });
+    pairRDD.collect();  // Works fine
+    Map<Integer, int[]> map = pairRDD.collectAsMap();  // Used to crash with ClassCastException
+  }
+}