You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by sr...@apache.org on 2017/02/16 12:32:57 UTC

[6/8] spark git commit: [SPARK-19550][BUILD][CORE][WIP] Remove Java 7 support

http://git-wip-us.apache.org/repos/asf/spark/blob/0e240549/core/src/test/java/test/org/apache/spark/JavaAPISuite.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/test/org/apache/spark/JavaAPISuite.java b/core/src/test/java/test/org/apache/spark/JavaAPISuite.java
new file mode 100644
index 0000000..80aab10
--- /dev/null
+++ b/core/src/test/java/test/org/apache/spark/JavaAPISuite.java
@@ -0,0 +1,1842 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package test.org.apache.spark;
+
+import java.io.*;
+import java.nio.channels.FileChannel;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.*;
+
+import org.apache.spark.Accumulator;
+import org.apache.spark.AccumulatorParam;
+import org.apache.spark.Partitioner;
+import org.apache.spark.SparkConf;
+import org.apache.spark.TaskContext;
+import org.apache.spark.TaskContext$;
+import scala.Tuple2;
+import scala.Tuple3;
+import scala.Tuple4;
+import scala.collection.JavaConverters;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Lists;
+import com.google.common.base.Throwables;
+import com.google.common.io.Files;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.compress.DefaultCodec;
+import org.apache.hadoop.mapred.SequenceFileInputFormat;
+import org.apache.hadoop.mapred.SequenceFileOutputFormat;
+import org.apache.hadoop.mapreduce.Job;
+import org.junit.After;
+import static org.junit.Assert.*;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.spark.api.java.JavaDoubleRDD;
+import org.apache.spark.api.java.JavaFutureAction;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.Optional;
+import org.apache.spark.api.java.function.*;
+import org.apache.spark.input.PortableDataStream;
+import org.apache.spark.partial.BoundedDouble;
+import org.apache.spark.partial.PartialResult;
+import org.apache.spark.rdd.RDD;
+import org.apache.spark.serializer.KryoSerializer;
+import org.apache.spark.storage.StorageLevel;
+import org.apache.spark.util.LongAccumulator;
+import org.apache.spark.util.StatCounter;
+
+// The test suite itself is Serializable so that anonymous Function implementations can be
+// serialized, as an alternative to converting these anonymous classes to static inner classes;
+// see http://stackoverflow.com/questions/758570/.
+public class JavaAPISuite implements Serializable {
+  private transient JavaSparkContext sc;
+  private transient File tempDir;
+
+  @Before
+  public void setUp() {
+    sc = new JavaSparkContext("local", "JavaAPISuite");
+    tempDir = Files.createTempDir();
+    tempDir.deleteOnExit();
+  }
+
+  /**
+   * Stops the Spark context created in {@link #setUp()} and drops the reference to it.
+   */
+  @After
+  public void tearDown() {
+    // JUnit still runs @After methods when @Before throws, so sc may never have been assigned;
+    // guard against that to avoid an NPE that would obscure the original failure.
+    if (sc != null) {
+      try {
+        sc.stop();
+      } finally {
+        sc = null;
+      }
+    }
+  }
+
+  /**
+   * Checks {@code JavaSparkContext.union} for plain, double and pair RDDs, using the varargs
+   * overload throughout and additionally the (first, list-of-rest) overload for plain RDDs.
+   */
+  @SuppressWarnings("unchecked")
+  @Test
+  public void sparkContextUnion() {
+    // Plain JavaRDDs: two RDDs built from the same two-element list
+    List<String> words = Arrays.asList("Hello", "World");
+    JavaRDD<String> left = sc.parallelize(words);
+    JavaRDD<String> right = sc.parallelize(words);
+    // varargs overload
+    JavaRDD<String> unionOfStrings = sc.union(left, right);
+    assertEquals(4, unionOfStrings.count());
+    // overload taking the first RDD plus a java.util.List of the remaining ones
+    List<JavaRDD<String>> rest = new ArrayList<>();
+    rest.add(right);
+    unionOfStrings = sc.union(left, rest);
+    assertEquals(4, unionOfStrings.count());
+
+    // JavaDoubleRDDs
+    List<Double> numbers = Arrays.asList(1.0, 2.0);
+    JavaDoubleRDD leftDoubles = sc.parallelizeDoubles(numbers);
+    JavaDoubleRDD rightDoubles = sc.parallelizeDoubles(numbers);
+    JavaDoubleRDD unionOfDoubles = sc.union(leftDoubles, rightDoubles);
+    assertEquals(4, unionOfDoubles.count());
+
+    // JavaPairRDDs
+    List<Tuple2<Integer, Integer>> tuples = new ArrayList<>();
+    tuples.add(new Tuple2<>(1, 2));
+    tuples.add(new Tuple2<>(3, 4));
+    JavaPairRDD<Integer, Integer> leftPairs = sc.parallelizePairs(tuples);
+    JavaPairRDD<Integer, Integer> rightPairs = sc.parallelizePairs(tuples);
+    JavaPairRDD<Integer, Integer> unionOfPairs = sc.union(leftPairs, rightPairs);
+    assertEquals(4, unionOfPairs.count());
+  }
+
+  /**
+   * Checks {@code intersection} on plain, empty, double and pair RDDs by counting the elements
+   * common to both sides.
+   */
+  @SuppressWarnings("unchecked")
+  @Test
+  public void intersection() {
+    // 1, 2 and 3 are the only values present in both lists
+    List<Integer> leftInts = Arrays.asList(1, 10, 2, 3, 4, 5);
+    List<Integer> rightInts = Arrays.asList(1, 6, 2, 3, 7, 8);
+    JavaRDD<Integer> left = sc.parallelize(leftInts);
+    JavaRDD<Integer> right = sc.parallelize(rightInts);
+
+    JavaRDD<Integer> common = left.intersection(right);
+    assertEquals(3, common.count());
+
+    // Intersecting with an empty RDD yields nothing
+    JavaRDD<Integer> nothing = sc.emptyRDD();
+    JavaRDD<Integer> commonWithNothing = nothing.intersection(right);
+    assertEquals(0, commonWithNothing.count());
+
+    // Identical double RDDs intersect to all of their (distinct) values
+    List<Double> numbers = Arrays.asList(1.0, 2.0);
+    JavaDoubleRDD leftDoubles = sc.parallelizeDoubles(numbers);
+    JavaDoubleRDD rightDoubles = sc.parallelizeDoubles(numbers);
+    JavaDoubleRDD commonDoubles = leftDoubles.intersection(rightDoubles);
+    assertEquals(2, commonDoubles.count());
+
+    // Same for identical pair RDDs
+    List<Tuple2<Integer, Integer>> tuples = new ArrayList<>();
+    tuples.add(new Tuple2<>(1, 2));
+    tuples.add(new Tuple2<>(3, 4));
+    JavaPairRDD<Integer, Integer> leftPairs = sc.parallelizePairs(tuples);
+    JavaPairRDD<Integer, Integer> rightPairs = sc.parallelizePairs(tuples);
+    JavaPairRDD<Integer, Integer> commonPairs = leftPairs.intersection(rightPairs);
+    assertEquals(2, commonPairs.count());
+  }
+
+  /**
+   * Samples 20% of ten integers, with and without replacement, using fixed seeds chosen so that
+   * exactly two elements are drawn each time.
+   */
+  @Test
+  public void sample() {
+    JavaRDD<Integer> oneToTen = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10));
+    // the seeds here are "magic" to make this work out nicely
+    JavaRDD<Integer> withReplacement = oneToTen.sample(true, 0.2, 8);
+    assertEquals(2, withReplacement.count());
+    JavaRDD<Integer> withoutReplacement = oneToTen.sample(false, 0.2, 2);
+    assertEquals(2, withoutReplacement.count());
+  }
+
+  /**
+   * Splits 1000 integers with weights 0.4 / 0.6 / 1.0 and checks that each split's size falls in
+   * a loose band around 200 / 300 / 500 elements respectively.
+   */
+  @Test
+  public void randomSplit() {
+    List<Integer> ints = new ArrayList<>(1000);
+    for (int i = 0; i < 1000; i++) {
+      ints.add(i);
+    }
+    JavaRDD<Integer> rdd = sc.parallelize(ints);
+    JavaRDD<Integer>[] splits = rdd.randomSplit(new double[] { 0.4, 0.6, 1.0 }, 31);
+    // the splits aren't perfect -- not enough data for them to be -- just check they're about right
+    assertEquals(3, splits.length);
+    long s0 = splits[0].count();
+    long s1 = splits[1].count();
+    long s2 = splits[2].count();
+    assertTrue(s0 + " not within expected range", s0 > 150 && s0 < 250);
+    // The upper bound must be checked against s1 itself (it previously re-checked s0).
+    assertTrue(s1 + " not within expected range", s1 > 250 && s1 < 350);
+    assertTrue(s2 + " not within expected range", s2 > 430 && s2 < 570);
+  }
+
+  /**
+   * Sorts a small pair RDD by key, first with the default ordering and then with a reversed
+   * comparator combined with {@code ascending = false}; both runs expect keys -1, 0, 3.
+   */
+  @Test
+  public void sortByKey() {
+    List<Tuple2<Integer, Integer>> input = new ArrayList<>();
+    input.add(new Tuple2<>(0, 4));
+    input.add(new Tuple2<>(3, 2));
+    input.add(new Tuple2<>(-1, 1));
+    JavaPairRDD<Integer, Integer> unsorted = sc.parallelizePairs(input);
+
+    // Default comparator
+    JavaPairRDD<Integer, Integer> byKey = unsorted.sortByKey();
+    assertEquals(new Tuple2<>(-1, 1), byKey.first());
+    List<Tuple2<Integer, Integer>> collected = byKey.collect();
+    assertEquals(new Tuple2<>(0, 4), collected.get(1));
+    assertEquals(new Tuple2<>(3, 2), collected.get(2));
+
+    // Custom comparator: reverse ordering sorted descending gives the same order as above
+    Comparator<Integer> reversed = Collections.<Integer>reverseOrder();
+    byKey = unsorted.sortByKey(reversed, false);
+    assertEquals(new Tuple2<>(-1, 1), byKey.first());
+    collected = byKey.collect();
+    assertEquals(new Tuple2<>(0, 4), collected.get(1));
+    assertEquals(new Tuple2<>(3, 2), collected.get(2));
+  }
+
+  /**
+   * Repartitions a pair RDD with a custom two-way partitioner (key modulo 2) and checks that the
+   * result reports that partitioner and that each partition holds the expected, key-ordered pairs.
+   */
+  @SuppressWarnings("unchecked")
+  @Test
+  public void repartitionAndSortWithinPartitions() {
+    List<Tuple2<Integer, Integer>> pairs = new ArrayList<>();
+    pairs.add(new Tuple2<>(0, 5));
+    pairs.add(new Tuple2<>(3, 8));
+    pairs.add(new Tuple2<>(2, 6));
+    pairs.add(new Tuple2<>(0, 8));
+    pairs.add(new Tuple2<>(3, 8));
+    pairs.add(new Tuple2<>(1, 3));
+
+    JavaPairRDD<Integer, Integer> rdd = sc.parallelizePairs(pairs);
+
+    // Even keys go to partition 0, odd keys to partition 1 (all keys used here are non-negative).
+    Partitioner partitioner = new Partitioner() {
+      @Override
+      public int numPartitions() {
+        return 2;
+      }
+      @Override
+      public int getPartition(Object key) {
+        return (Integer) key % 2;
+      }
+    };
+
+    JavaPairRDD<Integer, Integer> repartitioned =
+        rdd.repartitionAndSortWithinPartitions(partitioner);
+    assertTrue(repartitioned.partitioner().isPresent());
+    // JUnit's assertEquals takes (expected, actual); keep that order so failure messages are right.
+    assertEquals(partitioner, repartitioned.partitioner().get());
+    List<List<Tuple2<Integer, Integer>>> partitions = repartitioned.glom().collect();
+    assertEquals(
+        Arrays.asList(new Tuple2<>(0, 5), new Tuple2<>(0, 8), new Tuple2<>(2, 6)),
+        partitions.get(0));
+    assertEquals(
+        Arrays.asList(new Tuple2<>(1, 3), new Tuple2<>(3, 8), new Tuple2<>(3, 8)),
+        partitions.get(1));
+  }
+
+  @Test
+  public void emptyRDD() {
+    JavaRDD<String> rdd = sc.emptyRDD();
+    assertEquals("Empty RDD shouldn't have any values", 0, rdd.count());
+  }
+
+  /**
+   * Sorts an RDD of tuples with {@code sortBy}, once keyed on the first tuple component and once
+   * on the second, and checks the resulting order.
+   */
+  @Test
+  public void sortBy() {
+    List<Tuple2<Integer, Integer>> input = new ArrayList<>();
+    input.add(new Tuple2<>(0, 4));
+    input.add(new Tuple2<>(3, 2));
+    input.add(new Tuple2<>(-1, 1));
+    JavaRDD<Tuple2<Integer, Integer>> unsorted = sc.parallelize(input);
+
+    // Key extractors for the two sort passes
+    Function<Tuple2<Integer, Integer>, Integer> firstComponent =
+      new Function<Tuple2<Integer, Integer>, Integer>() {
+        @Override
+        public Integer call(Tuple2<Integer, Integer> tuple) {
+          return tuple._1();
+        }
+      };
+    Function<Tuple2<Integer, Integer>, Integer> secondComponent =
+      new Function<Tuple2<Integer, Integer>, Integer>() {
+        @Override
+        public Integer call(Tuple2<Integer, Integer> tuple) {
+          return tuple._2();
+        }
+      };
+
+    // compare on first value
+    JavaRDD<Tuple2<Integer, Integer>> ordered = unsorted.sortBy(firstComponent, true, 2);
+    assertEquals(new Tuple2<>(-1, 1), ordered.first());
+    List<Tuple2<Integer, Integer>> collected = ordered.collect();
+    assertEquals(new Tuple2<>(0, 4), collected.get(1));
+    assertEquals(new Tuple2<>(3, 2), collected.get(2));
+
+    // compare on second value
+    ordered = unsorted.sortBy(secondComponent, true, 2);
+    assertEquals(new Tuple2<>(-1, 1), ordered.first());
+    collected = ordered.collect();
+    assertEquals(new Tuple2<>(3, 2), collected.get(1));
+    assertEquals(new Tuple2<>(0, 4), collected.get(2));
+  }
+
+  /** Runs {@code foreach} over two elements and checks that an accumulator was bumped twice. */
+  @Test
+  public void foreach() {
+    final LongAccumulator visited = sc.sc().longAccumulator();
+    VoidFunction<String> countOne = new VoidFunction<String>() {
+      @Override
+      public void call(String ignored) {
+        visited.add(1);
+      }
+    };
+    JavaRDD<String> words = sc.parallelize(Arrays.asList("Hello", "World"));
+    words.foreach(countOne);
+    assertEquals(2, visited.value().intValue());
+  }
+
+  /**
+   * Runs {@code foreachPartition} over two elements, draining each partition's iterator, and
+   * checks that an accumulator was bumped once per element.
+   */
+  @Test
+  public void foreachPartition() {
+    final LongAccumulator visited = sc.sc().longAccumulator();
+    VoidFunction<Iterator<String>> drainAndCount = new VoidFunction<Iterator<String>>() {
+      @Override
+      public void call(Iterator<String> elements) {
+        while (elements.hasNext()) {
+          elements.next();
+          visited.add(1);
+        }
+      }
+    };
+    JavaRDD<String> words = sc.parallelize(Arrays.asList("Hello", "World"));
+    words.foreachPartition(drainAndCount);
+    assertEquals(2, visited.value().intValue());
+  }
+
+  /** Elements read back through {@code toLocalIterator} must match the input list in order. */
+  @Test
+  public void toLocalIterator() {
+    List<Integer> expected = Arrays.asList(1, 2, 3, 4);
+    Iterator<Integer> localIterator = sc.parallelize(expected).toLocalIterator();
+    List<Integer> actual = Lists.newArrayList(localIterator);
+    assertEquals(expected, actual);
+  }
+
+  /** The ids produced by {@code zipWithUniqueId} for four elements must all be distinct. */
+  @Test
+  public void zipWithUniqueId() {
+    JavaRDD<Integer> data = sc.parallelize(Arrays.asList(1, 2, 3, 4));
+    JavaPairRDD<Integer, Long> withIds = data.zipWithUniqueId();
+    List<Long> ids = withIds.values().collect();
+    Set<Long> distinctIds = new HashSet<>(ids);
+    assertEquals(4, distinctIds.size());
+  }
+
+  /** {@code zipWithIndex} must assign the indexes 0..3 to four elements, in order. */
+  @Test
+  public void zipWithIndex() {
+    JavaRDD<Integer> data = sc.parallelize(Arrays.asList(1, 2, 3, 4));
+    JavaPairRDD<Integer, Long> withIndexes = data.zipWithIndex();
+    List<Long> expected = Arrays.asList(0L, 1L, 2L, 3L);
+    List<Long> actual = withIndexes.values().collect();
+    assertEquals(expected, actual);
+  }
+
+  /**
+   * Looks up a key that has two values, both directly on the pair RDD and after
+   * {@code groupByKey}, and checks that both values are found.
+   */
+  @SuppressWarnings("unchecked")
+  @Test
+  public void lookup() {
+    List<Tuple2<String, String>> entries = Arrays.asList(
+      new Tuple2<>("Apples", "Fruit"),
+      new Tuple2<>("Oranges", "Fruit"),
+      new Tuple2<>("Oranges", "Citrus")
+    );
+    JavaPairRDD<String, String> categories = sc.parallelizePairs(entries);
+    List<String> orangeCategories = categories.lookup("Oranges");
+    assertEquals(2, orangeCategories.size());
+    Iterable<String> groupedOranges = categories.groupByKey().lookup("Oranges").get(0);
+    assertEquals(2, Iterables.size(groupedOranges));
+  }
+
+  /**
+   * Groups integers by whether they are even, with the default and with an explicit number of
+   * partitions, and checks the size of each group.
+   */
+  @Test
+  public void groupBy() {
+    JavaRDD<Integer> numbers = sc.parallelize(Arrays.asList(1, 1, 2, 3, 5, 8, 13));
+    // Grouping key: true for even values, false for odd ones
+    Function<Integer, Boolean> isEven = new Function<Integer, Boolean>() {
+      @Override
+      public Boolean call(Integer value) {
+        return value % 2 == 0;
+      }
+    };
+
+    JavaPairRDD<Boolean, Iterable<Integer>> byParity = numbers.groupBy(isEven);
+    assertEquals(2, byParity.count());
+    assertEquals(2, Iterables.size(byParity.lookup(true).get(0)));  // Evens
+    assertEquals(5, Iterables.size(byParity.lookup(false).get(0))); // Odds
+
+    // Same grouping with a single output partition
+    byParity = numbers.groupBy(isEven, 1);
+    assertEquals(2, byParity.count());
+    assertEquals(2, Iterables.size(byParity.lookup(true).get(0)));  // Evens
+    assertEquals(5, Iterables.size(byParity.lookup(false).get(0))); // Odds
+  }
+
+  /**
+   * Regression test for SPARK-4459: {@code groupBy} on a pair RDD (an RDD zipped with itself),
+   * grouping on whether both components of each pair are even.
+   */
+  @Test
+  public void groupByOnPairRDD() {
+    // Regression test for SPARK-4459
+    JavaRDD<Integer> numbers = sc.parallelize(Arrays.asList(1, 1, 2, 3, 5, 8, 13));
+    // Grouping key: true when both components are even
+    Function<Tuple2<Integer, Integer>, Boolean> bothEven =
+      new Function<Tuple2<Integer, Integer>, Boolean>() {
+        @Override
+        public Boolean call(Tuple2<Integer, Integer> pair) {
+          return (pair._1() % 2 == 0) && (pair._2() % 2 == 0);
+        }
+      };
+    JavaPairRDD<Integer, Integer> zipped = numbers.zip(numbers);
+
+    JavaPairRDD<Boolean, Iterable<Tuple2<Integer, Integer>>> byParity = zipped.groupBy(bothEven);
+    assertEquals(2, byParity.count());
+    assertEquals(2, Iterables.size(byParity.lookup(true).get(0)));  // Evens
+    assertEquals(5, Iterables.size(byParity.lookup(false).get(0))); // Odds
+
+    // Same grouping with a single output partition
+    byParity = zipped.groupBy(bothEven, 1);
+    assertEquals(2, byParity.count());
+    assertEquals(2, Iterables.size(byParity.lookup(true).get(0)));  // Evens
+    assertEquals(5, Iterables.size(byParity.lookup(false).get(0))); // Odds
+  }
+
+  /**
+   * Regression test for SPARK-4459: {@code keyBy} on a pair RDD (an RDD zipped with itself),
+   * keying each pair by the string form of the sum of its components.
+   */
+  @SuppressWarnings("unchecked")
+  @Test
+  public void keyByOnPairRDD() {
+    // Regression test for SPARK-4459
+    JavaRDD<Integer> numbers = sc.parallelize(Arrays.asList(1, 1, 2, 3, 5, 8, 13));
+    Function<Tuple2<Integer, Integer>, String> sumAsString =
+      new Function<Tuple2<Integer, Integer>, String>() {
+        @Override
+        public String call(Tuple2<Integer, Integer> pair) {
+          return String.valueOf(pair._1() + pair._2());
+        }
+      };
+    JavaPairRDD<Integer, Integer> zipped = numbers.zip(numbers);
+    JavaPairRDD<String, Tuple2<Integer, Integer>> keyed = zipped.keyBy(sumAsString);
+    assertEquals(7, keyed.count());
+    // The pairs keyed by "2" are the (1, 1) entries
+    Tuple2<Integer, Integer> firstWithSumTwo = keyed.lookup("2").get(0);
+    assertEquals(1, (long) firstWithSumTwo._1());
+  }
+
+  /** Cogroups two pair RDDs and checks the grouped values found for the key "Oranges". */
+  @SuppressWarnings("unchecked")
+  @Test
+  public void cogroup() {
+    List<Tuple2<String, String>> categoryEntries = Arrays.asList(
+      new Tuple2<>("Apples", "Fruit"),
+      new Tuple2<>("Oranges", "Fruit"),
+      new Tuple2<>("Oranges", "Citrus")
+    );
+    List<Tuple2<String, Integer>> priceEntries = Arrays.asList(
+      new Tuple2<>("Oranges", 2),
+      new Tuple2<>("Apples", 3)
+    );
+    JavaPairRDD<String, String> categories = sc.parallelizePairs(categoryEntries);
+    JavaPairRDD<String, Integer> prices = sc.parallelizePairs(priceEntries);
+
+    JavaPairRDD<String, Tuple2<Iterable<String>, Iterable<Integer>>> grouped =
+        categories.cogroup(prices);
+    Iterable<String> orangeCategories = grouped.lookup("Oranges").get(0)._1();
+    assertEquals("[Fruit, Citrus]", Iterables.toString(orangeCategories));
+    Iterable<Integer> orangePrices = grouped.lookup("Oranges").get(0)._2();
+    assertEquals("[2]", Iterables.toString(orangePrices));
+
+    // Also make sure the whole result can be collected
+    grouped.collect();
+  }
+
+  /** Cogroups three pair RDDs and checks one grouped component per input RDD. */
+  @SuppressWarnings("unchecked")
+  @Test
+  public void cogroup3() {
+    List<Tuple2<String, String>> categoryEntries = Arrays.asList(
+      new Tuple2<>("Apples", "Fruit"),
+      new Tuple2<>("Oranges", "Fruit"),
+      new Tuple2<>("Oranges", "Citrus")
+    );
+    List<Tuple2<String, Integer>> priceEntries = Arrays.asList(
+      new Tuple2<>("Oranges", 2),
+      new Tuple2<>("Apples", 3)
+    );
+    List<Tuple2<String, Integer>> quantityEntries = Arrays.asList(
+      new Tuple2<>("Oranges", 21),
+      new Tuple2<>("Apples", 42)
+    );
+    JavaPairRDD<String, String> categories = sc.parallelizePairs(categoryEntries);
+    JavaPairRDD<String, Integer> prices = sc.parallelizePairs(priceEntries);
+    JavaPairRDD<String, Integer> quantities = sc.parallelizePairs(quantityEntries);
+
+    JavaPairRDD<String, Tuple3<Iterable<String>, Iterable<Integer>, Iterable<Integer>>> grouped =
+        categories.cogroup(prices, quantities);
+    Iterable<String> orangeCategories = grouped.lookup("Oranges").get(0)._1();
+    assertEquals("[Fruit, Citrus]", Iterables.toString(orangeCategories));
+    Iterable<Integer> orangePrices = grouped.lookup("Oranges").get(0)._2();
+    assertEquals("[2]", Iterables.toString(orangePrices));
+    Iterable<Integer> appleQuantities = grouped.lookup("Apples").get(0)._3();
+    assertEquals("[42]", Iterables.toString(appleQuantities));
+
+    // Also make sure the whole result can be collected
+    grouped.collect();
+  }
+
+  /** Cogroups four pair RDDs and checks one grouped component per input RDD. */
+  @SuppressWarnings("unchecked")
+  @Test
+  public void cogroup4() {
+    List<Tuple2<String, String>> categoryEntries = Arrays.asList(
+      new Tuple2<>("Apples", "Fruit"),
+      new Tuple2<>("Oranges", "Fruit"),
+      new Tuple2<>("Oranges", "Citrus")
+    );
+    List<Tuple2<String, Integer>> priceEntries = Arrays.asList(
+      new Tuple2<>("Oranges", 2),
+      new Tuple2<>("Apples", 3)
+    );
+    List<Tuple2<String, Integer>> quantityEntries = Arrays.asList(
+      new Tuple2<>("Oranges", 21),
+      new Tuple2<>("Apples", 42)
+    );
+    List<Tuple2<String, String>> countryEntries = Arrays.asList(
+      new Tuple2<>("Oranges", "BR"),
+      new Tuple2<>("Apples", "US")
+    );
+    JavaPairRDD<String, String> categories = sc.parallelizePairs(categoryEntries);
+    JavaPairRDD<String, Integer> prices = sc.parallelizePairs(priceEntries);
+    JavaPairRDD<String, Integer> quantities = sc.parallelizePairs(quantityEntries);
+    JavaPairRDD<String, String> countries = sc.parallelizePairs(countryEntries);
+
+    JavaPairRDD<String, Tuple4<Iterable<String>, Iterable<Integer>, Iterable<Integer>,
+        Iterable<String>>> grouped = categories.cogroup(prices, quantities, countries);
+    Iterable<String> orangeCategories = grouped.lookup("Oranges").get(0)._1();
+    assertEquals("[Fruit, Citrus]", Iterables.toString(orangeCategories));
+    Iterable<Integer> orangePrices = grouped.lookup("Oranges").get(0)._2();
+    assertEquals("[2]", Iterables.toString(orangePrices));
+    Iterable<Integer> appleQuantities = grouped.lookup("Apples").get(0)._3();
+    assertEquals("[42]", Iterables.toString(appleQuantities));
+    Iterable<String> orangeCountries = grouped.lookup("Oranges").get(0)._4();
+    assertEquals("[BR]", Iterables.toString(orangeCountries));
+
+    // Also make sure the whole result can be collected
+    grouped.collect();
+  }
+
+  /**
+   * Left-outer-joins two pair RDDs, checks the number of joined rows, and checks that the first
+   * row without a right-hand match belongs to key 3.
+   */
+  @SuppressWarnings("unchecked")
+  @Test
+  public void leftOuterJoin() {
+    List<Tuple2<Integer, Integer>> leftEntries = Arrays.asList(
+      new Tuple2<>(1, 1),
+      new Tuple2<>(1, 2),
+      new Tuple2<>(2, 1),
+      new Tuple2<>(3, 1)
+    );
+    List<Tuple2<Integer, Character>> rightEntries = Arrays.asList(
+      new Tuple2<>(1, 'x'),
+      new Tuple2<>(2, 'y'),
+      new Tuple2<>(2, 'z'),
+      new Tuple2<>(4, 'w')
+    );
+    JavaPairRDD<Integer, Integer> rdd1 = sc.parallelizePairs(leftEntries);
+    JavaPairRDD<Integer, Character> rdd2 = sc.parallelizePairs(rightEntries);
+
+    List<Tuple2<Integer, Tuple2<Integer, Optional<Character>>>> joinedRows =
+      rdd1.leftOuterJoin(rdd2).collect();
+    assertEquals(5, joinedRows.size());
+
+    // Keeps only rows whose right-hand side is absent
+    Function<Tuple2<Integer, Tuple2<Integer, Optional<Character>>>, Boolean> hasNoMatch =
+      new Function<Tuple2<Integer, Tuple2<Integer, Optional<Character>>>, Boolean>() {
+        @Override
+        public Boolean call(Tuple2<Integer, Tuple2<Integer, Optional<Character>>> row) {
+          return !row._2()._2().isPresent();
+        }
+      };
+    Tuple2<Integer, Tuple2<Integer, Optional<Character>>> firstUnmatched =
+      rdd1.leftOuterJoin(rdd2).filter(hasNoMatch).first();
+    assertEquals(3, firstUnmatched._1().intValue());
+  }
+
+  /** Sums the same RDD with {@code fold} and with {@code reduce}; both must give 33. */
+  @Test
+  public void foldReduce() {
+    JavaRDD<Integer> numbers = sc.parallelize(Arrays.asList(1, 1, 2, 3, 5, 8, 13));
+    Function2<Integer, Integer, Integer> plus = new Function2<Integer, Integer, Integer>() {
+      @Override
+      public Integer call(Integer left, Integer right) {
+        return left + right;
+      }
+    };
+
+    int total = numbers.fold(0, plus);
+    assertEquals(33, total);
+
+    total = numbers.reduce(plus);
+    assertEquals(33, total);
+  }
+
+  /** Sums nine integers with {@code treeReduce} at depths 1 through 10; each must give -5. */
+  @Test
+  public void treeReduce() {
+    JavaRDD<Integer> numbers =
+        sc.parallelize(Arrays.asList(-5, -4, -3, -2, -1, 1, 2, 3, 4), 10);
+    Function2<Integer, Integer, Integer> plus = new Function2<Integer, Integer, Integer>() {
+      @Override
+      public Integer call(Integer left, Integer right) {
+        return left + right;
+      }
+    };
+    int depth = 1;
+    while (depth <= 10) {
+      int total = numbers.treeReduce(plus, depth);
+      assertEquals(-5, total);
+      depth++;
+    }
+  }
+
+  /** Sums nine integers with {@code treeAggregate} at depths 1 through 10; each must give -5. */
+  @Test
+  public void treeAggregate() {
+    JavaRDD<Integer> numbers =
+        sc.parallelize(Arrays.asList(-5, -4, -3, -2, -1, 1, 2, 3, 4), 10);
+    // The same addition serves as both the per-element and the combining function
+    Function2<Integer, Integer, Integer> plus = new Function2<Integer, Integer, Integer>() {
+      @Override
+      public Integer call(Integer left, Integer right) {
+        return left + right;
+      }
+    };
+    int depth = 1;
+    while (depth <= 10) {
+      int total = numbers.treeAggregate(0, plus, plus, depth);
+      assertEquals(-5, total);
+      depth++;
+    }
+  }
+
+  /**
+   * Uses {@code aggregateByKey} to collect the distinct values seen for each key into a set and
+   * checks the resulting map.
+   */
+  @SuppressWarnings("unchecked")
+  @Test
+  public void aggregateByKey() {
+    List<Tuple2<Integer, Integer>> entries = Arrays.asList(
+        new Tuple2<>(1, 1),
+        new Tuple2<>(1, 1),
+        new Tuple2<>(3, 2),
+        new Tuple2<>(5, 1),
+        new Tuple2<>(5, 3));
+    JavaPairRDD<Integer, Integer> pairs = sc.parallelizePairs(entries, 2);
+
+    // Adds one value to the running set for its key
+    Function2<Set<Integer>, Integer, Set<Integer>> addValue =
+      new Function2<Set<Integer>, Integer, Set<Integer>>() {
+        @Override
+        public Set<Integer> call(Set<Integer> seen, Integer value) {
+          seen.add(value);
+          return seen;
+        }
+      };
+    // Merges two partial sets for the same key
+    Function2<Set<Integer>, Set<Integer>, Set<Integer>> mergeSets =
+      new Function2<Set<Integer>, Set<Integer>, Set<Integer>>() {
+        @Override
+        public Set<Integer> call(Set<Integer> into, Set<Integer> from) {
+          into.addAll(from);
+          return into;
+        }
+      };
+
+    Map<Integer, Set<Integer>> valuesByKey =
+      pairs.aggregateByKey(new HashSet<Integer>(), addValue, mergeSets).collectAsMap();
+    assertEquals(3, valuesByKey.size());
+    assertEquals(new HashSet<>(Arrays.asList(1)), valuesByKey.get(1));
+    assertEquals(new HashSet<>(Arrays.asList(2)), valuesByKey.get(3));
+    assertEquals(new HashSet<>(Arrays.asList(1, 3)), valuesByKey.get(5));
+  }
+
+  /** Sums the values of each key with {@code foldByKey} and checks the per-key totals. */
+  @SuppressWarnings("unchecked")
+  @Test
+  public void foldByKey() {
+    List<Tuple2<Integer, Integer>> entries = Arrays.asList(
+      new Tuple2<>(2, 1),
+      new Tuple2<>(2, 1),
+      new Tuple2<>(1, 1),
+      new Tuple2<>(3, 2),
+      new Tuple2<>(3, 1)
+    );
+    Function2<Integer, Integer, Integer> plus = new Function2<Integer, Integer, Integer>() {
+      @Override
+      public Integer call(Integer left, Integer right) {
+        return left + right;
+      }
+    };
+    JavaPairRDD<Integer, Integer> keyed = sc.parallelizePairs(entries);
+    JavaPairRDD<Integer, Integer> totals = keyed.foldByKey(0, plus);
+    assertEquals(1, totals.lookup(1).get(0).intValue());
+    assertEquals(2, totals.lookup(2).get(0).intValue());
+    assertEquals(3, totals.lookup(3).get(0).intValue());
+  }
+
+  /**
+   * Sums the values of each key with {@code reduceByKey} and {@code reduceByKeyLocally}, checking
+   * the totals via {@code lookup}, via {@code collectAsMap}, and on the locally reduced map.
+   */
+  @SuppressWarnings("unchecked")
+  @Test
+  public void reduceByKey() {
+    List<Tuple2<Integer, Integer>> entries = Arrays.asList(
+      new Tuple2<>(2, 1),
+      new Tuple2<>(2, 1),
+      new Tuple2<>(1, 1),
+      new Tuple2<>(3, 2),
+      new Tuple2<>(3, 1)
+    );
+    JavaPairRDD<Integer, Integer> keyed = sc.parallelizePairs(entries);
+
+    // Distributed reduction, inspected through lookup()
+    Function2<Integer, Integer, Integer> plus = new Function2<Integer, Integer, Integer>() {
+      @Override
+      public Integer call(Integer left, Integer right) {
+        return left + right;
+      }
+    };
+    JavaPairRDD<Integer, Integer> totals = keyed.reduceByKey(plus);
+    assertEquals(1, totals.lookup(1).get(0).intValue());
+    assertEquals(2, totals.lookup(2).get(0).intValue());
+    assertEquals(3, totals.lookup(3).get(0).intValue());
+
+    // Same result, inspected through collectAsMap()
+    Map<Integer, Integer> totalsAsMap = totals.collectAsMap();
+    assertEquals(1, totalsAsMap.get(1).intValue());
+    assertEquals(2, totalsAsMap.get(2).intValue());
+    assertEquals(3, totalsAsMap.get(3).intValue());
+
+    // Reduction that returns the map directly
+    Function2<Integer, Integer, Integer> plusLocally = new Function2<Integer, Integer, Integer>() {
+      @Override
+      public Integer call(Integer left, Integer right) {
+        return left + right;
+      }
+    };
+    totalsAsMap = keyed.reduceByKeyLocally(plusLocally);
+    assertEquals(1, totalsAsMap.get(1).intValue());
+    assertEquals(2, totalsAsMap.get(2).intValue());
+    assertEquals(3, totalsAsMap.get(3).intValue());
+  }
+
+  /**
+   * Checks {@code countByValue} and the final value of {@code countByValueApprox} for a value
+   * that occurs twice (1) and one that occurs once (13).
+   */
+  @Test
+  public void approximateResults() {
+    JavaRDD<Integer> numbers = sc.parallelize(Arrays.asList(1, 1, 2, 3, 5, 8, 13));
+
+    // Exact counts
+    Map<Integer, Long> exactCounts = numbers.countByValue();
+    assertEquals(2, exactCounts.get(1).longValue());
+    assertEquals(1, exactCounts.get(13).longValue());
+
+    // Approximate counts: compare the mean of the final bounded value
+    PartialResult<Map<Integer, BoundedDouble>> partial = numbers.countByValueApprox(1);
+    Map<Integer, BoundedDouble> approxCounts = partial.getFinalValue();
+    assertEquals(2.0, approxCounts.get(1).mean(), 0.01);
+    assertEquals(1.0, approxCounts.get(13).mean(), 0.01);
+  }
+
+  /**
+   * Checks {@code first}, {@code take} and {@code takeSample} on a small RDD. The results of
+   * {@code take} and {@code takeSample} are asserted rather than discarded so that the test
+   * actually verifies them.
+   */
+  @Test
+  public void take() {
+    JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 1, 2, 3, 5, 8, 13));
+    assertEquals(1, rdd.first().intValue());
+    assertEquals(Arrays.asList(1, 1), rdd.take(2));
+    // Which elements are sampled depends on the seed, so only the sample size is asserted.
+    assertEquals(2, rdd.takeSample(false, 2, 42).size());
+  }
+
+  @Test
+  public void isEmpty() {
+    assertTrue(sc.emptyRDD().isEmpty());
+    assertTrue(sc.parallelize(new ArrayList<Integer>()).isEmpty());
+    assertFalse(sc.parallelize(Arrays.asList(1)).isEmpty());
+    assertTrue(sc.parallelize(Arrays.asList(1, 2, 3), 3).filter(
+        new Function<Integer,Boolean>() {
+          @Override
+          public Boolean call(Integer i) {
+            return i < 0;
+          }
+        }).isEmpty());
+    assertFalse(sc.parallelize(Arrays.asList(1, 2, 3)).filter(
+        new Function<Integer, Boolean>() {
+          @Override
+          public Boolean call(Integer i) {
+            return i > 1;
+          }
+        }).isEmpty());
+  }
+
+  /** The first element of strings x doubles must pair the first string with the first double. */
+  @Test
+  public void cartesian() {
+    JavaDoubleRDD numbers = sc.parallelizeDoubles(Arrays.asList(1.0, 1.0, 2.0, 3.0, 5.0, 8.0));
+    JavaRDD<String> words = sc.parallelize(Arrays.asList("Hello", "World"));
+    JavaPairRDD<String, Double> product = words.cartesian(numbers);
+    Tuple2<String, Double> firstPair = product.first();
+    assertEquals(new Tuple2<>("Hello", 1.0), firstPair);
+  }
+
+  /**
+   * Exercises the {@link JavaDoubleRDD}-specific API: distinct, filter, union, cache, and the
+   * summary statistics exposed both directly on the RDD and through {@link StatCounter}.
+   */
+  @Test
+  public void javaDoubleRDD() {
+    JavaDoubleRDD rdd = sc.parallelizeDoubles(Arrays.asList(1.0, 1.0, 2.0, 3.0, 5.0, 8.0));
+    JavaDoubleRDD distinct = rdd.distinct();
+    assertEquals(5, distinct.count());
+    JavaDoubleRDD filter = rdd.filter(new Function<Double, Boolean>() {
+      @Override
+      public Boolean call(Double x) {
+        return x > 2.0;
+      }
+    });
+    assertEquals(3, filter.count());
+    JavaDoubleRDD union = rdd.union(rdd);
+    assertEquals(12, union.count());
+    // Counting again after cache() must give the same answer
+    union = union.cache();
+    assertEquals(12, union.count());
+
+    assertEquals(20, rdd.sum(), 0.01);
+    StatCounter stats = rdd.stats();
+    assertEquals(20, stats.sum(), 0.01);
+    assertEquals(20/6.0, rdd.mean(), 0.01);
+    // Check the mean through the StatCounter too (this line used to repeat the rdd.mean() check).
+    assertEquals(20/6.0, stats.mean(), 0.01);
+    assertEquals(6.22222, rdd.variance(), 0.01);
+    assertEquals(rdd.variance(), rdd.popVariance(), 1e-14);
+    assertEquals(7.46667, rdd.sampleVariance(), 0.01);
+    assertEquals(2.49444, rdd.stdev(), 0.01);
+    assertEquals(rdd.stdev(), rdd.popStdev(), 1e-14);
+    assertEquals(2.73252, rdd.sampleStdev(), 0.01);
+
+    // Smoke-check that first() and take() can be called on a JavaDoubleRDD
+    rdd.first();
+    rdd.take(5);
+  }
+
+  /**
+   * Checks {@code histogram} with generated buckets, with caller-provided buckets, and on an
+   * empty RDD (SPARK-5744).
+   */
+  @Test
+  public void javaDoubleRDDHistoGram() {
+    JavaDoubleRDD numbers = sc.parallelizeDoubles(Arrays.asList(1.0, 2.0, 3.0, 4.0));
+    double[] expectedBuckets = {1.0, 2.5, 4.0};
+    long[] expectedCounts = {2, 2};
+
+    // Test using generated buckets
+    Tuple2<double[], long[]> generated = numbers.histogram(2);
+    assertArrayEquals(expectedBuckets, generated._1(), 0.1);
+    assertArrayEquals(expectedCounts, generated._2());
+
+    // Test with provided buckets
+    long[] countsForGivenBuckets = numbers.histogram(expectedBuckets);
+    assertArrayEquals(expectedCounts, countsForGivenBuckets);
+
+    // SPARK-5744
+    JavaDoubleRDD noNumbers = sc.parallelizeDoubles(new ArrayList<Double>(0), 1);
+    long[] countsForEmptyRdd = noNumbers.histogram(new double[]{0.0, 1.0});
+    assertArrayEquals(new long[] {0}, countsForEmptyRdd);
+  }
+
+  /** Serializable comparator that orders {@link Double} values by their natural ordering. */
+  private static class DoubleComparator implements Comparator<Double>, Serializable {
+    @Override
+    public int compare(Double left, Double right) {
+      return Double.compare(left, right);
+    }
+  }
+
+  /** {@code max} with an explicit comparator must return the largest element. */
+  @Test
+  public void max() {
+    JavaDoubleRDD numbers = sc.parallelizeDoubles(Arrays.asList(1.0, 2.0, 3.0, 4.0));
+    double largest = numbers.max(new DoubleComparator());
+    assertEquals(4.0, largest, 0.001);
+  }
+
+  /** {@code min} with an explicit comparator must return the smallest element. */
+  @Test
+  public void min() {
+    JavaDoubleRDD numbers = sc.parallelizeDoubles(Arrays.asList(1.0, 2.0, 3.0, 4.0));
+    double smallest = numbers.min(new DoubleComparator());
+    assertEquals(1.0, smallest, 0.001);
+  }
+
+  /** {@code max} without a comparator must return the largest element. */
+  @Test
+  public void naturalMax() {
+    JavaDoubleRDD numbers = sc.parallelizeDoubles(Arrays.asList(1.0, 2.0, 3.0, 4.0));
+    double largest = numbers.max();
+    assertEquals(4.0, largest, 0.0);
+  }
+
+  /** {@code min} without a comparator must return the smallest element. */
+  @Test
+  public void naturalMin() {
+    JavaDoubleRDD numbers = sc.parallelizeDoubles(Arrays.asList(1.0, 2.0, 3.0, 4.0));
+    double smallest = numbers.min();
+    assertEquals(1.0, smallest, 0.0);
+  }
+
+  /** {@code takeOrdered(2)} must return the two smallest elements, with or without a comparator. */
+  @Test
+  public void takeOrdered() {
+    JavaDoubleRDD numbers = sc.parallelizeDoubles(Arrays.asList(1.0, 2.0, 3.0, 4.0));
+    List<Double> withComparator = numbers.takeOrdered(2, new DoubleComparator());
+    assertEquals(Arrays.asList(1.0, 2.0), withComparator);
+    List<Double> withNaturalOrder = numbers.takeOrdered(2);
+    assertEquals(Arrays.asList(1.0, 2.0), withNaturalOrder);
+  }
+
+  /** {@code top(2)} must return the two largest elements, largest first. */
+  @Test
+  public void top() {
+    JavaRDD<Integer> numbers = sc.parallelize(Arrays.asList(1, 2, 3, 4));
+    List<Integer> expected = Arrays.asList(4, 3);
+    List<Integer> actual = numbers.top(2);
+    assertEquals(expected, actual);
+  }
+
+  /** Binary function that returns the sum of its two integer arguments. */
+  private static class AddInts implements Function2<Integer, Integer, Integer> {
+    @Override
+    public Integer call(Integer left, Integer right) {
+      return left + right;
+    }
+  }
+
+  /** Reducing 1..4 with {@code AddInts} must give 10. */
+  @Test
+  public void reduce() {
+    JavaRDD<Integer> numbers = sc.parallelize(Arrays.asList(1, 2, 3, 4));
+    int total = numbers.reduce(new AddInts());
+    assertEquals(10, total);
+  }
+
+  /** Reducing the doubles 1.0..4.0 with addition must give 10.0. */
+  @Test
+  public void reduceOnJavaDoubleRDD() {
+    Function2<Double, Double, Double> plus = new Function2<Double, Double, Double>() {
+      @Override
+      public Double call(Double left, Double right) {
+        return left + right;
+      }
+    };
+    JavaDoubleRDD numbers = sc.parallelizeDoubles(Arrays.asList(1.0, 2.0, 3.0, 4.0));
+    double total = numbers.reduce(plus);
+    assertEquals(10.0, total, 0.001);
+  }
+
+  /** Folding 1..4 from zero with {@code AddInts} must give 10. */
+  @Test
+  public void fold() {
+    JavaRDD<Integer> numbers = sc.parallelize(Arrays.asList(1, 2, 3, 4));
+    int total = numbers.fold(0, new AddInts());
+    assertEquals(10, total);
+  }
+
+  /** Aggregating 1..4 from zero, using {@code AddInts} for both steps, must give 10. */
+  @Test
+  public void aggregate() {
+    JavaRDD<Integer> numbers = sc.parallelize(Arrays.asList(1, 2, 3, 4));
+    int total = numbers.aggregate(0, new AddInts(), new AddInts());
+    assertEquals(10, total);
+  }
+
+  /**
+   * Smoke test for {@code mapToDouble}, {@code mapToPair} and {@code map}: each mapped RDD is
+   * cached and collected, with no assertion on the collected contents.
+   */
+  @Test
+  public void map() {
+    JavaRDD<Integer> numbers = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));
+
+    // Integer -> double
+    DoubleFunction<Integer> toDouble = new DoubleFunction<Integer>() {
+      @Override
+      public double call(Integer value) {
+        return value.doubleValue();
+      }
+    };
+    JavaDoubleRDD asDoubles = numbers.mapToDouble(toDouble).cache();
+    asDoubles.collect();
+
+    // Integer -> (Integer, Integer)
+    PairFunction<Integer, Integer, Integer> toPair =
+        new PairFunction<Integer, Integer, Integer>() {
+          @Override
+          public Tuple2<Integer, Integer> call(Integer value) {
+            return new Tuple2<>(value, value);
+          }
+        };
+    JavaPairRDD<Integer, Integer> asPairs = numbers.mapToPair(toPair).cache();
+    asPairs.collect();
+
+    // Integer -> String
+    Function<Integer, String> toText = new Function<Integer, String>() {
+      @Override
+      public String call(Integer value) {
+        return value.toString();
+      }
+    };
+    JavaRDD<String> asStrings = numbers.map(toText).cache();
+    asStrings.collect();
+  }
+
+  /**
+   * Exercises flatMap, flatMapToPair and flatMapToDouble by splitting two sentences on single
+   * spaces. The two sentences hold 2 + 9 = 11 space-separated words, so each derived RDD is
+   * expected to contain 11 elements.
+   */
+  @Test
+  public void flatMap() {
+    JavaRDD<String> rdd = sc.parallelize(Arrays.asList("Hello World!",
+      "The quick brown fox jumps over the lazy dog."));
+    JavaRDD<String> words = rdd.flatMap(new FlatMapFunction<String, String>() {
+      @Override
+      public Iterator<String> call(String x) {
+        return Arrays.asList(x.split(" ")).iterator();
+      }
+    });
+    assertEquals("Hello", words.first());
+    assertEquals(11, words.count());
+
+    JavaPairRDD<String, String> pairsRDD = rdd.flatMapToPair(
+      new PairFlatMapFunction<String, String, String>() {
+        @Override
+        public Iterator<Tuple2<String, String>> call(String s) {
+          List<Tuple2<String, String>> pairs = new LinkedList<>();
+          for (String word : s.split(" ")) {
+            pairs.add(new Tuple2<>(word, word));
+          }
+          return pairs.iterator();
+        }
+      }
+    );
+    assertEquals(new Tuple2<>("Hello", "Hello"), pairsRDD.first());
+    assertEquals(11, pairsRDD.count());
+
+    JavaDoubleRDD doubles = rdd.flatMapToDouble(new DoubleFlatMapFunction<String>() {
+      @Override
+      public Iterator<Double> call(String s) {
+        List<Double> lengths = new LinkedList<>();
+        for (String word : s.split(" ")) {
+          lengths.add((double) word.length());
+        }
+        return lengths.iterator();
+      }
+    });
+    assertEquals(5.0, doubles.first(), 0.01);
+    // Count the double RDD itself; this previously re-checked pairsRDD by mistake.
+    assertEquals(11, doubles.count());
+  }
+
+  /**
+   * Swaps key and value of a pair RDD, once through flatMapToPair and once through mapToPair.
+   * Both results are only collected; their contents are not asserted.
+   */
+  @SuppressWarnings("unchecked")
+  @Test
+  public void mapsFromPairsToPairs() {
+    List<Tuple2<Integer, String>> input = Arrays.asList(
+      new Tuple2<>(1, "a"),
+      new Tuple2<>(2, "aa"),
+      new Tuple2<>(3, "aaa")
+    );
+    JavaPairRDD<Integer, String> source = sc.parallelizePairs(input);
+
+    // Regression test for SPARK-668:
+    PairFlatMapFunction<Tuple2<Integer, String>, String, Integer> swapViaFlatMap =
+      new PairFlatMapFunction<Tuple2<Integer, String>, String, Integer>() {
+        @Override
+        public Iterator<Tuple2<String, Integer>> call(Tuple2<Integer, String> entry) {
+          return Collections.singletonList(entry.swap()).iterator();
+        }
+      };
+    source.flatMapToPair(swapViaFlatMap).collect();
+
+    // There was never a bug here, but it's worth testing:
+    PairFunction<Tuple2<Integer, String>, String, Integer> swapViaMap =
+      new PairFunction<Tuple2<Integer, String>, String, Integer>() {
+        @Override
+        public Tuple2<String, Integer> call(Tuple2<Integer, String> entry) {
+          return entry.swap();
+        }
+      };
+    source.mapToPair(swapViaMap).collect();
+  }
+
+  @Test
+  public void mapPartitions() {
+    JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4), 2);
+    JavaRDD<Integer> partitionSums = rdd.mapPartitions(
+      new FlatMapFunction<Iterator<Integer>, Integer>() {
+        @Override
+        public Iterator<Integer> call(Iterator<Integer> iter) {
+          int sum = 0;
+          while (iter.hasNext()) {
+            sum += iter.next();
+          }
+          return Collections.singletonList(sum).iterator();
+        }
+    });
+    assertEquals("[3, 7]", partitionSums.collect().toString());
+  }
+
+
+  @Test
+  public void mapPartitionsWithIndex() {
+    JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4), 2);
+    JavaRDD<Integer> partitionSums = rdd.mapPartitionsWithIndex(
+      new Function2<Integer, Iterator<Integer>, Iterator<Integer>>() {
+        @Override
+        public Iterator<Integer> call(Integer index, Iterator<Integer> iter) {
+          int sum = 0;
+          while (iter.hasNext()) {
+            sum += iter.next();
+          }
+          return Collections.singletonList(sum).iterator();
+        }
+    }, false);
+    assertEquals("[3, 7]", partitionSums.collect().toString());
+  }
+
+  /** getNumPartitions must report the partition count requested at creation for each RDD kind. */
+  @Test
+  public void getNumPartitions() {
+    List<Tuple2<String, Integer>> pairData = Arrays.asList(
+        new Tuple2<>("a", 1),
+        new Tuple2<>("aa", 2),
+        new Tuple2<>("aaa", 3));
+
+    JavaRDD<Integer> plain = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8), 3);
+    JavaDoubleRDD doubles = sc.parallelizeDoubles(Arrays.asList(1.0, 2.0, 3.0, 4.0), 2);
+    JavaPairRDD<String, Integer> pairs = sc.parallelizePairs(pairData, 2);
+
+    assertEquals(3, plain.getNumPartitions());
+    assertEquals(2, doubles.getNumPartitions());
+    assertEquals(2, pairs.getNumPartitions());
+  }
+
+  /**
+   * Repartitions an 8-element RDD in both directions (2 to 4 and 4 to 2 partitions) and checks
+   * that the result has the requested number of partitions, none of which is empty.
+   */
+  @Test
+  public void repartition() {
+    // Growing number of partitions (2 -> 4)
+    JavaRDD<Integer> in1 = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8), 2);
+    JavaRDD<Integer> repartitioned1 = in1.repartition(4);
+    List<List<Integer>> result1 = repartitioned1.glom().collect();
+    assertEquals(4, result1.size());
+    for (List<Integer> l : result1) {
+      assertFalse(l.isEmpty());
+    }
+
+    // Shrinking number of partitions (4 -> 2)
+    JavaRDD<Integer> in2 = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8), 4);
+    JavaRDD<Integer> repartitioned2 = in2.repartition(2);
+    List<List<Integer>> result2 = repartitioned2.glom().collect();
+    assertEquals(2, result2.size());
+    for (List<Integer> l : result2) {
+      assertFalse(l.isEmpty());
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void persist() {
+    JavaDoubleRDD doubleRDD = sc.parallelizeDoubles(Arrays.asList(1.0, 1.0, 2.0, 3.0, 5.0, 8.0));
+    doubleRDD = doubleRDD.persist(StorageLevel.DISK_ONLY());
+    assertEquals(20, doubleRDD.sum(), 0.1);
+
+    List<Tuple2<Integer, String>> pairs = Arrays.asList(
+      new Tuple2<>(1, "a"),
+      new Tuple2<>(2, "aa"),
+      new Tuple2<>(3, "aaa")
+    );
+    JavaPairRDD<Integer, String> pairRDD = sc.parallelizePairs(pairs);
+    pairRDD = pairRDD.persist(StorageLevel.DISK_ONLY());
+    assertEquals("a", pairRDD.first()._2());
+
+    JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));
+    rdd = rdd.persist(StorageLevel.DISK_ONLY());
+    assertEquals(1, rdd.first().intValue());
+  }
+
+  @Test
+  public void iterator() {
+    JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5), 2);
+    TaskContext context = TaskContext$.MODULE$.empty();
+    assertEquals(1, rdd.iterator(rdd.partitions().get(0), context).next().intValue());
+  }
+
+  @Test
+  public void glom() {
+    JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4), 2);
+    assertEquals("[1, 2]", rdd.glom().first().toString());
+  }
+
+  // File input / output tests are largely adapted from FileSuite:
+
+  /**
+   * Saves 1..4 as a text file, then verifies the raw contents of part-00000 and the result of
+   * reading the output directory back with textFile.
+   */
+  @Test
+  public void textFiles() throws IOException {
+    String outputDir = new File(tempDir, "output").getAbsolutePath();
+    sc.parallelize(Arrays.asList(1, 2, 3, 4)).saveAsTextFile(outputDir);
+
+    // Read the plain text file and check it's OK
+    File firstPart = new File(outputDir, "part-00000");
+    String rawText = Files.toString(firstPart, StandardCharsets.UTF_8);
+    assertEquals("1\n2\n3\n4\n", rawText);
+
+    // Also try reading it in as a text file RDD
+    JavaRDD<String> reloaded = sc.textFile(outputDir);
+    List<String> expectedLines = Arrays.asList("1", "2", "3", "4");
+    assertEquals(expectedLines, reloaded.collect());
+  }
+
+  /**
+   * Writes two small files into the temp directory and checks that, for every (path, content)
+   * pair returned by wholeTextFiles, the content equals the text stored under that path.
+   */
+  @Test
+  public void wholeTextFiles() throws Exception {
+    byte[] firstBytes = "spark is easy to use.\n".getBytes(StandardCharsets.UTF_8);
+    byte[] secondBytes = "spark is also easy to use.\n".getBytes(StandardCharsets.UTF_8);
+
+    String dirName = tempDir.getAbsolutePath();
+    String firstPath = new Path(dirName, "part-00000").toUri().getPath();
+    String secondPath = new Path(dirName, "part-00001").toUri().getPath();
+
+    Files.write(firstBytes, new File(firstPath));
+    Files.write(secondBytes, new File(secondPath));
+
+    Map<String, String> expectedByPath = new HashMap<>();
+    expectedByPath.put(firstPath, new Text(firstBytes).toString());
+    expectedByPath.put(secondPath, new Text(secondBytes).toString());
+
+    List<Tuple2<String, String>> loaded = sc.wholeTextFiles(dirName, 3).collect();
+
+    for (Tuple2<String, String> entry : loaded) {
+      // Note that the paths from `wholeTextFiles` are in URI format on Windows,
+      // for example, file:/C:/a/b/c.
+      String normalizedPath = new Path(entry._1()).toUri().getPath();
+      assertEquals(entry._2(), expectedByPath.get(normalizedPath));
+    }
+  }
+
+  /** Saves 1..4 as text compressed with DefaultCodec and reads it back through textFile. */
+  @Test
+  public void textFilesCompressed() throws IOException {
+    String outputDir = new File(tempDir, "output").getAbsolutePath();
+    sc.parallelize(Arrays.asList(1, 2, 3, 4)).saveAsTextFile(outputDir, DefaultCodec.class);
+
+    // Try reading it in as a text file RDD
+    JavaRDD<String> reloaded = sc.textFile(outputDir);
+    List<String> expectedLines = Arrays.asList("1", "2", "3", "4");
+    assertEquals(expectedLines, reloaded.collect());
+  }
+
+  /**
+   * Writes (Integer, String) pairs as an (IntWritable, Text) sequence file via saveAsHadoopFile,
+   * reads them back with sequenceFile, converts them to Java types and expects the original
+   * pairs.
+   */
+  @SuppressWarnings("unchecked")
+  @Test
+  public void sequenceFile() {
+    String outputDir = new File(tempDir, "output").getAbsolutePath();
+    List<Tuple2<Integer, String>> pairs = Arrays.asList(
+      new Tuple2<>(1, "a"),
+      new Tuple2<>(2, "aa"),
+      new Tuple2<>(3, "aaa")
+    );
+    JavaPairRDD<Integer, String> rdd = sc.parallelizePairs(pairs);
+
+    rdd.mapToPair(new PairFunction<Tuple2<Integer, String>, IntWritable, Text>() {
+      @Override
+      public Tuple2<IntWritable, Text> call(Tuple2<Integer, String> pair) {
+        return new Tuple2<>(new IntWritable(pair._1()), new Text(pair._2()));
+      }
+    }).saveAsHadoopFile(outputDir, IntWritable.class, Text.class, SequenceFileOutputFormat.class);
+
+    // Try reading the output back as a sequence file
+    JavaPairRDD<Integer, String> readRDD = sc.sequenceFile(outputDir, IntWritable.class,
+      Text.class).mapToPair(new PairFunction<Tuple2<IntWritable, Text>, Integer, String>() {
+      @Override
+      public Tuple2<Integer, String> call(Tuple2<IntWritable, Text> pair) {
+        return new Tuple2<>(pair._1().get(), pair._2().toString());
+      }
+    });
+    assertEquals(pairs, readRDD.collect());
+  }
+
+  @Test
+  public void binaryFiles() throws Exception {
+    // Reusing the wholeText files example
+    byte[] content1 = "spark is easy to use.\n".getBytes(StandardCharsets.UTF_8);
+
+    String tempDirName = tempDir.getAbsolutePath();
+    File file1 = new File(tempDirName + "/part-00000");
+
+    FileOutputStream fos1 = new FileOutputStream(file1);
+
+    FileChannel channel1 = fos1.getChannel();
+    ByteBuffer bbuf = ByteBuffer.wrap(content1);
+    channel1.write(bbuf);
+    channel1.close();
+    JavaPairRDD<String, PortableDataStream> readRDD = sc.binaryFiles(tempDirName, 3);
+    List<Tuple2<String, PortableDataStream>> result = readRDD.collect();
+    for (Tuple2<String, PortableDataStream> res : result) {
+      assertArrayEquals(content1, res._2().toArray());
+    }
+  }
+
+  @Test
+  public void binaryFilesCaching() throws Exception {
+    // Reusing the wholeText files example
+    byte[] content1 = "spark is easy to use.\n".getBytes(StandardCharsets.UTF_8);
+
+    String tempDirName = tempDir.getAbsolutePath();
+    File file1 = new File(tempDirName + "/part-00000");
+
+    FileOutputStream fos1 = new FileOutputStream(file1);
+
+    FileChannel channel1 = fos1.getChannel();
+    ByteBuffer bbuf = ByteBuffer.wrap(content1);
+    channel1.write(bbuf);
+    channel1.close();
+
+    JavaPairRDD<String, PortableDataStream> readRDD = sc.binaryFiles(tempDirName).cache();
+    readRDD.foreach(new VoidFunction<Tuple2<String,PortableDataStream>>() {
+      @Override
+      public void call(Tuple2<String, PortableDataStream> pair) {
+        pair._2().toArray(); // force the file to read
+      }
+    });
+
+    List<Tuple2<String, PortableDataStream>> result = readRDD.collect();
+    for (Tuple2<String, PortableDataStream> res : result) {
+      assertArrayEquals(content1, res._2().toArray());
+    }
+  }
+
+  @Test
+  public void binaryRecords() throws Exception {
+    // Reusing the wholeText files example
+    byte[] content1 = "spark isn't always easy to use.\n".getBytes(StandardCharsets.UTF_8);
+    int numOfCopies = 10;
+    String tempDirName = tempDir.getAbsolutePath();
+    File file1 = new File(tempDirName + "/part-00000");
+
+    FileOutputStream fos1 = new FileOutputStream(file1);
+
+    FileChannel channel1 = fos1.getChannel();
+
+    for (int i = 0; i < numOfCopies; i++) {
+      ByteBuffer bbuf = ByteBuffer.wrap(content1);
+      channel1.write(bbuf);
+    }
+    channel1.close();
+
+    JavaRDD<byte[]> readRDD = sc.binaryRecords(tempDirName, content1.length);
+    assertEquals(numOfCopies,readRDD.count());
+    List<byte[]> result = readRDD.collect();
+    for (byte[] res : result) {
+      assertArrayEquals(content1, res);
+    }
+  }
+
+  /**
+   * Writes (IntWritable, Text) pairs with saveAsNewAPIHadoopFile using the mapreduce
+   * SequenceFileOutputFormat, reads them back with sequenceFile and compares the printed forms.
+   */
+  @SuppressWarnings("unchecked")
+  @Test
+  public void writeWithNewAPIHadoopFile() {
+    String outputDir = new File(tempDir, "output").getAbsolutePath();
+    List<Tuple2<Integer, String>> input = Arrays.asList(
+      new Tuple2<>(1, "a"),
+      new Tuple2<>(2, "aa"),
+      new Tuple2<>(3, "aaa")
+    );
+
+    PairFunction<Tuple2<Integer, String>, IntWritable, Text> toWritables =
+      new PairFunction<Tuple2<Integer, String>, IntWritable, Text>() {
+        @Override
+        public Tuple2<IntWritable, Text> call(Tuple2<Integer, String> entry) {
+          return new Tuple2<>(new IntWritable(entry._1()), new Text(entry._2()));
+        }
+      };
+    sc.parallelizePairs(input).mapToPair(toWritables).saveAsNewAPIHadoopFile(
+        outputDir, IntWritable.class, Text.class,
+        org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat.class);
+
+    Function<Tuple2<IntWritable, Text>, String> render =
+      new Function<Tuple2<IntWritable, Text>, String>() {
+        @Override
+        public String call(Tuple2<IntWritable, Text> entry) {
+          return entry.toString();
+        }
+      };
+    JavaPairRDD<IntWritable, Text> reloaded =
+        sc.sequenceFile(outputDir, IntWritable.class, Text.class);
+    String actual = reloaded.map(render).collect().toString();
+    assertEquals(input.toString(), actual);
+  }
+
+  /**
+   * Writes (IntWritable, Text) pairs with saveAsHadoopFile, reads them back through
+   * newAPIHadoopFile with the mapreduce SequenceFileInputFormat and compares the printed forms.
+   */
+  @SuppressWarnings("unchecked")
+  @Test
+  public void readWithNewAPIHadoopFile() throws IOException {
+    String outputDir = new File(tempDir, "output").getAbsolutePath();
+    List<Tuple2<Integer, String>> input = Arrays.asList(
+      new Tuple2<>(1, "a"),
+      new Tuple2<>(2, "aa"),
+      new Tuple2<>(3, "aaa")
+    );
+
+    PairFunction<Tuple2<Integer, String>, IntWritable, Text> toWritables =
+      new PairFunction<Tuple2<Integer, String>, IntWritable, Text>() {
+        @Override
+        public Tuple2<IntWritable, Text> call(Tuple2<Integer, String> entry) {
+          return new Tuple2<>(new IntWritable(entry._1()), new Text(entry._2()));
+        }
+      };
+    sc.parallelizePairs(input).mapToPair(toWritables)
+      .saveAsHadoopFile(outputDir, IntWritable.class, Text.class, SequenceFileOutputFormat.class);
+
+    Function<Tuple2<IntWritable, Text>, String> render =
+      new Function<Tuple2<IntWritable, Text>, String>() {
+        @Override
+        public String call(Tuple2<IntWritable, Text> entry) {
+          return entry.toString();
+        }
+      };
+    JavaPairRDD<IntWritable, Text> reloaded = sc.newAPIHadoopFile(outputDir,
+        org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat.class,
+        IntWritable.class, Text.class, Job.getInstance().getConfiguration());
+    String actual = reloaded.map(render).collect().toString();
+    assertEquals(input.toString(), actual);
+  }
+
+  /** Round-trips the integers 1..4 through saveAsObjectFile and objectFile. */
+  @Test
+  public void objectFilesOfInts() {
+    String outputDir = new File(tempDir, "output").getAbsolutePath();
+    List<Integer> values = Arrays.asList(1, 2, 3, 4);
+    sc.parallelize(Arrays.asList(1, 2, 3, 4)).saveAsObjectFile(outputDir);
+    // Try reading the output back as an object file
+    JavaRDD<Integer> reloaded = sc.objectFile(outputDir);
+    assertEquals(values, reloaded.collect());
+  }
+
+  /** Round-trips (Integer, String) tuples through saveAsObjectFile and objectFile. */
+  @SuppressWarnings("unchecked")
+  @Test
+  public void objectFilesOfComplexTypes() {
+    String outputDir = new File(tempDir, "output").getAbsolutePath();
+    List<Tuple2<Integer, String>> input = Arrays.asList(
+      new Tuple2<>(1, "a"),
+      new Tuple2<>(2, "aa"),
+      new Tuple2<>(3, "aaa")
+    );
+    JavaPairRDD<Integer, String> source = sc.parallelizePairs(input);
+    source.saveAsObjectFile(outputDir);
+    // Try reading the output back as an object file
+    JavaRDD<Tuple2<Integer, String>> reloaded = sc.objectFile(outputDir);
+    List<Tuple2<Integer, String>> actual = reloaded.collect();
+    assertEquals(input, actual);
+  }
+
+  /**
+   * Writes (IntWritable, Text) pairs with saveAsHadoopFile, reads them back through hadoopFile
+   * with SequenceFileInputFormat and compares the printed forms.
+   */
+  @SuppressWarnings("unchecked")
+  @Test
+  public void hadoopFile() {
+    String outputDir = new File(tempDir, "output").getAbsolutePath();
+    List<Tuple2<Integer, String>> input = Arrays.asList(
+      new Tuple2<>(1, "a"),
+      new Tuple2<>(2, "aa"),
+      new Tuple2<>(3, "aaa")
+    );
+
+    PairFunction<Tuple2<Integer, String>, IntWritable, Text> toWritables =
+      new PairFunction<Tuple2<Integer, String>, IntWritable, Text>() {
+        @Override
+        public Tuple2<IntWritable, Text> call(Tuple2<Integer, String> entry) {
+          return new Tuple2<>(new IntWritable(entry._1()), new Text(entry._2()));
+        }
+      };
+    sc.parallelizePairs(input).mapToPair(toWritables)
+      .saveAsHadoopFile(outputDir, IntWritable.class, Text.class, SequenceFileOutputFormat.class);
+
+    Function<Tuple2<IntWritable, Text>, String> render =
+      new Function<Tuple2<IntWritable, Text>, String>() {
+        @Override
+        public String call(Tuple2<IntWritable, Text> entry) {
+          return entry.toString();
+        }
+      };
+    JavaPairRDD<IntWritable, Text> reloaded = sc.hadoopFile(outputDir,
+        SequenceFileInputFormat.class, IntWritable.class, Text.class);
+    String actual = reloaded.map(render).collect().toString();
+    assertEquals(input.toString(), actual);
+  }
+
+  /**
+   * Same round trip as hadoopFile, but the sequence file is written with DefaultCodec
+   * compression into a separate "output_compressed" directory.
+   */
+  @SuppressWarnings("unchecked")
+  @Test
+  public void hadoopFileCompressed() {
+    String outputDir = new File(tempDir, "output_compressed").getAbsolutePath();
+    List<Tuple2<Integer, String>> input = Arrays.asList(
+      new Tuple2<>(1, "a"),
+      new Tuple2<>(2, "aa"),
+      new Tuple2<>(3, "aaa")
+    );
+
+    PairFunction<Tuple2<Integer, String>, IntWritable, Text> toWritables =
+      new PairFunction<Tuple2<Integer, String>, IntWritable, Text>() {
+        @Override
+        public Tuple2<IntWritable, Text> call(Tuple2<Integer, String> entry) {
+          return new Tuple2<>(new IntWritable(entry._1()), new Text(entry._2()));
+        }
+      };
+    sc.parallelizePairs(input).mapToPair(toWritables).saveAsHadoopFile(
+        outputDir, IntWritable.class, Text.class, SequenceFileOutputFormat.class,
+        DefaultCodec.class);
+
+    Function<Tuple2<IntWritable, Text>, String> render =
+      new Function<Tuple2<IntWritable, Text>, String>() {
+        @Override
+        public String call(Tuple2<IntWritable, Text> entry) {
+          return entry.toString();
+        }
+      };
+    JavaPairRDD<IntWritable, Text> reloaded = sc.hadoopFile(outputDir,
+        SequenceFileInputFormat.class, IntWritable.class, Text.class);
+    String actual = reloaded.map(render).collect().toString();
+    assertEquals(input.toString(), actual);
+  }
+
+  /** Zips an Integer RDD with its double-valued mapping and forces evaluation with count(). */
+  @Test
+  public void zip() {
+    DoubleFunction<Integer> toDouble = new DoubleFunction<Integer>() {
+      @Override
+      public double call(Integer value) {
+        return value.doubleValue();
+      }
+    };
+    JavaRDD<Integer> ints = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));
+    JavaDoubleRDD asDoubles = ints.mapToDouble(toDouble);
+    JavaPairRDD<Integer, Double> paired = ints.zip(asDoubles);
+    paired.count();
+  }
+
+  @Test
+  public void zipPartitions() {
+    JavaRDD<Integer> rdd1 = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6), 2);
+    JavaRDD<String> rdd2 = sc.parallelize(Arrays.asList("1", "2", "3", "4"), 2);
+    FlatMapFunction2<Iterator<Integer>, Iterator<String>, Integer> sizesFn =
+      new FlatMapFunction2<Iterator<Integer>, Iterator<String>, Integer>() {
+        @Override
+        public Iterator<Integer> call(Iterator<Integer> i, Iterator<String> s) {
+          return Arrays.asList(Iterators.size(i), Iterators.size(s)).iterator();
+        }
+      };
+
+    JavaRDD<Integer> sizes = rdd1.zipPartitions(rdd2, sizesFn);
+    assertEquals("[3, 2, 3, 2]", sizes.collect().toString());
+  }
+
+  /**
+   * Adds the elements 1..5 to an int, a double and a custom float accumulator, each starting at
+   * 10, and expects 25 every time; finally checks that setValue overrides the float value.
+   */
+  @SuppressWarnings("deprecation")
+  @Test
+  public void accumulators() {
+    JavaRDD<Integer> ints = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));
+
+    final Accumulator<Integer> intTotal = sc.intAccumulator(10);
+    VoidFunction<Integer> addToInt = new VoidFunction<Integer>() {
+      @Override
+      public void call(Integer value) {
+        intTotal.add(value);
+      }
+    };
+    ints.foreach(addToInt);
+    assertEquals((Integer) 25, intTotal.value());
+
+    final Accumulator<Double> doubleTotal = sc.doubleAccumulator(10.0);
+    VoidFunction<Integer> addToDouble = new VoidFunction<Integer>() {
+      @Override
+      public void call(Integer value) {
+        doubleTotal.add((double) value);
+      }
+    };
+    ints.foreach(addToDouble);
+    assertEquals((Double) 25.0, doubleTotal.value());
+
+    // Try a custom accumulator type
+    AccumulatorParam<Float> floatParam = new AccumulatorParam<Float>() {
+      @Override
+      public Float addInPlace(Float first, Float second) {
+        return first + second;
+      }
+
+      @Override
+      public Float addAccumulator(Float current, Float term) {
+        return current + term;
+      }
+
+      @Override
+      public Float zero(Float initialValue) {
+        return 0.0f;
+      }
+    };
+
+    final Accumulator<Float> floatTotal = sc.accumulator(10.0f, floatParam);
+    VoidFunction<Integer> addToFloat = new VoidFunction<Integer>() {
+      @Override
+      public void call(Integer value) {
+        floatTotal.add((float) value);
+      }
+    };
+    ints.foreach(addToFloat);
+    assertEquals((Float) 25.0f, floatTotal.value());
+
+    // Test the setValue method
+    floatTotal.setValue(5.0f);
+    assertEquals((Float) 5.0f, floatTotal.value());
+  }
+
+  /** Keys each integer by its string form and checks the resulting (key, value) tuples. */
+  @Test
+  public void keyBy() {
+    Function<Integer, String> toText = new Function<Integer, String>() {
+      @Override
+      public String call(Integer value) {
+        return value.toString();
+      }
+    };
+    JavaRDD<Integer> ints = sc.parallelize(Arrays.asList(1, 2));
+    List<Tuple2<String, Integer>> keyed = ints.keyBy(toText).collect();
+    assertEquals(new Tuple2<>("1", 1), keyed.get(0));
+    assertEquals(new Tuple2<>("2", 2), keyed.get(1));
+  }
+
+  /**
+   * Marks an RDD for checkpointing, triggers a job, and checks that the RDD then reports
+   * itself as checkpointed and still collects to its original contents.
+   */
+  @Test
+  public void checkpointAndComputation() {
+    List<Integer> values = Arrays.asList(1, 2, 3, 4, 5);
+    JavaRDD<Integer> ints = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));
+    sc.setCheckpointDir(tempDir.getAbsolutePath());
+    assertFalse(ints.isCheckpointed());
+
+    ints.checkpoint();
+    ints.count(); // Forces the DAG to cause a checkpoint
+    assertTrue(ints.isCheckpointed());
+    assertEquals(values, ints.collect());
+  }
+
+  @Test
+  public void checkpointAndRestore() {
+    JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));
+    sc.setCheckpointDir(tempDir.getAbsolutePath());
+    assertFalse(rdd.isCheckpointed());
+    rdd.checkpoint();
+    rdd.count(); // Forces the DAG to cause a checkpoint
+    assertTrue(rdd.isCheckpointed());
+
+    assertTrue(rdd.getCheckpointFile().isPresent());
+    JavaRDD<Integer> recovered = sc.checkpointFile(rdd.getCheckpointFile().get());
+    assertEquals(Arrays.asList(1, 2, 3, 4, 5), recovered.collect());
+  }
+
+  /**
+   * Keys 1..6 by value modulo 3 and sums the values per key with combineByKey, once through
+   * the three-function overload and once through the overload that also takes a partitioner,
+   * a boolean flag (false here) and a KryoSerializer. Both must give {0=9, 1=5, 2=7}.
+   */
+  @Test
+  public void combineByKey() {
+    JavaRDD<Integer> source = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6));
+    Function<Integer, Integer> modThree = new Function<Integer, Integer>() {
+      @Override
+      public Integer call(Integer value) {
+        return value % 3;
+      }
+    };
+    Function<Integer, Integer> identity = new Function<Integer, Integer>() {
+      @Override
+      public Integer call(Integer value) {
+        return value;
+      }
+    };
+    Function2<Integer, Integer, Integer> add = new Function2<Integer, Integer, Integer>() {
+      @Override
+      public Integer call(Integer left, Integer right) {
+        return left + right;
+      }
+    };
+    ImmutableMap<Integer, Integer> expectedSums = ImmutableMap.of(0, 9, 1, 5, 2, 7);
+
+    JavaPairRDD<Integer, Integer> summed =
+        source.keyBy(modThree).combineByKey(identity, add, add);
+    Map<Integer, Integer> actualSums = summed.collectAsMap();
+    assertEquals(expectedSums, actualSums);
+
+    Partitioner partitioner = Partitioner.defaultPartitioner(
+        summed.rdd(),
+        JavaConverters.collectionAsScalaIterableConverter(
+            Collections.<RDD<?>>emptyList()).asScala().toSeq());
+    summed = source.keyBy(modThree).combineByKey(
+        identity, add, add, partitioner, false, new KryoSerializer(new SparkConf()));
+    actualSums = summed.collectAsMap();
+    assertEquals(expectedSums, actualSums);
+  }
+
+  /**
+   * Maps integers to (i, i % 2) pairs, then swaps key and value with a second mapToPair and
+   * checks the collected tuples.
+   */
+  @SuppressWarnings("unchecked")
+  @Test
+  public void mapOnPairRDD() {
+    PairFunction<Integer, Integer, Integer> withParity =
+        new PairFunction<Integer, Integer, Integer>() {
+          @Override
+          public Tuple2<Integer, Integer> call(Integer value) {
+            return new Tuple2<>(value, value % 2);
+          }
+        };
+    PairFunction<Tuple2<Integer, Integer>, Integer, Integer> flip =
+        new PairFunction<Tuple2<Integer, Integer>, Integer, Integer>() {
+          @Override
+          public Tuple2<Integer, Integer> call(Tuple2<Integer, Integer> entry) {
+            return new Tuple2<>(entry._2(), entry._1());
+          }
+        };
+
+    JavaRDD<Integer> ints = sc.parallelize(Arrays.asList(1,2,3,4));
+    JavaPairRDD<Integer, Integer> flipped = ints.mapToPair(withParity).mapToPair(flip);
+
+    assertEquals(Arrays.asList(
+        new Tuple2<>(1, 1),
+        new Tuple2<>(0, 2),
+        new Tuple2<>(1, 3),
+        new Tuple2<>(0, 4)), flipped.collect());
+  }
+
+  /**
+   * Collects selected partitions of a 7-element, 3-partition RDD and of the pair RDD derived
+   * from it, checking the contents of each returned partition.
+   */
+  @SuppressWarnings("unchecked")
+  @Test
+  public void collectPartitions() {
+    JavaRDD<Integer> ints = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7), 3);
+
+    JavaPairRDD<Integer, Integer> withParity = ints.mapToPair(
+        new PairFunction<Integer, Integer, Integer>() {
+          @Override
+          public Tuple2<Integer, Integer> call(Integer value) {
+            return new Tuple2<>(value, value % 2);
+          }
+        });
+
+    List<Integer>[] intParts = ints.collectPartitions(new int[] {0});
+    assertEquals(Arrays.asList(1, 2), intParts[0]);
+
+    intParts = ints.collectPartitions(new int[] {1, 2});
+    assertEquals(Arrays.asList(3, 4), intParts[0]);
+    assertEquals(Arrays.asList(5, 6, 7), intParts[1]);
+
+    assertEquals(
+        Arrays.asList(new Tuple2<>(1, 1), new Tuple2<>(2, 0)),
+        withParity.collectPartitions(new int[] {0})[0]);
+
+    List<Tuple2<Integer,Integer>>[] pairParts = withParity.collectPartitions(new int[] {1, 2});
+    assertEquals(Arrays.asList(new Tuple2<>(3, 1), new Tuple2<>(4, 0)), pairParts[0]);
+    assertEquals(
+        Arrays.asList(new Tuple2<>(5, 1), new Tuple2<>(6, 0), new Tuple2<>(7, 1)),
+        pairParts[1]);
+  }
+
+  /**
+   * Builds 100000 integers containing only 100 distinct values and checks that
+   * countApproxDistinct(0.05) is within 10% of the true distinct count.
+   */
+  @Test
+  public void countApproxDistinct() {
+    int size = 100;
+    List<Integer> values = new ArrayList<>();
+    for (int i = 0; i < 100000; i++) {
+      values.add(i % size);
+    }
+    JavaRDD<Integer> ints = sc.parallelize(values, 10);
+    long estimate = ints.countApproxDistinct(0.05);
+    double relativeError = Math.abs((estimate - size) / (size * 1.0));
+    assertTrue(relativeError <= 0.1);
+  }
+
+  /**
+   * For every key i in [10, 100) the input holds i distinct values, so the per-key estimate
+   * from countApproxDistinctByKey must be within 10% of the key itself.
+   */
+  @Test
+  public void countApproxDistinctByKey() {
+    List<Tuple2<Integer, Integer>> input = new ArrayList<>();
+    for (int key = 10; key < 100; key++) {
+      for (int value = 0; value < key; value++) {
+        input.add(new Tuple2<>(key, value));
+      }
+    }
+    double relativeSD = 0.001;
+    JavaPairRDD<Integer, Integer> pairs = sc.parallelizePairs(input);
+    List<Tuple2<Integer, Long>> estimates =
+      pairs.countApproxDistinctByKey(relativeSD, 8).collect();
+    for (Tuple2<Integer, Long> estimate : estimates) {
+      double trueCount = estimate._1();
+      long approxCount = estimate._2();
+      double relativeError = Math.abs((approxCount - trueCount) / trueCount);
+      assertTrue(relativeError < 0.1);
+    }
+  }
+
+  /** Checks that collect and collectAsMap both complete on a pair RDD with int[] values. */
+  @Test
+  public void collectAsMapWithIntArrayValues() {
+    // Regression test for SPARK-1040
+    PairFunction<Integer, Integer, int[]> toArrayPair =
+        new PairFunction<Integer, Integer, int[]>() {
+          @Override
+          public Tuple2<Integer, int[]> call(Integer value) {
+            return new Tuple2<>(value, new int[]{value});
+          }
+        };
+    JavaRDD<Integer> single = sc.parallelize(Arrays.asList(1));
+    JavaPairRDD<Integer, int[]> arrayPairs = single.mapToPair(toArrayPair);
+    arrayPairs.collect();  // Works fine
+    arrayPairs.collectAsMap();  // Used to crash with ClassCastException
+  }
+
+  /**
+   * Checks that the map returned by collectAsMap survives a Java serialization round trip and
+   * still holds the original entry afterwards.
+   */
+  @SuppressWarnings("unchecked")
+  @Test
+  public void collectAsMapAndSerialize() throws Exception {
+    JavaPairRDD<String,Integer> rdd =
+        sc.parallelizePairs(Arrays.asList(new Tuple2<>("foo", 1)));
+    Map<String,Integer> map = rdd.collectAsMap();
+    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
+    // Closing the object stream flushes it, so every serialized byte is in `bytes` before
+    // toByteArray() is called.
+    try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
+      out.writeObject(map);
+    }
+    Map<String,Integer> deserializedMap;
+    try (ObjectInputStream in =
+        new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
+      deserializedMap = (Map<String,Integer>) in.readObject();
+    }
+    assertEquals(1, deserializedMap.get("foo").intValue());
+  }
+
+  /**
+   * Samples a pair RDD keyed by parity with sampleByKey, with and without replacement, and
+   * checks that both keys appear in each sample's countByKey result with a positive count.
+   */
+  @Test
+  @SuppressWarnings("unchecked")
+  public void sampleByKey() {
+    PairFunction<Integer, Integer, Integer> byParity =
+      new PairFunction<Integer, Integer, Integer>() {
+        @Override
+        public Tuple2<Integer, Integer> call(Integer value) {
+          return new Tuple2<>(value % 2, 1);
+        }
+      };
+    JavaRDD<Integer> ints = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8), 3);
+    JavaPairRDD<Integer, Integer> keyed = ints.mapToPair(byParity);
+
+    Map<Integer, Double> fractions = new HashMap<>();
+    fractions.put(0, 0.5);
+    fractions.put(1, 1.0);
+
+    Map<Integer, Long> withReplacement = keyed.sampleByKey(true, fractions, 1L).countByKey();
+    assertEquals(2, withReplacement.size());
+    assertTrue(withReplacement.get(0) > 0);
+    assertTrue(withReplacement.get(1) > 0);
+
+    Map<Integer, Long> withoutReplacement = keyed.sampleByKey(false, fractions, 1L).countByKey();
+    assertEquals(2, withoutReplacement.size());
+    assertTrue(withoutReplacement.get(0) > 0);
+    assertTrue(withoutReplacement.get(1) > 0);
+  }
+
+  /**
+   * Samples a pair RDD keyed by parity with sampleByKeyExact. The input 1..8 has four elements
+   * per key, and the fractions are 0.5 for key 0 and 1.0 for key 1, so the test expects exactly
+   * 2 and 4 elements respectively, both with and without replacement.
+   */
+  @Test
+  @SuppressWarnings("unchecked")
+  public void sampleByKeyExact() {
+    JavaRDD<Integer> rdd1 = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8), 3);
+    JavaPairRDD<Integer, Integer> rdd2 = rdd1.mapToPair(
+      new PairFunction<Integer, Integer, Integer>() {
+        @Override
+        public Tuple2<Integer, Integer> call(Integer i) {
+          return new Tuple2<>(i % 2, 1);
+        }
+      });
+    Map<Integer, Double> fractions = new HashMap<>();
+    fractions.put(0, 0.5);
+    fractions.put(1, 1.0);
+    // assertEquals (rather than assertTrue on ==) reports the actual count when a check fails.
+    JavaPairRDD<Integer, Integer> wrExact = rdd2.sampleByKeyExact(true, fractions, 1L);
+    Map<Integer, Long> wrExactCounts = wrExact.countByKey();
+    assertEquals(2, wrExactCounts.size());
+    assertEquals(2, wrExactCounts.get(0).longValue());
+    assertEquals(4, wrExactCounts.get(1).longValue());
+    JavaPairRDD<Integer, Integer> worExact = rdd2.sampleByKeyExact(false, fractions, 1L);
+    Map<Integer, Long> worExactCounts = worExact.countByKey();
+    assertEquals(2, worExactCounts.size());
+    assertEquals(2, worExactCounts.get(0).longValue());
+    assertEquals(4, worExactCounts.get(1).longValue());
+  }
+
+  /** Minimal serializable class with no fields, used as an RDD element type by tests here. */
+  private static class SomeCustomClass implements Serializable {
+    SomeCustomClass() {
+      // Intentionally left blank
+    }
+  }
+
+  @Test
+  public void collectUnderlyingScalaRDD() {
+    List<SomeCustomClass> data = new ArrayList<>();
+    for (int i = 0; i < 100; i++) {
+      data.add(new SomeCustomClass());
+    }
+    JavaRDD<SomeCustomClass> rdd = sc.parallelize(data);
+    SomeCustomClass[] collected =
+      (SomeCustomClass[]) rdd.rdd().retag(SomeCustomClass.class).collect();
+    assertEquals(data.size(), collected.length);
+  }
+
+  /**
+   * Identity-typed function whose call always fails.
+   *
+   * @param <T> input and nominal output type
+   */
+  private static final class BuggyMapFunction<T> implements Function<T, T> {
+
+    /** Always throws IllegalStateException with the message "Custom exception!". */
+    @Override
+    public T call(T x) {
+      throw new IllegalStateException("Custom exception!");
+    }
+  }
+
+  /**
+   * collectAsync must return the full input; afterwards the future must be done, not
+   * cancelled, and report exactly one job id.
+   */
+  @Test
+  public void collectAsync() throws Exception {
+    List<Integer> input = Arrays.asList(1, 2, 3, 4, 5);
+    JavaFutureAction<List<Integer>> pending = sc.parallelize(input, 1).collectAsync();
+    List<Integer> collected = pending.get();
+    assertEquals(input, collected);
+    assertFalse(pending.isCancelled());
+    assertTrue(pending.isDone());
+    assertEquals(1, pending.jobIds().size());
+  }
+
+  @Test
+  public void takeAsync() throws Exception {
+    List<Integer> data = Arrays.asList(1, 2, 3, 4, 5);
+    JavaRDD<Integer> rdd = sc.parallelize(data, 1);
+    JavaFutureAction<List<Integer>> future = rdd.takeAsync(1);
+    List<Integer> result = future.get();
+    assertEquals(1, result.size());
+    assertEquals((Integer) 1, result.get(0));
+    assertFalse(future.isCancelled());
+    assertTrue(future.isDone());
+    assertEquals(1, future.jobIds().size());
+  }
+
+  @Test
+  public void foreachAsync() throws Exception {
+    List<Integer> data = Arrays.asList(1, 2, 3, 4, 5);
+    JavaRDD<Integer> rdd = sc.parallelize(data, 1);
+    JavaFutureAction<Void> future = rdd.foreachAsync(
+        new VoidFunction<Integer>() {
+          @Override
+          public void call(Integer integer) {
+            // intentionally left blank.
+          }
+        }
+    );
+    future.get();
+    assertFalse(future.isCancelled());
+    assertTrue(future.isDone());
+    assertEquals(1, future.jobIds().size());
+  }
+
+  @Test
+  public void countAsync() throws Exception {
+    List<Integer> data = Arrays.asList(1, 2, 3, 4, 5);
+    JavaRDD<Integer> rdd = sc.parallelize(data, 1);
+    JavaFutureAction<Long> future = rdd.countAsync();
+    long count = future.get();
+    assertEquals(data.size(), count);
+    assertFalse(future.isCancelled());
+    assertTrue(future.isDone());
+    assertEquals(1, future.jobIds().size());
+  }
+
+  @Test
+  public void testAsyncActionCancellation() throws Exception {
+    List<Integer> data = Arrays.asList(1, 2, 3, 4, 5);
+    JavaRDD<Integer> rdd = sc.parallelize(data, 1);
+    JavaFutureAction<Void> future = rdd.foreachAsync(new VoidFunction<Integer>() {
+      @Override
+      public void call(Integer integer) throws InterruptedException {
+        Thread.sleep(10000);  // To ensure that the job won't finish before it's cancelled.
+      }
+    });
+    future.cancel(true);
+    assertTrue(future.isCancelled());
+    assertTrue(future.isDone());
+    try {
+      future.get(2000, TimeUnit.MILLISECONDS);
+      fail("Expected future.get() for cancelled job to throw CancellationException");
+    } catch (CancellationException ignored) {
+      // pass
+    }
+  }
+
+  @Test
+  public void testAsyncActionErrorWrapping() throws Exception {
+    List<Integer> data = Arrays.asList(1, 2, 3, 4, 5);
+    JavaRDD<Integer> rdd = sc.parallelize(data, 1);
+    JavaFutureAction<Long> future = rdd.map(new BuggyMapFunction<Integer>()).countAsync();
+    try {
+      future.get(2, TimeUnit.SECONDS);  // BuggyMapFunction always throws, so the job must fail
+      fail("Expected future.get() for failed job to throw ExecutionException");
+    } catch (ExecutionException ee) {  // the wrapper must still carry the task's own message
+      assertTrue(Throwables.getStackTraceAsString(ee).contains("Custom exception!"));
+    }
+    assertTrue(future.isDone());
+  }
+
+  static class Class1 {}
+  static class Class2 {}
+
+  @Test
+  public void testRegisterKryoClasses() {
+    SparkConf conf = new SparkConf();
+    conf.registerKryoClasses(new Class<?>[]{ Class1.class, Class2.class });
+    assertEquals(
+        Class1.class.getName() + "," + Class2.class.getName(),
+        conf.get("spark.kryo.classesToRegister"));
+  }
+
+  @Test
+  public void testGetPersistentRDDs() {
+    java.util.Map<Integer, JavaRDD<?>> cachedRddsMap = sc.getPersistentRDDs();
+    assertTrue(cachedRddsMap.isEmpty());
+    JavaRDD<String> rdd1 = sc.parallelize(Arrays.asList("a", "b")).setName("RDD1").cache();
+    JavaRDD<String> rdd2 = sc.parallelize(Arrays.asList("c", "d")).setName("RDD2").cache();
+    cachedRddsMap = sc.getPersistentRDDs();
+    assertEquals(2, cachedRddsMap.size());
+    assertEquals("RDD1", cachedRddsMap.get(0).name());
+    assertEquals("RDD2", cachedRddsMap.get(1).name());
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/0e240549/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
index 6027310..43f77e6 100644
--- a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
@@ -919,7 +919,7 @@ class UtilsSuite extends SparkFunSuite with ResetSystemProperties with Logging {
         assert(pidExists(pid))
         val terminated = Utils.terminateProcess(process, 5000)
         assert(terminated.isDefined)
-        Utils.waitForProcess(process, 5000)
+        process.waitFor(5, TimeUnit.SECONDS)
         val durationMs = System.currentTimeMillis() - startTimeMs
         assert(durationMs < 5000)
         assert(!pidExists(pid))
@@ -932,7 +932,7 @@ class UtilsSuite extends SparkFunSuite with ResetSystemProperties with Logging {
       var majorVersion = versionParts(0).toInt
       if (majorVersion == 1) majorVersion = versionParts(1).toInt
       if (majorVersion >= 8) {
-        // Java8 added a way to forcibly terminate a process. We'll make sure that works by
+        // We'll make sure that forcibly terminating a process works by
         // creating a very misbehaving process. It ignores SIGTERM and has been SIGSTOPed. On
         // older versions of java, this will *not* terminate.
         val file = File.createTempFile("temp-file-name", ".tmp")
@@ -953,7 +953,7 @@ class UtilsSuite extends SparkFunSuite with ResetSystemProperties with Logging {
           val start = System.currentTimeMillis()
           val terminated = Utils.terminateProcess(process, 5000)
           assert(terminated.isDefined)
-          Utils.waitForProcess(process, 5000)
+          process.waitFor(5, TimeUnit.SECONDS)
           val duration = System.currentTimeMillis() - start
           assert(duration < 6000) // add a little extra time to allow a force kill to finish
           assert(!pidExists(pid))

http://git-wip-us.apache.org/repos/asf/spark/blob/0e240549/dev/appveyor-install-dependencies.ps1
----------------------------------------------------------------------
diff --git a/dev/appveyor-install-dependencies.ps1 b/dev/appveyor-install-dependencies.ps1
index 1350095..1c34f1b 100644
--- a/dev/appveyor-install-dependencies.ps1
+++ b/dev/appveyor-install-dependencies.ps1
@@ -90,7 +90,7 @@ Invoke-Expression "7z.exe x maven.zip"
 # add maven to environment variables
 $env:Path += ";$tools\apache-maven-$mavenVer\bin"
 $env:M2_HOME = "$tools\apache-maven-$mavenVer"
-$env:MAVEN_OPTS = "-Xmx2g -XX:MaxPermSize=512M -XX:ReservedCodeCacheSize=512m"
+$env:MAVEN_OPTS = "-Xmx2g -XX:ReservedCodeCacheSize=512m"
 
 Pop-Location
 

http://git-wip-us.apache.org/repos/asf/spark/blob/0e240549/dev/create-release/release-build.sh
----------------------------------------------------------------------
diff --git a/dev/create-release/release-build.sh b/dev/create-release/release-build.sh
index d616f80..e1db997 100755
--- a/dev/create-release/release-build.sh
+++ b/dev/create-release/release-build.sh
@@ -267,7 +267,6 @@ if [[ "$1" == "docs" ]]; then
   echo "Building Spark docs"
   dest_dir="$REMOTE_PARENT_DIR/${DEST_DIR_NAME}-docs"
   cd docs
-  # Compile docs with Java 7 to use nicer format
   # TODO: Make configurable to add this: PRODUCTION=1
   PRODUCTION=1 RELEASE_VERSION="$SPARK_VERSION" jekyll build
   echo "Copying release documentation to $dest_dir"

http://git-wip-us.apache.org/repos/asf/spark/blob/0e240549/dev/make-distribution.sh
----------------------------------------------------------------------
diff --git a/dev/make-distribution.sh b/dev/make-distribution.sh
index dc8dfb9..22cdfd4 100755
--- a/dev/make-distribution.sh
+++ b/dev/make-distribution.sh
@@ -146,7 +146,7 @@ fi
 # Build uber fat JAR
 cd "$SPARK_HOME"
 
-export MAVEN_OPTS="${MAVEN_OPTS:--Xmx2g -XX:MaxPermSize=512M -XX:ReservedCodeCacheSize=512m}"
+export MAVEN_OPTS="${MAVEN_OPTS:--Xmx2g -XX:ReservedCodeCacheSize=512m}"
 
 # Store the command as an array because $MVN variable might have spaces in it.
 # Normal quoting tricks don't work.

http://git-wip-us.apache.org/repos/asf/spark/blob/0e240549/dev/mima
----------------------------------------------------------------------
diff --git a/dev/mima b/dev/mima
index 11c4af2..eca78ad 100755
--- a/dev/mima
+++ b/dev/mima
@@ -31,7 +31,6 @@ OLD_DEPS_CLASSPATH="$(build/sbt -DcopyDependencies=false $SPARK_PROFILES "export
 rm -f .generated-mima*
 
 java \
-  -XX:MaxPermSize=1g \
   -Xmx2g \
   -cp "$TOOLS_CLASSPATH:$OLD_DEPS_CLASSPATH" \
   org.apache.spark.tools.GenerateMIMAIgnore

http://git-wip-us.apache.org/repos/asf/spark/blob/0e240549/dev/run-tests.py
----------------------------------------------------------------------
diff --git a/dev/run-tests.py b/dev/run-tests.py
index 0e7f5ff..04035b3 100755
--- a/dev/run-tests.py
+++ b/dev/run-tests.py
@@ -492,9 +492,6 @@ def main():
 
     java_version = determine_java_version(java_exe)
 
-    if java_version.minor < 8:
-        print("[warn] Java 8 tests will not run because JDK version is < 1.8.")
-
     # install SparkR
     if which("R"):
         run_cmd([os.path.join(SPARK_HOME, "R", "install-dev.sh")])

http://git-wip-us.apache.org/repos/asf/spark/blob/0e240549/dev/test-dependencies.sh
----------------------------------------------------------------------
diff --git a/dev/test-dependencies.sh b/dev/test-dependencies.sh
index eb43f22..2906a81 100755
--- a/dev/test-dependencies.sh
+++ b/dev/test-dependencies.sh
@@ -46,7 +46,7 @@ OLD_VERSION=$($MVN -q \
     -Dexec.executable="echo" \
     -Dexec.args='${project.version}' \
     --non-recursive \
-    org.codehaus.mojo:exec-maven-plugin:1.3.1:exec)
+    org.codehaus.mojo:exec-maven-plugin:1.5.0:exec)
 if [ $? != 0 ]; then
     echo -e "Error while getting version string from Maven:\n$OLD_VERSION"
     exit 1

http://git-wip-us.apache.org/repos/asf/spark/blob/0e240549/docs/building-spark.md
----------------------------------------------------------------------
diff --git a/docs/building-spark.md b/docs/building-spark.md
index 690c656..56b8926 100644
--- a/docs/building-spark.md
+++ b/docs/building-spark.md
@@ -12,8 +12,8 @@ redirect_from: "building-with-maven.html"
 ## Apache Maven
 
 The Maven-based build is the build of reference for Apache Spark.
-Building Spark using Maven requires Maven 3.3.9 or newer and Java 7+.
-Note that support for Java 7 is deprecated as of Spark 2.0.0 and may be removed in Spark 2.2.0.
+Building Spark using Maven requires Maven 3.3.9 or newer and Java 8+.
+Note that support for Java 7 was removed as of Spark 2.2.0.
 
 ### Setting up Maven's Memory Usage
 
@@ -21,28 +21,18 @@ You'll need to configure Maven to use more memory than usual by setting `MAVEN_O
 
     export MAVEN_OPTS="-Xmx2g -XX:ReservedCodeCacheSize=512m"
 
-When compiling with Java 7, you will need to add the additional option "-XX:MaxPermSize=512M" to MAVEN_OPTS.
-
+(The `ReservedCodeCacheSize` setting is optional but recommended.)
 If you don't add these parameters to `MAVEN_OPTS`, you may see errors and warnings like the following:
 
     [INFO] Compiling 203 Scala sources and 9 Java sources to /Users/me/Development/spark/core/target/scala-{{site.SCALA_BINARY_VERSION}}/classes...
-    [ERROR] PermGen space -> [Help 1]
-
-    [INFO] Compiling 203 Scala sources and 9 Java sources to /Users/me/Development/spark/core/target/scala-{{site.SCALA_BINARY_VERSION}}/classes...
     [ERROR] Java heap space -> [Help 1]
 
-    [INFO] Compiling 233 Scala sources and 41 Java sources to /Users/me/Development/spark/sql/core/target/scala-{site.SCALA_BINARY_VERSION}/classes...
-    OpenJDK 64-Bit Server VM warning: CodeCache is full. Compiler has been disabled.
-    OpenJDK 64-Bit Server VM warning: Try increasing the code cache size using -XX:ReservedCodeCacheSize=
-
 You can fix these problems by setting the `MAVEN_OPTS` variable as discussed before.
 
 **Note:**
 
 * If using `build/mvn` with no `MAVEN_OPTS` set, the script will automatically add the above options to the `MAVEN_OPTS` environment variable.
-* The `test` phase of the Spark build will automatically add these options to `MAVEN_OPTS`, even when not using `build/mvn`.
-* You may see warnings like "ignoring option MaxPermSize=1g; support was removed in 8.0" when building or running tests with Java 8 and `build/mvn`. These warnings are harmless.
-
+* The `test` phase of the Spark build will automatically add these options to `MAVEN_OPTS`, even when not using `build/mvn`.
 
 ### build/mvn
 
@@ -224,20 +214,6 @@ To run test suites of a specific sub project as follows:
 
     ./build/sbt core/test
 
-## Running Java 8 Test Suites
-
-Running only Java 8 tests and nothing else.
-
-    ./build/mvn install -DskipTests
-    ./build/mvn -pl :java8-tests_2.11 test
-
-or
-
-    ./build/sbt java8-tests/test
-
-Java 8 tests are automatically enabled when a Java 8 JDK is detected.
-If you have JDK 8 installed but it is not the system default, you can set JAVA_HOME to point to JDK 8 before running the tests.
-
 ## PySpark pip installable
 
 If you are building Spark for use in a Python environment and you wish to pip install it, you will first need to build the Spark JARs as described above. Then you can construct an sdist package suitable for setup.py and pip installable package.

http://git-wip-us.apache.org/repos/asf/spark/blob/0e240549/docs/index.md
----------------------------------------------------------------------
diff --git a/docs/index.md b/docs/index.md
index 023e06a..19a9d3b 100644
--- a/docs/index.md
+++ b/docs/index.md
@@ -26,11 +26,13 @@ Spark runs on both Windows and UNIX-like systems (e.g. Linux, Mac OS). It's easy
 locally on one machine --- all you need is to have `java` installed on your system `PATH`,
 or the `JAVA_HOME` environment variable pointing to a Java installation.
 
-Spark runs on Java 7+, Python 2.6+/3.4+ and R 3.1+. For the Scala API, Spark {{site.SPARK_VERSION}}
+Spark runs on Java 8+, Python 2.6+/3.4+ and R 3.1+. For the Scala API, Spark {{site.SPARK_VERSION}}
 uses Scala {{site.SCALA_BINARY_VERSION}}. You will need to use a compatible Scala version
 ({{site.SCALA_BINARY_VERSION}}.x).
 
-Note that support for Java 7 and Python 2.6 are deprecated as of Spark 2.0.0, and support for 
+Note that support for Java 7 was removed as of Spark 2.2.0.
+
+Note that support for Python 2.6 is deprecated as of Spark 2.0.0, and support for 
 Scala 2.10 and versions of Hadoop before 2.6 are deprecated as of Spark 2.1.0, and may be 
 removed in Spark 2.2.0.
 

http://git-wip-us.apache.org/repos/asf/spark/blob/0e240549/docs/mllib-linear-methods.md
----------------------------------------------------------------------
diff --git a/docs/mllib-linear-methods.md b/docs/mllib-linear-methods.md
index 3085539..034e89e 100644
--- a/docs/mllib-linear-methods.md
+++ b/docs/mllib-linear-methods.md
@@ -222,7 +222,7 @@ svmAlg.optimizer()
   .setNumIterations(200)
   .setRegParam(0.1)
   .setUpdater(new L1Updater());
-final SVMModel modelL1 = svmAlg.run(training.rdd());
+SVMModel modelL1 = svmAlg.run(training.rdd());
 {% endhighlight %}
 
 In order to run the above application, follow the instructions

http://git-wip-us.apache.org/repos/asf/spark/blob/0e240549/docs/mllib-statistics.md
----------------------------------------------------------------------
diff --git a/docs/mllib-statistics.md b/docs/mllib-statistics.md
index 430c069..c29400a 100644
--- a/docs/mllib-statistics.md
+++ b/docs/mllib-statistics.md
@@ -317,12 +317,7 @@ JavaSparkContext jsc = ...
 // standard normal distribution `N(0, 1)`, evenly distributed in 10 partitions.
 JavaDoubleRDD u = normalJavaRDD(jsc, 1000000L, 10);
 // Apply a transform to get a random double RDD following `N(1, 4)`.
-JavaDoubleRDD v = u.map(
-  new Function<Double, Double>() {
-    public Double call(Double x) {
-      return 1.0 + 2.0 * x;
-    }
-  });
+JavaDoubleRDD v = u.mapToDouble(x -> 1.0 + 2.0 * x);
 {% endhighlight %}
 </div>
 

http://git-wip-us.apache.org/repos/asf/spark/blob/0e240549/docs/programming-guide.md
----------------------------------------------------------------------
diff --git a/docs/programming-guide.md b/docs/programming-guide.md
index db8b048..6740dbe 100644
--- a/docs/programming-guide.md
+++ b/docs/programming-guide.md
@@ -54,12 +54,12 @@ import org.apache.spark.SparkConf
 
 <div data-lang="java"  markdown="1">
 
-Spark {{site.SPARK_VERSION}} works with Java 7 and higher. If you are using Java 8, Spark supports
+Spark {{site.SPARK_VERSION}} supports
 [lambda expressions](http://docs.oracle.com/javase/tutorial/java/javaOO/lambdaexpressions.html)
 for concisely writing functions, otherwise you can use the classes in the
 [org.apache.spark.api.java.function](api/java/index.html?org/apache/spark/api/java/function/package-summary.html) package.
 
-Note that support for Java 7 is deprecated as of Spark 2.0.0 and may be removed in Spark 2.2.0.
+Note that support for Java 7 was removed in Spark 2.2.0.
 
 To write a Spark application in Java, you need to add a dependency on Spark. Spark is available through Maven Central at:
 
@@ -295,11 +295,6 @@ JavaRDD<Integer> distData = sc.parallelize(data);
 Once created, the distributed dataset (`distData`) can be operated on in parallel. For example, we might call `distData.reduce((a, b) -> a + b)` to add up the elements of the list.
 We describe operations on distributed datasets later on.
 
-**Note:** *In this guide, we'll often use the concise Java 8 lambda syntax to specify Java functions, but
-in older versions of Java you can implement the interfaces in the
-[org.apache.spark.api.java.function](api/java/index.html?org/apache/spark/api/java/function/package-summary.html) package.
-We describe [passing functions to Spark](#passing-functions-to-spark) in more detail below.*
-
 </div>
 
 <div data-lang="python"  markdown="1">
@@ -658,7 +653,7 @@ There are two ways to create such functions:
 
 * Implement the Function interfaces in your own class, either as an anonymous inner class or a named one,
   and pass an instance of it to Spark.
-* In Java 8, use [lambda expressions](http://docs.oracle.com/javase/tutorial/java/javaOO/lambdaexpressions.html)
+* Use [lambda expressions](http://docs.oracle.com/javase/tutorial/java/javaOO/lambdaexpressions.html)
   to concisely define an implementation.
 
 While much of this guide uses lambda syntax for conciseness, it is easy to use all the same APIs

http://git-wip-us.apache.org/repos/asf/spark/blob/0e240549/docs/quick-start.md
----------------------------------------------------------------------
diff --git a/docs/quick-start.md b/docs/quick-start.md
index 0836c60..04ac278 100644
--- a/docs/quick-start.md
+++ b/docs/quick-start.md
@@ -320,13 +320,8 @@ public class SimpleApp {
     JavaSparkContext sc = new JavaSparkContext(conf);
     JavaRDD<String> logData = sc.textFile(logFile).cache();
 
-    long numAs = logData.filter(new Function<String, Boolean>() {
-      public Boolean call(String s) { return s.contains("a"); }
-    }).count();
-
-    long numBs = logData.filter(new Function<String, Boolean>() {
-      public Boolean call(String s) { return s.contains("b"); }
-    }).count();
+    long numAs = logData.filter(s -> s.contains("a")).count();
+    long numBs = logData.filter(s -> s.contains("b")).count();
 
     System.out.println("Lines with a: " + numAs + ", lines with b: " + numBs);
     

http://git-wip-us.apache.org/repos/asf/spark/blob/0e240549/docs/streaming-custom-receivers.md
----------------------------------------------------------------------
diff --git a/docs/streaming-custom-receivers.md b/docs/streaming-custom-receivers.md
index 117996d..d4ddcb1 100644
--- a/docs/streaming-custom-receivers.md
+++ b/docs/streaming-custom-receivers.md
@@ -113,15 +113,13 @@ public class JavaCustomReceiver extends Receiver<String> {
     port = port_;
   }
 
+  @Override
   public void onStart() {
     // Start the thread that receives data over a connection
-    new Thread()  {
-      @Override public void run() {
-        receive();
-      }
-    }.start();
+    new Thread(this::receive).start();
   }
 
+  @Override
   public void onStop() {
     // There is nothing much to do as the thread calling receive()
     // is designed to stop by itself if isStopped() returns false
@@ -189,7 +187,7 @@ The full source code is in the example [CustomReceiver.scala]({{site.SPARK_GITHU
 {% highlight java %}
 // Assuming ssc is the JavaStreamingContext
 JavaDStream<String> customReceiverStream = ssc.receiverStream(new JavaCustomReceiver(host, port));
-JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() { ... });
+JavaDStream<String> words = customReceiverStream.flatMap(s -> ...);
 ...
 {% endhighlight %}
 

http://git-wip-us.apache.org/repos/asf/spark/blob/0e240549/docs/streaming-kafka-0-10-integration.md
----------------------------------------------------------------------
diff --git a/docs/streaming-kafka-0-10-integration.md b/docs/streaming-kafka-0-10-integration.md
index 6ef54ac..e383701 100644
--- a/docs/streaming-kafka-0-10-integration.md
+++ b/docs/streaming-kafka-0-10-integration.md
@@ -68,20 +68,14 @@ kafkaParams.put("enable.auto.commit", false);
 
 Collection<String> topics = Arrays.asList("topicA", "topicB");
 
-final JavaInputDStream<ConsumerRecord<String, String>> stream =
+JavaInputDStream<ConsumerRecord<String, String>> stream =
   KafkaUtils.createDirectStream(
     streamingContext,
     LocationStrategies.PreferConsistent(),
     ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams)
   );
 
-stream.mapToPair(
-  new PairFunction<ConsumerRecord<String, String>, String, String>() {
-    @Override
-    public Tuple2<String, String> call(ConsumerRecord<String, String> record) {
-      return new Tuple2<>(record.key(), record.value());
-    }
-  })
+stream.mapToPair(record -> new Tuple2<>(record.key(), record.value()));
 {% endhighlight %}
 </div>
 </div>
@@ -162,19 +156,13 @@ stream.foreachRDD { rdd =>
 </div>
 <div data-lang="java" markdown="1">
 {% highlight java %}
-stream.foreachRDD(new VoidFunction<JavaRDD<ConsumerRecord<String, String>>>() {
-  @Override
-  public void call(JavaRDD<ConsumerRecord<String, String>> rdd) {
-    final OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
-    rdd.foreachPartition(new VoidFunction<Iterator<ConsumerRecord<String, String>>>() {
-      @Override
-      public void call(Iterator<ConsumerRecord<String, String>> consumerRecords) {
-        OffsetRange o = offsetRanges[TaskContext.get().partitionId()];
-        System.out.println(
-          o.topic() + " " + o.partition() + " " + o.fromOffset() + " " + o.untilOffset());
-      }
-    });
-  }
+stream.foreachRDD(rdd -> {
+  OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
+  rdd.foreachPartition(consumerRecords -> {
+    OffsetRange o = offsetRanges[TaskContext.get().partitionId()];
+    System.out.println(
+      o.topic() + " " + o.partition() + " " + o.fromOffset() + " " + o.untilOffset());
+  });
 });
 {% endhighlight %}
 </div>
@@ -205,14 +193,11 @@ As with HasOffsetRanges, the cast to CanCommitOffsets will only succeed if calle
 </div>
 <div data-lang="java" markdown="1">
 {% highlight java %}
-stream.foreachRDD(new VoidFunction<JavaRDD<ConsumerRecord<String, String>>>() {
-  @Override
-  public void call(JavaRDD<ConsumerRecord<String, String>> rdd) {
-    OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
+stream.foreachRDD(rdd -> {
+  OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
 
-    // some time later, after outputs have completed
-    ((CanCommitOffsets) stream.inputDStream()).commitAsync(offsetRanges);
-  }
+  // some time later, after outputs have completed
+  ((CanCommitOffsets) stream.inputDStream()).commitAsync(offsetRanges);
 });
 {% endhighlight %}
 </div>
@@ -268,21 +253,18 @@ JavaInputDStream<ConsumerRecord<String, String>> stream = KafkaUtils.createDirec
   ConsumerStrategies.<String, String>Assign(fromOffsets.keySet(), kafkaParams, fromOffsets)
 );
 
-stream.foreachRDD(new VoidFunction<JavaRDD<ConsumerRecord<String, String>>>() {
-  @Override
-  public void call(JavaRDD<ConsumerRecord<String, String>> rdd) {
-    OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
-    
-    Object results = yourCalculation(rdd);
+stream.foreachRDD(rdd -> {
+  OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
+  
+  Object results = yourCalculation(rdd);
 
-    // begin your transaction
+  // begin your transaction
 
-    // update results
-    // update offsets where the end of existing offsets matches the beginning of this batch of offsets
-    // assert that offsets were updated correctly
+  // update results
+  // update offsets where the end of existing offsets matches the beginning of this batch of offsets
+  // assert that offsets were updated correctly
 
-    // end your transaction
-  }
+  // end your transaction
 });
 {% endhighlight %}
 </div>


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org