You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by sr...@apache.org on 2017/02/19 17:42:55 UTC

[2/3] spark git commit: [SPARK-19534][TESTS] Convert Java tests to use lambdas, Java 8 features

http://git-wip-us.apache.org/repos/asf/spark/blob/1487c9af/core/src/test/java/test/org/apache/spark/JavaAPISuite.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/test/org/apache/spark/JavaAPISuite.java b/core/src/test/java/test/org/apache/spark/JavaAPISuite.java
index 80aab10..5121491 100644
--- a/core/src/test/java/test/org/apache/spark/JavaAPISuite.java
+++ b/core/src/test/java/test/org/apache/spark/JavaAPISuite.java
@@ -31,7 +31,6 @@ import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.concurrent.*;
 
 import org.apache.spark.Accumulator;
@@ -208,7 +207,7 @@ public class JavaAPISuite implements Serializable {
     assertEquals(new Tuple2<>(3, 2), sortedPairs.get(2));
 
     // Custom comparator
-    sortedRDD = rdd.sortByKey(Collections.<Integer>reverseOrder(), false);
+    sortedRDD = rdd.sortByKey(Collections.reverseOrder(), false);
     assertEquals(new Tuple2<>(-1, 1), sortedRDD.first());
     sortedPairs = sortedRDD.collect();
     assertEquals(new Tuple2<>(0, 4), sortedPairs.get(1));
@@ -266,13 +265,7 @@ public class JavaAPISuite implements Serializable {
     JavaRDD<Tuple2<Integer, Integer>> rdd = sc.parallelize(pairs);
 
     // compare on first value
-    JavaRDD<Tuple2<Integer, Integer>> sortedRDD =
-        rdd.sortBy(new Function<Tuple2<Integer, Integer>, Integer>() {
-      @Override
-      public Integer call(Tuple2<Integer, Integer> t) {
-        return t._1();
-      }
-    }, true, 2);
+    JavaRDD<Tuple2<Integer, Integer>> sortedRDD = rdd.sortBy(Tuple2::_1, true, 2);
 
     assertEquals(new Tuple2<>(-1, 1), sortedRDD.first());
     List<Tuple2<Integer, Integer>> sortedPairs = sortedRDD.collect();
@@ -280,12 +273,7 @@ public class JavaAPISuite implements Serializable {
     assertEquals(new Tuple2<>(3, 2), sortedPairs.get(2));
 
     // compare on second value
-    sortedRDD = rdd.sortBy(new Function<Tuple2<Integer, Integer>, Integer>() {
-      @Override
-      public Integer call(Tuple2<Integer, Integer> t) {
-        return t._2();
-      }
-    }, true, 2);
+    sortedRDD = rdd.sortBy(Tuple2::_2, true, 2);
     assertEquals(new Tuple2<>(-1, 1), sortedRDD.first());
     sortedPairs = sortedRDD.collect();
     assertEquals(new Tuple2<>(3, 2), sortedPairs.get(1));
@@ -294,28 +282,20 @@ public class JavaAPISuite implements Serializable {
 
   @Test
   public void foreach() {
-    final LongAccumulator accum = sc.sc().longAccumulator();
+    LongAccumulator accum = sc.sc().longAccumulator();
     JavaRDD<String> rdd = sc.parallelize(Arrays.asList("Hello", "World"));
-    rdd.foreach(new VoidFunction<String>() {
-      @Override
-      public void call(String s) {
-        accum.add(1);
-      }
-    });
+    rdd.foreach(s -> accum.add(1));
     assertEquals(2, accum.value().intValue());
   }
 
   @Test
   public void foreachPartition() {
-    final LongAccumulator accum = sc.sc().longAccumulator();
+    LongAccumulator accum = sc.sc().longAccumulator();
     JavaRDD<String> rdd = sc.parallelize(Arrays.asList("Hello", "World"));
-    rdd.foreachPartition(new VoidFunction<Iterator<String>>() {
-      @Override
-      public void call(Iterator<String> iter) {
-        while (iter.hasNext()) {
-          iter.next();
-          accum.add(1);
-        }
+    rdd.foreachPartition(iter -> {
+      while (iter.hasNext()) {
+        iter.next();
+        accum.add(1);
       }
     });
     assertEquals(2, accum.value().intValue());
@@ -361,12 +341,7 @@ public class JavaAPISuite implements Serializable {
   @Test
   public void groupBy() {
     JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 1, 2, 3, 5, 8, 13));
-    Function<Integer, Boolean> isOdd = new Function<Integer, Boolean>() {
-      @Override
-      public Boolean call(Integer x) {
-        return x % 2 == 0;
-      }
-    };
+    Function<Integer, Boolean> isOdd = x -> x % 2 == 0;
     JavaPairRDD<Boolean, Iterable<Integer>> oddsAndEvens = rdd.groupBy(isOdd);
     assertEquals(2, oddsAndEvens.count());
     assertEquals(2, Iterables.size(oddsAndEvens.lookup(true).get(0)));  // Evens
@@ -383,12 +358,7 @@ public class JavaAPISuite implements Serializable {
     // Regression test for SPARK-4459
     JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 1, 2, 3, 5, 8, 13));
     Function<Tuple2<Integer, Integer>, Boolean> areOdd =
-      new Function<Tuple2<Integer, Integer>, Boolean>() {
-        @Override
-        public Boolean call(Tuple2<Integer, Integer> x) {
-          return (x._1() % 2 == 0) && (x._2() % 2 == 0);
-        }
-      };
+        x -> (x._1() % 2 == 0) && (x._2() % 2 == 0);
     JavaPairRDD<Integer, Integer> pairRDD = rdd.zip(rdd);
     JavaPairRDD<Boolean, Iterable<Tuple2<Integer, Integer>>> oddsAndEvens = pairRDD.groupBy(areOdd);
     assertEquals(2, oddsAndEvens.count());
@@ -406,13 +376,7 @@ public class JavaAPISuite implements Serializable {
   public void keyByOnPairRDD() {
     // Regression test for SPARK-4459
     JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 1, 2, 3, 5, 8, 13));
-    Function<Tuple2<Integer, Integer>, String> sumToString =
-      new Function<Tuple2<Integer, Integer>, String>() {
-        @Override
-        public String call(Tuple2<Integer, Integer> x) {
-          return String.valueOf(x._1() + x._2());
-        }
-      };
+    Function<Tuple2<Integer, Integer>, String> sumToString = x -> String.valueOf(x._1() + x._2());
     JavaPairRDD<Integer, Integer> pairRDD = rdd.zip(rdd);
     JavaPairRDD<String, Tuple2<Integer, Integer>> keyed = pairRDD.keyBy(sumToString);
     assertEquals(7, keyed.count());
@@ -516,25 +480,14 @@ public class JavaAPISuite implements Serializable {
       rdd1.leftOuterJoin(rdd2).collect();
     assertEquals(5, joined.size());
     Tuple2<Integer,Tuple2<Integer,Optional<Character>>> firstUnmatched =
-      rdd1.leftOuterJoin(rdd2).filter(
-        new Function<Tuple2<Integer, Tuple2<Integer, Optional<Character>>>, Boolean>() {
-          @Override
-          public Boolean call(Tuple2<Integer, Tuple2<Integer, Optional<Character>>> tup) {
-            return !tup._2()._2().isPresent();
-          }
-      }).first();
+      rdd1.leftOuterJoin(rdd2).filter(tup -> !tup._2()._2().isPresent()).first();
     assertEquals(3, firstUnmatched._1().intValue());
   }
 
   @Test
   public void foldReduce() {
     JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 1, 2, 3, 5, 8, 13));
-    Function2<Integer, Integer, Integer> add = new Function2<Integer, Integer, Integer>() {
-      @Override
-      public Integer call(Integer a, Integer b) {
-        return a + b;
-      }
-    };
+    Function2<Integer, Integer, Integer> add = (a, b) -> a + b;
 
     int sum = rdd.fold(0, add);
     assertEquals(33, sum);
@@ -546,12 +499,7 @@ public class JavaAPISuite implements Serializable {
   @Test
   public void treeReduce() {
     JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(-5, -4, -3, -2, -1, 1, 2, 3, 4), 10);
-    Function2<Integer, Integer, Integer> add = new Function2<Integer, Integer, Integer>() {
-      @Override
-      public Integer call(Integer a, Integer b) {
-        return a + b;
-      }
-    };
+    Function2<Integer, Integer, Integer> add = (a, b) -> a + b;
     for (int depth = 1; depth <= 10; depth++) {
       int sum = rdd.treeReduce(add, depth);
       assertEquals(-5, sum);
@@ -561,12 +509,7 @@ public class JavaAPISuite implements Serializable {
   @Test
   public void treeAggregate() {
     JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(-5, -4, -3, -2, -1, 1, 2, 3, 4), 10);
-    Function2<Integer, Integer, Integer> add = new Function2<Integer, Integer, Integer>() {
-      @Override
-      public Integer call(Integer a, Integer b) {
-        return a + b;
-      }
-    };
+    Function2<Integer, Integer, Integer> add = (a, b) -> a + b;
     for (int depth = 1; depth <= 10; depth++) {
       int sum = rdd.treeAggregate(0, add, add, depth);
       assertEquals(-5, sum);
@@ -584,21 +527,15 @@ public class JavaAPISuite implements Serializable {
         new Tuple2<>(5, 1),
         new Tuple2<>(5, 3)), 2);
 
-    Map<Integer, Set<Integer>> sets = pairs.aggregateByKey(new HashSet<Integer>(),
-      new Function2<Set<Integer>, Integer, Set<Integer>>() {
-        @Override
-        public Set<Integer> call(Set<Integer> a, Integer b) {
-          a.add(b);
-          return a;
-        }
-      },
-      new Function2<Set<Integer>, Set<Integer>, Set<Integer>>() {
-        @Override
-        public Set<Integer> call(Set<Integer> a, Set<Integer> b) {
-          a.addAll(b);
-          return a;
-        }
-      }).collectAsMap();
+    Map<Integer, HashSet<Integer>> sets = pairs.aggregateByKey(new HashSet<Integer>(),
+         (a, b) -> {
+           a.add(b);
+           return a;
+         },
+         (a, b) -> {
+           a.addAll(b);
+           return a;
+         }).collectAsMap();
     assertEquals(3, sets.size());
     assertEquals(new HashSet<>(Arrays.asList(1)), sets.get(1));
     assertEquals(new HashSet<>(Arrays.asList(2)), sets.get(3));
@@ -616,13 +553,7 @@ public class JavaAPISuite implements Serializable {
       new Tuple2<>(3, 1)
     );
     JavaPairRDD<Integer, Integer> rdd = sc.parallelizePairs(pairs);
-    JavaPairRDD<Integer, Integer> sums = rdd.foldByKey(0,
-      new Function2<Integer, Integer, Integer>() {
-        @Override
-        public Integer call(Integer a, Integer b) {
-          return a + b;
-        }
-    });
+    JavaPairRDD<Integer, Integer> sums = rdd.foldByKey(0, (a, b) -> a + b);
     assertEquals(1, sums.lookup(1).get(0).intValue());
     assertEquals(2, sums.lookup(2).get(0).intValue());
     assertEquals(3, sums.lookup(3).get(0).intValue());
@@ -639,13 +570,7 @@ public class JavaAPISuite implements Serializable {
       new Tuple2<>(3, 1)
     );
     JavaPairRDD<Integer, Integer> rdd = sc.parallelizePairs(pairs);
-    JavaPairRDD<Integer, Integer> counts = rdd.reduceByKey(
-      new Function2<Integer, Integer, Integer>() {
-        @Override
-        public Integer call(Integer a, Integer b) {
-         return a + b;
-        }
-    });
+    JavaPairRDD<Integer, Integer> counts = rdd.reduceByKey((a, b) -> a + b);
     assertEquals(1, counts.lookup(1).get(0).intValue());
     assertEquals(2, counts.lookup(2).get(0).intValue());
     assertEquals(3, counts.lookup(3).get(0).intValue());
@@ -655,12 +580,7 @@ public class JavaAPISuite implements Serializable {
     assertEquals(2, localCounts.get(2).intValue());
     assertEquals(3, localCounts.get(3).intValue());
 
-    localCounts = rdd.reduceByKeyLocally(new Function2<Integer, Integer, Integer>() {
-      @Override
-      public Integer call(Integer a, Integer b) {
-        return a + b;
-      }
-    });
+    localCounts = rdd.reduceByKeyLocally((a, b) -> a + b);
     assertEquals(1, localCounts.get(1).intValue());
     assertEquals(2, localCounts.get(2).intValue());
     assertEquals(3, localCounts.get(3).intValue());
@@ -692,20 +612,8 @@ public class JavaAPISuite implements Serializable {
     assertTrue(sc.emptyRDD().isEmpty());
     assertTrue(sc.parallelize(new ArrayList<Integer>()).isEmpty());
     assertFalse(sc.parallelize(Arrays.asList(1)).isEmpty());
-    assertTrue(sc.parallelize(Arrays.asList(1, 2, 3), 3).filter(
-        new Function<Integer,Boolean>() {
-          @Override
-          public Boolean call(Integer i) {
-            return i < 0;
-          }
-        }).isEmpty());
-    assertFalse(sc.parallelize(Arrays.asList(1, 2, 3)).filter(
-        new Function<Integer, Boolean>() {
-          @Override
-          public Boolean call(Integer i) {
-            return i > 1;
-          }
-        }).isEmpty());
+    assertTrue(sc.parallelize(Arrays.asList(1, 2, 3), 3).filter(i -> i < 0).isEmpty());
+    assertFalse(sc.parallelize(Arrays.asList(1, 2, 3)).filter(i -> i > 1).isEmpty());
   }
 
   @Test
@@ -721,12 +629,7 @@ public class JavaAPISuite implements Serializable {
     JavaDoubleRDD rdd = sc.parallelizeDoubles(Arrays.asList(1.0, 1.0, 2.0, 3.0, 5.0, 8.0));
     JavaDoubleRDD distinct = rdd.distinct();
     assertEquals(5, distinct.count());
-    JavaDoubleRDD filter = rdd.filter(new Function<Double, Boolean>() {
-      @Override
-      public Boolean call(Double x) {
-        return x > 2.0;
-      }
-    });
+    JavaDoubleRDD filter = rdd.filter(x -> x > 2.0);
     assertEquals(3, filter.count());
     JavaDoubleRDD union = rdd.union(rdd);
     assertEquals(12, union.count());
@@ -764,7 +667,7 @@ public class JavaAPISuite implements Serializable {
     // SPARK-5744
     assertArrayEquals(
         new long[] {0},
-        sc.parallelizeDoubles(new ArrayList<Double>(0), 1).histogram(new double[]{0.0, 1.0}));
+        sc.parallelizeDoubles(new ArrayList<>(0), 1).histogram(new double[]{0.0, 1.0}));
   }
 
   private static class DoubleComparator implements Comparator<Double>, Serializable {
@@ -833,12 +736,7 @@ public class JavaAPISuite implements Serializable {
   @Test
   public void reduceOnJavaDoubleRDD() {
     JavaDoubleRDD rdd = sc.parallelizeDoubles(Arrays.asList(1.0, 2.0, 3.0, 4.0));
-    double sum = rdd.reduce(new Function2<Double, Double, Double>() {
-      @Override
-      public Double call(Double v1, Double v2) {
-        return v1 + v2;
-      }
-    });
+    double sum = rdd.reduce((v1, v2) -> v1 + v2);
     assertEquals(10.0, sum, 0.001);
   }
 
@@ -859,27 +757,11 @@ public class JavaAPISuite implements Serializable {
   @Test
   public void map() {
     JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));
-    JavaDoubleRDD doubles = rdd.mapToDouble(new DoubleFunction<Integer>() {
-      @Override
-      public double call(Integer x) {
-        return x.doubleValue();
-      }
-    }).cache();
+    JavaDoubleRDD doubles = rdd.mapToDouble(Integer::doubleValue).cache();
     doubles.collect();
-    JavaPairRDD<Integer, Integer> pairs = rdd.mapToPair(
-        new PairFunction<Integer, Integer, Integer>() {
-          @Override
-          public Tuple2<Integer, Integer> call(Integer x) {
-            return new Tuple2<>(x, x);
-          }
-        }).cache();
+    JavaPairRDD<Integer, Integer> pairs = rdd.mapToPair(x -> new Tuple2<>(x, x)).cache();
     pairs.collect();
-    JavaRDD<String> strings = rdd.map(new Function<Integer, String>() {
-      @Override
-      public String call(Integer x) {
-        return x.toString();
-      }
-    }).cache();
+    JavaRDD<String> strings = rdd.map(Object::toString).cache();
     strings.collect();
   }
 
@@ -887,39 +769,27 @@ public class JavaAPISuite implements Serializable {
   public void flatMap() {
     JavaRDD<String> rdd = sc.parallelize(Arrays.asList("Hello World!",
       "The quick brown fox jumps over the lazy dog."));
-    JavaRDD<String> words = rdd.flatMap(new FlatMapFunction<String, String>() {
-      @Override
-      public Iterator<String> call(String x) {
-        return Arrays.asList(x.split(" ")).iterator();
-      }
-    });
+    JavaRDD<String> words = rdd.flatMap(x -> Arrays.asList(x.split(" ")).iterator());
     assertEquals("Hello", words.first());
     assertEquals(11, words.count());
 
-    JavaPairRDD<String, String> pairsRDD = rdd.flatMapToPair(
-      new PairFlatMapFunction<String, String, String>() {
-        @Override
-        public Iterator<Tuple2<String, String>> call(String s) {
-          List<Tuple2<String, String>> pairs = new LinkedList<>();
-          for (String word : s.split(" ")) {
-            pairs.add(new Tuple2<>(word, word));
-          }
-          return pairs.iterator();
+    JavaPairRDD<String, String> pairsRDD = rdd.flatMapToPair(s -> {
+        List<Tuple2<String, String>> pairs = new LinkedList<>();
+        for (String word : s.split(" ")) {
+          pairs.add(new Tuple2<>(word, word));
         }
+        return pairs.iterator();
       }
     );
     assertEquals(new Tuple2<>("Hello", "Hello"), pairsRDD.first());
     assertEquals(11, pairsRDD.count());
 
-    JavaDoubleRDD doubles = rdd.flatMapToDouble(new DoubleFlatMapFunction<String>() {
-      @Override
-      public Iterator<Double> call(String s) {
-        List<Double> lengths = new LinkedList<>();
-        for (String word : s.split(" ")) {
-          lengths.add((double) word.length());
-        }
-        return lengths.iterator();
+    JavaDoubleRDD doubles = rdd.flatMapToDouble(s -> {
+      List<Double> lengths = new LinkedList<>();
+      for (String word : s.split(" ")) {
+        lengths.add((double) word.length());
       }
+      return lengths.iterator();
     });
     assertEquals(5.0, doubles.first(), 0.01);
     assertEquals(11, pairsRDD.count());
@@ -937,37 +807,23 @@ public class JavaAPISuite implements Serializable {
 
     // Regression test for SPARK-668:
     JavaPairRDD<String, Integer> swapped = pairRDD.flatMapToPair(
-      new PairFlatMapFunction<Tuple2<Integer, String>, String, Integer>() {
-        @Override
-        public Iterator<Tuple2<String, Integer>> call(Tuple2<Integer, String> item) {
-          return Collections.singletonList(item.swap()).iterator();
-        }
-      });
+        item -> Collections.singletonList(item.swap()).iterator());
     swapped.collect();
 
     // There was never a bug here, but it's worth testing:
-    pairRDD.mapToPair(new PairFunction<Tuple2<Integer, String>, String, Integer>() {
-      @Override
-      public Tuple2<String, Integer> call(Tuple2<Integer, String> item) {
-        return item.swap();
-      }
-    }).collect();
+    pairRDD.mapToPair(Tuple2::swap).collect();
   }
 
   @Test
   public void mapPartitions() {
     JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4), 2);
-    JavaRDD<Integer> partitionSums = rdd.mapPartitions(
-      new FlatMapFunction<Iterator<Integer>, Integer>() {
-        @Override
-        public Iterator<Integer> call(Iterator<Integer> iter) {
-          int sum = 0;
-          while (iter.hasNext()) {
-            sum += iter.next();
-          }
-          return Collections.singletonList(sum).iterator();
+    JavaRDD<Integer> partitionSums = rdd.mapPartitions(iter -> {
+        int sum = 0;
+        while (iter.hasNext()) {
+          sum += iter.next();
         }
-    });
+        return Collections.singletonList(sum).iterator();
+      });
     assertEquals("[3, 7]", partitionSums.collect().toString());
   }
 
@@ -975,17 +831,13 @@ public class JavaAPISuite implements Serializable {
   @Test
   public void mapPartitionsWithIndex() {
     JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4), 2);
-    JavaRDD<Integer> partitionSums = rdd.mapPartitionsWithIndex(
-      new Function2<Integer, Iterator<Integer>, Iterator<Integer>>() {
-        @Override
-        public Iterator<Integer> call(Integer index, Iterator<Integer> iter) {
-          int sum = 0;
-          while (iter.hasNext()) {
-            sum += iter.next();
-          }
-          return Collections.singletonList(sum).iterator();
+    JavaRDD<Integer> partitionSums = rdd.mapPartitionsWithIndex((index, iter) -> {
+        int sum = 0;
+        while (iter.hasNext()) {
+          sum += iter.next();
         }
-    }, false);
+        return Collections.singletonList(sum).iterator();
+      }, false);
     assertEquals("[3, 7]", partitionSums.collect().toString());
   }
 
@@ -1124,21 +976,12 @@ public class JavaAPISuite implements Serializable {
     );
     JavaPairRDD<Integer, String> rdd = sc.parallelizePairs(pairs);
 
-    rdd.mapToPair(new PairFunction<Tuple2<Integer, String>, IntWritable, Text>() {
-      @Override
-      public Tuple2<IntWritable, Text> call(Tuple2<Integer, String> pair) {
-        return new Tuple2<>(new IntWritable(pair._1()), new Text(pair._2()));
-      }
-    }).saveAsHadoopFile(outputDir, IntWritable.class, Text.class, SequenceFileOutputFormat.class);
+    rdd.mapToPair(pair -> new Tuple2<>(new IntWritable(pair._1()), new Text(pair._2())))
+        .saveAsHadoopFile(outputDir, IntWritable.class, Text.class, SequenceFileOutputFormat.class);
 
     // Try reading the output back as an object file
     JavaPairRDD<Integer, String> readRDD = sc.sequenceFile(outputDir, IntWritable.class,
-      Text.class).mapToPair(new PairFunction<Tuple2<IntWritable, Text>, Integer, String>() {
-      @Override
-      public Tuple2<Integer, String> call(Tuple2<IntWritable, Text> pair) {
-        return new Tuple2<>(pair._1().get(), pair._2().toString());
-      }
-    });
+      Text.class).mapToPair(pair -> new Tuple2<>(pair._1().get(), pair._2().toString()));
     assertEquals(pairs, readRDD.collect());
   }
 
@@ -1179,12 +1022,7 @@ public class JavaAPISuite implements Serializable {
     channel1.close();
 
     JavaPairRDD<String, PortableDataStream> readRDD = sc.binaryFiles(tempDirName).cache();
-    readRDD.foreach(new VoidFunction<Tuple2<String,PortableDataStream>>() {
-      @Override
-      public void call(Tuple2<String, PortableDataStream> pair) {
-        pair._2().toArray(); // force the file to read
-      }
-    });
+    readRDD.foreach(pair -> pair._2().toArray()); // force the file to read
 
     List<Tuple2<String, PortableDataStream>> result = readRDD.collect();
     for (Tuple2<String, PortableDataStream> res : result) {
@@ -1229,23 +1067,13 @@ public class JavaAPISuite implements Serializable {
     );
     JavaPairRDD<Integer, String> rdd = sc.parallelizePairs(pairs);
 
-    rdd.mapToPair(new PairFunction<Tuple2<Integer, String>, IntWritable, Text>() {
-      @Override
-      public Tuple2<IntWritable, Text> call(Tuple2<Integer, String> pair) {
-        return new Tuple2<>(new IntWritable(pair._1()), new Text(pair._2()));
-      }
-    }).saveAsNewAPIHadoopFile(
-        outputDir, IntWritable.class, Text.class,
+    rdd.mapToPair(pair -> new Tuple2<>(new IntWritable(pair._1()), new Text(pair._2())))
+        .saveAsNewAPIHadoopFile(outputDir, IntWritable.class, Text.class,
         org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat.class);
 
     JavaPairRDD<IntWritable, Text> output =
         sc.sequenceFile(outputDir, IntWritable.class, Text.class);
-    assertEquals(pairs.toString(), output.map(new Function<Tuple2<IntWritable, Text>, String>() {
-      @Override
-      public String call(Tuple2<IntWritable, Text> x) {
-        return x.toString();
-      }
-    }).collect().toString());
+    assertEquals(pairs.toString(), output.map(Tuple2::toString).collect().toString());
   }
 
   @SuppressWarnings("unchecked")
@@ -1259,22 +1087,13 @@ public class JavaAPISuite implements Serializable {
     );
     JavaPairRDD<Integer, String> rdd = sc.parallelizePairs(pairs);
 
-    rdd.mapToPair(new PairFunction<Tuple2<Integer, String>, IntWritable, Text>() {
-      @Override
-      public Tuple2<IntWritable, Text> call(Tuple2<Integer, String> pair) {
-        return new Tuple2<>(new IntWritable(pair._1()), new Text(pair._2()));
-      }
-    }).saveAsHadoopFile(outputDir, IntWritable.class, Text.class, SequenceFileOutputFormat.class);
+    rdd.mapToPair(pair -> new Tuple2<>(new IntWritable(pair._1()), new Text(pair._2())))
+        .saveAsHadoopFile(outputDir, IntWritable.class, Text.class, SequenceFileOutputFormat.class);
 
     JavaPairRDD<IntWritable, Text> output = sc.newAPIHadoopFile(outputDir,
         org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat.class,
         IntWritable.class, Text.class, Job.getInstance().getConfiguration());
-    assertEquals(pairs.toString(), output.map(new Function<Tuple2<IntWritable, Text>, String>() {
-      @Override
-      public String call(Tuple2<IntWritable, Text> x) {
-        return x.toString();
-      }
-    }).collect().toString());
+    assertEquals(pairs.toString(), output.map(Tuple2::toString).collect().toString());
   }
 
   @Test
@@ -1315,21 +1134,12 @@ public class JavaAPISuite implements Serializable {
     );
     JavaPairRDD<Integer, String> rdd = sc.parallelizePairs(pairs);
 
-    rdd.mapToPair(new PairFunction<Tuple2<Integer, String>, IntWritable, Text>() {
-      @Override
-      public Tuple2<IntWritable, Text> call(Tuple2<Integer, String> pair) {
-        return new Tuple2<>(new IntWritable(pair._1()), new Text(pair._2()));
-      }
-    }).saveAsHadoopFile(outputDir, IntWritable.class, Text.class, SequenceFileOutputFormat.class);
+    rdd.mapToPair(pair -> new Tuple2<>(new IntWritable(pair._1()), new Text(pair._2())))
+        .saveAsHadoopFile(outputDir, IntWritable.class, Text.class, SequenceFileOutputFormat.class);
 
     JavaPairRDD<IntWritable, Text> output = sc.hadoopFile(outputDir,
         SequenceFileInputFormat.class, IntWritable.class, Text.class);
-    assertEquals(pairs.toString(), output.map(new Function<Tuple2<IntWritable, Text>, String>() {
-      @Override
-      public String call(Tuple2<IntWritable, Text> x) {
-        return x.toString();
-      }
-    }).collect().toString());
+    assertEquals(pairs.toString(), output.map(Tuple2::toString).collect().toString());
   }
 
   @SuppressWarnings("unchecked")
@@ -1343,34 +1153,19 @@ public class JavaAPISuite implements Serializable {
     );
     JavaPairRDD<Integer, String> rdd = sc.parallelizePairs(pairs);
 
-    rdd.mapToPair(new PairFunction<Tuple2<Integer, String>, IntWritable, Text>() {
-      @Override
-      public Tuple2<IntWritable, Text> call(Tuple2<Integer, String> pair) {
-        return new Tuple2<>(new IntWritable(pair._1()), new Text(pair._2()));
-      }
-    }).saveAsHadoopFile(outputDir, IntWritable.class, Text.class, SequenceFileOutputFormat.class,
-        DefaultCodec.class);
+    rdd.mapToPair(pair -> new Tuple2<>(new IntWritable(pair._1()), new Text(pair._2())))
+        .saveAsHadoopFile(outputDir, IntWritable.class, Text.class, SequenceFileOutputFormat.class,  DefaultCodec.class);
 
     JavaPairRDD<IntWritable, Text> output = sc.hadoopFile(outputDir,
         SequenceFileInputFormat.class, IntWritable.class, Text.class);
 
-    assertEquals(pairs.toString(), output.map(new Function<Tuple2<IntWritable, Text>, String>() {
-      @Override
-      public String call(Tuple2<IntWritable, Text> x) {
-        return x.toString();
-      }
-    }).collect().toString());
+    assertEquals(pairs.toString(), output.map(Tuple2::toString).collect().toString());
   }
 
   @Test
   public void zip() {
     JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));
-    JavaDoubleRDD doubles = rdd.mapToDouble(new DoubleFunction<Integer>() {
-      @Override
-      public double call(Integer x) {
-        return x.doubleValue();
-      }
-    });
+    JavaDoubleRDD doubles = rdd.mapToDouble(Integer::doubleValue);
     JavaPairRDD<Integer, Double> zipped = rdd.zip(doubles);
     zipped.count();
   }
@@ -1380,12 +1175,7 @@ public class JavaAPISuite implements Serializable {
     JavaRDD<Integer> rdd1 = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6), 2);
     JavaRDD<String> rdd2 = sc.parallelize(Arrays.asList("1", "2", "3", "4"), 2);
     FlatMapFunction2<Iterator<Integer>, Iterator<String>, Integer> sizesFn =
-      new FlatMapFunction2<Iterator<Integer>, Iterator<String>, Integer>() {
-        @Override
-        public Iterator<Integer> call(Iterator<Integer> i, Iterator<String> s) {
-          return Arrays.asList(Iterators.size(i), Iterators.size(s)).iterator();
-        }
-      };
+        (i, s) -> Arrays.asList(Iterators.size(i), Iterators.size(s)).iterator();
 
     JavaRDD<Integer> sizes = rdd1.zipPartitions(rdd2, sizesFn);
     assertEquals("[3, 2, 3, 2]", sizes.collect().toString());
@@ -1396,22 +1186,12 @@ public class JavaAPISuite implements Serializable {
   public void accumulators() {
     JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));
 
-    final Accumulator<Integer> intAccum = sc.intAccumulator(10);
-    rdd.foreach(new VoidFunction<Integer>() {
-      @Override
-      public void call(Integer x) {
-        intAccum.add(x);
-      }
-    });
+    Accumulator<Integer> intAccum = sc.intAccumulator(10);
+    rdd.foreach(intAccum::add);
     assertEquals((Integer) 25, intAccum.value());
 
-    final Accumulator<Double> doubleAccum = sc.doubleAccumulator(10.0);
-    rdd.foreach(new VoidFunction<Integer>() {
-      @Override
-      public void call(Integer x) {
-        doubleAccum.add((double) x);
-      }
-    });
+    Accumulator<Double> doubleAccum = sc.doubleAccumulator(10.0);
+    rdd.foreach(x -> doubleAccum.add((double) x));
     assertEquals((Double) 25.0, doubleAccum.value());
 
     // Try a custom accumulator type
@@ -1432,13 +1212,8 @@ public class JavaAPISuite implements Serializable {
       }
     };
 
-    final Accumulator<Float> floatAccum = sc.accumulator(10.0f, floatAccumulatorParam);
-    rdd.foreach(new VoidFunction<Integer>() {
-      @Override
-      public void call(Integer x) {
-        floatAccum.add((float) x);
-      }
-    });
+    Accumulator<Float> floatAccum = sc.accumulator(10.0f, floatAccumulatorParam);
+    rdd.foreach(x -> floatAccum.add((float) x));
     assertEquals((Float) 25.0f, floatAccum.value());
 
     // Test the setValue method
@@ -1449,12 +1224,7 @@ public class JavaAPISuite implements Serializable {
   @Test
   public void keyBy() {
     JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2));
-    List<Tuple2<String, Integer>> s = rdd.keyBy(new Function<Integer, String>() {
-      @Override
-      public String call(Integer t) {
-        return t.toString();
-      }
-    }).collect();
+    List<Tuple2<String, Integer>> s = rdd.keyBy(Object::toString).collect();
     assertEquals(new Tuple2<>("1", 1), s.get(0));
     assertEquals(new Tuple2<>("2", 2), s.get(1));
   }
@@ -1487,26 +1257,10 @@ public class JavaAPISuite implements Serializable {
   @Test
   public void combineByKey() {
     JavaRDD<Integer> originalRDD = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6));
-    Function<Integer, Integer> keyFunction = new Function<Integer, Integer>() {
-      @Override
-      public Integer call(Integer v1) {
-        return v1 % 3;
-      }
-    };
-    Function<Integer, Integer> createCombinerFunction = new Function<Integer, Integer>() {
-      @Override
-      public Integer call(Integer v1) {
-        return v1;
-      }
-    };
+    Function<Integer, Integer> keyFunction = v1 -> v1 % 3;
+    Function<Integer, Integer> createCombinerFunction = v1 -> v1;
 
-    Function2<Integer, Integer, Integer> mergeValueFunction =
-        new Function2<Integer, Integer, Integer>() {
-      @Override
-      public Integer call(Integer v1, Integer v2) {
-        return v1 + v2;
-      }
-    };
+    Function2<Integer, Integer, Integer> mergeValueFunction = (v1, v2) -> v1 + v2;
 
     JavaPairRDD<Integer, Integer> combinedRDD = originalRDD.keyBy(keyFunction)
         .combineByKey(createCombinerFunction, mergeValueFunction, mergeValueFunction);
@@ -1534,20 +1288,8 @@ public class JavaAPISuite implements Serializable {
   @Test
   public void mapOnPairRDD() {
     JavaRDD<Integer> rdd1 = sc.parallelize(Arrays.asList(1,2,3,4));
-    JavaPairRDD<Integer, Integer> rdd2 = rdd1.mapToPair(
-        new PairFunction<Integer, Integer, Integer>() {
-          @Override
-          public Tuple2<Integer, Integer> call(Integer i) {
-            return new Tuple2<>(i, i % 2);
-          }
-        });
-    JavaPairRDD<Integer, Integer> rdd3 = rdd2.mapToPair(
-        new PairFunction<Tuple2<Integer, Integer>, Integer, Integer>() {
-          @Override
-          public Tuple2<Integer, Integer> call(Tuple2<Integer, Integer> in) {
-            return new Tuple2<>(in._2(), in._1());
-          }
-        });
+    JavaPairRDD<Integer, Integer> rdd2 = rdd1.mapToPair(i -> new Tuple2<>(i, i % 2));
+    JavaPairRDD<Integer, Integer> rdd3 = rdd2.mapToPair(in -> new Tuple2<>(in._2(), in._1()));
     assertEquals(Arrays.asList(
         new Tuple2<>(1, 1),
         new Tuple2<>(0, 2),
@@ -1561,13 +1303,7 @@ public class JavaAPISuite implements Serializable {
   public void collectPartitions() {
     JavaRDD<Integer> rdd1 = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7), 3);
 
-    JavaPairRDD<Integer, Integer> rdd2 = rdd1.mapToPair(
-        new PairFunction<Integer, Integer, Integer>() {
-          @Override
-          public Tuple2<Integer, Integer> call(Integer i) {
-            return new Tuple2<>(i, i % 2);
-          }
-        });
+    JavaPairRDD<Integer, Integer> rdd2 = rdd1.mapToPair(i -> new Tuple2<>(i, i % 2));
 
     List<Integer>[] parts = rdd1.collectPartitions(new int[] {0});
     assertEquals(Arrays.asList(1, 2), parts[0]);
@@ -1623,13 +1359,7 @@ public class JavaAPISuite implements Serializable {
   public void collectAsMapWithIntArrayValues() {
     // Regression test for SPARK-1040
     JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1));
-    JavaPairRDD<Integer, int[]> pairRDD = rdd.mapToPair(
-        new PairFunction<Integer, Integer, int[]>() {
-          @Override
-          public Tuple2<Integer, int[]> call(Integer x) {
-            return new Tuple2<>(x, new int[]{x});
-          }
-        });
+    JavaPairRDD<Integer, int[]> pairRDD = rdd.mapToPair(x -> new Tuple2<>(x, new int[]{x}));
     pairRDD.collect();  // Works fine
     pairRDD.collectAsMap();  // Used to crash with ClassCastException
   }
@@ -1651,13 +1381,7 @@ public class JavaAPISuite implements Serializable {
   @SuppressWarnings("unchecked")
   public void sampleByKey() {
     JavaRDD<Integer> rdd1 = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8), 3);
-    JavaPairRDD<Integer, Integer> rdd2 = rdd1.mapToPair(
-      new PairFunction<Integer, Integer, Integer>() {
-        @Override
-        public Tuple2<Integer, Integer> call(Integer i) {
-          return new Tuple2<>(i % 2, 1);
-        }
-      });
+    JavaPairRDD<Integer, Integer> rdd2 = rdd1.mapToPair(i -> new Tuple2<>(i % 2, 1));
     Map<Integer, Double> fractions = new HashMap<>();
     fractions.put(0, 0.5);
     fractions.put(1, 1.0);
@@ -1677,13 +1401,7 @@ public class JavaAPISuite implements Serializable {
   @SuppressWarnings("unchecked")
   public void sampleByKeyExact() {
     JavaRDD<Integer> rdd1 = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8), 3);
-    JavaPairRDD<Integer, Integer> rdd2 = rdd1.mapToPair(
-      new PairFunction<Integer, Integer, Integer>() {
-          @Override
-          public Tuple2<Integer, Integer> call(Integer i) {
-              return new Tuple2<>(i % 2, 1);
-          }
-      });
+    JavaPairRDD<Integer, Integer> rdd2 = rdd1.mapToPair(i -> new Tuple2<>(i % 2, 1));
     Map<Integer, Double> fractions = new HashMap<>();
     fractions.put(0, 0.5);
     fractions.put(1, 1.0);
@@ -1754,14 +1472,7 @@ public class JavaAPISuite implements Serializable {
   public void foreachAsync() throws Exception {
     List<Integer> data = Arrays.asList(1, 2, 3, 4, 5);
     JavaRDD<Integer> rdd = sc.parallelize(data, 1);
-    JavaFutureAction<Void> future = rdd.foreachAsync(
-        new VoidFunction<Integer>() {
-          @Override
-          public void call(Integer integer) {
-            // intentionally left blank.
-          }
-        }
-    );
+    JavaFutureAction<Void> future = rdd.foreachAsync(integer -> {});
     future.get();
     assertFalse(future.isCancelled());
     assertTrue(future.isDone());
@@ -1784,11 +1495,8 @@ public class JavaAPISuite implements Serializable {
   public void testAsyncActionCancellation() throws Exception {
     List<Integer> data = Arrays.asList(1, 2, 3, 4, 5);
     JavaRDD<Integer> rdd = sc.parallelize(data, 1);
-    JavaFutureAction<Void> future = rdd.foreachAsync(new VoidFunction<Integer>() {
-      @Override
-      public void call(Integer integer) throws InterruptedException {
-        Thread.sleep(10000);  // To ensure that the job won't finish before it's cancelled.
-      }
+    JavaFutureAction<Void> future = rdd.foreachAsync(integer -> {
+      Thread.sleep(10000);  // To ensure that the job won't finish before it's cancelled.
     });
     future.cancel(true);
     assertTrue(future.isCancelled());
@@ -1805,7 +1513,7 @@ public class JavaAPISuite implements Serializable {
   public void testAsyncActionErrorWrapping() throws Exception {
     List<Integer> data = Arrays.asList(1, 2, 3, 4, 5);
     JavaRDD<Integer> rdd = sc.parallelize(data, 1);
-    JavaFutureAction<Long> future = rdd.map(new BuggyMapFunction<Integer>()).countAsync();
+    JavaFutureAction<Long> future = rdd.map(new BuggyMapFunction<>()).countAsync();
     try {
       future.get(2, TimeUnit.SECONDS);
       fail("Expected future.get() for failed job to throw ExcecutionException");

http://git-wip-us.apache.org/repos/asf/spark/blob/1487c9af/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaConsumerStrategySuite.java
----------------------------------------------------------------------
diff --git a/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaConsumerStrategySuite.java b/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaConsumerStrategySuite.java
index ba57b6b..938cc8d 100644
--- a/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaConsumerStrategySuite.java
+++ b/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaConsumerStrategySuite.java
@@ -59,39 +59,39 @@ public class JavaConsumerStrategySuite implements Serializable {
       );
 
     final ConsumerStrategy<String, String> sub1 =
-      ConsumerStrategies.<String, String>Subscribe(sTopics, sKafkaParams, sOffsets);
+      ConsumerStrategies.Subscribe(sTopics, sKafkaParams, sOffsets);
     final ConsumerStrategy<String, String> sub2 =
-      ConsumerStrategies.<String, String>Subscribe(sTopics, sKafkaParams);
+      ConsumerStrategies.Subscribe(sTopics, sKafkaParams);
     final ConsumerStrategy<String, String> sub3 =
-      ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams, offsets);
+      ConsumerStrategies.Subscribe(topics, kafkaParams, offsets);
     final ConsumerStrategy<String, String> sub4 =
-      ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams);
+      ConsumerStrategies.Subscribe(topics, kafkaParams);
 
     Assert.assertEquals(
       sub1.executorKafkaParams().get("bootstrap.servers"),
       sub3.executorKafkaParams().get("bootstrap.servers"));
 
     final ConsumerStrategy<String, String> psub1 =
-      ConsumerStrategies.<String, String>SubscribePattern(pat, sKafkaParams, sOffsets);
+      ConsumerStrategies.SubscribePattern(pat, sKafkaParams, sOffsets);
     final ConsumerStrategy<String, String> psub2 =
-      ConsumerStrategies.<String, String>SubscribePattern(pat, sKafkaParams);
+      ConsumerStrategies.SubscribePattern(pat, sKafkaParams);
     final ConsumerStrategy<String, String> psub3 =
-      ConsumerStrategies.<String, String>SubscribePattern(pat, kafkaParams, offsets);
+      ConsumerStrategies.SubscribePattern(pat, kafkaParams, offsets);
     final ConsumerStrategy<String, String> psub4 =
-      ConsumerStrategies.<String, String>SubscribePattern(pat, kafkaParams);
+      ConsumerStrategies.SubscribePattern(pat, kafkaParams);
 
     Assert.assertEquals(
       psub1.executorKafkaParams().get("bootstrap.servers"),
       psub3.executorKafkaParams().get("bootstrap.servers"));
 
     final ConsumerStrategy<String, String> asn1 =
-      ConsumerStrategies.<String, String>Assign(sParts, sKafkaParams, sOffsets);
+      ConsumerStrategies.Assign(sParts, sKafkaParams, sOffsets);
     final ConsumerStrategy<String, String> asn2 =
-      ConsumerStrategies.<String, String>Assign(sParts, sKafkaParams);
+      ConsumerStrategies.Assign(sParts, sKafkaParams);
     final ConsumerStrategy<String, String> asn3 =
-      ConsumerStrategies.<String, String>Assign(parts, kafkaParams, offsets);
+      ConsumerStrategies.Assign(parts, kafkaParams, offsets);
     final ConsumerStrategy<String, String> asn4 =
-      ConsumerStrategies.<String, String>Assign(parts, kafkaParams);
+      ConsumerStrategies.Assign(parts, kafkaParams);
 
     Assert.assertEquals(
       asn1.executorKafkaParams().get("bootstrap.servers"),

http://git-wip-us.apache.org/repos/asf/spark/blob/1487c9af/launcher/src/test/java/org/apache/spark/launcher/SparkSubmitCommandBuilderSuite.java
----------------------------------------------------------------------
diff --git a/launcher/src/test/java/org/apache/spark/launcher/SparkSubmitCommandBuilderSuite.java b/launcher/src/test/java/org/apache/spark/launcher/SparkSubmitCommandBuilderSuite.java
index d569b66..2e050f8 100644
--- a/launcher/src/test/java/org/apache/spark/launcher/SparkSubmitCommandBuilderSuite.java
+++ b/launcher/src/test/java/org/apache/spark/launcher/SparkSubmitCommandBuilderSuite.java
@@ -217,7 +217,7 @@ public class SparkSubmitCommandBuilderSuite extends BaseSuite {
     String deployMode = isDriver ? "client" : "cluster";
 
     SparkSubmitCommandBuilder launcher =
-      newCommandBuilder(Collections.<String>emptyList());
+      newCommandBuilder(Collections.emptyList());
     launcher.childEnv.put(CommandBuilderUtils.ENV_SPARK_HOME,
       System.getProperty("spark.test.home"));
     launcher.master = "yarn";

http://git-wip-us.apache.org/repos/asf/spark/blob/1487c9af/launcher/src/test/java/org/apache/spark/launcher/SparkSubmitOptionParserSuite.java
----------------------------------------------------------------------
diff --git a/launcher/src/test/java/org/apache/spark/launcher/SparkSubmitOptionParserSuite.java b/launcher/src/test/java/org/apache/spark/launcher/SparkSubmitOptionParserSuite.java
index 3bc35da..9ff7ace 100644
--- a/launcher/src/test/java/org/apache/spark/launcher/SparkSubmitOptionParserSuite.java
+++ b/launcher/src/test/java/org/apache/spark/launcher/SparkSubmitOptionParserSuite.java
@@ -44,7 +44,7 @@ public class SparkSubmitOptionParserSuite extends BaseSuite {
         count++;
         verify(parser).handle(eq(optNames[0]), eq(value));
         verify(parser, times(count)).handle(anyString(), anyString());
-        verify(parser, times(count)).handleExtraArgs(eq(Collections.<String>emptyList()));
+        verify(parser, times(count)).handleExtraArgs(eq(Collections.emptyList()));
       }
     }
 
@@ -54,9 +54,9 @@ public class SparkSubmitOptionParserSuite extends BaseSuite {
         parser.parse(Arrays.asList(name));
         count++;
         switchCount++;
-        verify(parser, times(switchCount)).handle(eq(switchNames[0]), same((String) null));
+        verify(parser, times(switchCount)).handle(eq(switchNames[0]), same(null));
         verify(parser, times(count)).handle(anyString(), any(String.class));
-        verify(parser, times(count)).handleExtraArgs(eq(Collections.<String>emptyList()));
+        verify(parser, times(count)).handleExtraArgs(eq(Collections.emptyList()));
       }
     }
   }
@@ -80,7 +80,7 @@ public class SparkSubmitOptionParserSuite extends BaseSuite {
     List<String> args = Arrays.asList(parser.MASTER + "=" + parser.MASTER);
     parser.parse(args);
     verify(parser).handle(eq(parser.MASTER), eq(parser.MASTER));
-    verify(parser).handleExtraArgs(eq(Collections.<String>emptyList()));
+    verify(parser).handleExtraArgs(eq(Collections.emptyList()));
   }
 
   private static class DummyParser extends SparkSubmitOptionParser {

http://git-wip-us.apache.org/repos/asf/spark/blob/1487c9af/mllib/src/test/java/org/apache/spark/ml/feature/JavaPCASuite.java
----------------------------------------------------------------------
diff --git a/mllib/src/test/java/org/apache/spark/ml/feature/JavaPCASuite.java b/mllib/src/test/java/org/apache/spark/ml/feature/JavaPCASuite.java
index 8c0338e..683ceff 100644
--- a/mllib/src/test/java/org/apache/spark/ml/feature/JavaPCASuite.java
+++ b/mllib/src/test/java/org/apache/spark/ml/feature/JavaPCASuite.java
@@ -21,16 +21,14 @@ import java.io.Serializable;
 import java.util.Arrays;
 import java.util.List;
 
-import scala.Tuple2;
-
 import org.junit.Assert;
 import org.junit.Test;
 
 import org.apache.spark.SharedSparkSession;
 import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.function.Function;
 import org.apache.spark.ml.linalg.Vector;
 import org.apache.spark.ml.linalg.Vectors;
+import org.apache.spark.mllib.linalg.DenseVector;
 import org.apache.spark.mllib.linalg.Matrix;
 import org.apache.spark.mllib.linalg.distributed.RowMatrix;
 import org.apache.spark.sql.Dataset;
@@ -69,35 +67,22 @@ public class JavaPCASuite extends SharedSparkSession {
     JavaRDD<Vector> dataRDD = jsc.parallelize(points, 2);
 
     RowMatrix mat = new RowMatrix(dataRDD.map(
-            new Function<Vector, org.apache.spark.mllib.linalg.Vector>() {
-              public org.apache.spark.mllib.linalg.Vector call(Vector vector) {
-                return new org.apache.spark.mllib.linalg.DenseVector(vector.toArray());
-              }
-            }
+        (Vector vector) -> (org.apache.spark.mllib.linalg.Vector) new DenseVector(vector.toArray())
     ).rdd());
 
     Matrix pc = mat.computePrincipalComponents(3);
 
     mat.multiply(pc).rows().toJavaRDD();
 
-    JavaRDD<Vector> expected = mat.multiply(pc).rows().toJavaRDD().map(
-      new Function<org.apache.spark.mllib.linalg.Vector, Vector>() {
-        public Vector call(org.apache.spark.mllib.linalg.Vector vector) {
-          return vector.asML();
-        }
-      }
-    );
+    JavaRDD<Vector> expected = mat.multiply(pc).rows().toJavaRDD()
+        .map(org.apache.spark.mllib.linalg.Vector::asML);
 
-    JavaRDD<VectorPair> featuresExpected = dataRDD.zip(expected).map(
-      new Function<Tuple2<Vector, Vector>, VectorPair>() {
-        public VectorPair call(Tuple2<Vector, Vector> pair) {
-          VectorPair featuresExpected = new VectorPair();
-          featuresExpected.setFeatures(pair._1());
-          featuresExpected.setExpected(pair._2());
-          return featuresExpected;
-        }
-      }
-    );
+    JavaRDD<VectorPair> featuresExpected = dataRDD.zip(expected).map(pair -> {
+      VectorPair featuresExpected1 = new VectorPair();
+      featuresExpected1.setFeatures(pair._1());
+      featuresExpected1.setExpected(pair._2());
+      return featuresExpected1;
+    });
 
     Dataset<Row> df = spark.createDataFrame(featuresExpected, VectorPair.class);
     PCAModel pca = new PCA()

http://git-wip-us.apache.org/repos/asf/spark/blob/1487c9af/mllib/src/test/java/org/apache/spark/mllib/classification/JavaNaiveBayesSuite.java
----------------------------------------------------------------------
diff --git a/mllib/src/test/java/org/apache/spark/mllib/classification/JavaNaiveBayesSuite.java b/mllib/src/test/java/org/apache/spark/mllib/classification/JavaNaiveBayesSuite.java
index 6ded42e..65db3d0 100644
--- a/mllib/src/test/java/org/apache/spark/mllib/classification/JavaNaiveBayesSuite.java
+++ b/mllib/src/test/java/org/apache/spark/mllib/classification/JavaNaiveBayesSuite.java
@@ -25,7 +25,6 @@ import org.junit.Test;
 
 import org.apache.spark.SharedSparkSession;
 import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.function.Function;
 import org.apache.spark.mllib.linalg.Vector;
 import org.apache.spark.mllib.linalg.Vectors;
 import org.apache.spark.mllib.regression.LabeledPoint;
@@ -42,7 +41,7 @@ public class JavaNaiveBayesSuite extends SharedSparkSession {
     new LabeledPoint(2, Vectors.dense(0.0, 0.0, 2.0))
   );
 
-  private int validatePrediction(List<LabeledPoint> points, NaiveBayesModel model) {
+  private static int validatePrediction(List<LabeledPoint> points, NaiveBayesModel model) {
     int correct = 0;
     for (LabeledPoint p : points) {
       if (model.predict(p.features()) == p.label()) {
@@ -80,12 +79,7 @@ public class JavaNaiveBayesSuite extends SharedSparkSession {
   public void testPredictJavaRDD() {
     JavaRDD<LabeledPoint> examples = jsc.parallelize(POINTS, 2).cache();
     NaiveBayesModel model = NaiveBayes.train(examples.rdd());
-    JavaRDD<Vector> vectors = examples.map(new Function<LabeledPoint, Vector>() {
-      @Override
-      public Vector call(LabeledPoint v) throws Exception {
-        return v.features();
-      }
-    });
+    JavaRDD<Vector> vectors = examples.map(LabeledPoint::features);
     JavaRDD<Double> predictions = model.predict(vectors);
     // Should be able to get the first prediction.
     predictions.first();

http://git-wip-us.apache.org/repos/asf/spark/blob/1487c9af/mllib/src/test/java/org/apache/spark/mllib/clustering/JavaBisectingKMeansSuite.java
----------------------------------------------------------------------
diff --git a/mllib/src/test/java/org/apache/spark/mllib/clustering/JavaBisectingKMeansSuite.java b/mllib/src/test/java/org/apache/spark/mllib/clustering/JavaBisectingKMeansSuite.java
index 3d62b27..b4196c6 100644
--- a/mllib/src/test/java/org/apache/spark/mllib/clustering/JavaBisectingKMeansSuite.java
+++ b/mllib/src/test/java/org/apache/spark/mllib/clustering/JavaBisectingKMeansSuite.java
@@ -17,7 +17,7 @@
 
 package org.apache.spark.mllib.clustering;
 
-import com.google.common.collect.Lists;
+import java.util.Arrays;
 
 import org.junit.Assert;
 import org.junit.Test;
@@ -31,7 +31,7 @@ public class JavaBisectingKMeansSuite extends SharedSparkSession {
 
   @Test
   public void twoDimensionalData() {
-    JavaRDD<Vector> points = jsc.parallelize(Lists.newArrayList(
+    JavaRDD<Vector> points = jsc.parallelize(Arrays.asList(
       Vectors.dense(4, -1),
       Vectors.dense(4, 1),
       Vectors.sparse(2, new int[]{0}, new double[]{1.0})

http://git-wip-us.apache.org/repos/asf/spark/blob/1487c9af/mllib/src/test/java/org/apache/spark/mllib/clustering/JavaLDASuite.java
----------------------------------------------------------------------
diff --git a/mllib/src/test/java/org/apache/spark/mllib/clustering/JavaLDASuite.java b/mllib/src/test/java/org/apache/spark/mllib/clustering/JavaLDASuite.java
index 08d6713..38ee250 100644
--- a/mllib/src/test/java/org/apache/spark/mllib/clustering/JavaLDASuite.java
+++ b/mllib/src/test/java/org/apache/spark/mllib/clustering/JavaLDASuite.java
@@ -20,6 +20,7 @@ package org.apache.spark.mllib.clustering;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.List;
 
 import scala.Tuple2;
 import scala.Tuple3;
@@ -30,7 +31,6 @@ import static org.junit.Assert.*;
 import org.apache.spark.SharedSparkSession;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.function.Function;
 import org.apache.spark.mllib.linalg.Matrix;
 import org.apache.spark.mllib.linalg.Vector;
 import org.apache.spark.mllib.linalg.Vectors;
@@ -39,7 +39,7 @@ public class JavaLDASuite extends SharedSparkSession {
   @Override
   public void setUp() throws IOException {
     super.setUp();
-    ArrayList<Tuple2<Long, Vector>> tinyCorpus = new ArrayList<>();
+    List<Tuple2<Long, Vector>> tinyCorpus = new ArrayList<>();
     for (int i = 0; i < LDASuite.tinyCorpus().length; i++) {
       tinyCorpus.add(new Tuple2<>((Long) LDASuite.tinyCorpus()[i]._1(),
         LDASuite.tinyCorpus()[i]._2()));
@@ -53,7 +53,7 @@ public class JavaLDASuite extends SharedSparkSession {
     Matrix topics = LDASuite.tinyTopics();
     double[] topicConcentration = new double[topics.numRows()];
     Arrays.fill(topicConcentration, 1.0D / topics.numRows());
-    LocalLDAModel model = new LocalLDAModel(topics, Vectors.dense(topicConcentration), 1D, 100D);
+    LocalLDAModel model = new LocalLDAModel(topics, Vectors.dense(topicConcentration), 1.0, 100.0);
 
     // Check: basic parameters
     assertEquals(model.k(), tinyK);
@@ -87,17 +87,17 @@ public class JavaLDASuite extends SharedSparkSession {
 
     // Check: basic parameters
     LocalLDAModel localModel = model.toLocal();
-    assertEquals(model.k(), k);
-    assertEquals(localModel.k(), k);
-    assertEquals(model.vocabSize(), tinyVocabSize);
-    assertEquals(localModel.vocabSize(), tinyVocabSize);
-    assertEquals(model.topicsMatrix(), localModel.topicsMatrix());
+    assertEquals(k, model.k());
+    assertEquals(k, localModel.k());
+    assertEquals(tinyVocabSize, model.vocabSize());
+    assertEquals(tinyVocabSize, localModel.vocabSize());
+    assertEquals(localModel.topicsMatrix(), model.topicsMatrix());
 
     // Check: topic summaries
     Tuple2<int[], double[]>[] roundedTopicSummary = model.describeTopics();
-    assertEquals(roundedTopicSummary.length, k);
+    assertEquals(k, roundedTopicSummary.length);
     Tuple2<int[], double[]>[] roundedLocalTopicSummary = localModel.describeTopics();
-    assertEquals(roundedLocalTopicSummary.length, k);
+    assertEquals(k, roundedLocalTopicSummary.length);
 
     // Check: log probabilities
     assertTrue(model.logLikelihood() < 0.0);
@@ -107,12 +107,8 @@ public class JavaLDASuite extends SharedSparkSession {
     JavaPairRDD<Long, Vector> topicDistributions = model.javaTopicDistributions();
     // SPARK-5562. since the topicDistribution returns the distribution of the non empty docs
     // over topics. Compare it against nonEmptyCorpus instead of corpus
-    JavaPairRDD<Long, Vector> nonEmptyCorpus = corpus.filter(
-      new Function<Tuple2<Long, Vector>, Boolean>() {
-        public Boolean call(Tuple2<Long, Vector> tuple2) {
-          return Vectors.norm(tuple2._2(), 1.0) != 0.0;
-        }
-      });
+    JavaPairRDD<Long, Vector> nonEmptyCorpus =
+        corpus.filter(tuple2 -> Vectors.norm(tuple2._2(), 1.0) != 0.0);
     assertEquals(topicDistributions.count(), nonEmptyCorpus.count());
 
     // Check: javaTopTopicsPerDocuments
@@ -155,14 +151,14 @@ public class JavaLDASuite extends SharedSparkSession {
     LDAModel model = lda.run(corpus);
 
     // Check: basic parameters
-    assertEquals(model.k(), k);
-    assertEquals(model.vocabSize(), tinyVocabSize);
+    assertEquals(k, model.k());
+    assertEquals(tinyVocabSize, model.vocabSize());
 
     // Check: topic summaries
     Tuple2<int[], double[]>[] roundedTopicSummary = model.describeTopics();
-    assertEquals(roundedTopicSummary.length, k);
+    assertEquals(k, roundedTopicSummary.length);
     Tuple2<int[], double[]>[] roundedLocalTopicSummary = model.describeTopics();
-    assertEquals(roundedLocalTopicSummary.length, k);
+    assertEquals(k, roundedLocalTopicSummary.length);
   }
 
   @Test
@@ -177,7 +173,7 @@ public class JavaLDASuite extends SharedSparkSession {
     double logPerplexity = toyModel.logPerplexity(pairedDocs);
 
     // check: logLikelihood.
-    ArrayList<Tuple2<Long, Vector>> docsSingleWord = new ArrayList<>();
+    List<Tuple2<Long, Vector>> docsSingleWord = new ArrayList<>();
     docsSingleWord.add(new Tuple2<>(0L, Vectors.dense(1.0, 0.0, 0.0)));
     JavaPairRDD<Long, Vector> single = JavaPairRDD.fromJavaRDD(jsc.parallelize(docsSingleWord));
     double logLikelihood = toyModel.logLikelihood(single);
@@ -190,6 +186,6 @@ public class JavaLDASuite extends SharedSparkSession {
     LDASuite.tinyTopicDescription();
   private JavaPairRDD<Long, Vector> corpus;
   private LocalLDAModel toyModel = LDASuite.toyModel();
-  private ArrayList<Tuple2<Long, Vector>> toyData = LDASuite.javaToyData();
+  private List<Tuple2<Long, Vector>> toyData = LDASuite.javaToyData();
 
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/1487c9af/mllib/src/test/java/org/apache/spark/mllib/fpm/JavaAssociationRulesSuite.java
----------------------------------------------------------------------
diff --git a/mllib/src/test/java/org/apache/spark/mllib/fpm/JavaAssociationRulesSuite.java b/mllib/src/test/java/org/apache/spark/mllib/fpm/JavaAssociationRulesSuite.java
index 3451e07..15de566 100644
--- a/mllib/src/test/java/org/apache/spark/mllib/fpm/JavaAssociationRulesSuite.java
+++ b/mllib/src/test/java/org/apache/spark/mllib/fpm/JavaAssociationRulesSuite.java
@@ -31,9 +31,9 @@ public class JavaAssociationRulesSuite extends SharedSparkSession {
 
     @SuppressWarnings("unchecked")
     JavaRDD<FPGrowth.FreqItemset<String>> freqItemsets = jsc.parallelize(Arrays.asList(
-      new FreqItemset<String>(new String[]{"a"}, 15L),
-      new FreqItemset<String>(new String[]{"b"}, 35L),
-      new FreqItemset<String>(new String[]{"a", "b"}, 12L)
+      new FreqItemset<>(new String[]{"a"}, 15L),
+      new FreqItemset<>(new String[]{"b"}, 35L),
+      new FreqItemset<>(new String[]{"a", "b"}, 12L)
     ));
 
     JavaRDD<AssociationRules.Rule<String>> results = (new AssociationRules()).run(freqItemsets);

http://git-wip-us.apache.org/repos/asf/spark/blob/1487c9af/mllib/src/test/java/org/apache/spark/mllib/regression/JavaLinearRegressionSuite.java
----------------------------------------------------------------------
diff --git a/mllib/src/test/java/org/apache/spark/mllib/regression/JavaLinearRegressionSuite.java b/mllib/src/test/java/org/apache/spark/mllib/regression/JavaLinearRegressionSuite.java
index a46b132..86c723a 100644
--- a/mllib/src/test/java/org/apache/spark/mllib/regression/JavaLinearRegressionSuite.java
+++ b/mllib/src/test/java/org/apache/spark/mllib/regression/JavaLinearRegressionSuite.java
@@ -24,13 +24,13 @@ import org.junit.Test;
 
 import org.apache.spark.SharedSparkSession;
 import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.function.Function;
 import org.apache.spark.mllib.linalg.Vector;
 import org.apache.spark.mllib.util.LinearDataGenerator;
 
 public class JavaLinearRegressionSuite extends SharedSparkSession {
 
-  int validatePrediction(List<LabeledPoint> validationData, LinearRegressionModel model) {
+  private static int validatePrediction(
+      List<LabeledPoint> validationData, LinearRegressionModel model) {
     int numAccurate = 0;
     for (LabeledPoint point : validationData) {
       Double prediction = model.predict(point.features());
@@ -87,12 +87,7 @@ public class JavaLinearRegressionSuite extends SharedSparkSession {
       LinearDataGenerator.generateLinearInputAsList(A, weights, nPoints, 42, 0.1), 2).cache();
     LinearRegressionWithSGD linSGDImpl = new LinearRegressionWithSGD();
     LinearRegressionModel model = linSGDImpl.run(testRDD.rdd());
-    JavaRDD<Vector> vectors = testRDD.map(new Function<LabeledPoint, Vector>() {
-      @Override
-      public Vector call(LabeledPoint v) throws Exception {
-        return v.features();
-      }
-    });
+    JavaRDD<Vector> vectors = testRDD.map(LabeledPoint::features);
     JavaRDD<Double> predictions = model.predict(vectors);
     // Should be able to get the first prediction.
     predictions.first();

http://git-wip-us.apache.org/repos/asf/spark/blob/1487c9af/mllib/src/test/java/org/apache/spark/mllib/tree/JavaDecisionTreeSuite.java
----------------------------------------------------------------------
diff --git a/mllib/src/test/java/org/apache/spark/mllib/tree/JavaDecisionTreeSuite.java b/mllib/src/test/java/org/apache/spark/mllib/tree/JavaDecisionTreeSuite.java
index 1dcbbca..0f71deb 100644
--- a/mllib/src/test/java/org/apache/spark/mllib/tree/JavaDecisionTreeSuite.java
+++ b/mllib/src/test/java/org/apache/spark/mllib/tree/JavaDecisionTreeSuite.java
@@ -25,8 +25,6 @@ import org.junit.Test;
 
 import org.apache.spark.SharedSparkSession;
 import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.function.Function;
-import org.apache.spark.mllib.linalg.Vector;
 import org.apache.spark.mllib.regression.LabeledPoint;
 import org.apache.spark.mllib.tree.configuration.Algo;
 import org.apache.spark.mllib.tree.configuration.Strategy;
@@ -35,7 +33,7 @@ import org.apache.spark.mllib.tree.model.DecisionTreeModel;
 
 public class JavaDecisionTreeSuite extends SharedSparkSession {
 
-  int validatePrediction(List<LabeledPoint> validationData, DecisionTreeModel model) {
+  private static int validatePrediction(List<LabeledPoint> validationData, DecisionTreeModel model) {
     int numCorrect = 0;
     for (LabeledPoint point : validationData) {
       Double prediction = model.predict(point.features());
@@ -63,7 +61,7 @@ public class JavaDecisionTreeSuite extends SharedSparkSession {
     DecisionTreeModel model = learner.run(rdd.rdd());
 
     int numCorrect = validatePrediction(arr, model);
-    Assert.assertTrue(numCorrect == rdd.count());
+    Assert.assertEquals(numCorrect, rdd.count());
   }
 
   @Test
@@ -82,15 +80,10 @@ public class JavaDecisionTreeSuite extends SharedSparkSession {
     DecisionTreeModel model = DecisionTree$.MODULE$.train(rdd.rdd(), strategy);
 
     // java compatibility test
-    JavaRDD<Double> predictions = model.predict(rdd.map(new Function<LabeledPoint, Vector>() {
-      @Override
-      public Vector call(LabeledPoint v1) {
-        return v1.features();
-      }
-    }));
+    JavaRDD<Double> predictions = model.predict(rdd.map(LabeledPoint::features));
 
     int numCorrect = validatePrediction(arr, model);
-    Assert.assertTrue(numCorrect == rdd.count());
+    Assert.assertEquals(numCorrect, rdd.count());
   }
 
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/1487c9af/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java
index 06cd9ea..bf87174 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java
@@ -157,7 +157,7 @@ public abstract class SpecificParquetRecordReaderBase<T> extends RecordReader<Vo
     // to the accumulator. So we can check if the row groups are filtered or not in test case.
     TaskContext taskContext = TaskContext$.MODULE$.get();
     if (taskContext != null) {
-      Option<AccumulatorV2<?, ?>> accu = (Option<AccumulatorV2<?, ?>>) taskContext.taskMetrics()
+      Option<AccumulatorV2<?, ?>> accu = taskContext.taskMetrics()
         .lookForAccumulatorByName("numRowGroups");
       if (accu.isDefined()) {
         ((LongAccumulator)accu.get()).add((long)blocks.size());

http://git-wip-us.apache.org/repos/asf/spark/blob/1487c9af/sql/core/src/test/java/test/org/apache/spark/sql/Java8DatasetAggregatorSuite.java
----------------------------------------------------------------------
diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/Java8DatasetAggregatorSuite.java b/sql/core/src/test/java/test/org/apache/spark/sql/Java8DatasetAggregatorSuite.java
index 8b8a403..6ffccee 100644
--- a/sql/core/src/test/java/test/org/apache/spark/sql/Java8DatasetAggregatorSuite.java
+++ b/sql/core/src/test/java/test/org/apache/spark/sql/Java8DatasetAggregatorSuite.java
@@ -35,27 +35,35 @@ public class Java8DatasetAggregatorSuite extends JavaDatasetAggregatorSuiteBase
   public void testTypedAggregationAverage() {
     KeyValueGroupedDataset<String, Tuple2<String, Integer>> grouped = generateGroupedDataset();
     Dataset<Tuple2<String, Double>> agged = grouped.agg(typed.avg(v -> (double)(v._2() * 2)));
-    Assert.assertEquals(Arrays.asList(tuple2("a", 3.0), tuple2("b", 6.0)), agged.collectAsList());
+    Assert.assertEquals(
+        Arrays.asList(new Tuple2<>("a", 3.0), new Tuple2<>("b", 6.0)),
+        agged.collectAsList());
   }
 
   @Test
   public void testTypedAggregationCount() {
     KeyValueGroupedDataset<String, Tuple2<String, Integer>> grouped = generateGroupedDataset();
     Dataset<Tuple2<String, Long>> agged = grouped.agg(typed.count(v -> v));
-    Assert.assertEquals(Arrays.asList(tuple2("a", 2L), tuple2("b", 1L)), agged.collectAsList());
+    Assert.assertEquals(
+        Arrays.asList(new Tuple2<>("a", 2L), new Tuple2<>("b", 1L)),
+        agged.collectAsList());
   }
 
   @Test
   public void testTypedAggregationSumDouble() {
     KeyValueGroupedDataset<String, Tuple2<String, Integer>> grouped = generateGroupedDataset();
     Dataset<Tuple2<String, Double>> agged = grouped.agg(typed.sum(v -> (double)v._2()));
-    Assert.assertEquals(Arrays.asList(tuple2("a", 3.0), tuple2("b", 3.0)), agged.collectAsList());
+    Assert.assertEquals(
+        Arrays.asList(new Tuple2<>("a", 3.0), new Tuple2<>("b", 3.0)),
+        agged.collectAsList());
   }
 
   @Test
   public void testTypedAggregationSumLong() {
     KeyValueGroupedDataset<String, Tuple2<String, Integer>> grouped = generateGroupedDataset();
     Dataset<Tuple2<String, Long>> agged = grouped.agg(typed.sumLong(v -> (long)v._2()));
-    Assert.assertEquals(Arrays.asList(tuple2("a", 3L), tuple2("b", 3L)), agged.collectAsList());
+    Assert.assertEquals(
+        Arrays.asList(new Tuple2<>("a", 3L), new Tuple2<>("b", 3L)),
+        agged.collectAsList());
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/1487c9af/sql/core/src/test/java/test/org/apache/spark/sql/JavaApplySchemaSuite.java
----------------------------------------------------------------------
diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/JavaApplySchemaSuite.java b/sql/core/src/test/java/test/org/apache/spark/sql/JavaApplySchemaSuite.java
index 573d0e3..bf8ff61 100644
--- a/sql/core/src/test/java/test/org/apache/spark/sql/JavaApplySchemaSuite.java
+++ b/sql/core/src/test/java/test/org/apache/spark/sql/JavaApplySchemaSuite.java
@@ -30,7 +30,6 @@ import org.junit.Test;
 
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.api.java.function.Function;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.RowFactory;
@@ -95,12 +94,7 @@ public class JavaApplySchemaSuite implements Serializable {
     personList.add(person2);
 
     JavaRDD<Row> rowRDD = jsc.parallelize(personList).map(
-      new Function<Person, Row>() {
-        @Override
-        public Row call(Person person) throws Exception {
-          return RowFactory.create(person.getName(), person.getAge());
-        }
-      });
+        person -> RowFactory.create(person.getName(), person.getAge()));
 
     List<StructField> fields = new ArrayList<>(2);
     fields.add(DataTypes.createStructField("name", DataTypes.StringType, false));
@@ -131,12 +125,7 @@ public class JavaApplySchemaSuite implements Serializable {
     personList.add(person2);
 
     JavaRDD<Row> rowRDD = jsc.parallelize(personList).map(
-        new Function<Person, Row>() {
-          @Override
-          public Row call(Person person) {
-            return RowFactory.create(person.getName(), person.getAge());
-          }
-        });
+        person -> RowFactory.create(person.getName(), person.getAge()));
 
     List<StructField> fields = new ArrayList<>(2);
     fields.add(DataTypes.createStructField("", DataTypes.StringType, false));
@@ -146,12 +135,7 @@ public class JavaApplySchemaSuite implements Serializable {
     Dataset<Row> df = spark.createDataFrame(rowRDD, schema);
     df.createOrReplaceTempView("people");
     List<String> actual = spark.sql("SELECT * FROM people").toJavaRDD()
-      .map(new Function<Row, String>() {
-        @Override
-        public String call(Row row) {
-          return row.getString(0) + "_" + row.get(1);
-        }
-      }).collect();
+      .map(row -> row.getString(0) + "_" + row.get(1)).collect();
 
     List<String> expected = new ArrayList<>(2);
     expected.add("Michael_29");

http://git-wip-us.apache.org/repos/asf/spark/blob/1487c9af/sql/core/src/test/java/test/org/apache/spark/sql/JavaDataFrameSuite.java
----------------------------------------------------------------------
diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/JavaDataFrameSuite.java b/sql/core/src/test/java/test/org/apache/spark/sql/JavaDataFrameSuite.java
index c44fc3d..c3b94a4 100644
--- a/sql/core/src/test/java/test/org/apache/spark/sql/JavaDataFrameSuite.java
+++ b/sql/core/src/test/java/test/org/apache/spark/sql/JavaDataFrameSuite.java
@@ -189,7 +189,7 @@ public class JavaDataFrameSuite {
     for (int i = 0; i < d.length(); i++) {
       Assert.assertEquals(bean.getD().get(i), d.apply(i));
     }
-    // Java.math.BigInteger is equavient to Spark Decimal(38,0)
+    // Java.math.BigInteger is equivalent to Spark Decimal(38,0)
     Assert.assertEquals(new BigDecimal(bean.getE()), first.getDecimal(4));
   }
 
@@ -231,13 +231,10 @@ public class JavaDataFrameSuite {
     Assert.assertEquals(0, schema2.fieldIndex("id"));
   }
 
-  private static final Comparator<Row> crosstabRowComparator = new Comparator<Row>() {
-    @Override
-    public int compare(Row row1, Row row2) {
-      String item1 = row1.getString(0);
-      String item2 = row2.getString(0);
-      return item1.compareTo(item2);
-    }
+  private static final Comparator<Row> crosstabRowComparator = (row1, row2) -> {
+    String item1 = row1.getString(0);
+    String item2 = row2.getString(0);
+    return item1.compareTo(item2);
   };
 
   @Test
@@ -249,7 +246,7 @@ public class JavaDataFrameSuite {
     Assert.assertEquals("1", columnNames[1]);
     Assert.assertEquals("2", columnNames[2]);
     List<Row> rows = crosstab.collectAsList();
-    Collections.sort(rows, crosstabRowComparator);
+    rows.sort(crosstabRowComparator);
     Integer count = 1;
     for (Row row : rows) {
       Assert.assertEquals(row.get(0).toString(), count.toString());
@@ -284,7 +281,7 @@ public class JavaDataFrameSuite {
   @Test
   public void testSampleBy() {
     Dataset<Row> df = spark.range(0, 100, 1, 2).select(col("id").mod(3).as("key"));
-    Dataset<Row> sampled = df.stat().<Integer>sampleBy("key", ImmutableMap.of(0, 0.1, 1, 0.2), 0L);
+    Dataset<Row> sampled = df.stat().sampleBy("key", ImmutableMap.of(0, 0.1, 1, 0.2), 0L);
     List<Row> actual = sampled.groupBy("key").count().orderBy("key").collectAsList();
     Assert.assertEquals(0, actual.get(0).getLong(0));
     Assert.assertTrue(0 <= actual.get(0).getLong(1) && actual.get(0).getLong(1) <= 8);
@@ -296,7 +293,7 @@ public class JavaDataFrameSuite {
   public void pivot() {
     Dataset<Row> df = spark.table("courseSales");
     List<Row> actual = df.groupBy("year")
-      .pivot("course", Arrays.<Object>asList("dotNET", "Java"))
+      .pivot("course", Arrays.asList("dotNET", "Java"))
       .agg(sum("earnings")).orderBy("year").collectAsList();
 
     Assert.assertEquals(2012, actual.get(0).getInt(0));
@@ -352,24 +349,24 @@ public class JavaDataFrameSuite {
     Dataset<Long> df = spark.range(1000);
 
     CountMinSketch sketch1 = df.stat().countMinSketch("id", 10, 20, 42);
-    Assert.assertEquals(sketch1.totalCount(), 1000);
-    Assert.assertEquals(sketch1.depth(), 10);
-    Assert.assertEquals(sketch1.width(), 20);
+    Assert.assertEquals(1000, sketch1.totalCount());
+    Assert.assertEquals(10, sketch1.depth());
+    Assert.assertEquals(20, sketch1.width());
 
     CountMinSketch sketch2 = df.stat().countMinSketch(col("id"), 10, 20, 42);
-    Assert.assertEquals(sketch2.totalCount(), 1000);
-    Assert.assertEquals(sketch2.depth(), 10);
-    Assert.assertEquals(sketch2.width(), 20);
+    Assert.assertEquals(1000, sketch2.totalCount());
+    Assert.assertEquals(10, sketch2.depth());
+    Assert.assertEquals(20, sketch2.width());
 
     CountMinSketch sketch3 = df.stat().countMinSketch("id", 0.001, 0.99, 42);
-    Assert.assertEquals(sketch3.totalCount(), 1000);
-    Assert.assertEquals(sketch3.relativeError(), 0.001, 1e-4);
-    Assert.assertEquals(sketch3.confidence(), 0.99, 5e-3);
+    Assert.assertEquals(1000, sketch3.totalCount());
+    Assert.assertEquals(0.001, sketch3.relativeError(), 1.0e-4);
+    Assert.assertEquals(0.99, sketch3.confidence(), 5.0e-3);
 
     CountMinSketch sketch4 = df.stat().countMinSketch(col("id"), 0.001, 0.99, 42);
-    Assert.assertEquals(sketch4.totalCount(), 1000);
-    Assert.assertEquals(sketch4.relativeError(), 0.001, 1e-4);
-    Assert.assertEquals(sketch4.confidence(), 0.99, 5e-3);
+    Assert.assertEquals(1000, sketch4.totalCount());
+    Assert.assertEquals(0.001, sketch4.relativeError(), 1.0e-4);
+    Assert.assertEquals(0.99, sketch4.confidence(), 5.0e-3);
   }
 
   @Test
@@ -389,13 +386,13 @@ public class JavaDataFrameSuite {
     }
 
     BloomFilter filter3 = df.stat().bloomFilter("id", 1000, 64 * 5);
-    Assert.assertTrue(filter3.bitSize() == 64 * 5);
+    Assert.assertEquals(64 * 5, filter3.bitSize());
     for (int i = 0; i < 1000; i++) {
       Assert.assertTrue(filter3.mightContain(i));
     }
 
     BloomFilter filter4 = df.stat().bloomFilter(col("id").multiply(3), 1000, 64 * 5);
-    Assert.assertTrue(filter4.bitSize() == 64 * 5);
+    Assert.assertEquals(64 * 5, filter4.bitSize());
     for (int i = 0; i < 1000; i++) {
       Assert.assertTrue(filter4.mightContain(i * 3));
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/1487c9af/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetAggregatorSuite.java
----------------------------------------------------------------------
diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetAggregatorSuite.java b/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetAggregatorSuite.java
index fe86371..d3769a7 100644
--- a/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetAggregatorSuite.java
+++ b/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetAggregatorSuite.java
@@ -24,7 +24,6 @@ import scala.Tuple2;
 import org.junit.Assert;
 import org.junit.Test;
 
-import org.apache.spark.api.java.function.MapFunction;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Encoder;
 import org.apache.spark.sql.Encoders;
@@ -41,7 +40,9 @@ public class JavaDatasetAggregatorSuite extends JavaDatasetAggregatorSuiteBase {
     KeyValueGroupedDataset<String, Tuple2<String, Integer>> grouped = generateGroupedDataset();
 
     Dataset<Tuple2<String, Integer>> agged = grouped.agg(new IntSumOf().toColumn());
-    Assert.assertEquals(Arrays.asList(tuple2("a", 3), tuple2("b", 3)), agged.collectAsList());
+    Assert.assertEquals(
+        Arrays.asList(new Tuple2<>("a", 3), new Tuple2<>("b", 3)),
+        agged.collectAsList());
 
     Dataset<Tuple2<String, Integer>> agged2 = grouped.agg(new IntSumOf().toColumn())
       .as(Encoders.tuple(Encoders.STRING(), Encoders.INT()));
@@ -87,48 +88,36 @@ public class JavaDatasetAggregatorSuite extends JavaDatasetAggregatorSuiteBase {
   @Test
   public void testTypedAggregationAverage() {
     KeyValueGroupedDataset<String, Tuple2<String, Integer>> grouped = generateGroupedDataset();
-    Dataset<Tuple2<String, Double>> agged = grouped.agg(typed.avg(
-      new MapFunction<Tuple2<String, Integer>, Double>() {
-        public Double call(Tuple2<String, Integer> value) throws Exception {
-          return (double)(value._2() * 2);
-        }
-      }));
-    Assert.assertEquals(Arrays.asList(tuple2("a", 3.0), tuple2("b", 6.0)), agged.collectAsList());
+    Dataset<Tuple2<String, Double>> agged = grouped.agg(typed.avg(value -> (double)(value._2() * 2)));
+    Assert.assertEquals(
+        Arrays.asList(new Tuple2<>("a", 3.0), new Tuple2<>("b", 6.0)),
+        agged.collectAsList());
   }
 
   @Test
   public void testTypedAggregationCount() {
     KeyValueGroupedDataset<String, Tuple2<String, Integer>> grouped = generateGroupedDataset();
-    Dataset<Tuple2<String, Long>> agged = grouped.agg(typed.count(
-      new MapFunction<Tuple2<String, Integer>, Object>() {
-        public Object call(Tuple2<String, Integer> value) throws Exception {
-          return value;
-        }
-      }));
-    Assert.assertEquals(Arrays.asList(tuple2("a", 2), tuple2("b", 1)), agged.collectAsList());
+    Dataset<Tuple2<String, Long>> agged = grouped.agg(typed.count(value -> value));
+    Assert.assertEquals(
+        Arrays.asList(new Tuple2<>("a", 2L), new Tuple2<>("b", 1L)),
+        agged.collectAsList());
   }
 
   @Test
   public void testTypedAggregationSumDouble() {
     KeyValueGroupedDataset<String, Tuple2<String, Integer>> grouped = generateGroupedDataset();
-    Dataset<Tuple2<String, Double>> agged = grouped.agg(typed.sum(
-      new MapFunction<Tuple2<String, Integer>, Double>() {
-        public Double call(Tuple2<String, Integer> value) throws Exception {
-          return (double)value._2();
-        }
-      }));
-    Assert.assertEquals(Arrays.asList(tuple2("a", 3.0), tuple2("b", 3.0)), agged.collectAsList());
+    Dataset<Tuple2<String, Double>> agged = grouped.agg(typed.sum(value -> (double) value._2()));
+    Assert.assertEquals(
+        Arrays.asList(new Tuple2<>("a", 3.0), new Tuple2<>("b", 3.0)),
+        agged.collectAsList());
   }
 
   @Test
   public void testTypedAggregationSumLong() {
     KeyValueGroupedDataset<String, Tuple2<String, Integer>> grouped = generateGroupedDataset();
-    Dataset<Tuple2<String, Long>> agged = grouped.agg(typed.sumLong(
-      new MapFunction<Tuple2<String, Integer>, Long>() {
-        public Long call(Tuple2<String, Integer> value) throws Exception {
-          return (long)value._2();
-        }
-      }));
-    Assert.assertEquals(Arrays.asList(tuple2("a", 3), tuple2("b", 3)), agged.collectAsList());
+    Dataset<Tuple2<String, Long>> agged = grouped.agg(typed.sumLong(value -> (long) value._2()));
+    Assert.assertEquals(
+        Arrays.asList(new Tuple2<>("a", 3L), new Tuple2<>("b", 3L)),
+        agged.collectAsList());
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/1487c9af/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetAggregatorSuiteBase.java
----------------------------------------------------------------------
diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetAggregatorSuiteBase.java b/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetAggregatorSuiteBase.java
index 8fc4eff..e62db7d 100644
--- a/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetAggregatorSuiteBase.java
+++ b/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetAggregatorSuiteBase.java
@@ -52,23 +52,13 @@ public class JavaDatasetAggregatorSuiteBase implements Serializable {
     spark = null;
   }
 
-  protected <T1, T2> Tuple2<T1, T2> tuple2(T1 t1, T2 t2) {
-    return new Tuple2<>(t1, t2);
-  }
-
   protected KeyValueGroupedDataset<String, Tuple2<String, Integer>> generateGroupedDataset() {
     Encoder<Tuple2<String, Integer>> encoder = Encoders.tuple(Encoders.STRING(), Encoders.INT());
     List<Tuple2<String, Integer>> data =
-      Arrays.asList(tuple2("a", 1), tuple2("a", 2), tuple2("b", 3));
+      Arrays.asList(new Tuple2<>("a", 1), new Tuple2<>("a", 2), new Tuple2<>("b", 3));
     Dataset<Tuple2<String, Integer>> ds = spark.createDataset(data, encoder);
 
-    return ds.groupByKey(
-      new MapFunction<Tuple2<String, Integer>, String>() {
-        @Override
-        public String call(Tuple2<String, Integer> value) throws Exception {
-          return value._1();
-        }
-      },
+    return ds.groupByKey((MapFunction<Tuple2<String, Integer>, String>) value -> value._1(),
       Encoders.STRING());
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org